The Odysseus Access Operator Framework

This document describes how external data streams can be integrated into Odysseus.

Usage

To process external data streams, they must first be registered in Odysseus. This is typically done with one of the query languages Odysseus provides. Although CQL can also be used to attach data streams, PQL is much more flexible. The following sections concentrate on integration with PQL (some corresponding CQL approaches can be found in the appendix).

PQL - ACCESS

To integrate new streams with PQL, the ACCESS-Operator is used. For compatibility reasons, many additional deprecated parameters can be set. Only the preferred parameters are described here; the deprecated parameters will be removed in a future version.


The following parameters can be used in the ACCESS-Operator:

  • Source: The system-wide unique name of the source. If the name is already in use and further parameters are given, an error is thrown. An existing source can be reused by giving only this parameter.
  • Wrapper: Selects the wrapper that is responsible for integrating the source. The default wrappers in Odysseus are GenericPush and GenericPull; other extensions provide further wrappers.
  • Schema: This parameter defines the output schema of the access operator and is needed to create some data handlers (e.g. Tuple). For each element, a base data handler must be available. The special types StartTimestamp(String) and EndTimestamp(String) are used to set the time meta data of the created element. Example:
[['TIMESTAMP','StartTimeStamp'], ['NAME','String'], ['TEMP','Double'],
['AccX','Double'], ['AccY','Double'],
['AccZ','Double'], ['PosX','Double']]

  • InputSchema: If this parameter is given, the data handlers it lists are used to create the data instead of those derived from Schema. These handlers must produce elements that are compatible with the elements created by Schema. The output schema is not affected.
  • dateFormat: This parameter must be given if the String versions of the timestamps are used. The format is the same as in Java SimpleDateFormat. Example: dateFormat="yyyy-MM-dd'T'HH:mm:ss.SSS"
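To illustrate what dateFormat is used for, the following Java sketch parses a string timestamp with such a pattern into a long value. It is not Odysseus code: the class name TimestampParsing, the use of UTC and the interpretation of the long value as milliseconds since the epoch are assumptions made for this example.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

class TimestampParsing {
    // Parses a timestamp string with a SimpleDateFormat pattern (the same syntax
    // as the dateFormat parameter) and returns milliseconds since the epoch.
    // Assumptions: the string is interpreted in UTC, and epoch milliseconds are
    // the long representation of the timestamp.
    static long toEpochMillis(String value, String pattern) {
        SimpleDateFormat format = new SimpleDateFormat(pattern);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        format.setLenient(false); // reject out-of-range values such as month 13
        try {
            return format.parse(value).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("'" + value + "' does not match " + pattern, e);
        }
    }
}
```

For instance, toEpochMillis("1970-01-01T00:00:01.500", "yyyy-MM-dd'T'HH:mm:ss.SSS") returns 1500.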

The following parameters further describe the wrappers GenericPush and GenericPull. GenericPull is used when the data has to be fetched from the source (e.g. from a file); GenericPush is used when the source sends its data actively. Pull requires scheduling (done automatically); push does not.
Each of these parameters typically needs further configuration (e.g. a file name for the file transport). These additional parameters are set in the Options parameter as key-value pairs:
Options = [['key1', 'value1'], ['key2', 'value2'], … , ['keyN', 'valueN']]
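A minimal Java sketch of how such an options list can be turned into a lookup table. The class AccessOptions is hypothetical, and matching keys case-insensitively is an assumption suggested by the examples on this page, which write both 'filename' and 'Filename'.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class AccessOptions {
    // Converts PQL-style options [['key1','value1'], ...] into a map.
    // Assumption: keys are compared case-insensitively.
    static Map<String, String> toMap(List<String[]> options) {
        Map<String, String> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (String[] pair : options) {
            if (pair.length != 2) {
                throw new IllegalArgumentException("each option must be a key-value pair");
            }
            result.put(pair[0], pair[1]); // a later duplicate key overwrites an earlier one
        }
        return result;
    }
}
```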

Transport

This parameter selects the input type of the Wrapper. The following values are currently supported for the GenericPull-Wrapper:

  • File: Reads from a local file (local to where the query is executed, i.e. in a client/server scenario the file must be located on the server).
    • The following options are available:
      • Filename: The path to the file.
  • TcpSocket: Defines a TCP socket connection to a server from which the input data is retrieved. Reading blocks until data is available.
    • The following options are available:
      • Host: The name or IP of the server
      • Port: The port number of the server

      • User: The login, if the source needs a login and password
      • Password: The password, if the source needs a login and password

The following values are currently supported for the GenericPush-Wrapper:

  • NonBlockingTcp: Defines a non-blocking TCP socket connection to a server. Whenever new data is available, it is sent to the system automatically (Java NIO).
    • The following options are available:
      • Host: The name or IP of the server
      • Port: The port number of the server
      • Autoconnect: A boolean indicating whether the connection should be reestablished after an access failure (currently not supported!)

      • User: The login, if the source needs a login and password
      • Password: The password, if the source needs a login and password
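The difference between the two wrapper families can be sketched in Java as follows. This is a simplified illustration, not the Odysseus wrapper API: the class PullVersusPush and its method names are hypothetical, and reading text lines stands in for whatever the protocol actually extracts.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PullVersusPush {
    // Pull (GenericPull, e.g. File or TcpSocket): the consumer asks for the next
    // element; readLine() blocks until data is available or the input ends.
    // The loop stands in for the scheduling that Odysseus performs automatically.
    static List<String> pullAll(Reader input) {
        List<String> received = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(input)) {
            String line;
            while ((line = reader.readLine()) != null) {
                received.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return received;
    }

    // Push (GenericPush, e.g. NonBlockingTcp): the transport calls the registered
    // listeners whenever data arrives, so no scheduler is involved.
    static final class PushSource {
        private final List<Consumer<String>> listeners = new ArrayList<>();

        void subscribe(Consumer<String> listener) {
            listeners.add(listener);
        }

        // Hypothetical entry point, called by the transport when new data is available.
        void dataArrived(String data) {
            for (Consumer<String> listener : listeners) {
                listener.accept(data);
            }
        }
    }
}
```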

Protocol

This parameter determines how the input from the transport is processed. The main task of this component is to identify objects in the input and to prepare them for the data handler (see next parameter).
The following protocols are currently available in Odysseus.

GenericPull

  • Line: This simple handler just reads one line from the input and sends the text to the data handler.
    • Delay: A delay in ms to reduce the data rate
  • SimpleCSV: This handler is similar to Line. Additionally, it splits the line at a delimiter that needs to be set in the options. It does not handle escaping of the delimiter (e.g. by quotes or backslash). A string array is sent to the data handler.
    • Delimiter: The delimiter that separates the elements from each other.
  • CSV: Same as SimpleCSV, but handles quotes. Because this version is slower, use SimpleCSV if the source contains no quoted elements.
  • Text: This handler identifies elements in a character stream in which a distinct delimiter separates the objects. The whole object is sent to the data handler.
    • Delimiter: The delimiter that separates the objects
    • KeepDelimiter: A flag that indicates whether the delimiter is part of the result sent to the data handler
    • Charset: The Java charset that is used to decode the input (e.g. "utf-8").
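The splitting rules of SimpleCSV, CSV and Text can be approximated by the following Java sketch. The class LineSplitting is hypothetical; in particular, the quote handling in csv (double quotes, with "" as an escaped quote) follows common CSV conventions and is an assumption, not a description of the Odysseus CSV handler.

```java
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class LineSplitting {
    // SimpleCSV-like: split at every occurrence of the delimiter, without quote
    // or escape handling. The limit -1 keeps trailing empty fields.
    static String[] simpleCsv(String line, String delimiter) {
        return line.split(Pattern.quote(delimiter), -1);
    }

    // CSV-like: a delimiter inside double quotes does not split the field, and
    // "" inside a quoted field stands for one quote character (assumed convention).
    static String[] csv(String line, char delimiter) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    current.append('"');
                    i++; // skip the second quote of the escaped pair
                } else {
                    inQuotes = !inQuotes;
                }
            } else if (c == delimiter && !inQuotes) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields.toArray(new String[0]);
    }

    // Text-like: decode with the given charset, then cut the stream into objects
    // at the delimiter. Text after the last delimiter is incomplete and not emitted.
    // Simplification: the whole input is decoded at once; a streaming decoder would
    // also have to handle multi-byte characters split across reads.
    static List<String> text(byte[] input, String charsetName, String delimiter, boolean keepDelimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        String stream = new String(input, Charset.forName(charsetName));
        List<String> objects = new ArrayList<>();
        int start = 0;
        int end;
        while ((end = stream.indexOf(delimiter, start)) >= 0) {
            objects.add(stream.substring(start, keepDelimiter ? end + delimiter.length() : end));
            start = end + delimiter.length();
        }
        return objects;
    }
}
```

The line a,"b,c",d shows why the distinction matters: simpleCsv yields four fields (a, "b, c" and d), whereas csv yields three (a, b,c and d).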


GenericPush

  • SizeByteBuffer: This handler first reads 4 bytes and interprets them as the size of the following input. By default, these bytes are read in Java byte order (BIG_ENDIAN); the option ByteOrder can set this to "LITTLE_ENDIAN". After size bytes have been read, the buffer is sent to the data handler.
  • MarkerByteBuffer: This handler reads elements between a start and an end byte.
    • Start: The byte that marks the start token (the token is not part of the object)
    • End: The byte that marks the end token (the token is not part of the object)
    • ByteOrder: The order of the bytes (default BIG_ENDIAN)
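Both byte-oriented framings can be sketched with java.nio.ByteBuffer. The class ByteFraming is hypothetical. Two assumptions: the 4-byte size field counts only the bytes that follow it, as the description above suggests, and an incomplete trailing object is left in the buffer until more data arrives. The ByteOrder option of MarkerByteBuffer is not needed to find single-byte markers and is therefore not modeled.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ByteFraming {
    // SizeByteBuffer-like: each object is preceded by a 4-byte size field in the
    // given byte order. Complete objects are returned; an incomplete trailing
    // object stays in the buffer (position rewound to its size field).
    static List<byte[]> sizePrefixed(ByteBuffer buffer, ByteOrder order) {
        buffer.order(order);
        List<byte[]> objects = new ArrayList<>();
        while (buffer.remaining() >= Integer.BYTES) {
            buffer.mark();
            int size = buffer.getInt();
            if (size < 0) {
                throw new IllegalStateException("negative size field: " + size);
            }
            if (buffer.remaining() < size) {
                buffer.reset(); // rewind to the size field and wait for more data
                break;
            }
            byte[] object = new byte[size];
            buffer.get(object);
            objects.add(object);
        }
        return objects;
    }

    // MarkerByteBuffer-like: collect the bytes between a start and an end marker;
    // the markers themselves are not part of the object.
    static List<byte[]> markerDelimited(byte[] input, byte start, byte end) {
        List<byte[]> objects = new ArrayList<>();
        int begin = -1; // index after the current start marker, -1 if none is open
        for (int i = 0; i < input.length; i++) {
            if (begin < 0 && input[i] == start) {
                begin = i + 1;
            } else if (begin >= 0 && input[i] == end) {
                objects.add(Arrays.copyOfRange(input, begin, i));
                begin = -1;
            }
        }
        return objects;
    }
}
```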

DataHandler

Finally, this parameter defines the data handler that is responsible for creating the objects that are processed inside Odysseus. Data handlers can be divided into handlers for base types (like long, boolean or int) and type constructors for complex types (like tuple or list). Odysseus provides data handlers for the following base data types:

  • Boolean: Can be created from 0/1 and true/false
  • Date: Currently these elements are treated as long values (TODO)
  • Double, Float: Processing of double values
  • Integer, Byte, Short: Processed as integer values (4 bytes)
  • Long, Timestamp, StartTimestamp, EndTimestamp: Processed as long values (8 bytes)
  • String: An integer giving the size in bytes, followed by the characters (TODO: Encoding?)

Odysseus provides the following type constructors:

  • Multi_Value: Creates a list. The schema defines the type of the list elements.
  • Tuple: Creates a tuple. The schema parameter defines the types of the elements.
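The following Java sketch decodes a binary tuple according to the sizes listed above and shows the 0/1 and true/false rule for Boolean input. The class BinaryTupleReader is hypothetical; the UTF-8 encoding of strings (still marked TODO above), the 8-byte size of Double, Java's default big-endian byte order and the restriction to a few types are assumptions of this sketch. Multi_Value is not covered, because its binary layout is not described here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;

class BinaryTupleReader {
    // Reads one value of a base type. The buffer's byte order applies
    // (big-endian unless it was changed).
    static Object readValue(ByteBuffer buffer, String type) {
        switch (type.toLowerCase(Locale.ROOT)) {
            case "integer":
            case "byte":
            case "short":
                return buffer.getInt(); // all three are read as 4-byte ints
            case "long":
            case "timestamp":
            case "starttimestamp":
            case "endtimestamp":
            case "date":
                return buffer.getLong(); // 8 bytes
            case "double":
                return buffer.getDouble(); // 8 bytes (assumed)
            case "string": {
                int size = buffer.getInt(); // size in bytes, then the characters
                byte[] bytes = new byte[size];
                buffer.get(bytes);
                return new String(bytes, StandardCharsets.UTF_8); // assumed encoding
            }
            default:
                throw new IllegalArgumentException("type not covered by this sketch: " + type);
        }
    }

    // Tuple constructor: one value per schema entry, in schema order.
    static Object[] readTuple(ByteBuffer buffer, List<String> types) {
        Object[] tuple = new Object[types.size()];
        for (int i = 0; i < types.size(); i++) {
            tuple[i] = readValue(buffer, types.get(i));
        }
        return tuple;
    }

    // Boolean from text input: accepts 0/1 and true/false.
    static boolean parseBoolean(String text) {
        switch (text.trim().toLowerCase(Locale.ROOT)) {
            case "1":
            case "true":
                return true;
            case "0":
            case "false":
                return false;
            default:
                throw new IllegalArgumentException("not a boolean: " + text);
        }
    }
}
```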

Examples

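The following PQL commands create two new sources: nexmark_person receives Nexmark person records over a non-blocking TCP connection (GenericPush), and worldBoundaries reads country boundaries from a CSV file (GenericPull).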

nexmark_person := ACCESS({source='nexmark:person', wrapper='GenericPush', transport='NonBlockingTcp', protocol='SizeByteBuffer',
                          dataHandler='Tuple', options=[['host','odysseus.offis.uni-oldenburg.de'],['port','65440'],['ByteOrder','Little_Endian']],
                          schema=[['timestamp','StartTimeStamp'],
                                  ['id','INTEGER'],
                                  ['name','String'],
                                  ['email','String'],
                                  ['creditcard','String'],
                                  ['city','String'],
                                  ['state','String']
                                 ]})

worldBoundaries := ACCESS({Source='WorldBoundaries', Wrapper='GenericPull',
  Schema=[['geometry','SpatialGeometry'],
          ['geometry_vertex_count','Integer'],
          ['OBJECTID','Integer'],
          ['ISO_2DIGIT','String'],
          ['Shape_Leng','Double'],
          ['Shape_Area','Double'],
          ['Name','String'],
          ['import_notes','String'],
          ['Google requests','String']
         ],
  InputSchema=['SpatialKML','Integer','Integer','String','Double','Double','String','String','String'],
  transport='File',
  protocol='csv',
  dataHandler='Tuple',
  Options=[['filename','C:/Users/Marco Grawunder/Documents/My Dropbox/OdysseusQuickShare/Daten/Geo/World Country Boundaries.csv'],
           ['Delimiter',',']]})

Extending the framework

AccessAO, AccessAOBuilder → PQL Documentation
GenericPush and GenericPull
New Wrapper

Existing Extensions

Scai
Sick


PQL - Sender

To publish processed data with PQL, the SENDER-Operator is needed. This operator takes care of the application-dependent and transport-dependent transformation and delivery of the processed elements in the data stream.


The following parameters are required in the SENDER-Operator:

Parameter

Wrapper

This parameter allows the selection of the wrapper that is responsible for the delivery of the data. In Odysseus the default wrappers are GenericPush and GenericPull. Other extensions provide further names.

Transport

The transport handler is responsible for the delivery of the processed data stream elements at a given endpoint.

Protocol

The protocol handler is responsible for transforming the processed sensor data elements into an application-dependent protocol, so that they can be transported to an endpoint over a given transport protocol.

DataHandler

The data handler transforms the elements in a data stream into the appropriate representation (e.g. String or byte array). Depending on the protocol handler, a specific data handler may be required. However, in most cases the data handler Tuple should be adequate.

Options

The options field contains additional parameters for the transport and protocol handlers.

Example


Sender Operator
output = SENDER({
wrapper='GenericPush',
transport='TCPClient',
protocol='CSV',
dataHandler='Tuple',
options=[['host', 'example.com'],['port', '8081'],['read', '10240'],['write', '10240']]
}, input)

Available Protocol Handlers

CSV

The CSV protocol handler reads and writes comma-separated text lines.

Options:

  • delimiter: The delimiter for splitting the input

  • skipfirstline: Skip the first line of input
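A simplified Java sketch of both directions of a CSV protocol handler: reading lines with the delimiter and skipfirstline options, and writing one tuple as a line. The class CsvProtocolSketch is hypothetical, and quoting is deliberately ignored.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.regex.Pattern;

class CsvProtocolSketch {
    // Reading direction (ACCESS): one element per line, split at the delimiter.
    // With skipFirstLine, the first line (e.g. a header) is dropped.
    static List<String[]> read(Reader input, String delimiter, boolean skipFirstLine) {
        List<String[]> rows = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(input)) {
            boolean skip = skipFirstLine;
            String line;
            while ((line = reader.readLine()) != null) {
                if (skip) {
                    skip = false;
                    continue;
                }
                rows.add(line.split(Pattern.quote(delimiter), -1));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rows;
    }

    // Writing direction (SENDER): the attribute values of one tuple joined by the delimiter.
    static String write(Object[] tuple, String delimiter) {
        StringJoiner joiner = new StringJoiner(delimiter);
        for (Object value : tuple) {
            joiner.add(String.valueOf(value));
        }
        return joiner.toString();
    }
}
```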

Available Transport Handlers

TCPClient

Description

The TCPClient transport handler allows the delivery of results over TCP/IP to an arbitrary host. This transport handler can be used in both directions: connecting to a server to receive data and connecting to a server to publish data.

Options

  • host: The host/IP of the target

  • port: The port of the target system

  • read: The size of the read buffer

  • write: The size of the write buffer

Example


TCPClient Transport Handler
input = ACCESS({source='Source',
wrapper='GenericPush',
transport='TCPClient',
protocol='CSV',
dataHandler='Tuple',
options=[['host', 'example.com'],['port', '8080'],['read', '10240'],['write', '10240']],
schema=[
['id', 'Double'],
['data', 'String']]
})

output = SENDER({wrapper='GenericPush',
transport='TCPClient',
protocol='CSV',
dataHandler='Tuple',
options=[['host', 'example.com'],['port', '8081'],['read', '10240'],['write', '10240']]
}, input)
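On the publishing side, what the TCPClient transport does can be sketched with java.net.Socket. The class TcpClientSketch is hypothetical; terminating each CSV line with a newline and using the write option as the size of a BufferedOutputStream are assumptions about how the options map onto the connection.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

class TcpClientSketch {
    // Connects to host:port and publishes each CSV line, newline-terminated.
    // Assumption: the write option is used as the output buffer size.
    static void send(String host, int port, List<String> csvLines, int writeBufferSize) {
        try (Socket socket = new Socket(host, port);
             OutputStream out = new BufferedOutputStream(socket.getOutputStream(), writeBufferSize)) {
            for (String line : csvLines) {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
            out.flush(); // push buffered bytes before the connection is closed
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```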

TCPServer

Description

The TCPServer transport handler allows the propagation of results over TCP/IP as a server. This transport handler can be used in both directions: other clients can connect to this server to receive data or to publish data.

Options

  • port: The port to listen for connections

  • read: The size of the read buffer

  • write: The size of the write buffer

Example


TCPServer Transport Handler
input = ACCESS({source='Source',
wrapper='GenericPush',
transport='TCPServer',
protocol='CSV',
dataHandler='Tuple',
options=[['port', '8080'],['read', '10240'],['write', '10240']],
schema=[
['id', 'Double'],
['data', 'String']]
})

output = SENDER({wrapper='GenericPush',
transport='TCPServer',
protocol='CSV',
dataHandler='Tuple',
options=[['port', '8081'],['read', '10240'],['write', '10240']]
}, input)
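The server role can be sketched in the same way with java.net.ServerSocket. The class TcpServerSketch is hypothetical, serves a single client only, and uses the write option as an output buffer size by assumption; the server socket is passed in so that binding to the configured port stays with the caller.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

class TcpServerSketch {
    // Waits for one client on an already bound server socket and publishes each
    // CSV line to it. A real handler would serve several clients and keep running.
    static void serveOnce(ServerSocket server, List<String> csvLines, int writeBufferSize) {
        try (Socket client = server.accept();
             OutputStream out = new BufferedOutputStream(client.getOutputStream(), writeBufferSize)) {
            for (String line : csvLines) {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```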


UDPClient

Description

The UDPClient transport handler allows the delivery of results over UDP/IP to an arbitrary host. This transport handler can only be used in one direction, to publish data over UDP.

Options

  • host: The host/IP of the target

  • port: The port of the target system

  • write: The size of the write buffer

Example


UDPClient Transport Handler
output = SENDER({wrapper='GenericPush',
transport='UDPClient',
protocol='CSV',
dataHandler='Tuple',
options=[['host', 'example.com'],['port', '8080'],['write', '10240']]
}, input)
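A UDPClient sender reduces to sending datagrams with java.net.DatagramSocket, as in the hypothetical UdpClientSketch below. Sending each CSV line as one datagram, and rejecting payloads larger than the write option instead of splitting them, are assumptions of this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

class UdpClientSketch {
    // Sends one CSV line as a single UDP datagram to host:port.
    // Assumption: the write option bounds the datagram size.
    static void send(String host, int port, String csvLine, int writeBufferSize) {
        byte[] payload = csvLine.getBytes(StandardCharsets.UTF_8);
        if (payload.length > writeBufferSize) {
            throw new IllegalArgumentException("payload exceeds the write buffer size");
        }
        try (DatagramSocket socket = new DatagramSocket()) {
            InetAddress address = InetAddress.getByName(host);
            socket.send(new DatagramPacket(payload, payload.length, address, port));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```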


UDPServer

Description

The UDPServer transport handler receives data over UDP/IP as a server. This transport handler can only be used in one direction: other UDP clients send data to this server, which makes it available as input.

Options

  • port: The port to listen for data

  • read: The size of the read buffer



Example


UDPServer Transport Handler
input = ACCESS({source='Source',
wrapper='GenericPush',
transport='UDPServer',
protocol='CSV',
dataHandler='Tuple',
options=[['port', '8080'],['read', '10240']],
schema=[
['id', 'Double'],
['data', 'String']]
})
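On the receiving side, a UDP server reads datagrams into a buffer of the configured read size. The hypothetical UdpServerSketch below illustrates one property of Java's DatagramSocket: a datagram longer than the receive buffer is truncated, so the read option should be at least as large as the largest expected datagram. Whether the Odysseus handler behaves the same way is not stated on this page.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.nio.charset.StandardCharsets;

class UdpServerSketch {
    // Waits for one datagram on an already bound socket (bound to the configured
    // port) and decodes it as a UTF-8 CSV line. A datagram longer than
    // readBufferSize is truncated: Java discards the excess bytes.
    static String receiveOne(DatagramSocket socket, int readBufferSize) {
        byte[] buffer = new byte[readBufferSize];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        try {
            socket.receive(packet);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(packet.getData(), packet.getOffset(), packet.getLength(), StandardCharsets.UTF_8);
    }
}
```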
