...
```
/// count the tuples for each (publisher, item) pair
counted = AGGREGATION({AGGREGATIONS = [['FUNCTION' = 'Count']], GROUP_BY = ['publisher', 'item']}, windowed)

/// aggregate the 100 most frequent items for each publisher into an ordered list
TopKItemsByPublisher ::= AGGREGATION({AGGREGATIONS = [
    [
      'FUNCTION' = 'TopK',
      'TOP_K' = '100',                /// number of items
      'SCORING_ATTRIBUTES' = 'Count', /// the attribute name that defines the order
      'INPUT_ATTRIBUTES' = 'item',    /// use only the 'item' attribute, not the whole input tuple, to build the output top-k set
      'MIN_SCORE' = '0',              /// remove items whose score reaches 0 (after the previous aggregation, these are the items that have no valid tuple left)
      'UNIQUE_ATTR' = 'item'          /// treat 'item' as a unique attribute: a new tuple with a known item id replaces the previous value (this acts like an element window inside the operator)
    ]], GROUP_BY = ['publisher']}, counted)
```
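To make the intended result of the two chained aggregations easier to picture, here is a minimal Python sketch of the same idea on a fixed batch of tuples. It is not Odysseus code and does not model streaming or window expiry. The function name `top_k_items_by_publisher`, the tie-break by item name, and the reading of `MIN_SCORE` as "drop items with a score of 0 or less" are assumptions made for illustration.

```python
from collections import Counter, defaultdict


def top_k_items_by_publisher(events, k=100, min_score=0):
    """events: iterable of (publisher, item) pairs that are currently valid."""
    # Step 1: count tuples per (publisher, item), like the first AGGREGATION.
    counts = Counter(events)

    # Step 2: regroup by publisher only; each item appears once per publisher,
    # which mirrors the effect of using 'item' as the unique attribute.
    by_publisher = defaultdict(list)
    for (publisher, item), count in counts.items():
        # Assumed reading of MIN_SCORE: items whose count is 0 or less are dropped.
        # In a batch this never triggers; in a stream it would apply once all
        # tuples of an item have left the window.
        if count > min_score:
            by_publisher[publisher].append((item, count))

    # Step 3: order by score (descending) and keep at most k items.
    # Ties are broken by item name here; that is an illustrative choice.
    return {
        publisher: sorted(scored, key=lambda pair: (-pair[1], pair[0]))[:k]
        for publisher, scored in by_publisher.items()
    }


events = [("p1", "a"), ("p1", "b"), ("p1", "a"), ("p2", "c"), ("p1", "c"), ("p1", "a")]
result = top_k_items_by_publisher(events, k=2)
print(result)
```

With `k=2`, publisher `p1` keeps only its two highest-scoring items, while `p2` keeps its single item.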
First
...
Here, we use a tumbling window with the "First" aggregate function to only get the first element per 5-minute window.
```
/// Tumbling window
tumbling = TIMEWINDOW({
    size = [5, 'MINUTES'],
    advance = [5, 'MINUTES']
  },
  selectCenter
)

/// Select the first element of each tumbling window
reduce = AGGREGATION({
    aggregations = [['FUNCTION' = 'First']],
    output_only_changes = true,
    group_by = ['movingObjectId']
  },
  tumbling
)

/// Remove the grouping id (because it will be in the unnested tuple)
withoutId = PROJECT({
    attributes = ['first']
  },
  reduce
)

/// Unnest the tuple
output = UNNEST({
    attribute = 'first'
  },
  withoutId
)
```
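As a rough illustration of what this query is meant to produce, the following Python sketch picks the first element per group in each 5-minute tumbling window. It is not Odysseus code. It assumes integer timestamps in seconds, input sorted by timestamp, and windows aligned to multiples of the window size starting at 0; the function name `first_per_tumbling_window` and the sample data are hypothetical.

```python
def first_per_tumbling_window(events, size_seconds=300):
    """events: iterable of (timestamp_seconds, moving_object_id, payload),
    assumed to be sorted by timestamp."""
    seen = set()   # (window index, group id) pairs that already produced output
    output = []
    for timestamp, object_id, payload in events:
        # Tumbling window: size equals advance, so each timestamp falls
        # into exactly one window.
        window = timestamp // size_seconds
        key = (window, object_id)
        if key not in seen:
            # Only the first element of each group in each window is emitted.
            seen.add(key)
            output.append((window * size_seconds, object_id, payload))
    return output


events = [
    (10, "ship1", "A"),
    (20, "ship2", "B"),
    (200, "ship1", "C"),   # same window and group as "A", so it is skipped
    (310, "ship1", "D"),
    (590, "ship2", "E"),
    (600, "ship2", "F"),   # starts a new window at 600 seconds
]
firsts = first_per_tumbling_window(events)
print(firsts)
```

Each output entry carries the window start, the group id, and the payload of the first element seen for that group in that window.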
Last
Here, we use a tumbling window and the "Last" aggregate function to only get the last element per 5-minute window.
...