...

The moving object algebra defines some functions that work directly on temporal attributes and therefore do not need the translation described above. One example is the SpeedFunction from the spatio-temporal feature: it takes a temporal spatial point (tpoint) and creates a tdouble that holds the speed of the object at each point in time. A direct temporal function implements the interface TemporalFunction. If it returns a non-temporal value instead of a temporal one, it also implements the interface RemoveTemporalFunction. One example is the TrajectoryFunction from the spatio-temporal feature, which takes a tpoint and creates a non-temporal trajectory, i.e., a spatial LineString.
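
The difference between the two kinds of direct temporal functions can be illustrated with a small conceptual sketch in Python. It is not the Odysseus implementation: the function-of-time model, the names `linear_tpoint`, `speed` and `trajectory`, and the finite-difference speed are all assumptions made for illustration only.

```python
import math
from typing import Callable, List, Tuple

Point = Tuple[float, float]
TemporalPoint = Callable[[float], Point]    # time -> position (stand-in for a tpoint)
TemporalDouble = Callable[[float], float]   # time -> number (stand-in for a tdouble)


def linear_tpoint(start: Point, velocity: Point) -> TemporalPoint:
    """Hypothetical temporal point that moves with a constant velocity."""
    return lambda t: (start[0] + velocity[0] * t, start[1] + velocity[1] * t)


def speed(tpoint: TemporalPoint, dt: float = 1.0) -> TemporalDouble:
    """Temporal in, temporal out: the speed at each point in time.

    Approximated with a finite difference over dt (an assumption of this sketch).
    """
    def at(t: float) -> float:
        (x0, y0), (x1, y1) = tpoint(t), tpoint(t + dt)
        return math.hypot(x1 - x0, y1 - y0) / dt
    return at


def trajectory(tpoint: TemporalPoint, times: List[float]) -> List[Point]:
    """Temporal in, non-temporal out: the path as a plain list of vertices."""
    return [tpoint(t) for t in times]


ship = linear_tpoint(start=(0.0, 0.0), velocity=(3.0, 4.0))
ship_speed = speed(ship)                   # still a function of time
ship_path = trajectory(ship, [0, 1, 2])    # a plain value without a time dimension
```

In this model, `speed` corresponds to a function that only implements TemporalFunction, while `trajectory` corresponds to one that additionally removes the temporal dimension from its result.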

...

Combining Functions in Mixed Expressions

When direct temporal functions are mixed with normal functions (functions that already exist in Odysseus and need non-temporal input values), a MixedTemporalExpression is created. As a user, you do not have to worry about this and can use mixed expressions like any other expression, although they may be a little slower. Functionally, the result is identical to that of a normal TemporalRelationalExpression. The following example shows a mixed expression in which Trajectory is a direct temporal function and SpatialLength is a normal non-temporal function:

Code Block
/// You can combine these two ...
calcTraj = MAP({
	expressions = [['Trajectory(tempSpatialPoint, PredictionTimes)', 'traj']],
	keepinput = true
}, predTime)

calcTraj = MAP({
	expressions = [['SpatialLength(traj)', 'len']],
	keepinput = true
}, calcTraj)

/// ... into this mixed expression
calcTraj = MAP({
	expressions = [['SpatialLength(Trajectory(tempSpatialPoint, PredictionTimes))', 'traj']],
	keepinput = true
}, predTime)

Operators

As a user, you can use the normal operators just as you would without temporal attributes. Nevertheless, the operators behave a little differently; their behavior with the temporal feature is explained in the following. In addition, you need to define the prediction time, which is done with the PredictionTime operator, and you need to create some temporal attributes. We call this process temporalization, which is also explained in the following.

...

  • A union merge function is applied to the PredictionTimes. The result therefore covers every point in time at which at least one participating element is valid and ignores that other elements of the aggregation may not be valid there, which is probably what the user wants. An intersection could be useful in some situations, but it is not the default in the implementation.
  • Most aggregation functions are incremental functions. Currently, only these work with temporal attributes. They are translated to a TemporalIncrementalAggregationFunction, which creates a temporal output for the function.
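
The effect of the union merge compared with an intersection can be sketched in a few lines of Python. This is only an illustration under the assumption that the PredictionTimes of an element can be modeled as a list of `(start, end)` intervals; the helper names `union` and `intersection` are hypothetical and not part of Odysseus.

```python
from typing import List, Tuple

Interval = Tuple[int, int]  # (start, end) of a prediction time interval


def union(a: List[Interval], b: List[Interval]) -> List[Interval]:
    """Merge two interval lists; overlapping or touching intervals are joined."""
    merged: List[Interval] = []
    for start, end in sorted(a + b):
        if merged and start <= merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def intersection(a: List[Interval], b: List[Interval]) -> List[Interval]:
    """Keep only the time spans that are covered by both interval lists."""
    result: List[Interval] = []
    for a_start, a_end in a:
        for b_start, b_end in b:
            start, end = max(a_start, b_start), min(a_end, b_end)
            if start < end:
                result.append((start, end))
    return sorted(result)


element_1 = [(0, 10)]   # valid from minute 0 to minute 10
element_2 = [(5, 15)]   # valid from minute 5 to minute 15

merged_union = union(element_1, element_2)
merged_intersection = intersection(element_1, element_2)
```

With the union, the aggregated result spans the whole range from 0 to 15 although each element is valid in only part of it. With the intersection, only the range from 5 to 10 in which both elements are valid would remain.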

Examples

In the following, a few examples that use the temporal feature are presented.

Energy Consumption

Imagine you have a few smart meters that each report the current total energy value every 15 minutes. You want to know the consumption between these measurements, or which households are consuming a lot of energy right now. The following query does that. It converts the energy values into temporal attributes and sets the prediction time to the next 15 minutes. The derivative function then yields the Wh per minute, which is converted to watts. Finally, the select operator keeps the points in time at which the consumption is "high".

Code Block
#PARSER PQL
#DOREWRITE false

#DEFINE PATH_LOCAL '/media/mydata/dev/odysseus_workspace/phd-workspace/Moving Object/Evaluation/scenarios/energy/energy_data.csv'
#ADDQUERY


input = ACCESS({
            source='households',
            wrapper='GenericPull',
            schema=[
              ['id','Integer'],
              ['wh','Integer'],
              ['BaseDateTime','StartTimeStamp']
            ],
            inputschema=['Integer','Integer','Integer'],
            transport='File',
            protocol='csv',
            datahandler='Tuple',
            metaattribute = ['TimeInterval',  'PredictionTimes'],
            options=[
              ['filename', ${PATH_LOCAL}],
              ['Delimiter',','],
              ['TextDelimiter','"'],
              ['delay','1000'],
              ['readfirstline','false'],
              ['BaseTimeUnit','MINUTES']
            ]                                                                                                                                
          }                                                                                                        
        )
        
/// Only use the last two measured values
time = TIMEWINDOW({
            size = [20, 'minutes']                                                                                                                                                                                                                                                                              
          },
          input
        )
        
/// Convert the wh-attribute to a temporal double
temporalize = AGGREGATION({
                  aggregations = [
                    ['function' = 'ToTemporalDouble', 'input_attributes' = 'wh', 'output_attributes' = 'temp_wh']
                  ],
                  group_by = ['id'],
                  eval_at_outdating = false                                                                                                                                                                                                                                                                                                                                                                                
                },
                time
              )
              
predTime = PREDICTIONTIME({
                addtostartvalue = [0, 'MINUTES'],
                addtoendvalue = [15, 'MINUTES']
              },
              temporalize
            )
            
            
/// Energy consumption per household per minute in the next 15 minutes
derivative = MAP({
                  expressions = [
                    ['derivative(temp_wh, PredictionTimes)','whPerMinute'],
                    ['id','id']
                  ]           
                },
                predTime
              )
              
/// Convert Wh per minute to watts (Wh per hour)
watts = MAP({
                  expressions = [
                    ['whPerMinute * 60','watt'],
                    ['id','id']
                  ]           
                },
                derivative
              )

/// Filter out elements with a low energy consumption              
highConsumption = SELECT({
                      predicate = 'watt > 300'
                    },
                    watts
                  )

And here's an example file with energy consumption data:

Code Block: energy_data.csv
id,wh,time
1,0,0
2,0,3
3,0,6
1,115,15
2,50,18
3,250,21
1,200,30
2,60,33
3,500,36
1,210,45
2,100,48
3,600,51
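
The arithmetic behind the query can be checked by hand on the first two measurements of each household in the file above. The following Python sketch assumes that the temporal double is a straight line through the last two measured values, so that its derivative is simply the slope. This is an assumption for illustration and not necessarily how ToTemporalDouble is implemented; the helper names are hypothetical.

```python
from typing import Dict, Tuple

Measurement = Tuple[int, int]  # (time in minutes, total energy in Wh)


def wh_per_minute(previous: Measurement, latest: Measurement) -> float:
    """Slope of the line through two measurements (assumed linear model)."""
    (t0, wh0), (t1, wh1) = previous, latest
    return (wh1 - wh0) / (t1 - t0)


def to_watt(wh_per_min: float) -> float:
    """Wh per minute times 60 gives Wh per hour, i.e. watts."""
    return wh_per_min * 60


# First two rows per household from energy_data.csv
first_two: Dict[int, Tuple[Measurement, Measurement]] = {
    1: ((0, 0), (15, 115)),
    2: ((3, 0), (18, 50)),
    3: ((6, 0), (21, 250)),
}

watts = {hid: to_watt(wh_per_minute(*pair)) for hid, pair in first_two.items()}

# Same threshold as the SELECT predicate 'watt > 300'
high = sorted(hid for hid, watt in watts.items() if watt > 300)
```

Under this linear assumption, households 1 and 3 draw roughly 460 W and 1000 W and pass the predicate, while household 2 stays at roughly 200 W and is filtered out.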

Spatio-Temporal Radius

This is a spatio-temporal query, i.e., you need the spatio-temporal feature. It searches for all ships (moving objects) that are within a radius of 5000 meters of the ships with the ids 367629990 and 316004106.

Code Block
#PARSER PQL
#QNAME radius_normal

#DEFINE data_path '/home/odysseus/data/2017/AIS_2017_04_01_Zone10_1000_1200.csv'
#DEFINE input_delay_ms 0
#DEFINE result_path '/home/odysseus/data/eval/radius_normal/join/radius_ToLinearTemporalPoint30m_2c_1MILLISECONDS_10pmin_5000m_buffer_1000000_0.csv'
#DEFINE radius 5000
#DEFINE center_ids toList(367629990,316004106)
#DEFINE time_other_join_window 1
#DEFINE unit_other_join_window 'MILLISECONDS'
#DEFINE prediction_minutes 10
#DEFINE temporalization_window 30
#DEFINE temporalization_method 'ToLinearTemporalPoint'
#DEFINE buffer_size 1000000

#RUNQUERY

input = ACCESS({
            source='vesselRoute2',
            wrapper='GenericPull',
            schema=[
              ['MMSI','Long'],
              ['BaseDateTime','StartTimeStamp'],
              ['latitude','Double'],
              ['longitude','Double'],
              ['SOG','Double'],
              ['COG','Double'],
              ['Heading','Double'],
              ['VesselName','String'],
              ['IMO','String'],
              ['CallSign','String'],
              ['VesselType','String'],
              ['Status','String'],
              ['Length','Double'],
              ['Width','Double'],
              ['Draft','Double'],
              ['Cargo','Integer']
            ],
            inputschema=[
'Long','String','Double','Double','Double','Double','Double','String','String','String','String','String','Double','Double','Double','Integer'                                                                                
            ],
            transport='File',
            protocol='csv',
            metaattribute = [
'TimeInterval',  'PredictionTimes', 'Datarate', 'Latency'                                                                                    
            ],
            datahandler='Tuple',
            dateformat = 'yyyy-MM-dd\'T\'HH:mm:ss',
            options=[
              ['filename', ${data_path}],
              ['Delimiter',','],
              ['TextDelimiter','"'],
              ['delay',${input_delay_ms}],
              ['readfirstline','false']
            ]                                                                    
          }                                                        
        )
        
/// Measure the datarate
rate = DATARATE({
            updaterate = 100,
            key = 'datarate'                                                                                                                                            
          },
          input
        )
        
/// We only want to use the elements from the last ${temporalization_window} minutes
time = TIMEWINDOW({
            size = [${temporalization_window}, 'minutes']                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                
          },
          rate
        )
        
createSpatialObject = MAP({
                          expressions = [
                            ['ToPoint(latitude, longitude, 4326)', 'SpatialPoint'],
                            ['SOG * 0.514444', 'SOGms'],
                            ['MMSI','id'],
                            ['BaseDateTime','recordDateTime']
                          ],
                          keepinput = false                                                                                                                                                                                                                                                                                                                                                                        
                        },
                        time
                      )
                      
/// Temporalize the location attribute
temporalize = AGGREGATION({
                  aggregations = [
                    ['function' = ${temporalization_method}, 'input_attributes' = 'SpatialPoint', 'output_attributes' = 'temp_SpatialPoint'],
                    ['function' = 'Trigger']
                  ],
                  group_by = ['id'],
                  eval_at_outdating = false                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         
                },
                createSpatialObject
              )
              
lat1 = CALCLATENCY(temporalize                                                
        )
        
maplat1 = MAP({
              expressions = [['latency', 'latency1']],
              keepinput = true                                        
            },
            lat1
          )
              
/// Select the center        
selectCenter = SELECT({
                    predicate = 'contains(id, ${center_ids})',
                    heartbeatrate = 1
                  },
                  maplat1
                )
                      
renameCenter = MAP({
                    expressions = [
                      ['id','id_center'],
                      ['temp_SpatialPoint','center_temp_SpatialPoint'],
                      ['latency','center_latency1']
                    ]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    
                  },
                  selectCenter
                )
                
allObjects = TIMEWINDOW({
                  size = [60, 'minutes']                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
                },
                maplat1
              )
        
                
updateOnCenter = TIMEWINDOW({
                      size = [
${time_other_join_window}, ${unit_other_join_window}                                                                  
                      ]                                                                                                                                                                                                        
                    },
                    renameCenter
                  )
                
recombine = JOIN({
                predicate = 'id != id_center',
                elementsizeport0 = 1,
                elementsizeport1 = 1,
                group_by_port_0 = ['id_center'],
                group_by_port_1 = ['id']                                                                                    
              },
              updateOnCenter,
              allObjects
            )
           
            
lat2 = CALCLATENCY(recombine                                                
        )
             
maplat2 = MAP({
              expressions = [
                ['min(latency1, center_latency1)', 'latency_min1'],
                ['latency', 'latency2']
              ],
              keepinput = true                                        
            },
            lat2
          )
          
#IF toInteger(buffer_size) > 0
buf = BUFFER({
          THREADED = true,
          maxbuffersize = ${buffer_size}
        },
        maplat2
      )

            
/// Set the prediction time
predTime = PREDICTIONTIME({
                addtostartvalue = [0, 'MINUTES'],
                addtoendvalue = [${prediction_minutes}, 'MINUTES'],
                predictionbasetimeunit = 'MINUTES'                                                                                                                                                                                                                                                                                                                                                                                                                      
              },
              buf
            )
#ELSE
/// Set the prediction time
predTime = PREDICTIONTIME({
                addtostartvalue = [0, 'MINUTES'],
                addtoendvalue = [${prediction_minutes}, 'MINUTES'],
                predictionbasetimeunit = 'MINUTES'                                                                                                                                                                                                                                                                                                                                                                                                                      
              },
              maplat2
            ) 
#ENDIF

              
/// Calculate exact distance for refine step
calcDistance = MAP({
                    expressions = [
                      ['OrthodromicDistance(center_temp_SpatialPoint, temp_SpatialPoint)','tdistance']
                    ],
                    keepinput = true
                  },
                  predTime
                )

/// Refine step
distanceSelect = SELECT({
                      predicate = 'tdistance < ${radius}'
                    },
                    calcDistance
                  )

lat3 = CALCLATENCY(distanceSelect
        )

/// Get datarate into data
getDatarate = MAP({
                  expressions = [
                    ['id_center', 'id_center'],
                    ['id', 'id'],
                    ['latency_min1', 'latency1'],
                    ['latency2', 'latency2'],
                    ['latency', 'latency3'],
                    ['last(first(Measurements))','datarate']
                  ],
                  keepinput = false
                },
                lat3
              )

/// Store the results
output = SENDER({
              sink='Sink',
              wrapper='GenericPush',
              transport='File',
              protocol='CSV',
              datahandler='Tuple',
              options=[
                ['delimiter',','],
                ['textDelimiter',"'"],
                ['csv.writeheading',true],
                ['filename', ${result_path}],
                ['createdir', true]
              ]
            },
            getDatarate
          )
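
What the refine step computes can be pictured with a small Python sketch: two linearly moving positions are evaluated at each minute of the prediction time, and only the minutes at which their great-circle distance is below the radius are kept. This is an illustration under assumptions, not the Odysseus implementation: the haversine formula on a spherical Earth stands in for OrthodromicDistance, the per-minute sampling stands in for the temporal evaluation, and all names and coordinates are made up.

```python
import math
from typing import Callable, List, Tuple

LatLon = Tuple[float, float]     # (latitude, longitude) in degrees
EARTH_RADIUS_M = 6_371_000.0     # mean Earth radius, spherical approximation


def orthodromic_distance_m(a: LatLon, b: LatLon) -> float:
    """Great-circle distance in meters, computed with the haversine formula."""
    lat1, lon1, lat2, lon2 = map(math.radians, (*a, *b))
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(h))


def linear_position(start: LatLon, per_minute: LatLon) -> Callable[[int], LatLon]:
    """Hypothetical linear temporal point: position as a function of the minute."""
    return lambda minute: (start[0] + per_minute[0] * minute,
                           start[1] + per_minute[1] * minute)


def minutes_within_radius(center: Callable[[int], LatLon],
                          other: Callable[[int], LatLon],
                          prediction_minutes: int,
                          radius_m: float) -> List[int]:
    """Minutes of the prediction time at which 'other' is inside the radius."""
    return [m for m in range(prediction_minutes + 1)
            if orthodromic_distance_m(center(m), other(m)) < radius_m]


center = linear_position((47.60, -122.40), (0.0, 0.0))    # stationary center ship
other = linear_position((47.60, -122.50), (0.0, 0.005))   # approaches from the west

inside = minutes_within_radius(center, other, prediction_minutes=10, radius_m=5000)
```

In this made-up scenario the second ship starts about 7.5 km away and enters the 5000 m radius only in the last minutes of the prediction time, which is the kind of result the temporal SELECT expresses through the PredictionTimes of its output.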

If you want to try this query, you can use the following data from the Marine Cadastre, which has been slightly preprocessed:

View file: AIS_2017_04_01_Zone10_1000_1200.csv