The Aggregation operator is an alternative implementation of the Aggregate (and Group) operator. In particular, for sliding time windows with an advance of 1, this operator is faster than the implementation with partial aggregates.
...
eval_at_new_element
: Outputs an updated aggregation value when a new element becomes valid. If more than one element becomes valid at the same time (same start timestamp), the operator outputs one value per element, in order of arrival. The default value is true.
eval_at_outdating
: Outputs an updated aggregation value when one or more elements become invalid; the value reflects the state after the invalid elements have been removed. The default value is true.
eval_before_remove_outdating
: Outputs an updated aggregation value before removing the invalid elements instead of after removal. The default value is false.
eval_at_done
: Outputs the value at the time the operator receives the done signal. The default value is false.
output_only_changes
: Suppresses elements that are equal to the previously output element. The default value is false.
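As a rough illustration of how these flags combine, the sketch below emits a count only when elements leave the window and suppresses values that repeat the previous output. It assumes a windowed input named `windowed` and that `OUTPUT_ONLY_CHANGES` is written in upper case like the other operator parameters; `changed_counts` is a hypothetical name.

```pql
/// hypothetical sketch: no output on arrival, output on removal (eval_at_outdating stays at its default, true),
/// and drop outputs that equal the previous one
changed_counts = AGGREGATION({
    AGGREGATIONS = [['FUNCTION' = 'Count']],
    EVAL_AT_NEW_ELEMENT = false,
    OUTPUT_ONLY_CHANGES = true
  }, windowed)
```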
Aggregation Functions
Function Name | Description | Parameters | Examples |
---|---|---|---|
Count | Outputs the number of stream elements. | `OUTPUT_ATTRIBUTES`: the name for the output attribute (default: `count`; optional). | `['FUNCTION' = 'Count']`; `['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'number_of_elements']` |
Sum | Outputs the sum of elements. | `INPUT_ATTRIBUTES`: a single string or a list with the name(s) of the attribute(s) in the input tuples. By default, all input attributes are used, which could raise an error if some attributes are not numeric (default: all attributes; optional). `OUTPUT_ATTRIBUTES`: a single string or a list of output attribute names (default: "Sum_" + input attribute name; optional). | `['FUNCTION' = 'Sum']`; `['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']`; `['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = ['value1', 'value2']]` |
Avg | Average value (mean). | TODO (similar to Sum) | |
Min | Minimum value. | TODO (similar to Sum) | |
Max | Maximum value. | TODO (similar to Sum) | |
Trigger | The tuple that triggers the output. | TODO | |
Variance | Calculates the variance. | TODO (similar to Sum) | |
TopK | Calculates the top-K list. | TODO | |
Nest | Nests the valid elements as a list. | TODO | |
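The functions above can presumably be combined in a single operator. The sketch below assumes that the `AGGREGATIONS` list accepts several function definitions at once, which its list-of-lists shape suggests but which this page does not state explicitly. The names `stats` and `windowed` and the attributes `publisher` and `value1` are placeholders.

```pql
/// hypothetical sketch: count the elements and sum 'value1' for each publisher
stats = AGGREGATION({AGGREGATIONS = [
    ['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'number_of_elements'],
    ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']
  ], GROUP_BY = ['publisher']}, windowed)
```

Under the defaults described in the table, the sum would be emitted as `Sum_value1`, since no `OUTPUT_ATTRIBUTES` is given for it.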
Examples
```pql
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)
```
...
```pql
/// count the number of elements for each (publisher, item) pair
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)
/// aggregate the 100 most frequent items for each publisher into an ordered list
TopKItemsByPublisher ::= AGGREGATION({AGGREGATIONS = [
    [
        'FUNCTION' = 'TopK',
        'TOP_K' = '100', /// number of items
        'SCORING_ATTRIBUTES' = 'Count', /// the attribute name that defines the order
        'INPUT_ATTRIBUTES' = 'item', /// do not use the whole input tuple; use only the 'item' attribute to build the output top-k set
        'MIN_SCORE' = '0', /// remove items that reach a score of 0 (due to the previous aggregation, these are all items that have no valid tuple)
        'UNIQUE_ATTR' = 'item' /// use 'item' as a unique attribute: a new tuple with a known item id replaces the previous value (this acts as a kind of element window inside this operator)
    ]], GROUP_BY = ['publisher']}, counted)
```
Changing the way this operator outputs values
...
```pql
windowed = TIMEWINDOW({size = [1, 'Minutes'], ADVANCE = [1, 'MINUTES']}, impressions)
impressions_per_minute = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], EVAL_AT_NEW_ELEMENT = false, EVAL_BEFORE_REMOVE_OUTDATING = true}, windowed)
```
...