This document describes the basic concepts of the Odysseus operator framework and shows how to extend the framework with new operators. In Odysseus, an operator consumes event streams, performs some computations and produces result event streams. Operators can be connected via a publish/subscribe mechanism to form an operator plan (or query plan).
A logical operator can be compared to an algebra operator. It does not contain the concrete algorithms for processing the data but describes what to do with the data: for example, a projection states the attributes to deliver, and a join gives the join predicates. A physical operator is one possible implementation of such an operation, so different physical operators can be provided for the same logical operator. Logical operators are translated into physical operators by transformation rules.
Queries are typically described with declarative query languages like SQL. Odysseus provides a stream-optimized version of SQL named CQL. Because data stream queries are often hard to express with Select-From-Where clauses, Odysseus also provides a more operator-based query language named PQL, in which the logical operators are the basic building blocks. Further information on PQL can be found in the corresponding documentation, The Odysseus Procedural Query Language (PQL) Framework.
Physical operators are divided into operators that add data to the system (sources), remove data from the system (sinks) and process data (pipes). From the query plan point of view, sources are operators with output ports but no inputs, sinks are operators with only input ports, and pipes are both sources and sinks. Odysseus provides basic implementations for sources (AbstractSource), sinks (AbstractSink) and pipes (AbstractPipe) that must be used to create your own operators. These abstract implementations contain most of the common processing code, so you can concentrate on the processing that is specific to the concrete operator. Physical operators are connected by subscriptions. In the following, we demonstrate the creation of a simple route operator that routes data tuples to different output ports depending on predicates.
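To make the roles of sources, sinks and pipes and their connection via subscriptions more concrete, the following sketch models them in plain Java. All names in it (ToySource, ToySink, ToyPipe, ListSource, FilterPipe, CollectingSink, subscribe, transfer, process) are hypothetical simplifications chosen for illustration and are not the Odysseus API; the real AbstractSource, AbstractSink and AbstractPipe additionally handle opening, closing, punctuations and metadata.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of sources, sinks, pipes and subscriptions.
// These classes are NOT the Odysseus API.
interface ToySink<R> {
    // Called by an upstream operator for every element sent to this sink
    void process(R element, int inputPort);
}

abstract class ToySource<W> {
    // A subscription connects one output port of this source to a sink
    private static final class Subscription<E> {
        final ToySink<? super E> sink;
        final int outputPort;
        Subscription(ToySink<? super E> sink, int outputPort) {
            this.sink = sink;
            this.outputPort = outputPort;
        }
    }

    private final List<Subscription<W>> subscriptions = new ArrayList<>();

    void subscribe(ToySink<? super W> sink, int outputPort) {
        subscriptions.add(new Subscription<>(sink, outputPort));
    }

    // Pushes an element to every sink subscribed to the given output port
    protected void transfer(W element, int outputPort) {
        for (Subscription<W> s : subscriptions) {
            if (s.outputPort == outputPort) {
                s.sink.process(element, 0);
            }
        }
    }
}

// A pipe is a sink for its inputs and a source for its outputs
abstract class ToyPipe<R, W> extends ToySource<W> implements ToySink<R> {
}

class ListSource<W> extends ToySource<W> {
    private final List<W> data;
    ListSource(List<W> data) { this.data = data; }

    // Emits all elements on output port 0
    void run() {
        for (W w : data) {
            transfer(w, 0);
        }
    }
}

class FilterPipe<T> extends ToyPipe<T, T> {
    private final Predicate<? super T> predicate;
    FilterPipe(Predicate<? super T> predicate) { this.predicate = predicate; }

    // Forwards only the elements that satisfy the predicate
    @Override
    public void process(T element, int inputPort) {
        if (predicate.test(element)) {
            transfer(element, 0);
        }
    }
}

class CollectingSink<R> implements ToySink<R> {
    final List<R> received = new ArrayList<>();

    // Removes elements from the plan by collecting them
    @Override
    public void process(R element, int inputPort) {
        received.add(element);
    }
}
```

A plan is built by subscribing each operator to its upstream operator, e.g. `src.subscribe(filter, 0)` and `filter.subscribe(sink, 0)`; calling `src.run()` then pushes the elements through the plan.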
TODO: Interaction between sources and sinks
Open, next, close, transfer, process_open, process_next, process_close
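Creating a new operator from scratch involves the following steps: providing a logical operator, providing a physical operator, and providing a transformation rule that translates the logical operator into the physical operator and registering that rule with Odysseus.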
In the following, we use the route operator to demonstrate how easily new operators can be integrated into Odysseus. The route operator is a generalization of the selection operator and takes a list of predicates. If the first predicate evaluates to true for an incoming element, the element is sent to the first output (port 0); if the second predicate evaluates to true, it is sent to the second output (port 1), and so on. If no predicate evaluates to true, the element is sent to the last output port.
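The routing rule itself is independent of Odysseus and fits in a few lines. This sketch uses `java.util.function.Predicate` instead of Odysseus' IPredicate, and the helper class RouteSemantics is hypothetical; it only computes the output port an element would be sent to.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper illustrating the ROUTE semantics with plain
// java.util.function.Predicate (not Odysseus' IPredicate).
final class RouteSemantics {
    private RouteSemantics() {}

    // Returns the index of the first predicate that evaluates to true,
    // or predicates.size() (the last output port) if none does.
    static <T> int targetPort(T element, List<? extends Predicate<? super T>> predicates) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(element)) {
                return i;
            }
        }
        return predicates.size();
    }
}
```

Because the loop stops at the first match, an element that satisfies several predicates is sent only to the port of the first one; the RoutePO implementation below behaves the same way.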
First, decide whether you want to place the operator in a new OSGi bundle or in an existing one. In either case, the operator must be placed in a package whose name ends with logicaloperator if it should be integrated automatically.
To create your own logical operator, you need to extend AbstractLogicalOperator (which implements ILogicalOperator). For convenience, there are base implementations for operators with a single input (UnaryLogicalOp) and with two inputs (BinaryLogicalOp). By convention, the class name should end with AO (for algebra operator).
This class must provide (at least) two constructors (a default constructor and a copy constructor) and the clone method. The default constructor is required because instances of logical operators are created by newInstance(). clone must call the copy constructor, and the copy constructor must call the super copy constructor! If an error like "has no owner" occurs at runtime, in most cases the call to the super copy constructor is missing.
Finally, the class needs setters and getters for the parameters it should keep.
```java
package de.uniol.inf.is.odysseus.core.server.logicaloperator;

import java.util.List;

import de.uniol.inf.is.odysseus.core.predicate.IPredicate;
import de.uniol.inf.is.odysseus.core.server.logicaloperator.annotations.LogicalOperator;
import de.uniol.inf.is.odysseus.core.server.logicaloperator.annotations.Parameter;
import de.uniol.inf.is.odysseus.core.server.logicaloperator.builder.PredicateParameter;

@LogicalOperator(name="ROUTE", minInputPorts=1, maxInputPorts=1)
public class RouteAO extends UnaryLogicalOp {

    private static final long serialVersionUID = -8015847502104587689L;

    public RouteAO() {
        super();
    }

    public RouteAO(RouteAO routeAO) {
        super(routeAO);
    }

    @Override
    @Parameter(type=PredicateParameter.class, isList=true)
    public void setPredicates(List<IPredicate<?>> predicates) {
        super.setPredicates(predicates);
    }

    @Override
    public AbstractLogicalOperator clone() {
        return new RouteAO(this);
    }
}
```
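Why the copy constructor chain matters can be illustrated without Odysseus. In this sketch, BaseOp is a hypothetical stand-in for AbstractLogicalOperator, and its owner field stands for state kept by the base class (the actual fields of AbstractLogicalOperator are not shown here). A copy constructor that calls `super()` instead of `super(other)` silently drops that state, which is the kind of problem behind errors like "has no owner".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a logical operator base class; NOT Odysseus code.
abstract class BaseOp {
    // State managed by the base class (e.g. the owner of the operator)
    private String owner;
    private final List<String> predicates = new ArrayList<>();

    protected BaseOp() { }

    // Copy constructor: copies the state managed by the base class
    protected BaseOp(BaseOp other) {
        this.owner = other.owner;
        this.predicates.addAll(other.predicates);
    }

    String getOwner() { return owner; }
    void setOwner(String owner) { this.owner = owner; }
    List<String> getPredicates() { return predicates; }

    // clone must be implemented via the subclass copy constructor
    @Override
    public abstract BaseOp clone();
}

class GoodRouteOp extends BaseOp {
    GoodRouteOp() { super(); }
    // Correct: delegates to the super copy constructor
    GoodRouteOp(GoodRouteOp other) { super(other); }
    @Override
    public GoodRouteOp clone() { return new GoodRouteOp(this); }
}

class BrokenRouteOp extends BaseOp {
    BrokenRouteOp() { super(); }
    // BUG (on purpose): calls super() so the base-class state is not copied
    BrokenRouteOp(BrokenRouteOp other) { super(); }
    @Override
    public BrokenRouteOp clone() { return new BrokenRouteOp(this); }
}
```

A clone of GoodRouteOp keeps owner and predicates, while a clone of BrokenRouteOp ends up without an owner.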
Every operator needs to provide an output schema, i.e. the schema of the elements it produces. By default, AbstractLogicalOperator returns the input schema as the output schema. Route does not change the input schema, so this default is sufficient. If the output schema differs from the input schema, the class needs to implement getOutputSchemaIntern(int port), where port is the output port of the operator.
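The default output schema and its override can be sketched with a simplified model in which a schema is just a list of attribute names. SchemaOp, RouteLikeOp and ProjectLikeOp are hypothetical classes; only the hook name getOutputSchemaIntern(int port) is taken from the text, and the real schema type in Odysseus is not a list of strings.

```java
import java.util.List;

// Hypothetical sketch: schemas are modelled as lists of attribute names.
abstract class SchemaOp {
    private final List<String> inputSchema;

    SchemaOp(List<String> inputSchema) { this.inputSchema = inputSchema; }

    List<String> getInputSchema() { return inputSchema; }

    // Entry point used by callers; delegates to the overridable hook
    final List<String> getOutputSchema(int port) {
        return getOutputSchemaIntern(port);
    }

    // Default: the output schema equals the input schema on every port
    protected List<String> getOutputSchemaIntern(int port) {
        return inputSchema;
    }
}

// Like ROUTE: elements are forwarded unchanged, so no override is needed
class RouteLikeOp extends SchemaOp {
    RouteLikeOp(List<String> inputSchema) { super(inputSchema); }
}

// Like a projection: the output schema differs, so the hook is overridden
class ProjectLikeOp extends SchemaOp {
    private final List<String> attributes;

    ProjectLikeOp(List<String> inputSchema, List<String> attributes) {
        super(inputSchema);
        this.attributes = attributes;
    }

    @Override
    protected List<String> getOutputSchemaIntern(int port) {
        return attributes;
    }
}
```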
Annotations should be used to integrate new logical operators easily into the PQL query language; alternatively, AbstractOperatorBuilder can be extended. The annotations describe the name of the operator, the minimal and maximal number of inputs, and its parameters.
The complete implementation of RouteAO is shown above. Because predicates need to be initialized before processing, they should be stored by AbstractLogicalOperator; this is why setPredicates delegates to super.setPredicates.
Further information about the annotations can be found in the PQL documentation.
So far, only descriptive information about the new operator has been given. Next, the concrete implementation of its functionality needs to be provided.
To create your own physical operator, AbstractPipe<R,W> needs to be extended. To allow generic implementations, we use Java generics: R is the type of the objects read by the operator and W is the type of the objects it writes. Although in most cases this will be the same class (e.g. Tuple), some operators read objects of a different type than they write.
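The role of the two type parameters can be shown with a hypothetical, much reduced pipe base class. MiniPipe is not AbstractPipe: here transfer simply collects the written objects instead of sending them to subscribers. LengthPipe reads String objects (R) and writes Integer objects (W), i.e. it is an operator whose read and write types differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini pipe with separate read (R) and write (W) types; NOT AbstractPipe.
abstract class MiniPipe<R, W> {
    private final List<W> written = new ArrayList<>();

    // Called for each incoming object of type R
    protected abstract void process_next(R object, int port);

    // Emits an object of type W (collected here for inspection)
    protected void transfer(W object) {
        written.add(object);
    }

    List<W> getWritten() {
        return written;
    }
}

// Reads String objects and writes Integer objects (the string length)
class LengthPipe extends MiniPipe<String, Integer> {
    @Override
    protected void process_next(String object, int port) {
        transfer(object.length());
    }
}
```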
The following methods need to be overridden:
```java
// Route forwards the incoming objects without modifying them
@Override
public OutputMode getOutputMode() {
    return OutputMode.INPUT;
}
```
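```java
@Override
protected void process_next(T object, int port) {
    // Send the object to the port of the first predicate that evaluates to true
    for (int i = 0; i < predicates.size(); i++) {
        if (predicates.get(i).evaluate(object)) {
            transfer(object, i);
            return;
        }
    }
    // No predicate matched: send the object to the last output port
    transfer(object, predicates.size());
}
```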
If necessary, also override process_open and process_close. These methods are called when the query is initialized and terminated, respectively.
```java
@Override
public void processPunctuation(PointInTime timestamp, int port) {
    // Forward the punctuation downstream
    sendPunctuation(timestamp);
}
```
```java
@Override
public boolean process_isSemanticallyEqual(IPhysicalOperator ipo) {
    if (!(ipo instanceof RoutePO)) {
        return false;
    }
    RoutePO spo = (RoutePO) ipo;
    // Equal if both operators have the same sources and pairwise equal predicates
    if (this.hasSameSources(spo) && this.predicates.size() == spo.predicates.size()) {
        for (int i = 0; i < this.predicates.size(); i++) {
            if (!this.predicates.get(i).equals(spo.predicates.get(i))) {
                return false;
            }
        }
        return true;
    }
    return false;
}
```
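The process_* methods follow a template-method style: a framework method does the common work and then calls the operator-specific hook. The following sketch models this pattern with a hypothetical LifecycleOp class. The guard logic (ignoring a second open, rejecting objects before open) is an assumption made for the example and does not describe the exact behaviour of AbstractPipe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical template-method sketch of an operator lifecycle; NOT AbstractPipe.
abstract class LifecycleOp<T> {
    private boolean open = false;

    // Framework side: common work, then the operator-specific hook
    final void open() {
        if (open) {
            return; // assumption: repeated open calls are ignored
        }
        process_open();
        open = true;
    }

    final void next(T object, int port) {
        if (!open) {
            throw new IllegalStateException("operator not open");
        }
        process_next(object, port);
    }

    final void close() {
        if (!open) {
            return;
        }
        process_close();
        open = false;
    }

    // Hooks for concrete operators
    protected void process_open() { }
    protected abstract void process_next(T object, int port);
    protected void process_close() { }
}

// Records the order in which the hooks are called
class RecordingOp extends LifecycleOp<String> {
    final List<String> events = new ArrayList<>();

    @Override protected void process_open() { events.add("open"); }
    @Override protected void process_next(String s, int port) { events.add("next:" + s); }
    @Override protected void process_close() { events.add("close"); }
}
```

RoutePO uses the same idea: its process_open initializes the predicates once, before the first object reaches process_next.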
As a naming convention the class name should end with PO (physical operator).
```java
package de.uniol.inf.is.odysseus.core.server.physicaloperator;

import java.util.ArrayList;
import java.util.List;

import de.uniol.inf.is.odysseus.core.metadata.PointInTime;
import de.uniol.inf.is.odysseus.core.physicaloperator.IPhysicalOperator;
import de.uniol.inf.is.odysseus.core.physicaloperator.OpenFailedException;
import de.uniol.inf.is.odysseus.core.predicate.IPredicate;

/**
 * @author Marco Grawunder
 */
@SuppressWarnings({"rawtypes"})
public class RoutePO<T> extends AbstractPipe<T, T> {

    // One predicate per output port; the last port receives non-matching objects
    private List<IPredicate<? super T>> predicates;

    public RoutePO(List<IPredicate<? super T>> predicates) {
        super();
        initPredicates(predicates);
    }

    // Copy constructor, used by clone()
    public RoutePO(RoutePO<T> routePO) {
        super();
        initPredicates(routePO.predicates);
    }

    // Store clones so that each operator instance has its own predicates
    private void initPredicates(List<IPredicate<? super T>> predicates) {
        this.predicates = new ArrayList<IPredicate<? super T>>(predicates.size());
        for (IPredicate<? super T> p : predicates) {
            this.predicates.add(p.clone());
        }
    }

    // Route forwards the incoming objects without modifying them
    @Override
    public OutputMode getOutputMode() {
        return OutputMode.INPUT;
    }

    // Initialize the predicates before processing starts
    @Override
    public void process_open() throws OpenFailedException {
        super.process_open();
        for (IPredicate<? super T> p : predicates) {
            p.init();
        }
    }

    @Override
    protected void process_next(T object, int port) {
        // Send the object to the port of the first predicate that evaluates to true
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).evaluate(object)) {
                transfer(object, i);
                return;
            }
        }
        // No predicate matched: send the object to the last output port
        transfer(object, predicates.size());
    }

    @Override
    public RoutePO<T> clone() {
        return new RoutePO<T>(this);
    }

    // Forward the punctuation downstream
    @Override
    public void processPunctuation(PointInTime timestamp, int port) {
        sendPunctuation(timestamp);
    }

    // Equal if both operators have the same sources and pairwise equal predicates
    @Override
    public boolean process_isSemanticallyEqual(IPhysicalOperator ipo) {
        if (!(ipo instanceof RoutePO)) {
            return false;
        }
        RoutePO spo = (RoutePO) ipo;
        if (this.hasSameSources(spo) && this.predicates.size() == spo.predicates.size()) {
            for (int i = 0; i < this.predicates.size(); i++) {
                if (!this.predicates.get(i).equals(spo.predicates.get(i))) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }
}
```
TODO: InputSyncArea, OutputTransferArea
Rewriting is used to switch, add, remove or replace logical operators before they are transformed into their physical counterparts. Usually, the aim of the rewriting process is to optimize the query, for example by pushing selection operators below costly joins. Rewriting rules are implemented like transformation rules (see next section).
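As an illustration of such a rewriting rule, the following sketch pushes a selection below a join in a toy logical plan. PlanNode, ScanNode, SelectNode, JoinNode and PushDownSelection are hypothetical classes, not Odysseus' logical operators or its rewriting API. The join predicate is not modelled, and the rewrite assumes an inner join, for which a selection that only uses attributes of one input may be evaluated before the join.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy logical plan; NOT Odysseus' ILogicalOperator API.
abstract class PlanNode {
    abstract Set<String> schema();
}

final class ScanNode extends PlanNode {
    private final String name;
    private final Set<String> attributes;
    ScanNode(String name, Set<String> attributes) { this.name = name; this.attributes = attributes; }
    @Override Set<String> schema() { return attributes; }
    @Override public String toString() { return name; }
}

final class SelectNode extends PlanNode {
    final Set<String> usedAttributes; // attributes referenced by the predicate
    final PlanNode input;
    SelectNode(Set<String> usedAttributes, PlanNode input) { this.usedAttributes = usedAttributes; this.input = input; }
    @Override Set<String> schema() { return input.schema(); }
    @Override public String toString() { return "SELECT(" + input + ")"; }
}

final class JoinNode extends PlanNode {
    final PlanNode left;
    final PlanNode right;
    JoinNode(PlanNode left, PlanNode right) { this.left = left; this.right = right; }
    @Override Set<String> schema() {
        Set<String> s = new HashSet<>(left.schema());
        s.addAll(right.schema());
        return s;
    }
    @Override public String toString() { return "JOIN(" + left + ", " + right + ")"; }
}

final class PushDownSelection {
    private PushDownSelection() {}

    // Rewrites SELECT(JOIN(l, r)) into JOIN(SELECT(l), r) or JOIN(l, SELECT(r))
    // if the predicate only references attributes of one join input.
    static PlanNode apply(PlanNode node) {
        if (!(node instanceof SelectNode)) return node;
        SelectNode sel = (SelectNode) node;
        if (!(sel.input instanceof JoinNode)) return node;
        JoinNode join = (JoinNode) sel.input;
        if (join.left.schema().containsAll(sel.usedAttributes)) {
            return new JoinNode(new SelectNode(sel.usedAttributes, join.left), join.right);
        }
        if (join.right.schema().containsAll(sel.usedAttributes)) {
            return new JoinNode(join.left, new SelectNode(sel.usedAttributes, join.right));
        }
        return node; // predicate needs both inputs: rule not applicable
    }
}
```

If the predicate refers to attributes of both join inputs, the rule returns the plan unchanged.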
To translate the logical operator into its physical counterpart, the transformation engine needs a transformation rule. The rule for the route operator is shown in the following example. Further information can be found in the description of the transformation component.
```java
package de.uniol.inf.is.odysseus.transform.rules;

import de.uniol.inf.is.odysseus.core.server.logicaloperator.RouteAO;
import de.uniol.inf.is.odysseus.core.server.physicaloperator.RoutePO;
import de.uniol.inf.is.odysseus.core.server.planmanagement.TransformationConfiguration;
import de.uniol.inf.is.odysseus.ruleengine.ruleflow.IRuleFlowGroup;
import de.uniol.inf.is.odysseus.transform.flow.TransformRuleFlowGroup;
import de.uniol.inf.is.odysseus.transform.rule.AbstractTransformationRule;

@SuppressWarnings({"unchecked","rawtypes"})
public class TRouteAORule extends AbstractTransformationRule<RouteAO> {

    @Override
    public int getPriority() {
        return 0;
    }

    @Override
    public void execute(RouteAO routeAO, TransformationConfiguration config) {
        defaultExecute(routeAO, new RoutePO(routeAO.getPredicates()), config, true, true);
    }

    @Override
    public boolean isExecutable(RouteAO operator, TransformationConfiguration transformConfig) {
        return operator.isAllPhysicalInputSet();
    }

    @Override
    public String getName() {
        return "RouteAO -> RoutePO";
    }

    @Override
    public IRuleFlowGroup getRuleFlowGroup() {
        return TransformRuleFlowGroup.TRANSFORMATION;
    }

    @Override
    public Class<? super RouteAO> getConditionClass() {
        return RouteAO.class;
    }
}
```
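The interplay of getConditionClass, isExecutable and execute can be modelled with a small toy engine. ToyRule, ToyTransformationEngine, ToyRouteAO, ToyRoutePO and ToyRouteRule are hypothetical. The sketch assumes that the first registered rule that matches and is executable is applied, and it ignores priorities, rule flow groups and defaultExecute, which the real transformation engine handles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical toy model of transformation rules; NOT the Odysseus rule engine API.
interface ToyRule<L> {
    Class<L> getConditionClass();     // logical operator type the rule applies to
    boolean isExecutable(L operator); // e.g. are all physical inputs available?
    Object execute(L operator);       // creates the physical operator
    String getName();
}

final class ToyTransformationEngine {
    private final List<ToyRule<?>> rules = new ArrayList<>();

    void register(ToyRule<?> rule) { rules.add(rule); }

    // Applies the first registered rule that matches and is executable
    Optional<Object> transform(Object logicalOperator) {
        for (ToyRule<?> rule : rules) {
            Optional<Object> result = tryRule(rule, logicalOperator);
            if (result.isPresent()) {
                return result;
            }
        }
        return Optional.empty();
    }

    private static <L> Optional<Object> tryRule(ToyRule<L> rule, Object operator) {
        if (!rule.getConditionClass().isInstance(operator)) {
            return Optional.empty();
        }
        L typed = rule.getConditionClass().cast(operator);
        return rule.isExecutable(typed) ? Optional.of(rule.execute(typed)) : Optional.empty();
    }
}

final class ToyRouteAO {
    final List<String> predicates;
    final boolean allPhysicalInputsSet;
    ToyRouteAO(List<String> predicates, boolean allPhysicalInputsSet) {
        this.predicates = predicates;
        this.allPhysicalInputsSet = allPhysicalInputsSet;
    }
}

final class ToyRoutePO {
    final List<String> predicates;
    ToyRoutePO(List<String> predicates) { this.predicates = new ArrayList<>(predicates); }
}

final class ToyRouteRule implements ToyRule<ToyRouteAO> {
    @Override public Class<ToyRouteAO> getConditionClass() { return ToyRouteAO.class; }
    @Override public boolean isExecutable(ToyRouteAO op) { return op.allPhysicalInputsSet; }
    @Override public Object execute(ToyRouteAO op) { return new ToyRoutePO(op.predicates); }
    @Override public String getName() { return "RouteAO -> RoutePO"; }
}
```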
Next, Odysseus needs to know about this rule. This is done by a rule provider. Create a new class, which we call RuleProvider:
```java
package de.uniol.inf.is.odysseus.transform.rules;

import java.util.ArrayList;
import java.util.List;

import de.uniol.inf.is.odysseus.ruleengine.rule.IRule;
import de.uniol.inf.is.odysseus.transform.flow.ITransformRuleProvider;

public class RuleProvider implements ITransformRuleProvider {

    @Override
    public List<IRule<?, ?>> getRules() {
        List<IRule<?, ?>> rules = new ArrayList<IRule<?,?>>();
        rules.add(new TRouteAORule());
        return rules;
    }
}
```
This rule provider is an OSGi service that offers Odysseus a list of rules. Next, you have to declare this service in a component definition so that it can be found.
Attention:
If you copy the component definition, you have to add it to the MANIFEST.MF by hand! So, if the service is not started, check whether the manifest contains the following line (assuming that your component definition file is named RuleService.xml):
Service-Component: OSGI-INF/RuleService.xml
A simpler way to create user-defined operators is the UDO operator of PQL. For further information, see The Odysseus Procedural Query Language (PQL) Framework.