...

Function Name | Description | Parameters | Examples
Count | Outputs the number of stream elements.
Name | Description | Default Value | Optional?
OUTPUT_ATTRIBUTES | The name for the output attribute. | count | True
['FUNCTION' = 'Count']


['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'number_of_elements']
DistinctCount | Outputs the number of distinct stream elements.

Sum | Outputs the sum of elements.
Name | Description | Default Value | Optional?
INPUT_ATTRIBUTES | A single string or a list with the name(s) of the attribute(s) in the input tuples. By default, all input attributes are used, which can raise an error if some attributes are not numeric. | (all attributes) | True
OUTPUT_ATTRIBUTES | A single string or a list of output attribute names. By default, the string "Sum_" concatenated with the original input attribute name is used. | "Sum_" + input attribute name | True
['FUNCTION' = 'Sum']

['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']

['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = ['value1', 'value2']]
Avg | Average value (mean) | TODO (similar to Sum)
Min | Minimum value | TODO (similar to Sum)
Max | Maximum value | TODO (similar to Sum)
First | The first element of a window. See example below.
Name | Description | Default Value | Optional?
OUTPUT_ATTRIBUTES | The name for the output attribute. | first | True
You should use the following settings:

output_only_changes = true

This setting outputs the first element of each window, which is especially useful with a tumbling window.

Last | The last element of a window. See example below.
Name | Description | Default Value | Optional?
OUTPUT_ATTRIBUTES | The name for the output attribute. | last | True
You should use the following settings:

EVAL_AT_NEW_ELEMENT = false
EVAL_BEFORE_REMOVE_OUTDATING = true

These settings output the last element of each window, which is especially useful with a tumbling window.

Trigger | The tuple that triggers the output. | TODO
Variance | Calculates the variance. | TODO (similar to Sum)
StdDev | Standard deviation

TopK | Calculates the top-K list. | TODO
Nest | Nests the valid elements as a list. If more than one attribute is given, the list contains the tuples projected onto those attributes. | INPUT_ATTRIBUTES, required

['FUNCTION' = 'Nest','INPUT_ATTRIBUTES' = 'id']

['FUNCTION' = 'Nest','INPUT_ATTRIBUTES' = ['id','name']]

DistinctNest | Same as Nest, but removes duplicates.


A number of Univariate Statistics:

GeometricMean

Kurtosis

Skewness

IQR (InterQuartileRange)

MCQ (MeanCrossingRate)

RMS (RootMeanSquare)

FrEnergy (SpectralEnergy)

FrDmEntropy 

FrPeakFreq

FrMag5

MSE (Mean Square Error)

RMSE (Root Mean Square Error)


See e.g. https://commons.apache.org/proper/commons-math/javadocs/api-3.6.1/org/apache/commons/math3/stat/descriptive/UnivariateStatistic.html

['FUNCTION' = 'RMSE', 'INPUT_ATTRIBUTES' = 'error']

['FUNCTION' = 'MSE', 'INPUT_ATTRIBUTES' = 'error']


Operator-level parameters

Parameter | Type | Default | Description
AGGREGATIONS | List of maps | | Required. List of aggregation function configurations (see below).
GROUP_BY | List of attributes | | Optional grouping attributes.
EVAL_AT_NEW_ELEMENT | Boolean | true | Output a result when a new element becomes valid.
EVAL_AT_OUTDATING | Boolean | true | Output a result when elements expire.
EVAL_BEFORE_REMOVE_OUTDATING | Boolean | false | If true, evaluate before removing expiring elements instead of after.
EVAL_AT_DONE | Boolean | false | Output the last value when the operator receives the done signal.
OUTPUT_ONLY_CHANGES | Boolean | false | Suppress output tuples that are equal to the previous output.
SUPPRESS_FULL_META_DATA_HANDLING | Boolean | false | For non-incremental functions: skip full metadata handling.

...

Standard Functions

AVG

Computes the arithmetic mean.

Property | Value
FUNCTION | AVG
Input | one or more numeric attributes
Output type | Double per input attribute
Incremental | yes
AGGREGATIONS = [['FUNCTION'='AVG', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='avg_temperature']]


COUNT

Counts the number of tuples in the current window. No input attribute is required.

Property | Value
FUNCTION | COUNT
Input | any attribute (or *)
Output type | Long
Incremental | yes
AGGREGATIONS = [['FUNCTION'='COUNT', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='count_x']]


SUM

Computes the sum of values.

Property | Value
FUNCTION | SUM
Input | one or more numeric attributes
Output type | Double per input attribute
Incremental | yes
AGGREGATIONS = [['FUNCTION'='SUM', 'INPUT_ATTRIBUTES'='value', 'OUTPUT_ATTRIBUTES'='sum_value']]


MIN

Returns the minimum value. Maintained with a min-priority queue.

Property | Value
FUNCTION | MIN
Input | one or more numeric attributes
Output type | same as input
Incremental | yes
AGGREGATIONS = [['FUNCTION'='MIN', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='min_temperature']]
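
The min-priority queue mentioned above can be illustrated with a small Python sketch. It is not the Odysseus implementation: the class name WindowMin and its methods are hypothetical, and expired values are dropped lazily, which is only one of several ways to combine a heap with element removal.

```python
import heapq
from collections import Counter


class WindowMin:
    """Tracks the minimum of a window using a min-heap with lazy deletion."""

    def __init__(self):
        self._heap = []            # candidate minima
        self._removed = Counter()  # values that expired but are still in the heap

    def add(self, value):
        heapq.heappush(self._heap, value)

    def remove(self, value):
        # Only mark the value as expired; it is dropped once it reaches the top.
        self._removed[value] += 1

    def minimum(self):
        # Discard expired values sitting on top of the heap.
        while self._heap and self._removed[self._heap[0]] > 0:
            self._removed[self._heap[0]] -= 1
            heapq.heappop(self._heap)
        return self._heap[0] if self._heap else None


w = WindowMin()
for v in (21.5, 19.0, 23.0):
    w.add(v)
print(w.minimum())  # 19.0
w.remove(19.0)      # 19.0 leaves the window
print(w.minimum())  # 21.5
```

MAX can be sketched the same way by pushing negated values.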


MAX

Returns the maximum value. Maintained with a max-priority queue.

Property | Value
FUNCTION | MAX
Input | one or more numeric attributes
Output type | same as input
Incremental | yes
AGGREGATIONS = [['FUNCTION'='MAX', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='max_temperature']]


STDDEV

Population standard deviation, computed incrementally via Welford's online algorithm.

Property | Value
FUNCTION | STDDEV
Input | one or more numeric attributes
Output type | Double per input attribute
Incremental | yes
AGGREGATIONS = [['FUNCTION'='STDDEV', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='stddev_x']]


VARIANCE

Population variance, computed incrementally via Welford's online algorithm.

Property | Value
FUNCTION | VARIANCE
Input | one or more numeric attributes
Output type | Double per input attribute
Incremental | yes
AGGREGATIONS = [['FUNCTION'='VARIANCE', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='var_x']]
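
As a rough illustration of how STDDEV and VARIANCE can be maintained incrementally, the following Python sketch applies Welford's update when a value enters the window and the inverse update when a value leaves it. The class name WelfordWindow is hypothetical, and returning 0.0 for an empty window is an assumption rather than documented operator behaviour.

```python
import math


class WelfordWindow:
    """Running population variance with support for adding and removing values."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the mean

    def add(self, x):
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    def remove(self, x):
        # Inverse of add(); assumes x is currently part of the window.
        if self.n == 1:
            self.n, self.mean, self.m2 = 0, 0.0, 0.0
            return
        old_mean = self.mean
        self.n -= 1
        self.mean = old_mean - (x - old_mean) / self.n
        self.m2 -= (x - old_mean) * (x - self.mean)

    def variance(self):
        # Population variance; 0.0 for an empty window is an assumption.
        return self.m2 / self.n if self.n else 0.0

    def stddev(self):
        # Clamp tiny negative values caused by floating-point rounding.
        return math.sqrt(max(self.variance(), 0.0))


w = WelfordWindow()
for x in (2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0):
    w.add(x)
print(round(w.variance(), 6), round(w.stddev(), 6))  # 4.0 2.0
```

Neither update needs the full window contents, which is what makes both functions incremental.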


NEST

Collects all tuples of the current window into a list. The output attribute holds a List<Tuple>.

Property | Value
FUNCTION | NEST
Input | one or more attributes, or * for all
Output type | List<Tuple>
Incremental | yes

Additional parameters:

Key | Type | Default | Description
PRESERVE_ORDERING | Boolean | false | Keep tuples in insertion order (uses LinkedHashSet internally).
SORT | Boolean | false | Sort the tuples in the list (uses TreeSet internally).
UNIQUE_ATTR | Attribute name | | If set, only the most recent tuple per unique value of this attribute is kept.
AGGREGATIONS = [['FUNCTION'='NEST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='nested',
                 'PRESERVE_ORDERING'=true]]


DISTINCT_NEST

Like NEST, but eliminates duplicate tuples based on the projected attribute values.

Property | Value
FUNCTION | DISTINCT_NEST
Input | one or more attributes, or * for all
Output type | List<Tuple>
Incremental | yes

Additional parameters: same as NEST (PRESERVE_ORDERING, SORT, UNIQUE_ATTR).

AGGREGATIONS = [['FUNCTION'='DISTINCT_NEST', 'INPUT_ATTRIBUTES'='sensorId', 'OUTPUT_ATTRIBUTES'='distinct_sensors']]

...

DISTINCT_COUNT

Counts the number of distinct tuples in the current window (based on the projected attribute values). Internally delegates to DISTINCT_NEST and returns its size.

Property | Value
FUNCTION | DISTINCT_COUNT
Input | one or more attributes
Output type | Integer
Incremental | yes

Additional parameters:

Key | Type | Default | Description
UNIQUE_ATTR | Attribute name | | The attribute(s) used to determine uniqueness.
AGGREGATIONS = [['FUNCTION'='DISTINCT_COUNT', 'INPUT_ATTRIBUTES'='sensorId',
                 'OUTPUT_ATTRIBUTES'='num_distinct', 'UNIQUE_ATTR'='sensorId']]
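
The counting semantics can be sketched in Python as follows. This is not how the operator is implemented (the text above says it delegates to DISTINCT_NEST); the sketch instead keeps a reference count per projected value so that removals stay cheap. The class name DistinctCount and the dictionary-based tuples are illustrative assumptions.

```python
from collections import Counter


class DistinctCount:
    """Counts distinct projected values in a window, with removal support."""

    def __init__(self, attributes):
        self.attributes = attributes
        self._counts = Counter()  # projected value -> number of occurrences

    def _project(self, tup):
        return tuple(tup[a] for a in self.attributes)

    def add(self, tup):
        self._counts[self._project(tup)] += 1

    def remove(self, tup):
        key = self._project(tup)
        self._counts[key] -= 1
        if self._counts[key] <= 0:
            del self._counts[key]  # last occurrence left the window

    def value(self):
        return len(self._counts)


dc = DistinctCount(["sensorId"])
first = {"sensorId": 1, "temperature": 20.5}
dc.add(first)
dc.add({"sensorId": 1, "temperature": 21.0})
dc.add({"sensorId": 2, "temperature": 19.0})
print(dc.value())  # 2
dc.remove(first)
print(dc.value())  # 2, sensor 1 still has one tuple in the window
```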

...

FIRST

Returns the temporally first tuple of the current window as a nested tuple. The window must provide elements in start-timestamp order. Especially useful with tumbling windows.

Property | Value
FUNCTION | FIRST
Input | all attributes (full tuple)
Output type | Tuple
Incremental | no (requires ordered elements)
AGGREGATIONS = [['FUNCTION'='FIRST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='first_tuple']]


LAST

Returns the most recently added tuple in the current window as a nested tuple. Especially useful with tumbling windows.

Property | Value
FUNCTION | LAST
Input | all attributes (full tuple)
Output type | Tuple
Incremental | yes
AGGREGATIONS = [['FUNCTION'='LAST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='last_tuple']]


TOP_K

Returns the top-k tuples from the current window, ranked by one or more scoring attributes. The output attribute holds a List<Tuple>.

Property | Value
FUNCTION | TOP_K
Input | one or more attributes, or *
Output type | List<Tuple>
Incremental | yes (sorted set)

Additional parameters:

Key | Type | Default | Description
TOP_K | Integer | | Required. Number of top elements to keep.
SCORING_ATTRIBUTES | Attribute name(s) | | The attribute(s) used to rank elements.
DESCENDING | Boolean | true | true = highest score first, false = lowest score first.
NEWEST_ON_TOP | Boolean | true | When scores are equal, the newest tuple ranks higher.
MIN_SCORE | Number | | Tuples with a score ≤ this value are never included.
UNIQUE_ATTR | Attribute name | | If set, only the most recent tuple per unique value of this attribute is kept.
ALWAYS_OUTPUT | Boolean | false | If false, suppresses output when the top-k list has not changed.
AGGREGATIONS = [['FUNCTION'='TOP_K', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='top5',
                 'TOP_K'=5, 'SCORING_ATTRIBUTES'='temperature', 'DESCENDING'=true]]
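
To make the ranking parameters more concrete, here is a small Python sketch that recomputes a top-k list from scratch. It only mirrors the described meaning of TOP_K, DESCENDING, NEWEST_ON_TOP and MIN_SCORE for a single scoring attribute. The function top_k and its argument names are hypothetical, the input list is assumed to be in arrival order, and UNIQUE_ATTR and ALWAYS_OUTPUT are left out.

```python
def top_k(tuples, k, scoring_attribute, descending=True,
          newest_on_top=True, min_score=None):
    """Return the k best tuples; `tuples` is assumed to be in arrival order."""
    # Tuples with a score <= min_score are never included.
    candidates = [
        (index, t) for index, t in enumerate(tuples)
        if min_score is None or t[scoring_attribute] > min_score
    ]

    def sort_key(item):
        index, t = item
        score = t[scoring_attribute]
        primary = -score if descending else score
        # On equal scores, a larger arrival index means a newer tuple.
        tie_break = -index if newest_on_top else index
        return (primary, tie_break)

    return [t for _, t in sorted(candidates, key=sort_key)[:k]]


window = [
    {"id": "a", "temperature": 20.0},
    {"id": "b", "temperature": 25.0},
    {"id": "c", "temperature": 25.0},
    {"id": "d", "temperature": 18.0},
]
print([t["id"] for t in top_k(window, 2, "temperature")])                    # ['c', 'b']
print([t["id"] for t in top_k(window, 2, "temperature", descending=False)])  # ['d', 'a']
```

The operator itself is described as incremental (sorted set), so it would not re-sort the window on every evaluation as this sketch does.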

...

TRIGGER

Returns the tuple that triggered the current aggregation evaluation (the most recently arrived tuple). Optionally, only selected attributes are forwarded.

Property | Value
FUNCTION | TRIGGER
Input | any attributes, or *
Output type | Tuple (when NESTED=true) or flat attributes (when NESTED=false)
Incremental | yes

Additional parameters:

Key | Type | Default | Description
NESTED | Boolean | true | true: the trigger tuple is wrapped as a nested tuple attribute. false: the selected attributes are written directly as flat output attributes.
AGGREGATIONS = [['FUNCTION'='TRIGGER', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='trigger_tuple']]

With NESTED=false:

AGGREGATIONS = [['FUNCTION'='TRIGGER', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='trigger_tuple',
                 'NESTED'=false]]

...

Statistical Functions (UnivariateStatistic)

These functions are all accessed through the same factory, using the function name as the FUNCTION key. Each operates on a single numeric input attribute and produces a single Double output. All are incremental: the value buffer is maintained in a resizable array, and elements are removed in FIFO order.

Note: These functions assume in-order element arrival. Out-of-order removal is not supported.

General syntax:

AGGREGATIONS = [['FUNCTION'='FunctionName', 'INPUT_ATTRIBUTES'='attrName', 'OUTPUT_ATTRIBUTES'='outName']]
FUNCTION value | Description | Formula
GeometricMean | Geometric mean | (∏ xᵢ)^(1/n)
Kurtosis | Excess kurtosis (Fisher) | 4th standardised central moment − 3
Skewness | Skewness (Fisher–Pearson) | 3rd standardised central moment
IQR | Inter-Quartile Range | Q₃ − Q₁ (75th − 25th percentile)
MCQ | Mean Crossing Rate | Fraction of consecutive pairs where the mean-centered signal changes sign
RMS | Root Mean Square | √( (1/n) · Σ xᵢ² )
FrEnergy | Spectral Energy | Sum of FFT magnitudes divided by n (computed on mean-removed signal)
FrDmEntropy | Frequency-Domain Entropy | Shannon entropy of the normalised FFT magnitude spectrum (mean-removed)
FrPeakFreq | Peak Frequency | Frequency in [0, 5] Hz corresponding to the dominant FFT magnitude component (mean-removed)
FrMag5 | FFT Magnitude at bin 5 | Magnitude of the 6th FFT bin (index 5, mean-removed)
MSE | Mean Square Error | (1/n) · Σ (xᵢ − x̄)² (equivalent to population variance)
RMSE | Root Mean Square Error | √MSE

Examples:

AGGREGATIONS = [['FUNCTION'='IQR', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='iqr_value']]

AGGREGATIONS = [['FUNCTION'='Kurtosis', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='kurtosis_value']]

AGGREGATIONS = [['FUNCTION'='RMS', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='rms_value']]
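
A few of the formulas from the table can be written out as plain Python functions for reference. This sketch recomputes each value from a complete list and does not reflect the incremental buffer handling of the operator. The function names are hypothetical, and treating a product of neighbouring centered values below zero as a sign change is an assumption about how zeros are handled in MCQ.

```python
import math


def rms(xs):
    # sqrt((1/n) * sum(x_i^2))
    return math.sqrt(sum(x * x for x in xs) / len(xs))


def mse(xs):
    # (1/n) * sum((x_i - mean)^2), i.e. the population variance
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** 2 for x in xs) / len(xs)


def rmse(xs):
    return math.sqrt(mse(xs))


def geometric_mean(xs):
    # Computed via logarithms to avoid overflow; requires strictly positive values.
    return math.exp(sum(math.log(x) for x in xs) / len(xs))


def mean_crossing_rate(xs):
    # Fraction of consecutive pairs whose mean-centered values differ in sign;
    # requires at least two values.
    mean = sum(xs) / len(xs)
    centered = [x - mean for x in xs]
    crossings = sum(1 for a, b in zip(centered, centered[1:]) if a * b < 0)
    return crossings / (len(xs) - 1)


values = [1.0, 3.0, 1.0, 3.0]
print(mse(values), rmse(values), mean_crossing_rate(values))  # 1.0 1.0 1.0
print(round(rms(values), 4))                                   # 2.2361
```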

...

BFPRT (Deterministic k-th Smallest Element)

Applies the BFPRT algorithm (Blum–Floyd–Pratt–Rivest–Tarjan, also known as Median of Medians) to deterministically find the k-th smallest value among all tuples in the current window in guaranteed O(n) worst-case time.

By default (no extra parameters), the lower median is returned. The target rank can be configured either as an absolute 1-based index (K) or as a relative value (K_RELATIVE).

Property | Value
FUNCTION | BFPRT
Input | exactly one numeric attribute
Output type | Double
Incremental | no (BFPRT requires the complete window on every evaluation)

Additional parameters:

Key | Type | Default | Description
K | Integer ≥ 1 | | 1-based absolute rank. K=1 → minimum, K=n → maximum.
K_RELATIVE | Double in (0, 1] | | Relative rank. K_RELATIVE=0.5 → median. Overrides K when both are present.

When neither K nor K_RELATIVE is given, the lower median (⌊(n−1)/2⌋, 0-based) is returned.

Examples:

// Median (default)
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='median_temperature']]

// 3rd smallest value
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='p3_temperature',
                 'K'=3]]

// 90th percentile
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='p90_temperature',
                 'K_RELATIVE'=0.9]]
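
For readers unfamiliar with the algorithm, the following Python sketch shows a median-of-medians selection together with the rank handling described above. It is an illustration, not the operator's code: the function names are hypothetical, and mapping K_RELATIVE to the 1-based rank ceil(K_RELATIVE · n) is an assumption, since the exact rounding rule is not stated here.

```python
import math


def _median_of_five(group):
    return sorted(group)[(len(group) - 1) // 2]


def select(values, index):
    """Return the element with 0-based rank `index` using median of medians."""
    values = list(values)
    while True:
        if len(values) <= 5:
            return sorted(values)[index]
        # Pivot: median of the medians of groups of five.
        medians = [_median_of_five(values[i:i + 5])
                   for i in range(0, len(values), 5)]
        pivot = select(medians, (len(medians) - 1) // 2)
        lower = [v for v in values if v < pivot]
        equal = [v for v in values if v == pivot]
        if index < len(lower):
            values = lower
        elif index < len(lower) + len(equal):
            return pivot
        else:
            index -= len(lower) + len(equal)
            values = [v for v in values if v > pivot]


def bfprt(values, k=None, k_relative=None):
    n = len(values)
    if k_relative is not None:  # K_RELATIVE overrides K
        # Assumption: relative rank -> 1-based rank ceil(k_relative * n);
        # the small epsilon guards against floating-point overshoot.
        index = max(1, math.ceil(k_relative * n - 1e-9)) - 1
    elif k is not None:
        index = k - 1           # K is 1-based
    else:
        index = (n - 1) // 2    # lower median, 0-based
    return select(values, index)


temps = [21.0, 19.5, 23.0, 20.0, 22.5, 18.0, 24.0, 19.0, 25.0, 21.5]
print(bfprt(temps))                  # 21.0 (lower median)
print(bfprt(temps, k=3))             # 19.5 (3rd smallest)
print(bfprt(temps, k_relative=0.9))  # 24.0 (under the rounding assumption above)
```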

Examples

counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)

...