Remark: This handler must be installed before use (see How to install new features).
Important: If you use this handler to send CSV data to Odysseus, for example, use the CSV protocol handler and terminate each set of CSV values with a trailing "\n", because the CSV handler requires it, e.g. "9,7,Blue,98.3\n".
This transport handler allows sending and receiving objects via the AMQP protocol, implemented with RabbitMQ (see http://www.rabbitmq.com/). Further parameter information can be found here: http://www.rabbitmq.com/api-guide.html
Remark: Older versions had a publish_style option. It will be ignored in future versions.
Options
- queue_name: The queue to send to or receive from
- exchange_name: The exchange to publish the message to
- consumer_tag: The consumer tag
- host: The host where the server runs
- port: The port of the server (if not default)
- username: Username (if required)
- password: Password (if required)
- virtualhost: The virtual host (if required)
- durable (true / false)
- exclusive (true / false)
- auto_delete (true / false)
In addition, arbitrary options can be defined. They need to start with the prefix "rabbit.", which is removed from the actual argument name. Example: ['rabbit.x-message-ttl',1000] becomes the argument 'x-message-ttl' with the long value '1000'.
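As a minimal sketch, a prefixed option could sit next to the regular options of an ACCESS operator as shown here. The queue name, consumer tag, host and schema are placeholder values chosen for illustration, and the TTL value is the one from the example above. RabbitMQ interprets x-message-ttl in milliseconds; how the handler applies the argument is not specified here.

```
#PARSER PQL
#RUNQUERY
/// 'rabbit.x-message-ttl' is handed on as the argument 'x-message-ttl' with the long value 1000
input = ACCESS({
    transport='RabbitMQ',
    source='Receiver',
    wrapper='GenericPush',
    protocol='csv',
    datahandler='Tuple',
    options=[
        ['QUEUE_NAME','nexmark'],
        ['CONSUMER_TAG','bid'],
        ['HOST','localhost'],
        ['rabbit.x-message-ttl',1000]
    ],
    schema=[['time','StartTimestamp']]
})
```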
Example
```
#PARSER PQL
#REQUIRED de.uniol.inf.is.odysseus.wrapper.rabbitmq.feature
#RUNQUERY
input = ACCESS({
    transport='RabbitMQ',
    source='Receiver',
    wrapper='GenericPush',
    protocol='SimpleCSV',
    datahandler='Tuple',
    options=[
        ['EXCHANGE_NAME','TelCoSim'],
        ['QUEUE_NAME','DataUsage'],
        ['CONSUMER_TAG','Odysseus'],
        ['HOST','localhost'],
        ['ByteOrder','Little_Endian'],
        ['csv.delimiter',';']
    ],
    schema=[
        ['TIMESTAMP','STARTTIMESTAMP'],
        ['id','String'],
        ['volume','Long']
    ]
})
```
See https://git.swl.informatik.uni-oldenburg.de/projects/ODYDATGEN/repos/telcosim/browse/src/main/java/de/uniol/inf/is/telCoSim/Simulation.java for how the data is generated for this input queue.
See RabbitMQ use case for an example of how to connect external software with Odysseus via RabbitMQ.
Send and Receive with RabbitMQ
Here is a simple example in which content is sent to RabbitMQ and then received again with Odysseus.
```
#PARSER PQL
#ADDQUERY
timer = TIMER({PERIOD = 1000, SOURCE = 'timersource'})

SENDER = SENDER({
    transport='RabbitMQ',
    wrapper='GenericPush',
    protocol='csv',
    datahandler='Tuple',
    SINK="SENDER",
    options=[
        ['QUEUE_NAME','nexmark'],
        ['CONSUMER_TAG','bid'],
        ['HOST','localhost'],
        ['ByteOrder','LittleEndian']
    ]},
    timer
)
```
Receiver
```
#PARSER PQL
#REMOVEQUERY A
#QNAME A
#RUNQUERY
input = ACCESS({
    transport='RabbitMQ',
    source='Receiver',
    wrapper='GenericPush',
    protocol='csv',
    datahandler='Tuple',
    options=[
        ['QUEUE_NAME','nexmark'],
        ['CONSUMER_TAG','bid'],
        ['HOST','localhost'],
        ['ByteOrder','LittleEndian']
    ],
    schema=[['time','StartTimestamp']]
})
```