/// Two continuous top-10 queries over the `taxi` source.
///   QUERY1: the 10 most frequent (pickup cell, dropoff cell) pairs in a 30-minute window.
///   QUERY2: the 10 cells with the highest profitability, i.e.
///           (median of fare + tip per pickup cell, 15-minute window)
///           / (number of taxis whose last dropoff was in that cell, 30-minute window).
#PARSER PQL
#METADATA TimeInterval
#METADATA Latency
#PARALLELIZATION (type=INTER_OPERATOR) (degree=4) (buffersize=AUTO)

/// for internal id calculation: a cell (x, y) is packed into one number as x * ID_LEFTSHIFT + y
#DEFINE ID_LEFTSHIFT 1000
/// maximum number of elements held by the input buffer of each query
#DEFINE BUFFERSIZE 100000

#IFQNDEF QUERY1
#QNAME QUERY1
#ADDQUERY
/// Decouple the query from the source with a threaded buffer.
query1_start = BUFFER({
                    name = "Q1_BUFFER",
                    threaded = true,
                    drainatclose = false,
                    maxbuffersize = ${BUFFERSIZE}
                  },
                  taxi
                )

/// Halve the cell coordinates (ceil(x/2), ceil(y/2)) and pack each pair into a single id.
cell_reduction = MAP({
                    expressions = [
                      'pickup_datetime',
                      'dropoff_datetime',
                      ['ceil(pickup_cell_x/2) * ${ID_LEFTSHIFT} + ceil(pickup_cell_y/2)', 'pickup_cell_id'],
                      ['ceil(dropoff_cell_x/2) * ${ID_LEFTSHIFT} + ceil(dropoff_cell_y/2)', 'dropoff_cell_id']
                    ]
                  },
                  query1_start
                )

/// 30-minute time window over the reduced trips.
cell_id_calc = TIMEWINDOW({
                    size = [30, 'MINUTES']
                  },
                  cell_reduction
                )

/// Count the trips per (pickup cell, dropoff cell) pair and carry along the last
/// pickup and dropoff timestamps of the pair.
num_trips_per_cellpair = AGGREGATE({
                    id = "TripCellCounter",
                    name = "TripCellCounter",
                    aggregations = [
                      ['LAST', 'pickup_datetime', 'pickup_datetime', 'Date'],
                      ['LAST', 'dropoff_datetime', 'dropoff_datetime', 'Date'],
                      ['COUNT', 'pickup_datetime', 'count']
                    ],
                    group_by = [
                      'pickup_cell_id',
                      'dropoff_cell_id'
                    ],
                    fastgrouping = false
                  },
                  cell_id_calc
                )

/// Keep the 10 cell pairs with the highest trip count.
top10_pre = TOPK({
                    k = 10,
                    scoringfunction = 'count',
                    TIEWITHTIMESTAMP = true
                  },
                  num_trips_per_cellpair
                )

/// Merge all top k with same time stamp
top10 = CHANGEDETECT({ATTR = ['TimeInterval.start','TimeInterval.end'], SENDLASTOFSAMEOBJECTS = true}, top10_pre)

/// Flatten the top-k list into one output tuple.
/// - The elementAt() indices address positions inside the trigger / topk[i] tuples;
///   presumably 0,1 = cell ids, 2,3 = timestamps, 4 = count (group-by attributes first) - TODO confirm.
/// - A packed cell id is rendered as "<id / ID_LEFTSHIFT>.<id % ID_LEFTSHIFT>".
/// - NOTE(review): if format() follows Java SimpleDateFormat pattern letters, "YYYY" is the
///   week-year and "DD" the day-of-year, and "yyyy-MM-dd HH:mm:ss" would be the intended
///   pattern - confirm against the format() implementation before changing it.
/// - The 'dropoff_datetime' expression previously carried an unbalanced extra ')'; since
///   SUPPRESSERRORS = true may hide such errors, the expression is now balanced.
mapped10 = MAP({
                    expressions = [
                      ['format(elementAt(trigger,2),"YYYY-MM-DD HH:mm:ss")','pickup_datetime'],
                      ['format(elementAt(trigger,3),"YYYY-MM-DD HH:mm:ss")','dropoff_datetime']
#LOOP i 0 upto 9
                      ,['concat(concat(toInteger(elementAt(topk[${i}],0)/${ID_LEFTSHIFT}), "."), toInteger(elementAt(topk[${i}],0)%${ID_LEFTSHIFT}))','start_cell_id_${i}']
                      ,['concat(concat(toInteger(elementAt(topk[${i}],1)/${ID_LEFTSHIFT}), "."), toInteger(elementAt(topk[${i}],1)%${ID_LEFTSHIFT}))','end_cell_id_${i}']
                      ,['toInteger(elementAt(topk[${i}],4))','v_${i}']
#ENDLOOP
                      ,['Latency.latency', 'delay']
                      ,['format(toDate(TimeInterval.start),"YYYY-MM-DD HH:mm:ss")','start']
                      ,['format(toDate(TimeInterval.end),"YYYY-MM-DD HH:mm:ss")','end']
                    ],
                    ALLOWNULL = true,
                    SUPPRESSERRORS = true
                  },
                  CALCLATENCY(top10)
                )
#ENDIF

#IFQNDEF QUERY2
#QNAME QUERY2
#ADDQUERY
/// Decouple the query from the source with a threaded buffer.
query2_start = BUFFER({
                    name = "Q2_BUFFER",
                    threaded = true,
                    drainatclose = false,
                    maxbuffersize = ${BUFFERSIZE}
                  },
                  taxi
                )

