The file transport handler allows the reading and writing to arbitrary files

Options

Example

PQL

input = ACCESS({source='Source',
wrapper='GenericPull',
transport='File',
protocol='CSV',
dataHandler='Tuple',
options=[['filename', '/src/odysseus/input.csv']],
schema=[
['id', 'Double'],
['data', 'String']]
})

output = SENDER({sink='Sink',
wrapper='GenericPush',
transport='File',
protocol='CSV',
dataHandler='Tuple',
options=[['filename', '/src/odysseus/output.csv']]
}, input)

Example with directory:

/// Data from tankerkoenig.de, example to stream all historic price data
#PARSER PQL
#ADDQUERY
in = CSVFILESOURCE({
          schema = [
            ['date', 'STARTTIMESTAMPSTRING'],
            ['station_uuid','STRING'],
            ['diesel','FLOAT'],
            ['e5','FLOAT'],
            ['e10','FLOAT'],
            ['dieselchange','BOOLEAN'],
            ['e5change','BOOLEAN'],
            ['e10change','BOOLEAN']
          ],
          READFIRSTLINE = false,
          directory = 'D:/tankerkoenig-data/prices/',
          DATEFORMAT = 'yyyy-MM-dd HH:mm:ssX',
          source = 'Tanker-Prices'                  
        }      
      )

Remark: CSVFILESOURCE is a short hand for ACCESS as above. Typically, it is better to use this form, as options are provided as parameters.

CQL

CREATE STREAM source (id Double, data STRING)
    WRAPPER 'GenericPush'
    PROTOCOL 'CSV'
    TRANSPORT 'File'
    DATAHANDLER 'Tuple'
    OPTIONS ( 'filename' '/src/odysseus/input.csv')


CREATE SINK sink (id Double, data STRING)
    WRAPPER 'GenericPush'
    PROTOCOL 'CSV'
    TRANSPORT 'File'
    DATAHANDLER 'Tuple'
    OPTIONS ( 'filename' '/src/odysseus/output.csv')