Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
titleGrammar
ODLModel               ::= (Namespace)* (UserOperator | Class | Interface)*.                         
UserOperator           ::= "operator" ID ("(" (Metadata ("," Metadata))? ")")? "{" (ODLAttribute | ODLMethod) "}".
ODLAttribute           ::= (("optional")? "parameter")? ("(" (Metadata ("," Metadata))? ")")? Attribute.
ODLMethod              ::= ("on" | "validate" | "override" | ("override")? "ao" | ("override")? "po")? (MethodDeclaration)? StatementBlock.

Examples

Code Block
titleODLProject
operator ODLProject(outputMode ="MODIFIED_INPUT", minInputPorts = 1, maxInputPorts = 1){    
    
    parameter(type=ResolvedSDFAttributeParameter) SDFAttribute[] attributes;
        
    int[] restrictList;    
        
    on processOpen {
        if (restrictList == null) {
            restrictList = SDFSchema::calcRestrictList(getInputSchema(0), this.outputSchema);
        }
    }
    
    on createOutputSchema(int port) : SDFSchema{
        return SDFSchemaFactory::createNewWithAttributes(attributes, getInputSchema(0));        
    }
    
    on processNext(Tuple tuple, int port) {
        Tuple out = tuple.restrict(restrictList, false);                
        sendStreamElement(out);        
    }    
    
    on processPunctuation(IPunctuation punctuation, int port) {
        sendPunctuation(punctuation);        
    }             
}
Code Block
titleODLSelect
operator ODLSelect(outputMode = "INPUT", minInputPorts = 1, maxInputPorts = 1){
    
    parameter IPredicate predicate;
    
    on processOpen {
        predicate.init;
    }    
    
    on processPunctuation(IPunctuation punctuation, int port) {
        sendPunctuation(punctuation);        
    }    
    
    on processNext(Tuple tuple, int port) {
        if (predicate.evaluate(tuple)) {
            sendStreamElement(tuple);
        }
    }
}
Code Block
titleODLJoin
operator ODLJoin(outputMode = "NEW_ELEMENT", minInputPorts = 2, maxInputPorts = 2){    
    
    optional parameter IPredicate predicate = TruePredicate::getInstance();        
    
    optional parameter(type = EnumParameter) Cardinalities card;
        
    ITimeIntervalSweepArea[] areas = [];    
    
    IDataMergeFunction dataMerge;
    
    ITransferArea transferFunction;        
    
    IMetadataMergeFunction metadataMerge;
        
    on po_init {        
        String areaName = "TIJoinSA";
        areas[0] = SweepAreaRegistry::getSweepArea(areaName);        
        areas[0].queryPredicate = predicate;        
                            
        areas[1] = SweepAreaRegistry::getSweepArea(areaName);
        areas[1].queryPredicate = predicate;
    }    
            
    on processOpen {
        if (dataMerge == null) {
            dataMerge = new RelationalMergeFunction(this.outputSchema.size);        
            transferFunction = new TITransferArea();
            List leftMeta = getInputSchema(0).metaAttributeNames;
            List rightMeta = getInputSchema(1).metaAttributeNames;
            metadataMerge = MetadataRegistry::getMergeFunction(leftMeta,rightMeta);            
        }
        areas[0].clear;
        areas[1].clear;        
        dataMerge.init;                
        transferFunction.init(this, this.subscribedToSource.size);        
        metadataMerge.init;            
        predicate.init;        
    }
    
    on createOutputSchema(int arg0) : SDFSchema{
        SDFSchema left = getSubscribedToSource(0).schema;
        SDFSchema right = getSubscribedToSource(1).schema;        
        return SDFSchema::join(left, right);
    }
    
    on processNext(Tuple tuple, int port) {
        transferFunction.newElement(tuple, port);                
        int otherPort = getOtherPort(port);                    
        Order order = Order::fromOrdinal(port);                
        areas[otherPort].purgeElements(tuple, order);
        boolean extract = false;
        if (card != null) {
            switch (card.toString) {
                case "ONE_ONE":
                    extract = true;
                    break;
                case "MANY_ONE":
                    extract = port == 1;
                    break;
                case "ONE_MANY":
                    extract = port == 0;
                    break;
                default:
                    break;
            }
        }
                    
        Iterator qualifies = areas[otherPort].queryCopy(tuple, order, extract);
        
        boolean hit = qualifies.hasNext;        
        while(qualifies.hasNext) {
            Tuple next = qualifies.next;
            Tuple newElement = dataMerge.merge(tuple, next, metadataMerge, order);
            transferFunction.transfer(newElement);
        }        
                
        if (card == null || card == Cardinalities::MANY_MANY) {
                areas[port].insert(tuple);
        } else {
            switch (card.toString) {
            case "ONE_ONE":
                if (!hit) {
                    areas[port].insert(tuple);
                }
                break;
            case "ONE_MANY":
                if (port == 0 || (port == 1 && !hit)) {
                    areas[port].insert(tuple);
                }
                break;
            case "MANY_ONE":
                if (port == 1 || (port == 0 && !hit)) {
                    areas[port].insert(tuple);
                }
                break;
            default:
                areas[port].insert(tuple);
                break;
            }
        }
    }
    
    on processPunctuation(IPunctuation punctuation, int port) {
        if (punctuation.isHeartbeat) {
            areas[getOtherPort(port)].purgeElementsBefore(punctuation.time);        
        }
        transferFunction.sendPunctuation(punctuation);
        transferFunction.newElement(punctuation, port);
    }
    
    on processDone(int port) {
        transferFunction.done(port);
    }
    
    po getOtherPort(int port) : int {
        if (port == 0) {
            return 1;
        } else {
            return 0;
        }
    }
}