Remark: This feature is currently being redesigned.
The KeyValue feature allows reading, processing, and writing data as key-value pairs, which, unlike tuples, do not have a fixed schema.
To use the feature, install it and use the "KeyValueObject" or "NestedKeyValueObject" data handler. The applicable operators are chosen automatically. So far, selection and projection are supported. There are also operators to transform key-value objects into tuples and vice versa. The feature also includes wrappers for handling JSON and BSON data.
Operators
The following new or modified operators are provided by the KeyValue feature.
KeyValueToTuple
Transforms key-value objects into tuples. All data fields that are not defined in the schema are lost.
#DEFINE SCHEMA [['timestamp.unixtimestamp', 'List(Integer)'],['timestamp.iso', 'String'],['track.name', 'String']]
tuple = KEYVALUETOTUPLE({schema=${SCHEMA}, KEEPINPUT = 'false', TYPE = 'type'}, receiverJSON)
TupleToKeyValue
Transforms tuples into key-value objects based on the tuple schema.
tupleToKeyValue = TUPLETOKEYVALUE({type='KEYVALUEOBJECT'}, receiverTuple)
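As a rough sketch, the two transformations can be chained to pass key-value data through a tuple-based step and back. It reuses only the parameters shown in the examples above. The operator names asTuple and backToKeyValue and the shortened schema are illustrative assumptions, and how the attributes of the resulting objects are named is not specified here.

```
/// Sketch only: round trip from key-value objects to tuples and back.
/// Fields not listed in SCHEMA are dropped by KEYVALUETOTUPLE.
#DEFINE SCHEMA [['timestamp.iso', 'String'],['track.name', 'String']]
asTuple = KEYVALUETOTUPLE({schema=${SCHEMA}, KEEPINPUT = 'false', TYPE = 'type'}, receiverJSON)
backToKeyValue = TUPLETOKEYVALUE({type='KEYVALUEOBJECT'}, asTuple)
```

Because the first step drops every field outside the schema, the objects produced by the second step can only contain the two listed paths.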
Select
The select operator can be used for key-value objects in the same way as for tuples.
selectJSON = SELECT({predicate='track.artist.name = "Evanescence"'}, receiverJSON)
selectJSONList = SELECT({predicate='timestamp.unixtimestamp[0] = 1162304033'}, receiverJSON)
Project
For the key-value projection, the "paths" attribute has to be used. The "attributes" attribute only works for relational tuples.
projectJSON = PROJECT({paths = [['timestamp.unixtimestamp', 'List(Integer)'],['timestamp.iso', 'String']]}, receiverJSON)
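Selection and projection can presumably be combined like any other operators, by feeding the output of one into the other. The following sketch uses only the predicate and paths syntax shown above; the operator names selected and projected are illustrative.

```
/// Sketch only: keep the objects of one artist, then reduce them to two paths.
selected = SELECT({predicate='track.artist.name = "Evanescence"'}, receiverJSON)
projected = PROJECT({paths = [['timestamp.iso', 'String'],['track.name', 'String']]}, selected)
```

Filtering first matters here: the predicate refers to track.artist.name, which the projection would otherwise have removed.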
Rename
To rename the attributes of key-value objects, the "pairs" parameter has to be set to true and the old and new attribute names have to be given in the "aliases" parameter.
renamed = RENAME({aliases = ['Zeit', 'ZeitNew', 'Band', 'BandNew'], pairs = 'true'}, mapped)
Map
For the key-value MAP operator, the "kvexpressions" attribute has to be used. The "expressions" attribute only works for relational tuples.
output = MAP({
    kvexpressions = [
        ['timestamp.unixtimestamp', 'Zeit'],
        ['toLong(timestamp.unixtimestamp + 1)', 'Zeit2'],
        ['track.artist.name', 'Band'],
        ['track.name', 'Lied']
    ]
}, input)
MEP-Functions
There are some MEP-Functions that can be used in the MAP-Operator:
- TODO
Wrapper
The feature adds protocol handlers for the common key-value data formats JSON and BSON (binary JSON).
JSON
Read JSON file
json = ACCESS({
    source='json',
    wrapper='GenericPull',
    transport='File',
    protocol='JSON',
    dataHandler='KeyValueObject',
    options=[['filename','${WORKSPACEPROJECT}\scrobbles-2006-10_linebreak.json']]
    /// schema=[['timestamp','STARTTIMESTAMP'], ['timestamp2','ENDTIMESTAMP']]
    /// only used for definition of START- or ENDTIMESTAMP
})
Write JSON file
SENDERjson = SENDER({
    transport='File',
    wrapper='GenericPush',
    protocol='JSON',
    dataHandler='KeyValueObject',
    SINK="SENDERjson",
    options=[
        ['filename','${WORKSPACEPROJECT}\output\test2'],
        ['json.write.metadata','true'],
        ['json.write.starttimestamp','metadata.starttimestamp'],
        ['json.write.endtimestamp','metadata.endtimestamp']
    ]
}, json)
Transfer data with JSON
The JSON protocol handler can be used not only to read and write files, but also to transfer data from one Odysseus instance to another. The example uses TCP as the transport protocol, but RabbitMQ can be used in the same way.
SENDERjson = SENDER({
    transport='TCPServer',
    wrapper='GenericPush',
    protocol='JSON',
    dataHandler='KeyValueObject',
    SINK="SENDERjson",
    options=[
        ['port', '8080'],
        ['HOST','localhost'],
    ]
}, json)
receiverJSON = RECEIVE({
    transport='TCPClient',
    source= 'ReceiverJSON',
    protocol='JSON',
    dataHandler='KeyValueObject',
    options=[['port', '8080']]
})
BSON
BSON can be used in exactly the same way as JSON - just replace the protocol handler.
More information about BSON can be found at http://bsonspec.org/.
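As a minimal sketch of that replacement, the JSON file source from above could be adapted as follows. The protocol identifier 'BSON' is an assumption derived from the statement that only the protocol handler changes, and the source name and the file name input.bson are hypothetical.

```
/// Sketch only: same ACCESS operator as in the JSON example, with the protocol swapped.
/// protocol='BSON' and the file name are assumptions, not taken from this page.
bson = ACCESS({
    source='bson',
    wrapper='GenericPull',
    transport='File',
    protocol='BSON',
    dataHandler='KeyValueObject',
    options=[['filename','${WORKSPACEPROJECT}\input.bson']]
})
```

All other parameters, including the data handler, stay the same as in the JSON case.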