Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The following table shows the available metadata:

MetadataPossible values
outputmode outputMode (important)
  • "INPUT"
    • read element wil not be modified
  • "MODIFIED_INPUT" (default)
    • read element will be modified
  • "NEW_ELEMENT"
    • operator creates a new element
persistent
  • true
    • operator will be automatically added to operator framework after each start of Odysseus
  • false (default)
minInputPorts
  • Integer-Value (default 1)
    • Minimal number of ports that the operator needs
maxInputPorts
  • Integer-Value (default 2147483647)
    • Maximum number of ports that the operator needs

...

Event methodDescription

processNext(IStreamObject object, int port)

New stream element is received 

processPunctuation(IPunctuation punctuation, int port)

New punctuation is received 

processOpen()

 Processing has started

processClose()

 Processing has stopped

processDone() 

No more stream elements are sent

processDone(int port) 

No more stream elements are sent from a source

createOutputSchema (int port) 

Output schema has to be created

sourceSubscribed() 

New source is subscribed

sourceUnsubscribed()

Source is unsubscribed 

ao_init()

 Logical operator is initialized

po_init()

 Physical operator is initialized

AO & PO methods

Besides validate and event methods it is also possible to create norma’ normal methods. The ao-keyword should be used if a method needs to call a method of AbstractLogicalOperator. The po-keyword should be used if a method needs to call a method of AbstractPipe.

Code Block
titleExample
getOtherPort(int port) : int {
	return (port+1)%2;
}
 
ao getBaseTimeUnit : TimeUnit {
	SDFSchema schema = getInputSchema(0);
	SDFConstraint constraint = schema.getConstraint(SDFConstraint::BASE_TIME_UNIT);
	return constraint.value;
}
 
po pintping(IPunctuation punctuation) {
	sendPunctuation(punctuation);
}

Grammar

Code Block
titleGrammar
ODLModel               ::= (Namespace)* (UserOperator | Class | Interface)*.                         
UserOperator           ::= "operator" ID ("(" (Metadata ("," Metadata))? ")")? "{" (ODLAttribute | ODLMethod) "}".
ODLAttribute           ::= (("optional")? "parameter")? ("(" (Metadata ("," Metadata))* ")")? Attribute.
ODLMethod              ::= ("on" | "validate" | "override" | ("override")? "ao" | ("override")? "po")? (MethodDeclaration)? StatementBlock.

Examples

Code Block
titleODLProject
operator ODLProject(outputMode ="MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){    
    
    parameter(type=ResolvedSDFAttributeParameter) SDFAttribute[] attributes;
        
    int[] restrictList;    
        
    on processOpen {
        if (restrictList == null) {
            restrictList = SDFSchema::calcRestrictList(getInputSchema(0), this.outputSchema);
        }
    }
    
    on createOutputSchema(int port) : SDFSchema{
        return SDFSchemaFactory::createNewWithAttributes(attributes, getInputSchema(0));        
    }
    
    on processNext(Tuple tuple, int port) {
        Tuple out = tuple.restrict(restrictList, false);                
        sendStreamElement(out);        
    }    
    
    on processPunctuation(IPunctuation punctuation, int port) {
        sendPunctuation(punctuation);        
    }             
}
Code Block
titleODLSelect
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){
    
    parameter IPredicate predicate;
    
    on processOpen {
        predicate.init;
    }    
    
    on processPunctuation(IPunctuation punctuation, int port) {
        sendPunctuation(punctuation);        
    }    
    
    on processNext(Tuple tuple, int port) {
        if (predicate.evaluate(tuple)) {
            sendStreamElement(tuple);
        }
    }
}
Code Block
titleODLJoin
operator ODLJoin(outputMode = "NEW_ELEMENT", minInputPorts = 2, maxInputPorts = 2){    
    
    optional parameter IPredicate predicate = TruePredicate::getInstance();        
    
    optional parameter(type = EnumParameter) Cardinalities card;
        
    ITimeIntervalSweepArea[] areas = [];    
    
    IDataMergeFunction dataMerge;
    
    ITransferArea transferFunction;        
    
    IMetadataMergeFunction metadataMerge;
        
    on po_init {        
        String areaName = "TIJoinSA";
        areas[0] = SweepAreaRegistry::getSweepArea(areaName);        
        areas[0].queryPredicate = predicate;        
                            
        areas[1] = SweepAreaRegistry::getSweepArea(areaName);
        areas[1].queryPredicate = predicate;
    }    
            
    on processOpen {
        if (dataMerge == null) {
            dataMerge = new RelationalMergeFunction(this.outputSchema.size);        
            transferFunction = new TITransferArea();
            List leftMeta = getInputSchema(0).metaAttributeNames;
            List rightMeta = getInputSchema(1).metaAttributeNames;
            metadataMerge = MetadataRegistry::getMergeFunction(leftMeta,rightMeta);            
        }
        areas[0].clear;
        areas[1].clear;        
        dataMerge.init;                
        transferFunction.init(this, this.subscribedToSource.size);        
        metadataMerge.init;            
        predicate.init;        
    }
    
    on createOutputSchema(int arg0) : SDFSchema{
        SDFSchema left = getSubscribedToSource(0).schema;
        SDFSchema right = getSubscribedToSource(1).schema;        
        return SDFSchema::join(left, right);
    }
    
    on processNext(Tuple tuple, int port) {
        transferFunction.newElement(tuple, port);                
        int otherPort = getOtherPort(port);                    
        Order order = Order::fromOrdinal(port);                
        areas[otherPort].purgeElements(tuple, order);
        boolean extract = false;
        if (card != null) {
            switch (card.toString) {
                case "ONE_ONE":
                    extract = true;
                    break;
                case "MANY_ONE":
                    extract = port == 1;
                    break;
                case "ONE_MANY":
                    extract = port == 0;
                    break;
                default:
                    break;
            }
        }
                    
        Iterator qualifies = areas[otherPort].queryCopy(tuple, order, extract);
        
        boolean hit = qualifies.hasNext;        
        while(qualifies.hasNext) {
            Tuple next = qualifies.next;
            Tuple newElement = dataMerge.merge(tuple, next, metadataMerge, order);
            transferFunction.transfer(newElement);
        }        
                
        if (card == null || card == Cardinalities::MANY_MANY) {
                areas[port].insert(tuple);
        } else {
            switch (card.toString) {
            case "ONE_ONE":
                if (!hit) {
                    areas[port].insert(tuple);
                }
                break;
            case "ONE_MANY":
                if (port == 0 || (port == 1 && !hit)) {
                    areas[port].insert(tuple);
                }
                break;
            case "MANY_ONE":
                if (port == 1 || (port == 0 && !hit)) {
                    areas[port].insert(tuple);
                }
                break;
            default:
                areas[port].insert(tuple);
                break;
            }
        }
    }
    
    on processPunctuation(IPunctuation punctuation, int port) {
        if (punctuation.isHeartbeat) {
            areas[getOtherPort(port)].purgeElementsBefore(punctuation.time);        
        }
        transferFunction.sendPunctuation(punctuation);
        transferFunction.newElement(punctuation, port);
    }
    
    on processDone(int port) {
        transferFunction.done(port);
    }
    
    po getOtherPort(int port) : int {
        if (port == 0) {
            return 1;
        } else {
            return 0;
        }
    }
}