...
The following table shows the available metadata:
Metadata | Possible values |
---|---|
outputMode (important) |
|
persistent |
|
minInputPorts |
|
maxInputPorts |
|
...
Event method | Description |
---|---|
processNext(IStreamObject object, int port) | New stream element is received |
processPunctuation(IPunctuation punctuation, int port) | New punctuation is received |
processOpen() | Processing has started |
processClose() | Processing has stopped |
processDone() | No more stream elements are sent |
processDone(int port) | No more stream elements are sent from a source |
createOutputSchema(int port) | Output schema has to be created |
sourceSubscribed() | New source is subscribed |
sourceUnsubscribed() | Source is unsubscribed |
ao_init() | Logical operator is initialized |
po_init() | Physical operator is initialized |
AO & PO methods
Besides validate and event methods, it is also possible to create normal methods. The ao keyword should be used if a method needs to call a method of AbstractLogicalOperator. The po keyword should be used if a method needs to call a method of AbstractPipe.
Code Block | ||
---|---|---|
| ||
// Plain method (no ao/po keyword): maps a port number to (port + 1) modulo 2.
getOtherPort(int port) : int {
    int shifted = port + 1;
    return shifted % 2;
}

// ao method: reads the BASE_TIME_UNIT constraint of the schema on input port 0
// and returns its value.
ao getBaseTimeUnit : TimeUnit {
    SDFSchema inputSchema = getInputSchema(0);
    SDFConstraint baseUnit = inputSchema.getConstraint(SDFConstraint::BASE_TIME_UNIT);
    return baseUnit.value;
}

