#PARSER PQL
/// Metadata requested for the queries below: TimeInterval.start / TimeInterval.end and
/// Latency.latency are referenced by the result-shaping operators further down.
#METADATA TimeInterval
#METADATA Latency
/// Parallelization settings exactly as configured by the author (inter-operator, degree 4,
/// automatic buffer size).
#PARALLELIZATION (type=INTER_OPERATOR) (degree=4) (buffersize=AUTO) 
/// ID_LEFTSHIFT is the factor used to pack a cell's x and y parts into one number
/// (x * ID_LEFTSHIFT + y) and to unpack them again with division and remainder.
#DEFINE ID_LEFTSHIFT 1000 /// for internal id calculation
/// BUFFERSIZE is the maxbuffersize handed to the BUFFER operator at the start of each query.
#DEFINE BUFFERSIZE 100000
#IFQNDEF QUERY1
#QNAME QUERY1
#ADDQUERY
/// Entry of query 1: a threaded BUFFER called "Q1_BUFFER" reading from `taxi`
/// (defined outside this section). Its capacity is the script-level BUFFERSIZE value.
query1_start = BUFFER(
  {
    name = "Q1_BUFFER",
    maxbuffersize = ${BUFFERSIZE},
    threaded = true,
    drainatclose = false
  },
  taxi
)
/// Projects every buffered trip to its two timestamps plus one numeric id per trip end.
/// Each id is ceil(x/2) * ID_LEFTSHIFT + ceil(y/2): both coordinates are halved and rounded
/// up, and the x part is scaled by ID_LEFTSHIFT so the pair fits into a single number.
/// NOTE(review): presumably the halving merges two source cells per axis into one coarser
/// cell, and it relies on `/` yielding a fractional result before ceil() -- confirm both.
/// The two id expressions are string literals that span two physical lines; keep their
/// layout untouched when editing.
cell_reduction = MAP({
                      expressions = [
                            'pickup_datetime','dropoff_datetime',
                            ['ceil(pickup_cell_x/2) * 
                                   ${ID_LEFTSHIFT} + ceil(pickup_cell_y/2)', 'pickup_cell_id'],
                            ['ceil(dropoff_cell_x/2) * 
                                  ${ID_LEFTSHIFT} + ceil(dropoff_cell_y/2)', 'dropoff_cell_id']
                      ]
                    },
                    query1_start
                  )
/// Puts a 30-minute time window over the output of cell_reduction.
cell_id_calc = TIMEWINDOW(
  { size = [30, 'MINUTES'] },
  cell_reduction
)
            
                                
/// Aggregates the windowed stream per (pickup_cell_id, dropoff_cell_id) pair: keeps the LAST
/// pickup and dropoff timestamps of the group and a COUNT exposed as `count`.
/// The order of the aggregations is significant: mapped10 addresses the results by position.
num_trips_per_cellpair = AGGREGATE(
  {
    id = "TripCellCounter",
    name = "TripCellCounter",
    group_by = ['pickup_cell_id', 'dropoff_cell_id'],
    aggregations = [
      ['LAST',  'pickup_datetime',  'pickup_datetime',  'Date'],
      ['LAST',  'dropoff_datetime', 'dropoff_datetime', 'Date'],
      ['COUNT', 'pickup_datetime',  'count']
    ],
    fastgrouping = false
  },
  cell_id_calc
)
/// Ranks the per-cell-pair aggregates by their `count` attribute and keeps 10 entries;
/// TIEWITHTIMESTAMP is switched on.
top10_pre = TOPK(
  {
    scoringfunction = 'count',
    k = 10,
    TIEWITHTIMESTAMP = true
  },
  num_trips_per_cellpair
)
/// Merge all top-k results that carry the same time stamp: change detection is keyed on the
/// TimeInterval start and end, with SENDLASTOFSAMEOBJECTS enabled.
top10 = CHANGEDETECT(
  {
    ATTR = ['TimeInterval.start','TimeInterval.end'],
    SENDLASTOFSAMEOBJECTS = true
  },
  top10_pre
)
        
/// Flattens the query-1 top-10 result into a single output tuple:
///   - pickup_datetime / dropoff_datetime: formatted from positions 2 and 3 of `trigger`
///   - for every rank i in 0..9: start_cell_id_i, end_cell_id_i and v_i, read from
///     positions 0, 1 and 4 of topk[i]
///   - delay: Latency.latency as computed by CALCLATENCY on the input
///   - start / end: the formatted TimeInterval bounds
/// Cell ids are rendered as "x.y" by dividing by ID_LEFTSHIFT and taking the remainder,
/// which reverses the x * ID_LEFTSHIFT + y packing done in cell_reduction.
/// The dropoff_datetime expression previously ended with a surplus ')'; its parentheses
/// are now balanced like those of the pickup_datetime expression.
/// ALLOWNULL and SUPPRESSERRORS are both enabled, as ranks may be missing when fewer
/// than 10 entries exist (NOTE(review): presumed motivation -- confirm).
/// NOTE(review): if format() follows Java SimpleDateFormat letters, "YYYY" is week-year and
/// "DD" is day-of-year; "yyyy-MM-dd" may be what is intended -- confirm before changing.
mapped10 = MAP({
            expressions = [
                ['format(elementAt(trigger,2),"YYYY-MM-DD HH:mm:ss")','pickup_datetime'],
                ['format(elementAt(trigger,3),"YYYY-MM-DD HH:mm:ss")','dropoff_datetime']
                #LOOP i 0 upto 9
                     ,['concat(concat(toInteger(elementAt(topk[${i}],0)/${ID_LEFTSHIFT}), "."), toInteger(elementAt(topk[${i}],0)%${ID_LEFTSHIFT}))','start_cell_id_${i}']
                     ,['concat(concat(toInteger(elementAt(topk[${i}],1)/${ID_LEFTSHIFT}), "."), toInteger(elementAt(topk[${i}],1)%${ID_LEFTSHIFT}))','end_cell_id_${i}']
                     ,['toInteger(elementAt(topk[${i}],4))','v_${i}']
                #ENDLOOP
                ,['Latency.latency', 'delay']
                ,['format(toDate(TimeInterval.start),"YYYY-MM-DD HH:mm:ss")','start']
                ,['format(toDate(TimeInterval.end),"YYYY-MM-DD HH:mm:ss")','end']
            ],
            ALLOWNULL = true,
            SUPPRESSERRORS = true
          },
          CALCLATENCY(top10)
        )
              
#ENDIF
#IFQNDEF QUERY2
#QNAME QUERY2
#ADDQUERY
/// Entry of query 2: a threaded BUFFER called "Q2_BUFFER" reading from `taxi`
/// (defined outside this section). Its capacity is the script-level BUFFERSIZE value.
query2_start = BUFFER(
  {
    name = "Q2_BUFFER",
    maxbuffersize = ${BUFFERSIZE},
    threaded = true,
    drainatclose = false
  },
  taxi
)
/// Shapes each buffered trip for query 2: medallion, both timestamps, one Long id per trip
/// end (x * ID_LEFTSHIFT + y, no halving here) and the fare and tip amounts.
cell_id_calc = MAP(
  {
    expressions = [
      'medallion',
      'pickup_datetime',
      'dropoff_datetime',
      ['toLong(pickup_cell_x * ${ID_LEFTSHIFT} + pickup_cell_y)', 'pickup_cell_id'],
      ['toLong(dropoff_cell_x * ${ID_LEFTSHIFT} + dropoff_cell_y)', 'dropoff_cell_id'],
      'fare_amount',
      'tip_amount'
    ]
  },
  query2_start
)
            
/// Empty-taxi branch: starts with a 30-minute time window over the mapped trips.
empty_taxis_windowed = TIMEWINDOW(
  { size = [30, 'MINUTES'] },
  cell_id_calc
)
                      
/// Per medallion, keeps the LAST pickup time, dropoff time and dropoff cell id seen in the
/// 30-minute window.
last_taxi_event = AGGREGATE(
  {
    name = 'LAST_TAXI_EVENT',
    group_by = ['medallion'],
    aggregations = [
      ['LAST', 'pickup_datetime',  'pickup_datetime',  'Date'],
      ['LAST', 'dropoff_datetime', 'dropoff_datetime', 'Date'],
      ['LAST', 'dropoff_cell_id',  'dropoff_cell_id',  'Long']
    ]
  },
  empty_taxis_windowed
)
        
/// Per dropoff_cell_id, counts the medallions coming out of last_taxi_event (exposed as the
/// Long attribute `empty_taxis`) and keeps the LAST pickup/dropoff timestamps of the group.
empty_taxis_count = AGGREGATE(
  {
    name = 'EMPTY_TAXIS',
    group_by = ['dropoff_cell_id'],
    aggregations = [
      ['LAST',  'pickup_datetime',  'pickup_datetime',  'Date'],
      ['LAST',  'dropoff_datetime', 'dropoff_datetime', 'Date'],
      ['COUNT', 'medallion',        'empty_taxis',      'Long']
    ]
  },
  last_taxi_event
)
/// Profit branch: starts with a 15-minute time window over the mapped trips.
profit_windowed = TIMEWINDOW(
  { size = [15, 'MINUTES'] },
  cell_id_calc
)
                  
/// Drops trips with a negative fare or a negative tip. Despite the operator's name, amounts
/// of exactly zero pass the filter (the comparison is >=).
fare_above_zero = SELECT(
  { predicate = 'fare_amount >= 0 && tip_amount >= 0' },
  profit_windowed
)
              
/// Reduces each remaining trip to pickup time, pickup cell id and its profit, where
/// profit is fare_amount plus tip_amount.
profit_fare_plus_tip = MAP(
  {
    expressions = [
      'pickup_datetime',
      'pickup_cell_id',
      ['fare_amount + tip_amount', 'profit']
    ]
  },
  fare_above_zero
)
/// Computes the MEDIAN of `profit` per pickup_cell_id (fastgrouping enabled); the result
/// keeps the attribute name `profit`.
profit_median = AGGREGATE(
  {
    name = 'profit',
    group_by = ['pickup_cell_id'],
    aggregations = [
      ['MEDIAN', 'profit', 'profit']
    ],
    fastgrouping = true
  },
  profit_fare_plus_tip
)
/// Brings both branches together: joins the empty-taxi counts (first input) with the median
/// profits (second input) where the profit's pickup cell equals the empty-taxi dropoff cell.
profit_joined_empty_taxis = JOIN(
  {
    name = 'Join',
    predicate = 'pickup_cell_id = dropoff_cell_id'
  },
  empty_taxis_count,
  profit_median
)
/// Derives `profitability` as profit divided by empty_taxis for every joined cell.
/// The attribute order matters: mapedtop10 reads these values by position (0..5).
profitability = MAP(
  {
    expressions = [
      'pickup_datetime',
      'dropoff_datetime',
      'dropoff_cell_id',
      'empty_taxis',
      'profit',
      ['profit/empty_taxis', 'profitability']
    ]
  },
  profit_joined_empty_taxis
)
/// Ranks cells by their `profitability` attribute and keeps 10 entries;
/// TIEWITHTIMESTAMP is switched on.
top10 = TOPK(
  {
    scoringfunction = 'profitability',
    k = 10,
    TIEWITHTIMESTAMP = true
  },
  profitability
)
        
        
/// Flattens the query-2 top-10 result into a single output tuple:
///   - pickup_datetime / dropoff_datetime: formatted from positions 0 and 1 of `trigger`
///   - for every rank i in 0..9: the cell id (position 2 of topk[i], rendered as "x.y" via
///     division by and remainder of ID_LEFTSHIFT), the empty-taxi count (position 3), the
///     median profit (position 4) and the profitability (position 5)
///   - delay: Latency.latency as computed by CALCLATENCY on the input
/// The positions follow the attribute order produced by the `profitability` MAP.
/// Both timestamp expressions previously ended with a surplus ')'; their parentheses are
/// now balanced.
/// ALLOWNULL and SUPPRESSERRORS are both enabled, as ranks may be missing when fewer
/// than 10 entries exist (NOTE(review): presumed motivation -- confirm).
/// NOTE(review): if format() follows Java SimpleDateFormat letters, "YYYY" is week-year and
/// "DD" is day-of-year; "yyyy-MM-dd" may be what is intended -- confirm before changing.
mapedtop10 = MAP({
            expressions = [
                ['format(elementAt(trigger,0),"YYYY-MM-DD HH:mm:ss")','pickup_datetime'],
                ['format(elementAt(trigger,1),"YYYY-MM-DD HH:mm:ss")','dropoff_datetime']
                #LOOP i 0 upto 9
                     ,['concat(concat(toInteger(elementAt(topk[${i}],2)/${ID_LEFTSHIFT}), "."), toInteger(elementAt(topk[${i}],2)%${ID_LEFTSHIFT}))','profitable_cell_id_${i}']
                     ,['toInteger(elementAt(topk[${i}],3))','empty_taxies_in_cell_${i}']
                     ,['toDouble(elementAt(topk[${i}],4))','median_profit_in_cell_${i}']
                     ,['toDouble(elementAt(topk[${i}],5))','profitability_of_cell_${i}']
                #ENDLOOP
                ,['Latency.latency', 'delay']
            ],
            ALLOWNULL = true,
            SUPPRESSERRORS = true
          },
          CALCLATENCY(top10)
        )
#ENDIF
/// (wiki page footer "No labels" -- not part of the script)