...
The fireProcess methods can be used with ByteBuffers and String-Arrays or with a Generic. In the latter case, the corresponding ProtocolHandler must read this type, else a class cast exception will be thrown.
Important: A transport handler must not send data before processInStart is called!
An example to the use fireProcess-Methods (and processInOpen/processInStart) can be found in the RabbitMQ transport handler:
Code Block | ||
---|---|---|
| ||
@Override public void processInOpen() throws IOException { internalOpen();try { // Create ConsumerinternalOpen(); } booleancatch autoAck(TimeoutException =e1) false;{ channel.basicConsume(queueName, autoAck, consumerTag, // TODO Auto-generated catch block new DefaultConsumer(channel e1.printStackTrace(); } } @Override public void processInStart() { try { if (publishStyle == PublishStyle.PublishSubscribe) { String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchangeName, ""); } // Create Consumer boolean autoAck = false; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { // String routingKey = envelope.getRoutingKey(); // String contentType = properties.getContentType(); @Override long deliveryTag = envelope.getDeliveryTag(); public void handleDelivery( try { String consumerTag, ByteBuffer wrapped = ByteBuffer.wrap(body); comwrapped.rabbitmq.client.Envelope envelope,position(wrapped.limit()); fireProcess(wrapped); com.rabbitmq.client.AMQP.BasicProperties properties, } catch (Exception e) { byte[] body) throws IOException { // LOG.warn("Error processing input", e); String routingKey = envelope.getRoutingKey(); // } String contentType = propertieschannel.getContentTypebasicAck(deliveryTag, false); }; long deliveryTag = envelope.getDeliveryTag(}); connection.addShutdownListener(new ShutdownListener() try{ @Override ByteBuffer wrapped = ByteBuffer.wrap(body); public void shutdownCompleted(ShutdownSignalException cause) { wrappedLOG.position(wrapped.limit())warn("Connection shutdown.", cause); } fireProcess(wrapped}); channel.addShutdownListener(new ShutdownListener() { }catch(Exception e){ @Override public void e.printStackTrace();shutdownCompleted(ShutdownSignalException cause) { LOG.warn("Channel shutdown.", cause); } } channel.basicAck(deliveryTag, false }); } catch (IOException e) { }; throw new StartFailedException(e); }); } |
Here you can see, that every source type needs a special handling for sending. Here e.g. a callback object (DefaultConsumer) is defined in RabbitMQ that calls fireProcess.
...