...
- getOutputMode(): Needed for locking, Goal: Reduce object copies
- INPUT: read element will not be modified (e.g. selection)
- MODIFIED_INPUT: read element will be modified (e.g. projection)
- NEW_ELEMENT: operator creates a new element (e.g. join)
Code Block | ||||
---|---|---|---|---|
| ||||
@Override
public OutputMode getOutputMode() {
return OutputMode.INPUT;
} |
- process_next(R input, int port)
- Process new input element on input port, a new created element can be send to the next operator with the transfer-method
- Process new input element on input port, a new created element can be send to the next operator with the transfer-method
...
Code Block | ||||
---|---|---|---|---|
| ||||
@Override
protected void process_next(T object, int port) {
for (int i=0;i<predicates.size();i++){
if (predicates.get(i).evaluate(object)) {
transfer(object,i);
return;
}
}
transfer(object,predicates.size());
} |
If necessary: process_open, process_close. These methods are called when the query is initialized and terminated, respectively.
- processPunctuation(PointInTime timestamp, intport)
- This method is needed to process special punctuations, called heartbeats. Heartbeats allow determining the progress of time without the need to produce new elements. It is important, that heartbeats and elements are timely ordered, i.e. if a heartbeat is send, no other element is allowed to be send with an older timestamp anymore! If the operator has no state, it is enough to call sendPunctuation.
Code Block | ||||
---|---|---|---|---|
| ||||
@Override
public void processPunctuation(PointInTime timestamp, int port) {
sendPunctuation(timestamp);
} |
- process_isSemanticallyEqual
- This method is needed for query sharing. Return true, if the operator given and the current operator are equivalent and can be replaced by each other.
...
Code Block | ||||
---|---|---|---|---|
| ||||
@Override
public boolean process_isSemanticallyEqual(IPhysicalOperator ipo) {
if(!(ipo instanceof RoutePO)) {
return false;
}
RoutePO spo = (RoutePO) ipo;
if(this.hasSameSources(spo) &&
this.predicates.size() == spo.predicates.size()) {
for(int i = 0; i<this.predicates.size(); i++) {
if(!this.predicates.get(i).equals(spo.predicates.get(i))) {
return false;
}
}
return true;
}
return false;
} |
As a naming convention the class name should end with PO (physical operator).
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package de.uniol.inf.is.odysseus.core.server.physicaloperator;
import java.util.ArrayList;
import java.util.List;
import de.uniol.inf.is.odysseus.core.metadata.PointInTime;
import de.uniol.inf.is.odysseus.core.physicaloperator.IPhysicalOperator;
import de.uniol.inf.is.odysseus.core.physicaloperator.OpenFailedException;
import de.uniol.inf.is.odysseus.core.predicate.IPredicate;
/**
* @author Marco Grawunder
*/
@SuppressWarnings({"rawtypes"})
public class RoutePO<T> extends AbstractPipe<T, T> {
private List<IPredicate<? super T>> predicates;
public RoutePO(List<IPredicate<? super T>> predicates) {
super();
initPredicates(predicates);
}
public RoutePO(RoutePO<T> splitPO) {
super();
initPredicates(splitPO.predicates);
}
private void initPredicates(List<IPredicate<? super T>> predicates) {
this.predicates = new ArrayList<IPredicate<? super T>>(predicates.size());
for (IPredicate<? super T> p: predicates){
this.predicates.add(p.clone());
}
}
@Override
public OutputMode getOutputMode() {
return OutputMode.INPUT;
}
@Override
public void process_open() throws OpenFailedException{
super.process_open();
for (IPredicate<? super T> p: predicates){
p.init();
}
}
@Override
protected void process_next(T object, int port) {
for (int i=0;i<predicates.size();i++){
if (predicates.get(i).evaluate(object)) {
transfer(object,i);
return;
}
}
transfer(object,predicates.size());
}
@Override
public RoutePO<T> clone() {
return new RoutePO<T>(this);
}
@Override
public void processPunctuation(PointInTime timestamp, int port) {
sendPunctuation(timestamp);
}
@Override
public boolean process_isSemanticallyEqual(IPhysicalOperator ipo) {
if(!(ipo instanceof RoutePO)) {
return false;
}
RoutePO spo = (RoutePO) ipo;
if(this.hasSameSources(spo) &&
this.predicates.size() == spo.predicates.size()) {
for(int i = 0; i<this.predicates.size(); i++) {
if(!this.predicates.get(i).equals(spo.predicates.get(i))) {
return false;
}
}
return true;
}
return false;
}
}
|
TODO: InputSyncArea, OutputTransferArea
...
To translate the logical operator into the physical counterpart the transformation engine is needed. The rule for the route operator can be found in the following example. Further information can be found in the description of the transformation component. (http://odysseus.offis.uni-oldenburg.de/twiki/bin/view/Main/RegelnF%fcrTransformationErstellen)
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package de.uniol.inf.is.odysseus.transform.rules;
import de.uniol.inf.is.odysseus.core.server.logicaloperator.RouteAO;
import de.uniol.inf.is.odysseus.core.server.physicaloperator.RoutePO;
import de.uniol.inf.is.odysseus.core.server.planmanagement.TransformationConfiguration;
import de.uniol.inf.is.odysseus.ruleengine.ruleflow.IRuleFlowGroup;
import de.uniol.inf.is.odysseus.transform.flow.TransformRuleFlowGroup;
import de.uniol.inf.is.odysseus.transform.rule.AbstractTransformationRule;
@SuppressWarnings({"unchecked","rawtypes"})
public class TRouteAORule extends AbstractTransformationRule<RouteAO> {
@Override
public int getPriority() {
return 0;
}
@Override
public void execute(RouteAO routeAO, TransformationConfiguration config) {
defaultExecute(routeAO, new RoutePO(routeAO.getPredicates()), config, true, true);
}
@Override
public boolean isExecutable(RouteAO operator, TransformationConfiguration transformConfig) {
return operator.isAllPhysicalInputSet();
}
@Override
public String getName() {
return "RouteAO -> RoutePO";
}
@Override
public IRuleFlowGroup getRuleFlowGroup() {
return TransformRuleFlowGroup.TRANSFORMATION;
}
@Override
public Class<? super RouteAO> getConditionClass() {
return RouteAO.class;
}
}
|
User defined Operators
A rather simple way for the creation of user defined operators is to use the UDO-PQL Operator. For further information see PQL (http://odysseus.offis.uni-oldenburg.de/twiki/bin/view/Main/PQL:UDO)