Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
#PARSER PQL
#PARALLELIZATION (type=INTER_OPERATOR) (degree=4) (buffersize=AUTO) (optimization=true)
#INTEROPERATORPARALLELIZATION aggregateId 2 GLOBAL AggregateMultithreadedTransformationStrategyNonGroupedAggregateTransformationStrategy ShuffleFragmentAO
/// other possible definition of parameters for this keyword
#INTEROPERATORPARALLELIZATION (id=aggregateId) (degree=2) (buffersize=GLOBAL) (strategy=AggregateMultithreadedTransformationStrategyNonGroupedAggregateTransformationStrategy) (fragment=ShuffleFragmentAO)
#RUNQUERY

windowBid = TIMEWINDOW({SIZE = [1, 'MINUTES'],
                  advance = [1, 'SECONDS']
                  }, bid)

windowAuction = TIMEWINDOW({SIZE = [10, 'MINUTES'],
                  advance = [1, 'SECONDS']
                  }, auction)

join = JOIN({ID = 'joinId', PREDICATE = 'bid.bidder == auction.id'}, windowBid, windowAuction)

sum_price_bidder = AGGREGATE({ID = 'aggregateId',
                              aggregations = [
                                ['SUM', 'price', 'sum_price_bidder']
                              ],
                              FASTGROUPING = true                                                                    
                            },
                            join
                          )

Strategies and supported fragmentation types

...

Info

Bold fragmentation types shows the preferred type if nothing is defined.

Definition of Endpoints for parallelization

If the #INTEROPERATORPARALLELIZATION-keyword is used, it is also possible to define a start and enpoint for parallelization. The definition of the start and end point of parallelization is possible with a tuple or triple inside of the id-parameter. The following example shows how to use this feature.

Code Block
#PARSER PQL
#PARALLELIZATION (type=INTER_OPERATOR) (degree=4) (buffersize=AUTO) (optimization=true)
#INTEROPERATORPARALLELIZATION (id=(aggregateId:selectId) (degree=2) (buffersize=GLOBAL) (strategy=NonGroupedAggregateTransformationStrategy) (fragment=ShuffleFragmentAO)
/// avoids an semantic change of the query
#INTEROPERATORPARALLELIZATION (id=(aggregateId:selectId:true) (degree=2) (buffersize=GLOBAL) (strategy=NonGroupedAggregateTransformationStrategy) (fragment=ShuffleFragmentAO)
#RUNQUERY

windowBid = TIMEWINDOW({SIZE = [1, 'MINUTES'],
                  advance = [1, 'SECONDS']
                  }, bid)

sum_price_bidder = AGGREGATE({ID = 'aggregateId',
                              aggregations = [
                                ['SUM', 'price', 'sum_price_bidder']
                              ]                                                                   
                            },
                            windowBid
                          )



In this example the aggregate operator is parallelized, but it is also needed that the following selection is also parallelized. So to do this at first the id of the aggregation is set, after that, seperated with a double quote, the second id of the selection is definied as the endpoint of parallelization. There is the possibility, that the parallelization changes the semantic of the query. If you want to avoid this append :true to the pair of start and end id. By default this option is set to false. If it is enabled, a possible semantic change results in an exception.