This document describes the options Odysseus provides for integrating external data streams.

Usage

To process external data streams, they must first be registered in Odysseus. This is typically done with one of the query languages Odysseus provides:

...

The general structure of the framework is as follows:

...

ACCESS

To integrate new streams with PQL, the ACCESS operator is needed. For compatibility reasons, a number of deprecated parameters can still be set. Only the preferred parameters are described below; the deprecated parameters will be removed in a future version.

...

  • Multi_Value: Creates a list. The schema defines the type of the list elements.
  • Tuple: Creates a tuple. The schema parameter defines the types of the elements.

Examples

The following PQL command creates a new source with

...

Code Block
source = ACCESS({source='source', Wrapper='GenericPull',
 Schema=[['geometry','SpatialGeometry'],
 ['geometry_vertex_count','Integer'],
 ['OBJECTID','Integer'],
 ['ISO_2DIGIT','String'],
 ['Shape_Leng','Double'],
 ['Shape_Area','Double'],
 ['Name','String'],
 ['import_notes','String'],
 ['Google requests','String']
 ],
 InputSchema=['SpatialKML','Integer','Integer','String','Double','Double','String','String','String'],
 transport='File',
 protocol='csv',
 dataHandler='Tuple',
 Options=[['filename','C:/Users/Marco Grawunder/Documents/My Dropbox/OdysseusQuickShare/Daten/Geo/World Country Boundaries.csv'],
 ['Delimiter',',']]})

...

Sender

To publish processed data with PQL, the SENDER operator is needed. This operator takes care of the application-dependent and transport-dependent transformation and delivery of the processed elements in the data stream.

...

Code Block
output = SENDER({
wrapper='GenericPush',
transport='TCPClient',
protocol='CSV',
dataHandler='Tuple',
options=[['host', 'example.com'],['port', '8081'],['read', '10240'],['write', '10240']]
}, input)
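
To try the SENDER example locally, something has to accept the connection that the TCPClient transport opens. The following Python sketch shows one possible receiver. It assumes (this is not confirmed by this page) that the CSV protocol handler sends one newline-terminated text line per tuple, and that the host option points at the machine running the script. The function names read_csv_records and serve_once are hypothetical helpers, not part of Odysseus.

```python
import csv
import socket


def read_csv_records(conn, delimiter=",", encoding="utf-8"):
    """Yield one list of fields per newline-terminated line received on conn.

    Assumption: the sender writes plain text, one CSV line per tuple.
    """
    buffer = b""
    while True:
        chunk = conn.recv(10240)  # arbitrary receive buffer size
        if not chunk:
            break  # the peer closed the connection
        buffer += chunk
        # Emit every complete line; keep a trailing partial line buffered.
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            text = line.decode(encoding).rstrip("\r")
            if text:
                yield next(csv.reader([text], delimiter=delimiter))


def serve_once(port=8081):
    """Accept a single connection on the given port and print each record."""
    with socket.create_server(("", port)) as server:
        conn, _addr = server.accept()
        with conn:
            for record in read_csv_records(conn):
                print(record)
```

Calling serve_once() before starting the query would print each received tuple as a list of strings. Port 8081 matches the port option in the example above.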

 

Mapping between CQL and PQL

This section shows how definitions based on the access framework map between CQL and PQL.

Create Streams (access)

As described in Access framework, PQL offers a flexible way to define access to incoming data. For example:

Code Block
nexmark_person := ACCESS({wrapper='GenericPush', transport='NonBlockingTcp', protocol='SizeByteBuffer', 
                          dataHandler='Tuple',options=[['host','odysseus.offis.uni-oldenburg.de'],['port','65440'],['ByteOrder','Little_Endian']],
                          schema=[['timestamp','StartTimeStamp'],
                                  ['id','INTEGER'],
                                  ['name','String'],
                                  ['email','String'],
                                  ['creditcard','String'],
                                  ['city','String'],
                                  ['state','String'] 
 ]})

The equivalent stream definition in CQL looks as follows:

Code Block
CREATE STREAM nexmark:person (timestamp STARTTIMESTAMP, id INTEGER, name STRING, email STRING, creditcard STRING, city STRING, state STRING)
    WRAPPER 'GenericPush' 
    PROTOCOL 'SizeByteBuffer'
    TRANSPORT 'NonBlockingTcp'
    DATAHANDLER 'Tuple'
    OPTIONS ( 'port' '65440', 'host' 'odysseus.offis.uni-oldenburg.de', 'ByteOrder' 'Little_Endian')

As you can see, the parameters map directly onto each other, so any protocol handler, data handler and transport handler that is available in PQL can also be used in a CREATE STREAM statement. The wrapper must exist as well, e.g. GenericPush or GenericPull (see also Access framework). The OPTIONS parameter is optional; it is a comma-separated list of key-value pairs, where each key and each value is enclosed in quotation marks.

You can now use this stream in a query, for example:

Code Block
SELECT * FROM nexmark:person WHERE...
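
The direct mapping between the PQL and CQL parameters can be made explicit with a small script that renders both notations from a single description. The following Python sketch only builds text in the style of the two examples in this section. It does not talk to Odysseus, and the function names and the PERSON dictionary are hypothetical names introduced for illustration.

```python
# One description of the stream, using the values from the examples above.
PERSON = {
    "wrapper": "GenericPush",
    "transport": "NonBlockingTcp",
    "protocol": "SizeByteBuffer",
    "data_handler": "Tuple",
    "options": [
        ("host", "odysseus.offis.uni-oldenburg.de"),
        ("port", "65440"),
        ("ByteOrder", "Little_Endian"),
    ],
    "schema": [
        ("timestamp", "StartTimeStamp"),
        ("id", "INTEGER"),
        ("name", "String"),
        ("email", "String"),
        ("creditcard", "String"),
        ("city", "String"),
        ("state", "String"),
    ],
}


def render_pql_access(name, wrapper, transport, protocol, data_handler, options, schema):
    """Render the description as a PQL ACCESS definition (text only)."""
    opts = ",".join(f"['{k}','{v}']" for k, v in options)
    attrs = ",".join(f"['{n}','{t}']" for n, t in schema)
    return (
        f"{name} := ACCESS({{wrapper='{wrapper}', transport='{transport}', "
        f"protocol='{protocol}', dataHandler='{data_handler}', "
        f"options=[{opts}], schema=[{attrs}]}})"
    )


def render_cql_create_stream(name, wrapper, transport, protocol, data_handler, options, schema):
    """Render the same description as a CQL CREATE STREAM statement (text only)."""
    attrs = ", ".join(f"{n} {t.upper()}" for n, t in schema)
    opts = ", ".join(f"'{k}' '{v}'" for k, v in options)
    return (
        f"CREATE STREAM {name} ({attrs})\n"
        f"    WRAPPER '{wrapper}'\n"
        f"    PROTOCOL '{protocol}'\n"
        f"    TRANSPORT '{transport}'\n"
        f"    DATAHANDLER '{data_handler}'\n"
        f"    OPTIONS ( {opts})"
    )


if __name__ == "__main__":
    print(render_pql_access("nexmark_person", **PERSON))
    print(render_cql_create_stream("nexmark:person", **PERSON))
```

Each key of the description appears exactly once in both outputs, which is the one-to-one correspondence described above. Only the surrounding syntax and the stream name (nexmark_person in PQL, nexmark:person in CQL) differ.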

 

Create Sink (sender)

Similar to creating sources for incoming data, you can also create sinks for outgoing data. The notation is very similar to CREATE STREAM. Since sinks are also based on the Access Framework, you can likewise use different protocol handlers, data handlers and transport handlers. For example, the following creates a sink that writes a CSV file:

Code Block
CREATE SINK writeout (timestamp STARTTIMESTAMP, auction INTEGER, bidder INTEGER, datetime LONG, price DOUBLE)
    WRAPPER 'GenericPush'
    PROTOCOL 'CSV'
    TRANSPORT 'File'
    DATAHANDLER 'Tuple'
    OPTIONS ( 'filename' 'E:\test')

You can now use this sink in a STREAM TO query:

Code Block
STREAM TO writeout SELECT * FROM nexmark:person WHERE...

This example pushes all data produced by "SELECT * FROM nexmark:person WHERE..." into the sink named writeout, which in this case is a file writer.
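
By analogy with the stream mapping, the parameters of a CREATE SINK statement should correspond to those of the PQL SENDER operator. The following Python sketch renders the writeout sink in the style of the SENDER example shown earlier. It rests on two assumptions: that the mapping holds for sinks in the same way as for streams, and that the sink schema needs no counterpart in the SENDER call (the SENDER example on this page shows no schema parameter). The function name render_pql_sender is hypothetical.

```python
def render_pql_sender(output_name, input_name, wrapper, transport,
                      protocol, data_handler, options):
    """Render sink parameters as a PQL SENDER call (text only, assumed mapping)."""
    opts = ",".join(f"['{k}', '{v}']" for k, v in options)
    return (
        f"{output_name} = SENDER({{\n"
        f"wrapper='{wrapper}',\n"
        f"transport='{transport}',\n"
        f"protocol='{protocol}',\n"
        f"dataHandler='{data_handler}',\n"
        f"options=[{opts}]\n"
        f"}}, {input_name})"
    )


if __name__ == "__main__":
    # Values taken from the CREATE SINK writeout example above.
    print(render_pql_sender(
        "output", "input",
        wrapper="GenericPush",
        transport="File",
        protocol="CSV",
        data_handler="Tuple",
        options=[("filename", r"E:\test")],
    ))
```

Here input stands for whatever operator produces the elements to be written, just as in the SENDER example.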