...
Important: Use processInStart if the transport handler connects to a service that immediately sends data!
Hint: There are scenarios in which it is not feasible to separate the transport and protocol layers. In such cases, implement the combination as a transport handler and use it together with the "None" protocol handler.
Generic Pull
After the connection is initialized, the framework tries to retrieve data from the TransportHandler. To stay generic, we decided to use an InputStream for sources and an OutputStream for sinks. The following methods therefore need to be overridden (Remark: It is not necessary to implement both methods if the TransportHandler is, e.g., only used for sources):
...
The fireProcess methods can be used with ByteBuffers and String arrays, or with a generic type. In the latter case, the corresponding ProtocolHandler must be able to read this type; otherwise a ClassCastException is thrown.
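The type constraint on the generic variant can be illustrated with a small standalone sketch. All types below (`ProtocolListener`, `SketchTransport`, `ByteBufferProtocol`) are hypothetical stand-ins and not the framework's real classes; the sketch only assumes that the transport hands an object to a listener whose type parameter has been erased at runtime.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the framework types; not the real API.
class FireProcessTypeSketch {

    /** Hypothetical receiving side: a protocol handler that reads objects of type T. */
    interface ProtocolListener<T> {
        void process(T message);
    }

    /** Hypothetical transport side that pushes data with a generic fireProcess. */
    static class SketchTransport {
        private final ProtocolListener<Object> listener;

        @SuppressWarnings("unchecked")
        SketchTransport(ProtocolListener<?> listener) {
            // The type parameter is erased at runtime, so this cast is unchecked.
            this.listener = (ProtocolListener<Object>) listener;
        }

        <T> void fireProcess(T message) {
            listener.process(message);
        }
    }

    /** Protocol handler that only understands ByteBuffer. */
    static class ByteBufferProtocol implements ProtocolListener<ByteBuffer> {
        final List<Integer> sizes = new ArrayList<>();

        @Override
        public void process(ByteBuffer message) {
            sizes.add(message.remaining());
        }
    }

    /** Returns true if delivering the message raised a ClassCastException. */
    static boolean failsWithClassCast(SketchTransport transport, Object message) {
        try {
            transport.fireProcess(message);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ByteBufferProtocol protocol = new ByteBufferProtocol();
        SketchTransport transport = new SketchTransport(protocol);
        // Matching type: delivered normally.
        System.out.println(failsWithClassCast(transport, ByteBuffer.wrap(new byte[] {1, 2, 3})));
        // Mismatching type: the protocol handler cannot read a String.
        System.out.println(failsWithClassCast(transport, "not a ByteBuffer"));
    }
}
```

In this sketch the mismatch is only detected at delivery time, not when the handlers are wired together, which is why the transport and protocol handler have to agree on the type up front.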
Important: A transport handler must not send data before processInStart is called!
An example of using the fireProcess methods (and processInOpen/processInStart) can be found in the RabbitMQ transport handler:
```java
@Override
public void processInOpen() throws IOException {
    try {
        internalOpen();
    } catch (TimeoutException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }
}

@Override
public void processInStart() {
    try {
        if (publishStyle == PublishStyle.PublishSubscribe) {
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, exchangeName, "");
        }
        // Create Consumer
        boolean autoAck = false;
        channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                    com.rabbitmq.client.Envelope envelope,
                    com.rabbitmq.client.AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                // String routingKey = envelope.getRoutingKey();
                // String contentType = properties.getContentType();
                long deliveryTag = envelope.getDeliveryTag();
                try {
                    ByteBuffer wrapped = ByteBuffer.wrap(body);
                    wrapped.position(wrapped.limit());
                    fireProcess(wrapped);
                } catch (Exception e) {
                    LOG.warn("Error processing input", e);
                }
                channel.basicAck(deliveryTag, false);
            }
        });
        connection.addShutdownListener(new ShutdownListener() {
            @Override
            public void shutdownCompleted(ShutdownSignalException cause) {
                LOG.warn("Connection shutdown.", cause);
            }
        });
        channel.addShutdownListener(new ShutdownListener() {
            @Override
            public void shutdownCompleted(ShutdownSignalException cause) {
                LOG.warn("Channel shutdown.", cause);
            }
        });
    } catch (IOException e) {
        throw new StartFailedException(e);
    }
}
```
As you can see, every source type needs its own handling for delivering data. Here, for example, a RabbitMQ callback object (DefaultConsumer) is defined that calls fireProcess.
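The same pattern can be reduced to a standalone sketch that runs without a message broker. `FakeClient` and `SketchTransportHandler` are hypothetical names invented for illustration, and `fireProcess` here merely records the data instead of forwarding it to a protocol handler. The point is only that the callback is registered in processInStart, so nothing is pushed before that call.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch; none of these types belong to the real framework.
class CallbackPushSketch {

    /** Hypothetical client library that delivers messages through a callback. */
    static class FakeClient {
        private Consumer<byte[]> callback;

        void subscribe(Consumer<byte[]> callback) {
            this.callback = callback;
        }

        /** Simulates an incoming message; it is dropped while nobody is subscribed. */
        void deliver(byte[] body) {
            if (callback != null) {
                callback.accept(body);
            }
        }
    }

    /** Hypothetical transport handler; the method names follow the text above. */
    static class SketchTransportHandler {
        final List<ByteBuffer> received = new ArrayList<>();
        private final FakeClient client;

        SketchTransportHandler(FakeClient client) {
            this.client = client;
        }

        /** Prepares the connection only; no callback is registered yet. */
        void processInOpen() {
            // A real handler would establish its connection here.
        }

        /** Registers the callback, so data flows only from this point on. */
        void processInStart() {
            client.subscribe(body -> fireProcess(ByteBuffer.wrap(body)));
        }

        /** Stand-in for the framework's fireProcess: records what was pushed. */
        void fireProcess(ByteBuffer message) {
            received.add(message);
        }
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        SketchTransportHandler handler = new SketchTransportHandler(client);

        handler.processInOpen();
        client.deliver("early".getBytes(StandardCharsets.UTF_8)); // before start: not forwarded

        handler.processInStart();
        client.deliver("late".getBytes(StandardCharsets.UTF_8));  // after start: forwarded

        System.out.println(handler.received.size()); // prints 1
    }
}
```

Whether early messages are dropped, as in this sketch, or queued by the remote service depends on the source; RabbitMQ, for instance, keeps messages in the queue until a consumer is registered.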
...