The aggregation operator is an alternative implementation of the Aggregate (and Group) operator. In particular, for sliding time windows with an advance of 1, this operator is faster than the implementation with partial aggregates.

...

  • group_by: An optional list of attributes over which the grouping should occur.
  • aggregations: A list of aggregate functions (see below).
  • SUPPRESS_FULL_META_DATA_HANDLING: Boolean flag. Set it to true to suppress the handling of meta data other than Time Interval (e.g. Latency).

 

The following optional boolean parameters control when a new aggregation value is transferred (see below for useful examples):

...

Function Name: Count
Description: Outputs the number of stream elements.
Parameters:
  • OUTPUT_ATTRIBUTES: The name for the output attribute. Default value: count. Optional: true.
Examples:
['FUNCTION' = 'Count']
['FUNCTION' = 'Count', 'OUTPUT_ATTRIBUTES' = 'number_of_elements']

Function Name: Sum
Description: Outputs the sum of elements.
Parameters:
  • INPUT_ATTRIBUTES: A single string or a list with the name(s) of the attribute(s) in the input tuples. By default, all input attributes are used. This could raise an error if attributes are not numeric. Default value: (all attributes). Optional: true.
  • OUTPUT_ATTRIBUTES: A single string or a list of output attributes. By default, the string "Sum_" concatenated with the original input attribute name is used. Default value: "Sum_" + input attribute name. Optional: true.
Examples:
['FUNCTION' = 'Sum']
['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']
['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = ['value1', 'value2']]

Function Name: Avg
Description: Average value (mean).
Parameters: TODO (similar to Sum)

Function Name: Min
Description: Min value.
Parameters: TODO (similar to Sum)

Function Name: Max
Description: Max value.
Parameters: TODO (similar to Sum)

Function Name: First
Description: The first element of a window. See example below.
Parameters:
  • OUTPUT_ATTRIBUTES: The name for the output attribute. Default value: first. Optional: true.
Examples:
You should use the following settings:

output_only_changes = true

This yields the first element of each window and is especially useful with a tumbling window.

Function Name: Last
Description: The last element of a window. See example below.
Parameters:
  • OUTPUT_ATTRIBUTES: The name for the output attribute. Default value: last. Optional: true.
Examples:
You should use the following settings:

EVAL_AT_NEW_ELEMENT = false
EVAL_BEFORE_REMOVE_OUTDATING = true

This yields the last element of each window and is especially useful with a tumbling window.

Function Name: Trigger
Description: The tuple that triggers the output.
Parameters: TODO

Function Name: Variance
Description: Calculates the variance.
Parameters: TODO (similar to Sum)

Function Name: TopK
Description: Calculates the top-K list.
Parameters: TODO

Function Name: Nest
Description: Nests the valid elements as a list. If given more than one attribute, this will contain the tuple projected on the attributes.
Parameters:
  • INPUT_ATTRIBUTES: required.
Examples:
['FUNCTION' = 'Nest','INPUT_ATTRIBUTES' = 'id']
['FUNCTION' = 'Nest','INPUT_ATTRIBUTES' = ['id','name']]
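
The parameters of Avg, Min and Max are still marked TODO and are only described as similar to Sum. Assuming they accept INPUT_ATTRIBUTES in the same way as Sum (an assumption based on that note, not confirmed on this page), their definitions might look like this:

```pql
/// Assumption: Avg, Min and Max take INPUT_ATTRIBUTES like Sum does
['FUNCTION' = 'Avg', 'INPUT_ATTRIBUTES' = 'value1']
['FUNCTION' = 'Min', 'INPUT_ATTRIBUTES' = 'value1']
['FUNCTION' = 'Max', 'INPUT_ATTRIBUTES' = ['value1', 'value2']]
```

The attribute names value1 and value2 are taken from the Sum examples and are placeholders only.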


Examples

Code Block
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)

You can use more than one aggregation function:

Code Block
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count'], ['FUNCTION' = 'Sum', 'INPUT_ATTRIBUTES' = 'value1']], GROUP_BY = ['publisher', 'item']}, windowed)

Code Block
/// count the number of items for each publisher
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)

/// aggregate the 100 most frequent items for each publisher to an ordered list
TopKItemsByPublisher ::= AGGREGATION({AGGREGATIONS = [
	[
		'FUNCTION' = 'TopK',
		'TOP_K' = '100',                        /// number of items
		'SCORING_ATTRIBUTES' = 'Count',         /// the attribute name that defines the order
		'INPUT_ATTRIBUTES' = 'item',            /// do not use the whole input tuple, just use the 'item' attribute for creating the output top-k set
		'MIN_SCORE' = '0',                      /// remove items that reach a score of 0 (due to the previous aggregation, these are all items that have no valid tuple)
		'UNIQUE_ATTR' = 'item',                 /// use 'item' as a unique attribute: a new tuple with a known item id replaces the previous value (this acts as a kind of element window within this operator)
		'descending' = true,                    /// default is true. Use 'true' for the biggest elements and 'false' for the smallest elements
		'ALWAYS_OUTPUT' = true                  /// if set to false (default), 'null' is put out instead of the result if the result is equal to the previous result
	]], GROUP_BY = ['publisher']}, counted)

...

Code Block
/// Tumbling window
tumbling = TIMEWINDOW({
                size = [5, 'MINUTES'],
                advance = [5, 'MINUTES']
              },
              selectCenter
            )

/// Select last of tumbling
reduce = AGGREGATION({
              aggregations = [['FUNCTION' = 'Last']],
              EVAL_AT_NEW_ELEMENT = false,
              EVAL_BEFORE_REMOVE_OUTDATING = true,
              group_by = ['movingObjectId']
            },
            tumbling
          )
          
/// Remove the grouping id (because it will be in the unnested tuple)
withoutId = PROJECT({
                attributes = ['last']              
              },
              reduce
            )

/// Unnest the tuple
output = UNNEST({
              attribute='last'                                        
            },
            withoutId
          )  
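
The First function can presumably be used in the same way. According to the function description, it should be combined with output_only_changes = true. The following sketch assumes the same tumbling input as in the example above; the operator name firstPerWindow is made up for illustration.

```pql
/// Select first of tumbling (sketch; firstPerWindow is an illustrative name)
firstPerWindow = AGGREGATION({
              aggregations = [['FUNCTION' = 'First']],
              output_only_changes = true,
              group_by = ['movingObjectId']
            },
            tumbling
          )
```

By default the result would then be found in the output attribute first, which can be projected and unnested like the last attribute above.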

...

  • Set eval_at_new_element to false and eval_before_remove_outdating to true and add a preceding window with advance.

Remark: In this case, the start timestamp of the output is the timestamp of the element that triggers the output (i.e. the element that indicates that the current elements are outdated).

The following example counts the elements of the stream impressions per minute. It outputs the total at the end of each minute instead of an update each time a new element arrives.

Code Block
windowed = TIMEWINDOW({size = [1, 'Minutes'], ADVANCE = [1, 'MINUTES']}, impressions)
impressions_per_minute = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], EVAL_AT_NEW_ELEMENT = false, EVAL_BEFORE_REMOVE_OUTDATING = true}, windowed)
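
The same settings can presumably be combined with the group_by parameter to get one total per group at the end of each minute. The sketch below assumes that the impressions stream has a publisher attribute, as in the earlier examples; the operator name impressions_per_publisher is made up for illustration.

```pql
/// Assumption: impressions carries a 'publisher' attribute
windowed = TIMEWINDOW({size = [1, 'MINUTES'], ADVANCE = [1, 'MINUTES']}, impressions)
impressions_per_publisher = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher'], EVAL_AT_NEW_ELEMENT = false, EVAL_BEFORE_REMOVE_OUTDATING = true}, windowed)
```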

Further information

How to create aggregation functions (in German)