This transport handler lets Odysseus communicate with an MQTT broker (e.g. Mosquitto).
It uses the Eclipse Paho library for MQTT processing.
To use this transport handler, you need to install the MQTT Wrapper Feature.
Options
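- topic: The topic to publish to (sender) or subscribe to (receiver)
- broker: The address of the broker, e.g. tcp://localhost:1883
- client_id: The name of the client. Important: If you use Odysseus for both sending and receiving, the sender and the receiver must use different client_ids, because an MQTT broker disconnects an existing client when another one connects with the same ID!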
Example
// Receiving elements
#PARSER PQL
#RUNQUERY
in = RECEIVE({
    transport = 'MQTT',
    protocol = 'simplecsv',
    source = 'mqttsource',
    datahandler = 'Tuple',
    options = [
        ['topic','Test'],
        ['Broker','tcp://localhost:1883'],
        ['Client_ID','OdysseusReceiver']
    ],
    schema = [['Message', 'String']]
  }
)

// Sending elements
#PARSER PQL
#RUNQUERY
ping = TIMER({PERIOD = 1000, SOURCE = 'Timer'})
input = MAP({EXPRESSIONS = ['"Hallo " + toString(counter())']}, ping)
out = SENDER({
    transport = 'MQTT',
    protocol = 'simplecsv',
    sink = 'mqttsink',
    WRAPPER = 'GenericPush',
    datahandler = 'Tuple',
    options = [
        ['topic','Test'],
        ['Broker','tcp://localhost:1883'],
        ['Client_ID','OdysseusSender']
    ]
  },
  input
)
Remark: The protocol and data handler above are just examples. MQTT can be used with any other protocol and data handler (e.g. KeyValue):
#PARSER PQL
#RUNQUERY
in = RECEIVE({
    transport = 'MQTT',
    protocol = 'JSON',
    source = 'mqttsource',
    datahandler = 'KeyValueObject',
    options = [
        ['topic','Test'],
        ['Broker','tcp://localhost:1883'],
        ['Client_ID','OdysseusReceiverKV']
    ]
  }
)

#PARSER PQL
#RUNQUERY
ping = TIMER({PERIOD = 1000, SOURCE = 'Timer'})
input = MAP({EXPRESSIONS = [['"Hallo " + toString(counter())','message']]}, ping)
kv_input = TUPLETOKEYVALUE(input)
out = SENDER({
    transport = 'MQTT',
    protocol = 'JSON',
    sink = 'mqttsink',
    WRAPPER = 'GenericPush',
    datahandler = 'KeyValueObject',
    options = [
        ['topic','Test'],
        ['Broker','tcp://localhost:1883'],
        ['Client_ID','OdysseusSenderKV']
    ]
  },
  kv_input
)