To create a new TransportHandler, either the interface ITransportHandler must be implemented or the class AbstractTransportHandler must be extended.
Depending on how the handler works, different methods need to be implemented.
    public ITransportHandler createInstance(IProtocolHandler<?> protocolHandler, Map<String, String> options);
This method must return a new, initialized transport handler. Typically, it just calls the constructor. When using the Procedural Query Language (PQL), the options map is generated automatically from the options field (e.g. of the Access operator).
    @Override
    public ITransportHandler createInstance(
            IProtocolHandler<?> protocolHandler, Map<String, String> options) {
        return new FileHandler(protocolHandler, options);
    }
The method getName() must return a globally unique transport handler name.
    String getName();
A good way to do this is as follows (again using FileHandler as the example):
    public static final String NAME = "File";

    @Override
    public String getName() {
        return NAME;
    }
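To illustrate why the name must be unique and why createInstance returns a fresh object, here is a minimal sketch of a name-based lookup. It is not the Odysseus registry: SketchTransportHandler, SketchFileHandler and SketchHandlerRegistry are hypothetical stand-ins, the protocol handler parameter is left out, and the option key "filename" is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ITransportHandler, reduced to the two methods discussed above.
interface SketchTransportHandler {
    SketchTransportHandler createInstance(Map<String, String> options);
    String getName();
}

// Hypothetical handler; the option key "filename" is an assumption for this sketch.
class SketchFileHandler implements SketchTransportHandler {
    static final String NAME = "File";
    final String filename;

    // Prototype constructor: used only for registration.
    SketchFileHandler() {
        this(new HashMap<>());
    }

    SketchFileHandler(Map<String, String> options) {
        this.filename = options.getOrDefault("filename", "");
    }

    @Override
    public SketchTransportHandler createInstance(Map<String, String> options) {
        return new SketchFileHandler(options);
    }

    @Override
    public String getName() {
        return NAME;
    }
}

// Hypothetical registry: one prototype per name, a fresh instance per request.
class SketchHandlerRegistry {
    private final Map<String, SketchTransportHandler> prototypes = new HashMap<>();

    void register(SketchTransportHandler prototype) {
        // A second handler with the same name would make lookups ambiguous.
        if (prototypes.putIfAbsent(prototype.getName(), prototype) != null) {
            throw new IllegalArgumentException("Duplicate transport handler name: " + prototype.getName());
        }
    }

    SketchTransportHandler create(String name, Map<String, String> options) {
        SketchTransportHandler prototype = prototypes.get(name);
        if (prototype == null) {
            throw new IllegalArgumentException("Unknown transport handler: " + name);
        }
        return prototype.createInstance(options);
    }
}
```

In this sketch the registered object only serves as a prototype; every call to create yields a new handler configured from the options map.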
When implementing ITransportHandler directly, open and close need to be implemented.
Hint: In the following we assume that AbstractTransportHandler is extended.
AbstractTransportHandler already provides default implementations of open and close that cannot be overridden (they are implemented in AbstractTransportHandlerDelegate):
    final synchronized public void open() throws UnknownHostException, IOException {
        if (openCounter == 0) {
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(ITransportExchangePattern.InOnly)
                            || getExchangePattern().equals(ITransportExchangePattern.InOptionalOut)
                            || getExchangePattern().equals(ITransportExchangePattern.InOut))) {
                callOnMe.processInOpen();
            }
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(ITransportExchangePattern.OutOnly)
                            || getExchangePattern().equals(ITransportExchangePattern.OutOptionalIn)
                            || getExchangePattern().equals(ITransportExchangePattern.InOut))) {
                callOnMe.processOutOpen();
            }
        }
        openCounter++;
    }

    final synchronized public void close() throws IOException {
        openCounter--;
        if (openCounter == 0) {
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(ITransportExchangePattern.InOnly)
                            || getExchangePattern().equals(ITransportExchangePattern.InOptionalOut)
                            || getExchangePattern().equals(ITransportExchangePattern.InOut))) {
                callOnMe.processInClose();
            }
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(ITransportExchangePattern.OutOnly)
                            || getExchangePattern().equals(ITransportExchangePattern.OutOptionalIn)
                            || getExchangePattern().equals(ITransportExchangePattern.InOut))) {
                callOnMe.processOutClose();
            }
        }
    }
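The effect of openCounter in the code above is that the connection is only opened by the first open() and only closed by the last close(). The following sketch isolates that counting logic; CountingHandlerSketch is a hypothetical stand-in, and the two counters replace the calls to processInOpen/processOutOpen and processInClose/processOutClose.

```java
// Hypothetical stand-in, not the Odysseus class: it keeps only the counting logic.
class CountingHandlerSketch {
    private int openCounter = 0;
    int connects = 0;     // how often the connection was actually opened
    int disconnects = 0;  // how often the connection was actually closed

    final synchronized void open() {
        if (openCounter == 0) {
            connects++;      // stands in for processInOpen()/processOutOpen()
        }
        openCounter++;
    }

    final synchronized void close() {
        openCounter--;
        if (openCounter == 0) {
            disconnects++;   // stands in for processInClose()/processOutClose()
        }
    }
}

class CountingHandlerDemo {
    public static void main(String[] args) {
        CountingHandlerSketch handler = new CountingHandlerSketch();
        handler.open();   // first user: the connection is opened
        handler.open();   // second user: only the counter changes
        handler.close();  // one user left: the connection stays open
        handler.close();  // last user gone: the connection is closed
        System.out.println(handler.connects + " connect, " + handler.disconnects + " disconnect");
    }
}
```

Running the demo prints "1 connect, 1 disconnect", although open() and close() were each called twice.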
A TransportHandler can support different exchange patterns. The handler must return its pattern from the following method:
    public ITransportExchangePattern getExchangePattern();
Currently, the following values are available (https://en.wikipedia.org/wiki/Message_Exchange_Pattern):
Depending on the exchange pattern, AbstractTransportHandler calls the corresponding methods (processInOpen, processOutOpen, processInClose and processOutClose, as in the open and close code above).
In these methods the TransportHandler must open or close its connections. The "IN" methods are called for sources, the "OUT" methods for sinks. When a query is started or stopped, open and close are called, respectively.
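The mapping from exchange pattern to the methods that are called can be summarized in a small sketch. PatternSketch is a hypothetical stand-in enum that contains only the values appearing in open() and close() above (the real ITransportExchangePattern may define more), and callsOnOpen is an illustrative helper, not part of the framework.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in enum with only the values used in open()/close() above.
enum PatternSketch {
    InOnly, InOptionalOut, InOut, OutOnly, OutOptionalIn;

    // Patterns for which the "In" methods are called (sources).
    boolean usesIn() {
        return this == InOnly || this == InOptionalOut || this == InOut;
    }

    // Patterns for which the "Out" methods are called (sinks).
    boolean usesOut() {
        return this == OutOnly || this == OutOptionalIn || this == InOut;
    }
}

class PatternDispatchSketch {
    // Returns the names of the methods that open() would call for a pattern.
    static List<String> callsOnOpen(PatternSketch pattern) {
        List<String> calls = new ArrayList<>();
        if (pattern != null && pattern.usesIn()) {
            calls.add("processInOpen");
        }
        if (pattern != null && pattern.usesOut()) {
            calls.add("processOutOpen");
        }
        return calls;
    }
}
```

Note that InOut triggers both the "In" and the "Out" method, and that a handler returning null as its pattern gets no call at all.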
The following again shows the implementations in FileHandler:
    @Override
    public void processInOpen() throws IOException {
        if (!preload) {
            final File file = new File(filename);
            try {
                in = new FileInputStream(file);
                fireOnConnect();
            } catch (Exception e) {
                fireOnDisconnect();
                throw e;
            }
        } else {
            fis = new FileInputStream(filename);
            FileChannel channel = fis.getChannel();
            long size = channel.size();
            double x = size / (double) Integer.MAX_VALUE;
            int n = (int) Math.ceil(x);
            ByteBuffer buffers[] = new ByteBuffer[n];
            for (int i = 0; i < n; i++) {
                buffers[i] = ByteBuffer.allocateDirect(Integer.MAX_VALUE);
                channel.read(buffers[i]);
                buffers[i].rewind();
            }
            in = createInputStream(buffers);
            fireOnConnect();
        }
    }

    @Override
    public void processInClose() throws IOException {
        super.processInClose();
        if (fis != null) {
            fis.close();
        }
    }

    @Override
    public void processOutOpen() throws IOException {
        final File file = new File(filename);
        try {
            out = new FileOutputStream(file, append);
            fireOnConnect();
        } catch (Exception e) {
            fireOnDisconnect();
            throw e;
        }
    }

    @Override
    public void processOutClose() throws IOException {
        fireOnDisconnect();
        out.flush();
        out.close();
    }
Important: Use processInStart if the transport handler connects to a service that immediately sends data!
Hint: There are scenarios in which it is not feasible to separate the transport and protocol layers. In such cases, the combination can be implemented as a transport handler and used together with the "None" protocol handler.
After the connection is initialized, the framework tries to retrieve data from the TransportHandler. To be generic, we decided to use an InputStream for sources and an OutputStream for sinks. So the following methods need to be overridden (Remark: It is not necessary to implement both methods if the TransportHandler is, for example, only used for sources):
    public InputStream getInputStream();
    public OutputStream getOutputStream();
A typical implementation in FileHandler:
    @Override
    public InputStream getInputStream() {
        return in;
    }

    @Override
    public OutputStream getOutputStream() {
        return out;
    }
In some cases, sources do not deliver an endless data stream. For such cases, the method
    public boolean isDone();
can be overridden.
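As a rough illustration of isDone() for a finite source, the sketch below reports completion once all of its elements have been read. FiniteSourceSketch and its next() method are hypothetical and do not extend any Odysseus class; only the signature of isDone() is taken from the text above.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical finite source; only isDone() mirrors the method named above.
class FiniteSourceSketch {
    private final Iterator<String> remaining;

    FiniteSourceSketch(List<String> data) {
        this.remaining = data.iterator();
    }

    // Illustrative read method, not part of the Odysseus interface.
    String next() {
        return remaining.next();
    }

    // True as soon as the source has nothing left to deliver.
    public boolean isDone() {
        return !remaining.hasNext();
    }
}

class FiniteSourceDemo {
    public static void main(String[] args) {
        FiniteSourceSketch source = new FiniteSourceSketch(Arrays.asList("a", "b"));
        // A consumer can stop polling once the source reports that it is done.
        while (!source.isDone()) {
            System.out.println(source.next());
        }
    }
}
```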
Another approach is to implement a transport handler that is less generic and, for example, delivers tuples directly, as in the next example. In this case, the protocol handler must be 'none'.
    package de.uniol.inf.is.odysseus.wrapper.temper1.physicaloperator.access;

    import java.util.Map;
    import java.util.Random;

    import de.uniol.inf.is.odysseus.core.collection.Tuple;
    import de.uniol.inf.is.odysseus.core.physicaloperator.access.protocol.IProtocolHandler;
    import de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.AbstractSimplePullTransportHandler;
    import de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.ITransportHandler;

    public class RandomTransportHandler extends AbstractSimplePullTransportHandler<Tuple<?>> {

        private static final String NAME = "Random";
        private static final Random RAND = new Random();

        @Override
        public ITransportHandler createInstance(IProtocolHandler<?> protocolHandler, Map<String, String> options) {
            RandomTransportHandler tHandler = new RandomTransportHandler();
            protocolHandler.setTransportHandler(tHandler);
            return tHandler;
        }

        @Override
        public String getName() {
            return NAME;
        }

        private static float readDevice() {
            return 20f + (10 * RAND.nextFloat());
        }

        @Override
        public boolean hasNext() {
            return true;
        }

        @SuppressWarnings("rawtypes")
        @Override
        public Tuple<?> getNext() {
            Tuple<?> tuple = new Tuple(1, false);
            tuple.setAttribute(0, readDevice());
            return tuple;
        }

        @Override
        public boolean isSemanticallyEqualImpl(ITransportHandler other) {
            return false;
        }
    }
In generic push scenarios for sources, there is no method that can be overridden, because receiving data depends on the transport type and, for example, on libraries that receive data from external sources. The data that is read must be sent to the corresponding transport handler. To simplify this, AbstractTransportHandler(Delegate) provides the following methods that should be used:
    public void fireProcess(ByteBuffer message) {
        for (ITransportHandlerListener<T> l : transportHandlerListener) {
            // TODO: flip() erases the contents of the message if
            // it was already flipped or just created...
            // In other words: This method expects that the byte buffer
            // is not fully prepared
            message.flip();
            l.process(message);
        }
    }

    public void fireProcess(T m) {
        for (ITransportHandlerListener<T> l : transportHandlerListener) {
            l.process(m);
        }
    }

    public void fireProcess(String[] message) {
        for (ITransportHandlerListener<T> l : transportHandlerListener) {
            l.process(message);
        }
    }
The fireProcess methods can be used with ByteBuffers, String arrays or a generic type. In the latter case, the corresponding ProtocolHandler must be able to read this type; otherwise a ClassCastException will be thrown.
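Because fireProcess(ByteBuffer) calls flip() on the message, the buffer must be handed over in "just written" state, that is, with its position at the end of the data. The following sketch uses only java.nio to show the difference; FlipSketch and readableAfterFlip are illustrative names, and the helper merely imitates the flip() call in fireProcess.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FlipSketch {
    // Imitates what fireProcess(ByteBuffer) does before handing the buffer to a listener.
    static int readableAfterFlip(ByteBuffer message) {
        message.flip();             // limit = old position, position = 0
        return message.remaining(); // number of bytes a listener could read
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.US_ASCII);

        // wrap() leaves the position at 0, so flip() sets the limit to 0 and hides the data.
        ByteBuffer unprepared = ByteBuffer.wrap(body);
        System.out.println(readableAfterFlip(unprepared)); // 0

        // Moving the position to the limit first makes flip() expose all 5 bytes.
        ByteBuffer prepared = ByteBuffer.wrap(body);
        prepared.position(prepared.limit());
        System.out.println(readableAfterFlip(prepared)); // 5

        // A buffer filled with put() is already in the state flip() expects.
        ByteBuffer written = ByteBuffer.allocate(16);
        written.put(body);
        System.out.println(readableAfterFlip(written)); // 5
    }
}
```

The second variant is the one to use when the data arrives as a byte array that is wrapped rather than written into the buffer.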
Important: A transport handler must not send data before processInStart is called!
An example of using the fireProcess methods (and processInOpen/processInStart) can be found in the RabbitMQ transport handler:
    @Override
    public void processInOpen() throws IOException {
        try {
            internalOpen();
        } catch (TimeoutException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
    }

    @Override
    public void processInStart() {
        try {
            if (publishStyle == PublishStyle.PublishSubscribe) {
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, exchangeName, "");
            }
            // Create Consumer
            boolean autoAck = false;
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope,
                        com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // String routingKey = envelope.getRoutingKey();
                    // String contentType = properties.getContentType();
                    long deliveryTag = envelope.getDeliveryTag();
                    try {
                        ByteBuffer wrapped = ByteBuffer.wrap(body);
                        wrapped.position(wrapped.limit());
                        fireProcess(wrapped);
                    } catch (Exception e) {
                        LOG.warn("Error processing input", e);
                    }
                    channel.basicAck(deliveryTag, false);
                };
            });
            connection.addShutdownListener(new ShutdownListener() {
                @Override
                public void shutdownCompleted(ShutdownSignalException cause) {
                    LOG.warn("Connection shutdown.", cause);
                }
            });
            channel.addShutdownListener(new ShutdownListener() {
                @Override
                public void shutdownCompleted(ShutdownSignalException cause) {
                    LOG.warn("Channel shutdown.", cause);
                }
            });
        } catch (IOException e) {
            throw new StartFailedException(e);
        }
    }
Here you can see that every source type needs its own handling for sending. In this example, a callback object (DefaultConsumer) is defined for RabbitMQ that calls fireProcess.
Two additional methods are used to inform listeners about connection state changes (connect and disconnect):
    public void fireOnConnect(ITransportHandler handler) {
        for (ITransportHandlerListener<T> l : transportHandlerListener) {
            l.onConnect(handler);
        }
    }

    public void fireOnDisconnect(ITransportHandler handler) {
        for (ITransportHandlerListener<T> l : transportHandlerListener) {
            l.onDisonnect(handler);
        }
    }
The methods called on the listeners are defined in the interface ITransportHandlerListener, which is extended by IProtocolHandler, the base interface for protocol handlers.
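The notification flow can be tried out in isolation with the following sketch. All types in it are hypothetical stand-ins: ConnectionListenerSketch replaces ITransportHandlerListener (keeping the method spelling onDisonnect used in the code above), the handler is passed as a plain Object, and RecordingListenerSketch plays the role of a protocol handler that reacts to the events.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ITransportHandlerListener, reduced to the connection events.
interface ConnectionListenerSketch {
    void onConnect(Object handler);
    void onDisonnect(Object handler); // spelling follows the call in the code above
}

// Hypothetical handler that notifies all registered listeners.
class NotifyingHandlerSketch {
    private final List<ConnectionListenerSketch> listeners = new ArrayList<>();

    void addListener(ConnectionListenerSketch listener) {
        listeners.add(listener);
    }

    void fireOnConnect() {
        for (ConnectionListenerSketch l : listeners) {
            l.onConnect(this);
        }
    }

    void fireOnDisconnect() {
        for (ConnectionListenerSketch l : listeners) {
            l.onDisonnect(this);
        }
    }
}

// Listener that records what it was told, in place of a real protocol handler.
class RecordingListenerSketch implements ConnectionListenerSketch {
    final List<String> events = new ArrayList<>();

    @Override
    public void onConnect(Object handler) {
        events.add("connect");
    }

    @Override
    public void onDisonnect(Object handler) {
        events.add("disconnect");
    }
}
```

A handler would typically call fireOnConnect once its connection is established and fireOnDisconnect when it is closed or lost, as FileHandler does in processInOpen and processOutClose.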
Odysseus is OSGi-based, and all handlers are implemented as declarative services.
For this, you have to create an XML file, typically placed in a folder called OSGI-INF, in which you state the globally unique name of the handler, the implementation class and the interface that the handler provides. The following is the example for the FacebookTransportHandler:
    <?xml version="1.0" encoding="UTF-8"?>
    <scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" name="de.uniol.inf.is.odysseus.wrapper.facebook.physicaloperator.access.FacebookTransportHandler">
        <implementation class="de.uniol.inf.is.odysseus.wrapper.facebook.physicaloperator.access.FacebookTransportHandler"/>
        <service>
            <provide interface="de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.ITransportHandler"/>
        </service>
    </scr:component>
The file MANIFEST.MF (typically located in META-INF) must contain a reference to this new file:
    Service-Component: OSGI-INF/FacebookTransportHandler.xml, OSGI-INF/FacebookProtocolHandler.xml
Remark: An Eclipse wizard can be used to create this file and the reference in MANIFEST.MF: File/New/Component Definition