The Kafka transport handler allows the reading and writing from/to Kafka.
Needs the feature de.uniol.inf.is.odysseus.wrapper.kafka.feature
Options
Mandatory for reading and writing:
- topic: String defining the kafka topic to read/write
- messagetype: either 'string' or 'bytearray'; defines the type of values written to/read from Kafka
Mandatory for writing;
metadata.broker.list: comma separated list of Kafka brokers (host:port)
Mandatory for reading;
bootstrap.servers: comma separated list of Kafka brokers (host:port)
Optional:
- All other properties possible for Kafka Producers and Consumers respectively (see Kafka documentation for possible properties)
Example
PQL for writing
File Transport Handler
#PARSER PQL
#RUNQUERY
input = ACCESS({
source='Input',
wrapper='GenericPull',
transport='file',
protocol='SimpleCSV',
datahandler='Tuple',
options=[
['filename', '${PROJECTPATH}/input.csv'],
['csv.delimiter', ','],
['csv.trim', 'true']
],
schema=[
['text', 'String']
]
}
)
output = SENDER({
protocol = 'SimpleCSV',
transport = 'Kafka',
sink = 'Test',
wrapper = 'GenericPush',
options = [
['topic', 'test'],
['messagetype', 'string'],
['metadata.broker.list', 'localhost:9092']
]
},
input
)
PQL for reading
Important: There is no GenericPull-Version of the transport handler. So alway us GenericPush!
File Transport Handler
#PARSER PQL
#ADDQUERY
input := ACCESS({
source='test',
wrapper='GenericPush',
transport='Kafka',
protocol='SimpleCSV',
datahandler='Tuple',
options=[
['topic', 'test'],
['messagetype', 'string'],
['bootstrap.servers', 'localhost:9092']
],
schema=[
['text', 'String']
]
}
)