Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

The temporal feature integrated temporal attributes into Odysseus. These can be useful to predict unknown or future values of an attribute that develops over time. The temporal feature brings a complete semantics for temporal processing and is the foundation for spatio-temporal processing in Odysseus. The temporal feature is closely related to the moving object algebra by Güting et al. The implementation is a prototype and can be buggy in some places. This article is meant to give you an overview if you want to use or extend this part of Odysseus.

Temporal Attributes

Temporal attributes are attributes that not only have one value, but one value for each point in time for a certain time interval. In other words, a temporal attribute of a certain type is a function that for each point in time returns a value of its type. Take a temporal integer over the interval [0,5) as an example. It could be the function f(t) = 5 * t. Then, it has five values, one for each point in the time interval: 0→ 0, 1→ 5, 2→ 10, 3→ 15, 4→ 20. A temporal integer can be denoted as tinteger, a temporal boolean as tboolean and so on.

...

The moving object algebra defines some functions that work directly on temporal attributes and therefore do not need this kind of translation described above. An example would be the SpeedFunction from the spatio-temporal feature. It takes a temporal spatial point (tpoint) and creates a tdouble with the speed of the object at each point in time. A direct temporal function implements the interface TemporalFunction. If the temporal function does not return a temporal value itself, but a non-temporal , it also implements the interface RemoveTemporalFunction. An example would be the TrajectoryFunction from the spatio-temporal feature, which gets a tpoint and creates a non-temporal trajectory, i.e., a spatial LineString.

...

Combining Functions in Mixed Expressions
Anchor
Limitations
Limitations

As of now, there is a limitation when combining When mixing direct temporal functions and normal functions in one expression. In that case, Odysseus creates an error message and you have to split your expression in multiple consecutive map operators. This is because the temporal feature creates a TemporalRelationalExpression for the whole expression, which then evaluates it in a non-temporal way. Unfortunately, the temporal function needs a temporal value as its input and not a non-temporal value. Splitting the expression in two parts helps here, as the following example shows (already existing in Odysseus, need non-temporal input values), a MixedTemporalExpression is created. As a user you don't have to worry about this and can simply use them as if they were not-mixed expressions. Except - it could be that they are a little slower. From a functional aspect, they are identical as if a normal TemporalRelationalExpression is used. The following example shows a mixed expression with Trajectory being a direct temporal function and SpatialLength a normal non-temporal function:

Code Block
// Not possible You can combine these two ...
calcTraj = MAP( {
	expressions = [[ ’ SpatialLength ( Trajectory ( tempSpatialPoint , PredictionTimes ) ) ’ , ’trajtraj ’]],
	keepinput = true
} , predTime )

// Possible
calcTraj = MAP( {
	expressions = [[ ’ Trajectory ( tempSpatialPoint , PredictionTimes ) ’ , ’ traj ’’SpatialLength(traj)’,’len’]],
	keepinput = true
} , predTimecalcTraj )

// ... into this mixed expression
calcTraj = MAP( { 
	expressions = [[’SpatialLength(traj)’,’len’ ’ SpatialLength ( Trajectory ( tempSpatialPoint , PredictionTimes ) ) ’ , ’traj ’]], 
	keepinput = true
 } , calcTrajpredTime )

Operators

As a user, you can use the normal operators as you would without using temporal attributes. Nevertheless, the operators behave a little different. Their behavior with the temporal feature is explained in the following. Additionally, you need to define the PredictionTime, which is done with the PredictionTime operator. Additionally, you need to create some temporal attributes. We call this process temporalization, which is also explained in the following.

...

  • A union merge function for the PredictionTimes is applied. This leads to a result that ignores if some elements that participate in an aggregation are not valid in the prediction time dimension at some points in time, which is probably what the user wants. An intersection could be useful in some situations, but is not the default option in the implementation.
  • Most aggregation functions are incremental functions. Currently, only those are supported to work with temporal attributes. They are translated to a TemporalIncrementalAggregationFunction, which creates a temporal output for the function.

Examples

In the following, a few examples that use the temporal feature are presented.

Energy Consumption

Imagine you have a few smart meters that send you the current total energy value in an interval of 15 minutes. You need to know the consumption in-between these measurements or you want to know which households are consuming a high amount of energy right now. The following query does that for you. It takes the energy consumption and converts it into temporal attributes. Then the prediction time is set to the next 15 minutes. The derivative function can be used to get the wh per minute. Finally, the select operator selects the points in time where the consumption is "high".

Code Block
#PARSER PQL
#DOREWRITE false

#DEFINE PATH_LOCAL '/media/mydata/dev/odysseus_workspace/phd-workspace/Moving Object/Evaluation/scenarios/energy/energy_data.csv'
#ADDQUERY


