...
Odysseus has a wide range of operators build in and are explained here.
Base Operators
ACCESS
The access operator can be used to integrate new sources into odysseus. Further information can be found in the Documentation to the "Access Operator Framework".
...
This operator is used to aggregate and group the input stream.
Parameter
group_by:
An optional list of attributes over which the grouping should occur.aggregations:
A list if elements where each element contains up to four string:- The name of the aggregate function, e.g. MAX
- The input attribute over which the aggregation should be done
- The name of the output attribute for this aggregation
- The optional type of the output
dumpAtValueCount:
This parameter is used in the interval approach. Here a result is generated, each time an aggregation cannot be changed anymore. This leads to fewer results with a larger validity. With this parameter the production rate can be raised. A value of 1 means, that for every new element the aggregation operator receives new output elements are generated. This leads to more results with a shorter validity.
Example:
aggregated = aggregate({group_by = ['bidder'], aggregations=[ ['MAX', 'price', 'max_price', 'double'] ]}, windowed)
The set of aggregate functions is extensible. The following list is in the core Odysseus:
MAX
: The maximum elementMIN
: The minimum elementAVG
: The average elementSUM
: The sum of all elementsCOUNT
: The number of elementsSTDDEV
: The standard deviation
Some nonstandard aggregations: These should only be used, if you a familiar with them:
FIRST
: The first elementLAST
: The last elementNTH
: The nth elementRATE
:??
BUFFER
Description
Typically, Odysseus provides a buffer placement strategy to place buffers in the query plan. This operator allows adding additional buffers by hand. Buffers receives data stream elements and stores them in an internal elementbuffer. The scheduler stops the execution here for now. Later, the scheduler resumes to execution (e.g. with an another thread).
Parameter
Type:
The type of the buffer. The following types are currently part of Odysseus:Normal:
This is the standard type and will be used if the type parameter is absent- Currently, no further types are supported in the core Odysseus. In the future, there will be some buffers integrated to treat prioritized elements.
MaxBufferSize
: If this value is set, a buffer will block if it reaches its maximal capacity. No values will be thrown away; this is not a load shedding operator!
...
This operator can be used to reduce data rate. It buffers incoming elements on port 0 for bufferTime and evaluates a predicate over the elements on port 1. If the predicate for the current element e evaluates to true, all elements from port 0 that are younger than e.startTimeStamp()-bufferTime will be enriched with e and delivered for deliverTime. Each time the predicate evaluates to true, the deliverTime will be increased.
Parameter
Predicate
: The predicate that will be evaluated on element of port 1BufferTime:
The time in history, the elements will be kept in historyDeliverTime:
The time in the future, the elements will be delivered
Example
BufferedFilter({predicate = RelationalPredicate('id == 2 || id == 4 || id == 10'), bufferTime = 5000, deliverTime = 20000}, nexmark:bid2, nexmark:person2)
CALCLATENCY
Description
Odysseus has some features to measure the latency of single stream elements. This latency information is modeled as an interval. An operator in Odysseus can modify the start point of this interval. This operator sets the endpoint and determines the place in the query plan, where the latency measurement finds place. The There can be multiple operators in the plan, to measure latency at different places.
...
This operator can be used to filter out equal elements and reduce data rate.
Parameter
ATTR (List<String>)
: only these attribute are considered for the comparisonHeartbeatrate (Integer):
For each nth element that is filtered out, a heartbeat is generated to state the progress of timedeliverFirstElement (Boolean):
If true, the first element will be send to the next operator.
...
This operator calculates the difference between two input sets.
Parameter
None
Example
diff = difference(mapped, projected)
ENRICH
Description
This operator enriches tuples with information from the context store. Further Information: http://odysseus.offis.uni-oldenburg.de/twiki/bin/view/Main/ContextStore
Parameter
ATTRIBUTES:
The attributes from the store object, that should be used for enrichmentSTORE:
The name of the storeOUTER:
Example
EVENTTRIGGER
Description
...
This operator tests an existence predicate and can be used with the type EXISTS (semi join) and NOT_EXISTS (anti semi join). The predicates can be evaluated against the element from the first input and the second input.
Semi join: All elements in the first input for which there are elements in the second input that fulfills the predicate are sent.
Semi anti join: All elements in the first input for which there is no element in the second input that fulfills the predicate are sent.
Parameter
Predicate
Type
Example
existencial = existence({type='EXISTS', predicate=RelationalPredicate('auction = auction_id')}, projected, renamed)
existence({type='NOT_EXISTS', predicate=RelationalPredicate('auction = auction_id')}, projected, renamed)
FILESINK
Description
The operator can be used to dump the results of an operator to a file.
Parameter
FILE:
The filename to dump. If the path does not exist it will be created.FILETYPE:
The type of file, CSV: Print as CSV, if not given, the element will be dump to file a raw format (JavatoString()
-Method)
Example
JOIN
Description
Parameter
Example
joined = join({predicate = RelationalPredicate('auction_id = auction')}, renamed, nexmarkBid)
LEFTJOIN
Description
Parameter
...
Description
Parameter
Example
mapped = map({expressions = ['auction_id * 5','sqrt(auction_id)']}, renamed)
SASE
Description
Dieser Operator ermöglicht es, Anfragen mit zeitlichen Mustern zu definieren. Zu diesem Zweck wird die SASE+ Anfragesprache verwendet und die Anfrage im Parameter query übergeben. Die angebenen Quellen müssen den passenden Typ haben (für das folgende Beispiel also s05 und s08), die Reihenfolge ist egal. Ggf. muss der Typ einer Quelle vorher noch mit Hilfe der Rename-Operation definiert werden. Der Parameter heartbeatrate legt fest, wie oft ein Hearbeat generiert werden soll, wenn ein Element verarbeitet wurde, welches aber nicht zu einem Ergebnis geführt hat.
Parameter
heartbeatrate
query
OneMatchPerInstance
Example
s05 = RENAME({type='s05', aliases = ['ts', 'edge']},...)
PATTERNDETECT({heartbeatrate=1,query='
PATTERN SEQ(s05 s1, s08 s2)
where where skip_till_any_match(s1,s2){
s1 s1.edge=s2.edge }
return return s1.ts,s2.ts'}
, s05, s08)
PROJECT
Description
Parameter
Example
projected = project({attributes = ['auction', 'bidder']}, selected)
PUNCTUATION
Description
Der Punctuation Operator erlaubt das Einfügen von Punctuation in den Verarbeitungsstrom um so nachfolgende Operatoren vor einem möglichen Buffer-Overflow zu schützen. Hierfür hat der Operator zwei Eingänge I1 und I2. Der Eingang I1 stellt den Dateneingang dar und leitet eingehende Tupel direkt an den Ausgang. Der zweite Eingang I2 dient als Frequenzgeber. Sobald die Differenz der Eingangstupel zwischen dem ersten Eingang I1 und dem zweiten Eingang I2 über einen bestimmten Schwellwert steigt, wird in den linken Strom an I1 eine Punctuation eingefügt und ältere Tupel die eventuell danach eintreffen verworfen.
Der Punctuation Operator kann dabei ein nicht deterministisches Verhalten erzeugen, weil das Einfügen von Punctuations von der aktuellen Systemlast abhängen kann und sollte nur verwendet werden, wenn eine normale Verarbeitung der Daten aufgrund ihrer schwankenden Frequenzen nicht möglich ist.
Parameter
Example
punctuation({ratio = 10}, left, right)
RENAME
Description
Renames the attributes.
Parameter
Example
renamed = rename({aliases = ['auction_id', 'bidder_id']}, projected)
aliases
: The list new attribute names to use from now on. If the flagpairs
is set,aliases
will be interpretet as pairs of (old_name, new_name). See the examples below.- type:
pairs:
Optional boolean value that flags, if the given list of aliases should be interpretet as pairs of (old_name, new_name). Default value is false.
Example
renamed = rename({aliases = ['auction_id', 'bidder_id', 'another_id']}, projected)
(Renames the first attribute to auction_id,
the second to bidder_id
and the last to another_id
.)
renamedPairs = rename({aliases = ['auction_id', 'auction', 'bidder_id', 'bidder'], pairs = 'true'}, projected)
(Due the set flag pairs
, the rename operator renames the attribute auction_id
to auction
and bidder_id
to bidder
.)
ROUTE
Description
This operator can be used to route the elements in the stream to different further processing operators, depending on the predicate.
Parameter
Example
route({predicates=[RelationalPredicate('price > 200'), RelationalPredicate('price > 300')], RelationalPredicate('price > 400')}, nexmark:bid2)
SELECT
Description
Parameter
Example
selected = select({ predicate=
Anchor
RelationalPredicate('price > 100') }, nexmarkBid)
SINK
Description
Parameter
Example
...
Description
Parameter
Example
united = union(mapped, projected)
UNNEST
Description
Parameter
...
Description
Parameter
Example
//sliding time window
windowed = window({size = 5, advance = 1, type = 'time'}, selected)
//sliding tuple window partioniert ueber bidder
window({size = 5, advance = 1, type = 'tuple', partition=['bidder']}, selected)
//unbounded window
window({type = 'unbounded'}, selected)
//now window (size = 1, advance = 1)
window({type = 'time'}, selected)
//sliding delta window, reduces time granularity to value of slide
window({size = 5, type = 'time', slide = 5}, selected)
// Predicate window
window({startCondition=RelationalPredicate('a>10'), endCondition = RelationalPredicate('a<10')}, selected)
Benchmark
BATCHPRODUCER
Description
...