The Aggregation operator is an alternative implementation of the Aggregate (and Group) operator. In particular, for sliding time windows with an advance of 1, this operator is faster than the implementation with partial aggregates.


  • eval_at_new_element: Outputs an updated aggregation value when a new element becomes valid. If more than one element becomes valid at the same time (same start timestamp), the operator outputs one value per element, in order of arrival. The default value is true.
  • eval_at_outdating: Outputs an updated aggregation value when one or more elements become invalid. The value reflects the state after the invalid elements have been removed. The default value is true.
  • eval_before_remove_outdating: Outputs an updated aggregation value before the invalid elements are removed instead of after their removal. The default value is false.
  • eval_at_done: Outputs the value at the time the operator receives the done signal. The default value is false.
  • output_only_changes: Suppresses elements that are equal to the previously output element. The default value is false.
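
As an illustration of how these options might be combined, the following sketch counts the elements of a one-minute window and emits a value only after outdated elements have been removed, and only if the count differs from the previous output. The sources `impressions` and `windowed` are borrowed from the example at the end of this page. The result name `count_changes` is made up for this sketch, and the upper-case spelling of the option names follows that same example rather than a confirmed rule.

```pql
windowed = TIMEWINDOW({size = [1, 'Minutes'], ADVANCE = [1, 'MINUTES']}, impressions)
/// no output for newly valid elements; output after outdated elements are removed,
/// and only if the value differs from the previously output value
count_changes = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], EVAL_AT_NEW_ELEMENT = false, EVAL_AT_OUTDATING = true, OUTPUT_ONLY_CHANGES = true}, windowed)
```

EVAL_AT_OUTDATING is already true by default; it is spelled out here only to make the intent visible.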

Aggregation Functions

Each function is listed with its description, parameters (name, description, default value, optional?), and examples.

Count
  Description: Outputs the number of stream elements.
  Parameters:
  • OUTPUT_ATTRIBUTES: The name for the output attribute. Default value: count. Optional: true.
  Examples:
  ['FUNCTION' = 'Count']
  ['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'number_of_elements']

Sum
  Description: Outputs the sum of elements.
  Parameters:
  • INPUT_ATTRIBUTES: A single string or a list with the name(s) of the attribute(s) in the input tuples. By default, all input attributes are used, which can raise an error if some attributes are not numeric. Default value: (all attributes). Optional: true.
  • OUTPUT_ATTRIBUTES: A single string or a list of output attributes. By default, the string "Sum_" concatenated with the original input attribute name is used. Default value: "Sum_" + input attribute name. Optional: true.
  Examples:
  ['FUNCTION' = 'Sum']
  ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']
  ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = ['value1', 'value2']]

Avg
  Description: Average value (mean).
  Parameters: TODO (similar to Sum)

Min
  Description: Min value.
  Parameters: TODO (similar to Sum)

Max
  Description: Max value.
  Parameters: TODO (similar to Sum)

Trigger
  Description: The tuple that triggers the output.
  Parameters: TODO

Variance
  Description: Calculates the variance.
  Parameters: TODO (similar to Sum)

TopK
  Description: Calculates the top-K list.
  Parameters: TODO

Nest
  Description: Nests the valid elements as a list.
  Parameters: TODO
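
The Sum parameters can presumably be combined with GROUP_BY in the same way as Count in the examples on this page. A minimal sketch, assuming an input `windowed` whose tuples carry a `publisher` attribute and two numeric attributes `value1` and `value2`. The output names `total1` and `total2` and the result name `sums` are hypothetical, and it is an assumption that a list of output names is matched to the list of input attributes by position.

```pql
/// sum value1 and value2 per publisher and name the results explicitly
/// (total1, total2, and sums are hypothetical names)
sums = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = ['value1', 'value2'], 'OUTPUT_ATTRIBUTES' = ['total1', 'total2']]], GROUP_BY = ['publisher']}, windowed)
```

If OUTPUT_ATTRIBUTES is left out, the documented default naming applies, so the results would be called Sum_value1 and Sum_value2.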



Code Block
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)


Code Block
/// count the number of items for each publisher
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)
/// aggregate the 100 most frequent items for each publisher to an ordered list
/// (the result name 'topk' is a placeholder)
topk = AGGREGATION({AGGREGATIONS = [[
		'FUNCTION' = 'TopK',
		'TOP_K' = '100',                         /// number of items
		'SCORING_ATTRIBUTES' = 'Count',          /// the attribute name that defines the order
		'INPUT_ATTRIBUTES' = 'item',             /// do not use the whole input tuple, just use the 'item' attribute for creating the output top-k set
		'MIN_SCORE' = '0',                       /// remove items that reach a score of 0 (due to the previous aggregation, these are all items that have no valid tuple)
		'UNIQUE_ATTR'='item'                     /// use 'item' as a unique attribute: a new tuple with a known item id replaces the previous value (this acts as a kind of element window in this operator)
	]], GROUP_BY = ['publisher']}, counted)


Changing the way this operator outputs values


Code Block
windowed = TIMEWINDOW({size = [1, 'Minutes'], ADVANCE = [1, 'MINUTES']}, impressions)
impressions_per_minute = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], EVAL_AT_NEW_ELEMENT = false, EVAL_BEFORE_REMOVE_OUTDATING = true}, windowed)