input = ACCESS({
            source='households',
            wrapper='GenericPull',
            schema=[
              ['id','Integer'],
              ['wh','Integer'],
              ['BaseDateTime','StartTimeStamp']
            ],
            inputschema=['Integer','Integer','Integer'],
            transport='File',
            protocol='csv',
            datahandler='Tuple',
            metaattribute = ['TimeInterval',  'PredictionTimes'],
            options=[
              ['filename', ${PATH_LOCAL}],
              ['Delimiter',','],
              ['TextDelimiter','"'],
              ['delay','1000'],
              ['readfirstline','false'],
              ['BaseTimeUnit','MINUTES']
            ]                                                                                                                                
          }                                                                                                        
        )
        
/// Only use the last two measured values
time = TIMEWINDOW({
            size = [20, 'minutes']                                                                                                                                                                                                                                                                              
          },
          input
        )
        
/// Convert the wh-attribute to a temporal double
temporalize = AGGREGATION({
                  aggregations = [
                    ['function' = 'ToTemporalDouble', 'input_attributes' = 'wh', 'output_attributes' = 'temp_wh']
                  ],
                  group_by = ['id'],
                  eval_at_outdating = false                                                                                                                                                                                                                                                                                                                                                                                
                },
                time
              )
              
predTime = PREDICTIONTIME({
                addtostartvalue = [0, 'MINUTES'],
                addtoendvalue = [15, 'MINUTEs']                                                                      
              },
              temporalize
            )
            
            
/// Energy consumption per household per minute in the next 15 minutes
derivative = MAP({
                  expressions = [
                    ['derivative(temp_wh, PredictionTimes)','whPerMinute'],
                    ['id','id']
                  ]           
                },
                predTime
              )
              
/// Energy consumption per household per minute in the next 15 minutes
watts = MAP({
                  expressions = [
                    ['whPerMinute * 60','watt'],
                    ['id','id']
                  ]           
                },
                derivative
              )

/// Filter out elements with a low energy consumption              
highConsumption = SELECT({
                      predicate = 'watt > 300'
                    },
                    watts
                  )

And here's an example file with energy consumption data:

Code Block
titleenergy_data.csv
id,wh,time
1,0,0
2,0,3
3,0,6
1,115,15
2,50,18
3,250,21
1,200,30
2,60,33
3,500,36
1,210,45
2,100,48
3,600,51

Spatio-Temporal Radius

This is a spatio-temporal query, i.e., you need the spatio-temporal feature. It searches for all ships (moving objects) that are is an radius of 5000 meters to the ships with the ids 367629990 and 316004106.

Code Block
collapsetrue
#PARSER PQL
#QNAME radius_normal

#PARSER PQL
#DEFINE data_path '/home/odysseus/data/2017/AIS_2017_04_01_Zone10_1000_1200.csv'
#DEFINE input_delay_ms 0
#DEFINE result_path '/home/odysseus/data/eval/radius_normal/join/radius_ToLinearTemporalPoint30m_2c_1MILLISECONDS_10pmin_5000m_buffer_1000000_0.csv'
#DEFINE radius 5000
#DEFINE center_ids toList(367629990,316004106)
#DEFINE time_other_join_window 1
#DEFINE unit_other_join_window 'MILLISECONDS'
#DEFINE prediction_minutes 10
#DEFINE temporalization_window 30
#DEFINE temporalization_method 'ToLinearTemporalPoint'
#DEFINE buffer_size 1000000

#RUNQUERY

input = ACCESS({
            source='vesselRoute2',
            wrapper='GenericPull',
            schema=[
              ['MMSI','Long'],
              ['BaseDateTime','StartTimeStamp'],
              ['latitude','Double'],
              ['longitude','Double'],
              ['SOG','Double'],
              ['COG','Double'],
              ['Heading','Double'],
              ['VesselName','String'],
              ['IMO','String'],
              ['CallSign','String'],
              ['VesselType','String'],
              ['Status','String'],
              ['Length','Double'],
              ['Width','Double'],
              ['Draft','Double'],
              ['Cargo','Integer']
            ],
            inputschema=[
'Long','String','Double','Double','Double','Double','Double','String','String','String','String','String','Double','Double','Double','Integer'                                                                                
            ],
            transport='File',
            protocol='csv',
            metaattribute = [
'TimeInterval',  'PredictionTimes', 'Datarate', 'Latency'                                                                                    
            ],
            datahandler='Tuple',
            dateformat = 'yyyy-MM-dd\'T\'HH:mm:ss',
            options=[
              ['filename', ${data_path}],
              ['Delimiter',','],
              ['TextDelimiter','"'],
              ['delay',${input_delay_ms}],
              ['readfirstline','false']
            ]                                                                    
          }                                                        
        )
        
/// Measure the datarate
rate = DATARATE({
            updaterate = 100,
            key = 'datarate'                                                                                                                                            
          },
          input
        )
        
