The Kafka transport handler allows reading from and writing to Kafka.
Requires the feature de.uniol.inf.is.odysseus.wrapper.kafka.feature.
Please note that the parameters are forwarded directly to Kafka, so please also check the Kafka documentation.
#PARSER PQL
#RUNQUERY
input = ACCESS({
    source='Input',
    wrapper='GenericPull',
    transport='file',
    protocol='SimpleCSV',
    datahandler='Tuple',
    options=[
        ['filename', '${PROJECTPATH}/input.csv'],
        ['csv.delimiter', ','],
        ['csv.trim', 'true']
    ],
    schema=[
        ['text', 'String']
    ]
})

output = SENDER({
    protocol = 'SimpleCSV',
    transport = 'Kafka',
    sink = 'Test',
    wrapper = 'GenericPush',
    options = [
        ['topic', 'test'],
        ['messagetype', 'string'],
        ['bootstrap.servers', 'localhost:9092']
    ]
}, input)
Important: There is no GenericPull version of this transport handler, so always use GenericPush!
#PARSER PQL
#ADDQUERY
input := ACCESS({
    source='test',
    wrapper='GenericPush',
    transport='Kafka',
    protocol='SimpleCSV',
    datahandler='Tuple',
    options=[
        ['topic', 'test'],
        ['messagetype', 'string'],
        ['bootstrap.servers', 'localhost:9092']
    ],
    schema=[
        ['text', 'String']
    ]
})
Odysseus can be used to copy all input of one Kafka cluster to another:
In this example we use two docker-compose files (e.g. in folders kafka1 and kafka2):
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
and
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22182:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29093:29093
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
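Before running the queries, both clusters need to be up. A minimal sketch, assuming the two compose files above are saved as kafka1/docker-compose.yml and kafka2/docker-compose.yml (the folder names are the ones suggested above; adjust paths as needed):

```shell
# Start both single-broker Kafka clusters in the background.
# Cluster 1 is reachable from the host on localhost:29092,
# cluster 2 on localhost:29093 (see the advertised listeners above).
docker compose -f kafka1/docker-compose.yml up -d
docker compose -f kafka2/docker-compose.yml up -d

# Optional: confirm the brokers are running.
docker compose -f kafka1/docker-compose.yml ps
docker compose -f kafka2/docker-compose.yml ps
```

With the cp-kafka image, topics are auto-created by default, so the topics 'test' and 'test_converted' used below do not have to be created manually.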
First, start this query, which reads from kafka1 and writes to kafka2:
#PARSER PQL
#QNAME ReadingAndWriting
#ADDQUERY
input = ACCESS({
    protocol = 'Odysseus',
    transport = 'Kafka',
    source = 'ReadFromKafka1',
    wrapper = 'GenericPush',
    datahandler = 'Tuple',
    READMETADATA = true,
    options = [
        ['topic', 'test'],
        ['messagetype', 'bytearray'],
        ['metadata.broker.list', 'localhost:29092'],
        ['bootstrap.servers', 'localhost:29092'],
        ['group.id', 'marco1']
    ],
    SCHEMA = [
        ['time', 'STARTTIMESTAMP']
    ]
})

output = SENDER({
    protocol = 'Odysseus',
    transport = 'Kafka',
    sink = 'WriteToKafka2',
    wrapper = 'GenericPush',
    WRITEMETADATA = true,
    options = [
        ['topic', 'test_converted'],
        ['messagetype', 'bytearray'],
        ['metadata.broker.list', 'localhost:29093'],
        ['bootstrap.servers', 'localhost:29093'],
        ['group.id', 'marco1']
    ]
}, input)
After that, start a query that writes into the first Kafka cluster:
#PARSER PQL
#QNAME Writing
#RUNQUERY
input = TIMER({PERIOD = 1000, TIMEFROMSTART = true, SOURCE = 'source'})

output = SENDER({
    protocol = 'Odysseus',
    transport = 'Kafka',
    sink = 'WriteToKafka1',
    wrapper = 'GenericPush',
    WRITEMETADATA = true,
    options = [
        ['topic', 'test'],
        ['messagetype', 'bytearray'],
        ['metadata.broker.list', 'localhost:29092'],
        ['bootstrap.servers', 'localhost:29092'],
        ['group.id', 'marco1']
    ]
}, input)
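To check that the copy pipeline works end to end, you can consume the target topic on the second cluster. A sketch using the console consumer that ships with the cp-kafka image (the compose file path kafka2/docker-compose.yml is the folder layout suggested above; inside the container the broker is reachable as kafka:9092 per its advertised listeners):

```shell
# Read the copied messages from cluster 2; one message per second
# should appear once both queries are running (Ctrl+C to stop).
docker compose -f kafka2/docker-compose.yml exec kafka \
  kafka-console-consumer \
    --bootstrap-server kafka:9092 \
    --topic test_converted \
    --from-beginning
```

Since the messagetype is 'bytearray' and the Odysseus protocol handler serializes tuples in a binary format, the console output will not be human-readable text, but seeing records arrive at the expected rate confirms the transfer.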