The Aggregation operator is an alternative implementation of the Aggregate (and Group) operator. In particular, for sliding time windows with an advance of 1, this operator is faster than the implementation with partial aggregates.
Differences in using this operator compared to the Aggregate (and Group) operator:
- This operator has a more flexible PQL interface that allows key-value parameters to be specified.
- This operator does not set the end timestamps of the resulting data stream elements. If you need the validity of the aggregation value, append an element window of size 1.
- This operator outputs "empty aggregations" if no tuple is valid at a specific point in time. E.g., the sum aggregation function would output 0. This is necessary to determine the end timestamp with a subsequent element window.
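The last two points can be sketched as follows. This is a minimal, hedged example rather than a verified query: it assumes that the ELEMENTWINDOW operator accepts a SIZE parameter, that `///` starts a comment line, and that `windowed` is an already windowed input stream. The names `summed` and `with_validity` are placeholders.

```pql
/// Aggregation results carry start timestamps only
summed = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']]}, windowed)
/// Assumed usage: an element window of size 1 ends each result when its successor arrives
with_validity = ELEMENTWINDOW({SIZE = 1}, summed)
```

Because the operator also emits "empty aggregations", every result has a successor that can close its validity interval, including the last non-empty one.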
These aggregation functions are still in development. In particular, the keys for the parameters are preliminary and subject to change.
Parameters
- group_by: An optional list of attributes over which the grouping should occur.
- aggregations: A list of aggregate functions (see below).
The following optional boolean parameters control when a new aggregation value is transferred (see below for useful examples):
- eval_at_new_element: Outputs an updated aggregation value when a new element becomes valid. If more than one element becomes valid at the same time (same start timestamp), the operator outputs one value per element, in order of arrival. The default value is true.
- eval_at_outdating: Outputs an updated aggregation value when one or more elements become invalid. The value is computed after the invalid elements have been removed. The default value is true.
- eval_before_remove_outdating: Outputs an updated aggregation value before removing the invalid elements instead of after removal. The default value is false.
- eval_at_done: Outputs the value at the time the operator receives the done signal. The default value is false.
- output_only_changes: Suppresses elements that are equal to the previously output element. The default value is false.
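As a hedged sketch of how these flags might be combined, the following query counts elements per publisher and emits a result only when the count differs from the previous output. The uppercase key OUTPUT_ONLY_CHANGES is an assumption, modeled on the spelling of EVAL_AT_NEW_ELEMENT used elsewhere on this page. `windowed` and `changed_counts` are placeholder names, and `///` is assumed to start a comment line.

```pql
/// Assumed key spelling: OUTPUT_ONLY_CHANGES (the keys are preliminary and may change)
changed_counts = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher'], OUTPUT_ONLY_CHANGES = true}, windowed)
```

With the other flags left at their defaults, results are still evaluated whenever an element becomes valid or invalid, and only repeated values are dropped.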
Aggregation Functions
Function Name | Description | Parameters | Examples
---|---|---|---
Count | Outputs the number of stream elements. | | ['FUNCTION' = 'Count']; ['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'number_of_elements']
Sum | Outputs the sum of elements. | | ['FUNCTION' = 'Sum']; ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']; ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = ['value1', 'value2']]
Examples
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)
You can use more than one aggregation function:
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count'], ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']], GROUP_BY = ['publisher', 'item']}, windowed)
Changing the way this operator outputs values
With the default values, this operator behaves like the Aggregate (and Group) operator (with the limitations explained above). Useful alternative settings are:
- Set eval_at_new_element to false and eval_before_remove_outdating to true, and add a preceding window with advance:
The following example counts the elements of the stream impressions per minute. It outputs the total at the end of each minute instead of an update each time a new element arrives.
windowed = TIMEWINDOW({size = [1, 'Minutes'], ADVANCE = [1, 'MINUTES']}, impressions)
impressions_per_minute = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], EVAL_AT_NEW_ELEMENT = false, EVAL_BEFORE_REMOVE_OUTDATING = true}, windowed)