The Kafka transport handler allows reading from and writing to Kafka.
Requires the feature de.uniol.inf.is.odysseus.wrapper.kafka.feature.
Please note that the parameters are forwarded directly to Kafka, so please also check the Kafka documentation for valid configuration parameters.
#PARSER PQL
#RUNQUERY
input = ACCESS({
    source='Input',
    wrapper='GenericPull',
    transport='file',
    protocol='SimpleCSV',
    datahandler='Tuple',
    options=[
        ['filename', '${PROJECTPATH}/input.csv'],
        ['csv.delimiter', ','],
        ['csv.trim', 'true']
    ],
    schema=[ ['text', 'String'] ]
})

output = SENDER({
    protocol = 'SimpleCSV',
    transport = 'Kafka',
    sink = 'Test',
    wrapper = 'GenericPush',
    options = [
        ['topic', 'test'],
        ['messagetype', 'string'],
        ['bootstrap.servers', 'localhost:9092']
    ]
}, input)
Important: There is no GenericPull version of the transport handler, so always use GenericPush!
#PARSER PQL
#ADDQUERY
input := ACCESS({
    source='test',
    wrapper='GenericPush',
    transport='Kafka',
    protocol='SimpleCSV',
    datahandler='Tuple',
    options=[
        ['topic', 'test'],
        ['messagetype', 'string'],
        ['bootstrap.servers', 'localhost:9092']
    ],
    schema=[ ['text', 'String'] ]
})
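The ACCESS operator above only registers the source; to actually see the messages arriving from the topic, a sink has to be attached. The following sketch extends the example by appending every received tuple to a CSV file. It assumes the file transport handler is available as a sink; the sink name and the output path are placeholders.

#PARSER PQL
#ADDQUERY
/// Read from the Kafka topic as in the example above
input := ACCESS({
    source='test',
    wrapper='GenericPush',
    transport='Kafka',
    protocol='SimpleCSV',
    datahandler='Tuple',
    options=[
        ['topic', 'test'],
        ['messagetype', 'string'],
        ['bootstrap.servers', 'localhost:9092']
    ],
    schema=[ ['text', 'String'] ]
})

/// Write every received tuple to a CSV file (placeholder path)
output = SENDER({
    protocol = 'SimpleCSV',
    transport = 'file',
    sink = 'KafkaDump',
    wrapper = 'GenericPush',
    options = [ ['filename', '${PROJECTPATH}/kafka_dump.csv'] ]
}, input)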
Odysseus can also be used to copy all input from one Kafka cluster to another.
In this example, two docker-compose files are used (e.g. in the folders kafka1 and kafka2):
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
and
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22182:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29093:29093
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
First, start this query, which reads from kafka1 and writes to kafka2:
#PARSER PQL
#QNAME ReadingAndWriting
#ADDQUERY
input = ACCESS({
    protocol = 'Odysseus',
    transport = 'Kafka',
    source = 'ReadFromKafka1',
    wrapper = 'GenericPush',
    datahandler = 'Tuple',
    READMETADATA = true,
    options = [
        ['topic', 'test'],
        ['messagetype', 'bytearray'],
        ['metadata.broker.list', 'localhost:29092'],
        ['bootstrap.servers', 'localhost:29092'],
        ['group.id', 'marco1']
    ],
    SCHEMA = [ ['time', 'STARTTIMESTAMP'] ]
})

output = SENDER({
    protocol = 'Odysseus',
    transport = 'Kafka',
    sink = 'WriteToKafka2',
    wrapper = 'GenericPush',
    WRITEMETADATA = true,
    options = [
        ['topic', 'test_converted'],
        ['messagetype', 'bytearray'],
        ['metadata.broker.list', 'localhost:29093'],
        ['bootstrap.servers', 'localhost:29093'],
        ['group.id', 'marco1']
    ]
}, input)
After that, start a query that writes into the first Kafka cluster.
#PARSER PQL
#QNAME Writing
#RUNQUERY
input = TIMER({PERIOD = 1000, TIMEFROMSTART = true, SOURCE = 'source'})

output = SENDER({
    protocol = 'Odysseus',
    transport = 'Kafka',
    sink = 'WriteToKafka1',
    wrapper = 'GenericPush',
    WRITEMETADATA = true,
    options = [
        ['topic', 'test'],
        ['messagetype', 'bytearray'],
        ['metadata.broker.list', 'localhost:29092'],
        ['bootstrap.servers', 'localhost:29092'],
        ['group.id', 'marco1']
    ]
}, input)
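To check that the copied tuples actually arrive in the second cluster, a further query can subscribe to the test_converted topic on kafka2. This is only a sketch that reuses the ACCESS pattern from the ReadingAndWriting query; the source name and group.id are arbitrary.

#PARSER PQL
#QNAME Verify
#ADDQUERY
/// Read the copied tuples (including metadata) back from the second cluster
verify = ACCESS({
    protocol = 'Odysseus',
    transport = 'Kafka',
    source = 'ReadFromKafka2',
    wrapper = 'GenericPush',
    datahandler = 'Tuple',
    READMETADATA = true,
    options = [
        ['topic', 'test_converted'],
        ['messagetype', 'bytearray'],
        ['metadata.broker.list', 'localhost:29093'],
        ['bootstrap.servers', 'localhost:29093'],
        ['group.id', 'verify1']
    ],
    SCHEMA = [ ['time', 'STARTTIMESTAMP'] ]
})

The resulting stream can then be inspected in Odysseus Studio or attached to another SENDER, as shown in the examples above.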