The KeyValue feature allows you to read, process, and write data as key-value pairs, which, unlike tuples, do not have a fixed schema.
To use the feature, install it and use the "KeyValueObject" or "NestedKeyValueObject" data handler. The applicable operators are chosen automatically. Currently, selection and projection are supported. There are also operators to transform key-value objects to tuples and vice versa. The feature also includes wrappers for handling JSON and BSON data.
The KeyValue feature provides the following new or modified operators.
Transforms key-value objects to tuples. All data fields that are not defined in the schema are lost.
#DEFINE SCHEMA [['timestamp.unixtimestamp', 'List(Integer)'],['timestamp.iso', 'String'],['track.name', 'String']]
tuple = KEYVALUETOTUPLE({schema=${SCHEMA}, KEEPINPUT = 'false', TYPE = 'type'}, receiverJSON)
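The effect of a schema on a nested key-value object can be illustrated outside of Odysseus. The following Python sketch is not the operator's implementation; it only mimics the behavior described above, assuming that dotted names address nested fields and that a path missing from the input yields `None` (an assumption of this sketch). The helper names and the sample record are made up for illustration.

```python
def get_path(obj, path):
    """Follow a dotted path such as 'track.name' through nested dicts."""
    current = obj
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None  # assumption of this sketch: missing paths become None
        current = current[key]
    return current


def key_value_to_tuple(obj, schema):
    """Build a tuple with one value per schema entry, in schema order."""
    return tuple(get_path(obj, path) for path, _type in schema)


# Same paths and types as in the SCHEMA definition above.
SCHEMA = [
    ("timestamp.unixtimestamp", "List(Integer)"),
    ("timestamp.iso", "String"),
    ("track.name", "String"),
]

# Hypothetical input record, invented for this example.
record = {
    "timestamp": {"unixtimestamp": [1162304033], "iso": "2006-10-31 14:13:53"},
    "track": {"name": "Lithium", "artist": {"name": "Evanescence"}},
}

row = key_value_to_tuple(record, SCHEMA)
print(row)
```

Because `track.artist.name` is not listed in the schema, it does not appear in `row`, which is the data loss the description warns about.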
Transforms tuples to key-value objects based on the tuple schema.
tupleToKeyValue = TUPLETOKEYVALUE({type='KEYVALUEOBJECT'}, receiverTuple)
The select operator can be used for key-value objects in the same way as for tuples.
selectJSON = SELECT({predicate='track.artist.name = "Evanescence"'}, receiverJSON)
selectJSONList = SELECT({predicate='timestamp.unixtimestamp[0] = 1162304033'}, receiverJSON)
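The two predicates above address a nested field (`track.artist.name`) and a list element (`timestamp.unixtimestamp[0]`). As a rough illustration of how such paths can be resolved, here is a minimal Python sketch. It is not how Odysseus evaluates predicates; the `resolve` helper, its path grammar, and the sample record are assumptions made for this example.

```python
import re

# One path step is either a key (no dots or brackets) or an index like [0].
_STEP = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def resolve(obj, path):
    """Resolve a path such as 'timestamp.unixtimestamp[0]' in nested data."""
    current = obj
    for key, index in _STEP.findall(path):
        try:
            current = current[key] if key else current[int(index)]
        except (KeyError, IndexError, TypeError):
            return None  # assumption of this sketch: unresolved paths give None
    return current


# Hypothetical input record, invented for this example.
record = {
    "timestamp": {"unixtimestamp": [1162304033]},
    "track": {"artist": {"name": "Evanescence"}},
}

# Counterparts of the two SELECT predicates.
by_artist = resolve(record, "track.artist.name") == "Evanescence"
by_time = resolve(record, "timestamp.unixtimestamp[0]") == 1162304033
print(by_artist, by_time)
```

A record that lacks the addressed field simply fails the comparison in this sketch, so it would be filtered out rather than raise an error.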
For the key-value projection, the "paths" attribute has to be used. The "attributes" attribute only works for relational tuples.
projectJSON = PROJECT({paths = [['timestamp.unixtimestamp', 'List(Integer)'],['timestamp.iso', 'String']]}, receiverJSON)
To rename the attributes of key-value objects, the "pairs" parameter has to be set to true, and the old and new attribute names have to be given in the "aliases" parameter.
renamed = RENAME({aliases = ['Zeit', 'ZeitNew', 'Band', 'BandNew'], pairs = 'true'}, mapped)
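The flat "aliases" list in the example is read here as alternating old and new names (old, new, old, new), which is what the example values suggest. The Python sketch below only illustrates that pairing; it is not the RENAME implementation, the helper names are hypothetical, and it renames top-level keys only (an assumption of this sketch).

```python
def pair_aliases(aliases):
    """Turn ['old1', 'new1', 'old2', 'new2'] into {'old1': 'new1', 'old2': 'new2'}."""
    if len(aliases) % 2 != 0:
        raise ValueError("aliases must contain an even number of names")
    return dict(zip(aliases[0::2], aliases[1::2]))


def rename_keys(obj, aliases):
    """Rename top-level keys; keys without an alias are kept unchanged."""
    mapping = pair_aliases(aliases)
    return {mapping.get(key, key): value for key, value in obj.items()}


# Hypothetical input object, invented for this example.
mapped = {"Zeit": [1162304033], "Band": "Evanescence", "Lied": "Lithium"}

renamed = rename_keys(mapped, ["Zeit", "ZeitNew", "Band", "BandNew"])
print(renamed)
```

An odd number of entries cannot be split into pairs, so the sketch rejects such a list instead of guessing.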
For the key-value MAP operator, the "kvexpressions" attribute has to be used. The "expressions" attribute only works for relational tuples.
output = MAP({
    kvexpressions = [
        ['timestamp.unixtimestamp', 'Zeit'],
        ['toLong(timestamp.unixtimestamp + 1)', 'Zeit2'],
        ['track.artist.name', 'Band'],
        ['track.name', 'Lied']
    ]
}, input)
The feature adds protocol handlers for the common key-value data formats JSON and BSON (binary JSON).
json = ACCESS({
    source='json',
    wrapper='GenericPull',
    transport='File',
    protocol='JSON',
    dataHandler='KeyValueObject',
    options=[['filename','${WORKSPACEPROJECT}\scrobbles-2006-10_linebreak.json']]
    /// schema=[['timestamp','STARTTIMESTAMP'], ['timestamp2','ENDTIMESTAMP']] /// only used for definition of START- or ENDTIMESTAMP
})
SENDERjson = SENDER({
    transport='File',
    wrapper='GenericPush',
    protocol='JSON',
    dataHandler='KeyValueObject',
    SINK="SENDERjson",
    options=[
        ['filename','${WORKSPACEPROJECT}\output\test2'],
        ['json.write.metadata','true'],
        ['json.write.starttimestamp','metadata.starttimestamp'],
        ['json.write.endtimestamp','metadata.endtimestamp']
    ]
}, json)
The JSON protocol handler can be used not only to read and write files, but also to transfer data from one Odysseus instance to another. The example shows the use of TCP as the transport protocol, but RabbitMQ can be used in the same way.
SENDERjson = SENDER({
    transport='TCPServer',
    wrapper='GenericPush',
    protocol='JSON',
    dataHandler='KeyValueObject',
    SINK="SENDERjson",
    options=[
        ['port', '8080'],
        ['HOST','localhost'],
    ]}, json)

receiverJSON = RECEIVE({
    transport='TCPClient',
    source= 'ReceiverJSON',
    protocol='JSON',
    dataHandler='KeyValueObject',
    options=[['port', '8080']]
})
BSON can be used in exactly the same way as JSON: just replace the protocol handler.
More information about BSON can be found at http://bsonspec.org/.