This document describes the basic concepts of the Procedural Query Language (PQL) of Odysseus and shows how to use the language. In contrast to SQL-based languages like the Continuous Query Language (CQL) or StreamSQL, PQL is more procedural and functional than declarative. This document shows how to formulate queries with PQL.
Using PQL in Queries
PQL is an operator-based language in which an operator can be seen as a logical building block of a query. Thus, a PQL query is a composition of several connected operators. Odysseus differentiates between logical operators and physical operators, which are their implementing counterparts, and PQL is based on the logical operators. Therefore, the query may be changed during the transformation from the logical query plan into the physical query plan. This also includes logical optimization techniques such as restructuring the logical query plan. To avoid this, you can explicitly turn off query optimization.
Define an Operator
An operator is used in PQL via its name and some optional settings, comparable to a function and its arguments:
OPERATORNAME(parameter, operator, operator, ...)
The first argument (parameter) holds operator-dependent parameters and is used to configure the operator. Note that there is only one parameter argument! The other arguments (operator) are input operators, i.e. the preceding operators that push their data into this operator. The inputs of an operator can be defined directly by the definition of another operator:
OPERATOR1(parameter1, OPERATOR2(Parameter2, OPERATOR3(...)))
Except for source operators (usually the first operator of a query), each operator should have at least one input operator. A source operator may therefore have only parameters:
OPERATOR1(parameter1)
Conversely, an operator may have only input operators and no parameters:
OPERATOR1(OPERATOR2(OPERATOR3(...)))
If the operator has neither parameters nor input operators, it consists only of its name (without any brackets!), so just:
OPERATORNAME
It is also possible to combine all kinds of definitions, for example:
OPERATOR1(OPERATOR2(Parameter2, OPERATOR3))
Intermediate Names, Views and Sources
Since nesting operators may lead to unreadable code, operators can be given names so that intermediate results can be reused. This is done via the "=" symbol, which lets us temporarily save parts of the query, for example:
Result2 = OPERATOR2(Parameter2, OPERATOR3)
The defined names can be used like operators, so that we can insert them as the input for another operator, for example:
Result2 = OPERATOR2(Parameter2, OPERATOR3)
OPERATOR1(Result2)
There could be also more than one intermediate result, if they have different names:
Result1 = OPERATOR1(Parameter1, …)
Result2 = OPERATOR2(Parameter2, Result1)
Result3 = OPERATOR3(Parameter3, Result2)
And you can use the intermediate name more than one time, e.g. if there are two or more operators that should get the same preceding operator:
Result1 = OPERATOR1(Parameter1, …)
OPERATOR2(Parameter2, Result1)
OPERATOR3(Parameter3, Result1)
All intermediate results defined via "=" are only valid within the query. Thus, they are lost once the query has been parsed and is running. This can be avoided with views.
A view is defined like the intermediate results described above but uses ":=" instead of "=", e.g.:
Result2 := OPERATOR2(Parameter2, OPERATOR3)
Such a definition creates an entry in the data dictionary, so that the view is globally accessible and can also be used in other query languages like CQL.
Alternatively, the result of an operator can also be stored as a source in the data dictionary by using "::=":
Result2 ::= OPERATOR2(Parameter2, OPERATOR3)
The difference between a view and a source is the kind of query plan that is saved in the data dictionary and reused. If a view is defined, the result of the operator is saved as a logical query plan, which consists of logical operators. Thus, if another query uses the view, the logical operators are fetched from the data dictionary and form the lower part of the new operator plan or query. If an operator is saved as a source, the result of the operator is saved as a physical query plan, which consists of already transformed and possibly optimized physical operators. Thus, reusing a source is like manual query sharing, where parts of two or more different queries are used together. Additionally, the part that belongs to the source is not taken into account when the new part of the query that uses the source is optimized. In contrast, the logical query plan that is used via a view is taken into account during optimization, but this does not necessarily lead to query sharing.
Taken together, these possibilities give the following structure:
QUERY = (TEMPORARYSTREAM | VIEW | SHAREDSTREAM)+
TEMPORARYSTREAM = STREAM "=" OPERATOR
VIEW = VIEWNAME ":=" OPERATOR
SHAREDSTREAM = SOURCENAME "::=" OPERATOR
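The three assignment symbols can be told apart by looking at the symbol that follows the name. The following minimal Python sketch illustrates this. It is not part of Odysseus: the helper classify_statement is hypothetical, it handles one statement at a time, and it does not validate the operator text on the right-hand side.

```python
import re

# Hypothetical helper: classifies one PQL statement by its assignment symbol.
# "::=" is listed before ":=" and "=" so that the longest symbol wins.
_STATEMENT = re.compile(r"^\s*([A-Za-z_][A-Za-z0-9_]*)\s*(::=|:=|=)\s*(\S.*)$", re.DOTALL)

_KINDS = {
    "=": "TEMPORARYSTREAM",  # only valid within the current query
    ":=": "VIEW",            # logical plan stored in the data dictionary
    "::=": "SHAREDSTREAM",   # physical plan stored as a source
}


def classify_statement(statement):
    """Return (kind, name, operator_text), or None if there is no assignment."""
    match = _STATEMENT.match(statement)
    if match is None:
        return None  # e.g. a plain operator such as OPERATOR1(Result2)
    name, symbol, operator_text = match.groups()
    return _KINDS[symbol], name, operator_text.strip()
```

For example, classify_statement("Result2 := OPERATOR2(Parameter2, OPERATOR3)") yields ("VIEW", "Result2", "OPERATOR2(Parameter2, OPERATOR3)"). A "=" inside the parameter list does not confuse the sketch, because the name must be followed directly by the assignment symbol.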
Parameters – Configure an Operator
As mentioned before, the definition of an operator can contain a parameter. More precisely, the parameter is a list of parameters enclosed in curly braces:
OPERATOR({parameter1, parameter2, …}, operatorinput)
A parameter itself consists of a name and a value separated by "=". For example, to set the parameter port to 1234, we use the following definition:
OPERATOR({port=1234}, …)
The value can be one of the following simple types:
- Integer or long:
OPERATOR({port=1234}, …)
- Double:
OPERATOR({possibility=0.453}, …)
- String:
OPERATOR({host='localhost'}, …)
Furthermore, there are also some complex types:
- Predicate: A predicate is normally an expression that can be evaluated and returns either true or false. In general, a predicate is defined via a function that takes a string, e.g.:
OPERATOR({predicate=RelationalPredicate('1<1234')}, …)
(This example uses the RelationalPredicate, which also can be replaced by other predicates.)
- List: It is also possible to pass a list of values. For that, the values have to be surrounded by square brackets:
OPERATOR({color=['green', 'red', 'blue']}, …)
(Type of elements: integer, double, string, predicate, list, map).
- Map: This type allows maps like the HashMap in Java. Thus, one parameter can hold a list of key-value pairs, where the key and the value are each one of the types described here. For example, you can use this to define a set of pairs where the key and the value are strings, using "=" to separate the key from the value:
OPERATOR({def=['left'='green', 'right'='blue']}, …)
It is also possible that values are lists:
OPERATOR({def=['left'=['green','red'],'right'=['blue']]}, …)
Remember: although the key can have a different data type than the value, all keys must have the same data type and all values must have the same data type.
Note that all parameters and their types (string, integer, list, …) are defined by their operator. Therefore, it is not guaranteed that the same parameters of different operators use the same parameter declaration, although we aim to make all parameters uniform.
Ports – What if the Operator Has More Than One Output?
Some operators have more than one output. Each output is provided via a port. The default port is 0, the second one is 1, etc. For example, the selection (SELECT) pushes all elements that fulfill the predicate to output port 0 and all others to output port 1. To use another port, prefix the operator with the port number followed by a colon. For example, to use the second output (port 1) of the select:
PROJECT({…}, 1:SELECT({predicate=RelationalPredicate('1<x')}, …))
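The port semantics of SELECT described above can be modeled in a few lines of Python. This is a sketch, not Odysseus code. The function name select_with_ports is hypothetical, elements are plain dicts, and the predicate is a Python callable standing in for a RelationalPredicate.

```python
from typing import Callable, Dict, Iterable, List


def select_with_ports(elements: Iterable[dict],
                      predicate: Callable[[dict], bool]) -> Dict[int, List[dict]]:
    """Model of SELECT's two outputs: port 0 gets matches, port 1 gets the rest."""
    ports: Dict[int, List[dict]] = {0: [], 1: []}
    for element in elements:
        ports[0 if predicate(element) else 1].append(element)
    return ports
```

In this model, 1:SELECT({predicate=RelationalPredicate('1<x')}, …) corresponds to reading ports[1], i.e. all elements with x <= 1.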
The Full Grammar of PQL
QUERY = (TEMPORARYSTREAM | VIEW | SHAREDSTREAM)+
TEMPORARYSTREAM = STREAM "=" OPERATOR
VIEW = VIEWNAME ":=" OPERATOR
SHAREDSTREAM = SOURCENAME "::=" OPERATOR
OPERATOR = QUERY | [OUTPUTPORT ":"] OPERATORTYPE "(" (PARAMETERLIST [ "," OPERATORLIST ] | OPERATORLIST) ")"
OPERATORLIST = [ OPERATOR ("," OPERATOR)* ]
PARAMETERLIST = "{" PARAMETER ("," PARAMETER)* "}"
PARAMETER = NAME "=" PARAMETERVALUE
PARAMETERVALUE = LONG | DOUBLE | STRING | PREDICATE | LIST | MAP
LIST = "[" [PARAMETERVALUE ("," PARAMETERVALUE)*] "]"
MAP = "[" [MAPENTRY ("," MAPENTRY)*] "]"
MAPENTRY = PARAMETERVALUE "=" PARAMETERVALUE
STRING = "'" [~']* "'"
PREDICATE = PREDICATETYPE "(" STRING ")"
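To make the PARAMETERLIST and PARAMETERVALUE rules concrete, here is a minimal recursive-descent parser sketch in Python. It is an illustration of the grammar above, not the parser Odysseus uses. The names ParameterParser and Predicate are hypothetical. Numbers without a sign are assumed. Because LIST and MAP share the same brackets, a "=" after a value is taken to mark a MAPENTRY, and map keys must be hashable in this sketch.

```python
import re
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class Predicate:
    """PREDICATE = PREDICATETYPE "(" STRING ")", e.g. RelationalPredicate('1<x')."""
    type_name: str
    expression: str


_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
_NUMBER = re.compile(r"\d+(?:\.\d+)?")  # LONG or DOUBLE; signs are not handled here


class ParameterParser:
    """Recursive-descent sketch of the PARAMETERLIST and PARAMETERVALUE rules."""
    def __init__(self, text: str) -> None:
        self.text, self.pos = text, 0

    def parse(self) -> Dict[str, Any]:
        params = self._parameter_list()
        if self._peek():
            raise SyntaxError(f"trailing input at position {self.pos}")
        return params

    def _peek(self) -> str:
        # Skip whitespace, then return the next character ("" at end of input).
        while self.pos < len(self.text) and self.text[self.pos].isspace():
            self.pos += 1
        return self.text[self.pos:self.pos + 1]

    def _expect(self, char: str) -> None:
        if self._peek() != char:
            raise SyntaxError(f"expected {char!r} at position {self.pos}")
        self.pos += 1

    def _token(self, pattern) -> str:
        self._peek()
        match = pattern.match(self.text, self.pos)
        if match is None:
            raise SyntaxError(f"unexpected input at position {self.pos}")
        self.pos = match.end()
        return match.group(0)

    def _parameter_list(self) -> Dict[str, Any]:
        # PARAMETERLIST = "{" PARAMETER ("," PARAMETER)* "}"
        self._expect("{")
        params: Dict[str, Any] = {}
        while True:
            name = self._token(_NAME)  # PARAMETER = NAME "=" PARAMETERVALUE
            self._expect("=")
            params[name] = self._value()
            if self._peek() != ",":
                self._expect("}")
                return params
            self.pos += 1

    def _value(self) -> Any:
        # PARAMETERVALUE = LONG | DOUBLE | STRING | PREDICATE | LIST | MAP
        char = self._peek()
        if char == "'":
            return self._string()
        if char == "[":
            return self._list_or_map()
        if char.isdigit():
            number = self._token(_NUMBER)
            return float(number) if "." in number else int(number)
        type_name = self._token(_NAME)  # anything else must be a predicate
        self._expect("(")
        expression = self._string()
        self._expect(")")
        return Predicate(type_name, expression)

    def _string(self) -> str:
        # STRING = "'" [~']* "'"  (the grammar defines no escape sequences)
        self._expect("'")
        end = self.text.find("'", self.pos)
        if end < 0:
            raise SyntaxError("unterminated string")
        value, self.pos = self.text[self.pos:end], end + 1
        return value

    def _list_or_map(self) -> Any:
        # LIST and MAP share "[" ... "]"; "=" after a value marks a MAPENTRY.
        self._expect("[")
        items, entries = [], {}
        while self._peek() != "]":
            if items or entries:
                self._expect(",")
            value = self._value()
            if self._peek() == "=":
                self.pos += 1
                entries[value] = self._value()
            else:
                items.append(value)
            if items and entries:
                raise SyntaxError("a bracket holds either list values or map entries")
        self.pos += 1
        return entries if entries else items
```

For example, ParameterParser("{def=['left'=['green','red'],'right'=['blue']]}").parse() returns {"def": {"left": ["green", "red"], "right": ["blue"]}}.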
List of available PQL Operators
Odysseus has a wide range of built-in operators, which are explained here.
Base Operators
ACCESS
Description
The access operator can be used to integrate new sources into Odysseus. Further information can be found in the documentation of the Access Operator Framework.
Parameter
source: The name of the access operator
wrapper: In Odysseus the default wrappers are GenericPush and GenericPull
transport: The transport defines the transport protocol to use.
protocol: The protocol parameter defines the application protocol to transform the processing results.
datahandler: This parameter defines the transformation of the single attributes of the processing results.
options: Transport protocol and application protocol dependent options
schema: The output schema of the access operator (may depend on the protocol handler)
Example
PQL
input = ACCESS({source='Source', wrapper='GenericPush', transport='TCPClient', protocol='CSV', dataHandler='Tuple', options=[['host', 'example.com'],['port', '8080'],['read', '10240'],['write', '10240']], schema=[ ['id', 'Double'], ['data', 'String']] })
CQL
CREATE STREAM source (id Double, data STRING) WRAPPER 'GenericPush' PROTOCOL 'CSV' TRANSPORT 'TCPClient' DATAHANDLER 'Tuple' OPTIONS ( 'host' 'example.com', 'port' '8080', 'read' '10240', 'write' '10240')
AGGREGATE
Description
This operator is used to aggregate and group the input stream.
Parameter
group_by: An optional list of attributes over which the grouping should occur.
aggregations: A list of elements, where each element contains up to four strings:
- The name of the aggregate function, e.g. MAX
- The input attribute over which the aggregation should be done
- The name of the output attribute for this aggregation
- The optional type of the output
dumpAtValueCount: This parameter is used in the interval approach. Here, a result is generated each time an aggregation can no longer change. This leads to fewer results with a longer validity. With this parameter, the production rate can be raised. A value of 1 means that new output elements are generated for every new element the aggregation operator receives. This leads to more results with a shorter validity.
Aggregation Functions
The set of aggregate functions is extensible. The following functions are part of core Odysseus:
MAX: The maximum element
MIN: The minimum element
AVG: The average element
SUM: The sum of all elements
COUNT: The number of elements
STDDEV: The standard deviation
Some nonstandard aggregations, which should only be used if you are familiar with them:
FIRST: The first element
LAST: The last element
NTH: The nth element
RATE: The number of elements per time unit
Example:
PQL
output = AGGREGATE({ group_by = ['bidder'], aggregations=[ ['MAX', 'price', 'max_price', 'double'] ] }, input)
CQL
SELECT MAX(price) AS max_price FROM input GROUP BY bidder
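As a rough illustration of what group_by and aggregations compute, here is a batch-style Python sketch. It is not how Odysseus evaluates AGGREGATE. The streaming operator works incrementally on validity intervals (see dumpAtValueCount), while this sketch treats all tuples as valid at the same time. The function aggregate is hypothetical and covers only MAX, MIN, SUM, COUNT, and AVG.

```python
from typing import Any, Callable, Dict, List, Sequence

# Subset of the core aggregate functions; each reduces the list of group values.
_FUNCTIONS: Dict[str, Callable[[List[Any]], Any]] = {
    "MAX": max,
    "MIN": min,
    "SUM": sum,
    "COUNT": len,
    "AVG": lambda values: sum(values) / len(values),
}


def aggregate(tuples: Sequence[dict], group_by: Sequence[str],
              aggregations: Sequence[Sequence[str]]) -> List[dict]:
    """Batch model of AGGREGATE: groups all tuples, then applies each aggregation.

    Each aggregation is [function, input attribute, output attribute(, type)];
    the optional output type is ignored in this sketch.
    """
    groups: Dict[tuple, List[dict]] = {}
    for t in tuples:
        groups.setdefault(tuple(t[a] for a in group_by), []).append(t)
    results = []
    for key, members in groups.items():
        row = dict(zip(group_by, key))
        for function, input_attr, output_attr, *_ in aggregations:
            row[output_attr] = _FUNCTIONS[function]([m[input_attr] for m in members])
        results.append(row)
    return results
```

Calling aggregate(bids, ['bidder'], [['MAX', 'price', 'max_price', 'double']]) mirrors the PQL example above on a finite list of bids.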
ASSUREHEARTBEAT
Description
This operator ensures that heartbeats are sent periodically to avoid blocking caused by missing information about time progress. The operator guarantees that no element (heartbeat or stream object) is sent that is older than the last sent heartbeat (i.e. the generated heartbeats are in order and indicate time progress). Heartbeats can be sent either periodically (sendAlwaysHeartbeat = true), regardless of whether a new element has been received, or only if no other stream elements indicate time progress (e.g. in out-of-order scenarios).
Parameter
RealTimeDelay: How long the operator should wait in transaction time (real time) before it sends a punctuation
ApplicationTimeDelay: How long the realTimeDelay is in terms of application time (typically this should be the same, but for simulations it could be adapted)
TimeUnit: The time unit (see Java TimeUnit). The minimum time unit is milliseconds!
SendAlwaysHeartbeat: If true, a heartbeat is sent periodically every realTimeDelay. This is useful for out-of-order processing.
AllowOutOfOrder: If set to true, the operator allows heartbeats to be sent that lie before the last sent element. Otherwise, this is not allowed.
Example
output = ASSUREHEARTBEAT({realTimeDelay=5000, applicationTimeDelay=5000, sendAlwaysHeartbeat='false', allowOutOfOrder='false'}, input)
BUFFER
Description
Typically, Odysseus provides a buffer placement strategy to place buffers in the query plan. This operator allows adding buffers by hand. A buffer receives data stream elements and stores them in an internal element buffer. The scheduler stops the execution at this point; later, it resumes execution (e.g. with another thread).
Parameter
Type: The type of the buffer. The following types are currently part of Odysseus:
- Normal: This is the standard type and will be used if the type parameter is absent.
- Currently, no further types are supported in core Odysseus. In the future, some buffers will be integrated to treat prioritized elements.
MaxBufferSize: If this value is set, a buffer will block when it reaches its maximal capacity. No values will be thrown away; this is not a load shedding operator!
Example
BUFFEREDFILTER
Description
This operator can be used to reduce the data rate. It buffers incoming elements on port 0 (left) for bufferTime and evaluates a predicate over the elements on port 1 (right). If the predicate for the current element e evaluates to true, all elements from port 0 that are younger than e.startTimeStamp()-bufferTime will be enriched with e and delivered for deliverTime. Each time the predicate evaluates to true, the deliverTime will be increased.
Parameter
Predicate: The predicate that will be evaluated on the elements of port 1
BufferTime: The time span in the past for which elements are kept in the history
DeliverTime: The time span in the future during which elements will be delivered
Example
output = BUFFEREDFILTER({predicate = RelationalPredicate('id == 2 || id == 4 || id == 10'), bufferTime = 5000, deliverTime = 20000}, left, right)
CALCLATENCY
Description
Odysseus has some features to measure the latency of single stream elements. This latency information is modeled as an interval. An operator in Odysseus can modify the start point of this interval. This operator sets the end point and thus determines the place in the query plan where the latency measurement takes place. There can be multiple such operators in the plan to measure latency at different places.
Parameter
none
Example
output = CALCLATENCY(input)
CHANGEDETECT
Description
This operator can be used to filter out equal elements and thus reduce the data rate.
Parameter
ATTR (List<String>): Only these attributes are considered for the comparison.
Heartbeatrate (Integer): For each nth element that is filtered out, a heartbeat is generated to indicate the progress of time.
deliverFirstElement (Boolean): If true, the first element will be sent to the next operator.
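The following Python sketch shows one plausible reading of these three parameters on a finite stream. Several points are assumptions, not taken from Odysseus. Each element is compared with its immediate predecessor. Timestamps live in a hypothetical "ts" field. A first element that is not delivered is not counted as filtered. Heartbeats are represented as ("heartbeat", ts) pairs.

```python
from typing import Iterable, List, Sequence, Tuple


def change_detect(elements: Iterable[dict], attrs: Sequence[str],
                  heartbeat_rate: int = 0,
                  deliver_first_element: bool = False) -> List[Tuple[str, object]]:
    """Model of CHANGEDETECT: forward an element only if `attrs` changed.

    Every `heartbeat_rate`-th filtered element produces a heartbeat carrying
    that element's timestamp (assumed to be stored under "ts").
    """
    output: List[Tuple[str, object]] = []
    previous = None
    filtered = 0
    for index, element in enumerate(elements):
        key = tuple(element[a] for a in attrs)
        if index == 0:
            if deliver_first_element:
                output.append(("element", element))
        elif key != previous:
            output.append(("element", element))
        else:
            filtered += 1
            if heartbeat_rate > 0 and filtered % heartbeat_rate == 0:
                output.append(("heartbeat", element["ts"]))
        previous = key
    return output
```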
DIFFERENCE
Description
This operator calculates the difference between two input sets.
Parameter
None
Example
output = DIFFERENCE(left, right)
ENRICH
Description
This operator enriches tuples with information from the context store. Further information can be found here.
Parameter
ATTRIBUTES: The attributes of the store object that should be used for enrichment
STORE: The name of the store
OUTER: If true, inputs are enriched with null when the store is empty; otherwise, such inputs are discarded.
Example
output = ENRICH({store='StoreName', outer='true'}, input)
EVENTTRIGGER
Description
Parameter
Example
EXISTENCE
Description
This operator tests an existence predicate and can be used with the type EXISTS (semi join) or NOT_EXISTS (anti semi join). The predicate can refer to the elements of both the first and the second input.
Semi join: All elements of the first input for which there is at least one element in the second input that fulfills the predicate are sent.
Anti semi join: All elements of the first input for which there is no element in the second input that fulfills the predicate are sent.
Parameter
Predicate: The predicate
Type: Either EXISTS or NOT_EXISTS
Example
output = EXISTENCE({ type='EXISTS', predicate=RelationalPredicate('auction = auction_id') }, left, right)
output = EXISTENCE({ type='NOT_EXISTS', predicate=RelationalPredicate('auction = auction_id') }, left, right)
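The difference between the two types can be sketched in Python on finite lists. This is a simplified model, not the Odysseus operator. The function existence is hypothetical, the predicate is a Python callable over (left, right) pairs, and any temporal aspects of the streaming operator are ignored.

```python
from typing import Callable, List


def existence(left: List[dict], right: List[dict],
              predicate: Callable[[dict, dict], bool],
              kind: str = "EXISTS") -> List[dict]:
    """EXISTS keeps left elements with at least one matching right element
    (semi join); NOT_EXISTS keeps those without any match (anti semi join)."""
    if kind not in ("EXISTS", "NOT_EXISTS"):
        raise ValueError(f"unknown type: {kind}")
    want_match = kind == "EXISTS"
    return [l for l in left if any(predicate(l, r) for r in right) == want_match]
```

With predicate = lambda l, r: l["auction"] == r["auction_id"], the two calls correspond to the two PQL examples above.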
FILESINK
Description
The operator can be used to dump the results of an operator to a file.
Deprecated: Use the Sender Operator
Parameter
FILE: The name of the file to dump to. If the path does not exist, it will be created.
FILETYPE: The type of file. CSV: print as CSV. If not given, the elements will be dumped to the file in a raw format (Java toString() method).
Example
JOIN
Description
Joins tuples from two input streams iff their time intervals overlap and the optional predicate evaluates to true.
Parameter
predicate: The predicate to evaluate over each incoming tuple from left and right
Example
PQL
output = join({predicate = RelationalPredicate('auction_id = auction')}, left, right)
CQL
SELECT * FROM left, right WHERE auction_id = auction
LEFTJOIN
Description
Parameter
Example
MAP
Description
Performs a mapping of incoming attributes to outgoing attributes using map functions. Odysseus provides a wide range of mapping functions.
Parameter
expressions:
A list of expressions to map multiple incoming attribute values to outgoing attributes
Example
PQL
output = MAP({ expressions = ['auction_id * 5','sqrt(auction_id)'] }, input)
CQL
SELECT auction_id * 5, sqrt(auction_id) FROM input
PROJECT
Description
Parameter
attributes:
A list of attribute names to project on
Example
PQL
output = PROJECT({ attributes = ['auction', 'bidder'] }, input)
CQL
SELECT auction, bidder FROM input
PUNCTUATION
Description
The Punctuation operator allows punctuations to be inserted into the processing stream in order to protect subsequent operators from a possible buffer overflow. For this purpose, the operator has two inputs, I1 and I2. Input I1 is the data input and forwards incoming tuples directly to the output. The second input I2 serves as a frequency generator. As soon as the difference between the number of input tuples at the first input I1 and the second input I2 exceeds a certain threshold, a punctuation is inserted into the left stream at I1, and older tuples that may arrive afterwards are discarded.
The Punctuation operator can produce non-deterministic behavior, because the insertion of punctuations may depend on the current system load. It should only be used if normal processing of the data is not possible because of their fluctuating frequencies.
Parameter
Example
punctuation({ratio = 10}, left, right)
RENAME
Description
Renames the attributes.
Parameter
aliases: The list of new attribute names to use from now on. If the flag pairs is set, aliases will be interpreted as pairs of (old_name, new_name). See the examples below.
type: By default, each schema has the name of the source the data comes from (i.e. the name of the type that is processed by this operator). With this parameter, the type (source) name can be changed. This is needed for the SASE operator.
pairs: Optional boolean value that indicates whether the given list of aliases should be interpreted as pairs of (old_name, new_name). The default value is false.
Example
PQL
// Renames the first attribute to auction_id, the second to bidder_id and the last to another_id.
output = RENAME({ aliases = ['auction_id', 'bidder_id', 'another_id'] }, input)
// Because the pairs flag is set, the rename operator renames the attribute auction_id to auction and bidder_id to bidder.
output = RENAME({ aliases = ['auction_id', 'auction', 'bidder_id', 'bidder'], pairs = 'true' }, input)
CQL
SELECT auction_id AS auction, bidder_id AS bidder FROM input
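The two ways of reading aliases can be sketched as a function over a list of attribute names. This is a model of the documented behavior, not Odysseus code. The function rename_schema is hypothetical. Two rules are assumptions of this sketch only: without pairs, the aliases must name every attribute; with pairs, attributes that are not mentioned keep their names.

```python
from typing import List


def rename_schema(schema: List[str], aliases: List[str], pairs: bool = False) -> List[str]:
    """Return the renamed attribute list for RENAME's aliases/pairs parameters."""
    if not pairs:
        # Positional interpretation: the i-th alias renames the i-th attribute.
        if len(aliases) != len(schema):
            raise ValueError("aliases must name every attribute")
        return list(aliases)
    # Pair interpretation: [old_1, new_1, old_2, new_2, ...]
    if len(aliases) % 2:
        raise ValueError("pairs=true requires an even number of aliases")
    mapping = dict(zip(aliases[0::2], aliases[1::2]))
    return [mapping.get(name, name) for name in schema]
```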
ROUTE
Description
This operator can be used to route the elements in the stream to different subsequent operators, depending on the predicates.
Parameter
predicates: A list of predicates
Example
route({predicates=[RelationalPredicate('price > 200'), RelationalPredicate('price > 300'), RelationalPredicate('price > 400')]}, input)
SASE
Description
This operator allows defining temporal patterns to match against the stream. For this purpose, the SASE+ query language is used, and the query is passed in the parameter query. The sources used must have the correct type (for the example below, s05 and s08); their order is not important. If the type of a source is not set or is wrong, it can be set beforehand using the Rename operator. The parameter heartbeatrate determines how often a heartbeat is generated when an element has been processed that did not lead to a result.
Parameter
heartbeatrate: The rate at which heartbeats are generated if an element was processed without producing a result.
query: The SASE+ query
OneMatchPerInstance
Example
s05 = RENAME({type='s05', aliases = ['ts', 'edge']},...)
PATTERNDETECT({heartbeatrate=1,query='PATTERN SEQ(s05 s1, s08 s2) where skip_till_any_match(s1,s2){ s1.edge=s2.edge } return s1.ts,s2.ts'}, s05, s08)
SELECT
Description
The select operator filters the incoming data stream according to the given predicate.
Parameter
predicate:
The predicate to evaluate over each incoming tuple
Example
PQL
output = SELECT({ predicate=RelationalPredicate('price > 100') }, input)
CQL
SELECT * FROM input WHERE price > 100
SENDER
Description
This operator can be used to publish processing results to multiple endpoints using different transport and application protocols.
Parameter
wrapper: In Odysseus the default wrappers are GenericPush and GenericPull
transport: The transport defines the transport protocol to use.
protocol: The protocol parameter defines the application protocol to transform the processing results.
datahandler: This parameter defines the transformation of the single attributes of the processing results.
options: Transport protocol and application protocol dependent options
Example
PQL
output = SENDER({sink='Sink', wrapper='GenericPush', transport='TCPClient', protocol='CSV', dataHandler='Tuple', options=[['host', 'example.com'],['port', '8081'],['read', '10240'],['write', '10240']] }, input)
CQL
CREATE SINK sink (timestamp STARTTIMESTAMP, auction INTEGER, bidder INTEGER, datetime LONG, price DOUBLE) WRAPPER 'GenericPush' PROTOCOL 'CSV' TRANSPORT 'TCPClient' DATAHANDLER 'Tuple' OPTIONS ( 'host' 'example.com', 'port' '8081', 'read' '10240', 'write' '10240' )
STREAM TO sink SELECT * FROM input
SOCKETSINK
Description
This operator can be used to send/provide data from Odysseus via a TCP socket connection. (Remark: This operator may change in the future.)
Deprecated: Use the Sender operator.
Parameter
Depending on the parameter push:Boolean (todo: change name!), the parameters have the following meaning:
- push = false (default): On the Odysseus side, a server socket is opened.
  host:String: Typically 'localhost'
  sinkport:Integer: The port on which the socket should be opened
- push = true: Odysseus connects to a server.
  host:String: The server to connect to
  port:Integer: The port on the host to connect to
Example
socketsink({host='localhost', push=false, sinkport=4712, sinkType='bytebuffer', sinkName='driverChannel'}, timestampToPayload(person))
STORE
Description
Transfers temporary information into a context store for use with the Enrich operator. More information about the context store can be found here.
Parameter
store: The name of the store
Example
STORE({store = 'StoreName'}, input)
TIMESTAMPTOPAYLOAD
Description
This operator is needed before data is sent to another system (e.g. via a socket sink) to keep the time meta information (i.e. the start and end timestamps). The input object gets two new fields containing the start and end timestamps. If this output is read again by (another) Odysseus instance, the following needs to be attached to the schema:
['start', 'StartTimestamp'], ['end', 'EndTimestamp']
Parameter
The operator is parameterless.
Example
driverChannel = socketsink({sinkport=4712, sinkType='bytebuffer', sinkName='driverChannel'}, timestampToPayload(person))
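The round trip described above can be sketched in Python. This is a model under stated assumptions, not the Odysseus implementation. StreamElement, timestamp_to_payload, and payload_to_timestamp are hypothetical names. The two timestamp fields are assumed to be appended at the end of the payload, matching a receiving schema that ends with ['start', 'StartTimestamp'], ['end', 'EndTimestamp'].

```python
from typing import NamedTuple, Tuple


class StreamElement(NamedTuple):
    payload: Tuple   # attribute values of the tuple
    start: int       # start timestamp (meta information)
    end: int         # end timestamp (meta information)


def timestamp_to_payload(element: StreamElement) -> Tuple:
    """Append start and end timestamps as two ordinary payload fields."""
    return element.payload + (element.start, element.end)


def payload_to_timestamp(values: Tuple) -> StreamElement:
    """Receiving side: interpret the last two fields as start and end timestamps."""
    *payload, start, end = values
    return StreamElement(tuple(payload), start, end)
```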
UNION
Description
This operator calculates the union of two input sets.
Parameter
none
Example
PQL
output = UNION(left, right)
CQL
SELECT * FROM left UNION SELECT * FROM right
UNNEST
Description
The UnNest operator unpacks incoming tuples with a multi-value attribute to create multiple tuples.
Parameter
attribute:
The attribute that should be unpacked.
Example
output = UNNEST({ attribute='myAttribute' },input)
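A minimal Python sketch of the unpacking step follows. It assumes tuples are dicts and the multi-value attribute holds a list. The function unnest is hypothetical. Whether an empty list yields no output tuple, as it does here, is an assumption of the sketch.

```python
from typing import Iterable, Iterator


def unnest(tuples: Iterable[dict], attribute: str) -> Iterator[dict]:
    """Emit one copy of each tuple per value of its multi-value attribute."""
    for t in tuples:
        for value in t[attribute]:
            # Copy all other attributes unchanged; replace the list by one value.
            yield {**t, attribute: value}
```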
UDO
Description
The UDO operator calls a user defined operator.
Parameter
class: The name of the user defined operator
init: Additional parameter for the user defined operator
Example
output = UDO({ class='MyUDO', init='some parameter' }, input)
WINDOW
Description
Depending on the parameters used, the window sets the validity of the tuples. If a time-based window is used, the default time granularity is milliseconds. To specify sizes in another time granularity, you can either use the unit parameter (e.g. use 5 for size and SECONDS for unit) or convert the values yourself (e.g. use 5000 for size without the unit parameter).
Parameter
size: The size of the window
advance: The advance by which the window moves forward
slide: The slide of the window
type: The type of the window. The possible values are Time, Tuple, and Unbounded
partition: The partition attribute of the window
startCondition: The start condition for a predicate window
endCondition: The end condition for a predicate window
unit: The unit for the time granularity. Possible values are the constants of Java TimeUnit, like SECONDS, NANOSECONDS, etc. If not set, the default time granularity (milliseconds) is used.
Example
//sliding time window (notice: size and advance are based directly on the timestamps used;
//if they are in milliseconds (the default), size and advance are in milliseconds too)
output = WINDOW({ size = 5, advance = 1, type = 'time' }, input)
//sliding time window with another time unit for size and advance.
//size and advance are converted from seconds into milliseconds (since this is the default time granularity). This means size will be 5000 and advance will be 1000
output = WINDOW({ size = 5, advance = 1, type = 'time', unit = 'SECONDS' }, input)
//sliding tuple window partitioned over bidder
output = WINDOW({ size = 5, advance = 1, type = 'tuple', partition=['bidder'] }, input)
//unbounded window
output = WINDOW({ type = 'unbounded' }, input)
//now window (size = 1, advance = 1)
output = WINDOW({ type = 'time' }, input)
//sliding delta window, reduces time granularity to value of slide
output = WINDOW({ size = 5, type = 'time', slide = 5 }, input)
// Predicate window
output = WINDOW({ startCondition = RelationalPredicate('a>10'), endCondition = RelationalPredicate('a<10') }, input)
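The unit conversion and the resulting validity of a sliding time window can be sketched in Python. Treat this as an approximation of the behavior described above, not the Odysseus implementation. The names to_millis and sliding_time_validity are hypothetical. Only advance = 1 is modeled, with validity [start, start + size). Tuple, unbounded, delta, and predicate windows are out of scope, and only units that convert to whole milliseconds are included.

```python
from typing import Tuple

# Conversion factors into milliseconds for a subset of java.util.concurrent.TimeUnit.
_TO_MILLIS = {
    "MILLISECONDS": 1,
    "SECONDS": 1_000,
    "MINUTES": 60_000,
    "HOURS": 3_600_000,
    "DAYS": 86_400_000,
}


def to_millis(value: int, unit: str = "MILLISECONDS") -> int:
    """Convert a window size given in `unit` into the default granularity (ms)."""
    return value * _TO_MILLIS[unit.upper()]


def sliding_time_validity(start: int, size: int, unit: str = "MILLISECONDS") -> Tuple[int, int]:
    """Validity [start, start + size) of an element in a sliding time window (advance = 1)."""
    return start, start + to_millis(size, unit)
```

With unit = 'SECONDS', a size of 5 becomes 5000 milliseconds, matching the second example above.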
Benchmark
BATCHPRODUCER
Description
Parameter
Example
BENCHMARK
Description
Parameter
Example
PRIOIDJOIN
Description
Parameter
Example
TESTPRODUCER
Description
Parameter
Example
Data Mining
CLASSIFY
Description
Parameter
Example
HOEFFDINGTREE
Description
Parameter
Example
LEADER
Description
Parameter
Example
SIMPLESINGLEPASSKMEANS
Description
Parameter
Example
FREQUENT_ITEM
Description
Parameter
Example
Storing
DATABASESOURCE
Description
This operator can read data from a relational database. Look at Database Feature for a detailed description.
DATABASESINK
This operator can write data to a relational database. Look at Database Feature for a detailed description.