/// Pack the (unreduced) cell coordinates into single ids and keep the money attributes.
cell_id_calc = MAP({
                    expressions = [
                      'medallion',
                      'pickup_datetime',
                      'dropoff_datetime',
                      ['toLong(pickup_cell_x * ${ID_LEFTSHIFT} + pickup_cell_y)', 'pickup_cell_id'],
                      ['toLong(dropoff_cell_x * ${ID_LEFTSHIFT} + dropoff_cell_y)', 'dropoff_cell_id'],
                      'fare_amount',
                      'tip_amount'
                    ]
                  },
                  query2_start
                )

/// empty taxis
/// 30-minute window; per medallion keep only the last trip, then count medallions per dropoff cell.
empty_taxis_windowed = TIMEWINDOW({
                    size = [30, 'MINUTES']
                  },
                  cell_id_calc
                )

last_taxi_event = AGGREGATE({
                    name = 'LAST_TAXI_EVENT',
                    aggregations = [
                      ['LAST', 'pickup_datetime', 'pickup_datetime','Date'],
                      ['LAST', 'dropoff_datetime', 'dropoff_datetime','Date'],
                      ['LAST', 'dropoff_cell_id', 'dropoff_cell_id', 'Long']
                    ],
                    group_by = ['medallion']
                  },
                  empty_taxis_windowed
                )

empty_taxis_count = AGGREGATE({
                    name = 'EMPTY_TAXIS',
                    aggregations = [
                      ['LAST', 'pickup_datetime', 'pickup_datetime','Date'],
                      ['LAST', 'dropoff_datetime', 'dropoff_datetime','Date'],
                      ['COUNT', 'medallion', 'empty_taxis', 'Long']
                    ],
                    group_by = ['dropoff_cell_id']
                  },
                  last_taxi_event
                )

/// profit
/// 15-minute window; drop trips with a negative fare or tip, then take the median of
/// fare + tip per pickup cell.
profit_windowed = TIMEWINDOW({
                    size = [15, 'MINUTES']
                  },
                  cell_id_calc
                )

fare_above_zero = SELECT({
                    predicate = 'fare_amount >= 0 && tip_amount >= 0'
                  },
                  profit_windowed
                )

profit_fare_plus_tip = MAP({
                    expressions = [
                      'pickup_datetime',
                      'pickup_cell_id',
                      ['fare_amount + tip_amount', 'profit']
                    ]
                  },
                  fare_above_zero
                )

profit_median = AGGREGATE({
                    name = 'profit',
                    aggregations = [['MEDIAN', 'profit', 'profit']],
                    group_by = ['pickup_cell_id'],
                    fastgrouping = true
                  },
                  profit_fare_plus_tip
                )

/// together
/// Match the median profit of a cell with the empty-taxi count of the same cell.
profit_joined_empty_taxis = JOIN({
                    name = 'Join',
                    predicate = 'pickup_cell_id = dropoff_cell_id'
                  },
                  empty_taxis_count,
                  profit_median
                )

/// profitability = median profit / number of empty taxis in the cell.
profitability = MAP({
                    expressions = [
                      'pickup_datetime',
                      'dropoff_datetime',
                      'dropoff_cell_id',
                      'empty_taxis',
                      'profit',
                      ['profit/empty_taxis', 'profitability']
                    ]
                  },
                  profit_joined_empty_taxis
                )

/// Keep the 10 cells with the highest profitability.
top10 = TOPK({
                    k = 10,
                    scoringfunction = 'profitability',
                    TIEWITHTIMESTAMP = true
                  },
                  profitability
                )

/// Flatten the top-k list into one output tuple.
/// - The elementAt() indices follow the attribute order of the `profitability` MAP above:
///   0 pickup_datetime, 1 dropoff_datetime, 2 dropoff_cell_id, 3 empty_taxis, 4 profit, 5 profitability.
/// - A packed cell id is rendered as "<id / ID_LEFTSHIFT>.<id % ID_LEFTSHIFT>".
/// - NOTE(review): if format() follows Java SimpleDateFormat pattern letters, "YYYY" is the
///   week-year and "DD" the day-of-year, and "yyyy-MM-dd HH:mm:ss" would be the intended
///   pattern - confirm against the format() implementation before changing it.
/// - Both datetime expressions previously carried an unbalanced extra ')'; since
///   SUPPRESSERRORS = true may hide such errors, the expressions are now balanced.
mapedtop10 = MAP({
                    expressions = [
                      ['format(elementAt(trigger,0),"YYYY-MM-DD HH:mm:ss")','pickup_datetime'],
                      ['format(elementAt(trigger,1),"YYYY-MM-DD HH:mm:ss")','dropoff_datetime']
#LOOP i 0 upto 9
                      ,['concat(concat(toInteger(elementAt(topk[${i}],2)/${ID_LEFTSHIFT}), "."), toInteger(elementAt(topk[${i}],2)%${ID_LEFTSHIFT}))','profitable_cell_id_${i}']
                      ,['toInteger(elementAt(topk[${i}],3))','empty_taxies_in_cell_${i}']
                      ,['toDouble(elementAt(topk[${i}],4))','median_profit_in_cell_${i}']
                      ,['toDouble(elementAt(topk[${i}],5))','profitability_of_cell_${i}']
#ENDLOOP
                      ,['Latency.latency', 'delay']
                    ],
                    ALLOWNULL = true,
                    SUPPRESSERRORS = true
                  },
                  CALCLATENCY(top10)
                )
#ENDIF