...
Code Block | ||
---|---|---|
| ||
ODLModel ::= (Namespace)* (UserOperator | Class | Interface)*. UserOperator ::= "operator" ID ("(" (Metadata ("," Metadata)*)? ")")? "{" (ODLAttribute | ODLMethod)* "}". ODLAttribute ::= (("optional")? "parameter")? ("(" (Metadata ("," Metadata)*)? ")")? Attribute. ODLMethod ::= ("on" | "validate" | "override" | ("override")? "ao" | ("override")? "po")? (MethodDeclaration)? StatementBlock. |
Examples
Code Block | ||
---|---|---|
| ||
// ODLProject: example operator with exactly one input port (minInputPorts = maxInputPorts = 1)
// and outputMode "MODIFIED_INPUT". Every incoming tuple is passed through Tuple.restrict
// with restrictList and the result is sent downstream.
operator ODLProject(outputMode ="MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){
// Operator parameter (parameter type ResolvedSDFAttributeParameter): the attributes
// used to build the output schema in createOutputSchema.
parameter(type=ResolvedSDFAttributeParameter) SDFAttribute[] attributes;
// Internal state (declared without the "parameter" keyword); filled in processOpen.
int[] restrictList;
// processOpen handler: computes restrictList from input schema 0 and the output schema,
// but only while it is still null, so a later open reuses the existing list.
on processOpen {
if (restrictList == null) {
restrictList = SDFSchema::calcRestrictList(getInputSchema(0), this.outputSchema);
}
}
// Output schema = a new schema created from the configured attributes and input schema 0.
// The port argument is not used.
on createOutputSchema(int port) : SDFSchema{
return SDFSchemaFactory::createNewWithAttributes(attributes, getInputSchema(0));
}
// Per-tuple handler: restricts the tuple with restrictList and sends the result.
// NOTE(review): the meaning of the second restrict argument (false) is not visible here —
// presumably it requests a new tuple instead of modifying the input; confirm against Tuple.restrict.
on processNext(Tuple tuple, int port) {
Tuple out = tuple.restrict(restrictList, false);
sendStreamElement(out);
}
// Punctuations are forwarded unchanged.
on processPunctuation(IPunctuation punctuation, int port) {
sendPunctuation(punctuation);
}
} |
Code Block | ||
---|---|---|
| ||
// ODLSelect: example operator with exactly one input port (minInputPorts = maxInputPorts = 1)
// and outputMode "INPUT". A tuple is sent on, unmodified, only if the predicate evaluates
// to true for it; all other tuples are dropped.
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){
// Mandatory operator parameter (no "optional" keyword, no default value).
parameter IPredicate predicate;
// processOpen handler: initializes the predicate.
on processOpen {
predicate.init;
}
// Punctuations are forwarded unchanged; the predicate is not applied to them.
on processPunctuation(IPunctuation punctuation, int port) {
sendPunctuation(punctuation);
}
// Per-tuple handler: forwards the tuple when predicate.evaluate(tuple) is true.
on processNext(Tuple tuple, int port) {
if (predicate.evaluate(tuple)) {
sendStreamElement(tuple);
}
}
} |
Code Block | ||
---|---|---|
| ||
// ODLJoin: example binary join operator (exactly two input ports, outputMode "NEW_ELEMENT").
// Each input port has its own sweep area (areas[0], areas[1]). An incoming tuple is matched
// against the sweep area of the other port, every match is merged into a new element and
// handed to the transfer function, and the tuple is then (depending on the cardinality
// setting) inserted into the sweep area of its own port.
operator ODLJoin(outputMode = "NEW_ELEMENT", minInputPorts = 2, maxInputPorts = 2){
// Optional join predicate; defaults to the TruePredicate instance when not set.
optional parameter IPredicate predicate = TruePredicate::getInstance();
// Optional cardinality setting; may stay null, which processNext treats like MANY_MANY
// for insertion and as "no extraction" for querying.
optional parameter(type = EnumParameter) Cardinalities card;
// One sweep area per input port, indexed by port number (0 and 1 are filled in po_init).
ITimeIntervalSweepArea[] areas = [];
// Helpers created on the first processOpen (see the null check there).
IDataMergeFunction dataMerge;
ITransferArea transferFunction;
IMetadataMergeFunction metadataMerge;
// po_init handler: fetches a sweep area named "TIJoinSA" from the registry for each port
// and sets the join predicate as its query predicate.
on po_init {
String areaName = "TIJoinSA";
areas[0] = SweepAreaRegistry::getSweepArea(areaName);
areas[0].queryPredicate = predicate;
areas[1] = SweepAreaRegistry::getSweepArea(areaName);
areas[1].queryPredicate = predicate;
}
// processOpen handler: creates the merge/transfer helpers once (while dataMerge is null),
// then on every open clears both sweep areas and (re-)initializes helpers and predicate.
on processOpen {
if (dataMerge == null) {
dataMerge = new RelationalMergeFunction(this.outputSchema.size);
transferFunction = new TITransferArea();
// The metadata merge function is looked up from the meta attribute names of both inputs.
List leftMeta = getInputSchema(0).metaAttributeNames;
List rightMeta = getInputSchema(1).metaAttributeNames;
metadataMerge = MetadataRegistry::getMergeFunction(leftMeta,rightMeta);
}
areas[0].clear;
areas[1].clear;
dataMerge.init;
transferFunction.init(this, this.subscribedToSource.size);
metadataMerge.init;
predicate.init;
}
// Output schema = SDFSchema::join of the schemas of the two subscribed sources.
// The port argument (arg0) is not used.
on createOutputSchema(int arg0) : SDFSchema{
SDFSchema left = getSubscribedToSource(0).schema;
SDFSchema right = getSubscribedToSource(1).schema;
return SDFSchema::join(left, right);
}
// Per-tuple handler. Order of steps: announce the element to the transfer function,
// purge the other port's sweep area, query it for matches, emit merged results, and
// finally insert the tuple into the sweep area of its own port.
on processNext(Tuple tuple, int port) {
transferFunction.newElement(tuple, port);
int otherPort = getOtherPort(port);
// The order value is derived from the port the tuple arrived on.
Order order = Order::fromOrdinal(port);
areas[otherPort].purgeElements(tuple, order);
// extract is passed as third argument to queryCopy below.
// NOTE(review): presumably "true" makes the sweep area remove the returned matches —
// confirm against ITimeIntervalSweepArea.queryCopy.
boolean extract = false;
if (card != null) {
switch (card.toString) {
case "ONE_ONE":
extract = true;
break;
case "MANY_ONE":
// extract only for tuples arriving on port 1
extract = port == 1;
break;
case "ONE_MANY":
// extract only for tuples arriving on port 0
extract = port == 0;
break;
default:
// any other cardinality (e.g. MANY_MANY): extract stays false
break;
}
}
Iterator qualifies = areas[otherPort].queryCopy(tuple, order, extract);
// Remember whether there was at least one match BEFORE the loop consumes the iterator;
// the insertion rules below depend on it.
boolean hit = qualifies.hasNext;
while(qualifies.hasNext) {
Tuple next = qualifies.next;
Tuple newElement = dataMerge.merge(tuple, next, metadataMerge, order);
transferFunction.transfer(newElement);
}
// Insertion into the own port's sweep area: unconditional without a cardinality or
// with MANY_MANY, otherwise dependent on the cardinality, the port and on hit.
if (card == null || card == Cardinalities::MANY_MANY) {
areas[port].insert(tuple);
} else {
switch (card.toString) {
case "ONE_ONE":
// insert only if the tuple found no match
if (!hit) {
areas[port].insert(tuple);
}
break;
case "ONE_MANY":
// port 0: always insert; port 1: insert only without a match
if (port == 0 || (port == 1 && !hit)) {
areas[port].insert(tuple);
}
break;
case "MANY_ONE":
// port 1: always insert; port 0: insert only without a match
if (port == 1 || (port == 0 && !hit)) {
areas[port].insert(tuple);
}
break;
default:
// any cardinality not handled above: insert unconditionally
areas[port].insert(tuple);
break;
}
}
}
// Punctuation handler: a heartbeat purges everything before punctuation.time from the
// OTHER port's sweep area; afterwards every punctuation is passed to the transfer
// function (sendPunctuation first, then newElement for the port it arrived on).
on processPunctuation(IPunctuation punctuation, int port) {
if (punctuation.isHeartbeat) {
areas[getOtherPort(port)].purgeElementsBefore(punctuation.time);
}
transferFunction.sendPunctuation(punctuation);
transferFunction.newElement(punctuation, port);
}
// Tells the transfer function that the given input port is done.
on processDone(int port) {
transferFunction.done(port);
}
// Helper method: returns 1 for port 0 and 0 for every other port value.
po getOtherPort(int port) : int {
if (port == 0) {
return 1;
} else {
return 0;
}
}
} |