...
The following table shows the available metadata:
Metadata | Possible values |
---|---|
outputMode (important) | "INPUT", "MODIFIED_INPUT" or "NEW_ELEMENT" (the values used in the examples on this page) |
persistent |
|
minInputPorts |
|
maxInputPorts |
|
...
The following table shows an overview of available event methods. Note that you can call methods of AbstractLogicalOperator or AbstractPipe in event methods, e.g. to send stream elements or punctuations.
Event method | Description | ||
---|---|---|---|
processNext(IStreamObject object, int port) | New stream element is received | ||
processPunctuation(IPunctuation punctuation, int port) | New punctuation is received | ||
processOpen() | Processing has started | ||
processClose() | Processing has stopped | ||
processDone() | No more stream elements are sent | ||
processDone(int port) | No more stream elements are sent from a source | ||
createOutputSchema(int port) | Output schema has to be created | ||
sourceSubscribed() | New source is subscribed | ||
sourceUnsubscribed() | Source is unsubscribed | ||
ao_init() | Logical operator is initialized | ||
po_init() | Physical operator is initialized | ||
AO & PO methods
Besides validate and event methods, it is also possible to define ordinary methods. Use the ao keyword if a method needs to call a method of AbstractLogicalOperator, and the po keyword if a method needs to call a method of AbstractPipe. A method without either keyword (such as getOtherPort below) calls neither.
Code Block | ||
---|---|---|
| ||
// Plain helper method (no ao/po keyword): returns (port + 1) % 2,
// i.e. 1 for port 0 and 0 for port 1.
getOtherPort(int port) : int {
return (port+1)%2;
}
// 'ao' method: reads the input schema at index 0, looks up its
// BASE_TIME_UNIT constraint and returns that constraint's value.
// NOTE(review): there is no null check on the constraint; presumably the
// schema is expected to always carry it - confirm.
ao getBaseTimeUnit : TimeUnit {
SDFSchema schema = getInputSchema(0);
SDFConstraint constraint = schema.getConstraint(SDFConstraint::BASE_TIME_UNIT);
return constraint.value;
}
// 'po' method: passes the given punctuation to sendPunctuation unchanged.
po ping(IPunctuation punctuation) {
sendPunctuation(punctuation);
} |
Grammar
Code Block | ||
---|---|---|
| ||
ODLModel ::= (Namespace)* (UserOperator | Class | Interface)*.
UserOperator ::= "operator" ID ("(" (Metadata ("," Metadata)*)? ")")? "{" (ODLAttribute | ODLMethod)* "}".
ODLAttribute ::= (("optional")? "parameter")? ("(" (Metadata ("," Metadata)*)? ")")? Attribute.
ODLMethod ::= ("on" | "validate" | "override" | ("override")? "ao" | ("override")? "po")? (MethodDeclaration)? StatementBlock. |
Examples
Code Block | ||
---|---|---|
| ||
// Projection example: restricts each incoming tuple to a configured list of attributes.
// Metadata: output mode "MODIFIED_INPUT" and exactly one input port (min = max = 1).
operator ODLProject(outputMode ="MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){
// Operator parameter: the attributes to keep, declared with parameter type ResolvedSDFAttributeParameter.
parameter(type=ResolvedSDFAttributeParameter) SDFAttribute[] attributes;
// Internal state (not a parameter): filled in on processOpen and used by processNext.
int[] restrictList;
// On start of processing: compute the restrict list once from the input schema at index 0
// and this operator's output schema; an already computed list is kept.
on processOpen {
if (restrictList == null) {
restrictList = SDFSchema::calcRestrictList(getInputSchema(0), this.outputSchema);
}
}
// Builds the output schema from the configured attributes and the input schema at index 0.
// The port argument is not used.
on createOutputSchema(int port) : SDFSchema{
return SDFSchemaFactory::createNewWithAttributes(attributes, getInputSchema(0));
}
// For every incoming tuple: restrict it with restrictList and send the result.
// NOTE(review): the meaning of the second argument (false) of restrict is not shown here - confirm.
on processNext(Tuple tuple, int port) {
Tuple out = tuple.restrict(restrictList, false);
sendStreamElement(out);
}
// Punctuations are forwarded unchanged.
on processPunctuation(IPunctuation punctuation, int port) {
sendPunctuation(punctuation);
}
} |
Code Block | ||
---|---|---|
| ||
// Selection example: forwards only those tuples for which the predicate evaluates to true.
// Metadata: output mode "INPUT" and exactly one input port (min = max = 1).
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){
// Required operator parameter (no 'optional' keyword, no default value).
parameter IPredicate predicate;
// On start of processing: initialize the predicate.
on processOpen {
predicate.init;
}
// Punctuations are forwarded unchanged.
on processPunctuation(IPunctuation punctuation, int port) {
sendPunctuation(punctuation);
}
// Sends the tuple itself (unmodified) when the predicate accepts it; otherwise nothing is sent.
on processNext(Tuple tuple, int port) {
if (predicate.evaluate(tuple)) {
sendStreamElement(tuple);
}
}
} |
Code Block | ||
---|---|---|
| ||
// Join example with two input ports: each port has its own sweep area, and every new
// tuple is matched against the sweep area of the opposite port.
// Metadata: output mode "NEW_ELEMENT" and exactly two input ports (min = max = 2).
operator ODLJoin(outputMode = "NEW_ELEMENT", minInputPorts = 2, maxInputPorts = 2){
// Optional join predicate; defaults to the TruePredicate instance.
optional parameter IPredicate predicate = TruePredicate::getInstance();
// Optional cardinality setting (parameter type EnumParameter); stays null when not set.
optional parameter(type = EnumParameter) Cardinalities card;
// Internal state: one sweep area per input port (index = port).
ITimeIntervalSweepArea[] areas = [];
// Internal helpers, created on the first processOpen (see the null check there).
IDataMergeFunction dataMerge;
ITransferArea transferFunction;
IMetadataMergeFunction metadataMerge;
// On initialization of the physical operator: fetch two "TIJoinSA" sweep areas from the
// registry and give both the join predicate as their query predicate.
on po_init {
String areaName = "TIJoinSA";
areas[0] = SweepAreaRegistry::getSweepArea(areaName);
areas[0].queryPredicate = predicate;
areas[1] = SweepAreaRegistry::getSweepArea(areaName);
areas[1].queryPredicate = predicate;
}
// On start of processing: create the merge/transfer helpers once, then reset and
// initialize all state on every open.
on processOpen {
// dataMerge == null is used as the "helpers not created yet" marker for all three helpers.
if (dataMerge == null) {
dataMerge = new RelationalMergeFunction(this.outputSchema.size);
transferFunction = new TITransferArea();
List leftMeta = getInputSchema(0).metaAttributeNames;
List rightMeta = getInputSchema(1).metaAttributeNames;
metadataMerge = MetadataRegistry::getMergeFunction(leftMeta,rightMeta);
}
areas[0].clear;
areas[1].clear;
dataMerge.init;
// The transfer area receives this operator and the number of subscribed sources.
transferFunction.init(this, this.subscribedToSource.size);
metadataMerge.init;
predicate.init;
}
// Output schema: SDFSchema::join of the schemas of the sources subscribed at index 0 and 1.
// The port argument (arg0) is not used.
on createOutputSchema(int arg0) : SDFSchema{
SDFSchema left = getSubscribedToSource(0).schema;
SDFSchema right = getSubscribedToSource(1).schema;
return SDFSchema::join(left, right);
}
// Handles a tuple arriving on 'port': announce it to the transfer area, purge and query
// the opposite sweep area, emit merged results, then decide whether to store the tuple.
on processNext(Tuple tuple, int port) {
transferFunction.newElement(tuple, port);
int otherPort = getOtherPort(port);
Order order = Order::fromOrdinal(port);
// Purge the opposite sweep area relative to the new tuple.
// NOTE(review): the exact purge criterion is defined by the sweep area implementation - not visible here.
areas[otherPort].purgeElements(tuple, order);
// 'extract' is handed to queryCopy below. It stays false when no cardinality is set
// and for any cardinality value not listed in the switch.
// NOTE(review): presumably 'extract' makes the query remove matched elements from the area - confirm.
boolean extract = false;
if (card != null) {
switch (card.toString) {
case "ONE_ONE":
extract = true;
break;
case "MANY_ONE":
extract = port == 1;
break;
case "ONE_MANY":
extract = port == 0;
break;
default:
break;
}
}
// Query the opposite sweep area for elements qualifying for the new tuple.
Iterator qualifies = areas[otherPort].queryCopy(tuple, order, extract);
// Record whether there is at least one match before the loop consumes the iterator.
boolean hit = qualifies.hasNext;
// Merge the new tuple with each match and pass the result to the transfer area.
while(qualifies.hasNext) {
Tuple next = qualifies.next;
Tuple newElement = dataMerge.merge(tuple, next, metadataMerge, order);
transferFunction.transfer(newElement);
}
// Store the new tuple in its own port's sweep area. Without a cardinality, or with
// MANY_MANY, it is always stored; otherwise storing depends on the port and on 'hit'.
if (card == null || card == Cardinalities::MANY_MANY) {
areas[port].insert(tuple);
} else {
switch (card.toString) {
case "ONE_ONE":
// Stored only when no match was found.
if (!hit) {
areas[port].insert(tuple);
}
break;
case "ONE_MANY":
// Port 0 tuples are always stored; port 1 tuples only when no match was found.
if (port == 0 || (port == 1 && !hit)) {
areas[port].insert(tuple);
}
break;
case "MANY_ONE":
// Port 1 tuples are always stored; port 0 tuples only when no match was found.
if (port == 1 || (port == 0 && !hit)) {
areas[port].insert(tuple);
}
break;
default:
areas[port].insert(tuple);
break;
}
}
}
// Handles a punctuation: a heartbeat additionally purges the opposite sweep area up to the
// punctuation's time; every punctuation is then passed to the transfer area twice, first
// via sendPunctuation and then via newElement, in that order.
on processPunctuation(IPunctuation punctuation, int port) {
if (punctuation.isHeartbeat) {
areas[getOtherPort(port)].purgeElementsBefore(punctuation.time);
}
transferFunction.sendPunctuation(punctuation);
transferFunction.newElement(punctuation, port);
}
// Tells the transfer area that the given port is done.
on processDone(int port) {
transferFunction.done(port);
}
// Returns 1 for port 0 and 0 for any other port value.
po getOtherPort(int port) : int {
if (port == 0) {
return 1;
} else {
return 0;
}
}
} |