You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

The predicate window opens and closes the window regarding a start and optional an end condition.

Parameter

  • start: The start condition for a predicate window. If the condition evaluates to true, the windows is opened until the end predicate evaluates to true (or if not given the start predicate evaluates to false). Note, that all elements that are not inside a window are send to ouput port 1
  • end: The end condition for a predicate window. The tuple for which this condition is evaluated to true is only part of the result, if keepEndingElement is set to true!
  • sameStartTime: For predicate windows: If set to true, all produced elements get the same start timestamp
  • size: The maximum size of the window. Can be either a single number or a pair of a number and a time unit. Possible values for the unit are one of TimeUnit like SECONDS, NANOSECODS etc. - default time is the base time of the stream (typically milliseconds)
  • keepEndingElement: Typically, the object that fulfils the end condition will not be part of the result. If setting this attribute to true, the element will be part

  • partition: Evaluate the predicates on partitioned defined by different values of this attribute (similar to group by in aggreation)

  • closeWindowWithHeartbeat: if true, the window is closed when a heartbeat is received. Take a look at the session window to see how it works.

Remark: This is a blocking operator. The operator does not write elements before it sees new elements not belonging to the window anymore (similiar to ElementWindow)

Example


In the following we provide some examples and the corresponding output.

As input, we assume the following simple input:

ID	Time	isLast
A    1    false
A    2    false
A    3    false
A    4    true
B    5    false
B    6    false
B    7    false
B    8    false
B    9    false
B    10    true
C    11    false
C    12    false
C    13    false
C    14    false
C    15    false
C    16    false

Preprocessing

With some preprocessing

#PARSER PQL
#ADDQUERY
in = CSVFILESOURCE({SCHEMA = [['ID', 'String'],['pos','STARTTIMESTAMP'],['isLast','Boolean']], DELIMITER = '\t', SOURCE = 'source', FILENAME = '${PROJECTPATH}/input.csv'})

map = STATEMAP({EXPRESSIONS = [['isNull(__last_1.ID) OR (__last_1.ID != ID)','newElem']], KEEPINPUT = true}, in)


we will get:

ID|TIME|ISLAST|NEWELEM
A|1|false|true | META | 1|oo
A|2|false|false | META | 2|oo
A|3|false|false | META | 3|oo
A|4|true|false | META | 4|oo
B|5|false|true | META | 5|oo
B|6|false|false | META | 6|oo
B|7|false|false | META | 7|oo
B|8|false|false | META | 8|oo
B|9|false|false | META | 9|oo
B|10|true|false | META | 10|oo
C|11|false|true | META | 11|oo
C|12|false|false | META | 12|oo
C|13|false|false | META | 13|oo
C|14|false|false | META | 14|oo
C|15|false|false | META | 15|oo
C|16|false|false | META | 16|oo

Using only a start predicate

win = PREDICATEWINDOW({start = 'newElem', SAMESTARTTIME = true}, map)

will result in:

A|1|false|true | META | 1|2
B|5|false|true | META | 5|6
C|11|false|true | META | 11|12

Here the window is opened for every true evaluation of the start condition and is closed for every evaluation of !start. All elements between these elements are discarded. They do not open a new window.

Using a start and an end predicate

win = PREDICATEWINDOW({start = 'newElem', end = 'newElem', SAMESTARTTIME = true}, map)
A|1|false|true | META | 1|5
A|2|false|false | META | 1|5
A|3|false|false | META | 1|5
A|4|true|false | META | 1|5
B|5|false|true | META | 5|11
B|6|false|false | META | 5|11
B|7|false|false | META | 5|11
B|8|false|false | META | 5|11
B|9|false|false | META | 5|11
B|10|true|false | META | 5|11
C|11|false|true | META | 11|17
C|12|false|false | META | 11|17
C|13|false|false | META | 11|17
C|14|false|false | META | 11|17
C|15|false|false | META | 11|17
C|16|false|false | META | 11|17

Here each time a new window opens, the old window is closed, i.e. the same input element is responsible for starting and closing a window.


Using a start and an end predicate and keeping the ending element:

win = PREDICATEWINDOW({start = 'newElem', end = 'isLast', KEEPENDINGELEMENT = true, SAMESTARTTIME = true}, map)

will result in:

A|1|false|true | META | 1|4
A|2|false|false | META | 1|4
A|3|false|false | META | 1|4
A|4|true|false | META | 1|4
B|5|false|true | META | 5|10
B|6|false|false | META | 5|10
B|7|false|false | META | 5|10
B|8|false|false | META | 5|10
B|9|false|false | META | 5|10
B|10|true|false | META | 5|10
C|11|false|true | META | 11|17
C|12|false|false | META | 11|17
C|13|false|false | META | 11|17
C|14|false|false | META | 11|17
C|15|false|false | META | 11|17
C|16|false|false | META | 11|17

Remark the difference: This operator blocks only until the end predicate is reached. This works only, if samestarttime is set to true, else e.g. A|4|true|false | META | 1|4 would be A|4|true|false | META | 4|4, this has no validitiy and will not be produced.


  • No labels