// po method: forwards the given punctuation via sendPunctuation.
po ping(IPunctuation punctuation) {
    sendPunctuation(punctuation);
} |
Grammar
Code Block | ||
---|---|---|
| ||
ODLModel ::= (Namespace)* (UserOperator | Class | Interface)*.
UserOperator ::= "operator" ID ("(" (Metadata ("," Metadata)*)? ")")? "{" (ODLAttribute | ODLMethod)* "}".
ODLAttribute ::= (("optional")? "parameter")? ("(" (Metadata ("," Metadata)*)? ")")? Attribute.
ODLMethod ::= ("on" | "validate" | "override" | ("override")? "ao" | ("override")? "po")? (MethodDeclaration)? StatementBlock. |
Examples
Code Block | ||
---|---|---|
| ||
operator ODLProject(outputMode = "MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){
    // Operator parameter: the attributes used to build the output schema.
    parameter(type=ResolvedSDFAttributeParameter) SDFAttribute[] attributes;

    // Computed on first open from the input and output schema and then reused
    // for every incoming tuple.
    int[] restrictList;

    // Builds the output schema from the configured attributes and the schema
    // of input port 0.
    on createOutputSchema(int port) : SDFSchema{
        return SDFSchemaFactory::createNewWithAttributes(attributes, getInputSchema(0));
    }

    // Calculates the restrict list once; it is kept if it has already been set.
    on processOpen {
        if (restrictList == null) {
            restrictList = SDFSchema::calcRestrictList(getInputSchema(0), this.outputSchema);
        }
    }

    // Restricts every incoming tuple with the restrict list and sends the result.
    on processNext(Tuple tuple, int port) {
        sendStreamElement(tuple.restrict(restrictList, false));
    }

    // Punctuations are passed on unchanged.
    on processPunctuation(IPunctuation punctuation, int port) {
        sendPunctuation(punctuation);
    }
} |
Code Block | ||
---|---|---|
| ||
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){
    // Operator parameter: the predicate every tuple is evaluated against.
    parameter IPredicate predicate;

    // Initializes the predicate when processing starts.
    on processOpen {
        predicate.init;
    }

    // Sends a tuple only if the predicate evaluates to true for it;
    // all other tuples are not sent.
    on processNext(Tuple tuple, int port) {
        boolean selected = predicate.evaluate(tuple);
        if (selected) {
            sendStreamElement(tuple);
        }
    }

    // Punctuations are passed on unchanged.
    on processPunctuation(IPunctuation punctuation, int port) {
        sendPunctuation(punctuation);
    }
} |
Code Block | ||
---|---|---|
| ||
operator ODLJoin(outputMode = "NEW_ELEMENT", minInputPorts = 2, maxInputPorts = 2){
    // Optional join predicate; defaults to the instance returned by TruePredicate.
    optional parameter IPredicate predicate = TruePredicate::getInstance();
    // Optional cardinality; when it is not set (null), every element is
    // inserted into the area of its own port (see processNext).
    optional parameter(type = EnumParameter) Cardinalities card;

    // One sweep area per input port (index = port number).
    ITimeIntervalSweepArea[] areas = [];
    IDataMergeFunction dataMerge;
    ITransferArea transferFunction;
    IMetadataMergeFunction metadataMerge;

    // Event method po_init: fetches a "TIJoinSA" sweep area for each of the two
    // ports and assigns the join predicate as its query predicate.
    on po_init {
        String areaName = "TIJoinSA";
        areas[0] = SweepAreaRegistry::getSweepArea(areaName);
        areas[0].queryPredicate = predicate;
        areas[1] = SweepAreaRegistry::getSweepArea(areaName);
        areas[1].queryPredicate = predicate;
    }

    // Creates the merge functions and the transfer area on first open,
    // then clears both sweep areas and initializes all helpers.
    on processOpen {
        if (dataMerge == null) {
            dataMerge = new RelationalMergeFunction(this.outputSchema.size);
            transferFunction = new TITransferArea();
            List leftMeta = getInputSchema(0).metaAttributeNames;
            List rightMeta = getInputSchema(1).metaAttributeNames;
            metadataMerge = MetadataRegistry::getMergeFunction(leftMeta,rightMeta);
        }
        areas[0].clear;
        areas[1].clear;
        dataMerge.init;
        transferFunction.init(this, this.subscribedToSource.size);
        metadataMerge.init;
        predicate.init;
    }

    // The output schema is the join of the schemas of both subscribed sources.
    on createOutputSchema(int arg0) : SDFSchema{
        SDFSchema left = getSubscribedToSource(0).schema;
        SDFSchema right = getSubscribedToSource(1).schema;
        return SDFSchema::join(left, right);
    }

    // Joins the incoming tuple against the sweep area of the other port and
    // afterwards decides whether the tuple is stored in the area of its own port.
    on processNext(Tuple tuple, int port) {
        transferFunction.newElement(tuple, port);
        int otherPort = getOtherPort(port);
        Order order = Order::fromOrdinal(port);
        areas[otherPort].purgeElements(tuple, order);

        // Flag handed to queryCopy; it is only ever true when a cardinality
        // other than MANY_MANY is configured.
        // NOTE(review): presumably makes the sweep area remove the matched
        // elements - confirm against ITimeIntervalSweepArea.queryCopy.
        boolean extract = false;
        if (card != null) {
            switch (card.toString) {
                case "ONE_ONE":
                    extract = true;
                    break;
                case "MANY_ONE":
                    extract = port == 1;
                    break;
                case "ONE_MANY":
                    extract = port == 0;
                    break;
                default:
                    break;
            }
        }

        Iterator qualifies = areas[otherPort].queryCopy(tuple, order, extract);
        // Remember whether there was at least one join partner before the
        // iterator is consumed.
        boolean hit = qualifies.hasNext;
        while(qualifies.hasNext) {
            Tuple next = qualifies.next;
            Tuple newElement = dataMerge.merge(tuple, next, metadataMerge, order);
            transferFunction.transfer(newElement);
        }

        // Insert the tuple into the area of its own port, depending on the
        // cardinality and on whether a join partner was found.
        if (card == null || card == Cardinalities::MANY_MANY) {
            areas[port].insert(tuple);
        } else {
            switch (card.toString) {
                case "ONE_ONE":
                    if (!hit) {
                        areas[port].insert(tuple);
                    }
                    break;
                case "ONE_MANY":
                    if (port == 0 || (port == 1 && !hit)) {
                        areas[port].insert(tuple);
                    }
                    break;
                case "MANY_ONE":
                    if (port == 1 || (port == 0 && !hit)) {
                        areas[port].insert(tuple);
                    }
                    break;
                default:
                    areas[port].insert(tuple);
                    break;
            }
        }
    }

    // On a heartbeat the sweep area of the other port is purged up to the
    // punctuation time; every punctuation is handed to the transfer area.
    on processPunctuation(IPunctuation punctuation, int port) {
        if (punctuation.isHeartbeat) {
            areas[getOtherPort(port)].purgeElementsBefore(punctuation.time);
        }
        transferFunction.sendPunctuation(punctuation);
        transferFunction.newElement(punctuation, port);
    }

    // Tells the transfer area that the given port is done.
    on processDone(int port) {
        transferFunction.done(port);
    }

    // po method: returns 1 for port 0 and 0 for every other port.
    po getOtherPort(int port) : int {
        if (port == 0) {
            return 1;
        } else {
            return 0;
        }
    }
} |