/// We only want to use the elements from the last time
time = TIMEWINDOW({
            size = [${temporalization_window}, 'minutes']                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                
          },
          rate
        )
        
createSpatialObject = MAP({
                          expressions = [
                            ['ToPoint(latitude, longitude, 4326)', 'SpatialPoint'],
                            ['SOG * 0.514444', 'SOGms'],
                            ['MMSI','id'],
                            ['BaseDateTime','recordDateTime']
                          ],
                          keepinput = false                                                                                                                                                                                                                                                                                                                                                                        
                        },
                        time
                      )
                      
/// Temporalize the location attribute
temporalize = AGGREGATION({
                  aggregations = [
                    ['function' = ${temporalization_method}, 'input_attributes' = 'SpatialPoint', 'output_attributes' = 'temp_SpatialPoint'],
                    ['function' = 'Trigger']
                  ],
                  group_by = ['id'],
                  eval_at_outdating = false                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         
                },
                createSpatialObject
              )
              
lat1 = CALCLATENCY(temporalize                                                
        )
        
maplat1 = MAP({
              expressions = [['latency', 'latency1']],
              keepinput = true                                        
            },
            lat1
          )
              
/// Select the center        
selectCenter = SELECT({
                    predicate = 'contains(id, ${center_ids})',
                    heartbeatrate = 1
                  },
                  maplat1
                )
                      
renameCenter = MAP({
                    expressions = [
                      ['id','id_center'],
                      ['temp_SpatialPoint','center_temp_SpatialPoint'],
                      ['latency','center_latency1']
                    ]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    
                  },
                  selectCenter
                )
                
allObjects = TIMEWINDOW({
                  size = [60, 'minutes']                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
                },
                maplat1
              )
        
                
updateOnCenter = TIMEWINDOW({
                      size = [
${time_other_join_window}, ${unit_other_join_window}                                                                  
                      ]                                                                                                                                                                                                        
                    },
                    renameCenter
                  )
                
recombine = JOIN({
                predicate = 'id != id_center',
                elementsizeport0 = 1,
                elementsizeport1 = 1,
                group_by_port_0 = ['id_center'],
                group_by_port_1 = ['id']                                                                                    
              },
              updateOnCenter,
              allObjects
            )
           
            
lat2 = CALCLATENCY(recombine                                                
        )
             
maplat2 = MAP({
              expressions = [
                ['min(latency1, center_latency1)', 'latency_min1'],
                ['latency', 'latency2']
              ],
              keepinput = true                                        
            },
            lat2
          )
          
#IF toInteger(buffer_size) > 0
buf = BUFFER({
          THREADED = true,
          maxbuffersize = ${buffer_size}
        },
        maplat2
      )

            
/// Set the prediction time
predTime = PREDICTIONTIME({
                addtostartvalue = [0, 'MINUTES'],
                addtoendvalue = [${prediction_minutes}, 'MINUTES'],
                predictionbasetimeunit = 'MINUTES'                                                                                                                                                                                                                                                                                                                                                                                                                      
              },
              buf
            )
#ELSE
/// Set the prediction time
predTime = PREDICTIONTIME({
                addtostartvalue = [0, 'MINUTES'],
                addtoendvalue = [${prediction_minutes}, 'MINUTES'],
                predictionbasetimeunit = 'MINUTES'                                                                                                                                                                                                                                                                                                                                                                                                                      
              },
              maplat2
            ) 
#ENDIF

              
/// Calculate exact distance for refine step
calcDistance = MAP({
                    expressions = [
                      ['OrthodromicDistance(center_temp_SpatialPoint, temp_SpatialPoint)','tdistance']
                    ],
                    keepinput = true                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
                  },
                  predTime
                )

/// Refine step
distanceSelect = SELECT({
                      predicate = 'tdistance < ${radius}'                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    
                    },
                    calcDistance
                  )   

lat3 = CALCLATENCY(distanceSelect                                        
        )
      
/// Get datarate into data
getDatarate = MAP({
                  expressions = [
                    ['id_center', 'id_center'],
                    ['id', 'id'],
                    ['latency_min1', 'latency1'],
                    ['latency2', 'latency2'],
                    ['latency', 'latency3'],
                    ['last(first(Measurements))','datarate']
                  ],
                  keepinput = false                                                                                                                                                
                },
                lat3
              )
                        
/// Store the results
output = SENDER({
              sink='Sink',
              wrapper='GenericPush',
              transport='File',
              protocol='CSV',
              datahandler='Tuple',
              options=[
                ['delimiter',','],
                ['textDelimiter',"'"],
                ['csv.writeheading',true],
                ['filename', ${result_path}],
                ['createdir', true]
              ]                                                                                                                                                               
            },
            getDatarate
          )     

If you want to try this query, you can do it with this data from the marine cadastre. It is a little preprocessed:

View file
nameAIS_2017_04_01_Zone10_1000_1200.csv
height150
Text