Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The fireProcess methods can be used with ByteBuffers and String-Arrays or with a Generic. In the latter case, the corresponding ProtocolHandler must read this type, else a class cast exception will be thrown.

Important: A transport handler must not send data before processInStart is called!

An example to the use fireProcess-Methods (and processInOpen/processInStart) can be found in the RabbitMQ transport handler:

Code Block
languagejava
        @Override
    public void processInOpen() throws IOException {
        internalOpen();try {
        //   Create ConsumerinternalOpen();
        } booleancatch autoAck(TimeoutException =e1) false;{
        channel.basicConsume(queueName, autoAck, consumerTag,
  // TODO Auto-generated catch block
          new DefaultConsumer(channel  e1.printStackTrace();
        }
    }

    @Override
    public void processInStart() {
        try {
            if (publishStyle == PublishStyle.PublishSubscribe) {
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, exchangeName, "");
            }

            // Create Consumer
            boolean autoAck = false;
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope,
                        com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // String routingKey = envelope.getRoutingKey();
                    // String contentType = properties.getContentType();
                @Override
    long deliveryTag = envelope.getDeliveryTag();
             public void handleDelivery(
     try {
                      String consumerTag,
 ByteBuffer wrapped = ByteBuffer.wrap(body);
                        comwrapped.rabbitmq.client.Envelope envelope,position(wrapped.limit());
                        fireProcess(wrapped);
    com.rabbitmq.client.AMQP.BasicProperties properties,
                } catch (Exception e) {
          byte[] body) throws IOException {
//          LOG.warn("Error processing input", e);
           String routingKey = envelope.getRoutingKey();
//      }
                  String contentType = propertieschannel.getContentTypebasicAck(deliveryTag, false);
                };
        long  deliveryTag = envelope.getDeliveryTag(});
        
            connection.addShutdownListener(new ShutdownListener()   try{

                @Override
            ByteBuffer wrapped = ByteBuffer.wrap(body);
   public void shutdownCompleted(ShutdownSignalException cause) {
                     wrappedLOG.position(wrapped.limit())warn("Connection shutdown.", cause);
                }
            fireProcess(wrapped});

            channel.addShutdownListener(new      ShutdownListener() {

      }catch(Exception e){
         @Override
                public void  e.printStackTrace();shutdownCompleted(ShutdownSignalException cause) {
                    LOG.warn("Channel shutdown.", cause);
  }
              }
           channel.basicAck(deliveryTag, false });
        } catch (IOException e) {
        };
    throw new StartFailedException(e);
          });

    }

Here you can see, that every source type needs a special handling for sending. Here e.g. a callback object (DefaultConsumer) is defined in RabbitMQ that calls fireProcess.

...