...

  • topic: String defining the Kafka topic to read from or write to
  • messagetype: either 'string' or 'bytearray'; defines the type of the values written to or read from Kafka

Mandatory for writing:

  • metadata.broker.list: comma-separated list of Kafka brokers (host:port)

Mandatory for reading:

  • bootstrap.servers: comma-separated list of Kafka brokers (host:port)

Optional:

  • All other properties supported by Kafka producers and consumers, respectively (see the Kafka documentation for the possible properties)

Please note that these parameters are forwarded directly to Kafka, so please also check the Kafka documentation.
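
All other Kafka producer or consumer properties can be passed the same way. As a minimal sketch (the sink name 'TunedSender' and the chosen property values are only illustrative), the following query forwards the standard Kafka producer properties compression.type and acks through the options list:

Code Block
#PARSER PQL
#RUNQUERY
input = TIMER({PERIOD = 1000, TIMEFROMSTART = true, SOURCE = 'source'})

output = SENDER({
              protocol = 'SimpleCSV',
              transport = 'Kafka',
              sink = 'TunedSender',
              wrapper = 'GenericPush',
              options = [
                ['topic', 'test'],
                ['messagetype', 'string'],
                ['metadata.broker.list', 'localhost:9092'],
                ['compression.type', 'gzip'],
                ['acks', 'all']
              ]
            },
            input
          )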

Example

PQL for writing

Code Block
#PARSER PQL
#RUNQUERY
input = ACCESS({
            source='Input',
            wrapper='GenericPull',
            transport='file',
            protocol='SimpleCSV',
            datahandler='Tuple',
            options=[
              ['filename', '${PROJECTPATH}/input.csv'],
              ['csv.delimiter', ','],
              ['csv.trim', 'true']
            ],
            schema=[
              ['text', 'String']
            ]
          }
        )

output = SENDER({
              protocol = 'SimpleCSV',
              transport = 'Kafka',
              sink = 'Test',
              wrapper = 'GenericPush',
              options = [
                ['topic', 'test'], 
                ['messagetype', 'string'],
                ['metadata.broker.list', 'localhost:9092']
              ]
            },
            input
          )

...

PQL for reading

Code Block
#PARSER PQL
#ADDQUERY
input = ACCESS({
            source='test',
            wrapper='GenericPush',
            transport='Kafka',
            protocol='SimpleCSV',
            datahandler='Tuple',
            options=[
              ['topic', 'test'], 
              ['messagetype', 'string'],
              ['bootstrap.servers', 'localhost:9092']
            ],
            schema=[
              ['text', 'String']
            ]
          }
        )


Scenario: Reading from one Kafka cluster and writing to another

Odysseus can be used to copy all input from one Kafka cluster to another.

In this example, we use two docker-compose files (e.g., in the folders kafka1 and kafka2):

Code Block
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

and

Code Block
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22182:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29093:29093
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1


Start both clusters (e.g., with docker-compose up -d in each folder). Then start this query first; it reads from kafka1 and writes to kafka2:

Code Block
#PARSER PQL
#QNAME ReadingAndWriting
#ADDQUERY
input = ACCESS({
              protocol = 'Odysseus',
              transport = 'Kafka',
              source = 'ReadFromKafka1',
              wrapper = 'GenericPush',
              datahandler = 'Tuple',
              READMETADATA = true,
              options = [
                ['topic', 'test'],
                ['messagetype', 'bytearray'],
                ['metadata.broker.list', 'localhost:29092'],
                ['bootstrap.servers','localhost:29092'],
                ['group.id','marco1']
              ],
              SCHEMA = [
              	['time','STARTTIMESTAMP']
              ]
            })
            
output = SENDER({
              protocol = 'Odysseus',
              transport = 'Kafka',
              sink = 'WriteToKafka2',
              wrapper = 'GenericPush',
              WRITEMETADATA = true,
              options = [
                ['topic', 'test_converted'],
                ['messagetype', 'bytearray'],
                ['metadata.broker.list', 'localhost:29093'],
                ['bootstrap.servers','localhost:29093'],
                ['group.id','marco1']
              ]
            },
            input
          )

After that, start a query that writes into the first Kafka cluster:

Code Block
#PARSER PQL
#QNAME Writing
#RUNQUERY

input = TIMER({PERIOD = 1000, TIMEFROMSTART = true, SOURCE = 'source'})

output = SENDER({
              protocol = 'Odysseus',
              transport = 'Kafka',
              sink = 'WriteToKafka1',
              wrapper = 'GenericPush',
              WRITEMETADATA = true,
              options = [
                ['topic', 'test'],
                ['messagetype', 'bytearray'],
                ['metadata.broker.list', 'localhost:29092'],
                ['bootstrap.servers','localhost:29092'],
                ['group.id','marco1']
              ]
            },
            input
          )
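
To check that the data actually arrives in the second cluster, a third query can read the converted topic from kafka2. This is a minimal sketch mirroring the reading part of the copy query above; the source name 'ReadFromKafka2' and the group.id 'verify1' are only illustrative:

Code Block
#PARSER PQL
#QNAME Verify
#ADDQUERY
check = ACCESS({
              protocol = 'Odysseus',
              transport = 'Kafka',
              source = 'ReadFromKafka2',
              wrapper = 'GenericPush',
              datahandler = 'Tuple',
              READMETADATA = true,
              options = [
                ['topic', 'test_converted'],
                ['messagetype', 'bytearray'],
                ['bootstrap.servers', 'localhost:29093'],
                ['group.id', 'verify1']
              ],
              SCHEMA = [
                ['time','STARTTIMESTAMP']
              ]
            })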