The predicate window opens and closes a window according to a start condition and an optional end condition.

Parameters

  • start: The start condition for a predicate window. If the condition evaluates to true, the window is opened and stays open until the end predicate evaluates to true (or, if no end predicate is given, until the start predicate evaluates to false). Note that all elements that are not inside a window are sent to output port 1.
  • end: The end condition for a predicate window. The tuple for which this condition evaluates to true is part of the result only if keepEndingElement is set to true!
  • sameStartTime: If set to true, all elements produced for a window get the same start timestamp.
  • size: The maximum size of the window. Can be either a single number or a pair of a number and a time unit. Possible values for the unit are those of TimeUnit, such as SECONDS, NANOSECONDS etc. The default unit is the base time unit of the stream (typically milliseconds).
  • keepEndingElement: Typically, the object that fulfils the end condition will not be part of the result. If this attribute is set to true, the element will be part of the result.
  • partition: Evaluate the predicates on partitions defined by the different values of this attribute (similar to group by in aggregation).

Remark: This is a blocking operator. The operator does not write elements until it sees a new element that no longer belongs to the window (similar to ElementWindow).

Example


In the following we provide some examples and the corresponding output.

We assume the following simple input:

Code Block
ID	Time	isLast
A    1    false
A    2    false
A    3    false
A    4    true
B    5    false
B    6    false
B    7    false
B    8    false
B    9    false
B    10    true
C    11    false
C    12    false
C    13    false
C    14    false
C    15    false
C    16    false

Preprocessing

With some preprocessing we will get:
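
The preprocessing is the STATEMAP expression 'isNull(__last_1.ID) OR (__last_1.ID != ID)' used in the queries below: it adds a boolean attribute newElem that is true whenever the ID differs from the ID of the previous element (or there is no previous element). The following is a minimal Python sketch of that computation on the sample input. It is only an illustration, not Odysseus code; the function name add_new_elem and the tuple layout are assumptions made for this example.

```python
# Sample input from above as (ID, time, isLast) tuples.
rows = (
    [("A", t, t == 4) for t in range(1, 5)]
    + [("B", t, t == 10) for t in range(5, 11)]
    + [("C", t, False) for t in range(11, 17)]
)


def add_new_elem(rows):
    """Append a newElem flag: True if there is no previous element
    or the previous element has a different ID (hypothetical helper)."""
    out = []
    last_id = None  # plays the role of __last_1.ID
    for ident, time, is_last in rows:
        new_elem = last_id is None or last_id != ident
        out.append((ident, time, is_last, new_elem))
        last_id = ident
    return out


flagged = add_new_elem(rows)
for row in flagged:
    print(row)
```

Only the elements at times 1, 5 and 11 get newElem = true, which matches the fourth column in the outputs shown in the examples below.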


Using only a start predicate


Code Block
#PARSER PQL
#ADDQUERY
in = CSVFILESOURCE({SCHEMA = [['ID', 'String'],['pos','STARTTIMESTAMP'],['isLast','Boolean']], DELIMITER = '\t', SOURCE = 'source', FILENAME = '${PROJECTPATH}/input.csv'})

map = STATEMAP({EXPRESSIONS = [['isNull(__last_1.ID) OR (__last_1.ID != ID)','newElem']], KEEPINPUT = true}, in)

win = PREDICATEWINDOW({start = 'newElem', SAMESTARTTIME = true}, map)

will result in:

Code Block
A|1|false|true | META | 1|2
B|5|false|true | META | 5|6
C|11|false|true | META | 11|12

Here the window is opened each time the start condition evaluates to true and is closed as soon as it evaluates to false. Elements outside a window (here, all elements for which the start condition is false) are discarded from this output. They do not open a new window.
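
The start-only behaviour can be sketched in Python as follows. This is a simplified simulation under the assumption that SAMESTARTTIME is true and that the end timestamp of a window is the timestamp of the element that closes it, as the output above suggests. The function name start_only_windows is hypothetical, and the sketch does not model output port 1 or the size and partition parameters.

```python
# Sample input as (ID, time, newElem) tuples; newElem is true when the ID changes.
ids = ["A"] * 4 + ["B"] * 6 + ["C"] * 6
elements = [
    (ident, t, i == 0 or ids[i - 1] != ident)
    for i, (ident, t) in enumerate(zip(ids, range(1, 17)))
]


def start_only_windows(elements):
    """Return (ID, time, window_start, window_end) for every element
    that falls into a window opened by the start flag (assumed semantics)."""
    results = []
    window = []  # elements of the currently open window
    for ident, time, start in elements:
        if start:
            # start predicate is true: element belongs to the window
            window.append((ident, time))
        elif window:
            # start predicate became false: close the window at this timestamp
            first_ts = window[0][1]
            results.extend((i, t, first_ts, time) for i, t in window)
            window = []
        # elements with start == False outside a window are dropped here
    return results


for line in start_only_windows(elements):
    print(line)
```

For the sample input the sketch yields one element per window, with the intervals 1|2, 5|6 and 11|12 as in the output above.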

Using a start and an end predicate

Code Block
#PARSER PQL
#ADDQUERY
in = CSVFILESOURCE({SCHEMA = [['ID', 'String'],['pos','STARTTIMESTAMP'],['isLast','Boolean']], DELIMITER = '\t', SOURCE = 'source', FILENAME = '${PROJECTPATH}/input.csv'})

map = STATEMAP({EXPRESSIONS = [['isNull(__last_1.ID) OR (__last_1.ID != ID)','newElem']], KEEPINPUT = true}, in)

win = PREDICATEWINDOW({start = 'newElem', end = 'newElem', SAMESTARTTIME = true}, map)

will result in:

Code Block
A|1|false|true | META | 1|5
A|2|false|false | META | 1|5
A|3|false|false | META | 1|5
A|4|true|false | META | 1|5
B|5|false|true | META | 5|11
B|6|false|false | META | 5|11
B|7|false|false | META | 5|11
B|8|false|false | META | 5|11
B|9|false|false | META | 5|11
B|10|true|false | META | 5|11
C|11|false|true | META | 11|17
C|12|false|false | META | 11|17
C|13|false|false | META | 11|17
C|14|false|false | META | 11|17
C|15|false|false | META | 11|17
C|16|false|false | META | 11|17

Here each time a new window opens, the old window is closed, i.e. the same input element is responsible for starting and closing a window.
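
A minimal Python sketch of this case, where the same element closes the old window and opens the next one, is given below. It assumes SAMESTARTTIME = true and keepEndingElement = false, and it assumes the end predicate is checked before the start predicate for each element. The function name start_end_windows is hypothetical. The sketch deliberately does not model what happens at the end of the stream, so the last window (C, shown with end timestamp 17 above) is returned as still open rather than closed.

```python
# Sample input as (ID, time, isLast, newElem) tuples.
ids = ["A"] * 4 + ["B"] * 6 + ["C"] * 6
elements = [
    (ident, t, t in (4, 10), i == 0 or ids[i - 1] != ident)
    for i, (ident, t) in enumerate(zip(ids, range(1, 17)))
]


def start_end_windows(elements, start, end):
    """Simulate a predicate window with start and end predicates
    (assumed semantics, ending element not kept)."""
    closed = []    # list of closed windows
    window = None  # currently open window, or None
    for elem in elements:
        ts = elem[1]
        if window is not None and end(elem):
            # close the open window; every element gets the same interval
            first_ts = window[0][1]
            closed.append([(e, first_ts, ts) for e in window])
            window = None
        if window is None:
            if start(elem):
                window = [elem]  # the same element may open the next window
        else:
            window.append(elem)
    return closed, window


def new_elem(e):
    return e[3]


closed, still_open = start_end_windows(elements, start=new_elem, end=new_elem)
for w in closed:
    for (ident, time, is_last, flag), first_ts, end_ts in w:
        print(ident, time, is_last, flag, "META", first_ts, end_ts)
```

The two closed windows cover the A elements with interval 1|5 and the B elements with interval 5|11, in line with the output above.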


Using a start and an end predicate and keeping the ending element

Code Block
#PARSER PQL
#ADDQUERY
in = CSVFILESOURCE({SCHEMA = [['ID', 'String'],['pos','STARTTIMESTAMP'],['isLast','Boolean']], DELIMITER = '\t', SOURCE = 'source', FILENAME = '${PROJECTPATH}/input.csv'})

map = STATEMAP({EXPRESSIONS = [['isNull(__last_1.ID) OR (__last_1.ID != ID)','newElem']], KEEPINPUT = true}, in)

win = PREDICATEWINDOW({start = 'newElem', end = 'isLast', KEEPENDINGELEMENT = true, SAMESTARTTIME = true}, map)

will result in:

Code Block
A|1|false|true | META | 1|4
A|2|false|false | META | 1|4
A|3|false|false | META | 1|4
A|4|true|false | META | 1|4
B|5|false|true | META | 5|10
B|6|false|false | META | 5|10
B|7|false|false | META | 5|10
B|8|false|false | META | 5|10
B|9|false|false | META | 5|10
B|10|true|false | META | 5|10
C|11|false|true | META | 11|17
C|12|false|false | META | 11|17
C|13|false|false | META | 11|17
C|14|false|false | META | 11|17
C|15|false|false | META | 11|17
C|16|false|false | META | 11|17

Note the difference: here the operator blocks only until the end predicate is fulfilled. This works only if SAMESTARTTIME is set to true. Otherwise, for example, A|4|true|false | META | 1|4 would become A|4|true|false | META | 4|4, which has no validity (start and end timestamp are equal) and would therefore not be produced.
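
The effect of SAMESTARTTIME on the ending element can be illustrated with a small Python sketch. It assumes that, without SAMESTARTTIME, each element keeps its own timestamp as start timestamp and receives the window end as end timestamp, and that elements with an empty interval are not produced. The function name assign_intervals is hypothetical and the sketch is not taken from the Odysseus implementation.

```python
def assign_intervals(timestamps, end_ts, same_start_time):
    """Return (time, start, end) for each element of one window,
    skipping elements whose interval would be empty (assumed semantics)."""
    first = timestamps[0]
    out = []
    for ts in timestamps:
        start = first if same_start_time else ts
        if start < end_ts:  # an interval like 4|4 has no validity
            out.append((ts, start, end_ts))
    return out


# Window A from the example: elements at times 1..4, closed by the element at time 4.
with_same_start = assign_intervals([1, 2, 3, 4], end_ts=4, same_start_time=True)
without_same_start = assign_intervals([1, 2, 3, 4], end_ts=4, same_start_time=False)
print(with_same_start)
print(without_same_start)
```

With same_start_time set to True all four elements are kept with interval 1|4. With False the element at time 4 would get the interval 4|4 and is dropped.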