The Kafka transport handler allows the reading and writing from/to Kafka.
Needs the feature de.uniol.inf.is.odysseus.wrapper.kafka.feature
Options
Mandatory for reading and writing:
- topic: String defining the kafka topic to read/write
- messagetype: either 'string' or 'bytearray'; defines the type of values written to/read from Kafka
Mandatory for writing;
metadata.broker.list: comma separated list of Kafka brokers (host:port)
Mandatory for reading;
bootstrap.servers: comma separated list of Kafka brokers (host:port)
Optional:
- All other properties possible for Kafka Producers and Consumers respectively (see Kafka documentation for possible properties)
Example
PQL for writing
File Transport Handler
#PARSER PQL #RUNQUERY input = ACCESS({ source='Input', wrapper='GenericPull', transport='file', protocol='SimpleCSV', datahandler='Tuple', options=[ ['filename', '${PROJECTPATH}/input.csv'], ['csv.delimiter', ','], ['csv.trim', 'true'] ], schema=[ ['text', 'String'] ] } ) output = SENDER({ protocol = 'SimpleCSV', transport = 'Kafka', sink = 'Test', wrapper = 'GenericPush', options = [ ['topic', 'test'], ['messagetype', 'string'], ['metadata.broker.list', 'localhost:9092'] ] }, input )
PQL for reading
Important: There is no GenericPull version of the transport handler. So alway us GenericPush!
File Transport Handler
#PARSER PQL #ADDQUERY input := ACCESS({ source='test', wrapper='GenericPush', transport='Kafka', protocol='SimpleCSV', datahandler='Tuple', options=[ ['topic', 'test'], ['messagetype', 'string'], ['bootstrap.servers', 'localhost:9092'] ], schema=[ ['text', 'String'] ] } )