The Kafka transport handler allows the reading and writing from/to Kafka.
Needs the feature de.uniol.inf.is.odysseus.wrapper.kafka.feature
Options
Mandatory for reading and writing:
- topic: String defining the kafka topic to read/write
- messagetype: either 'string' or 'bytearray'; defines the type of values written to/read from Kafka
- bootstrap.servers: comma separated list of Kafka brokers (host:port)
Optional:
- All other properties possible for Kafka Producers and Consumers respectively (see Kafka documentation for possible properties)
Please note that the parameters are directly forwarded to kafka, so, please also check the Kafka documentation.
Example
PQL for writing
File Transport Handler
#PARSER PQL #RUNQUERY input = ACCESS({ source='Input', wrapper='GenericPull', transport='file', protocol='SimpleCSV', datahandler='Tuple', options=[ ['filename', '${PROJECTPATH}/input.csv'], ['csv.delimiter', ','], ['csv.trim', 'true'] ], schema=[ ['text', 'String'] ] } ) output = SENDER({ protocol = 'SimpleCSV', transport = 'Kafka', sink = 'Test', wrapper = 'GenericPush', options = [ ['topic', 'test'], ['messagetype', 'string'], ['bootstrap.servers', 'localhost:9092'] ] }, input )
PQL for reading
Important: There is no GenericPull version of the transport handler. So alway us GenericPush!
File Transport Handler
#PARSER PQL #ADDQUERY input := ACCESS({ source='test', wrapper='GenericPush', transport='Kafka', protocol='SimpleCSV', datahandler='Tuple', options=[ ['topic', 'test'], ['messagetype', 'string'], ['bootstrap.servers', 'localhost:9092'] ], schema=[ ['text', 'String'] ] } )