The Odysseus Operator Framework
...
...
...
...
Open, next, close, transfer, process_open, process_next, process_close
Creating new Operators
To create a new operator from scratch, the following steps have to be followed:
- Create a logical operator, which holds the necessary configuration parameters, e.g. a predicate for a filter or an attribute list for a projection. ILogicalOperator is the interface that needs to be implemented, AbstractLogicalOperator the base class that should be extended. Important: If the logical operator contains predicates, it must implement the interface IProvidesPredicates!
- Create (at least) one physical operator implementation that is initialized by the information provided by the logical operator and can process the input data.
- Optionally, create a rewrite rule. These rules are used to modify the logical query plan. In most cases this is the switch of operators, e.g. to push selections and projections close to the sources. If you create a new operator where the placement in the query plan does not matter. Provide rules to allow switching or else plans with the new operator may be optimizable.
- Create a transformation rule. A transformation rule translates a logical operator into one of its physical counterparts.
Example: Route operator
In the following we will use the route operator to demonstrate how easy the integration of new operators in Odysseus is. The route operator is a more general form of the selection operator. It needs a number of predicates. If the first predicate evaluates on the incoming data to true, the input will be send to first output (port 0), if the seconds evaluates to true to the seconds output and finally, if no predicate evaluates to true to the last output port.
Step 1: Creating the logical operator
First, you need to decide if you want to place the operator in a new OSGi bundle or in an existing one. Independently of that, the operator must be placed in a bundle ending with logicaloperator if it should be integrated automatically.
To create your own logical operator you need to extend AbstractLogicalOperator (which implements ILogicalOperator). For convenience reasons there are base implementations for operators with single input (UnaryLogicalOp) and binary input (BinaryLogicalOp). As a naming convention, the class name should end with AO (for algebra operator).
This class must provide (at least) two constructors and the clone method. The default constructor is required as instances of logical operators are created by newInstance(). Clone must call the copy constructor and the copy constructor must call the super copy constructor! If at runtime an error like "has no owner" is called, in most cases this is because of the missing call to the super copy constructor.
Finally, the class needs setters and getters for the parameter it should keep
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package de.uniol.inf.is.odysseus.core.server.logicaloperator;
import java.util.List;
import de.uniol.inf.is.odysseus.core.predicate.IPredicate;
import de.uniol.inf.is.odysseus.core.server.logicaloperator.annotations.LogicalOperator;
import de.uniol.inf.is.odysseus.core.server.logicaloperator.annotations.Parameter;
import de.uniol.inf.is.odysseus.core.server.logicaloperator.builder.PredicateParameter;
@LogicalOperator(name="ROUTE", minInputPorts=1, maxInputPorts=1)
public class RouteAO extends UnaryLogicalOp {
private static final long serialVersionUID = -8015847502104587689L;
public RouteAO(){
super();
}
public RouteAO(RouteAO routeAO){
super(routeAO);
}
@Override
@Parameter(type=PredicateParameter.class, isList=true)
public void setPredicates(List<IPredicate<?>> predicates) {
super.setPredicates(predicates);
}
@Override
public AbstractLogicalOperator clone() {
return new RouteAO(this);
}
} |
Every operator needs to provide an output schema, i.e. what is the schema of the elements that are produced. As a default implementation AbstractLogicalOperator delivers the input schema as the output schema. Route does not change the input schema, so this implementation is sufficient. If the input schema is not the same as the output schema, the class needs to implement getOutputSchemaIntern(int port), where port is the output port of the operator.
Step 1b: Annotating the logical operator
For the easy integration of new logical operators into to PQL query language, annotations should be used. Another way is to extend AbstractOperatorBuilder. In these annotations the name of the operator and number of minimal and maximal inputs and the parameters are described.
On the right side, the whole implementation of the RouteAO can be found. Because predicates need to be initialized before the processing, they should be saved by the AbstractLogicalOperator.
Further information about the annotations can be found in the PQL documentation.
Step 2: Create the physical operator
Until now, only descriptive information about the new operator is given. Next the concrete implementation of its functionality needs to be provided.
To create an own physical operator, AbstractPipe<R,W> needs to be extended. To allow generic implementations we utilize the Java generics approach. R is the type of the objects that are read by the operator and W is the type of the objects that are written. Although in most cases, this will be the same class (e.g. Tuple) some operators may read other objects than they write.
The following methods need to be overwritten:
- getOutputMode(): Needed for locking, Goal: Reduce object copies
- INPUT: read element will not be modified (e.g. selection)
- MODIFIED_INPUT: read element will be modified (e.g. projection)
- NEW_ELEMENT: operator creates a new element (e.g. join)
Code Block | ||||
---|---|---|---|---|
| ||||
@Override
public OutputMode getOutputMode() {
return OutputMode.INPUT;
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
@Override
protected void process_next(T object, int port) {
for (int i=0;i<predicates.size();i++){
if (predicates.get(i).evaluate(object)) {
transfer(object,i);
return;
}
}
transfer(object,predicates.size());
} |
If necessary: process_open, process_close. These methods are called when the query is initialized and terminated, respectively.
- processPunctuation(PointInTime timestamp, intport)
- This method is needed to process special punctuations, called heartbeats. Heartbeats allow determining the progress of time without the need to produce new elements. It is important, that heartbeats and elements are timely ordered, i.e. if a heartbeat is send, no other element is allowed to be send with an older timestamp anymore! If the operator has no state, it is enough to call sendPunctuation.
Code Block | ||||
---|---|---|---|---|
| ||||
@Override
public void processPunctuation(PointInTime timestamp, int port) {
sendPunctuation(timestamp);
} |
- process_isSemanticallyEqual
- This method is needed for query sharing. Return true, if the operator given and the current operator are equivalent and can be replaced by each other.
Code Block | ||||
---|---|---|---|---|
| ||||
@Override
public boolean process_isSemanticallyEqual(IPhysicalOperator ipo) {
if(!(ipo instanceof RoutePO)) {
return false;
}
RoutePO spo = (RoutePO) ipo;
if(this.hasSameSources(spo) &&
this.predicates.size() == spo.predicates.size()) {
for(int i = 0; i<this.predicates.size(); i++) {
if(!this.predicates.get(i).equals(spo.predicates.get(i))) {
return false;
}
}
return true;
}
return false;
} |
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package de.uniol.inf.is.odysseus.core.server.physicaloperator;
import java.util.ArrayList;
import java.util.List;
import de.uniol.inf.is.odysseus.core.metadata.PointInTime;
import de.uniol.inf.is.odysseus.core.physicaloperator.IPhysicalOperator;
import de.uniol.inf.is.odysseus.core.physicaloperator.OpenFailedException;
import de.uniol.inf.is.odysseus.core.predicate.IPredicate;
/**
* @author Marco Grawunder
*/
@SuppressWarnings({"rawtypes"})
public class RoutePO<T> extends AbstractPipe<T, T> {
private List<IPredicate<? super T>> predicates;
public RoutePO(List<IPredicate<? super T>> predicates) {
super();
initPredicates(predicates);
}
public RoutePO(RoutePO<T> splitPO) {
super();
initPredicates(splitPO.predicates);
}
private void initPredicates(List<IPredicate<? super T>> predicates) {
this.predicates = new ArrayList<IPredicate<? super T>>(predicates.size());
for (IPredicate<? super T> p: predicates){
this.predicates.add(p.clone());
}
}
@Override
public OutputMode getOutputMode() {
return OutputMode.INPUT;
}
@Override
public void process_open() throws OpenFailedException{
super.process_open();
for (IPredicate<? super T> p: predicates){
p.init();
}
}
@Override
protected void process_next(T object, int port) {
for (int i=0;i<predicates.size();i++){
if (predicates.get(i).evaluate(object)) {
transfer(object,i);
return;
}
}
transfer(object,predicates.size());
}
@Override
public RoutePO<T> clone() {
return new RoutePO<T>(this);
}
@Override
public void processPunctuation(PointInTime timestamp, int port) {
sendPunctuation(timestamp);
}
@Override
public boolean process_isSemanticallyEqual(IPhysicalOperator ipo) {
if(!(ipo instanceof RoutePO)) {
return false;
}
RoutePO spo = (RoutePO) ipo;
if(this.hasSameSources(spo) &&
this.predicates.size() == spo.predicates.size()) {
for(int i = 0; i<this.predicates.size(); i++) {
if(!this.predicates.get(i).equals(spo.predicates.get(i))) {
return false;
}
}
return true;
}
return false;
}
}
|
TODO: InputSyncArea, OutputTransferArea
Query Rewrite
http://odysseus.offis.uni-oldenburg.de/twiki/bin/view/Main/RegelnF%fcrRewriteErstellen
Transformation
To translate the logical operator into the physical counterpart the transformation engine is needed. The rule for the route operator can be found in the following example. Further information can be found in the description of the transformation component. (http://odysseus.offis.uni-oldenburg.de/twiki/bin/view/Main/RegelnF%fcrTransformationErstellen)
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package de.uniol.inf.is.odysseus.transform.rules;
import de.uniol.inf.is.odysseus.core.server.logicaloperator.RouteAO;
import de.uniol.inf.is.odysseus.core.server.physicaloperator.RoutePO;
import de.uniol.inf.is.odysseus.core.server.planmanagement.TransformationConfiguration;
import de.uniol.inf.is.odysseus.ruleengine.ruleflow.IRuleFlowGroup;
import de.uniol.inf.is.odysseus.transform.flow.TransformRuleFlowGroup;
import de.uniol.inf.is.odysseus.transform.rule.AbstractTransformationRule;
@SuppressWarnings({"unchecked","rawtypes"})
public class TRouteAORule extends AbstractTransformationRule<RouteAO> {
@Override
public int getPriority() {
return 0;
}
@Override
public void execute(RouteAO routeAO, TransformationConfiguration config) {
defaultExecute(routeAO, new RoutePO(routeAO.getPredicates()), config, true, true);
}
@Override
public boolean isExecutable(RouteAO operator, TransformationConfiguration transformConfig) {
return operator.isAllPhysicalInputSet();
}
@Override
public String getName() {
return "RouteAO -> RoutePO";
}
@Override
public IRuleFlowGroup getRuleFlowGroup() {
return TransformRuleFlowGroup.TRANSFORMATION;
}
@Override
public Class<? super RouteAO> getConditionClass() {
return RouteAO.class;
}
}
|
...