...
- topic: String defining the kafka topic to read/write
- messagetype: either 'string' or 'bytearray'; defines the type of values written to/read from Kafka
Mandatory for writing:
- metadata.broker.list: comma separated list of Kafka brokers (host:port)
Mandatory for reading:
- bootstrap.servers: comma separated list of Kafka brokers (host:port)
Optional:
- All other properties possible for Kafka Producers and Consumers respectively (see Kafka documentation for possible properties)
Please note that the parameters are forwarded directly to Kafka, so please also check the Kafka documentation.
Example
PQL for writing
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
#PARSER PQL
#RUNQUERY
input = ACCESS({
    source='Input',
    wrapper='GenericPull',
    transport='file',
    protocol='SimpleCSV',
    datahandler='Tuple',
    options=[
        ['filename', '${PROJECTPATH}/input.csv'],
        ['csv.delimiter', ','],
        ['csv.trim', 'true']
    ],
    schema=[
        ['text', 'String']
    ]
})
output = SENDER({
    protocol = 'SimpleCSV',
    transport = 'Kafka',
    sink = 'Test',
    wrapper = 'GenericPush',
    options = [
        ['topic', 'test'],
        ['messagetype', 'string'],
        ['metadata.broker.list', 'localhost:9092']
    ]
}, input) |
...
Important: There is no GenericPull version of this transport handler, so always use GenericPush!
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
#PARSER PQL
#ADDQUERY
input := ACCESS({
source='test',
wrapper='GenericPush',
transport='Kafka',
protocol='SimpleCSV',
datahandler='Tuple',
options=[
['topic', 'test'],
['messagetype', 'string'],
['bootstrap.servers', 'localhost:9092']
],
schema=[
['text', 'String']
]
}
) |
Scenario: Reading from one Kafka cluster and writing to another
Odysseus can be used to copy all input of one Kafka cluster to another:
In this example we use two docker-compose files (e.g. in folder kafka1 and kafka2):
Code Block |
---|
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 |
and
Code Block |
---|
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22182:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29093:29093
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 |
First, start this query, which reads from kafka1 and writes to kafka2:
Code Block |
---|
#PARSER PQL #QNAME ReadingAndWriting #ADDQUERY input = ACCESS({ protocol = 'Odysseus', transport = 'Kafka', source = 'ReadFromKafka1', wrapper = 'GenericPush', datahandler = 'Tuple', READMETADATA = true, options = [ ['topic', 'test'], ['messagetype', 'bytearray'], ['metadata.broker.list', 'localhost:29092'], ['bootstrap.servers','localhost:29092'], ['group.id','marco1'] ], SCHEMA = [ ['time','STARTTIMESTAMP'] ] }) output = SENDER({ protocol = 'Odysseus', transport = 'Kafka', sink = 'WriteToKafka2', wrapper = 'GenericPush', WRITEMETADATA = true, options = [ ['topic', 'test_converted'], ['messagetype', 'bytearray'], ['metadata.broker.list', 'localhost:29093'], ['bootstrap.servers','localhost:29093'], ['group.id','marco1'] ] }, input ) |
And after that, start a query that writes into the first Kafka cluster.
Code Block |
---|
#PARSER PQL #QNAME Writing #RUNQUERY input = TIMER({PERIOD = 1000, TIMEFROMSTART = true, SOURCE = 'source'}) output = SENDER({ protocol = 'Odysseus', transport = 'Kafka', sink = 'WriteToKafka1', wrapper = 'GenericPush', WRITEMETADATA = true, options = [ ['topic', 'test'], ['messagetype', 'bytearray'], ['metadata.broker.list', 'localhost:29092'], ['bootstrap.servers','localhost:29092'], ['group.id','marco1'] ] }, input ) |