Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
titleenergy_data.csv
id,wh,time
1,0,0
2,0,3
3,0,6
1,115,15
2,50,18
3,250,21
1,200,30
2,60,33
3,500,36
1,210,45
2,100,48
3,600,51

Spatio-Temporal Radius

This is a spatio-temporal query, i.e., you need the spatio-temporal feature. It searches for all ships (moving objects) that are is an radius of 5000 meters to the ships with the ids 367629990 and 316004106.

Code Block
collapsetrue
#PARSER PQL
#QNAME radius_normal

#PARSER PQL
#DEFINE data_path '/home/odysseus/data/2017/AIS_2017_04_01_Zone10_1000_1200.csv'
#DEFINE input_delay_ms 0
#DEFINE result_path '/home/odysseus/data/eval/radius_normal/join/radius_ToLinearTemporalPoint30m_2c_1MILLISECONDS_10pmin_5000m_buffer_1000000_0.csv'
#DEFINE radius 5000
#DEFINE center_ids toList(367629990,316004106)
#DEFINE time_other_join_window 1
#DEFINE unit_other_join_window 'MILLISECONDS'
#DEFINE prediction_minutes 10
#DEFINE temporalization_window 30
#DEFINE temporalization_method 'ToLinearTemporalPoint'
#DEFINE buffer_size 1000000
#DEFINE NUMBER_STEPS 3

#RUNQUERY

input = ACCESS({
            source='vesselRoute2',
            wrapper='GenericPull',
            schema=[
              ['MMSI','Long'],
              ['BaseDateTime','StartTimeStamp'],
              ['latitude','Double'],
              ['longitude','Double'],
              ['SOG','Double'],
              ['COG','Double'],
              ['Heading','Double'],
              ['VesselName','String'],
              ['IMO','String'],
              ['CallSign','String'],
              ['VesselType','String'],
              ['Status','String'],
              ['Length','Double'],
              ['Width','Double'],
              ['Draft','Double'],
              ['Cargo','Integer']
            ],
            inputschema=[
'Long','String','Double','Double','Double','Double','Double','String','String','String','String','String','Double','Double','Double','Integer'                                                                                
            ],
            transport='File',
            protocol='csv',
            metaattribute = [
'TimeInterval',  'PredictionTimes', 'Datarate', 'Latency'                                                                                    
            ],
            datahandler='Tuple',
            dateformat = 'yyyy-MM-dd\'T\'HH:mm:ss',
            options=[
              ['filename', ${data_path}],
              ['Delimiter',','],
              ['TextDelimiter','"'],
              ['delay',${input_delay_ms}],
              ['readfirstline','false']
            ]                                                                    
          }                                                        
        )
        
/// Measure the datarate
rate = DATARATE({
            updaterate = 100,
            key = 'datarate'                                                                                                                                            
          },
          input
        )
        
        /// We only want to use the elements from the last time
time = TIMEWINDOW({
            size = [${temporalization_window}, 'minutes']                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                
          },
          rate
        )
        
createSpatialObject = MAP({
                          expressions = [
                            ['ToPoint(latitude, longitude, 4326)', 'SpatialPoint'],
                            ['SOG * 0.514444', 'SOGms'],
                            ['MMSI','id'],
                            ['BaseDateTime','recordDateTime']
                          ],
                          keepinput = false                                                                                                                                                                                                                                                                                                                                                                        
                        },
                        time
                      )
                      
/// Temporalize the location attribute
temporalize = AGGREGATION({
                  aggregations = [
                    ['function' = ${temporalization_method}, 'input_attributes' = 'SpatialPoint', 'output_attributes' = 'temp_SpatialPoint'],
                    ['function' = 'Trigger']
                  ],
                  group_by = ['id'],
                  eval_at_outdating = false                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         
                },
                createSpatialObject
              )
              
lat1 = CALCLATENCY(temporalize                                                
        )
        
