This transport handler enables communication with an MQTT broker (e.g. Mosquitto).
The underlying MQTT client library is Eclipse Paho.
To use this transport handler you need to install the MQTT Wrapper Feature.
// Receiving elements
#PARSER PQL
#RUNQUERY
in = RECEIVE({
    transport = 'MQTT',
    protocol = 'simplecsv',
    source = 'mqttsource',
    datahandler = 'Tuple',
    options = [
        ['topic','Test'],
        ['Broker','tcp://localhost:1883'],
        ['Client_ID','OdysseusReceiver']
    ],
    schema = [['Message', 'String']]
  }
)

// Sending elements
#PARSER PQL
#RUNQUERY
ping = TIMER({PERIOD = 1000, SOURCE = 'Timer'})
input = MAP({EXPRESSIONS = ['"Hallo " + toString(counter())']}, ping)
out = SENDER({
    transport = 'MQTT',
    protocol = 'simplecsv',
    sink = 'mqttsink',
    WRAPPER = 'GenericPush',
    datahandler = 'Tuple',
    options = [
        ['topic','Test'],
        ['Broker','tcp://localhost:1883'],
        ['Client_ID','OdysseusSender']
    ]
  },
  input
)
Remark: The protocol and data handler shown above are just examples. MQTT can be used with any other protocol and data handler (e.g. JSON with KeyValueObject):
#PARSER PQL
#RUNQUERY
in = RECEIVE({
    transport = 'MQTT',
    protocol = 'JSON',
    source = 'mqttsource',
    datahandler = 'KeyValueObject',
    options = [
        ['topic','Test'],
        ['Broker','tcp://localhost:1883'],
        ['Client_ID','OdysseusReceiverKV']
    ]
  }
)

#PARSER PQL
#RUNQUERY
ping = TIMER({PERIOD = 1000, SOURCE = 'Timer'})
input = MAP({EXPRESSIONS = [['"Hallo " + toString(counter())','message']]}, ping)
kv_input = TUPLETOKEYVALUE(input)
out = SENDER({
    transport = 'MQTT',
    protocol = 'JSON',
    sink = 'mqttsink',
    WRAPPER = 'GenericPush',
    datahandler = 'KeyValueObject',
    options = [
        ['topic','Test'],
        ['Broker','tcp://localhost:1883'],
        ['Client_ID','OdysseusSenderKV']
    ]
  },
  input
)
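The Broker option in the queries above is a URI of the form scheme://host:port. When a connection fails, it can help to check that the value splits into the parts you expect. The following Python sketch does this with the standard library only. The helper name split_broker_uri and the default-port table are assumptions made for illustration and are not part of this transport handler or of Paho; 1883 and 8883 are the ports registered for MQTT without and with TLS, and the 'ssl' scheme name is assumed here by analogy with 'tcp'.

```python
from urllib.parse import urlsplit

# Assumed defaults for illustration: 1883 is the registered MQTT port,
# 8883 the registered port for MQTT over TLS. The 'ssl' scheme name is
# an assumption; the queries in this page only use 'tcp'.
DEFAULT_PORTS = {"tcp": 1883, "ssl": 8883}


def split_broker_uri(uri):
    """Split a broker URI such as 'tcp://localhost:1883' into (scheme, host, port).

    Hypothetical helper, not part of the transport handler.
    """
    parts = urlsplit(uri)
    if not parts.scheme or not parts.hostname:
        raise ValueError(f"not a valid broker URI: {uri!r}")
    # Use the explicit port if present, otherwise fall back to the assumed default.
    port = parts.port if parts.port is not None else DEFAULT_PORTS.get(parts.scheme)
    if port is None:
        raise ValueError(f"no port given and no default known for {parts.scheme!r}")
    return parts.scheme, parts.hostname, port


if __name__ == "__main__":
    # The broker value used in the queries above.
    print(split_broker_uri("tcp://localhost:1883"))
```

A value without a scheme, such as a bare host name, raises a ValueError in this sketch, which makes a malformed Broker entry easy to spot before starting the query.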