...
To process external data streams they need to be registered in Odysseus. This is typically done with one of the query languages Odysseus provides. Although it is possible to use CQL to attach data streams, the PQL approach is much more flexible. In the following we will concentrate on the integration approach with PQL (the corresponding usage with CQL can be found in The Access Operator Framework in CQL(StreamSQL)):
- In PQL this framework is used by the ACCESS and the SENDER operator
- In CQL this framework is used by the CREATE STREAM and the CREATE SINK statements
The general structure of the framework is as follows:
PQL - ACCESS
To integrate new streams with PQL the ACCESS
-Operator is needed. Because of compatibility issues, there are a lot of more deprecated parameters, which can be set. In the following we will only describe the preferred parameters. The deprecated parameters will be removed in a future version.
...
Code Block |
---|
source = ACCESS({source='nexmark:person', wrapper='GenericPush', transport='NonBlockingTcp', protocol='SizeByteBuffer',
dataHandler='Tuple',options=[['host','odysseus.offis.uni-oldenburg.de'],['port','65440'],['ByteOrder','Little_Endian']],
schema=[['timestamp','StartTimeStamp'],
['id','INTEGER'],
['name','String'],
['email','String'],
['creditcard','String'],
['city','String'],
['state','String']
]}) |
Code Block |
---|
source = ACCESS({Source='WorldBoundaries', Wrapper='GenericPull',
Schema=[['geometry','SpatialGeometry'],
['geometry_vertex_count','Integer'],
['OBJECTID','Integer'],
['ISO_2DIGIT','String'],
['Shape_Leng','Double'],
['Shape_Area','Double'],
['Name','String'],
['import_notes','String'],
['Google requests','String']
],
InputSchema=['SpatialKML','Integer','Integer','String','Double','Double','String','String','String'],
transport='File',
protocol='csv',
dataHandler='Tuple',
Options=[['filename','C:/Users/Marco Grawunder/Documents/My
Dropbox/OdysseusQuickShare/Daten/Geo/World Country Boundaries.csv'],
['Delimiter',',']]} |
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
output = SENDER({
sink='Sink',
wrapper='GenericPush',
transport='TCPClient',
protocol='CSV',
dataHandler='Tuple',
options=[['host', 'example.com'],['port', '8081'],['read', '10240'],['write', '10240']]
}, input) |
...