Page tree
Skip to end of metadata
Go to start of metadata

This transport handler allows to communicate with an MQTT broker (e.g. Mosquito)

The library used here for processing is PAHO.

To use this transport handler you need to install the MQTT Wrapper Feature.

Options

  • topic: The topic to send or receive from
  • broker: The adress of the broker
  • client_id: The name of the client. Important: If you use Odysseus for sending and receiving, there must be different client_ids!

 

Example

// Receiving elements

#PARSER PQL
#RUNQUERY
in = RECEIVE({
          transport = 'MQTT',
          protocol = 'simplecsv',
          source = 'mqttsource',
          datahandler = 'Tuple',
          options = [
            ['topic','Test'],
            ['Broker','tcp://localhost:1883'],
            ['Client_ID','OdysseusReceiver']
          ],
          schema = [['Message', 'String']]                
        }            
      )

// Sending elements
#PARSER PQL
#RUNQUERY
ping = TIMER({PERIOD = 1000, SOURCE = 'Timer'})

input = MAP({EXPRESSIONS = ['"Hallo " + toString(counter())']}, ping)

out = SENDER({
          transport = 'MQTT',
          protocol = 'simplecsv',
          sink = 'mqttsink',
          WRAPPER = 'GenericPush',
          datahandler = 'Tuple',
          options = [
            ['topic','Test'],
            ['Broker','tcp://localhost:1883'],
            ['Client_ID','OdysseusSender']
          ]                
        }, input            
      )

Remark: Protocol and data handler are just examples. MQTT can be used with any other protcol and data handler (e.g. KeyValue):

#PARSER PQL
#RUNQUERY
in = RECEIVE({
          transport = 'MQTT',
          protocol = 'JSON',
          source = 'mqttsource',
          datahandler = 'KeyValueObject',
          options = [
            ['topic','Test'],
            ['Broker','tcp://localhost:1883'],
            ['Client_ID','OdysseusReceiverKV']
          ]                
        }            
      )

#PARSER PQL
#RUNQUERY
ping = TIMER({PERIOD = 1000, SOURCE = 'Timer'})

input = MAP({EXPRESSIONS = [['"Hallo " + toString(counter())','message']]}, ping)

kv_input = TUPLETOKEYVALUE(input)

out = SENDER({
          transport = 'MQTT',
          protocol = 'JSON',
          sink = 'mqttsink',
          WRAPPER = 'GenericPush',
          datahandler = 'KeyValueObject',
          options = [
            ['topic','Test'],
            ['Broker','tcp://localhost:1883'],
            ['Client_ID','OdysseusSenderKV']
          ]                
        }, kv_input            
      )

  • No labels