This document describes how external data streams can be integrated into Odysseus.
Usage
External data streams must be registered in Odysseus before they can be processed. This is typically done with one of the query languages Odysseus provides:
...
The general structure of the framework is as follows:
...
ACCESS
To integrate new streams with PQL, the ACCESS operator is needed. For compatibility reasons, it also accepts many deprecated parameters. The following describes only the preferred parameters; the deprecated ones will be removed in a future version.
...
- Multi_Value: Creates a list. The schema defines the type of the list elements.
- Tuple: Creates a tuple. The schema parameter defines the types of the tuple's attributes.
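The difference between the two data handlers can be illustrated with a short Python sketch that turns raw string fields either into a tuple or into a list. It is illustrative only: `tuple_handler`, `multi_value_handler` and the `CONVERTERS` type table are hypothetical and do not reflect Odysseus' internal implementation. For Multi_Value, the sketch assumes that a single schema type applies to every list element.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple

# Hypothetical mapping from schema type names to Python converters
# (an illustrative assumption, not Odysseus' type system).
CONVERTERS: Dict[str, Callable[[str], Any]] = {"Integer": int, "Double": float, "String": str}


def tuple_handler(schema: Sequence[Tuple[str, str]], fields: Sequence[str]) -> tuple:
    """Tuple: one typed attribute per schema entry, in schema order."""
    if len(fields) != len(schema):
        raise ValueError(f"expected {len(schema)} fields, got {len(fields)}")
    return tuple(CONVERTERS[type_name](raw) for (_, type_name), raw in zip(schema, fields))


def multi_value_handler(element_type: str, fields: Sequence[str]) -> List[Any]:
    """Multi_Value: a list of any length whose elements all share one schema type."""
    convert = CONVERTERS[element_type]
    return [convert(raw) for raw in fields]


# Usage: a fixed-width typed tuple versus a homogeneous list.
person = tuple_handler([("id", "Integer"), ("name", "String"), ("price", "Double")],
                       ["7", "Alice", "3.5"])
values = multi_value_handler("Integer", ["1", "2", "3"])
print(person)  # (7, 'Alice', 3.5)
print(values)  # [1, 2, 3]
```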
Examples
The following PQL command creates a new source with
...
```
source = ACCESS({source='source', Wrapper='GenericPull',
  Schema=[['geometry','SpatialGeometry'], ['geometry_vertex_count','Integer'], ['OBJECTID','Integer'], ['ISO_2DIGIT','String'], ['Shape_Leng','Double'], ['Shape_Area','Double'], ['Name','String'], ['import_notes','String'], ['Google requests','String']],
  InputSchema=['SpatialKML','Integer','Integer','String','Double','Double','String','String','String'],
  transport='File', protocol='csv', dataHandler='Tuple',
  Options=[['filename','C:/Users/Marco Grawunder/Documents/My Dropbox/OdysseusQuickShare/Daten/Geo/World Country Boundaries.csv'], ['Delimiter',',']]})
```
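This example combines three layers: the File transport delivers the raw input, the csv protocol splits it into fields using the Delimiter option, and the Tuple data handler builds typed elements. The Python sketch below mimics that layering on a few made-up rows. All function names are hypothetical, the spatial types (SpatialKML, SpatialGeometry) are left out, and the reading that InputSchema governs how fields are parsed while Schema supplies the attribute names is an interpretation of the example, not documented behavior.

```python
import csv
import io
from typing import Dict, Iterable, Iterator, List, Sequence, TextIO

# Hypothetical converters for the InputSchema type names used here (assumption).
CONVERTERS = {"Integer": int, "Double": float, "String": str}


def file_transport(stream: TextIO) -> Iterator[str]:
    """Transport (sketch): delivers raw lines from any file-like object."""
    yield from stream


def csv_protocol(lines: Iterable[str], delimiter: str = ",") -> Iterator[List[str]]:
    """Protocol (sketch): splits each line into fields using the Delimiter option."""
    yield from csv.reader(lines, delimiter=delimiter)


def tuple_data_handler(records: Iterable[List[str]],
                       attributes: Sequence[str],
                       input_schema: Sequence[str]) -> Iterator[Dict[str, object]]:
    """Data handler (sketch): converts fields per InputSchema, names them per Schema."""
    for fields in records:
        if len(fields) != len(input_schema):
            raise ValueError(f"expected {len(input_schema)} fields, got {len(fields)}")
        yield {name: CONVERTERS[type_name](raw)
               for name, type_name, raw in zip(attributes, input_schema, fields)}


# An in-memory stand-in for the file named in the filename option (made-up rows).
data = io.StringIO("1,DE,1.5,Germany\n2,FR,2.5,France\n")
rows = list(tuple_data_handler(
    csv_protocol(file_transport(data), delimiter=","),
    attributes=["OBJECTID", "ISO_2DIGIT", "Shape_Area", "Name"],
    input_schema=["Integer", "String", "Double", "String"],
))
print(rows[0])  # {'OBJECTID': 1, 'ISO_2DIGIT': 'DE', 'Shape_Area': 1.5, 'Name': 'Germany'}
```

Keeping the layers as separate generators mirrors the idea that each handler can be exchanged independently, e.g. another delimiter or another transport, without touching the others.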
...
Sender
To publish processed data with PQL, the SENDER operator is needed. This operator performs the application-specific and transport-specific transformation of the processed stream elements and delivers them.
...
```
output = SENDER({
  wrapper='GenericPush',
  transport='TCPClient',
  protocol='CSV',
  dataHandler='Tuple',
  options=[['host', 'example.com'],['port', '8081'],['read', '10240'],['write', '10240']]
}, input)
```
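Conceptually, this SENDER serializes every tuple with the CSV protocol handler and pushes the bytes to a TCP server through the TCPClient transport. The Python sketch below imitates that path. `csv_encode` and `send_tuples` are hypothetical names, a local `socket.socketpair()` replaces the connection to example.com:8081 so the example runs without a network, and the read and write options from the PQL example are not modeled.

```python
import csv
import io
import socket
from typing import Iterable, Sequence


def csv_encode(values: Sequence[object], delimiter: str = ",") -> bytes:
    """Protocol (sketch): serialize one tuple as a single CSV line."""
    buffer = io.StringIO()
    csv.writer(buffer, delimiter=delimiter, lineterminator="\n").writerow(values)
    return buffer.getvalue().encode("utf-8")


def send_tuples(sock: socket.socket, tuples: Iterable[Sequence[object]]) -> None:
    """Transport (sketch): push each encoded tuple over an already connected socket."""
    for values in tuples:
        sock.sendall(csv_encode(values))


# Local demo: a connected socket pair stands in for the remote host.
sender, receiver = socket.socketpair()
send_tuples(sender, [(1, "Alice", 3.5), (2, "Bob", 4.0)])
sender.close()  # closing signals end-of-stream to the receiver
with receiver, receiver.makefile("rb") as stream:
    received = stream.read().decode("utf-8")
print(received, end="")
```

Against a real server, the socket would instead come from `socket.create_connection(("example.com", 8081))`.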
Mapping between CQL and PQL
The following shows how the access operator framework can be mapped between CQL and PQL.
Create Streams (access)
As described in Access framework, incoming data can be accessed flexibly in PQL. For example:
```
nexmark_person := ACCESS({wrapper='GenericPush', transport='NonBlockingTcp', protocol='SizeByteBuffer',
  dataHandler='Tuple', options=[['host','odysseus.offis.uni-oldenburg.de'],['port','65440'],['ByteOrder','Little_Endian']],
  schema=[['timestamp','StartTimeStamp'],
    ['id','INTEGER'],
    ['name','String'],
    ['email','String'],
    ['creditcard','String'],
    ['city','String'],
    ['state','String']
  ]})
```
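As its name suggests, the SizeByteBuffer protocol handler is meant for size-prefixed messages, and the ByteOrder option selects how multi-byte values are interpreted. The Python sketch below assumes a 4-byte unsigned length prefix followed by the payload; the prefix width, and how the payload is laid out for the Tuple data handler, are assumptions to be checked against the Odysseus sources. It also shows why a decoder on a non-blocking TCP connection has to buffer: a message can arrive split across arbitrary chunks.

```python
import struct
from typing import List

PREFIX_SIZE = 4  # assumption: a 4-byte unsigned length prefix


def frame(payload: bytes, little_endian: bool = True) -> bytes:
    """Prefix a payload with its length in the chosen byte order."""
    fmt = "<I" if little_endian else ">I"
    return struct.pack(fmt, len(payload)) + payload


class SizePrefixedDecoder:
    """Reassembles size-prefixed messages from arbitrarily split chunks."""

    def __init__(self, little_endian: bool = True) -> None:
        self._fmt = "<I" if little_endian else ">I"
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> List[bytes]:
        """Buffer a chunk and return every message that is now complete."""
        self._buffer.extend(chunk)
        messages: List[bytes] = []
        while len(self._buffer) >= PREFIX_SIZE:
            (size,) = struct.unpack_from(self._fmt, self._buffer, 0)
            end = PREFIX_SIZE + size
            if len(self._buffer) < end:
                break  # payload not complete yet; wait for more bytes
            messages.append(bytes(self._buffer[PREFIX_SIZE:end]))
            del self._buffer[:end]
        return messages


# Simulate a TCP connection that delivers the bytes in 3-byte pieces.
wire = frame(b"hello") + frame(b"odysseus")
decoder = SizePrefixedDecoder(little_endian=True)
messages: List[bytes] = []
for start in range(0, len(wire), 3):
    messages.extend(decoder.feed(wire[start:start + 3]))
print(messages)  # [b'hello', b'odysseus']
```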
The equivalent stream definition in CQL, using the same handlers and options as the PQL example, looks as follows:
```
CREATE STREAM nexmark:person (timestamp STARTTIMESTAMP, id INTEGER, name STRING, email STRING, creditcard STRING, city STRING, state STRING)
WRAPPER 'GenericPush'
PROTOCOL 'SizeByteBuffer'
TRANSPORT 'NonBlockingTcp'
DATAHANDLER 'Tuple'
OPTIONS ( 'port' '65440', 'host' 'odysseus.offis.uni-oldenburg.de', 'ByteOrder' 'Little_Endian')
```
As you can see, the parameters map directly from PQL to CQL. Every protocol handler, data handler, and transport handler that is available in PQL can be used in a CREATE STREAM statement in the same way. The wrapper must also exist, e.g. GenericPush or GenericPull (see also Access framework). The OPTIONS clause is optional and contains a comma-separated list of key-value pairs, where each key and each value is enclosed in single quotation marks.
The stream can then be used in queries like:
```
SELECT * FROM nexmark:person WHERE...
```
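Because the mapping is direct, a CREATE STREAM statement can be derived mechanically from the ACCESS parameters. The Python sketch below does this for the nexmark example. `to_create_stream` is a hypothetical helper, not part of Odysseus; attribute types are simply upper-cased, and quotes inside names or values are not escaped.

```python
from typing import Optional, Sequence, Tuple


def to_create_stream(name: str,
                     schema: Sequence[Tuple[str, str]],
                     wrapper: str,
                     protocol: str,
                     transport: str,
                     data_handler: str,
                     options: Optional[Sequence[Tuple[str, str]]] = None) -> str:
    """Render ACCESS-style parameters as a CQL CREATE STREAM statement (sketch)."""
    columns = ", ".join(f"{attribute} {type_name.upper()}" for attribute, type_name in schema)
    lines = [
        f"CREATE STREAM {name} ({columns})",
        f"WRAPPER '{wrapper}'",
        f"PROTOCOL '{protocol}'",
        f"TRANSPORT '{transport}'",
        f"DATAHANDLER '{data_handler}'",
    ]
    if options:  # OPTIONS is optional in CQL, so it is omitted when empty
        pairs = ", ".join(f"'{key}' '{value}'" for key, value in options)
        lines.append(f"OPTIONS ( {pairs})")
    return "\n".join(lines)


# The parameters of the PQL nexmark_person example, reused as plain data.
statement = to_create_stream(
    "nexmark:person",
    [("timestamp", "StartTimeStamp"), ("id", "INTEGER"), ("name", "String"),
     ("email", "String"), ("creditcard", "String"), ("city", "String"), ("state", "String")],
    wrapper="GenericPush",
    protocol="SizeByteBuffer",
    transport="NonBlockingTcp",
    data_handler="Tuple",
    options=[("host", "odysseus.offis.uni-oldenburg.de"), ("port", "65440"),
             ("ByteOrder", "Little_Endian")],
)
print(statement)
```

The order of the option pairs differs from the handwritten CQL statement; the example does not suggest that the order matters.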
Create Sink (sender)
Similar to creating sources for incoming data, you can create sinks for outgoing data. The notation is very similar to CREATE STREAM. Since sinks are also based on the Access framework, you can use the same protocol handlers, data handlers, and transport handlers. For example, the following statement creates a sink that writes a CSV file:
```
CREATE SINK writeout (timestamp STARTTIMESTAMP, auction INTEGER, bidder INTEGER, datetime LONG, price DOUBLE)
WRAPPER 'GenericPush'
PROTOCOL 'CSV'
TRANSPORT 'File'
DATAHANDLER 'Tuple'
OPTIONS ( 'filename' 'E:\test')
```
This sink can then be used in a STREAM TO query:
```
STREAM TO writeout SELECT * FROM nexmark:person WHERE...
```
This query pushes all data produced by "SELECT * FROM nexmark:person WHERE..." into the sink named writeout, which in this case writes to a file.
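The combination of CREATE SINK and STREAM TO can be pictured as a registry of named sinks into which query results are pushed. The Python sketch below models a file sink that writes one CSV line per result. `CsvFileSink`, `create_sink`, `stream_to` and the `SINKS` registry are hypothetical; a generator expression stands in for the SELECT ... WHERE query, the sample bids are made up, and the sink schema is reduced to three attributes. The source does not say whether the real File transport appends or overwrites, so appending is an assumption.

```python
import csv
import os
import tempfile
from typing import Dict, Iterable, List, Sequence


class CsvFileSink:
    """Sketch of a CSV file sink (assumption: rows are appended, one line per tuple)."""

    def __init__(self, filename: str, attributes: Sequence[str]) -> None:
        self.filename = filename
        self.attributes: List[str] = list(attributes)

    def push(self, rows: Iterable[Sequence[object]]) -> None:
        with open(self.filename, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            for row in rows:
                if len(row) != len(self.attributes):
                    raise ValueError(f"sink expects {len(self.attributes)} attributes, got {len(row)}")
                writer.writerow(row)


# Hypothetical registry standing in for CREATE SINK: sinks are looked up by name.
SINKS: Dict[str, CsvFileSink] = {}


def create_sink(name: str, filename: str, attributes: Sequence[str]) -> None:
    SINKS[name] = CsvFileSink(filename, attributes)


def stream_to(name: str, query_results: Iterable[Sequence[object]]) -> None:
    """Like STREAM TO: every result the query produces is pushed into the named sink."""
    SINKS[name].push(query_results)


# Usage: the generator plays the role of "SELECT ... WHERE price > 2.0".
bids = [(1, 10, 2.5), (2, 11, 1.0), (3, 12, 3.0)]
out_path = os.path.join(tempfile.mkdtemp(), "writeout.csv")
create_sink("writeout", out_path, ["auction", "bidder", "price"])
stream_to("writeout", (bid for bid in bids if bid[2] > 2.0))
```

Decoupling the query (the generator) from the sink (looked up by name) mirrors the way a STREAM TO query refers to a sink that was defined separately.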