Odyssseus has build in mechanisms to adapt query processing. Here is a very simple example for that.

Create different files with the following input.

The first file is used to import the other files and have an easier handling.

Init.qry
#INCLUDE ${PROJECTPATH}/InitKVStore.qry
#INCLUDE ${PROJECTPATH}/ControllerInit.qry

This file created key value stores

  • query_store should contain Strings with query text
  • context_store could contain different input. Here it will contain only the name of the running query.

Here the input is read from file, as this is easier to handle, Remark: This can also be used to store passwords in files and read them somewhere else with kvReadString.

InitKVStore.qry
#CREATE_KV_STORE query_store MemoryStore (createOnlyIfNotExists=false) 
#KV_STORE_WRITE_FROM_FILE query_store 0 ${PROJECTPATH}/SimpleQuery_0.qry 
#KV_STORE_WRITE_FROM_FILE query_store 1 ${PROJECTPATH}/SimpleQuery_1.qry 

#CREATE_KV_STORE context_store MemoryStore (createOnlyIfNotExists=false) 

The following two queries do nothing special. They just have a timer that is running in different speeds (1000 and 500 milliseconds). The output is end to rabbitmq (so the AMQP/RabbitMQ transport handler must be installed)

SimpleQuery_0.qry
#PARSER PQL
#QNAME Query0
#RUNQUERY
timer = TIMER({
            period = 1000,
            timefromstart = true,
            source = 'timerSource1'
          }
        )

out = SENDER({
          transport='RabbitMQ',
          wrapper='GenericPush',
          protocol='csv',
          datahandler='Tuple',
          sink="SENDER",
          options=[
            ['QUEUE_NAME','Ody_Monitor'],
            ['CONSUMER_TAG','example'],
            ['HOST','localhost']
          ]
        },
        timer
      )
SimpleQuery_1.qry
#PARSER PQL
#QNAME Query1
#RUNQUERY
timer = TIMER({
            period = 500,
            timefromstart = true,
            source = 'timerSource1'
          }
        )

out = SENDER({
          transport='RabbitMQ',
          wrapper='GenericPush',
          protocol='csv',
          datahandler='Tuple',
          sink="SENDER",
          options=[
            ['QUEUE_NAME','Ody_Monitor'],
            ['CONSUMER_TAG','example'],
            ['HOST','localhost']
          ]
        },
        timer
      )


The following query handles the query exchange.

In the first query, the a one time timer is called (i.e. the timer will only run once) to run the following map expression where the query with the id 0 is read from the kv store with name query_store. Additionally, the current query name (Query0) ist written to the context_store.

The command operator installs the query with the query text that is read from the context store with the OdysseusScript parser.

The second query receives the output from one of the above queries (SimpleQuery_0 or SimpleQuery_1) via RabbitMQ. In this simple scenario, each time the value is above 10000, the current query is removed and replaced by the query 1 in the context store (which is not changed here, but could be).

ControllerInit.qry
#PARSER PQL
#RUNQUERY
timer = TIMER({
            period = -1,
            timefromstart = true,
            source = 'controllerTimer'
          }
        )

enrich = MAP({
              expressions = [
                ['kvReadString("query_store",toString(0))','query'],
                ['kvWrite("context_store","currentQuery","Query0")','ignore']
              ]
            },
            timer
          )

command = COMMAND({
              commandexpression='addQuery(query,"OdysseusScript")'
            },
            enrich
          )

#RUNQUERY
input = ACCESS({
            transport='RabbitMQ',
            source= 'Receiver',
            wrapper='GenericPush',
            protocol='csv',
            datahandler='Tuple',
            options=[
              ['QUEUE_NAME','Ody_Monitor'],
              ['CONSUMER_TAG','example'],
              ['HOST','localhost'],
              ['ByteOrder', 'LittleEndian']
            ],
            schema=[['time', 'StartTimestamp']]                                                  
          }                        
        )
        
filter = SELECT({
              predicate = 'time > 10000'
            },
            input
          )

command = COMMAND({
              commandexpression='removeQueryByName(kvReadString("context_store","currentQuery"))'
            },
            filter
          )        

enrich = MAP({
              expressions = [
                ['kvReadString("query_store",toString(1))','query'],
                ['kvWrite("context_store","currentQuery","Query1")','ignore']
              ]
            },
            command
          )

command2 = COMMAND({
              commandexpression='addQuery(query,"OdysseusScript")'
            },
            enrich
          )  

s

  • No labels