maplat1 = MAP({
              expressions = [['latency', 'latency1']],
              keepinput = true                                        
            },
            lat1
          )
              
/// Select the center        
selectCenter = SELECT({
                    predicate = 'contains(id, ${center_ids})',
                    heartbeatrate = 1
                  },
                  maplat1
                )
                      
renameCenter = MAP({
                    expressions = [
                      ['id','id_center'],
                      ['temp_SpatialPoint','center_temp_SpatialPoint'],
                      ['latency','center_latency1']
                    ]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    
                  },
                  selectCenter
                )
                
allObjects = TIMEWINDOW({
                  size = [60, 'minutes']                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
                },
                maplat1
              )
        
                
updateOnCenter = TIMEWINDOW({
                      size = [
${time_other_join_window}, ${unit_other_join_window}                                                                  
                      ]                                                                                                                                                                                                        
                    },
                    renameCenter
                  )
                
recombine = JOIN({
                predicate = 'id != id_center',
                elementsizeport0 = 1,
                elementsizeport1 = 1,
                group_by_port_0 = ['id_center'],
                group_by_port_1 = ['id']                                                                                    
              },
              updateOnCenter,
              allObjects
            )
           
            
lat2 = CALCLATENCY(recombine                                                
        )
             
maplat2 = MAP({
              expressions = [
                ['min(latency1, center_latency1)', 'latency_min1'],
                ['latency', 'latency2']
              ],
              keepinput = true                                        
            },
            lat2
          )
          
#IF toInteger(buffer_size) > 0
buf = BUFFER({
          THREADED = true,
          maxbuffersize = ${buffer_size}
        },
        maplat2
      )

            
/// Set the prediction time
predTime = PREDICTIONTIME({
                addtostartvalue = [0, 'MINUTES'],
                addtoendvalue = [${prediction_minutes}, 'MINUTES'],
                predictionbasetimeunit = 'MINUTES'                                                                                                                                                                                                                                                                                                                                                                                                                      
              },
              buf
            )
#ELSE
/// Set the prediction time
predTime = PREDICTIONTIME({
                addtostartvalue = [0, 'MINUTES'],
                addtoendvalue = [${prediction_minutes}, 'MINUTES'],
                predictionbasetimeunit = 'MINUTES'                                                                                                                                                                                                                                                                                                                                                                                                                      
              },
              maplat2
            ) 
#ENDIF

              
/// Calculate exact distance for refine step
calcDistance = MAP({
                    expressions = [
                      ['OrthodromicDistance(center_temp_SpatialPoint, temp_SpatialPoint)','tdistance']
                    ],
                    keepinput = true                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
                  },
                  predTime
                )

/// Refine step
distanceSelect = SELECT({
                      predicate = 'tdistance < ${radius}'                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    
                    },
                    calcDistance
                  )   

lat3 = CALCLATENCY(distanceSelect                                        
        )
      
/// Get datarate into data
getDatarate = MAP({
                  expressions = [
                    ['id_center', 'id_center'],
                    ['id', 'id'],
                    ['latency_min1', 'latency1'],
                    ['latency2', 'latency2'],
                    ['latency', 'latency3'],
                    ['last(first(Measurements))','datarate']
                  ],
                  keepinput = false                                                                                                                                                
                },
                lat3
              )
                        
/// Store the results
output = SENDER({
              sink='Sink',
              wrapper='GenericPush',
              transport='File',
              protocol='CSV',
              datahandler='Tuple',
              options=[
                ['delimiter',','],
                ['textDelimiter',"'"],
                ['csv.writeheading',true],
                ['filename', ${result_path}],
                ['createdir', true]
              ]                                                                                                                                                               
            },
            getDatarate
          )     

If you want to try this query, you can do it with this data from the marine cadastre. It is a little preprocessed:

View file
nameAIS_2017_04_01_Zone10_1000_1200.csv
height150

...