Remark: This handler must be installed (How to install new features).
A newer version named AMQPTransportHandler is now available. It works the same way but is part of the core system.
Important: To send CSV data to Odysseus with this handler, use the CSV protocol handler and end each CSV record with a trailing "\n", because the CSV handler requires it. Example: "9,7,Blue,98.3\n"
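The trailing-newline requirement can be sketched from the side of an external sender. This is a minimal Python sketch, not part of Odysseus: the helper names `to_csv_line` and `publish_row` are hypothetical, UTF-8 encoding is an assumption, and `channel` is assumed to be any object offering a pika-style `basic_publish(exchange, routing_key, body)` method.

```python
import csv
import io


def to_csv_line(values, delimiter=","):
    """Render one row as CSV text that ends with exactly one newline."""
    buffer = io.StringIO()
    # lineterminator="\n" produces the trailing newline the CSV handler needs
    # (the csv module would otherwise default to "\r\n").
    writer = csv.writer(buffer, delimiter=delimiter, lineterminator="\n")
    writer.writerow(values)
    return buffer.getvalue()


def publish_row(channel, queue_name, values):
    """Publish one CSV row to queue_name through the default exchange.

    `channel` is assumed to be a pika-style channel; the default exchange
    ("") routes a message to the queue named by the routing key.
    """
    body = to_csv_line(values).encode("utf-8")  # encoding is an assumption
    channel.basic_publish(exchange="", routing_key=queue_name, body=body)
    return body
```

With a real broker, `channel` would typically come from an AMQP client library connection. The queue name passed in has to match the QUEUE_NAME option used on the Odysseus side.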
This transport handler allows sending and receiving objects via the AMQP protocol, implemented with RabbitMQ (see http://www.rabbitmq.com/). Further parameter information can be found here: http://www.rabbitmq.com/api-guide.html
Remark: In an older version, there was a publish_style. This will be ignored in future versions.
Options
- queue_name: The queue to send to or receive from
- exchange_name: The exchange to publish the message to
- consumer_tag: The consumer tag
- host: The host where the server runs
- port: The port of the server (if not default)
- username: Username (if required)
- password: Password (if required)
- virtualhost: The virtual host (if required)
- durable (true / false)
- exclusive (true / false)
- auto_delete (true / false)
In addition, arbitrary options can be defined. They need to start with the prefix "rabbitamqp.", which is removed from the actual argument name. Example: ['rabbitamqp.x-message-ttl',1000] becomes the argument 'x-message-ttl' with the long value 1000.
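The prefix rule can be illustrated with a short Python sketch. It only mirrors the described behaviour and is not the handler's actual implementation: the function name `extract_arguments` is hypothetical, and exact, case-sensitive prefix matching is an assumption.

```python
PREFIX = "rabbitamqp."


def extract_arguments(options, prefix=PREFIX):
    """Collect prefixed options into an argument dict with the prefix removed.

    `options` is a list of [key, value] pairs, as in the options list of a
    query. Keys without the prefix are treated as regular handler options
    and are skipped here.
    """
    arguments = {}
    for key, value in options:
        if key.startswith(prefix):
            arguments[key[len(prefix):]] = value
    return arguments


options = [
    ["QUEUE_NAME", "DataUsage"],
    ["rabbitamqp.x-message-ttl", 1000],
]
print(extract_arguments(options))  # {'x-message-ttl': 1000}
```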
Example
| Code Block |
|---|
#PARSER PQL
#REQUIRED de.uniol.inf.is.odysseus.wrapper.rabbitmq.feature
#RUNQUERY
input = ACCESS({
    transport='RabbitMQAMQP',
    source= 'Receiver',
    wrapper='GenericPush',
    protocol='SimpleCSV',
    datahandler='Tuple',
    options=[
        ['EXCHANGE_NAME','TelCoSim'],
        ['QUEUE_NAME','DataUsage'],
        ['CONSUMER_TAG','Odysseus'],
        ['HOST','localhost'],
        ['ByteOrder', 'Little_Endian'],
        ['csv.delimiter',';']
    ],
    schema=[
        ['TIMESTAMP', 'STARTTIMESTAMP'],
        ['id', 'String'],
        ['volume', 'Long']
    ]
}
) |
...
See RabbitMQ use case for an example of how to connect external software with Odysseus via RabbitMQ.
Send and Receive with AMQP
Here is a simple example in which content is sent to an AMQP broker (here RabbitMQ) and then received by Odysseus again.
| Code Block |
|---|
#PARSER PQL
#ADDQUERY
timer = TIMER({PERIOD = 1000, SOURCE = 'timersource'})
SENDER = SENDER({
transport='RabbitMQAMQP',
wrapper='GenericPush',
protocol='csv',
datahandler='Tuple',
SINK="SENDER",
options=[
['QUEUE_NAME','nexmark'],
['CONSUMER_TAG','bid'],
['HOST','localhost']
]
},
timer
) |
...
| Code Block |
|---|
#PARSER PQL
#REMOVEQUERY A
#QNAME A
#RUNQUERY
input = ACCESS({
transport='RabbitMQAMQP',
source= 'Receiver',
wrapper='GenericPush',
protocol='csv',
datahandler='Tuple',
options=[
['QUEUE_NAME','nexmark'],
['CONSUMER_TAG','bid'],
['HOST','localhost'],
['ByteOrder', 'LittleEndian']
],
schema=[['time', 'StartTimestamp']]
}) |
...