
Aggregation is one of the most important operations in streaming and CEP applications. We give an example of how Odysseus computes aggregations. A problem for aggregations over sliding windows is how to handle events that leave a window (i.e., become invalid). A simple approach is to keep every event and calculate the aggregation at evaluation time; events that become invalid can then simply be removed. However, this causes a large memory overhead. Odysseus therefore adapts the concept of online aggregation: only the minimal information needed to calculate an aggregation over a window is kept, in the form of so-called partial aggregates. An intuitive example is the average: its partial aggregate keeps a running sum and the count of the events aggregated so far, from which the concrete result can be calculated at any time by dividing the sum by the count.
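A minimal Python sketch of the AVG partial aggregate described above: only a running sum and a count are stored, never the events themselves. The class and method names are hypothetical illustrations, not the Odysseus (Java) API.

```python
from dataclasses import dataclass


@dataclass
class AvgPartialAggregate:
    """Hypothetical partial aggregate for AVG: a running sum and a count."""
    total: float = 0.0
    count: int = 0

    def add(self, value: float) -> None:
        # Fold a new event into the partial aggregate; the event is not stored.
        self.total += value
        self.count += 1

    def evaluate(self) -> float:
        # The concrete result can be computed at any time as sum / count.
        if self.count == 0:
            raise ValueError("AVG of an empty partial aggregate is undefined")
        return self.total / self.count


pa = AvgPartialAggregate()
for price in [10.0, 20.0, 60.0]:
    pa.add(price)
print(pa.evaluate())  # 30.0
```

Because the raw values are discarded, the partial aggregate alone does not know which value to subtract when an event expires, which is the limitation discussed in the following paragraph.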

However, with such partial aggregates it is not possible to remove the contribution of the first (oldest) event without knowing its value. Odysseus therefore adapts the partial aggregate approach: partial aggregates are built only from events whose time intervals overlap. To achieve this, a new event must be combined with several existing partial aggregates.
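The following Python sketch illustrates this idea for COUNT under stated assumptions: each event carries a half-open validity interval [start, end) (for a sliding time window, for example [t, t + window size)), and events arrive ordered by start time. The state is a list of disjoint segments, each holding the partial aggregate of exactly the events that cover it; a new event splits existing segments at its boundaries, so nothing ever has to be subtracted. `Segment` and `IntervalCount` are hypothetical names and this is a simplified illustration, not the Odysseus implementation.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Segment:
    start: int  # inclusive
    end: int    # exclusive
    count: int  # partial aggregate (COUNT) of all events covering [start, end)


class IntervalCount:
    """Sketch of interval-based COUNT with disjoint, start-ordered segments."""

    def __init__(self) -> None:
        self.segments: list[Segment] = []

    def insert(self, start: int, end: int) -> list[Segment]:
        # With input ordered by start time, no later event can overlap a segment
        # that ends before this event starts, so those results are final.
        final = [s for s in self.segments if s.end <= start]
        active = [s for s in self.segments if s.end > start]

        # Split each active segment at the new event's boundaries and add the
        # new event to every piece that lies inside [start, end).
        pieces: list[Segment] = []
        for seg in active:
            inner = {p for p in (start, end) if seg.start < p < seg.end}
            cuts = sorted({seg.start, seg.end} | inner)
            for a, b in zip(cuts, cuts[1:]):
                inside = start <= a and b <= end
                pieces.append(Segment(a, b, seg.count + (1 if inside else 0)))

        # Parts of [start, end) not covered by any existing segment get a fresh
        # partial aggregate containing only the new event.
        gaps: list[Segment] = []
        pos = start
        for seg in active:
            gap_end = min(seg.start, end)
            if pos < gap_end:
                gaps.append(Segment(pos, gap_end, 1))
            pos = max(pos, seg.end)
        if pos < end:
            gaps.append(Segment(pos, end, 1))

        self.segments = sorted(pieces + gaps, key=lambda s: s.start)
        return final


op = IntervalCount()
for s, e in [(0, 10), (5, 15), (8, 12)]:
    for seg in op.insert(s, e):
        print("final:", seg)
print(op.segments)
```

With these three events, only the third insert emits a final result (the segment [0, 5) with count 1); the remaining state is [5, 8) with 2, [8, 10) with 3, [10, 12) with 2 and [12, 15) with 1.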

 

 

Fig. 2 illustrates an example. To make the figure more readable, this example shows a count aggregation. The AGGREGATE operator keeps a state of all current partial aggregates. In this example there are two partial aggregates with

This operator is used to aggregate and group the input stream.

Parameter

  • group_by: An optional list of attributes over which the grouping should occur.
  • aggregations: A list of elements, where each element contains up to four strings:
    • The name of the aggregate function, e.g. MAX
    • The input attribute over which the aggregation should be done
    • The name of the output attribute for this aggregation
    • The optional type of the output. If not given, DOUBLE will be used.
  • dumpAtValueCount: This parameter is used in the interval approach. There, a result is generated each time an aggregation can no longer change, which leads to fewer results with a longer validity. This parameter raises the production rate: a value of 1 means that new output elements are generated for every new element the aggregation operator receives. This leads to more results with a shorter validity.
  • outputPA: This parameter makes the operator output partial aggregates instead of evaluated values. The partial aggregates can be sent to other aggregation operators that perform a final aggregation (e.g., in a distributed setting). The input schema of an aggregation operator that reads partial aggregates must declare a partial aggregate datatype (see example below). Remark: AGGREGATE has one input and requires ordered input. To combine different partial aggregate streams, an operator such as union is needed to reorder the input elements.
  • drainAtDone: Boolean, default true: If done is called, all elements that have not yet been written are written.
  • drainAtClose: Boolean, default false: If close is called, all elements that have not yet been written are written.
  • FastGrouping: Use the hash code instead of a comparison to create groups. Potentially unsafe!
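To illustrate what outputPA enables, here is a Python sketch of the two-stage pattern from the partial aggregate example further down: two pre-aggregation stages (e.g., on different nodes) each produce a partial aggregate for COUNT, SUM, MIN and MAX, and a final stage merges them without ever seeing the raw values. `PartialAgg` is a hypothetical class; deriving AVG from the merged count and sum is an assumption about how such a result can be computed, not a description of Odysseus internals.

```python
from __future__ import annotations

from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class PartialAgg:
    """Hypothetical partial aggregate covering COUNT, SUM, MIN, MAX and AVG."""
    count: int
    total: float
    minimum: float
    maximum: float

    @staticmethod
    def of(values: list[float]) -> PartialAgg:
        # Pre-aggregation stage (compare outputpa='true' in the PQL example).
        return PartialAgg(len(values), sum(values), min(values), max(values))

    def merge(self, other: PartialAgg) -> PartialAgg:
        # Final-aggregation stage: partial aggregates combine without raw values.
        return PartialAgg(
            self.count + other.count,
            self.total + other.total,
            min(self.minimum, other.minimum),
            max(self.maximum, other.maximum),
        )

    def avg(self) -> float:
        return self.total / self.count


node_a = PartialAgg.of([3.0, 5.0])
node_b = PartialAgg.of([1.0, 7.0, 9.0])
combined = reduce(PartialAgg.merge, [node_a, node_b])
print(combined.count, combined.total, combined.minimum, combined.maximum, combined.avg())
# 5 25.0 1.0 9.0 5.0
```

The merge is associative and commutative, which is what allows partial aggregates from several operators to be combined in any order once they arrive at a single (ordered) input.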

Aggregation Functions

The set of aggregate functions is extensible. The following functions are part of core Odysseus:

  • MAX: The maximum element
  • MIN: The minimum element
  • AVG: The average of all elements
  • SUM: The sum of all elements
  • COUNT: The number of elements
  • MEDIAN: The median element
  • STDDEV: The standard deviation
  • VAR: The variance
  • CORR: The correlation between two attributes
  • COV: The covariance between two attributes
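Functions over two attributes, such as CORR and COV, can also be computed from a small partial aggregate. The Python sketch below keeps the count and the running sums of x, y, x², y² and xy, which also yields VAR and STDDEV for a single attribute. `CoMoments` is a hypothetical name; the sketch uses the population divisor n, whereas which divisor Odysseus uses is not stated on this page. The naive sum-of-squares formula can lose precision for large values; Welford-style updates are a common, more stable alternative.

```python
import math
from dataclasses import dataclass


@dataclass
class CoMoments:
    """Hypothetical partial aggregate for VAR, STDDEV, COV and CORR."""
    n: int = 0
    sx: float = 0.0
    sy: float = 0.0
    sxx: float = 0.0
    syy: float = 0.0
    sxy: float = 0.0

    def add(self, x: float, y: float) -> None:
        # Only sums are updated; the (x, y) pair itself is not stored.
        self.n += 1
        self.sx += x
        self.sy += y
        self.sxx += x * x
        self.syy += y * y
        self.sxy += x * y

    def var_x(self) -> float:
        # Population variance: E[x^2] - E[x]^2.
        return self.sxx / self.n - (self.sx / self.n) ** 2

    def var_y(self) -> float:
        return self.syy / self.n - (self.sy / self.n) ** 2

    def stddev_x(self) -> float:
        return math.sqrt(self.var_x())

    def cov(self) -> float:
        # Population covariance: E[xy] - E[x]E[y].
        return self.sxy / self.n - (self.sx / self.n) * (self.sy / self.n)

    def corr(self) -> float:
        # Pearson correlation; the divisor choice cancels out here.
        return self.cov() / math.sqrt(self.var_x() * self.var_y())


m = CoMoments()
for x, y in [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]:
    m.add(x, y)
print(m.cov(), m.corr())
```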


Some nonstandard aggregations, which should only be used if you are familiar with them:

  • FIRST: The first element
  • LAST: The last element
  • NTH: The nth element
  • RATE: The number of elements per time unit
  • NEST: Nest the attribute values in a list
  • COMPLETENESS: Ratio of NULL-value elements to number of elements

Example:

PQL
Aggregate Operator
output = AGGREGATE({
                    group_by = ['bidder'], 
                    aggregations=[ ['MAX', 'price', 'max_price', 'double'] ]
                   }, input)

// Partial aggregate example
pa = AGGREGATE({
          name='PRE_AGG',
          aggregations=[
            ['count', 'id', 'count', 'PartialAggregate'],
            ['sum', 'id', 'avgsum', 'PartialAggregate'],
            ['min', 'id', 'min', 'PartialAggregate'],
            ['max', 'id', 'max', 'PartialAggregate']
          ],
          outputpa='true'        
        },
        nexmark:person
      )
      
out = AGGREGATE({
          name='AGG',
          aggregations=[
            ['count', 'count', 'count', 'Integer'],
            ['sum', 'avgsum', 'sum', 'Double'],
            ['avg', 'avgsum', 'avg', 'Double'],
            ['min', 'min', 'min', 'Integer'],
            ['max', 'max', 'max', 'Integer']
          ]        
        },
        pa
      )

// Example for aggregations on multiple attributes
out = AGGREGATE({
          aggregations=[
            ['corr', ['x', 'y'], 'correlation', 'Double'],
            ['cov', ['x', 'y'], 'covariance', 'Double']
          ]        
        },
        input
      )
CQL
Aggregate Operator
SELECT MAX(price) AS max_price FROM input GROUP BY bidder
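For checking expectations, a plain Python sketch of what the PQL and CQL examples compute for a finite batch of bids, ignoring windows and validity intervals: each group key gets its own MAX partial aggregate. The function name and the dict-based input are hypothetical stand-ins for the `input` stream.

```python
from __future__ import annotations


def max_price_by_bidder(bids: list[dict]) -> dict[str, float]:
    """One MAX partial aggregate per group key, as with group_by = ['bidder']."""
    result: dict[str, float] = {}
    for bid in bids:
        key = str(bid["bidder"])
        price = float(bid["price"])  # output type 'double' in the PQL example
        # Update only this group's running maximum; other groups are untouched.
        if key not in result or price > result[key]:
            result[key] = price
    return result


bids = [
    {"bidder": "alice", "price": 10},
    {"bidder": "bob", "price": 7},
    {"bidder": "alice", "price": 12},
]
print(max_price_by_bidder(bids))  # {'alice': 12.0, 'bob': 7.0}
```

In the streaming operator, such a per-group result is produced per time interval rather than once for the whole batch.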