#PARSER PQL
#QNAME radius_normal
/// Input: AIS vessel position reports (CSV), 2017-04-01, Zone 10, 10:00-12:00.
#DEFINE data_path '/home/odysseus/data/2017/AIS_2017_04_01_Zone10_1000_1200.csv'
/// Artificial per-element read delay in ms (0 = read as fast as possible).
#DEFINE input_delay_ms 0
/// Output CSV for the evaluation results of this query run.
#DEFINE result_path '/home/odysseus/data/eval/radius_normal/join/radius_ToLinearTemporalPoint30m_2c_1MILLISECONDS_10pmin_5000m_buffer_1000000_0.csv'
/// Radius query threshold in meters (refine step keeps tdistance < radius).
#DEFINE radius 5000
/// MMSIs of the query-center vessels.
#DEFINE center_ids toList(367629990,316004106)
/// Window size/unit on the center stream that controls how often the join
/// re-evaluates against the other vessels.
#DEFINE time_other_join_window 1
#DEFINE unit_other_join_window 'MILLISECONDS'
/// Prediction horizon in minutes for PREDICTIONTIME.
#DEFINE prediction_minutes 10
/// Length (minutes) of the time window used for temporalization.
#DEFINE temporalization_window 30
/// Aggregation function that turns point streams into temporal points.
#DEFINE temporalization_method 'ToLinearTemporalPoint'
/// Max size of the threaded buffer before PREDICTIONTIME (0 disables it).
#DEFINE buffer_size 1000000
/// NOTE(review): NUMBER_STEPS is not referenced anywhere in this script —
/// confirm it is still needed before removing.
#DEFINE NUMBER_STEPS 3
#RUNQUERY
/// Source access: pull AIS records line-by-line from the CSV at ${data_path}
/// (File transport, csv protocol, Tuple data handler). BaseDateTime is typed
/// 'StartTimeStamp' and parsed with the given dateformat, so it becomes the
/// stream-time start timestamp of each element. Meta attributes carry the
/// validity interval, prediction times, datarate and latency through the plan.
/// NOTE(review): readfirstline='false' presumably skips the CSV header line —
/// confirm against the Odysseus CSV protocol handler.
input = ACCESS({
    source='vesselRoute2',
    wrapper='GenericPull',
    schema=[
        ['MMSI','Long'],
        ['BaseDateTime','StartTimeStamp'],
        ['latitude','Double'],
        ['longitude','Double'],
        ['SOG','Double'],
        ['COG','Double'],
        ['Heading','Double'],
        ['VesselName','String'],
        ['IMO','String'],
        ['CallSign','String'],
        ['VesselType','String'],
        ['Status','String'],
        ['Length','Double'],
        ['Width','Double'],
        ['Draft','Double'],
        ['Cargo','Integer']
    ],
    inputschema=[
        'Long','String','Double','Double','Double','Double','Double','String','String','String','String','String','Double','Double','Double','Integer'
    ],
    transport='File',
    protocol='csv',
    metaattribute = [
        'TimeInterval', 'PredictionTimes', 'Datarate', 'Latency'
    ],
    datahandler='Tuple',
    dateformat = 'yyyy-MM-dd\'T\'HH:mm:ss',
    options=[
        ['filename', ${data_path}],
        ['Delimiter',','],
        ['TextDelimiter','"'],
        ['delay',${input_delay_ms}],
        ['readfirstline','false']
    ]
  }
)
/// Measure the datarate: refresh the 'datarate' measurement every 100
/// elements; the value travels in the Datarate meta attribute and is
/// extracted again in getDatarate at the end of the plan.
rate = DATARATE({
    updaterate = 100,
    key = 'datarate'
  },
  input
)
/// Restrict validity to the last ${temporalization_window} minutes, so the
/// temporalization only uses recent position reports per vessel.
time = TIMEWINDOW({
    size = [${temporalization_window}, 'minutes']
  },
  rate
)
/// Derive the spatial attributes: build a point in EPSG:4326 (WGS84) from
/// latitude/longitude, convert SOG from knots to m/s (1 kn = 0.514444 m/s),
/// and carry MMSI / BaseDateTime forward under neutral names. All other
/// input attributes are dropped (keepinput = false).
createSpatialObject = MAP({
    expressions = [
      ['ToPoint(latitude, longitude, 4326)', 'SpatialPoint'],
      ['SOG * 0.514444', 'SOGms'],
      ['MMSI','id'],
      ['BaseDateTime','recordDateTime']
    ],
    keepinput = false
  },
  time
)
/// Temporalize the location attribute: per vessel (group_by 'id'), aggregate
/// the SpatialPoint history in the current window into a temporal point
/// (${temporalization_method}) that can later be evaluated at prediction
/// times. NOTE(review): 'Trigger' presumably forces an output on every
/// incoming element, and eval_at_outdating = false suppresses re-evaluation
/// when elements leave the window — confirm against the Odysseus
/// AGGREGATION operator documentation.
temporalize = AGGREGATION({
    aggregations = [
      ['function' = ${temporalization_method}, 'input_attributes' = 'SpatialPoint', 'output_attributes' = 'temp_SpatialPoint'],
      ['function' = 'Trigger']
    ],
    group_by = ['id'],
    eval_at_outdating = false
  },
  createSpatialObject
)
/// Latency measurement point 1: materialize the latency accumulated so far
/// and copy it into 'latency1' so that later CALCLATENCY stages (lat2, lat3)
/// do not overwrite this intermediate value.
lat1 = CALCLATENCY(temporalize
)
maplat1 = MAP({
    expressions = [['latency', 'latency1']],
    keepinput = true
  },
  lat1
)
/// Select the center vessels: keep only elements whose id is contained in
/// the ${center_ids} list; heartbeatrate = 1 keeps stream time advancing
/// downstream even while elements are being filtered out.
selectCenter = SELECT({
    predicate = 'contains(id, ${center_ids})',
    heartbeatrate = 1
  },
  maplat1
)
/// Rename the center attributes so that, after the join, the center side is
/// distinguishable from the other-vessel side (id_center vs. id, etc.).
renameCenter = MAP({
    expressions = [
      ['id','id_center'],
      ['temp_SpatialPoint','center_temp_SpatialPoint'],
      ['latency','center_latency1']
    ]
  },
  selectCenter
)
/// Join windows: all vessels stay valid for 60 minutes, while each center
/// update is only valid for ${time_other_join_window}
/// ${unit_other_join_window} — so every fresh center element is joined
/// exactly against the vessels currently in the 60-minute window.
allObjects = TIMEWINDOW({
    size = [60, 'minutes']
  },
  maplat1
)
updateOnCenter = TIMEWINDOW({
    size = [
      ${time_other_join_window}, ${unit_other_join_window}
    ]
  },
  renameCenter
)
/// Pair each center update (port 0) with every other vessel (port 1),
/// excluding the center itself (id != id_center). elementsizeport0/1 = 1
/// hints that each key keeps only its latest element per port.
/// NOTE(review): group_by_port_0/1 on id_center/id presumably partitions the
/// join state per vessel rather than adding an equality condition — confirm
/// against the Odysseus JOIN operator documentation.
recombine = JOIN({
    predicate = 'id != id_center',
    elementsizeport0 = 1,
    elementsizeport1 = 1,
    group_by_port_0 = ['id_center'],
    group_by_port_1 = ['id']
  },
  updateOnCenter,
  allObjects
)
/// Latency measurement point 2: 'latency_min1' is the smaller of the two
/// pre-join latencies (other vessel vs. center), i.e. the latency of the
/// element that waited longest in the join state; 'latency2' is the total
/// latency up to and including the join.
lat2 = CALCLATENCY(recombine
)
maplat2 = MAP({
    expressions = [
      ['min(latency1, center_latency1)', 'latency_min1'],
      ['latency', 'latency2']
    ],
    keepinput = true
  },
  lat2
)
/// If buffering is enabled (${buffer_size} > 0), decouple the join output
/// from the prediction/refine stage with a threaded buffer of at most
/// ${buffer_size} elements; otherwise feed PREDICTIONTIME directly.
/// Both branches set the prediction interval to [t + 0, t +
/// ${prediction_minutes}] minutes, evaluated at minute granularity.
#IF toInteger(buffer_size) > 0
buf = BUFFER({
    THREADED = true,
    maxbuffersize = ${buffer_size}
  },
  maplat2
)
/// Set the prediction time (buffered path)
predTime = PREDICTIONTIME({
    addtostartvalue = [0, 'MINUTES'],
    addtoendvalue = [${prediction_minutes}, 'MINUTES'],
    predictionbasetimeunit = 'MINUTES'
  },
  buf
)
#ELSE
/// Set the prediction time (unbuffered path)
predTime = PREDICTIONTIME({
    addtostartvalue = [0, 'MINUTES'],
    addtoendvalue = [${prediction_minutes}, 'MINUTES'],
    predictionbasetimeunit = 'MINUTES'
  },
  maplat2
)
#ENDIF
/// Refine step: compute the exact great-circle (orthodromic) distance between
/// the center's temporal point and the other vessel's temporal point over the
/// prediction times, then keep only pairs closer than ${radius} meters.
/// lat3 records the total latency after the radius filter.
calcDistance = MAP({
    expressions = [
      ['OrthodromicDistance(center_temp_SpatialPoint, temp_SpatialPoint)','tdistance']
    ],
    keepinput = true
  },
  predTime
)
/// Refine step
distanceSelect = SELECT({
    predicate = 'tdistance < ${radius}'
  },
  calcDistance
)
lat3 = CALCLATENCY(distanceSelect
)
/// Final projection: keep only the evaluation columns — the vessel pair,
/// the three latency measurements, and the datarate.
/// NOTE(review): 'last(first(Measurements))' presumably extracts the most
/// recent datarate value from the Datarate meta attribute's measurement
/// list — confirm against the Odysseus Datarate metadata documentation.
getDatarate = MAP({
    expressions = [
      ['id_center', 'id_center'],
      ['id', 'id'],
      ['latency_min1', 'latency1'],
      ['latency2', 'latency2'],
      ['latency', 'latency3'],
      ['last(first(Measurements))','datarate']
    ],
    keepinput = false
  },
  lat3
)
/// Sink: write the result tuples as CSV (with a heading line) to
/// ${result_path}, creating the target directory if missing.
/// NOTE(review): option keys here are lower-case ('delimiter',
/// 'textDelimiter') while the ACCESS options use 'Delimiter'/'TextDelimiter',
/// and the text delimiter is a single quote here vs. a double quote on input
/// — presumably intentional / case-insensitive, but worth confirming.
output = SENDER({
    sink='Sink',
    wrapper='GenericPush',
    transport='File',
    protocol='CSV',
    datahandler='Tuple',
    options=[
      ['delimiter',','],
      ['textDelimiter',"'"],
      ['csv.writeheading',true],
      ['filename', ${result_path}],
      ['createdir', true]
    ]
  },
  getDatarate
) |