The AGGREGATION operator is an alternative implementation of the Aggregate (and Group) operator. In particular, for sliding time windows with an advance of 1, this operator is faster than the implementation based on partial aggregates.
...
- group_by: An optional list of attributes over which the grouping should occur.
- aggregations: A list of aggregate functions (see below).
- SUPPRESS_FULL_META_DATA_HANDLING: Boolean flag; set it to true to suppress the handling of metadata other than the time interval (e.g. latency).
The following optional Boolean parameters control when a new aggregation value is output (see below for useful examples):
- eval_at_new_element: Outputs an updated aggregation value when a new element becomes valid. If more than one element becomes valid at the same time (same start timestamp), the operator outputs one value per element, in order of arrival. The default value is true.
- eval_at_outdating: Outputs an updated aggregation value when one or more elements become invalid; the value is computed after the invalid elements have been removed. The default value is true.
- eval_before_remove_outdating: Outputs the updated aggregation value before the invalid elements are removed instead of after removal. The default value is false.
- eval_at_done: Outputs the current value when the operator receives the done signal. The default value is false.
- output_only_changes: Suppresses elements that are equal to the previously output element. The default value is false. If you use this, make sure the equals method is implemented for every attribute type.
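To make the interplay of these flags concrete, here is a minimal Python simulation. It is a sketch under stated assumptions, not the operator's code: elements arrive sorted by start timestamp, outdating is only checked when a new element arrives, eval_before_remove_outdating is treated as a modifier of eval_at_outdating, and the valid values are aggregated with a caller-supplied function. The names `Element` and `run` are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, List, Sequence


@dataclass
class Element:
    start: int    # timestamp at which the element becomes valid
    end: int      # timestamp at which it becomes invalid (exclusive)
    value: float


def run(
    elements: Sequence[Element],
    agg: Callable[[List[float]], float],
    eval_at_new_element: bool = True,
    eval_at_outdating: bool = True,
    eval_before_remove_outdating: bool = False,
    eval_at_done: bool = False,
    output_only_changes: bool = False,
) -> List[float]:
    """Return the aggregate values in the order this sketch would output them."""
    valid: List[Element] = []
    out: List[float] = []

    def emit(current: List[Element]) -> None:
        result = agg([e.value for e in current])
        if output_only_changes and out and out[-1] == result:
            return  # equal to the previous output: suppressed
        out.append(result)

    for new in elements:  # assumed sorted by start timestamp
        outdated = any(e.end <= new.start for e in valid)
        if outdated and eval_at_outdating and eval_before_remove_outdating:
            emit(valid)  # value before the invalid elements are removed
        valid = [e for e in valid if e.end > new.start]
        if outdated and eval_at_outdating and not eval_before_remove_outdating:
            emit(valid)  # value after removal
        valid.append(new)
        if eval_at_new_element:
            emit(valid)

    if eval_at_done:
        emit(valid)  # final value when the input is done
    return out
```

With elements valid in [0, 2) and [2, 4) (a tumbling window of size 2), the default flags report every intermediate sum, whereas eval_at_new_element=False together with eval_before_remove_outdating=True reports only the sum of each window as it closes.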
Aggregation Functions
| Function Name | Description | Parameters | Examples |
|---|---|---|---|
| Count | Outputs the number of stream elements. | | ['FUNCTION' = 'Count']; ['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'number_of_elements'] |
| DistinctCount | Outputs the number of distinct stream elements. | | |
| Sum | Outputs the sum of elements. | | ['FUNCTION' = 'Sum']; ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']; ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = ['value1', 'value2']] |
| Avg | Average value (mean) | TODO (similar to Sum) | |
| Min | Minimum value | TODO (similar to Sum) | |
| Max | Maximum value | TODO (similar to Sum) | |
| First | The first element of a window. See example below. | Use the setting output_only_changes = true. This yields the first element of each window; especially useful with a tumbling window. | |
| Last | The last element of a window. See example below. | Use the setting EVAL_AT_NEW_ELEMENT = false. This yields the last element of each window; especially useful with a tumbling window. | |
| Trigger | The tuple that triggers the output. | TODO | |
| Variance | Calculates the variance | TODO (similar to Sum) | |
| StdDev | Standard deviation | | |
| TopK | Calculates the top-K list | TODO | |
| Nest | Nests the valid elements as a list. If more than one attribute is given, each list entry contains the tuple projected on these attributes. | INPUT_ATTRIBUTES (required) | TODO |
| DistinctNest | Same as Nest, but removes duplicates | | |
| Univariate statistics: GeometricMean, Kurtosis, Skewness, IQR (InterQuartileRange), MCQ (MeanCrossingRate), RMS (RootMeanSquare), FrEnergy (SpectralEnergy), FrDmEntropy, FrPeakFreq, FrMag5, MSE (Mean Square Error), RMSE (Root Mean Square Error) | See e.g. https://commons.apache.org/proper/commons-math/javadocs/api-3.6.1/org/apache/commons/math3/stat/descriptive/UnivariateStatistic.html | | ['FUNCTION' = 'RMSE', 'INPUT_ATTRIBUTES' = 'error']; ['FUNCTION' = 'MSE', 'INPUT_ATTRIBUTES' = 'error'] |
Operator-level parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| AGGREGATIONS | List of maps | – | Required. List of aggregation function configurations (see below). |
| GROUP_BY | List of attributes | – | Optional grouping attributes. |
| EVAL_AT_NEW_ELEMENT | Boolean | true | Output a result when a new element becomes valid. |
| EVAL_AT_OUTDATING | Boolean | true | Output a result when elements expire. |
| EVAL_BEFORE_REMOVE_OUTDATING | Boolean | false | If true, evaluate before removing expiring elements instead of after. |
| EVAL_AT_DONE | Boolean | false | Output the last value when the operator receives the done signal. |
| OUTPUT_ONLY_CHANGES | Boolean | false | Suppress output tuples that are equal to the previous output. |
| SUPPRESS_FULL_META_DATA_HANDLING | Boolean | false | For non-incremental functions: skip full metadata handling. |
...
Standard Functions
AVG
Computes the arithmetic mean.
| Property | Value |
|---|---|
| FUNCTION | AVG |
| Input | one or more numeric attributes |
| Output type | Double per input attribute |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='AVG', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='avg_temperature']]
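AVG, SUM and COUNT can all be maintained incrementally because each of them only needs a running sum and a running count, both of which can be undone when an element leaves the window. The following Python sketch illustrates this idea; the class name `IncrementalAvg` and the choice to return None for an empty window are assumptions, not documented operator behavior.

```python
from typing import Optional


class IncrementalAvg:
    """Running sum and count; both support removal, so the mean updates in O(1)."""

    def __init__(self) -> None:
        self.total = 0.0  # SUM
        self.count = 0    # COUNT

    def add(self, x: float) -> None:
        self.total += x
        self.count += 1

    def remove(self, x: float) -> None:
        # Undo a previous add() when x leaves the window.
        self.total -= x
        self.count -= 1

    def value(self) -> Optional[float]:
        # AVG = SUM / COUNT; None for an empty window (assumption).
        return self.total / self.count if self.count else None
```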
COUNT
Counts the number of tuples in the current window. No input attribute is required.
| Property | Value |
|---|---|
| FUNCTION | COUNT |
| Input | any attribute (or *) |
| Output type | Long |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='COUNT', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='count_x']]
SUM
Computes the sum of values.
| Property | Value |
|---|---|
| FUNCTION | SUM |
| Input | one or more numeric attributes |
| Output type | Double per input attribute |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='SUM', 'INPUT_ATTRIBUTES'='value', 'OUTPUT_ATTRIBUTES'='sum_value']]
MIN
Returns the minimum value. Maintained with a min-priority queue.
| Property | Value |
|---|---|
| FUNCTION | MIN |
| Input | one or more numeric attributes |
| Output type | same as input |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='MIN', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='min_temperature']]
MAX
Returns the maximum value. Maintained with a max-priority queue.
| Property | Value |
|---|---|
| FUNCTION | MAX |
| Input | one or more numeric attributes |
| Output type | same as input |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='MAX', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='max_temperature']]
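A priority queue makes insertion cheap but does not support removing arbitrary elements efficiently. One common workaround, shown here as a Python sketch (not necessarily what the operator does), is lazy deletion: removals are only recorded, and stale entries are discarded once they reach the top of the heap. `WindowExtremum` is a hypothetical name; MAX reuses the min-heap by negating values.

```python
import heapq
from collections import Counter
from typing import List, Optional


class WindowExtremum:
    """MIN (default) or MAX of a sliding window via a binary heap with lazy deletion."""

    def __init__(self, maximum: bool = False) -> None:
        self._sign = -1 if maximum else 1  # MAX: store negated values in a min-heap
        self._heap: List[float] = []
        self._removed: Counter = Counter()  # values removed but still on the heap

    def add(self, x: float) -> None:
        heapq.heappush(self._heap, self._sign * x)

    def remove(self, x: float) -> None:
        # Only record the removal; the heap entry is dropped once it reaches the top.
        self._removed[self._sign * x] += 1

    def value(self) -> Optional[float]:
        while self._heap and self._removed[self._heap[0]] > 0:
            self._removed[self._heap[0]] -= 1
            heapq.heappop(self._heap)
        return self._sign * self._heap[0] if self._heap else None
```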
STDDEV
Population standard deviation, computed incrementally via Welford's online algorithm.
| Property | Value |
|---|---|
| FUNCTION | STDDEV |
| Input | one or more numeric attributes |
| Output type | Double per input attribute |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='STDDEV', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='stddev_x']]
VARIANCE
Population variance, computed incrementally via Welford's online algorithm.
| Property | Value |
|---|---|
| FUNCTION | VARIANCE |
| Input | one or more numeric attributes |
| Output type | Double per input attribute |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='VARIANCE', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='var_x']]
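Both STDDEV and VARIANCE rely on Welford's update, which can also be run backwards; that is what makes it usable for sliding windows where elements leave again. A minimal Python sketch of the add and remove steps follows. `WelfordWindow` is a hypothetical name, returning None for an empty window is an assumption, and `remove` assumes the value is currently in the window.

```python
import math
from typing import Optional


class WelfordWindow:
    """Running mean and sum of squared deviations (M2) with add and remove."""

    def __init__(self) -> None:
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0

    def add(self, x: float) -> None:
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    def remove(self, x: float) -> None:
        # Inverse of add(): restore mean and M2 of the window without x.
        if self.n <= 1:
            self.n, self.mean, self.m2 = 0, 0.0, 0.0
            return
        delta = x - self.mean
        self.n -= 1
        self.mean -= delta / self.n
        self.m2 -= delta * (x - self.mean)

    def variance(self) -> Optional[float]:
        # Population variance M2 / n; clamp tiny negative rounding errors to zero.
        return max(self.m2, 0.0) / self.n if self.n else None

    def stddev(self) -> Optional[float]:
        var = self.variance()
        return math.sqrt(var) if var is not None else None
```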
NEST
Collects all tuples of the current window into a list. The output attribute holds a List<Tuple>.
| Property | Value |
|---|---|
| FUNCTION | NEST |
| Input | one or more attributes, or * for all |
| Output type | List<Tuple> |
| Incremental | yes |
Additional parameters:
| Key | Type | Default | Description |
|---|---|---|---|
| PRESERVE_ORDERING | Boolean | false | Keep tuples in insertion order (uses LinkedHashSet internally). |
| SORT | Boolean | false | Sort the tuples in the list (uses TreeSet internally). |
| UNIQUE_ATTR | Attribute name | – | If set, only the most recent tuple per unique value of this attribute is kept. |
AGGREGATIONS = [['FUNCTION'='NEST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='nested',
'PRESERVE_ORDERING'=true]]
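How UNIQUE_ATTR and the duplicate elimination of DISTINCT_NEST shape the nested list can be illustrated with a small Python sketch over tuples modelled as dicts. This is an illustration under assumptions, not the operator's implementation: the function `nest` is hypothetical, the window is given oldest first, and PRESERVE_ORDERING and SORT are not modelled (the list simply keeps arrival order, and a tuple that replaces an older one for the same UNIQUE_ATTR value moves to the end).

```python
from typing import Any, Dict, List, Optional, Sequence, Union

Row = Dict[str, Any]


def nest(
    window: Sequence[Row],
    input_attributes: Union[str, Sequence[str]] = "*",
    unique_attr: Optional[str] = None,
    distinct: bool = False,
) -> List[Row]:
    """Collect the window's tuples (oldest first) into a list of projected rows."""
    if isinstance(input_attributes, str) and input_attributes != "*":
        input_attributes = [input_attributes]  # a single attribute name
    rows: List[Row] = list(window)
    if unique_attr is not None:
        latest: Dict[Any, Row] = {}
        for row in rows:  # a later tuple replaces an earlier one with the same key
            latest.pop(row[unique_attr], None)
            latest[row[unique_attr]] = row
        rows = list(latest.values())
    if input_attributes != "*":
        rows = [{a: row[a] for a in input_attributes} for row in rows]
    if distinct:  # DISTINCT_NEST: drop repeated projections, keep first occurrence
        seen = set()
        unique_rows = []
        for row in rows:
            key = tuple(sorted(row.items()))
            if key not in seen:
                seen.add(key)
                unique_rows.append(row)
        rows = unique_rows
    return rows
```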
DISTINCT_NEST
Like NEST, but eliminates duplicate tuples based on the projected attribute values.
| Property | Value |
|---|---|
| FUNCTION | DISTINCT_NEST |
| Input | one or more attributes, or * for all |
| Output type | List<Tuple> |
| Incremental | yes |
Additional parameters: same as NEST (PRESERVE_ORDERING, SORT, UNIQUE_ATTR).
AGGREGATIONS = [['FUNCTION'='DISTINCT_NEST', 'INPUT_ATTRIBUTES'='sensorId', 'OUTPUT_ATTRIBUTES'='distinct_sensors']]
...
DISTINCT_COUNT
Counts the number of distinct tuples in the current window (based on the projected attribute values). Internally delegates to DISTINCT_NEST and returns its size.
| Property | Value |
|---|---|
| FUNCTION | DISTINCT_COUNT |
| Input | one or more attributes |
| Output type | Integer |
| Incremental | yes |
Additional parameters:
| Key | Type | Default | Description |
|---|---|---|---|
| UNIQUE_ATTR | Attribute name | – | The attribute(s) used to determine uniqueness. |
AGGREGATIONS = [['FUNCTION'='DISTINCT_COUNT', 'INPUT_ATTRIBUTES'='sensorId',
'OUTPUT_ATTRIBUTES'='num_distinct', 'UNIQUE_ATTR'='sensorId']]
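Because the window changes one element at a time, a distinct count can also be kept incrementally by counting how often each projected value occurs and reporting the number of keys. The Python sketch below shows one such approach; it is an assumption for illustration, not a description of the DISTINCT_NEST-based implementation, and `DistinctCounter` is a hypothetical name.

```python
from collections import Counter
from typing import Any, Dict, Sequence, Tuple

Row = Dict[str, Any]


class DistinctCounter:
    """Counts distinct projections of the tuples currently in the window."""

    def __init__(self, attributes: Sequence[str]) -> None:
        self._attributes = list(attributes)
        self._occurrences: Counter = Counter()  # projection -> number of tuples

    def _key(self, row: Row) -> Tuple[Any, ...]:
        return tuple(row[a] for a in self._attributes)

    def add(self, row: Row) -> None:
        self._occurrences[self._key(row)] += 1

    def remove(self, row: Row) -> None:
        key = self._key(row)
        self._occurrences[key] -= 1
        if self._occurrences[key] <= 0:
            del self._occurrences[key]  # last tuple with this projection has left

    def value(self) -> int:
        return len(self._occurrences)
```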
...
FIRST
Returns the temporally first tuple of the current window as a nested tuple. The window must provide elements in start-timestamp order. Especially useful with tumbling windows.
| Property | Value |
|---|---|
| FUNCTION | FIRST |
| Input | all attributes (full tuple) |
| Output type | Tuple |
| Incremental | no (requires ordered elements) |
AGGREGATIONS = [['FUNCTION'='FIRST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='first_tuple']]
LAST
Returns the most recently added tuple in the current window as a nested tuple. Especially useful with tumbling windows.
| Property | Value |
|---|---|
| FUNCTION | LAST |
| Input | all attributes (full tuple) |
| Output type | Tuple |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='LAST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='last_tuple']]
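If the window delivers elements in start-timestamp order and removes them in FIFO order, FIRST and LAST reduce to looking at the two ends of a double-ended queue. A minimal Python sketch under exactly these assumptions (the class `FirstLastWindow` is hypothetical):

```python
from collections import deque
from typing import Any, Deque, Dict, Optional

Row = Dict[str, Any]


class FirstLastWindow:
    """Window contents in arrival (start-timestamp) order, oldest on the left."""

    def __init__(self) -> None:
        self._rows: Deque[Row] = deque()

    def add(self, row: Row) -> None:
        self._rows.append(row)

    def remove_oldest(self) -> Row:
        # FIFO removal: the element that became valid first expires first.
        return self._rows.popleft()

    def first(self) -> Optional[Row]:
        return self._rows[0] if self._rows else None

    def last(self) -> Optional[Row]:
        return self._rows[-1] if self._rows else None
```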
TOP_K
Returns the top-k tuples from the current window, ranked by one or more scoring attributes. The output attribute holds a List<Tuple>.
| Property | Value |
|---|---|
| FUNCTION | TOP_K |
| Input | one or more attributes, or * |
| Output type | List<Tuple> |
| Incremental | yes (sorted set) |
Additional parameters:
| Key | Type | Default | Description |
|---|---|---|---|
| TOP_K | Integer | – | Required. Number of top elements to keep. |
| SCORING_ATTRIBUTES | Attribute name(s) | – | The attribute(s) used to rank elements. |
| DESCENDING | Boolean | true | true = highest score first, false = lowest score first. |
| NEWEST_ON_TOP | Boolean | true | When scores are equal, the newest tuple ranks higher. |
| MIN_SCORE | Number | – | Tuples with a score ≤ this value are never included. |
| UNIQUE_ATTR | Attribute name | – | If set, only the most recent tuple per unique value of this attribute is kept. |
| ALWAYS_OUTPUT | Boolean | false | If false, suppresses output when the top-k list has not changed. |
AGGREGATIONS = [['FUNCTION'='TOP_K', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='top5',
'TOP_K'=5, 'SCORING_ATTRIBUTES'='temperature', 'DESCENDING'=true]]
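The ranking rules in this table amount to a sort with a two-part key: the score first, then the arrival position as the tie-breaker. The Python sketch below applies them to a window given oldest first. It is a simplified illustration, not the operator's sorted-set implementation: `top_k` and `maybe_output` are hypothetical names, tuples are dicts, a single scoring attribute is assumed, UNIQUE_ATTR is applied before MIN_SCORE, and projection onto INPUT_ATTRIBUTES is omitted.

```python
from typing import Any, Dict, List, Optional, Sequence

Row = Dict[str, Any]


def top_k(
    window: Sequence[Row],
    k: int,
    scoring_attribute: str,
    descending: bool = True,
    newest_on_top: bool = True,
    min_score: Optional[float] = None,
    unique_attr: Optional[str] = None,
) -> List[Row]:
    indexed = list(enumerate(window))  # arrival index; larger means newer
    if unique_attr is not None:
        latest: Dict[Any, tuple] = {}
        for i, row in indexed:
            latest[row[unique_attr]] = (i, row)  # newer tuple replaces older one
        indexed = list(latest.values())
    if min_score is not None:  # scores <= MIN_SCORE are never included
        indexed = [(i, r) for i, r in indexed if r[scoring_attribute] > min_score]
    score_sign = -1 if descending else 1
    age_sign = -1 if newest_on_top else 1
    indexed.sort(key=lambda ir: (score_sign * ir[1][scoring_attribute], age_sign * ir[0]))
    return [row for _, row in indexed[:k]]


def maybe_output(current: List[Row], previous: Optional[List[Row]],
                 always_output: bool = False) -> Optional[List[Row]]:
    """ALWAYS_OUTPUT=false: return None when the list did not change."""
    if not always_output and current == previous:
        return None
    return current
```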
...
TRIGGER
Returns the tuple that triggered the current aggregation evaluation (the most recently arrived tuple). Optionally, only selected attributes are forwarded.
| Property | Value |
|---|---|
| FUNCTION | TRIGGER |
| Input | any attributes, or * |
| Output type | Tuple (when NESTED=true) or flat attributes (when NESTED=false) |
| Incremental | yes |
Additional parameters:
| Key | Type | Default | Description |
|---|---|---|---|
| NESTED | Boolean | true | true: the trigger tuple is wrapped as a nested tuple attribute. false: the selected attributes are written directly as flat output attributes. |
AGGREGATIONS = [['FUNCTION'='TRIGGER', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='trigger_tuple']]
With NESTED=false:
AGGREGATIONS = [['FUNCTION'='TRIGGER', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='trigger_tuple',
'NESTED'=false]]
...
Statistical Functions (UnivariateStatistic)
These functions are all accessed through the same factory, using the function name as the value of the FUNCTION key. They all operate on a single numeric input attribute and produce a single Double output. All of them are incremental: the value buffer is maintained in a resizable array, and elements are removed in FIFO order.
Note: These functions assume in-order element arrival. Out-of-order removal is not supported.
General syntax:
AGGREGATIONS = [['FUNCTION'='FunctionName', 'INPUT_ATTRIBUTES'='attrName', 'OUTPUT_ATTRIBUTES'='outName']]
| FUNCTION value | Description | Formula |
|---|---|---|
| GeometricMean | Geometric mean | (∏ xᵢ)^(1/n) |
| Kurtosis | Excess kurtosis (Fisher) | 4th standardised central moment − 3 |
| Skewness | Skewness (Fisher–Pearson) | 3rd standardised central moment |
| IQR | Inter-Quartile Range | Q₃ − Q₁ (75th − 25th percentile) |
| MCQ | Mean Crossing Rate | Fraction of consecutive pairs where the mean-centered signal changes sign |
| RMS | Root Mean Square | √( (1/n) · Σ xᵢ² ) |
| FrEnergy | Spectral Energy | Sum of FFT magnitudes divided by n (computed on mean-removed signal) |
| FrDmEntropy | Frequency-Domain Entropy | Shannon entropy of the normalised FFT magnitude spectrum (mean-removed) |
| FrPeakFreq | Peak Frequency | Frequency in [0, 5] Hz corresponding to the dominant FFT magnitude component (mean-removed) |
| FrMag5 | FFT Magnitude at bin 5 | Magnitude of the 6th FFT bin (index 5, mean-removed) |
| MSE | Mean Square Error | (1/n) · Σ (xᵢ − x̄)² (equivalent to population variance) |
| RMSE | Root Mean Square Error | √MSE |
Examples:
AGGREGATIONS = [['FUNCTION'='IQR', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='iqr_value']]
AGGREGATIONS = [['FUNCTION'='Kurtosis', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='kurtosis_value']]
AGGREGATIONS = [['FUNCTION'='RMS', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='rms_value']]
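The simpler formulas in the table above can be checked with a few lines of Python. This sketch implements RMS, GeometricMean, MSE, RMSE and the mean crossing rate directly from those formulas over a FIFO buffer (`collections.deque` with `maxlen`). It is not the Apache Commons Math code; the function names and the 4-element buffer are hypothetical, and treating a product of exactly zero as "no crossing" in `mean_crossing_rate` is an assumption.

```python
import math
from collections import deque
from typing import Sequence


def rms(xs: Sequence[float]) -> float:
    return math.sqrt(sum(x * x for x in xs) / len(xs))


def geometric_mean(xs: Sequence[float]) -> float:
    # Computed via logarithms to avoid overflowing the product; requires x > 0.
    return math.exp(sum(math.log(x) for x in xs) / len(xs))


def mse(xs: Sequence[float]) -> float:
    # As in the table: mean squared deviation from the window mean.
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** 2 for x in xs) / len(xs)


def rmse(xs: Sequence[float]) -> float:
    return math.sqrt(mse(xs))


def mean_crossing_rate(xs: Sequence[float]) -> float:
    # Fraction of consecutive pairs whose mean-centered values have opposite signs.
    mean = sum(xs) / len(xs)
    centered = [x - mean for x in xs]
    crossings = sum(1 for a, b in zip(centered, centered[1:]) if a * b < 0)
    return crossings / (len(xs) - 1)


# FIFO buffer: appending to a full deque drops the oldest value first.
buffer = deque(maxlen=4)
for value in [1.0, 3.0, 1.0, 3.0, 1.0]:
    buffer.append(value)
```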
...
BFPRT (Deterministic k-th Smallest Element)
Applies the BFPRT algorithm (Blum–Floyd–Pratt–Rivest–Tarjan, also known as Median of Medians) to deterministically find the k-th smallest value among all tuples in the current window in guaranteed O(n) worst-case time.
By default (no extra parameters), the lower median is returned. The target rank can be configured either as an absolute 1-based index (K) or as a relative value (K_RELATIVE).
| Property | Value |
|---|---|
| FUNCTION | BFPRT |
| Input | exactly one numeric attribute |
| Output type | Double |
| Incremental | no — BFPRT requires the complete window on every evaluation |
Additional parameters:
| Key | Type | Default | Description |
|---|---|---|---|
| K | Integer ≥ 1 | – | 1-based absolute rank. K=1 → minimum, K=n → maximum. |
| K_RELATIVE | Double in (0, 1] | – | Relative rank. K_RELATIVE=0.5 → median. Overrides K when both are present. |
When neither K nor K_RELATIVE is given, the lower median (⌊(n−1)/2⌋, 0-based) is returned.
Examples:
// Median (default)
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='median_temperature']]
// 3rd smallest value
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='p3_temperature',
'K'=3]]
// 90th percentile
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='p90_temperature',
'K_RELATIVE'=0.9]]
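The selection step can be sketched in Python as follows. `select` is a textbook median of medians (groups of five, three-way partition around the pivot) and `bfprt` resolves the rank parameters. Both names are hypothetical, and mapping K_RELATIVE to the 1-based rank ⌈K_RELATIVE · n⌉ (nearest rank) is an assumption, since the exact rounding rule is not documented here.

```python
import math
from typing import List, Optional, Sequence


def select(values: List[float], k: int) -> float:
    """Return the k-th smallest element (0-based) using median of medians."""
    if len(values) <= 5:
        return sorted(values)[k]
    # 1. Median of each group of (at most) five elements.
    groups = [values[i:i + 5] for i in range(0, len(values), 5)]
    medians = [sorted(g)[(len(g) - 1) // 2] for g in groups]
    # 2. Pivot = median of the medians, found recursively.
    pivot = select(medians, (len(medians) - 1) // 2)
    # 3. Three-way partition; recurse only into the part that contains rank k.
    lows = [v for v in values if v < pivot]
    highs = [v for v in values if v > pivot]
    n_equal = len(values) - len(lows) - len(highs)
    if k < len(lows):
        return select(lows, k)
    if k < len(lows) + n_equal:
        return pivot
    return select(highs, k - len(lows) - n_equal)


def bfprt(window: Sequence[float], k: Optional[int] = None,
          k_relative: Optional[float] = None) -> Optional[float]:
    n = len(window)
    if n == 0:
        return None
    if k_relative is not None:  # K_RELATIVE overrides K
        index = max(1, math.ceil(k_relative * n)) - 1
    elif k is not None:
        index = k - 1  # K is a 1-based rank
    else:
        index = (n - 1) // 2  # lower median
    if not 0 <= index < n:
        raise ValueError("rank out of range")
    return float(select(list(window), index))
```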
Examples
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)
...
/// count the number of items for each publisher
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)
/// aggregate the 100 most frequent items for each publisher to an ordered list
TopKItemsByPublisher ::= AGGREGATION({AGGREGATIONS = [
  [
    'FUNCTION' = 'TopK',
    'TOP_K' = '100', /// number of items
    'SCORING_ATTRIBUTES' = 'Count', /// the attribute name that defines the order
    'INPUT_ATTRIBUTES' = 'item', /// do not use the whole input tuple; just use the 'item' attribute for creating the output top-k set
    'MIN_SCORE' = '0', /// remove items that reach a score of 0 (due to the previous aggregation, these are all items that have no valid tuple)
    'UNIQUE_ATTR' = 'item', /// use 'item' as a unique attribute: a new tuple with a known item id replaces the previous value (a kind of element window inside this operator)
    'DESCENDING' = true, /// default is true: use 'true' to get the biggest elements, 'false' to get the smallest
    'ALWAYS_OUTPUT' = true /// if set to false (default), 'null' is output instead of the result when the result equals the previous result
  ]], GROUP_BY = ['publisher']}, counted)
...
/// Tumbling window
tumbling = TIMEWINDOW({
    size = [5, 'MINUTES'],
    advance = [5, 'MINUTES']
  },
  selectCenter
)
/// Select the last element of each tumbling window
reduce = AGGREGATION({
    aggregations = [['FUNCTION' = 'Last']],
    EVAL_AT_NEW_ELEMENT = false,
    EVAL_BEFORE_REMOVE_OUTDATING = true,
    group_by = ['movingObjectId']
  },
  tumbling
)
/// Remove the grouping id (because it will be in the unnested tuple)
withoutId = PROJECT({
    attributes = ['last']
  },
  reduce
)
/// Unnest the tuple
output = UNNEST({
    attribute = 'last'
  },
  withoutId
)
...
- Set eval_at_new_element to false and eval_before_remove_outdating to true, and add a preceding window with an advance, e.g. a tumbling window.
Remark: In this case, the start timestamp of the output is the timestamp of the element that triggers the output (i.e. the element indicating that the current elements are outdated).
The following example counts the elements of the impressions stream per minute. It outputs the total at the end of each minute instead of an update each time a new item arrives.
windowed = TIMEWINDOW({size = [1, 'MINUTES'], ADVANCE = [1, 'MINUTES']}, impressions)
impressions_per_minute = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], EVAL_AT_NEW_ELEMENT = false, EVAL_BEFORE_REMOVE_OUTDATING = true}, windowed)
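The effect of this configuration, including the timestamp remark above, can be mimicked in a few lines of Python for a tumbling window. This is a simplified sketch under assumptions: timestamps are integers sorted ascending, windows are aligned at multiples of `size`, empty windows produce no output, and the still-open last window is only reported once a later element arrives (EVAL_AT_DONE would presumably be needed to flush it). `counts_per_window` is a hypothetical name.

```python
from typing import List, Optional, Sequence, Tuple


def counts_per_window(timestamps: Sequence[int], size: int) -> List[Tuple[int, int]]:
    """(output start timestamp, count) pairs for tumbling windows of length `size`."""
    out: List[Tuple[int, int]] = []
    current: Optional[int] = None  # index of the window being filled
    count = 0
    for ts in timestamps:  # assumed sorted ascending
        window = ts // size
        if current is not None and window != current:
            # The new element outdates the previous window: emit its count before
            # removal, stamped with the timestamp of the triggering element.
            out.append((ts, count))
            count = 0
        current = window
        count += 1
    return out
```

For impressions at seconds 0, 10, 59, 60, 61 and 125 with `size=60`, the sketch reports 3 impressions stamped 60 and 2 impressions stamped 125, while the window containing 125 stays open.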
Further information
How to create aggregation functions (in German)