Remark: This window is very complex and may sometimes not behave as expected. It can simulate any other window, but unless you need that flexibility, you should prefer TimeWindow or ElementWindow.

The predicate window opens and closes a window based on a start condition and an optional end condition. Elements that are not inside a window are discarded and sent to output port 1.

The operator works as follows:



For the output there are different configurations:

To simulate some kind of sliding window, the following parameters can be used:

Remark: Advance is applied only when an output is generated, and only after the results have been produced. To clear the buffer independently of an output, clear must be used.

Parameters

Parameters for MaxWindowTime

Remark: This is a blocking operator. It does not write elements until it sees a new element that no longer belongs to the window (similar to ElementWindow).

Example

In the following we provide some examples and the corresponding output.

We assume the following simple input:

ID    Time    isLast
A     1       false
A     2       false
A     3       false
A     4       true
B     5       false
B     6       false
B     7       false
B     8       false
B     9       false
B     10      true
C     11      false
C     12      false
C     13      false
C     14      false
C     15      false
C     16      false

Preprocessing

With some preprocessing (to allow more examples)

#PARSER PQL
#ADDQUERY
in = CSVFILESOURCE({SCHEMA = [['ID', 'String'],['pos','STARTTIMESTAMP'],['isLast','Boolean']], DELIMITER = '\t', SOURCE = 'source', FILENAME = '${PROJECTPATH}/input.csv'})

map = STATEMAP({EXPRESSIONS = [['isNull(__last_1.ID) OR (__last_1.ID != ID)','newElem']], KEEPINPUT = true}, in)


we will get:

As you can see, the new field newElem is added. It is set to true if the element is the first element or if its ID differs from the ID of the previous element.
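The effect of the STATEMAP expression can be reproduced outside of Odysseus. The following Python sketch is only an illustration and not part of the operator: the function name add_new_elem and the tuple layout are assumptions, and the check `previous_id is None` stands in for isNull(__last_1.ID).

```python
# Input rows from the example above: (ID, Time, isLast)
ROWS = (
    [("A", t, t == 4) for t in range(1, 5)]
    + [("B", t, t == 10) for t in range(5, 11)]
    + [("C", t, False) for t in range(11, 17)]
)


def add_new_elem(rows):
    """Append newElem: true for the first row or when the ID changes."""
    result = []
    previous_id = None  # models isNull(__last_1.ID) for the first element
    for elem_id, time, is_last in rows:
        new_elem = previous_id is None or previous_id != elem_id
        result.append((elem_id, time, is_last, new_elem))
        previous_id = elem_id
    return result


if __name__ == "__main__":
    for row in add_new_elem(ROWS):
        print(row)
```

In this model, newElem is true only for the elements at times 1, 5 and 11, i.e. for the first A, the first B and the first C.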

Using only a start predicate

win = PREDICATEWINDOW({start = 'newElem', SAMESTARTTIME = true}, map)

will result in:

Here the window is opened each time the start condition evaluates to true and is closed each time it evaluates to false (!start). All elements between these elements are discarded. They do not open a new window. This may not be what you expect!

This way, you can keep a window open as long as the start condition is true.

Using an end predicate

win = PREDICATEWINDOW({start = 'true', end = 'newElem', SAMESTARTTIME = true}, map)

Here, each time a new window opens, the old window is closed, i.e. the same input element is responsible for both starting and closing a window. The output contains two windows, one from 1 to 5 and one from 5 to 11. As the final window is never closed, the output starting from C (at 11) is discarded. Use DrainAtDone = true to allow the output of these elements.
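The behavior described here can be modeled in a few lines of Python. This is a simplified sketch, not the operator's implementation: the function end_predicate_windows and its tuple output are assumptions, and the end timestamp of a drained window is left as None because the text does not specify it.

```python
# (ID, Time) pairs from the example input
IDS_AND_TIMES = (
    [("A", t) for t in range(1, 5)]
    + [("B", t) for t in range(5, 11)]
    + [("C", t) for t in range(11, 17)]
)


def end_predicate_windows(rows, drain_at_done=False):
    """Model start = 'true', end = 'newElem' on (ID, Time) rows.

    Returns a list of (window_start, window_end, ids) tuples.
    """
    windows = []
    buffer = []
    previous_id = None
    for elem_id, time in rows:
        new_elem = previous_id is None or previous_id != elem_id
        if new_elem and buffer:
            # The element closes the old window ...
            windows.append((buffer[0][1], time, [e[0] for e in buffer]))
            buffer = []
        # ... and, because start is always true, belongs to the next one.
        buffer.append((elem_id, time))
        previous_id = elem_id
    if drain_at_done and buffer:
        # Assumption: the end timestamp of a drained window is not modeled.
        windows.append((buffer[0][1], None, [e[0] for e in buffer]))
    return windows


if __name__ == "__main__":
    print(end_predicate_windows(IDS_AND_TIMES))
    print(end_predicate_windows(IDS_AND_TIMES, drain_at_done=True))
```

Without draining, the model yields the two windows from 1 to 5 and from 5 to 11. With drain_at_done=True, the C elements are additionally returned as a third window.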

Using a start and an end predicate and keeping the ending element:

win = PREDICATEWINDOW({start = 'newElem', end = 'isLast', KEEPENDINGELEMENT = true, SAMESTARTTIME = true}, map)

will result in:

Note the difference: here the operator blocks only until the end predicate is reached. This works only if SAMESTARTTIME is set to true. Otherwise, for example, A|4|true|false | META | 1|4 would become A|4|true|false | META | 4|4, which has no validity and will not be produced.

If you want to close the window when the stream is closed, you can, as always, use DrainAtDone = true.
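The role of SAMESTARTTIME can be illustrated with a small Python sketch of the first window (the A elements, closed by the element at time 4). The helper validity_intervals is hypothetical, and the assumption that each element would otherwise keep its own start timestamp is extrapolated from the single example given here.

```python
def validity_intervals(times, same_start_time):
    """Return (time, (start, end)) per element of one window.

    The window ends at the timestamp of its last (ending) element.
    Elements whose interval would be empty (start >= end) are dropped.
    """
    window_start, window_end = times[0], times[-1]
    intervals = []
    for time in times:
        start = window_start if same_start_time else time
        if start < window_end:
            intervals.append((time, (start, window_end)))
    return intervals


if __name__ == "__main__":
    window_a = [1, 2, 3, 4]  # the ending element (time 4) is kept
    print(validity_intervals(window_a, same_start_time=True))
    print(validity_intervals(window_a, same_start_time=False))
```

With same_start_time=True, all four elements receive the interval from 1 to 4. With same_start_time=False, the ending element would receive the interval from 4 to 4 and is therefore dropped in this model.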

win = PREDICATEWINDOW({start = 'newElem', end = 'isLast', KEEPENDINGELEMENT = true, SAMESTARTTIME = true, DRAINATDONE = true}, map)


Using predicate window to simulate standard windows

The predicate window can be used to simulate other windows. Remark: This is only for demonstration purposes, as the element and time windows are much faster!

All the following examples use this source:

#PARSER PQL
#RUNQUERY
timer = TIMER({
            period = 1000,
            source = 'timer'
          }
        )

ticker := MAP({
              expressions = [['counter()','tick'],['TimeInterval.START','TS']]
            },
            timer
          )

Element Window

#PARSER PQL
#IFSRCNDEF ticker
#INCLUDE ${PROJECTPATH}/TickerSource.qry
#ENDIF

#ADDQUERY
/// Tumbling element window
out = PREDICATEWINDOW({
          /// Start window with every element
          start = "true",
          /// close window, when size of buffer is 5
          end = "size(__all)==5",
          /// output as list
          nesting = true
        },
        ticker
      )

#PARSER PQL
#IFSRCNDEF ticker
#INCLUDE ${PROJECTPATH}/TickerSource.qry
#ENDIF

#ADDQUERY
/// Sliding element window
out = PREDICATEWINDOW({
          /// start window with every element
          start = "true",
          /// If set to false, end element is not part of result
          KEEPENDINGELEMENT = true,
          /// end predicate is tested before element is added to window
          /// that's why size(__all) must be one below window size!
          end = "size(__all)=2",
          /// Move window when size of window is 3
          ADVANCEWHEN = 'size(__all)=3',
          /// move by one position
          ADVANCESIZE = 1,
          /// output as list
          nesting = true
        },
        ticker
      )
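How the end predicate of the tumbling element window (end = "size(__all)==5") groups the input can be traced with a short Python sketch. It assumes that the end predicate is evaluated on the buffer before the incoming element is added, as the comment in the sliding example states, and that the closing element opens the next window. The function name and the tick values are made up for illustration.

```python
def tumbling_element_windows(ticks, size=5):
    """Group ticks into lists, closing a window when the buffer holds `size`."""
    windows = []
    buffer = []
    for tick in ticks:
        # end = "size(__all)==5", evaluated before the element is added
        if len(buffer) == size:
            windows.append(buffer)  # nesting = true: output as one list
            buffer = []
        buffer.append(tick)  # start = "true": the element joins the open window
    return windows  # an unfinished window stays in the buffer (blocking)


if __name__ == "__main__":
    print(tumbling_element_windows(range(12)))
```

In this model, the first list is emitted only when the sixth element arrives, which matches the blocking behavior noted in the remark on this operator.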

Time Window

#PARSER PQL
#IFSRCNDEF ticker
#INCLUDE ${PROJECTPATH}/TickerSource.qry
#ENDIF

#ADDQUERY
/// Tumbling time window
out = PREDICATEWINDOW({
          start = "true",
          end = "!isNull(__first.TS) && __first.TS + 10 < TS",
          nesting = true
        },
        ticker
      )