Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Remark: This handler must be installed (How to install new features).

There is now a new version named AMQPTransportHandler, it works the same but is now part of the core system.

Important: If you want to use this handler e.g. to send csv data to Odysseus, use CSV protocol handler and send csv-values with a trailing "\n", as the CSV handler needs this! e.g. "9,7,Blue,98.3\n"

This transport handler allow to send and receive object via the AQMP AMQP protocol, implemented with RabbitMQ (see see http://www.rabbitmq.com/). Further parameter information can be found here: http://www.rabbitmq.com/api-guide.html

...

  • queue_name: The queue to send to or receive from

  • exchange_name: The exchange to publish the message to
  • consumer_tag: The consumer tag

  • host: The host where the server runs

  • port: The port of the server (if not default)

  • username: Username (if required)

  • password: Password (if required)

  • virtualhost: The virtual host (if required)

  • publish_style: Specifies how messages will be sent (default: workqueue)

  • durable (true / false)

  • exclusive (true / false)
  • auto_delete (true / false)

In addition, arbitrary options can bedefinedbe defined. They need to start with the prefix prefix "rabbitamqp." which will be removed in the actual argument. Example: ['rabbitamqp.x-message-ttl',1000]  will be the argument 'x-message-ttl' with the long value '1000'

...

Example

Code Block
/// Send elements to bus (work queue)
SENDER = SENDER({
              transport='RabbitMQ',
              wrapper='GenericPush',
              protocol='SimpleByteBuffer',
              datahandler='Tuple',
              SINK="SENDER",
              options=[
                ['QUEUE_NAME','nexmark'],
                ['CONSUMER_TAG','person'],
                ['HOST','localhost'],
                ['ByteOrder', 'Little_Endian'] 
              ]                        
            },
            nexmark:person
          )

/// Receive elements from bus (work queue)
#PARSER PQL
#RUNQUERY
input = ACCESS({
            transport='RabbitMQAMQP',
            source= 'Receiver',
            wrapper='GenericPush',
            protocol='SimpleByteBufferSimpleCSV',
            datahandler='Tuple',
            options=[
			  ['PUBLISH_STYLE','workqueue'], 
              ['QUEUEEXCHANGE_NAME','nexmarkTelCoSim'],
              ['CONSUMERQUEUE_TAGNAME','personDataUsage'],
              ['HOSTCONSUMER_TAG','localhostOdysseus'],
              ['ByteOrderHOST', 'Little_Endianlocalhost'], 
              ['durableByteOrder',true], 
              ['exclusive',false], 'Little_Endian'],
              ['auto_delete',false], 
              ['rabbit.x-message-ttl',1000csv.delimiter',';']
            ],
            schema=[
              ['TIMESTAMP', 'STARTTIMESTAMP'],
              ['id', 'INTEGERString'],
              ['namevolume', 'STRING'],
              ['email', 'STRING'],
 Long']
             ['creditcard', 'STRING'],
              ['city', 'STRING'],
              ['state', 'STRING']
            ]                       
            }         
          }                
        )


Publish-subscribe example

See https://git.swl.informatik.uni-oldenburg.de/projects/ODYDATGEN/repos/telcosim/browse/src/main/java/de/uniol/inf/is/telCoSim/Simulation.java how the data is generated for this input queue.

See RabbitMQ use case for an example how to connect external software with Odysseus via RabbitMQ.

Send and Receive with AMQP

Here is a simple example where content is send to an AMQP broker (here RabbitMQ) and received with Odysseus again.

Code Block
#PARSER PQL
#ADDQUERY
timer = TIMER({PERIOD = 1000, SOURCE = 'timersource'})
Code Block
/// Send elements to bus (publish-subscribe)
SENDER = SENDER({
              transport='RabbitMQAMQP',
              wrapper='GenericPush',
              protocol='SimpleByteBuffercsv',
              datahandler='Tuple',
              SINK="SENDER",
              options=[
                ['EXCHANGEQUEUE_NAME','nexmark'],
                ['PUBLISH_STYLE','publishsubscribe'],
                ['CONSUMER_TAG','personbid'],
                ['HOST','localhost'],
                ['ByteOrder', 'Little_Endian']
              ]                        
            },
            nexmark:persontimer
          )

Receiver

Code Block
#PARSER PQL

/// Receive elements from bus (publish-subscribe)#REMOVEQUERY A
#QNAME A
#RUNQUERY
input = ACCESS({
            transport='RabbitMQAMQP',
            source= 'Receiver',
            wrapper='GenericPush',
            protocol='SimpleByteBuffercsv',
            datahandler='Tuple',
            options=[
              ['EXCHANGEQUEUE_NAME','nexmark'],
              ['PUBLISH_STYLE','publishsubscribe'],
              ['CONSUMER_TAG','personbid'],
              ['HOST','localhost'],
              ['ByteOrder', 'Little_EndianLittleEndian']
            ],
            schema=[
              ['TIMESTAMP', 'STARTTIMESTAMP'],
              ['idtime', 'INTEGER'],
              ['name', 'STRING'],
              ['email', 'STRING'],
StartTimestamp']]              ['creditcard', 'STRING'],
              ['city', 'STRING'],
              ['state', 'STRING']
            ]                                            
          }                
        )