The Kafka transport handler reads data from and writes data to Kafka topics.

It requires the feature de.uniol.inf.is.odysseus.wrapper.kafka.feature.

Options

 

Mandatory for reading and writing:

...

  • topic: name of the Kafka topic to read from or write to
  • messagetype: either 'string' or 'bytearray'; defines the type of the values written to or read from Kafka

Mandatory for writing:

 

  • metadata.broker.list: comma-separated list of Kafka brokers (host:port)

...

  • bootstrap.servers: comma-separated list of Kafka brokers (host:port)

 

Optional:

  • Any other property supported by Kafka producers (when writing) or Kafka consumers (when reading)

Note that all options are forwarded directly to Kafka, so check the Kafka documentation for the available properties and their exact meaning.
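As an illustration of forwarding additional options, the following sketch adds two standard Kafka consumer properties, group.id and auto.offset.reset, to a reading query. The group name 'odysseus-example' is a placeholder, and whether 'earliest' is accepted for auto.offset.reset depends on the Kafka client version in use, so treat both values as assumptions to verify against the Kafka documentation.

```pql
#PARSER PQL
#ADDQUERY
/// Sketch only: 'group.id' and 'auto.offset.reset' are passed through to the Kafka consumer; their values here are assumed placeholders
input := ACCESS({
            source='test',
            wrapper='GenericPush',
            transport='Kafka',
            protocol='SimpleCSV',
            datahandler='Tuple',
            options=[
              ['topic', 'test'],
              ['messagetype', 'string'],
              ['bootstrap.servers', 'localhost:9092'],
              ['group.id', 'odysseus-example'],
              ['auto.offset.reset', 'earliest']
            ],
            schema=[
              ['text', 'String']
            ]
          }
        )
```

Any other consumer or producer property can be added to the options list in the same key/value form.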

Example

PQL for writing

#PARSER PQL
#RUNQUERY
input = ACCESS({
            source='Input',
            wrapper='GenericPull',
            transport='file',
            protocol='SimpleCSV',
            datahandler='Tuple',
            options=[
              ['filename', '${PROJECTPATH}/input.csv'],
              ['csv.delimiter', ','],
              ['csv.trim', 'true']
            ],
            schema=[
              ['text', 'String']
            ]
          }
        )
        
output = SENDER({
              protocol = 'SimpleCSV',
              transport = 'Kafka',
              sink = 'Test',
              wrapper = 'GenericPush',
              options = [
                ['topic', 'test'], 
                ['messagetype', 'string'],
                ['bootstrap.servers', 'localhost:9092']
              ]
            },
            input
          )

PQL for reading

Important: There is no GenericPull version of the transport handler, so always use GenericPush!

#PARSER PQL
#ADDQUERY
input := ACCESS({
            source='test',
            wrapper='GenericPush',
            transport='Kafka',
            protocol='SimpleCSV',
            datahandler='Tuple',
            options=[
              ['topic', 'test'], 
              ['messagetype', 'string'],
              ['bootstrap.servers', 'localhost:9092']
            ],
            schema=[
              ['text', 'String']
            ]
          }
        )