This document describes the steps to create a new user defined operator with ODL.
Name
First you need to choose a meaningful name for the new operator. Note, there is no other operator with the same name.
operator ODLSelect { //... }
Metadata
Next you can set metadata to configure the operator with static information.
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){ //... }
The following table shows the available metadata:
Metadata | Possible values |
---|---|
outputmode (important) |
|
persistent |
|
minInputPorts |
|
maxInputPorts |
|
Please look at the LogicalOperator-Interface of Odysseus for more available metadata (e.g. doc, url, category).
Parameter
You need to define parameters if the operator should be configurable while building a continuous query. A parameter can be optional and consists of a data type and a name.
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){ parameter IPredicate predicate; optional parameter int heartbeatrate; //... }
Note that not every data type can be used for a parameter because each data type must have a corresponding parameter class in Odysseus. The following table shows the available data types:
Data type | Parameter class |
---|---|
String | StringParameter FileNameParameter HttpStringParameter |
Boolean | BooleanParameter |
Byte | ByteParameter |
Integer | IntegerParameter |
Double | DoubleParameter |
Long | LongParameter |
Resource | ResourceParameter AccessAOSourceParameter |
AggregateItem | AggregateItemParameter |
BitVector | BitVectorParameter |
RenameAttribute | CreateAndRenameSDFAttributeParameter |
SDFAttribute | CreateSDFAttributeParameter ResolvedSDFAttributeParameter |
File | ValidatedFileNameParameter |
IMetaAttribute | MetaAttributeParameter |
IPhysicalOperator | PhysicalOperatorParameter |
IPredicate | PredicateParameter |
NamedExpression | SDFExpressionParameter |
AccessAO | SourceParameter |
TimeValueItem | TimeParameter |
Some data types have more than one corresponding parameter class. In these cases you can optionally set a parameter class as a metadata of a parameter. Please look at the Parameter-Interface of Odysseus for more available metadata.
operator ODLProject(outputMode ="MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){ parameter(type=ResolvedSDFAttributeParameter) SDFAttribute[] attributes; //... }
Attributes
Besides parameters it is also possible to define attributes to store operator’s data.
operator ODLJoin(outputMode = "NEW_ELEMENT", minInputPorts = 2, maxInputPorts = 2){ ITimeIntervalSweepArea[] areas; IDataMergeFunction dataMerge; ITransferArea transferFunction; IMetadataMergeFunction metadataMerge; //... }
Validate methods
Each parameter can have a validate method. This method must return whether the value of the parameter is correct.
operator ODLProject(outputMode ="MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){ validate attributes { if (this.attributes.isEmpty) { error("attributes have to be set"); return false; } return true; } //... }
There is also a validate method which can be used if a parameter has dependencies to other parameters.
operator ODLProject(outputMode ="MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){ validate { return true; } //... }
Event methods
These methods are automatically called when events occur. The processNext-method is the most important event method and is called whenever a new stream element is received. The type of a stream element can be Tuple, KeyValueObject or other subtypes of IStreamObject.
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){ on processNext(Tuple tuple, int port) { if (predicate.evaluate(tuple)) { sendStreamElement(tuple); } } on processPunctuation(IPunctuation punctuation, int port) { sendPunctuation(punctuation); } //... }
The following table shows an overview of available event methods.
Event method | Description |
---|---|
processNext(IStreamObject object, int port) |
|
processPunctuation(IPunctuation punctuation, int port) |
|
processOpen() |
|
processClose() |
|
processDone() |
|
processDone(int port) |
|
createOutputSchema (int port) |
|
sourceSubscribed() |
|
sourceUnsubscribed() |
|
ao_init() |
|
po_init() |
|