...
| Function Name | Description | Parameters | Examples |
|---|---|---|---|
| Count | Outputs the number of stream elements. | | ['FUNCTION' = 'Count'] ['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'number_of_elements'] |
| DistinctCount | Outputs the number of distinct stream elements. | | |
| Sum | Outputs the sum of elements. | | ['FUNCTION' = 'Sum'] ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1'] ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = ['value1', 'value2']] |
| Avg | Average value (mean). | TODO (similar to Sum) | |
| Min | Minimum value. | TODO (similar to Sum) | |
| Max | Maximum value. | TODO (similar to Sum) | |
| First | The first element of a window. See example below. | Recommended setting: OUTPUT_ONLY_CHANGES = true, which yields the first element of each window. Especially useful with a tumbling window. | |
| Last | The last element of a window. See example below. | Recommended setting: EVAL_AT_NEW_ELEMENT = false, which yields the last element of each window. Especially useful with a tumbling window. | |
| Trigger | The tuple that triggers the output. | TODO | |
| Variance | Calculates the variance. | TODO (similar to Sum) | |
| StdDev | Standard deviation. | | |
| TopK | Calculates the top-K list. | TODO | |
| Nest | Nests the valid elements as a list. If more than one attribute is given, each list entry is the tuple projected onto those attributes. | INPUT_ATTRIBUTES (required) | |
| DistinctNest | Same as Nest, but removes duplicates. | | |
| Univariate statistics: GeometricMean, Kurtosis, Skewness, IQR (InterQuartileRange), MCQ (MeanCrossingRate), RMS (RootMeanSquare), FrEnergy (SpectralEnergy), FrDmEntropy, FrPeakFreq, FrMag5, MSE (Mean Square Error), RMSE (Root Mean Square Error) | See e.g. https://commons.apache.org/proper/commons-math/javadocs/api-3.6.1/org/apache/commons/math3/stat/descriptive/UnivariateStatistic.html | | [ 'FUNCTION' = 'RMSE', 'INPUT_ATTRIBUTES' = 'error' ] [ 'FUNCTION' = 'MSE', 'INPUT_ATTRIBUTES' = 'error' ] |
Operator-level parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| AGGREGATIONS | List of maps | – | Required. List of aggregation function configurations (see below). |
| GROUP_BY | List of attributes | – | Optional grouping attributes. |
| EVAL_AT_NEW_ELEMENT | Boolean | true | Output a result when a new element becomes valid. |
| EVAL_AT_OUTDATING | Boolean | true | Output a result when elements expire. |
| EVAL_BEFORE_REMOVE_OUTDATING | Boolean | false | If true, evaluate before removing expiring elements instead of after. |
| EVAL_AT_DONE | Boolean | false | Output the last value when the operator receives the done signal. |
| OUTPUT_ONLY_CHANGES | Boolean | false | Suppress output tuples that are equal to the previous output. |
| SUPPRESS_FULL_META_DATA_HANDLING | Boolean | false | For non-incremental functions, skip full metadata handling. |
...
Standard Functions
AVG
Computes the arithmetic mean.
| Property | Value |
|---|---|
| FUNCTION | AVG |
| Input | one or more numeric attributes |
| Output type | Double per input attribute |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='AVG', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='avg_temperature']]
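AVG is listed as incremental. One common way to achieve that over a sliding window is to keep a running sum and count: add a value when an element becomes valid and subtract it when the element expires. The Java sketch below illustrates this idea, assuming values arrive as plain doubles and the caller knows which value expires; IncrementalAvg and its methods are hypothetical names, not part of the Odysseus API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch: incremental mean over a sliding window (not the Odysseus API). */
class IncrementalAvg {
    private double sum = 0.0; // running sum of the values currently in the window
    private long count = 0;   // number of values currently in the window

    /** A new element becomes valid. */
    void add(double value) {
        sum += value;
        count++;
    }

    /** An element expires from the window. */
    void remove(double value) {
        sum -= value;
        count--;
    }

    /** Current mean, or NaN for an empty window. */
    double mean() {
        return count == 0 ? Double.NaN : sum / count;
    }

    public static void main(String[] args) {
        IncrementalAvg avg = new IncrementalAvg();
        Deque<Double> window = new ArrayDeque<>();
        int size = 3; // count-based sliding window of three elements
        for (double t : new double[] {20.0, 22.0, 24.0, 26.0}) {
            window.addLast(t);
            avg.add(t);
            if (window.size() > size) {
                avg.remove(window.removeFirst()); // the oldest element expires
            }
            System.out.println(avg.mean());
        }
    }
}
```

Subtracting expired values can accumulate floating-point rounding error over very long streams; the sketch does not compensate for that.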
COUNT
Counts the number of tuples in the current window. No input attribute is required.
| Property | Value |
|---|---|
| FUNCTION | COUNT |
| Input | any attribute (or *) |
| Output type | Long |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='COUNT', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='count_x']]
SUM
Computes the sum of values.
| Property | Value |
|---|---|
| FUNCTION | SUM |
| Input | one or more numeric attributes |
| Output type | Double per input attribute |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='SUM', 'INPUT_ATTRIBUTES'='value', 'OUTPUT_ATTRIBUTES'='sum_value']]
MIN
Returns the minimum value. Maintained with a min-priority queue.
| Property | Value |
|---|---|
| FUNCTION | MIN |
| Input | one or more numeric attributes |
| Output type | same as input |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='MIN', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='min_temperature']]
MAX
Returns the maximum value. Maintained with a max-priority queue.
| Property | Value |
|---|---|
| FUNCTION | MAX |
| Input | one or more numeric attributes |
| Output type | same as input |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='MAX', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='max_temperature']]
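MIN and MAX are described as being maintained with a min- and a max-priority queue. A minimal Java sketch of that idea uses java.util.PriorityQueue with natural or reversed ordering; the class SlidingExtremum is hypothetical. PriorityQueue.remove(Object) runs in linear time, so each expiring element costs O(n) in this sketch.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical sketch: window MIN or MAX backed by a priority queue. */
class SlidingExtremum {
    private final PriorityQueue<Double> heap;

    /** max = false keeps the smallest value on top (MIN); max = true the largest (MAX). */
    SlidingExtremum(boolean max) {
        if (max) {
            heap = new PriorityQueue<>(Comparator.reverseOrder());
        } else {
            heap = new PriorityQueue<>();
        }
    }

    /** A new element becomes valid. */
    void add(double value) {
        heap.add(value);
    }

    /** Removes one occurrence of an expired value (linear time for PriorityQueue). */
    void remove(double value) {
        heap.remove(value);
    }

    /** Current extremum, or null if the window is empty. */
    Double current() {
        return heap.peek();
    }

    public static void main(String[] args) {
        SlidingExtremum min = new SlidingExtremum(false);
        SlidingExtremum max = new SlidingExtremum(true);
        for (double t : new double[] {21.5, 19.0, 23.0}) {
            min.add(t);
            max.add(t);
        }
        min.remove(19.0); // 19.0 expires from the MIN window
        System.out.println(min.current() + " " + max.current()); // prints 21.5 23.0
    }
}
```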
STDDEV
Population standard deviation, computed incrementally via Welford's online algorithm.
| Property | Value |
|---|---|
| FUNCTION | STDDEV |
| Input | one or more numeric attributes |
| Output type | Double per input attribute |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='STDDEV', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='stddev_x']]
VARIANCE
Population variance, computed incrementally via Welford's online algorithm.
| Property | Value |
|---|---|
| FUNCTION | VARIANCE |
| Input | one or more numeric attributes |
| Output type | Double per input attribute |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='VARIANCE', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='var_x']]
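STDDEV and VARIANCE are described as using Welford's online algorithm. Because window elements also expire, the Java sketch below pairs Welford's update with its algebraic inverse for removal. Treating removal this way is an assumption made for the illustration, and WelfordWindow is a hypothetical class name. The sketch returns the population variance M2 / n and its square root, matching the descriptions above.

```java
/** Hypothetical sketch: Welford's online algorithm extended with removal for sliding windows. */
class WelfordWindow {
    private long n = 0;
    private double mean = 0.0;
    private double m2 = 0.0; // sum of squared deviations from the current mean

    /** A new element becomes valid (standard Welford update). */
    void add(double x) {
        n++;
        double delta = x - mean;   // deviation from the mean before the update
        mean += delta / n;
        m2 += delta * (x - mean);  // uses the mean after the update
    }

    /** An element expires: reverses add(x). */
    void remove(double x) {
        if (n <= 1) { // the window becomes empty: reset all state
            n = 0;
            mean = 0.0;
            m2 = 0.0;
            return;
        }
        double delta = x - mean;   // deviation from the mean before removal
        n--;
        mean -= delta / n;
        m2 -= delta * (x - mean);  // uses the mean after removal
    }

    /** Population variance M2 / n, or NaN for an empty window. */
    double populationVariance() {
        return n == 0 ? Double.NaN : m2 / n;
    }

    double populationStdDev() {
        return Math.sqrt(populationVariance());
    }

    public static void main(String[] args) {
        WelfordWindow w = new WelfordWindow();
        for (double x : new double[] {2, 4, 4, 4, 5, 5, 7, 9}) {
            w.add(x);
        }
        System.out.println(w.populationVariance() + " " + w.populationStdDev());
    }
}
```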
NEST
Collects all tuples of the current window into a list. The output attribute holds a List<Tuple>.
| Property | Value |
|---|---|
| FUNCTION | NEST |
| Input | one or more attributes, or * for all |
| Output type | List<Tuple> |
| Incremental | yes |
Additional parameters:
| Key | Type | Default | Description |
|---|---|---|---|
| PRESERVE_ORDERING | Boolean | false | Keep tuples in insertion order (uses LinkedHashSet internally). |
| SORT | Boolean | false | Sort the tuples in the list (uses TreeSet internally). |
| UNIQUE_ATTR | Attribute name | – | If set, only the most recent tuple per unique value of this attribute is kept. |
AGGREGATIONS = [['FUNCTION'='NEST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='nested',
'PRESERVE_ORDERING'=true]]
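The NEST options correspond to standard Java collections: PRESERVE_ORDERING to a LinkedHashSet and SORT to a TreeSet, as the parameter table states. The following sketch shows the effect of each option, plus a map-based interpretation of UNIQUE_ATTR. The Reading record stands in for a tuple, the window list is assumed to be ordered oldest first, and the sort key and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical sketch of the NEST ordering options; Reading stands in for a tuple. */
class NestSketch {
    record Reading(String sensorId, double value) {}

    /** PRESERVE_ORDERING=true: insertion order; as with any Set, equal readings collapse. */
    static List<Reading> preserveOrdering(List<Reading> window) {
        return new ArrayList<>(new LinkedHashSet<>(window));
    }

    /** SORT=true: sorted order, here by sensorId and then value (an assumed sort key). */
    static List<Reading> sorted(List<Reading> window) {
        TreeSet<Reading> set = new TreeSet<>(
                Comparator.comparing(Reading::sensorId).thenComparingDouble(Reading::value));
        set.addAll(window);
        return new ArrayList<>(set);
    }

    /** UNIQUE_ATTR=sensorId: only the most recent reading per sensor survives. */
    static List<Reading> uniqueBySensor(List<Reading> window) {
        Map<String, Reading> latest = new LinkedHashMap<>();
        for (Reading r : window) {
            latest.put(r.sensorId(), r); // a later reading replaces an earlier one with the same key
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Reading> window = List.of(
                new Reading("s2", 20.5), new Reading("s1", 19.0), new Reading("s2", 21.0));
        System.out.println(preserveOrdering(window));
        System.out.println(sorted(window));
        System.out.println(uniqueBySensor(window));
    }
}
```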
DISTINCT_NEST
Like NEST, but eliminates duplicate tuples based on the projected attribute values.
| Property | Value |
|---|---|
| FUNCTION | DISTINCT_NEST |
| Input | one or more attributes, or * for all |
| Output type | List<Tuple> |
| Incremental | yes |
Additional parameters: same as NEST (PRESERVE_ORDERING, SORT, UNIQUE_ATTR).
AGGREGATIONS = [['FUNCTION'='DISTINCT_NEST', 'INPUT_ATTRIBUTES'='sensorId', 'OUTPUT_ATTRIBUTES'='distinct_sensors']]
...
DISTINCT_COUNT
Counts the number of distinct tuples in the current window (based on the projected attribute values). Internally delegates to DISTINCT_NEST and returns its size.
| Property | Value |
|---|---|
| FUNCTION | DISTINCT_COUNT |
| Input | one or more attributes |
| Output type | Integer |
| Incremental | yes |
Additional parameters:
| Key | Type | Default | Description |
|---|---|---|---|
| UNIQUE_ATTR | Attribute name | – | The attribute(s) used to determine uniqueness. |
AGGREGATIONS = [['FUNCTION'='DISTINCT_COUNT', 'INPUT_ATTRIBUTES'='sensorId',
'OUTPUT_ATTRIBUTES'='num_distinct', 'UNIQUE_ATTR'='sensorId']]
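DISTINCT_COUNT is described as returning the size of a DISTINCT_NEST. For the count to stay correct when elements expire, the distinct set must know how many window elements share each key; otherwise an expiring duplicate would wrongly drop a key that is still present. The sketch below keeps such a reference count in a HashMap. This multiset approach is an assumption made for the illustration, and DistinctCounter is a hypothetical class name.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: distinct count over a sliding window using reference counts. */
class DistinctCounter<K> {
    private final Map<K, Integer> occurrences = new HashMap<>();

    /** A new element with the given projected key becomes valid. */
    void add(K key) {
        occurrences.merge(key, 1, Integer::sum);
    }

    /** An element expires; its key disappears only when the last occurrence leaves. */
    void remove(K key) {
        occurrences.computeIfPresent(key, (k, c) -> c == 1 ? null : c - 1);
    }

    /** Number of distinct keys currently in the window. */
    int distinctCount() {
        return occurrences.size();
    }

    public static void main(String[] args) {
        DistinctCounter<String> sensors = new DistinctCounter<>();
        sensors.add("s1");
        sensors.add("s2");
        sensors.add("s1");
        sensors.remove("s1"); // one s1 reading expires, another is still valid
        System.out.println(sensors.distinctCount()); // prints 2
    }
}
```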
...
FIRST
Returns the temporally first tuple of the current window as a nested tuple. The window must provide elements in start-timestamp order. Especially useful with tumbling windows.
| Property | Value |
|---|---|
| FUNCTION | FIRST |
| Input | all attributes (full tuple) |
| Output type | Tuple |
| Incremental | no (requires ordered elements) |
AGGREGATIONS = [['FUNCTION'='FIRST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='first_tuple']]
LAST
Returns the most recently added tuple in the current window as a nested tuple. Especially useful with tumbling windows.
| Property | Value |
|---|---|
| FUNCTION | LAST |
| Input | all attributes (full tuple) |
| Output type | Tuple |
| Incremental | yes |
AGGREGATIONS = [['FUNCTION'='LAST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='last_tuple']]
TOP_K
Returns the top-k tuples from the current window, ranked by one or more scoring attributes. The output attribute holds a List<Tuple>.
| Property | Value |
|---|---|
| FUNCTION | TOP_K |
| Input | one or more attributes, or * |
| Output type | List<Tuple> |
| Incremental | yes (sorted set) |
Additional parameters:
| Key | Type | Default | Description |
|---|---|---|---|
| TOP_K | Integer | – | Required. Number of top elements to keep. |
| SCORING_ATTRIBUTES | Attribute name(s) | – | The attribute(s) used to rank elements. |
| DESCENDING | Boolean | true | true = highest score first, false = lowest score first. |
| NEWEST_ON_TOP | Boolean | true | When scores are equal, the newest tuple ranks higher. |
| MIN_SCORE | Number | – | Tuples with a score ≤ this value are never included. |
| UNIQUE_ATTR | Attribute name | – | If set, only the most recent tuple per unique value of this attribute is kept. |
| ALWAYS_OUTPUT | Boolean | false | If false, suppresses output when the top-k list has not changed. |
AGGREGATIONS = [['FUNCTION'='TOP_K', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='top5',
'TOP_K'=5, 'SCORING_ATTRIBUTES'='temperature', 'DESCENDING'=true]]
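TOP_K is described as maintained in a sorted set. The Java sketch below shows how DESCENDING, NEWEST_ON_TOP, and MIN_SCORE could shape the ordering of a TreeSet. The Entry record, its arrival sequence number used as the tie-breaker, and the class name are hypothetical; UNIQUE_ATTR and ALWAYS_OUTPUT are left out for brevity. Because every entry has a unique sequence number, the comparator never reports two distinct entries as equal, so the TreeSet does not silently drop ties.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical sketch of a TOP_K ranking; not the Odysseus implementation. */
class TopKSketch {
    /** seq is an arrival counter used as the tie-breaker (higher = newer). */
    record Entry(String id, double score, long seq) {}

    private final TreeSet<Entry> ranked;
    private final int k;
    private final Double minScore; // null means no MIN_SCORE
    private long nextSeq = 0;

    TopKSketch(int k, boolean descending, boolean newestOnTop, Double minScore) {
        this.k = k;
        this.minScore = minScore;
        Comparator<Entry> byScore = Comparator.comparingDouble(Entry::score);
        if (descending) byScore = byScore.reversed();   // DESCENDING: highest score first
        Comparator<Entry> bySeq = Comparator.comparingLong(Entry::seq);
        if (newestOnTop) bySeq = bySeq.reversed();      // NEWEST_ON_TOP: newer wins ties
        this.ranked = new TreeSet<>(byScore.thenComparing(bySeq));
    }

    /** Adds a tuple unless its score is at or below MIN_SCORE; returns the entry or null. */
    Entry add(String id, double score) {
        if (minScore != null && score <= minScore) return null;
        Entry e = new Entry(id, score, nextSeq++);
        ranked.add(e);
        return e;
    }

    /** Removes an expired entry. */
    void remove(Entry e) {
        if (e != null) ranked.remove(e);
    }

    /** The current top-k list, best first. */
    List<Entry> topK() {
        List<Entry> out = new ArrayList<>(k);
        for (Entry e : ranked) {
            if (out.size() == k) break;
            out.add(e);
        }
        return out;
    }

    public static void main(String[] args) {
        TopKSketch top2 = new TopKSketch(2, true, true, 0.0);
        top2.add("a", 21.0);
        top2.add("b", 25.0);
        top2.add("c", 21.0); // ties with "a" but is newer, so it ranks above "a"
        top2.add("d", -1.0); // filtered out by MIN_SCORE
        for (Entry e : top2.topK()) System.out.println(e.id()); // prints b, then c
    }
}
```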
...
TRIGGER
Returns the tuple that triggered the current aggregation evaluation (the most recently arrived tuple). Optionally, only selected attributes are forwarded.
| Property | Value |
|---|---|
| FUNCTION | TRIGGER |
| Input | any attributes, or * |
| Output type | Tuple (when NESTED=true) or flat attributes (when NESTED=false) |
| Incremental | yes |
Additional parameters:
| Key | Type | Default | Description |
|---|---|---|---|
| NESTED | Boolean | true | true: the trigger tuple is wrapped as a nested tuple attribute. false: the selected attributes are written directly as flat output attributes. |
AGGREGATIONS = [['FUNCTION'='TRIGGER', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='trigger_tuple']]
With NESTED=false:
AGGREGATIONS = [['FUNCTION'='TRIGGER', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='trigger_tuple',
'NESTED'=false]]
...
Statistical Functions (UnivariateStatistic)
These functions are all accessed through the same factory using the function name as the FUNCTION key. They all operate on a single numeric input attribute and produce a single Double output. All are incremental — the value buffer is maintained in a resizable array; elements are removed in FIFO order.
Note: These functions assume in-order element arrival. Out-of-order removal is not supported.
General syntax:
AGGREGATIONS = [['FUNCTION'='FunctionName', 'INPUT_ATTRIBUTES'='attrName', 'OUTPUT_ATTRIBUTES'='outName']]
| FUNCTION value | Description | Formula |
|---|---|---|
| GeometricMean | Geometric mean | (∏ xᵢ)^(1/n) |
| Kurtosis | Excess kurtosis (Fisher) | 4th standardised central moment − 3 |
| Skewness | Skewness (Fisher–Pearson) | 3rd standardised central moment |
| IQR | Inter-Quartile Range | Q₃ − Q₁ (75th − 25th percentile) |
| MCQ | Mean Crossing Rate | Fraction of consecutive pairs where the mean-centered signal changes sign |
| RMS | Root Mean Square | √( (1/n) · Σ xᵢ² ) |
| FrEnergy | Spectral Energy | Sum of FFT magnitudes divided by n (computed on the mean-removed signal) |
| FrDmEntropy | Frequency-Domain Entropy | Shannon entropy of the normalised FFT magnitude spectrum (mean-removed) |
| FrPeakFreq | Peak Frequency | Frequency in [0, 5] Hz corresponding to the dominant FFT magnitude component (mean-removed) |
| FrMag5 | FFT Magnitude at bin 5 | Magnitude of the 6th FFT bin (index 5, mean-removed) |
| MSE | Mean Square Error | (1/n) · Σ (xᵢ − x̄)², equivalent to the population variance |
| RMSE | Root Mean Square Error | √MSE |
Examples:
AGGREGATIONS = [['FUNCTION'='IQR', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='iqr_value']]
AGGREGATIONS = [['FUNCTION'='Kurtosis', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='kurtosis_value']]
AGGREGATIONS = [['FUNCTION'='RMS', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='rms_value']]
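For reference, the following Java sketch evaluates three formulas from the table (RMS, MSE, and MCQ) directly on a buffer of window values. It follows the table's definitions literally: MSE as the population variance and MCQ as the fraction of consecutive pairs whose mean-centered values have strictly opposite signs. The class and method names are hypothetical, and the treatment of values exactly equal to the mean (not counted as a crossing) is an assumption that the actual implementation may handle differently.

```java
/** Hypothetical sketch of three formulas from the statistics table. */
final class UnivariateSketch {
    private UnivariateSketch() {}

    static double mean(double[] x) {
        double sum = 0.0;
        for (double v : x) sum += v;
        return sum / x.length;
    }

    /** RMS: sqrt((1/n) * sum of x_i^2). */
    static double rms(double[] x) {
        double sumSq = 0.0;
        for (double v : x) sumSq += v * v;
        return Math.sqrt(sumSq / x.length);
    }

    /** MSE as defined in the table: (1/n) * sum of (x_i - mean)^2. */
    static double mse(double[] x) {
        double m = mean(x);
        double sumSq = 0.0;
        for (double v : x) sumSq += (v - m) * (v - m);
        return sumSq / x.length;
    }

    /** MCQ: fraction of consecutive pairs whose mean-centered values change sign. */
    static double meanCrossingRate(double[] x) {
        if (x.length < 2) return 0.0; // assumption: no pairs means no crossings
        double m = mean(x);
        int crossings = 0;
        for (int i = 1; i < x.length; i++) {
            // strictly opposite signs; a value equal to the mean does not count
            if ((x[i - 1] - m) * (x[i] - m) < 0) crossings++;
        }
        return (double) crossings / (x.length - 1);
    }

    public static void main(String[] args) {
        double[] signal = {1.0, 3.0, 1.0, 3.0};
        System.out.println("RMS  = " + rms(signal));
        System.out.println("MSE  = " + mse(signal));
        System.out.println("RMSE = " + Math.sqrt(mse(signal)));
        System.out.println("MCQ  = " + meanCrossingRate(signal));
    }
}
```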
...
BFPRT (Deterministic k-th Smallest Element)
Applies the BFPRT algorithm (Blum–Floyd–Pratt–Rivest–Tarjan, also known as Median of Medians) to deterministically find the k-th smallest value among all tuples in the current window in guaranteed O(n) worst-case time.
By default (no extra parameters), the lower median is returned. The target rank can be configured either as an absolute 1-based index (K) or as a relative value (K_RELATIVE).
| Property | Value |
|---|---|
| FUNCTION | BFPRT |
| Input | exactly one numeric attribute |
| Output type | Double |
| Incremental | no — BFPRT requires the complete window on every evaluation |
Additional parameters:
| Key | Type | Default | Description |
|---|---|---|---|
| K | Integer ≥ 1 | – | 1-based absolute rank. K=1 → minimum, K=n → maximum. |
| K_RELATIVE | Double in (0, 1] | – | Relative rank. K_RELATIVE=0.5 → median. Overrides K when both are present. |
When neither K nor K_RELATIVE is given, the lower median (⌊(n−1)/2⌋, 0-based) is returned.
Examples:
// Median (default)
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='median_temperature']]
// 3rd smallest value
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='p3_temperature',
'K'=3]]
// 90th percentile
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='p90_temperature',
'K_RELATIVE'=0.9]]
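A compact Java sketch of median-of-medians selection follows. It takes a 0-based rank, so a 1-based K maps to K − 1 and the default lower median to ⌊(n−1)/2⌋. How K_RELATIVE is converted to a rank is not stated above, so the sketch leaves it out. Groups of five and a three-way partition (which keeps runs of duplicate values from stalling progress) are textbook choices, not necessarily those of the Odysseus implementation; the class name Bfprt is hypothetical.

```java
import java.util.Arrays;

/** Hypothetical sketch of BFPRT (median-of-medians) selection. */
final class Bfprt {
    private Bfprt() {}

    /** Returns the k-th smallest value (0-based); the input array is not modified. */
    static double select(double[] values, int k) {
        if (k < 0 || k >= values.length) throw new IllegalArgumentException("k out of range");
        return select(values.clone(), 0, values.length - 1, k);
    }

    private static double select(double[] a, int lo, int hi, int k) {
        while (true) {
            if (lo == hi) return a[lo];
            double pivot = medianOfMedians(a, lo, hi);
            // three-way partition: [lo, lt) < pivot, [lt, gt] == pivot, (gt, hi] > pivot
            int lt = lo, i = lo, gt = hi;
            while (i <= gt) {
                if (a[i] < pivot) swap(a, lt++, i++);
                else if (a[i] > pivot) swap(a, i, gt--);
                else i++;
            }
            if (k < lt) hi = lt - 1;        // target lies among the smaller values
            else if (k > gt) lo = gt + 1;   // target lies among the larger values
            else return pivot;              // target rank falls on the pivot block
        }
    }

    /** Median of the medians of groups of five, found recursively via select. */
    private static double medianOfMedians(double[] a, int lo, int hi) {
        int n = hi - lo + 1;
        int groups = (n + 4) / 5;
        double[] medians = new double[groups];
        for (int g = 0; g < groups; g++) {
            int from = lo + 5 * g;
            int to = Math.min(from + 4, hi);
            double[] group = Arrays.copyOfRange(a, from, to + 1);
            Arrays.sort(group); // at most five elements, so constant time
            medians[g] = group[(group.length - 1) / 2];
        }
        return groups == 1 ? medians[0] : select(medians, 0, groups - 1, (groups - 1) / 2);
    }

    private static void swap(double[] a, int i, int j) {
        double t = a[i];
        a[i] = a[j];
        a[j] = t;
    }

    public static void main(String[] args) {
        double[] temps = {21.0, 19.5, 23.0, 20.0, 22.5, 18.0};
        int n = temps.length;
        System.out.println(select(temps, (n - 1) / 2)); // lower median, prints 20.0
        System.out.println(select(temps, 5 - 1));       // K = 5, prints 22.5
    }
}
```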
Examples
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)
...