This transport handler allows Odysseus to communicate with an MQTT broker (e.g. Mosquitto).

It uses the Eclipse Paho client library.

Options

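  • topic: The topic to publish to (when sending) or subscribe to (when receiving)
  • broker: The address of the broker, e.g. tcp://localhost:1883
  • client_id: The name of the client. Important: If you use Odysseus for both sending and receiving, the sender and the receiver must use different client_ids, because an MQTT broker disconnects an existing client when another one connects with the same ID.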
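The constraints on these options can be checked before a query is installed. The following is a minimal Python sketch, not part of Odysseus: the function name check_mqtt_options is hypothetical, and it assumes that the broker address is a URI of the form scheme://host[:port] as in the examples on this page, and that option keys may be compared case-insensitively (the examples write 'Broker' and 'Client_ID').

```python
from urllib.parse import urlsplit


def check_mqtt_options(*option_sets):
    """Return a list of problems found in the given MQTT option dictionaries.

    Hypothetical helper for illustration only; it is not an Odysseus API.
    """
    problems = []
    seen_ids = set()
    for opts in option_sets:
        # Assumption: keys are compared case-insensitively.
        opts = {key.lower(): value for key, value in opts.items()}

        # All three options described above should be present and non-empty.
        for key in ("topic", "broker", "client_id"):
            if not opts.get(key):
                problems.append(f"missing option: {key}")

        # Assumption: the broker is given as scheme://host[:port].
        broker = opts.get("broker")
        if broker:
            parts = urlsplit(broker)
            if not parts.scheme or not parts.hostname:
                problems.append(f"broker is not a scheme://host URI: {broker}")

        # Every connection to the same broker needs its own client_id.
        client_id = opts.get("client_id")
        if client_id:
            if client_id in seen_ids:
                problems.append(f"duplicate client_id: {client_id}")
            seen_ids.add(client_id)
    return problems


receiver = {"topic": "Test", "Broker": "tcp://localhost:1883",
            "Client_ID": "OdysseusReceiver"}
sender = {"topic": "Test", "Broker": "tcp://localhost:1883",
          "Client_ID": "OdysseusSender"}
print(check_mqtt_options(receiver, sender))
```

Passing the option sets of all queries that talk to the same broker in one call makes a client_id clash visible before the broker silently drops one of the connections.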

 

Example

// Receiving elements

#PARSER PQL
#RUNQUERY
in = RECEIVE({
          transport = 'MQTT',
          protocol = 'simplecsv',
          source = 'mqttsource',
          datahandler = 'Tuple',
          options = [
            ['topic','Test'],
            ['Broker','tcp://localhost:1883'],
            ['Client_ID','OdysseusReceiver']
          ],
          schema = [['Message', 'String']]                
        }            
      )

// Sending elements
#PARSER PQL
#RUNQUERY
ping = TIMER({PERIOD = 1000, SOURCE = 'Timer'})

input = MAP({EXPRESSIONS = ['"Hallo " + toString(counter())']}, ping)

out = SENDER({
          transport = 'MQTT',
          protocol = 'simplecsv',
          sink = 'mqttsink',
          WRAPPER = 'GenericPush',
          datahandler = 'Tuple',
          options = [
            ['topic','Test'],
            ['Broker','tcp://localhost:1883'],
            ['Client_ID','OdysseusSender']
          ]                
        }, input            
      )

Remark: The protocol and data handler used above are just examples. MQTT can be used with any other protocol and data handler (e.g. JSON with KeyValueObject):

#PARSER PQL
#RUNQUERY
in = RECEIVE({
          transport = 'MQTT',
          protocol = 'JSON',
          source = 'mqttsource',
          datahandler = 'KeyValueObject',
          options = [
            ['topic','Test'],
            ['Broker','tcp://localhost:1883'],
            ['Client_ID','OdysseusReceiverKV']
          ]                
        }            
      )

#PARSER PQL
#RUNQUERY
ping = TIMER({PERIOD = 1000, SOURCE = 'Timer'})

input = MAP({EXPRESSIONS = [['"Hallo " + toString(counter())','message']]}, ping)

kv_input = TUPLETOKEYVALUE(input)

out = SENDER({
          transport = 'MQTT',
          protocol = 'JSON',
          sink = 'mqttsink',
          WRAPPER = 'GenericPush',
          datahandler = 'KeyValueObject',
          options = [
            ['topic','Test'],
            ['Broker','tcp://localhost:1883'],
            ['Client_ID','OdysseusSenderKV']
          ]                
        }, kv_input            
      )
