New (2022/11/02): See also our video about this (here we provide another example).
Typically, you will create an operator in an existing bundle. If you start with a new project from the scratch, you will need to create a new bundle.
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/********************************************************************************** * Copyright 2011 The Odysseus Team * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package de.uniol.inf.is.odysseus.coretutorial.server.logicaloperator; import java.util.LinkedList; import java.util.List; import de.uniol.inf.is.odysseus.core.logicaloperator.LogicalOperatorCategory; import de.uniol.inf.is.odysseus.core.predicate.IPredicate; import de.uniol.inf.is.odysseus.core.sdf.schema.SDFSchema; import de.uniol.inf.is.odysseus.core.server.logicaloperator.AbstractLogicalOperator; import de.uniol.inf.is.odysseus.core.server.logicaloperator.UnaryLogicalOp; import de.uniol.inf.is.odysseus.core.server.logicaloperator.annotations.LogicalOperator; import de.uniol.inf.is.odysseus.core.server.logicaloperator.annotations.Parameter; import de.uniol.inf.is.odysseus.core.server.logicaloperator.builder.BooleanParameter; import de.uniol.inf.is.odysseus.core.server.logicaloperator.builder.PredicateParameter; import de.uniol.inf.is.odysseus.core.server.physicaloperator.IHasPredicates; @LogicalOperator(name = "ROUTE", minInputPorts = 1, maxInputPorts = 1, doc = "This operator can be used to route the elements in the stream to different further processing operators, depending on the predicate.", category = { LogicalOperatorCategory.PROCESSING }) public class RouteAO extends UnaryLogicalOp implements IHasPredicates{ private static final long serialVersionUID = -8015847502104587689L; private boolean overlappingPredicates = false; private List<IPredicate<?>> predicates = new LinkedList<IPredicate<?>>(); /** * if an element is routed to an output, heartbeats will be send to all * other outputs. */ private boolean sendingHeartbeats = false; public RouteAO() { super(); } public RouteAO(RouteAO routeAO) { super(routeAO); this.overlappingPredicates = routeAO.overlappingPredicates; this.overlappingPredicates = routeAO.overlappingPredicates; this.sendingHeartbeats = routeAO.sendingHeartbeats; if (routeAO.predicates != null) { for (IPredicate<?> pred : routeAO.predicates) { this.predicates.add(pred.clone()); } } } @Override } } } @Override protected SDFSchema getOutputSchemaIntern(int pos) { // since it is a routing, schema is always from input port 0 return super.getOutputSchemaIntern(0); } @Parameter(type = PredicateParameter.class, isList = true) public void setPredicates(List<IPredicate<?>> predicates) { this.predicates = predicates; } @Override } @Override public List<IPredicate<?>> getPredicates() { return predicates; } @Parameter(name = "overlappingPredicates", type = BooleanParameter.class, optional = true, doc = "Evaluate all (true) or only until first true predicate (false), i.e. deliver to all ports where predicate is true or only to first") public void setOverlappingPredicates(boolean overlappingPredicates) { this.overlappingPredicates = overlappingPredicates; } @Override public AbstractLogicalOperator clone() { return new RouteAO(this); } /** * @return */ public boolean isOverlappingPredicates() { return this.overlappingPredicates; } @Parameter(name = "sendingHeartbeats", type = BooleanParameter.class, optional = true, doc = "If an element is routed to an output, heartbeats will be send to all other outputs") public void setSendingHeartbeats(boolean sendingHeartbeats) { this.sendingHeartbeats = sendingHeartbeats; } public boolean isSendingHeartbeats() { return this return this.sendingHeartbeats; } } |
Every operator needs to provide an output schema, i.e. what is the schema of the elements that are produced. As a default implementation AbstractLogicalOperator delivers the input schema as the output schema. Route does not change the input schema, so this implementation is sufficient. If the input schema is not the same as the output schema, the class needs to implement getOutputSchemaIntern(int port), where port is the output port of the operator.
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package de.uniol.inf.is.odysseus.coretutorial.server.physicaloperator; import java.util.ArrayList; import java.util.List; import de.uniol.inf.is.odysseus.core.metadata.PointInTime; import de.uniol.inf.is.odysseus.core.physicaloperator.IPhysicalOperator; import de.uniol.inf.is.odysseus.core.physicaloperator.OpenFailedException; import de.uniol.inf.is.odysseus.core.predicate.IPredicate; /** * @author Marco Grawunder */ @SuppressWarnings({"rawtypes"}) public class RoutePO<T> extends AbstractPipe<T, T> { private List<IPredicate<? super T>> predicates; public RoutePO(List<IPredicate<? super T>> predicates) { super(); initPredicates(predicates); } public RoutePO(RoutePO<T> splitPO) { super(); initPredicates(splitPO.predicates); } private void initPredicates(List<IPredicate<? super T>> predicates) { this.predicates = new ArrayList<IPredicate<? super T>>(predicates.size()); for (IPredicate<? super T> p: predicates){ this.predicates.add(p.clone()); } } @Override public OutputMode getOutputMode() { return OutputMode.INPUT; } @Override public void process_open() throws OpenFailedException{ super.process_open(); for (IPredicate<? super T> p: predicates){ p.init(); } } @Override protected void process_next(T object, int port) { for (int i=0;i<predicates.size();i++){ if (predicates.get(i).evaluate(object)) { transfer(object,i); return; } } transfer(object,predicates.size()); } @Override public RoutePO<T> clone() { return new RoutePO<T>(this); } @Override public void processPunctuation(PointInTime timestamp, int port) { sendPunctuation(timestamp); } @Override public boolean process_isSemanticallyEqual(IPhysicalOperator ipo) { if(!(ipo instanceof RoutePO)) { return false; } RoutePO spo = (RoutePO) ipo; if(this.hasSameSources(spo) && this.predicates.size() == spo.predicates.size()) { for(int i = 0; i<this.predicates.size(); i++) { if(!this.predicates.get(i).equals(spo.predicates.get(i))) { return false; } } return true; } return false; } } |
TODO: InputSyncArea, OutputTransferArea
Important: If you create a new StreamObject at runtime you must copy the cloned metadata!
Code Block | ||
---|---|---|
| ||
out.setMetadata(input.getMetadata().clone()); |
Query Rewrite
Rewriting is used to switch, add, remove or replace logical operators before they are transformed into their physical counterparts. Normally, the aim of the rewriting process is to optimize the query, for example, by pushing down selection operator before costly joins. The implementation is done by rewriting rules, which are implemented like transformation rules (see next section).
Transformation
To translate the logical operator into the physical counterpart the transformation engine is needed. The rule for the route operator can be found in the following example. Further information can be found in the description of the transformation component.
physicaloperator;
/**********************************************************************************
* Copyright 2011 The Odysseus Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import de.uniol.inf.is.odysseus.core.metadata.IMetaAttribute;
import de.uniol.inf.is.odysseus.core.metadata.IStreamObject;
import de.uniol.inf.is.odysseus.core.metadata.ITimeInterval;
import de.uniol.inf.is.odysseus.core.physicaloperator.Heartbeat;
import de.uniol.inf.is.odysseus.core.physicaloperator.IPhysicalOperator;
import de.uniol.inf.is.odysseus.core.physicaloperator.IPunctuation;
import de.uniol.inf.is.odysseus.core.predicate.IPredicate;
import de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractPipe;
import de.uniol.inf.is.odysseus.core.server.physicaloperator.IHasPredicates;
/**
* @author Marco Grawunder
*/
@SuppressWarnings({ "rawtypes" })
public class RoutePO<T extends IStreamObject<IMetaAttribute>> extends AbstractPipe<T, T> implements IHasPredicates {
private List<IPredicate<?>> predicates;
final boolean overlappingPredicates;
/**
* if an element is routed to an output, heartbeats will be send to all other outputs.
*/
final boolean sendingHeartbeats;
public RoutePO(List<IPredicate<T>> predicates,
boolean overlappingPredicates, boolean sendingHeartbeats) {
super();
this.overlappingPredicates = overlappingPredicates;
this.sendingHeartbeats = sendingHeartbeats;
initPredicates(predicates);
}
@Override
public List<IPredicate<?>> getPredicates() {
return predicates;
}
private void initPredicates(List<IPredicate<T>> predicates) {
this.predicates = new ArrayList<IPredicate<?>>(
predicates.size());
for (IPredicate<?> p : predicates) {
this.predicates.add(p.clone());
}
}
@Override
public OutputMode getOutputMode() {
return OutputMode.INPUT;
}
@SuppressWarnings("unchecked")
@Override
protected void process_next(T object, int port) {
boolean found = false;
Collection<Integer> routedToPorts = null;
if(sendingHeartbeats) {
routedToPorts = new ArrayList<>();
}
for (int i = 0; i < predicates.size(); i++) {
if (((IPredicate<T>)predicates.get(i)).evaluate(object)) {
T out = object;
// If object is send to multiple output ports
// it MUST be cloned!
if (overlappingPredicates) {
out = (T)object.clone();
out.setMetadata(object.getMetadata().clone());
}
transfer(out, i);
found = true;
if ((sendingHeartbeats) && (routedToPorts != null)) {
routedToPorts.add(i);
}
if (!overlappingPredicates) {
break;
}
}
}
if (!found) {
transfer(object, predicates.size());
if ((sendingHeartbeats) && (routedToPorts != null)) {
routedToPorts.add(predicates.size());
}
}
if ((sendingHeartbeats) && (routedToPorts != null)) {
// Sending heartbeats to all other ports
for(int i = 0; i < predicates.size(); i++) {
if(!routedToPorts.contains(i))
this.sendPunctuation(Heartbeat.createNewHeartbeat(((IStreamObject<? extends ITimeInterval>) object).getMetadata().getStart()), i);
}
}
}
@Override
public void processPunctuation(IPunctuation punctuation, int port) {
for (int i=0;i<predicates.size();i++){
sendPunctuation(punctuation,i);
}
}
@Override
public boolean process_isSemanticallyEqual(IPhysicalOperator ipo) {
if (!(ipo instanceof RoutePO)) {
return false;
}
RoutePO spo = (RoutePO) ipo;
if (this.predicates.size() == spo.predicates.size()) {
for (int i = 0; i < this.predicates.size(); i++) {
if (!this.predicates.get(i).equals(spo.predicates.get(i))) {
return false;
}
}
return true;
}
return false;
}
@Override
public void setPredicates(List<IPredicate<?>> predicates) {
this.predicates = predicates;
}
} |
TODO: InputSyncArea, OutputTransferArea
Important: If you create a new StreamObject at runtime you must copy the cloned metadata!
Code Block | ||
---|---|---|
| ||
out.setMetadata(input.getMetadata().clone()); |
Query Rewrite (Step 3)
Rewriting is used to switch, add, remove or replace logical operators before they are transformed into their physical counterparts. Normally, the aim of the rewriting process is to optimize the query, for example, by pushing down selection operator before costly joins. The implementation is done by rewriting rules, which are implemented like transformation rules (see next section).
Transformation (Step 4)
To translate the logical operator into the physical counterpart the transformation engine is needed. The rule for the route operator can be found in the following example. Further information can be found in the description of the transformation component.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/**********************************************************************************
* Copyright 2011 The Odysseus Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
| ||||||||
Code Block | ||||||||
| ||||||||
package de.uniol.inf.is.odysseus.transformtutorial.rules; import de.uniol.inf.is.odysseus.core.server.logicaloperator.RouteAO; import de.uniol.inf.is.odysseus.core.server.physicaloperator.RoutePO; import de.uniol.inf.is.odysseus.core.server.planmanagement.TransformationConfiguration; import de.uniol.inf.is.odysseus.ruleengine.rule.RuleException; import de.uniol.inf.is.odysseus.ruleengine.ruleflow.IRuleFlowGroup; import de.uniol.inf.is.odysseus.transform.flow.TransformRuleFlowGroup; import de.uniol.inf.is.odysseus.transform.rule.AbstractTransformationRule; @SuppressWarnings({"unchecked","rawtypes"}) public class TRouteAORule extends AbstractTransformationRule<RouteAO> { @Override public int getPriority() { return 0; } @Override public void execute(RouteAO routeAO, TransformationConfiguration config) throws RuleException { defaultExecute(routeAO, new RoutePO(routeAO.getPredicates(), routeAO.isOverlappingPredicates(), routeAO.isSendingHeartbeats()), config, true, true); } @Override public boolean isExecutable(RouteAO operator, TransformationConfiguration transformConfig) { return operator.isAllPhysicalInputSet(); } @Override public String getName() { return "RouteAO -> RoutePO"; } @Override public IRuleFlowGroup getRuleFlowGroup() { return TransformRuleFlowGroup.TRANSFORMATION; } @Override public Class<? super RouteAO> getConditionClass() { return RouteAO.class; } } |