Remark: This handler must be installed (How to install new features).
Important: If you want to use this handler e.g. to send csv data to Odysseus, use CSV protocol handler and send csv-values with a trailing "\n", as the CSV handler needs this! e.g. "9,7,Blue,98.3\n
"
This transport handler allow to send and receive object via the AQMP protocol, implemented with RabbitMQ (see http://www.rabbitmq.com/). Further parameter information can be found here: http://www.rabbitmq.com/api-guide.html
Remark: In an older version, there was a publish_style.
This will be ignored in future versions.
queue_name:
The queue to send to or receive from
exchange_name:
The exchange to publish the message toconsumer_tag:
The consumer tag
host:
The host where the server runs
The port of the server (if not default)port
:
Username (if required)username
:
Password (if required)password
:
virtualhost
: The virtual host (if required)
durable (true / false)
In addition, arbitrary options can bedefined. They need to start with the prefix "rabbit."
which will be removed in the actual argument. Example:
['rabbit.x-message-ttl',1000]
will be the argument 'x-message-ttl'
with the long value '1000'
.
#PARSER PQL #REQUIRED de.uniol.inf.is.odysseus.wrapper.rabbitmq.feature #RUNQUERY input = ACCESS({ transport='RabbitMQ', source= 'Receiver', wrapper='GenericPush', protocol='SimpleCSV', datahandler='Tuple', options=[ ['EXCHANGE_NAME','TelCoSim'], ['QUEUE_NAME','DataUsage'], ['CONSUMER_TAG','Odysseus'], ['HOST','localhost'], ['ByteOrder', 'Little_Endian'], ['csv.delimiter',';'] ], schema=[ ['TIMESTAMP', 'STARTTIMESTAMP'], ['id', 'String'], ['volume', 'Long'] ] } ) |
See https://git.swl.informatik.uni-oldenburg.de/projects/ODYDATGEN/repos/telcosim/browse/src/main/java/de/uniol/inf/is/telCoSim/Simulation.java how the data is generated for this input queue.
See RabbitMQ use case for an example how to connect external software with Odysseus via RabbitMQ.
Here is a simple example where content is send to RabbitMQ and received with Odysseus again.
#PARSER PQL #ADDQUERY timer = TIMER({PERIOD = 1000, SOURCE = 'timersource'}) SENDER = SENDER({ transport='RabbitMQ', wrapper='GenericPush', protocol='csv', datahandler='Tuple', SINK="SENDER", options=[ ['QUEUE_NAME','nexmark'], ['CONSUMER_TAG','bid'], ['HOST','localhost'] ] }, timer ) |
Receiver
#PARSER PQL #REMOVEQUERY A #QNAME A #RUNQUERY input = ACCESS({ transport='RabbitMQ', source= 'Receiver', wrapper='GenericPush', protocol='csv', datahandler='Tuple', options=[ ['QUEUE_NAME','nexmark'], ['CONSUMER_TAG','bid'], ['HOST','localhost'], ['ByteOrder', 'LittleEndian'] ], schema=[['time', 'StartTimestamp']] } |