...
Code Block | ||
---|---|---|
| ||
@Override public void processInOpen() throws IOException { if (!preload) { final File file = new File(filename); try { in = new FileInputStream(file); fireOnConnect(); } catch (Exception e) { fireOnDisconnect(); throw e; } } else { fis = new FileInputStream(filename); FileChannel channel = fis.getChannel(); long size = channel.size(); double x = size / (double) Integer.MAX_VALUE; int n = (int) Math.ceil(x); ByteBuffer buffers[] = new ByteBuffer[n]; for (int i = 0; i < n; i++) { buffers[i] = ByteBuffer.allocateDirect(Integer.MAX_VALUE); channel.read(buffers[i]); buffers[i].rewind(); } in = createInputStream(buffers); fireOnConnect(); } } @Override public void processInClose() throws IOException { super.processInClose(); if (fis != null) { fis.close(); } } @Override public void processOutOpen() throws IOException { final File file = new File(filename); try { out = new FileOutputStream(file, append); fireOnConnect(); } catch (Exception e) { fireOnDisconnect(); throw e; } } @Override public void processOutClose() throws IOException { fireOnDisconnect(); out.flush(); out.close(); } |
Imporant: Use processInStart if the transport handler connects to a service that immediatly sends data!
Hint: There are scenarios in which it is not feasible to separate transport and protocol layer. In such cases, one can implement the combination as a transport handler and use it in combination with the "None" protocol handler.
Generic Pull
After the connection is inialized, the framework tries to retrieve data from the TransportHandler. To be generic, we decided to use an InputStream for sources and an OutputStream for sinks. So the following methods need to be overwritten (Remark: It its not necessary to implement both methods, if the TransportHandler e.g. should only be used for sources):
...
Code Block | ||
---|---|---|
| ||
public boolean isDone(); |
can be overwrittten.
Simple Transport Handler
Another way, implementing a transport handler that is less generic and delivers e.g. tuples directly can be found in the next example. In this case, the protocol handler must be 'none' ..
TODO ... build AbstractSimplePullTransportHandler
Code Block | ||
---|---|---|
| ||
package de.uniol.inf.is.odysseus.wrapper.temper1.physicaloperator.access; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.util.Map; import java.util.Random; import com.codeminders.hidapi.ClassPathLibraryLoader; import de.uniol.inf.is.odysseus.core.collection.Tuple; import de.uniol.inf.is.odysseus.core.physicaloperator.access.protocol.IIteratable; import de.uniol.inf.is.odysseus.core.physicaloperator.access.protocol.IProtocolHandler; import de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.AbstractTransportHandlerAbstractSimplePullTransportHandler; import de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.ITransportHandler; public class Temper1TransportHandlerRandomTransportHandler extends AbstractTransportHandler implements IIteratable<Tuple<AbstractSimplePullTransportHandler<Tuple<?>>{ private static final String NAME = "Temper1Random"; private static final Random RAND = new Random(); static { ClassPathLibraryLoader.loadNativeHIDLibrary(); } @Override @Override public ITransportHandler createInstance(IProtocolHandler<?> protocolHandler, Map<String, String> options) { Temper1TransportHandlerRandomTransportHandler tHandler = new Temper1TransportHandlerRandomTransportHandler(); protocolHandler.setTransportHandler(tHandler); return tHandler; } @Override public String getName() { return NAME; } private static float readDevice() { return 20f + ( 10 * RAND.nextFloat()); } @Override public boolean hasNext() { return true; } @SuppressWarnings("rawtypes") @Override public Tuple<?> getNext() { Tuple<?> tuple = new Tuple(1, false); tuple.setAttribute(0, readDevice()); return tuple; } @Override public voidboolean processInOpenisSemanticallyEqualImpl() throws IOExceptionITransportHandler other) { } @Override public void processOutOpen() throws IOException {return false; } @Override } |
GenericPush
In generic push szenarios for sources there is no method that can be overwritten because it depends on the transport type and e.g. libararies that receive data from external sources. The information that is read must be send to the corresponding transport handler. To simplify the process, AbstractTransportHandler(Delegate) provides the following methods that should be used:
Code Block | ||
---|---|---|
| ||
public void processInClosefireProcess(ByteBuffer message) throws IOException { } @Override for (ITransportHandlerListener<T> l public void processOutClose(: transportHandlerListener) throws IOException { } @Override public// voidTODO: sendflip(byte[] message) throwserases IOExceptionthe { contents of the message }if @Override public InputStream getInputStream() { // it was already flipped or just return null; created... } @Override // publicIn OutputStream getOutputStream() { return null; other words: This method expects that the byte buffer } @Override // is not publicfully booleanprepared isSemanticallyEqualImpl(ITransportHandler other) { return falsemessage.flip(); } } |
GenericPush
l.process(message);
}
}
public void fireProcess(T m) {
for (ITransportHandlerListener<T> l : transportHandlerListener) {
l.process(m);
}
}
public void fireProcess(String[] message) {
for (ITransportHandlerListener<T> l : transportHandlerListener) {
l.process(message);
}
} |
The fireProcess methods can be used with ByteBuffers and String-Arrays or with a Generic. In the latter case, the corresponding ProtocolHandler must read this type, else a class cast exception will be thrown.
Important: A transport handler must not send data before processInStart is called!
An example to the use fireProcess-Methods (and processInOpen/processInStart) can be found in the RabbitMQ transport handlerIn generic push szenarios for sources there is no method that can be overwritten because it depends on the transport type and e.g. libararies that receive data from external sources. The information that is read must be send to the corresponding transport handler. To simplify the process, AbstractTransportHandler(Delegate) provides the following methods that should be used:
Code Block | ||
---|---|---|
| ||
@Override public void fireProcess(ByteBuffer message)processInOpen() throws IOException { try { for (ITransportHandlerListener<T> l : transportHandlerListenerinternalOpen(); } catch (TimeoutException e1) { // TODO: flip() erases the contents of the message if Auto-generated catch block e1.printStackTrace(); } // it was already} flipped or just created...@Override public void processInStart() { // In othertry words:{ This method expects that the byte buffer if (publishStyle == PublishStyle.PublishSubscribe) { // is not fully prepared String queueName = message.flipchannel.queueDeclare().getQueue(); l.process(message channel.queueBind(queueName, exchangeName, ""); } } public void fireProcess(T m) { for// (ITransportHandlerListener<T>Create lConsumer : transportHandlerListener) { boolean autoAck l.process(m)= false; } channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { } public void fireProcess(String[] message) {@Override for (ITransportHandlerListener<T> l : transportHandlerListener) { public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, l.process(message); } } |
The fireProcess methods can be used with ByteBuffers and String-Arrays or with a Generic. In the latter case, the corresponding ProtocolHandler must read this type, else a class cast exception will be thrown.
An example to the use fireProcess-Methods can be found in the RabbitMQ transport handler:
Code Block | ||
---|---|---|
| ||
@Override public void processInOpen(com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { internalOpen(); // Create Consumer // boolean autoAckString routingKey = falseenvelope.getRoutingKey(); channel.basicConsume(queueName, autoAck, consumerTag, // String contentType new= DefaultConsumerproperties.getContentType(channel) {; @Override long deliveryTag = envelope.getDeliveryTag(); public void handleDelivery( try { ByteBuffer wrapped String consumerTag,= ByteBuffer.wrap(body); com.rabbitmq.client.Envelope envelope,wrapped.position(wrapped.limit()); fireProcess(wrapped); com.rabbitmq.client.AMQP.BasicProperties properties, } catch (Exception e) { byte[] body) throws IOException { // LOG.warn("Error processing input", e); String routingKey = envelope.getRoutingKey(); // } String contentType = propertieschannel.getContentTypebasicAck(deliveryTag, false); }; long deliveryTag = envelope.getDeliveryTag(}); connection.addShutdownListener(new ShutdownListener() try{ @Override ByteBuffer wrapped = ByteBuffer.wrap(body); public void shutdownCompleted(ShutdownSignalException cause) { wrappedLOG.position(wrapped.limit())warn("Connection shutdown.", cause); } fireProcess(wrapped}); channel.addShutdownListener(new ShutdownListener() { }catch(Exception e){ @Override public void e.printStackTrace();shutdownCompleted(ShutdownSignalException cause) { LOG.warn("Channel shutdown.", cause); } } channel.basicAck(deliveryTag, false }); } catch (IOException e) { }; throw new StartFailedException(e); }); } |
Here you can see, that every source type needs a special handling for sending. Here e.g. a callback object (DefaultConsumer) is defined in RabbitMQ that calls fireProcess.
...
These methode are defined in the interface ITransportHandlerListener that is implemented by IProtocolHandler, the basic interface for ProtocolHandler.
Registering the handler
Odysseus is OSGi based and all the handler are implemented as declartive services.
...