...
Code Block | ||||
---|---|---|---|---|
| ||||
/// count the number of items for each publisher
counted = AGGREGATION({
AGGREGATIONS = [['FUNCTION' = 'Count']],
GROUP_BY = ['publisher', 'item']
},
windowed
)
/// aggregate the 100 most frequent items for each publisher to an ordered list
TopKItemsByPublisher ::= AGGREGATION({
AGGREGATIONS = [[
'FUNCTION' = 'TopK',
'TOP_K' = '100', /// number of items
'SCORING_ATTRIBUTES' = 'Count', /// the attribute name that defines the order
'INPUT_ATTRIBUTES' = 'item', /// do not use the whole input tuple, only the 'item' attribute, when creating the output top-k set
'MIN_SCORE' = '0', /// remove items that reach a score of 0 (due to the previous aggregation, these are all items that have no valid tuple)
'UNIQUE_ATTR' = 'item' /// use 'item' as a unique attribute: a new tuple with a known item id replaces the previous value (this acts as a kind of element window inside this operator)
]],
GROUP_BY = ['publisher']
},
counted
) |
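
To make the interplay of `TOP_K`, `MIN_SCORE` and `UNIQUE_ATTR` more concrete, the following Python sketch mimics the behaviour described in the comments above. It is only an illustration, not Odysseus code: the function name `top_k_by_publisher`, the tuple layout `(publisher, item, count)` and the tie-breaking by item name are assumptions made for this example.

```python
from collections import defaultdict


def top_k_by_publisher(counted, k=100, min_score=0):
    """Illustrative model of the TopK aggregation (hypothetical helper).

    counted: iterable of (publisher, item, count) tuples in arrival order,
             i.e. the output of the preceding Count aggregation.
    """
    # publisher -> {item: latest count}; keying by item models UNIQUE_ATTR:
    # a new tuple for a known item replaces the previous value.
    latest = defaultdict(dict)
    for publisher, item, count in counted:
        if count <= min_score:
            # Models MIN_SCORE: items whose score drops to the minimum are removed.
            latest[publisher].pop(item, None)
        else:
            latest[publisher][item] = count

    # Order by descending count (SCORING_ATTRIBUTES) and keep only the item
    # (INPUT_ATTRIBUTES). Ties are broken by item name, which is an assumption.
    return {
        publisher: [
            item
            for item, _ in sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))[:k]
        ]
        for publisher, scores in latest.items()
    }
```

For example, if item `a` of one publisher is first reported with count 3 and later with count 7, only the count 7 is used for ranking, and an item whose count falls to 0 disappears from the list.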
First
...
Here, we combine a 5-minute tumbling window with the "First" aggregate function to keep only the first element of each window for every movingObjectId.
Code Block | ||
---|---|---|
| ||
/// Tumbling window
tumbling = TIMEWINDOW({
size = [5, 'MINUTES'],
advance = [5, 'MINUTES']
},
selectCenter
)
/// Select first of tumbling
reduce = AGGREGATION({
aggregations = [['FUNCTION' = 'First']],
output_only_changes = true,
group_by = ['movingObjectId']
},
tumbling
)
/// Remove the grouping id (because it will be in the unnested tuple)
withoutId = PROJECT({
attributes = ['first']
},
reduce
)
/// Unnest the tuple
output = UNNEST({
attribute='first'
},
withoutId
) |
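
The effect of the query above can be modelled outside of Odysseus with a few lines of Python. This is a sketch for illustration only: the function name `first_per_tumbling_window`, the event layout `(timestamp_ms, moving_object_id, payload)` and the alignment of windows to multiples of the window size are assumptions, not taken from the operator implementation.

```python
def first_per_tumbling_window(events, size_ms=5 * 60 * 1000):
    """Keep the first event per (window, moving object id).

    events: iterable of (timestamp_ms, moving_object_id, payload),
            assumed to arrive in timestamp order.
    Returns a dict mapping (window_start_ms, moving_object_id)
    to the first (timestamp_ms, payload) seen in that window.
    """
    first = {}
    for ts, obj_id, payload in events:
        # Tumbling window: size == advance, so every timestamp belongs to
        # exactly one window. Alignment to multiples of size_ms is assumed.
        window_start = (ts // size_ms) * size_ms
        # setdefault stores only the first event for each key and
        # ignores all later events of the same group and window.
        first.setdefault((window_start, obj_id), (ts, payload))
    return first
```

Replacing `setdefault` with a plain assignment would keep the last event per window and group instead, which corresponds to the "Last" aggregate function.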
Last
Here, we combine a 5-minute tumbling window with the "Last" aggregate function to keep only the last element of each window.
...