The Aggregation operator is an alternative implementation of the Aggregate (and Group) operator. In particular, for sliding time windows with an advance of 1, this operator is faster than the implementation based on partial aggregates.

Differences in the use of this operator compared to the Aggregate (and Group) operator:

These aggregation functions are still in development. In particular, the parameter keys are preliminary and subject to change.

Parameter


The following optional boolean parameters control when a new aggregation value is transferred (see below for useful examples):

Aggregation Functions

Each function is listed with its description, its parameters (Name | Description | Default Value | Optional?) and examples.

Count
Description: Outputs the number of stream elements.
Parameters:
OUTPUT_ATTRIBUTES | The name of the output attribute. | count | True
Examples:
['FUNCTION' = 'Count']
['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'number_of_elements']

Sum
Description: Outputs the sum of the elements.
Parameters:
INPUT_ATTRIBUTES | A single string or a list with the name(s) of the attribute(s) in the input tuples. By default, all input attributes are used, which can raise an error if some attributes are not numeric. | (all attributes) | True
OUTPUT_ATTRIBUTES | A single string or a list of output attribute names. By default, the string "Sum_" concatenated with the original input attribute name is used. | "Sum_" + input attribute name | True
Examples:
['FUNCTION' = 'Sum']
['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']
['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = ['value1', 'value2']]

Avg
Description: Average value (mean).
TODO (similar to Sum)

Min
Description: Minimum value.
TODO (similar to Sum)

Max
Description: Maximum value.
TODO (similar to Sum)

Trigger
Description: The tuple that triggers the output.
TODO

Variance
Description: Calculates the variance.
TODO (similar to Sum)

TopK
Description: Calculates the top-K list.
TODO

Nest
Description: Nests the valid elements as a list.
TODO
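As an illustration of how the documented parameters fit into a complete operator call, the following sketch combines Count and Sum with explicit output names. The input stream name windowed, the attributes value1 and value2, and the output names total1 and total2 are chosen for illustration only. That the entries of the OUTPUT_ATTRIBUTES list are matched to the INPUT_ATTRIBUTES entries by position is an assumption, not something this page confirms.

```
/// assumes 'windowed' is a windowed stream with numeric attributes 'value1' and 'value2'
summed = AGGREGATION({AGGREGATIONS = [
	['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'number_of_elements'],
	[
		'FUNCTION' = 'Sum',
		'INPUT_ATTRIBUTES' = ['value1', 'value2'],
		'OUTPUT_ATTRIBUTES' = ['total1', 'total2']   /// assumed to map to 'value1' and 'value2' in order
	]]}, windowed)
```

Naming the inputs explicitly avoids the default of summing all attributes, which can fail when non-numeric attributes are present.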


Examples

counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)

You can use more than one aggregation function:

counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count'], ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']], GROUP_BY = ['publisher', 'item']}, windowed)
/// count the number of items for each publisher
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)
/// aggregate the 100 most frequent items for each publisher into an ordered list
TopKItemsByPublisher ::= AGGREGATION({AGGREGATIONS = [
	[
		'FUNCTION' = 'TopK',
		'TOP_K' = '100',                         /// number of items
		'SCORING_ATTRIBUTES' = 'Count',          /// the attribute name that defines the order
		'INPUT_ATTRIBUTES' = 'item',             /// do not use the whole input tuple, just use the 'item' attribute for creating the output top-k set
		'MIN_SCORE' = '0',                       /// remove items that reach a score of 0 (given the previous aggregation, these are the items without any valid tuple)
		'UNIQUE_ATTR'='item'                     /// use 'item' as the unique attribute: a new tuple with a known item id replaces the previous value (a kind of element window inside this operator)
	]], GROUP_BY = ['publisher']}, counted)


Changing the way this operator outputs values

With the default values, this operator acts like the Aggregate (and Group) operator (with the limitations explained above). Useful alternative settings are:

The following example counts the elements of the stream impressions per minute. It outputs the total once at the end of each minute instead of emitting an update whenever a new element arrives.

windowed = TIMEWINDOW({SIZE = [1, 'MINUTES'], ADVANCE = [1, 'MINUTES']}, impressions)
impressions_per_minute = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], EVAL_AT_NEW_ELEMENT = false, EVAL_BEFORE_REMOVE_OUTDATING = true}, windowed)
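
The same output settings can presumably be combined with grouping. The sketch below reuses the windowed stream from the example above and assumes that impressions carries a publisher attribute; the output name impressions_count is likewise hypothetical. Under these assumptions the operator should emit one count per publisher at the end of each minute, although this page does not confirm how the two flags interact with GROUP_BY.

```
/// assumes 'windowed' is the one-minute window defined above and that each tuple has a 'publisher' attribute
impressions_per_publisher = AGGREGATION({
	AGGREGATIONS = [['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'impressions_count']],
	GROUP_BY = ['publisher'],
	EVAL_AT_NEW_ELEMENT = false,              /// no output when a new element arrives
	EVAL_BEFORE_REMOVE_OUTDATING = true       /// output before outdated elements are removed
}, windowed)
```

Leaving both flags at their defaults instead would give the per-update behaviour described above, with a new count emitted for each arriving element.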