The Kafka transport handler allows the reading and writing from/to Kafka.
Needs the feature de.uniol.inf.is.odysseus.wrapper.kafka.feature
Options
Mandatory for reading and writing:
- topic: String defining the kafka topic to read/write
- messagetype: either 'string' or 'bytearray'; defines the type of values written to/read from Kafka
- bootstrap.servers: comma separated list of Kafka brokers (host:port)
Optional:
- All other properties possible for Kafka Producers and Consumers respectively (see Kafka documentation for possible properties)
Please note that the parameters are directly forwarded to kafka, so, please also check the Kafka documentation.
Example
PQL for writing
#PARSER PQL #RUNQUERY input = ACCESS({ source='Input', wrapper='GenericPull', transport='file', protocol='SimpleCSV', datahandler='Tuple', options=[ ['filename', '${PROJECTPATH}/input.csv'], ['csv.delimiter', ','], ['csv.trim', 'true'] ], schema=[ ['text', 'String'] ] } ) output = SENDER({ protocol = 'SimpleCSV', transport = 'Kafka', sink = 'Test', wrapper = 'GenericPush', options = [ ['topic', 'test'], ['messagetype', 'string'], ['bootstrap.servers', 'localhost:9092'] ] }, input )
PQL for reading
Important: There is no GenericPull version of the transport handler. So alway us GenericPush!
#PARSER PQL #ADDQUERY input := ACCESS({ source='test', wrapper='GenericPush', transport='Kafka', protocol='SimpleCSV', datahandler='Tuple', options=[ ['topic', 'test'], ['messagetype', 'string'], ['bootstrap.servers', 'localhost:9092'] ], schema=[ ['text', 'String'] ] } )
Szenario: Reading from one Kafka and writing to another
Odysseus can be used to copy all input of one Kafka cluster to another:
In this example we use two docker-compose files (e.g. in folder kafka1 and kafka2):
version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 22181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
and
version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 22182:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29093:29093 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
Start first this query: This reads from kafka1 and writes to kafka2
#PARSER PQL #QNAME ReadingAndWrinting #ADDQUERY input = ACCESS({ protocol = 'Odysseus', transport = 'Kafka', source = 'ReadFromKafka1', wrapper = 'GenericPush', datahandler = 'Tuple', READMETADATA = true, options = [ ['topic', 'test'], ['messagetype', 'bytearray'], ['metadata.broker.list', 'localhost:29092'], ['bootstrap.servers','localhost:29092'], ['group.id','marco1'] ], SCHEMA = [ ['time','STARTTIMESTAMP'] ] }) output = SENDER({ protocol = 'Odysseus', transport = 'Kafka', sink = 'WriteToKafka2', wrapper = 'GenericPush', WRITEMETADATA = true, options = [ ['topic', 'test_converted'], ['messagetype', 'bytearray'], ['metadata.broker.list', 'localhost:29093'], ['bootstrap.servers','localhost:29093'], ['group.id','marco1'] ] }, input )
And after that, start a query, that writes into the first kafka cluster.
#PARSER PQL #QNAME Writing #RUNQUERY input = TIMER({PERIOD = 1000, TIMEFROMSTART = true, SOURCE = 'source'}) output = SENDER({ protocol = 'Odysseus', transport = 'Kafka', sink = 'WriteToKafka1', wrapper = 'GenericPush', WRITEMETADATA = true, options = [ ['topic', 'test'], ['messagetype', 'bytearray'], ['metadata.broker.list', 'localhost:29092'], ['bootstrap.servers','localhost:29092'], ['group.id','marco1'] ] }, input )