The Kafka transport handler allows reading from and writing to Kafka.
Options
...
Needs the feature de.uniol.inf.is.odysseus.wrapper.kafka.feature
Options
Mandatory for reading and writing:
...
- topic: String defining the kafka topic to read/write
- messagetype: either 'string' or 'bytearray'; defines the type of values written to/read from Kafka
Mandatory for writing:
- metadata.broker.list: comma separated list of Kafka brokers (host:port)
Mandatory for reading:
- bootstrap.servers: comma separated list of Kafka brokers (host:port)
...
Optional:
- All other properties possible for Kafka Producers and Consumers respectively (see Kafka documentation for possible properties)
Please note that the parameters are forwarded directly to Kafka, so please also check the Kafka documentation.
Example
PQL for writing
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
#PARSER PQL #RUNQUERY input = ACCESS({ source='Input', wrapper='GenericPull', transport='file', protocol='SimpleCSV', datahandler='Tuple', options=[ ['filename', '${PROJECTPATH}/input.csv'], ['csv.delimiter', ','], ['csv.trim', 'true'] ], schema=[ ['text', 'String'] ] } ) output = SENDER({ protocol = 'SimpleCSV', transport = 'Kafka', sink = 'Test', wrapper = 'GenericPush', options = [ ['topic', 'test'], ['messagetype', 'string'], ['metadata.broker.list', 'localhost:9092'], ['bootstrap.servers', 'localhost:9092'] ] }, input ) |
PQL for reading
Important: There is no GenericPull version of the transport handler, so always use GenericPush!
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
#PARSER PQL
#ADDQUERY
input := ACCESS({
source='test',
wrapper='GenericPush',
transport='Kafka',
protocol='SimpleCSV',
datahandler='Tuple',
options=[
['topic', 'test'],
['messagetype', 'string'],
['bootstrap.servers', 'localhost:9092']
],
schema=[
['text', 'String']
]
}
) |
Scenario: Reading from one Kafka cluster and writing to another
Odysseus can be used to copy all input from one Kafka cluster to another:
In this example we use two docker-compose files (e.g. in folder kafka1 and kafka2):
Code Block |
---|
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 |
and
Code Block |
---|
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22182:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29093:29093
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 |
First, start this query, which reads from kafka1 and writes to kafka2:
Code Block |
---|
#PARSER PQL
#QNAME ReadingAndWriting
#ADDQUERY
input = ACCESS({
protocol = 'Odysseus',
transport = 'Kafka',
source = 'ReadFromKafka1',
wrapper = 'GenericPush',
datahandler = 'Tuple',
READMETADATA = true,
options = [
['topic', 'test'],
['messagetype', 'bytearray'],
['metadata.broker.list', 'localhost:29092'],
['bootstrap.servers','localhost:29092'],
['group.id','marco1']
],
SCHEMA = [
['time','STARTTIMESTAMP']
]
})
output = SENDER({
protocol = 'Odysseus',
transport = 'Kafka',
sink = 'WriteToKafka2',
wrapper = 'GenericPush',
WRITEMETADATA = true,
options = [
['topic', 'test_converted'],
['messagetype', 'bytearray'],
['metadata.broker.list', 'localhost:29093'],
['bootstrap.servers','localhost:29093'],
['group.id','marco1']
]
},
input
) |
And after that, start a query that writes into the first Kafka cluster.
Code Block |
---|
#PARSER PQL
#QNAME Writing
#RUNQUERY
input = TIMER({PERIOD = 1000, TIMEFROMSTART = true, SOURCE = 'source'})
output = SENDER({
protocol = 'Odysseus',
transport = 'Kafka',
sink = 'WriteToKafka1',
wrapper = 'GenericPush',
WRITEMETADATA = true,
options = [
['topic', 'test'],
['messagetype', 'bytearray'],
['metadata.broker.list', 'localhost:29092'],
['bootstrap.servers','localhost:29092'],
['group.id','marco1']
]
},
input
) |