The AGGREGATION operator is an alternative implementation of the Aggregate (and Group) operator. In particular, for sliding time windows with an advance of 1, this operator is faster than the implementation based on partial aggregates.

Differences in the use of this operator compared to the Aggregate (and Group) operator:

  • This operator has a more flexible PQL interface that allows key-value parameters to be specified.
  • This operator does not set the end timestamps of the resulting stream elements. If you need the validity interval of an aggregation value, append an element window of size 1.
  • This operator outputs "empty aggregations" if no tuple is valid at a given point in time; for example, the Sum function outputs 0. These outputs are necessary to determine the end timestamps with a subsequent element window.
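The following sketch shows why empty aggregations matter for a subsequent element window of size 1. It is a simplified, hypothetical model: the class and method names are not part of Odysseus, and it assumes that each result stays valid exactly until the next result starts.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of an element window of size 1 placed after AGGREGATION. */
class ElementWindowSketch {

    /** An aggregation result with start and end timestamps. */
    record Result(long start, long end, long value) {}

    /** Marker for a result whose end timestamp is not known yet. */
    static final long OPEN = Long.MAX_VALUE;

    /**
     * Each result is valid until the next result starts. Without "empty aggregations"
     * (e.g., a sum of 0 when no tuple is valid), the last non-empty value would stay
     * valid too long, because no later result would close its interval.
     */
    static List<Result> assignEnds(long[] starts, long[] values) {
        List<Result> out = new ArrayList<>();
        for (int i = 0; i < starts.length; i++) {
            long end = (i + 1 < starts.length) ? starts[i + 1] : OPEN;
            out.add(new Result(starts[i], end, values[i]));
        }
        return out;
    }

    public static void main(String[] args) {
        // Sums emitted at t=0, 3 and 7; the 0 at t=7 is an empty aggregation
        // that ends the validity of the value 12.
        assignEnds(new long[] {0, 3, 7}, new long[] {5, 12, 0}).forEach(System.out::println);
    }
}
```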

These aggregation functions are still under development. In particular, the parameter keys are preliminary and subject to change.

Parameters

  • group_by: An optional list of attributes over which the grouping should occur.
  • aggregations: A list of aggregate functions (see below).
  • SUPPRESS_FULL_META_DATA_HANDLING: Boolean flag; set it to true to suppress the handling of metadata other than the time interval (e.g., latency).

 

The following optional boolean parameters control when a new aggregation value is transferred (see below for useful examples):

  • eval_at_new_element: Outputs an updated aggregation value when a new element becomes valid. If several elements become valid at the same time (same start timestamp), the operator outputs one value per element, in order of arrival. The default value is true.
  • eval_at_outdating: Outputs an updated aggregation value when one or more elements become invalid; the value is computed after the invalid elements have been removed. The default value is true.
  • eval_before_remove_outdating: Outputs an updated aggregation value before removing the invalid elements instead of after removal. The default value is false.
  • eval_at_done: Outputs the current value when the operator receives the done signal. The default value is false.
  • output_only_changes: Suppresses elements that are equal to the previously output element. The default value is false. If you use this option, make sure that the equals method is implemented for every attribute type.
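The interaction of these flags can be approximated with the following sketch. It computes a count over elements with [start, end) validity intervals.

The sketch is hypothetical and simplified in two ways:

  • It assumes that eval_before_remove_outdating only changes when an outdating output is computed, so it takes effect only while eval_at_outdating is true.
  • It handles all elements outdated by one arrival in a single step.

The real operator may differ in both respects.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the eval_* flags for a COUNT aggregation. */
class EvalFlagsSketch {

    record Element(long start, long end) {}      // valid in [start, end)
    record Output(long timestamp, int count) {}

    static List<Output> run(List<Element> input, boolean evalAtNewElement, boolean evalAtOutdating,
                            boolean evalBeforeRemoveOutdating, boolean evalAtDone) {
        List<Element> valid = new ArrayList<>();
        List<Output> out = new ArrayList<>();
        long lastTs = 0;
        for (Element e : input) {                 // input is ordered by start timestamp
            long t = e.start();
            lastTs = t;
            boolean outdated = valid.stream().anyMatch(v -> v.end() <= t);
            if (outdated && evalAtOutdating && evalBeforeRemoveOutdating) {
                out.add(new Output(t, valid.size()));  // value before removal
            }
            valid.removeIf(v -> v.end() <= t);
            if (outdated && evalAtOutdating && !evalBeforeRemoveOutdating) {
                out.add(new Output(t, valid.size()));  // value after removal (may be an empty aggregation)
            }
            valid.add(e);
            if (evalAtNewElement) {
                out.add(new Output(t, valid.size()));
            }
        }
        if (evalAtDone) {
            out.add(new Output(lastTs, valid.size())); // simplification: reuse the last timestamp
        }
        return out;
    }

    public static void main(String[] args) {
        // Tumbling window of size 5: windows [0,5), [5,10), [10,15)
        List<Element> in = List.of(new Element(1, 5), new Element(2, 5), new Element(6, 10),
                                   new Element(7, 10), new Element(11, 15));
        System.out.println(run(in, true, true, false, false)); // default settings
        System.out.println(run(in, false, true, true, false)); // one count per finished window
    }
}
```

With the default flags, every arrival produces an output, including a count of 0 whenever a window closes. With eval_at_new_element = false and eval_before_remove_outdating = true, only the count of each completed window is emitted.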

Aggregation Functions

Count: Outputs the number of stream elements.

  • OUTPUT_ATTRIBUTES (optional, default: count): The name of the output attribute.

Examples:

['FUNCTION' = 'Count']

['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'number_of_elements']
DistinctCount: Outputs the number of distinct stream elements.

Sum: Outputs the sum of the elements.

  • INPUT_ATTRIBUTES (optional, default: all attributes): A single string or a list with the name(s) of the input attribute(s). By default, all input attributes are used, which may raise an error if some attributes are not numeric.
  • OUTPUT_ATTRIBUTES (optional, default: "Sum_" + input attribute name): A single string or a list of output attribute names.

Examples:

['FUNCTION' = 'Sum']

['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']

['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = ['value1', 'value2']]
Avg: Average value (mean). Parameters: TODO (similar to Sum)
Min: Minimum value. Parameters: TODO (similar to Sum)
Max: Maximum value. Parameters: TODO (similar to Sum)
First: The first element of a window. See the example below.

  • OUTPUT_ATTRIBUTES (optional, default: first): The name of the output attribute.

Recommended setting:

output_only_changes = true

This yields the first element of each window and is especially useful with a tumbling window.
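Why output_only_changes yields one First value per window can be shown with a small filter. The filter drops every output that is equal to the previous one.

This is a hypothetical sketch of the flag's described behavior, not the operator's code. It also makes one caveat visible: if two consecutive windows happen to start with equal elements, the second window produces no output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch of output_only_changes: drop outputs equal to the previous output. */
class OutputOnlyChangesSketch {

    static <T> List<T> onlyChanges(List<T> outputs) {
        List<T> result = new ArrayList<>();
        for (T o : outputs) {
            // relies on equals(), hence the requirement that every attribute type implements it
            if (result.isEmpty() || !Objects.equals(o, result.get(result.size() - 1))) {
                result.add(o);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // First re-emits the same element for every arrival in a tumbling window:
        // window 1 holds a, b, c; window 2 holds d, e.
        System.out.println(onlyChanges(List.of("a", "a", "a", "d", "d"))); // prints [a, d]
    }
}
```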

Last: The last element of a window. See the example below.

  • OUTPUT_ATTRIBUTES (optional, default: last): The name of the output attribute.

Recommended settings:

EVAL_AT_NEW_ELEMENT = false
EVAL_BEFORE_REMOVE_OUTDATING = true

This yields the last element of each window and is especially useful with a tumbling window.

Trigger: The tuple that triggers the output. Parameters: TODO
Variance: Calculates the variance. Parameters: TODO (similar to Sum)
StdDev: Standard deviation.

TopK: Calculates the top-K list. Parameters: TODO

Nest: Nests the valid elements as a list. If more than one attribute is given, the list contains the tuples projected onto these attributes.

  • INPUT_ATTRIBUTES (required): The attribute(s) to nest.

Examples:

['FUNCTION' = 'Nest', 'INPUT_ATTRIBUTES' = 'id']

['FUNCTION' = 'Nest', 'INPUT_ATTRIBUTES' = ['id', 'name']]

DistinctNest: Same as Nest, but removes duplicates.


A number of univariate statistics are also available:

  • GeometricMean
  • Kurtosis
  • Skewness
  • IQR (InterQuartileRange)
  • MCQ (MeanCrossingRate)
  • RMS (RootMeanSquare)
  • FrEnergy (SpectralEnergy)
  • FrDmEntropy (Frequency-Domain Entropy)
  • FrPeakFreq (Peak Frequency)
  • FrMag5 (FFT Magnitude at bin 5)
  • MSE (Mean Square Error)
  • RMSE (Root Mean Square Error)


See, e.g., https://commons.apache.org/proper/commons-math/javadocs/api-3.6.1/org/apache/commons/math3/stat/descriptive/UnivariateStatistic.html

Examples:

['FUNCTION' = 'RMSE', 'INPUT_ATTRIBUTES' = 'error']

['FUNCTION' = 'MSE', 'INPUT_ATTRIBUTES' = 'error']


Operator-level parameters

  • AGGREGATIONS (list of maps, required): List of aggregation function configurations (see below).
  • GROUP_BY (list of attributes, optional): Grouping attributes.
  • EVAL_AT_NEW_ELEMENT (Boolean, default true): Output a result when a new element becomes valid.
  • EVAL_AT_OUTDATING (Boolean, default true): Output a result when elements expire.
  • EVAL_BEFORE_REMOVE_OUTDATING (Boolean, default false): If true, evaluate before removing expiring elements instead of after.
  • EVAL_AT_DONE (Boolean, default false): Output the last value when the operator receives the done signal.
  • OUTPUT_ONLY_CHANGES (Boolean, default false): Suppress output tuples that are equal to the previous output.
  • SUPPRESS_FULL_META_DATA_HANDLING (Boolean, default false): For non-incremental functions, skip full metadata handling.

Standard Functions

AVG

Computes the arithmetic mean.

  • FUNCTION: AVG
  • Input: one or more numeric attributes
  • Output type: Double per input attribute
  • Incremental: yes

AGGREGATIONS = [['FUNCTION'='AVG', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='avg_temperature']]


COUNT

Counts the number of tuples in the current window. No input attribute is required.

  • FUNCTION: COUNT
  • Input: any attribute (or *)
  • Output type: Long
  • Incremental: yes

AGGREGATIONS = [['FUNCTION'='COUNT', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='count_x']]


SUM

Computes the sum of values.

  • FUNCTION: SUM
  • Input: one or more numeric attributes
  • Output type: Double per input attribute
  • Incremental: yes

AGGREGATIONS = [['FUNCTION'='SUM', 'INPUT_ATTRIBUTES'='value', 'OUTPUT_ATTRIBUTES'='sum_value']]


MIN

Returns the minimum value. Maintained with a min-priority queue.

  • FUNCTION: MIN
  • Input: one or more numeric attributes
  • Output type: same as input
  • Incremental: yes

AGGREGATIONS = [['FUNCTION'='MIN', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='min_temperature']]


MAX

Returns the maximum value. Maintained with a max-priority queue.

  • FUNCTION: MAX
  • Input: one or more numeric attributes
  • Output type: same as input
  • Incremental: yes

AGGREGATIONS = [['FUNCTION'='MAX', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='max_temperature']]
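The priority-queue approach for MIN and MAX can be sketched with java.util.PriorityQueue. The sketch assumes that a value is removed by value when it leaves the window. PriorityQueue.remove(Object) takes linear time, so this is a simplification, not necessarily how Odysseus maintains its queues; the class name is hypothetical.

```java
import java.util.Collections;
import java.util.PriorityQueue;

/** Hypothetical sketch: MIN and MAX over a window kept in two priority queues. */
class MinMaxSketch {
    private final PriorityQueue<Double> minQueue = new PriorityQueue<>();
    private final PriorityQueue<Double> maxQueue = new PriorityQueue<>(Collections.reverseOrder());

    void add(double x) {
        minQueue.add(x);
        maxQueue.add(x);
    }

    /** Removes one occurrence of an expiring value (linear time in PriorityQueue). */
    void remove(double x) {
        minQueue.remove(x);
        maxQueue.remove(x);
    }

    Double min() { return minQueue.peek(); } // null for an empty window
    Double max() { return maxQueue.peek(); }
}
```

A heap keeps the current extreme at its head. Removing a value that is not the head does not change the answer, which is why this structure suits sliding windows.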


STDDEV

Population standard deviation, computed incrementally via Welford's online algorithm.

  • FUNCTION: STDDEV
  • Input: one or more numeric attributes
  • Output type: Double per input attribute
  • Incremental: yes

AGGREGATIONS = [['FUNCTION'='STDDEV', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='stddev_x']]


VARIANCE

Population variance, computed incrementally via Welford's online algorithm.

  • FUNCTION: VARIANCE
  • Input: one or more numeric attributes
  • Output type: Double per input attribute
  • Incremental: yes

AGGREGATIONS = [['FUNCTION'='VARIANCE', 'INPUT_ATTRIBUTES'='x', 'OUTPUT_ATTRIBUTES'='var_x']]
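Welford's update handles adding values. For sliding windows, the update can be reversed to remove an expiring value.

The sketch below shows one way such a removal step can look, under these assumptions:

  • It is not the Odysseus code, and the class name is hypothetical.
  • Returning NaN for an empty window is a choice made only for this sketch.

```java
/** Hypothetical sketch: Welford's online algorithm with a reverse step for expiring values. */
class WelfordSketch {
    private long n = 0;
    private double mean = 0.0;
    private double m2 = 0.0; // sum of squared deviations from the current mean

    void add(double x) {
        n++;
        double delta = x - mean;
        mean += delta / n;
        m2 += delta * (x - mean); // uses the updated mean
    }

    /** Reverses add(x); x must be a value that is currently in the window. */
    void remove(double x) {
        if (n <= 1) { // the window becomes empty
            n = 0;
            mean = 0.0;
            m2 = 0.0;
            return;
        }
        double oldMean = mean;
        n--;
        mean = oldMean - (x - oldMean) / n;
        m2 -= (x - mean) * (x - oldMean);
    }

    /** Population variance m2 / n; NaN for an empty window (a choice made for this sketch). */
    double populationVariance() {
        return n == 0 ? Double.NaN : Math.max(0.0, m2 / n); // clamp tiny negative rounding errors
    }

    double populationStdDev() {
        return Math.sqrt(populationVariance());
    }
}
```

The removal step follows from inverting the two update equations: the old mean is recovered from the new one, and the same cross term that add() contributed to m2 is subtracted again.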


NEST

Collects all tuples of the current window into a list. The output attribute holds a List<Tuple>.

  • FUNCTION: NEST
  • Input: one or more attributes, or * for all
  • Output type: List<Tuple>
  • Incremental: yes

Additional parameters:

  • PRESERVE_ORDERING (Boolean, default false): Keep tuples in insertion order (uses LinkedHashSet internally).
  • SORT (Boolean, default false): Sort the tuples in the list (uses TreeSet internally).
  • UNIQUE_ATTR (attribute name): If set, only the most recent tuple per unique value of this attribute is kept.

AGGREGATIONS = [['FUNCTION'='NEST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='nested',
                 'PRESERVE_ORDERING'=true]]
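The UNIQUE_ATTR rule ("only the most recent tuple per unique value is kept") can be sketched with a java.util.LinkedHashMap keyed by the unique attribute.

This is a hypothetical illustration with these assumptions:

  • "Most recent" means the latest arrival.
  • Re-inserting a key moves its tuple to the end of the list.
  • Tuples leaving the window are not modeled.

A sorted variant would use a TreeMap or TreeSet instead, in the spirit of SORT.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of UNIQUE_ATTR for NEST: keep only the most recent tuple per key. */
class UniqueAttrSketch {
    /** Stand-in for a tuple; sensorId plays the role of the UNIQUE_ATTR attribute. */
    record Reading(String sensorId, double value) {}

    // LinkedHashMap keeps insertion order, similar in spirit to PRESERVE_ORDERING
    private final Map<String, Reading> latest = new LinkedHashMap<>();

    void add(Reading r) {
        latest.remove(r.sensorId()); // drop the older tuple so the new one moves to the end
        latest.put(r.sensorId(), r);
    }

    /** The nested list, oldest surviving tuple first. */
    List<Reading> nested() {
        return new ArrayList<>(latest.values());
    }
}
```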


DISTINCT_NEST

Like NEST, but eliminates duplicate tuples based on the projected attribute values.

  • FUNCTION: DISTINCT_NEST
  • Input: one or more attributes, or * for all
  • Output type: List<Tuple>
  • Incremental: yes

Additional parameters: same as NEST (PRESERVE_ORDERING, SORT, UNIQUE_ATTR).

AGGREGATIONS = [['FUNCTION'='DISTINCT_NEST', 'INPUT_ATTRIBUTES'='sensorId', 'OUTPUT_ATTRIBUTES'='distinct_sensors']]

DISTINCT_COUNT

Counts the number of distinct tuples in the current window (based on the projected attribute values). Internally delegates to DISTINCT_NEST and returns its size.

  • FUNCTION: DISTINCT_COUNT
  • Input: one or more attributes
  • Output type: Integer
  • Incremental: yes

Additional parameters:

  • UNIQUE_ATTR (attribute name): The attribute(s) used to determine uniqueness.

AGGREGATIONS = [['FUNCTION'='DISTINCT_COUNT', 'INPUT_ATTRIBUTES'='sensorId',
                 'OUTPUT_ATTRIBUTES'='num_distinct', 'UNIQUE_ATTR'='sensorId']]
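In a sliding window, a distinct count must also handle removals, because a value only disappears once its last occurrence expires. A reference-counting map is one common way to do this.

The sketch below is a hypothetical illustration of that idea. It is not a description of DISTINCT_NEST's internals.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: distinct count over a sliding window using reference counts. */
class DistinctCountSketch<K> {
    private final Map<K, Integer> counts = new HashMap<>();

    void add(K key) {
        counts.merge(key, 1, Integer::sum); // one more occurrence of key in the window
    }

    void remove(K key) {
        // decrement; returning null drops the entry when the last occurrence expires
        counts.computeIfPresent(key, (k, c) -> c > 1 ? c - 1 : null);
    }

    int distinctCount() {
        return counts.size();
    }
}
```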

FIRST

Returns the temporally first tuple of the current window as a nested tuple. The window must provide elements in start-timestamp order. Especially useful with tumbling windows.

  • FUNCTION: FIRST
  • Input: all attributes (full tuple)
  • Output type: Tuple
  • Incremental: no (requires ordered elements)

AGGREGATIONS = [['FUNCTION'='FIRST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='first_tuple']]


LAST

Returns the most recently added tuple in the current window as a nested tuple. Especially useful with tumbling windows.

  • FUNCTION: LAST
  • Input: all attributes (full tuple)
  • Output type: Tuple
  • Incremental: yes

AGGREGATIONS = [['FUNCTION'='LAST', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='last_tuple']]


TOP_K

Returns the top-k tuples from the current window, ranked by one or more scoring attributes. The output attribute holds a List<Tuple>.

  • FUNCTION: TOP_K
  • Input: one or more attributes, or *
  • Output type: List<Tuple>
  • Incremental: yes (sorted set)

Additional parameters:

  • TOP_K (Integer, required): Number of top elements to keep.
  • SCORING_ATTRIBUTES (attribute name(s)): The attribute(s) used to rank elements.
  • DESCENDING (Boolean, default true): If true, the highest score comes first; if false, the lowest score comes first.
  • NEWEST_ON_TOP (Boolean, default true): When scores are equal, the newest tuple ranks higher.
  • MIN_SCORE (Number): Tuples with a score ≤ this value are never included.
  • UNIQUE_ATTR (attribute name): If set, only the most recent tuple per unique value of this attribute is kept.
  • ALWAYS_OUTPUT (Boolean, default false): If false, output is suppressed when the top-k list has not changed.

AGGREGATIONS = [['FUNCTION'='TOP_K', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='top5',
                 'TOP_K'=5, 'SCORING_ATTRIBUTES'='temperature', 'DESCENDING'=true]]
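The ranking rules above can be sketched with a java.util.TreeSet, which matches the "sorted set" mentioned in the property list. The rules covered are score order, NEWEST_ON_TOP tie-breaking and MIN_SCORE filtering.

The names are hypothetical, and the sketch makes these simplifications:

  • NEWEST_ON_TOP is fixed to true.
  • UNIQUE_ATTR and ALWAYS_OUTPUT are not modeled.
  • Scores are plain doubles.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical sketch of TOP_K ranking with a sorted set. */
class TopKSketch {
    record Entry(String item, double score, long seq) {} // seq = arrival order

    private final TreeSet<Entry> ranked;
    private final int k;
    private final double minScore;
    private long nextSeq = 0;

    TopKSketch(int k, boolean descending, double minScore) {
        this.k = k;
        this.minScore = minScore;
        Comparator<Entry> byScore = Comparator.comparingDouble(Entry::score);
        if (descending) byScore = byScore.reversed();
        // NEWEST_ON_TOP: on equal scores, the higher sequence number (newer tuple) ranks first
        this.ranked = new TreeSet<>(byScore.thenComparing(Comparator.comparingLong(Entry::seq).reversed()));
    }

    /** Adds a tuple and returns a handle that is needed to remove it when it expires. */
    Entry add(String item, double score) {
        Entry e = new Entry(item, score, nextSeq++);
        ranked.add(e);
        return e;
    }

    void remove(Entry e) {
        ranked.remove(e);
    }

    List<String> topK() {
        List<String> out = new ArrayList<>();
        for (Entry e : ranked) {
            if (out.size() == k) break;
            if (e.score() <= minScore) continue; // MIN_SCORE: never included
            out.add(e.item());
        }
        return out;
    }
}
```

Breaking ties by arrival order keeps every entry distinct under the comparator. Without that tie-break, a TreeSet would silently drop entries with equal scores.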

TRIGGER

Returns the tuple that triggered the current aggregation evaluation (the most recently arrived tuple). Optionally, only selected attributes are forwarded.

  • FUNCTION: TRIGGER
  • Input: any attributes, or *
  • Output type: Tuple (when NESTED=true) or flat attributes (when NESTED=false)
  • Incremental: yes

Additional parameters:

  • NESTED (Boolean, default true): If true, the trigger tuple is wrapped as a nested tuple attribute; if false, the selected attributes are written directly as flat output attributes.

AGGREGATIONS = [['FUNCTION'='TRIGGER', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='trigger_tuple']]

With NESTED=false:

AGGREGATIONS = [['FUNCTION'='TRIGGER', 'INPUT_ATTRIBUTES'='*', 'OUTPUT_ATTRIBUTES'='trigger_tuple',
                 'NESTED'=false]]

Statistical Functions (UnivariateStatistic)

These functions are all accessed through the same factory using the function name as the FUNCTION key. They all operate on a single numeric input attribute and produce a single Double output. All are incremental — the value buffer is maintained in a resizable array; elements are removed in FIFO order.

Note: These functions assume in-order element arrival. Out-of-order removal is not supported.
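The FIFO assumption can be illustrated with a small buffer based on java.util.ArrayDeque, a resizable-array deque. The sketch is hypothetical. It simply rejects any removal that does not target the oldest value, which is the case the note above rules out.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch of a FIFO value buffer for the UnivariateStatistic functions. */
class FifoBufferSketch {
    private final ArrayDeque<Double> buffer = new ArrayDeque<>(); // resizable-array deque

    void add(double x) {
        buffer.addLast(x);
    }

    /** Removes the oldest value; any other value signals out-of-order removal. */
    void removeOldest(double expected) {
        Double head = buffer.peekFirst();
        if (head == null || head != expected) {
            throw new IllegalStateException("out-of-order removal is not supported");
        }
        buffer.pollFirst();
    }

    /** Snapshot of the current window contents, oldest first. */
    double[] values() {
        return buffer.stream().mapToDouble(Double::doubleValue).toArray();
    }
}
```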

General syntax:

AGGREGATIONS = [['FUNCTION'='FunctionName', 'INPUT_ATTRIBUTES'='attrName', 'OUTPUT_ATTRIBUTES'='outName']]
Available FUNCTION values (description): formula

  • GeometricMean (geometric mean): (∏ xᵢ)^(1/n)
  • Kurtosis (excess kurtosis, Fisher): 4th standardised central moment − 3
  • Skewness (skewness, Fisher–Pearson): 3rd standardised central moment
  • IQR (inter-quartile range): Q₃ − Q₁ (75th − 25th percentile)
  • MCQ (mean crossing rate): fraction of consecutive pairs where the mean-centered signal changes sign
  • RMS (root mean square): √( (1/n) · Σ xᵢ² )
  • FrEnergy (spectral energy): sum of FFT magnitudes divided by n (computed on the mean-removed signal)
  • FrDmEntropy (frequency-domain entropy): Shannon entropy of the normalised FFT magnitude spectrum (mean-removed)
  • FrPeakFreq (peak frequency): frequency in [0, 5] Hz corresponding to the dominant FFT magnitude component (mean-removed)
  • FrMag5 (FFT magnitude at bin 5): magnitude of the 6th FFT bin (index 5, mean-removed)
  • MSE (mean square error): (1/n) · Σ (xᵢ − x̄)², equivalent to the population variance
  • RMSE (root mean square error): √MSE
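Several of the formulas in this list can be checked with the plain-Java sketch below, which operates on the current window contents.

It is a hypothetical illustration of the formulas as stated here. It is not the Apache Commons Math or Odysseus implementation. The spectral (Fr*) functions and IQR are omitted, because their exact FFT and percentile conventions are not specified in this section.

```java
/** Hypothetical sketches of a few UnivariateStatistic formulas on a window buffer. */
class UnivariateSketch {

    static double mean(double[] x) {
        double s = 0;
        for (double v : x) s += v;
        return s / x.length;
    }

    /** RMS = sqrt((1/n) * sum(x_i^2)) */
    static double rms(double[] x) {
        double s = 0;
        for (double v : x) s += v * v;
        return Math.sqrt(s / x.length);
    }

    /** (prod x_i)^(1/n), computed in log space to avoid overflow; assumes all x_i > 0. */
    static double geometricMean(double[] x) {
        double s = 0;
        for (double v : x) s += Math.log(v);
        return Math.exp(s / x.length);
    }

    /** MSE as listed above: (1/n) * sum((x_i - mean)^2), i.e. the population variance. */
    static double mse(double[] x) {
        double m = mean(x);
        double s = 0;
        for (double v : x) s += (v - m) * (v - m);
        return s / x.length;
    }

    /** MCQ: fraction of consecutive pairs whose mean-centered values change sign. */
    static double mcq(double[] x) {
        if (x.length < 2) return 0.0;
        double m = mean(x);
        int crossings = 0;
        for (int i = 1; i < x.length; i++) {
            // a negative product means opposite signs; values exactly at the mean are not counted
            if ((x[i - 1] - m) * (x[i] - m) < 0) crossings++;
        }
        return (double) crossings / (x.length - 1);
    }
}
```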

Examples:

AGGREGATIONS = [['FUNCTION'='IQR', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='iqr_value']]

AGGREGATIONS = [['FUNCTION'='Kurtosis', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='kurtosis_value']]

AGGREGATIONS = [['FUNCTION'='RMS', 'INPUT_ATTRIBUTES'='sensor_value', 'OUTPUT_ATTRIBUTES'='rms_value']]

BFPRT (Deterministic k-th Smallest Element)

Applies the BFPRT algorithm (Blum–Floyd–Pratt–Rivest–Tarjan, also known as Median of Medians) to deterministically find the k-th smallest value among all tuples in the current window in guaranteed O(n) worst-case time.

By default (no extra parameters), the lower median is returned. The target rank can be configured either as an absolute 1-based index (K) or as a relative value (K_RELATIVE).

  • FUNCTION: BFPRT
  • Input: exactly one numeric attribute
  • Output type: Double
  • Incremental: no (BFPRT requires the complete window on every evaluation)

Additional parameters:

  • K (Integer ≥ 1): 1-based absolute rank; K=1 → minimum, K=n → maximum.
  • K_RELATIVE (Double in (0, 1]): Relative rank; K_RELATIVE=0.5 → median. Overrides K when both are present.

When neither K nor K_RELATIVE is given, the lower median (⌊(n−1)/2⌋, 0-based) is returned.

Examples:

/// Median (default)
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='median_temperature']]

/// 3rd smallest value
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='p3_temperature',
                 'K'=3]]

/// 90th percentile
AGGREGATIONS = [['FUNCTION'='BFPRT', 'INPUT_ATTRIBUTES'='temperature', 'OUTPUT_ATTRIBUTES'='p90_temperature',
                 'K_RELATIVE'=0.9]]
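The selection step itself can be sketched as a textbook median-of-medians algorithm: groups of five, a recursive median of the group medians as pivot, and a three-way partition that handles duplicates.

This is a hypothetical sketch, not the Odysseus code. It supports the 1-based K and the lower-median default. It does not cover K_RELATIVE, because the exact mapping from a relative rank to an index is not stated in this section.

```java
import java.util.Arrays;

/** Hypothetical sketch of BFPRT (median of medians) selection; not the Odysseus code. */
class BfprtSketch {

    /** Returns the k-th smallest value (0-based k); the input array is not modified. */
    static double select(double[] values, int k) {
        if (k < 0 || k >= values.length) throw new IllegalArgumentException("k out of range");
        return select(Arrays.copyOf(values, values.length), 0, values.length - 1, k);
    }

    /** 1-based rank K as in the parameter list; null selects the lower median floor((n-1)/2). */
    static double kth(double[] values, Integer k) {
        int index = (k == null) ? (values.length - 1) / 2 : k - 1;
        return select(values, index);
    }

    private static double select(double[] a, int lo, int hi, int k) {
        while (true) {
            if (hi - lo < 5) {                 // at most 5 elements: sort directly
                Arrays.sort(a, lo, hi + 1);
                return a[k];
            }
            double pivot = medianOfMedians(a, lo, hi);
            // three-way partition: [lo, lt) < pivot, [lt, gt] == pivot, (gt, hi] > pivot
            int lt = lo, i = lo, gt = hi;
            while (i <= gt) {
                if (a[i] < pivot) swap(a, lt++, i++);
                else if (a[i] > pivot) swap(a, i, gt--);
                else i++;
            }
            if (k < lt) hi = lt - 1;           // continue in the smaller part
            else if (k > gt) lo = gt + 1;      // continue in the larger part
            else return pivot;                 // k falls into the block equal to the pivot
        }
    }

    /** Sorts groups of five, collects their medians and selects the median of those recursively. */
    private static double medianOfMedians(double[] a, int lo, int hi) {
        int groups = (hi - lo + 1 + 4) / 5;
        double[] medians = new double[groups];
        for (int g = 0; g < groups; g++) {
            int start = lo + 5 * g;
            int end = Math.min(start + 4, hi);
            Arrays.sort(a, start, end + 1);
            medians[g] = a[start + (end - start) / 2];
        }
        return select(medians, 0, groups - 1, groups / 2);
    }

    private static void swap(double[] a, int i, int j) {
        double t = a[i];
        a[i] = a[j];
        a[j] = t;
    }
}
```

Because the pivot is always the median of group medians, each round discards a constant fraction of the range. This gives the O(n) worst case, in contrast to quickselect with a random or fixed pivot.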

Examples

counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)

You can use more than one aggregation function:

counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count'], ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']], GROUP_BY = ['publisher', 'item']}, windowed)

The following example combines Count with TopK:

/// count the occurrences of each item per publisher
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)
/// aggregate the 100 most frequent items for each publisher to an ordered list
TopKItemsByPublisher ::= AGGREGATION({AGGREGATIONS = [
	[
		'FUNCTION' = 'TopK',
		'TOP_K' = '100',                        /// number of items
		'SCORING_ATTRIBUTES' = 'Count',         /// the attribute name that defines the order
		'INPUT_ATTRIBUTES' = 'item',            /// do not use the whole input tuple, just use the 'item' attribute for creating the output top-k set
		'MIN_SCORE' = '0',                      /// remove items whose score drops to 0 (because of the preceding aggregation, these are exactly the items without a valid tuple)
		'UNIQUE_ATTR'='item',                   /// use 'item' as a unique attribute: a new tuple with a known item id replaces the previous one (a kind of element window inside this operator)
		'descending' = true,                    /// default is true; use false to get the smallest elements, true to get the largest
		'ALWAYS_OUTPUT' = true                  /// if false (default), null is output instead of the result when the result equals the previous one
  ]], GROUP_BY = ['publisher']}, counted)

First

Here, we use a tumbling window with the "First" aggregate function to only get the first element per 5-minute window.

/// Tumbling window
tumbling = TIMEWINDOW({
                size = [5, 'MINUTES'],
                advance = [5, 'MINUTES']                                                                                                
              },
              selectCenter
            )
            
/// Select first of tumbling
reduce = AGGREGATION({
              aggregations = [['FUNCTION' = 'First']],
              output_only_changes = true,
              group_by = ['movingObjectId']               
            },
            tumbling
          )            
          
/// Remove the grouping id (because it will be in the unnested tuple)
withoutId = PROJECT({
                attributes = ['first']              
              },
              reduce
            )

/// Unnest the tuple
output = UNNEST({
              attribute='first'                                        
            },
            withoutId
          )  

Last

Here, we use a tumbling window and the "Last" aggregate function to only get the last element per 5-minute window.

/// Tumbling window
tumbling = TIMEWINDOW({
                size = [5, 'MINUTES'],
                advance = [5, 'MINUTES']                                                                                                
              },
              selectCenter
            )
            
/// Select last of tumbling
reduce = AGGREGATION({
              aggregations = [['FUNCTION' = 'Last']],
              eval_at_new_element = false,
              eval_before_remove_outdating = true,
              group_by = ['movingObjectId']               
            },
            tumbling
          )            
          
/// Remove the grouping id (because it will be in the unnested tuple)
withoutId = PROJECT({
                attributes = ['last']              
              },
              reduce
            )

/// Unnest the tuple
output = UNNEST({
              attribute='last'                                        
            },
            withoutId
          )  

Changing the way this operator outputs values

With the default values, this operator acts like the Aggregate (and Group) operator (with the limitations explained above). Useful alternative settings are:

  • Set eval_at_new_element to false and eval_before_remove_outdating to true, and add a preceding window with an advance (e.g., a tumbling window).

Remark: In this case, the start timestamp of the output is the timestamp of the element that triggers the output (i.e., the element indicating that the current elements are outdated).

The following example counts the elements of the stream impressions per minute. It outputs the total count at the end of each minute instead of an update whenever a new element arrives.

windowed = TIMEWINDOW({size = [1, 'MINUTES'], ADVANCE = [1, 'MINUTES']}, impressions)
impressions_per_minute = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], EVAL_AT_NEW_ELEMENT = false, EVAL_BEFORE_REMOVE_OUTDATING = true}, windowed)

Further information

How to create aggregation functions (in German)

