...
Code Block

#PARSER PQL
#ADDQUERY
input = ACCESS({
    source = 'test',
    wrapper = 'GenericPush',
    transport = 'Kafka',
    protocol = 'SimpleCSV',
    datahandler = 'Tuple',
    options = [
        ['topic', 'test'],
        ['messagetype', 'string'],
        ['bootstrap.servers', 'localhost:9092']
    ],
    schema = [
        ['text', 'String']
    ]
})
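With the query above running, test input can be produced from the command line, for example with the console producer that ships with the Kafka distribution (assuming a broker on localhost:9092 and the topic 'test' from the options above):

```shell
# Send lines of CSV text to the 'test' topic that the ACCESS operator reads from.
# With the SimpleCSV protocol, each line becomes one tuple; here the schema has
# a single 'text' attribute, so a whole line maps to that attribute.
kafka-console-producer --bootstrap-server localhost:9092 --topic test
# Then type messages interactively, one per line.
```

Older Kafka releases use `--broker-list` instead of `--bootstrap-server`.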
Scenario: Reading from one Kafka cluster and writing to another
Odysseus can be used to copy all input from one Kafka cluster to another.
In this example, two docker-compose files are used (e.g. in the folders kafka1 and kafka2):
Code Block

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
and
Code Block

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22182:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29093:29093
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
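Assuming each file is saved as docker-compose.yml inside its folder (kafka1 and kafka2 are the folder names used in this example), both clusters can be brought up like this:

```shell
# Start the first cluster (broker reachable from the host at localhost:29092)
(cd kafka1 && docker-compose up -d)

# Start the second cluster (broker reachable from the host at localhost:29093)
(cd kafka2 && docker-compose up -d)
```

The differing host-side port mappings (22181/22182 for ZooKeeper, 29092/29093 for Kafka) are what allow both clusters to run on the same machine.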
First, start this query, which reads from kafka1 and writes to kafka2:
Code Block

#PARSER PQL
#QNAME ReadingAndWriting
#ADDQUERY
input = ACCESS({
    protocol = 'Odysseus',
    transport = 'Kafka',
    source = 'ReadFromKafka1',
    wrapper = 'GenericPush',
    datahandler = 'Tuple',
    READMETADATA = true,
    options = [
        ['topic', 'test'],
        ['messagetype', 'bytearray'],
        ['metadata.broker.list', 'localhost:29092'],
        ['bootstrap.servers', 'localhost:29092'],
        ['group.id', 'marco1']
    ],
    SCHEMA = [
        ['time', 'STARTTIMESTAMP']
    ]
})
output = SENDER({
    protocol = 'Odysseus',
    transport = 'Kafka',
    sink = 'WriteToKafka2',
    wrapper = 'GenericPush',
    WRITEMETADATA = true,
    options = [
        ['topic', 'test_converted'],
        ['messagetype', 'bytearray'],
        ['metadata.broker.list', 'localhost:29093'],
        ['bootstrap.servers', 'localhost:29093'],
        ['group.id', 'marco1']
    ]
},
input
)
After that, start a query that writes into the first Kafka cluster:
Code Block

#PARSER PQL
#QNAME Writing
#RUNQUERY
input = TIMER({PERIOD = 1000, TIMEFROMSTART = true, SOURCE = 'source'})
output = SENDER({
    protocol = 'Odysseus',
    transport = 'Kafka',
    sink = 'WriteToKafka1',
    wrapper = 'GenericPush',
    WRITEMETADATA = true,
    options = [
        ['topic', 'test'],
        ['messagetype', 'bytearray'],
        ['metadata.broker.list', 'localhost:29092'],
        ['bootstrap.servers', 'localhost:29092'],
        ['group.id', 'marco1']
    ]
},
input
)
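Once both queries are running, the copy can be checked by consuming from the second cluster with Kafka's console consumer (the topic name matches the 'topic' option of the WriteToKafka2 sink above):

```shell
# Read the copied messages from the second cluster.
kafka-console-consumer --bootstrap-server localhost:29093 \
  --topic test_converted --from-beginning
```

Because the sink writes the binary Odysseus protocol ('messagetype' is 'bytearray'), the console output will not be human-readable text, but records arriving roughly once per second confirms that the pipeline is working.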