This document describes the steps to create a new user defined operator with ODL.
Name
First you need to choose a meaningful name for the new operator. Note, there is no other operator with the same name.
operator ODLSelect {
//...
}
Metadata
Next you can set metadata to configure the operator with static information.
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){
//...
}
The following table shows the available metadata:
| Metadata | Possible values |
|---|---|
| outputMode (important) |
|
| persistent |
|
| minInputPorts |
|
| maxInputPorts |
|
Please look at the LogicalOperator-Interface of Odysseus for more available metadata (e.g. doc, url, category).
Parameter
You need to define parameters if the operator should be configurable while building a continuous query. A parameter can be optional and consists of a data type and a name.
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){
parameter IPredicate predicate;
optional parameter int heartbeatrate;
//...
}
Note that not every data type can be used for a parameter because each data type must have a corresponding parameter class in Odysseus. The following table shows the available data types:
| Data type | Parameter class |
|---|---|
| String | StringParameter FileNameParameter HttpStringParameter |
| Boolean | BooleanParameter |
| Byte | ByteParameter |
| Integer | IntegerParameter |
| Double | DoubleParameter |
| Long | LongParameter |
| Resource | ResourceParameter AccessAOSourceParameter |
AggregateItem | AggregateItemParameter |
BitVector | BitVectorParameter |
RenameAttribute | CreateAndRenameSDFAttributeParameter |
SDFAttribute | CreateSDFAttributeParameter ResolvedSDFAttributeParameter |
File | ValidatedFileNameParameter |
IMetaAttribute | MetaAttributeParameter |
IPhysicalOperator | PhysicalOperatorParameter |
IPredicate | PredicateParameter |
NamedExpression | SDFExpressionParameter |
AccessAO | SourceParameter |
TimeValueItem | TimeParameter |
Some data types have more than one corresponding parameter class. In these cases you can optionally set a parameter class as a metadata of a parameter. Please look at the Parameter-Interface of Odysseus for more available metadata.
operator ODLProject(outputMode ="MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){
parameter(type=ResolvedSDFAttributeParameter) SDFAttribute[] attributes;
//...
}
Attributes
Besides parameters it is also possible to define attributes to store operator’s data.
operator ODLJoin(outputMode = "NEW_ELEMENT", minInputPorts = 2, maxInputPorts = 2){
ITimeIntervalSweepArea[] areas;
IDataMergeFunction dataMerge;
ITransferArea transferFunction;
IMetadataMergeFunction metadataMerge;
//...
}
Validate methods
Each parameter can have a validate method. This method must return whether the value of the parameter is correct.
operator ODLProject(outputMode ="MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){
validate attributes {
if (this.attributes.isEmpty) {
error("attributes have to be set");
return false;
}
return true;
}
//...
}
There is also a validate method which can be used if a parameter has dependencies to other parameters.
operator ODLProject(outputMode ="MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){
validate {
return true;
}
//...
}
Event methods
These methods are automatically called when events occur. The processNext-method is the most important event method and is called whenever a new stream element is received. The type of a stream element can be Tuple, KeyValueObject or other subtypes of IStreamObject.
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){
on processNext(Tuple tuple, int port) {
if (predicate.evaluate(tuple)) {
sendStreamElement(tuple);
}
}
on processPunctuation(IPunctuation punctuation, int port) {
sendPunctuation(punctuation);
}
//...
}
The following table shows an overview of available event methods. Note that you can call methods of AbstractLogicalOperator or AbstractPipe in event methods, e.g. to send stream elements or punctuations.
| Event method | Description |
|---|---|
processNext(IStreamObject object, int port) | New stream element is received |
processPunctuation(IPunctuation punctuation, int port) | New punctuation is received |
processOpen() | Processing has started |
processClose() | Processing has stopped |
processDone() | No more stream elements are sent |
processDone(int port) | No more stream elements are sent from a source |
createOutputSchema (int port) | Output schema has to be created |
sourceSubscribed() | New source is subscribed |
sourceUnsubscribed() | Source is unsubscribed |
ao_init() | Logical operator is initialized |
po_init() | Physical operator is initialized |
AO & PO methods
Besides validate and event methods it is also possible to create normal methods. The ao-keyword should be used if a method needs to call a method of AbstractLogicalOperator. The po-keyword should be used if a method needs to call a method of AbstractPipe.
getOtherPort(int port) : int {
return (port+1)%2;
}
ao getBaseTimeUnit : TimeUnit {
SDFSchema schema = getInputSchema(0);
SDFConstraint constraint = schema.getConstraint(SDFConstraint::BASE_TIME_UNIT);
return constraint.value;
}
po ping(IPunctuation punctuation) {
sendPunctuation(punctuation);
}
Grammar
ODLModel ::= (Namespace)* (UserOperator | Class | Interface)*.
UserOperator ::= "operator" ID ("(" (Metadata ("," Metadata))? ")")? "{" (ODLAttribute | ODLMethod) "}".
ODLAttribute ::= (("optional")? "parameter")? ("(" (Metadata ("," Metadata))* ")")? Attribute.
ODLMethod ::= ("on" | "validate" | "override" | ("override")? "ao" | ("override")? "po")? (MethodDeclaration)? StatementBlock.
Examples
operator ODLProject(outputMode ="MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){
parameter(type=ResolvedSDFAttributeParameter) SDFAttribute[] attributes;
int[] restrictList;
on processOpen {
if (restrictList == null) {
restrictList = SDFSchema::calcRestrictList(getInputSchema(0), this.outputSchema);
}
}
on createOutputSchema(int port) : SDFSchema{
return SDFSchemaFactory::createNewWithAttributes(attributes, getInputSchema(0));
}
on processNext(Tuple tuple, int port) {
Tuple out = tuple.restrict(restrictList, false);
sendStreamElement(out);
}
on processPunctuation(IPunctuation punctuation, int port) {
sendPunctuation(punctuation);
}
}
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){
parameter IPredicate predicate;
on processOpen {
predicate.init;
}
on processPunctuation(IPunctuation punctuation, int port) {
sendPunctuation(punctuation);
}
on processNext(Tuple tuple, int port) {
if (predicate.evaluate(tuple)) {
sendStreamElement(tuple);
}
}
}
operator ODLJoin(outputMode = "NEW_ELEMENT", minInputPorts = 2, maxInputPorts = 2){
optional parameter IPredicate predicate = TruePredicate::getInstance();
optional parameter(type = EnumParameter) Cardinalities card;
ITimeIntervalSweepArea[] areas = [];
IDataMergeFunction dataMerge;
ITransferArea transferFunction;
IMetadataMergeFunction metadataMerge;
on po_init {
String areaName = "TIJoinSA";
areas[0] = SweepAreaRegistry::getSweepArea(areaName);
areas[0].queryPredicate = predicate;
areas[1] = SweepAreaRegistry::getSweepArea(areaName);
areas[1].queryPredicate = predicate;
}
on processOpen {
if (dataMerge == null) {
dataMerge = new RelationalMergeFunction(this.outputSchema.size);
transferFunction = new TITransferArea();
List leftMeta = getInputSchema(0).metaAttributeNames;
List rightMeta = getInputSchema(1).metaAttributeNames;
metadataMerge = MetadataRegistry::getMergeFunction(leftMeta,rightMeta);
}
areas[0].clear;
areas[1].clear;
dataMerge.init;
transferFunction.init(this, this.subscribedToSource.size);
metadataMerge.init;
predicate.init;
}
on createOutputSchema(int arg0) : SDFSchema{
SDFSchema left = getSubscribedToSource(0).schema;
SDFSchema right = getSubscribedToSource(1).schema;
return SDFSchema::join(left, right);
}
on processNext(Tuple tuple, int port) {
transferFunction.newElement(tuple, port);
int otherPort = getOtherPort(port);
Order order = Order::fromOrdinal(port);
areas[otherPort].purgeElements(tuple, order);
boolean extract = false;
if (card != null) {
switch (card.toString) {
case "ONE_ONE":
extract = true;
break;
case "MANY_ONE":
extract = port == 1;
break;
case "ONE_MANY":
extract = port == 0;
break;
default:
break;
}
}
Iterator qualifies = areas[otherPort].queryCopy(tuple, order, extract);
boolean hit = qualifies.hasNext;
while(qualifies.hasNext) {
Tuple next = qualifies.next;
Tuple newElement = dataMerge.merge(tuple, next, metadataMerge, order);
transferFunction.transfer(newElement);
}
if (card == null || card == Cardinalities::MANY_MANY) {
areas[port].insert(tuple);
} else {
switch (card.toString) {
case "ONE_ONE":
if (!hit) {
areas[port].insert(tuple);
}
break;
case "ONE_MANY":
if (port == 0 || (port == 1 && !hit)) {
areas[port].insert(tuple);
}
break;
case "MANY_ONE":
if (port == 1 || (port == 0 && !hit)) {
areas[port].insert(tuple);
}
break;
default:
areas[port].insert(tuple);
break;
}
}
}
on processPunctuation(IPunctuation punctuation, int port) {
if (punctuation.isHeartbeat) {
areas[getOtherPort(port)].purgeElementsBefore(punctuation.time);
}
transferFunction.sendPunctuation(punctuation);
transferFunction.newElement(punctuation, port);
}
on processDone(int port) {
transferFunction.done(port);
}
po getOtherPort(int port) : int {
if (port == 0) {
return 1;
} else {
return 0;
}
}
}