The Kafka transport handler reads data from and writes data to Kafka topics.

It requires the feature de.uniol.inf.is.odysseus.wrapper.kafka.feature.

Options

Mandatory for reading and writing:

  • topic: string naming the Kafka topic to read from or write to
  • messagetype: either 'string' or 'bytearray'; defines the type of the values written to or read from Kafka

Mandatory for writing:

  • metadata.broker.list: comma-separated list of Kafka brokers (host:port)

Mandatory for reading:

  • bootstrap.servers: comma-separated list of Kafka brokers (host:port)

Optional:

  • Any other property supported by Kafka producers (when writing) or Kafka consumers (when reading); see the Kafka documentation for the available properties
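
As a minimal sketch of the optional properties, the following reading query adds two standard Kafka consumer properties, group.id and auto.offset.reset, to the mandatory options. The group name 'odysseus-example' is a hypothetical value chosen for illustration, and the sketch assumes the handler passes both properties to the Kafka consumer unchanged, as the list above suggests. The handler may also apply defaults of its own.

```
#PARSER PQL
#ADDQUERY
/// Sketch only: 'odysseus-example' is a hypothetical consumer group name.
input := ACCESS({
            source='test',
            wrapper='GenericPush',
            transport='Kafka',
            protocol='SimpleCSV',
            datahandler='Tuple',
            options=[
              ['topic', 'test'],
              ['messagetype', 'string'],
              ['bootstrap.servers', 'localhost:9092'],
              ['group.id', 'odysseus-example'],
              ['auto.offset.reset', 'earliest']
            ],
            schema=[
              ['text', 'String']
            ]
          }
        )
```

In Kafka, auto.offset.reset set to 'earliest' makes a consumer group with no committed offsets start from the beginning of the topic rather than from new messages only.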

Example

PQL for writing

Kafka Transport Handler
#PARSER PQL
#RUNQUERY
input = ACCESS({
            source='Input',
            wrapper='GenericPull',
            transport='file',
            protocol='SimpleCSV',
            datahandler='Tuple',
            options=[
              ['filename', '${PROJECTPATH}/input.csv'],
              ['csv.delimiter', ','],
              ['csv.trim', 'true']
            ],
            schema=[
              ['text', 'String']
            ]
          }
        )
        
output = SENDER({
              protocol = 'SimpleCSV',
              transport = 'Kafka',
              sink = 'Test',
              wrapper = 'GenericPush',
              options = [
                ['topic', 'test'], 
                ['messagetype', 'string'],
                ['metadata.broker.list', 'localhost:9092']
              ]
            },
            input
          )

PQL for reading

Kafka Transport Handler
#PARSER PQL
#ADDQUERY
input := ACCESS({
            source='test',
            wrapper='GenericPush',
            transport='Kafka',
            protocol='SimpleCSV',
            datahandler='Tuple',
            options=[
              ['topic', 'test'], 
              ['messagetype', 'string'],
              ['bootstrap.servers', 'localhost:9092']
            ],
            schema=[
              ['text', 'String']
            ]
          }
        )