Remark: This is currently in redesign.

The KeyValue feature allows to read, process and write data as key-value pairs, which don't have an fixed schema like tuples.

To use the feature you have to install it and use the "KeyValueObject" data handler. The applicable operators will be automatically chosen. Until now selection and projection are supported. Also there are operators to transform key-value objects to tuples and the other way round. The feature also includes wrapper for handling of JSON data.

Operators

The following new or modificated operators are provided in keyvalue feature.

KeyValueToTuple (Deprecated, use ToTuple operator instead.)

Transforms key-value object to tuples. Use the SCHEMA attribute to define:

  • the path of the input that should be part of the tuple. This could be a dot based path: e.g. root.id.value. This is only possible if the result is not a KeyValueObject! You can also use JSONPATH.
  • optional the name that should be use in the schema of the tuple (remark special charactes from Jsonpath like "$", "*", etc. are replaced automatically
  • the datatype of the element that is retrieved from the key value object. This could also be a key value object or List

Examples for schema could be:

          ['$','root','KeyValueObject'],
          ['monday', 'List(Integer)'],
          ['$..monday.length()','MondaySize','mondayCount','Integer'],
          ['$.uuid', 'String'],
          ['$.monday[0]',MondayFirst, 'Integer'],


WIth the type field you can set the type of the tuple.

#DEFINE SCHEMA [['timestamp.unixtimestamp', 'List(Integer)'],['timestamp.iso', 'String'],['track.name', 'String']]
tuple = KEYVALUETOTUPLE({schema=${SCHEMA}, TYPE = 'type'}, receiverJSON)

TupleToKeyValue

Transforms tuples to key-value objects based on the tuples schema.

tupleToKeyValue = TUPLETOKEYVALUE(receiverTuple)

Select

The select operator can be used for key value objects in the same way as for tuples.

selectJSON = SELECT({predicate='track.artist.name = "Evanescence"'}, receiverJSON)
selectJSONList = SELECT({predicate='timestamp.unixtimestamp[0] = 1162304033'}, receiverJSON)

Project

For the key value projection the "paths" attribute has to be used. The "attributes" attribute only works for relational tuples.

 projectJSON = PROJECT({paths = [['timestamp.unixtimestamp', 'List(Integer)'],['timestamp.iso', 'String']]}, receiverJSON)

Rename

To rename the attributes of key value objects the "pairs" parameter has to be set to true and the old and new attributenames have to be given in the "aliases" parameter.

renamed = RENAME({aliases = ['Zeit', 'ZeitNew', 'Band', 'BandNew'], pairs = 'true'}, mapped)

Map

For the key value MAP operator the "kvexpressions" attribute has to be used. The "expressions" attribute only works for relational tuples.

output = MAP({
              kvexpressions = [
								['timestamp.unixtimestamp', 'Zeit'],
								['toLong(timestamp.unixtimestamp + 1)', 'Zeit2'],
                                ['track.artist.name', 'Band'],
                                ['track.name', 'Lied']
							]
             }, input)

MEP-Functions

There are some MEP-Functions that can be used in the MAP-Operator:

  • TODO

Wrapper

The feature adds protocol handler for the common key-value data formats JSON and BSON (binary JSON).

JSON

Read JSON file

 json = ACCESS({
    source='json', 
    wrapper='GenericPull',
    transport='File',
    protocol='JSON',
    dataHandler='KeyValueObject',
    options=[['filename','${WORKSPACEPROJECT}\scrobbles-2006-10_linebreak.json']]
/// schema=[['timestamp','STARTTIMESTAMP'], ['timestamp2','ENDTIMESTAMP']] /// only used for definition of START- or ENDTIMESTAMP
})

Write JSON file

SENDERjson = SENDER({
    transport='File',
    wrapper='GenericPush',
    protocol='JSON',
    dataHandler='KeyValueObject',
    SINK="SENDERjson",
    options=[['filename','${WORKSPACEPROJECT}\output\test2'],
		['json.write.metadata','true'],
        ['json.write.starttimestamp','metadata.starttimestamp'], 
        ['json.write.endtimestamp','metadata.endtimestamp'] ]
}, json)

Transfer data with JSON

JSON protocol handler can not only be used to read and write files, but also to transfer data from one odysseus instance to another. The example shows the use of TCP as transport protocol, but RabbitMQ can be used the same way.

 SENDERjson = SENDER({
    transport='TCPServer',
    wrapper='GenericPush',
    protocol='JSON',
    dataHandler='KeyValueObject',
    SINK="SENDERjson",
    options=[
        ['port', '8080'],
        ['HOST','localhost'],
    ]}, json)
receiverJSON = RECEIVE({
    transport='TCPClient',
    source= 'ReceiverJSON',
    protocol='JSON',
    dataHandler='KeyValueObject',
    options=[['port', '8080']]         
})

Labels (2)

Comments  (0)

Attachments  (0)

Add Attachment
No files shared here yet.