...

Remark: This example adds an operator to an existing bundle. See Add a new Bundle if a new bundle should be created.

Important: See below if the output schema of the operator differs from the input schema.

Example: Route operator

In the following, we use the route operator to demonstrate how new operators are integrated into Odysseus. The route operator is a more general form of the selection operator. It takes a list of predicates. If the first predicate evaluates to true for an incoming element, the element is sent to the first output (port 0); if the second predicate evaluates to true, it is sent to the second output, and so on. If no predicate evaluates to true, the element is sent to the last output port.
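
The routing semantics described above can be sketched in a few lines. This is only an illustration under simplifying assumptions: RouteSketch and selectPort are hypothetical names, and java.util.function.Predicate stands in for the Odysseus predicate type.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in, not part of Odysseus: picks the output port for one element.
final class RouteSketch {

    // Returns the index of the first predicate that accepts the element,
    // or predicates.size() (the last port) if no predicate accepts it.
    static <T> int selectPort(List<Predicate<T>> predicates, T element) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(element)) {
                return i;
            }
        }
        return predicates.size();
    }
}
```

With the two predicates x < 10 and x < 100, the value 5 goes to port 0, 50 to port 1 and 500 to port 2. An operator with n predicates therefore has n + 1 output ports.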

...

First, you need to decide whether to place the operator in a new OSGi bundle or in an existing one. Either way, the operator must be placed in a package whose name ends with logicaloperator if it should be integrated automatically.
To create your own logical operator, you need to extend AbstractLogicalOperator (which implements ILogicalOperator). For convenience, there are base implementations for operators with a single input (UnaryLogicalOp) and with two inputs (BinaryLogicalOp). As a naming convention, the class name should end with AO (for algebra operator).
This class must provide (at least) two constructors and the clone method. The default constructor is required because instances of logical operators are created by newInstance(). Clone must call the copy constructor, and the copy constructor must call the super copy constructor! If an error like "has no owner" occurs at runtime, in most cases the call to the super copy constructor is missing.
Finally, the class needs setters and getters for the parameters it should keep.

Code Block
languagejava
themeEclipse
titleRouteAO
linenumberstrue
package de.uniol.inf.is.odysseus.core.server.logicaloperator;

import java.util.List;

import de.uniol.inf.is.odysseus.core.predicate.IPredicate;
import de.uniol.inf.is.odysseus.core.server.logicaloperator.annotations.LogicalOperator;
import de.uniol.inf.is.odysseus.core.server.logicaloperator.annotations.Parameter;
import de.uniol.inf.is.odysseus.core.server.logicaloperator.builder.PredicateParameter;

@LogicalOperator(name="ROUTE", minInputPorts=1, maxInputPorts=1, doc = "My operator is doing ...", url = "http://example.com/MyOperator.html", category = { "Category" })
public class RouteAO extends UnaryLogicalOp {

    private static final long serialVersionUID = -8015847502104587689L;
    
    public RouteAO(){
        super();
    }
    
    public RouteAO(RouteAO routeAO){
        super(routeAO);
    }

    @Override
    @Parameter(type=PredicateParameter.class, isList=true, optional = false, doc = "This parameter sets the selection predicate")
    public void setPredicates(List<IPredicate<?>> predicates) {
        super.setPredicates(predicates);
    }
    
    @Override
    public AbstractLogicalOperator clone() {
        return new RouteAO(this);
    }
    
}
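
The copy-constructor rule for logical operators can be illustrated with a minimal, self-contained sketch. BaseOp and ThresholdOp are hypothetical stand-ins and not Odysseus classes; the owner field only models "some state that the base class keeps" and makes no claim about how Odysseus stores owners internally.

```java
// Hypothetical stand-in for an abstract operator base class.
abstract class BaseOp {
    private String owner; // state kept by the base class

    BaseOp() {
    }

    // Super copy constructor: copies the state of the base class.
    BaseOp(BaseOp other) {
        this.owner = other.owner;
    }

    void setOwner(String owner) {
        this.owner = owner;
    }

    String getOwner() {
        return owner;
    }

    @Override
    public abstract BaseOp clone();
}

// Hypothetical concrete operator with one parameter of its own.
final class ThresholdOp extends BaseOp {
    private int threshold;

    ThresholdOp() {
        super();
    }

    ThresholdOp(ThresholdOp other) {
        super(other); // without this call, owner would be null in the copy
        this.threshold = other.threshold;
    }

    void setThreshold(int threshold) {
        this.threshold = threshold;
    }

    int getThreshold() {
        return threshold;
    }

    // clone() delegates to the copy constructor.
    @Override
    public ThresholdOp clone() {
        return new ThresholdOp(this);
    }
}
```

In this sketch, a clone keeps both the owner and the threshold. If super(other) were replaced by super(), the copy would silently lose the base-class state, which is the kind of mistake the "has no owner" hint refers to.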

...

For easy integration of new logical operators into the PQL query language, annotations should be used. Alternatively, you can extend AbstractOperatorBuilder. The annotations describe the name of the operator, the minimum and maximum number of inputs, and the parameters. In addition, they may include documentation and a URL with further information about the operator. This information is also available in the Odysseus Studio GUI.
The complete implementation of RouteAO is shown above. Because predicates need to be initialized before processing, they should be stored by the AbstractLogicalOperator.
Further information about the annotations can be found in the PQL documentation.
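
As a rough illustration of how annotation metadata of this kind can be read at runtime, the following sketch uses plain Java reflection. The annotation OperatorInfo and the classes AnnotatedRouteSketch and OperatorInfoReader are hypothetical; they are not the Odysseus annotations, and this is not a description of how Odysseus discovers operators.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical annotation, loosely modelled on the attributes named in the text.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface OperatorInfo {
    String name();
    int minInputPorts();
    int maxInputPorts();
    String doc() default "";
}

// Hypothetical operator class carrying the annotation.
@OperatorInfo(name = "ROUTE", minInputPorts = 1, maxInputPorts = 1, doc = "Routes elements by predicate")
final class AnnotatedRouteSketch {
}

final class OperatorInfoReader {

    // Builds a one-line description from the annotation, or returns null if it is absent.
    static String describe(Class<?> type) {
        OperatorInfo info = type.getAnnotation(OperatorInfo.class);
        if (info == null) {
            return null;
        }
        return info.name() + "[" + info.minInputPorts() + ".." + info.maxInputPorts() + "]: " + info.doc();
    }
}
```

The annotation must have runtime retention, otherwise getAnnotation would not see it.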

Changing the output schema

For many operators the input schema and the output schema are the same, but often you need to create your own output schema. To do so, override the method getOutputSchemaIntern(int pos).

If you just want to merge the inputs from the left and the right, you can use the following method (from EnrichAO):

Code Block
languagejava
themeEclipse
    @Override
    public synchronized SDFSchema getOutputSchemaIntern(int pos) {
        // The sum of all input schemas
        Iterator<LogicalSubscription> iter = getSubscribedToSource().iterator();
        SDFSchema left = iter.next().getSchema();
        SDFSchema right = iter.next().getSchema();
        SDFSchema outputSchema = SDFSchema.join(left, right);

        setOutputSchema(outputSchema);
        return outputSchema;
    }


If you want to add new attributes, it is important that you create a new schema based on the input schema (because the schema carries many hidden stream descriptions). The following example shows this:

Code Block
languagejava
themeEclipse
    @Override
    public SDFSchema getOutputSchemaIntern(int pos) {

        final String start = "meta_valid_start";
        final String end = "meta_valid_end";

        // Create new attributes
        SDFAttribute starttimeStamp = new SDFAttribute(null, start,
                SDFDatatype.TIMESTAMP, null, null, null);
        SDFAttribute endtimeStamp = new SDFAttribute(null, end,
                SDFDatatype.TIMESTAMP, null, null, null);

        List<SDFAttribute> outputAttributes = new ArrayList<SDFAttribute>();
        // Retrieve old attributes (they should all be part of the output schema)
        outputAttributes.addAll(getInputSchema(0).getAttributes());

        // Add new attributes
        outputAttributes.add(starttimeStamp);
        outputAttributes.add(endtimeStamp);

        // Create new schema with factory, keep input schema!
        SDFSchema schema = SDFSchemaFactory.createNewWithAttributes(outputAttributes, getInputSchema(0));
        return schema;
    }

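The two ways of building an output schema (merging two inputs, or extending one input with new attributes) can be sketched without any Odysseus types. This is a simplification under a strong assumption: a schema is reduced to an ordered list of attribute names, so the hidden stream descriptions that the real factory preserves are not modelled. SchemaSketch, join and extend are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in: a schema reduced to an ordered list of attribute names.
final class SchemaSketch {

    // Concatenates the left and right attributes, left first.
    static List<String> join(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>(left);
        out.addAll(right);
        return Collections.unmodifiableList(out);
    }

    // Keeps every input attribute and appends the new ones; the input list is not modified.
    static List<String> extend(List<String> input, String... added) {
        List<String> out = new ArrayList<>(input);
        Collections.addAll(out, added);
        return Collections.unmodifiableList(out);
    }
}
```

For an input schema [id, value], extend(input, "meta_valid_start", "meta_valid_end") yields [id, value, meta_valid_start, meta_valid_end]. Both methods build a new list instead of changing the input, mirroring the advice to derive the output schema from the input schema rather than modify it.
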
Step 2: Create the physical operator

Until now, only descriptive information about the new operator has been given. Next, the concrete implementation of its functionality needs to be provided.
To create your own physical operator, extend AbstractPipe<R,W>. Java generics are used to allow generic implementations: R is the type of the objects that are read by the operator and W is the type of the objects that are written. Although in most cases this will be the same class (e.g. Tuple), some operators may read other objects than they write.
The following methods need to be overridden:

  • getOutputMode(): needed for locking; the goal is to reduce object copies
    • INPUT: read element will not be modified (e.g. selection)
    • MODIFIED_INPUT: read element will be modified (e.g. projection)
    • NEW_ELEMENT: operator creates a new element (e.g. join)
Code Block
languagejava
themeEclipse
    @Override
    public OutputMode getOutputMode() {
        return OutputMode.INPUT;
    }

 

  • process_next(R input, int port)
    • Processes a new input element arriving on the given input port. A newly created element can be sent to the next operator with the transfer() method.

Code Block
languagejava
themeEclipse
    @Override
    protected void process_next(T object, int port) {
        for (int i=0;i<predicates.size();i++){
            if (predicates.get(i).evaluate(object)) {
                transfer(object,i);
                return;
            }
        }
        transfer(object,predicates.size());
    }

If necessary: process_open, process_close. These methods are called when the query is initialized and terminated, respectively.

  • processPunctuation(Punctuation punctuation, int port)
    • This method is needed to process punctuations (e.g. heartbeats, which allow determining the progress of time without the need to produce new elements). It is important that punctuations and elements are ordered by time, i.e. once a punctuation has been sent, no element with an older timestamp may be sent anymore! If the operator has no state, it is enough to call sendPunctuation.

 

Code Block
languagejava
themeEclipse
    @Override
    public void processPunctuation(PointInTime timestamp, int port) {
        sendPunctuation(timestamp);
    }
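
The ordering rule for punctuations can be made concrete with a small sketch. PunctuationOrderSketch is a hypothetical helper, not an Odysseus class; timestamps are plain long values, and the sketch assumes that an element with the same timestamp as the last punctuation is still allowed.

```java
// Hypothetical stand-in: tracks the newest punctuation and rejects older elements.
final class PunctuationOrderSketch {
    private long lastPunctuation = Long.MIN_VALUE;

    // Records a punctuation (e.g. a heartbeat) with the given timestamp.
    void onPunctuation(long timestamp) {
        lastPunctuation = Math.max(lastPunctuation, timestamp);
    }

    // An element may still be sent only if it is not older than the last punctuation.
    boolean maySend(long elementTimestamp) {
        return elementTimestamp >= lastPunctuation;
    }
}
```

After onPunctuation(10), maySend(9) is false while maySend(10) and maySend(11) are true. A stateful operator that buffers elements would have to flush everything older than the punctuation before forwarding it; a stateless operator has nothing buffered and can forward the punctuation directly.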

 

  • process_isSemanticallyEqual(IPhysicalOperator ipo)
    • This method is needed for query sharing. Return true if the given operator and the current operator are equivalent and can replace each other.

 

Code Block
languagejava
themeEclipse
    @Override
    public boolean process_isSemanticallyEqual(IPhysicalOperator ipo) {
        if(!(ipo instanceof RoutePO)) {
            return false;
        }
        RoutePO spo = (RoutePO) ipo;
        if(this.hasSameSources(spo) &&
                this.predicates.size() == spo.predicates.size()) {
            for(int i = 0; i<this.predicates.size(); i++) {
                if(!this.predicates.get(i).equals(spo.predicates.get(i))) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }
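
The idea behind the semantic equality check can be shown in isolation. In this sketch, RouteLike is a hypothetical class: the source is a plain string and predicates are strings, which is an assumption made only to keep the example self-contained.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for an operator with one source and an ordered predicate list.
final class RouteLike {
    private final String source;
    private final List<String> predicates;

    RouteLike(String source, List<String> predicates) {
        this.source = source;
        this.predicates = List.copyOf(predicates);
    }

    // Equivalent only if the type, the source and the predicates (in the same order) match.
    boolean isSemanticallyEqual(Object other) {
        if (!(other instanceof RouteLike)) {
            return false;
        }
        RouteLike that = (RouteLike) other;
        return Objects.equals(source, that.source) && predicates.equals(that.predicates);
    }
}
```

The comparison is order-sensitive on purpose: for a route-like operator the position of a predicate determines the output port, so two operators with the same predicates in a different order would not be interchangeable.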


As a naming convention the class name should end with PO (physical operator).

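Code Block
languagejava
themeEclipse
titleRoutePO
linenumberstrue
package de.uniol.inf.is.odysseus.core.server.physicaloperator;

import java.util.ArrayList;
import java.util.List;

import de.uniol.inf.is.odysseus.core.metadata.PointInTime;
import de.uniol.inf.is.odysseus.core.physicaloperator.IPhysicalOperator;
import de.uniol.inf.is.odysseus.core.physicaloperator.OpenFailedException;
import de.uniol.inf.is.odysseus.core.predicate.IPredicate;

/**
 * @author Marco Grawunder
 */
@SuppressWarnings({"rawtypes"})
public class RoutePO<T> extends AbstractPipe<T, T> {

    private List<IPredicate<? super T>> predicates;

    public RoutePO(List<IPredicate<? super T>> predicates)  {
        super();
        initPredicates(predicates);
    }

    public RoutePO(RoutePO<T> splitPO) {
        super();
        initPredicates(splitPO.predicates);
    }

    private void initPredicates(List<IPredicate<? super T>> predicates) {
        this.predicates = new ArrayList<IPredicate<? super T>>(predicates.size());
        for (IPredicate<? super T> p: predicates){
            this.predicates.add(p.clone());
        }
    }

    @Override
    public OutputMode getOutputMode() {
        return OutputMode.INPUT;
    }

    @Override
    public void process_open() throws OpenFailedException{
        super.process_open();
        for (IPredicate<? super T> p: predicates){
            p.init();
        }
    }

    @Override
    protected void process_next(T object, int port) {
        for (int i=0;i<predicates.size();i++){
            if (predicates.get(i).evaluate(object)) {
                transfer(object,i);
                return;
            }
        }
        transfer(object,predicates.size());
    }

    @Override
    public RoutePO<T> clone() {
        return new RoutePO<T>(this);
    }

    @Override
    public void processPunctuation(PointInTime timestamp, int port) {
        sendPunctuation(timestamp);
    }

    @Override
    public boolean process_isSemanticallyEqual(IPhysicalOperator ipo) {
        if(!(ipo instanceof RoutePO)) {
            return false;
        }
        RoutePO spo = (RoutePO) ipo;
        if(this.hasSameSources(spo) &&
                this.predicates.size() == spo.predicates.size()) {
            for(int i = 0; i<this.predicates.size(); i++) {
                if(!this.predicates.get(i).equals(spo.predicates.get(i))) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }

}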


TODO: InputSyncArea, OutputTransferArea

Query Rewrite

Rewriting is used to switch, add, remove or replace logical operators before they are transformed into their physical counterparts. Normally, the aim of the rewriting process is to optimize the query, for example by pushing selection operators down below costly joins. Rewriting is implemented by rewrite rules, which are written like transformation rules (see next section).

Transformation

To translate the logical operator into its physical counterpart, the transformation engine is needed. The rule for the route operator is shown in the following example. Further information can be found in the description of the transformation component.

Code Block
themeEclipse
languagejava
titleTRouteAO
linenumberstrue
package de.uniol.inf.is.odysseus.transform.rules;

import de.uniol.inf.is.odysseus.core.server.logicaloperator.RouteAO;
import de.uniol.inf.is.odysseus.core.server.physicaloperator.RoutePO;
import de.uniol.inf.is.odysseus.core.server.planmanagement.TransformationConfiguration;
import de.uniol.inf.is.odysseus.ruleengine.ruleflow.IRuleFlowGroup;
import de.uniol.inf.is.odysseus.transform.flow.TransformRuleFlowGroup;
import de.uniol.inf.is.odysseus.transform.rule.AbstractTransformationRule;

@SuppressWarnings({"unchecked","rawtypes"})
public class TRouteAORule extends AbstractTransformationRule<RouteAO> {

    @Override
    public int getPriority() {
        return 0;
    }

    @Override
    public void execute(RouteAO routeAO, TransformationConfiguration config) {
        defaultExecute(routeAO, new RoutePO(routeAO.getPredicates()), config, true, true);
    }

    @Override
    public boolean isExecutable(RouteAO operator, TransformationConfiguration transformConfig) {
        return operator.isAllPhysicalInputSet();
    }

    @Override
    public String getName() {
        return "RouteAO -> RoutePO";
    }

    @Override
    public IRuleFlowGroup getRuleFlowGroup() {
        return TransformRuleFlowGroup.TRANSFORMATION;
    }

    @Override
    public Class<? super RouteAO> getConditionClass() {
        return RouteAO.class;
    }

}

 

Deprecated: The following is no longer needed.

Next, Odysseus needs to know this rule. This is done by a RuleProvider. Create a new class, which we call RuleProvider:

Code Block
languagejava
package de.uniol.inf.is.odysseus.transform.rules;

import java.util.ArrayList;
import java.util.List;

import de.uniol.inf.is.odysseus.ruleengine.rule.IRule;
import de.uniol.inf.is.odysseus.transform.flow.ITransformRuleProvider;

public class RuleProvider implements ITransformRuleProvider {

    @Override
    public List<IRule<?, ?>> getRules() {
        List<IRule<?, ?>> rules = new ArrayList<IRule<?,?>>();
        rules.add(new TRouteAORule());
        return rules;
    }
}

This rule provider is a service and offers Odysseus a list of rules. Now you have to declare this service so that it can be found. Do the following:

  • Create a folder called OSGI-INF in the root directory of your bundle (if it does not exist yet).
  • Create a Component Definition (right click --> new).
  • In the tab Overview:
    • Name should be something unique (e.g. your package or bundle name + rule provider)
    • Choose your new RuleProvider as Class
  • Switch to tab Services:
    • Click on Add... of Provided Services and choose the interface de.uniol.inf.is.odysseus.transform.flow.ITransformRuleProvider

Attention:

If you copy the component definition, you have to add it to the MANIFEST.MF by hand! So, if the service is not started, check whether the manifest contains the following line (assuming that your component definition file is named RuleService.xml):

 Service-Component: OSGI-INF/RuleService.xml

Important (for physical operators): If you create a new StreamObject at runtime, you must set a clone of the input element's metadata on it!

Code Block
languagejava
out.setMetadata(input.getMetadata().clone());
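
Why the metadata of a newly created stream object should be a clone rather than a shared reference can be seen in a minimal sketch. TimeIntervalSketch, ElementSketch and MetadataCopySketch are hypothetical stand-ins and not Odysseus classes; copy() plays the role that clone() plays in the one-line example for setMetadata.

```java
// Hypothetical stand-in for mutable metadata such as a validity interval.
final class TimeIntervalSketch {
    long start;
    long end;

    TimeIntervalSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Returns an independent copy of this interval.
    TimeIntervalSketch copy() {
        return new TimeIntervalSketch(start, end);
    }
}

// Hypothetical stand-in for a stream element with payload and metadata.
final class ElementSketch {
    final String payload;
    TimeIntervalSketch metadata;

    ElementSketch(String payload) {
        this.payload = payload;
    }
}

final class MetadataCopySketch {

    // Creates a new element that gets its own copy of the input's metadata.
    static ElementSketch derive(ElementSketch input, String newPayload) {
        ElementSketch out = new ElementSketch(newPayload);
        out.metadata = input.metadata.copy(); // own copy, not a shared reference
        return out;
    }
}
```

If the derived element shared the metadata object with its input, a later change to one element's interval would silently change the other as well. With the copy, both elements can be modified independently.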