The fragmentation mechanism in Odysseus

The fragmentation mechanism is part of the Peer-Feature and can improve the performance of distributed query processing in a peer-to-peer network of Odysseus instances. Currently, primary horizontal fragmentation (round robin, hash, and range) is provided. Figure 1 shows a simplified operator graph. All operators between sources and sinks are replicated several times. A Fragment operator splits the data stream into several disjoint partial streams (fragments). Each replica therefore receives only a fraction of the complete data stream and produces a different result stream. To merge the different result streams, a Union operator is inserted.

A simplified operator graph for fragmentation.

 

Typically, fragments are processed on different peers in order to reduce the load on each peer. Figure 2 shows the complete, non-simplified operator graph for the same query as in figure 1. Here, special JxtaSender and JxtaReceiver operators transport data stream elements from one peer to another.

A complete, non-simplified operator graph for fragmentation.

 

Query partitioning provides the basis for the fragmentation mechanism. Partitioning a query means creating disjoint partial queries in order to determine which operators are executed on the same peer. Figure 1 and figure 2 use a partitioning strategy that creates a single partial query containing all operators.


 

Due to their characteristics, not every operator can be part of a fragment. Examples of such operators (the list is by no means exhaustive) are ElementWindow, Aggregate, FastMedian, and TupleAggregate. The fragmentation mechanism excludes those operators. More concretely, the data streams are merged before them and split again after them.


 

Note that hash fragmentation can make it possible to use aggregations within fragments. This is explained below.

The activation of fragmentation via Odysseus Script

The user can enable the fragmentation mechanism via Odysseus Script. Listing 1 shows the query definition in PQL (with Odysseus Script) for the query shown in figure 2. First, query distribution has to be enabled. After that, a series of distribution commands follows. The partition strategy defines the partial queries as described above (QueryCloud creates one partial query containing all operators). The modification command enables the fragmentation (here, round robin fragmentation). Its first additional argument is the source whose data stream is to be fragmented, and its second argument is the degree of fragmentation (the number of fragments). The last distribution command (allocation) defines which peer executes which partial query (here, roundrobin is a simple, well-known strategy). The PQL query itself does not have to be changed to enable fragmentation (e.g., there is no need to insert a Fragment operator manually).

Odysseus Script example for fragmentation.
#PARSER PQL
#CONFIG distribute true
#PEER_PARTITION QUERYCLOUD
#PEER_MODIFICATION FRAGMENTATION_HORIZONTAL_ROUNDROBIN nexmark:bid 2
#PEER_ALLOCATE ROUNDROBIN

#ADDQUERY
proj = PROJECT({
		  attributes=['auction', 'price']
		}, nexmark:bid)
max = AGGREGATE({
		  aggregations=[['max', 'price', 'maxPrice']],
		  group_by=['auction']
		}, proj)
out = SINK({
		  Sink='MaxPrice'
		}, max)

 

The round robin fragmentation

 

The usage of the round robin fragmentation is already shown in listing 1. The routing algorithm is outport = k mod n, where k is the running count of the current element and n is the number of fragments.
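
The routing rule can be illustrated with a small Python sketch. It is not the Odysseus implementation; the function names and the sample bids are hypothetical, and it assumes that the element count k starts at 0.

```python
def round_robin_outport(k: int, n: int) -> int:
    """Return the output port for the k-th element, given n fragments."""
    return k % n


def fragment_round_robin(elements, n):
    """Distribute elements over n fragments in arrival order.

    Assumption: the element counter k starts at 0.
    """
    fragments = [[] for _ in range(n)]
    for k, element in enumerate(elements):
        fragments[round_robin_outport(k, n)].append(element)
    return fragments


# Hypothetical (auction, price) bids, split into two fragments.
bids = [("a1", 10), ("a2", 20), ("a1", 30), ("a3", 40), ("a2", 50)]
fragments = fragment_round_robin(bids, 2)
```

Note that bids for the same auction (here a1 and a2) can end up in different fragments, because the routing ignores the content of the elements.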

 

The hash fragmentation

 

The usage of the hash fragmentation is shown in listing 2.

Odysseus Script (short) example for hash fragmentation.
#PEER_MODIFICATION FRAGMENTATION_HORIZONTAL_HASH nexmark:bid 2 auction 

 

The line shown replaces the corresponding line in listing 1. As an optional additional argument, a hash key consisting of one or more attributes can be specified (e.g., auction). The routing algorithm is outport = h mod n, where h is the hash code generated from the hash key and n is the number of fragments. If no hash key is given, all attributes are used to compute h.
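
The following Python sketch illustrates the routing rule outport = h mod n. It is an illustration under assumptions, not the Odysseus implementation: zlib.crc32 is used as a deterministic stand-in for whatever hash function Odysseus applies, and the function name and element layout (a dict of attribute values) are hypothetical.

```python
import zlib


def hash_outport(element: dict, n: int, hash_key=None) -> int:
    """Return the output port for an element, given n fragments.

    If no hash key is given, all attributes of the element are hashed.
    Assumption: crc32 over the attribute values stands in for the real hash.
    """
    attributes = hash_key if hash_key else sorted(element)
    key_bytes = repr([element[a] for a in attributes]).encode("utf-8")
    h = zlib.crc32(key_bytes)  # non-negative integer
    return h % n


# Two bids for the same auction are routed to the same fragment
# when 'auction' is the hash key, regardless of their price.
bid_a = {"auction": 7, "price": 10}
bid_b = {"auction": 7, "price": 99}
same_port = hash_outport(bid_a, 2, ["auction"]) == hash_outport(bid_b, 2, ["auction"])
```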


 

Hash fragmentation with hash keys can allow aggregations (Aggregate, FastMedian, and TupleAggregate) to be part of fragments if the grouping attributes are used as the hash key.
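
Why this works can be seen in a small Python sketch. It operates on a finite list of hypothetical bids instead of a data stream, uses zlib.crc32 as a stand-in hash, and all function names are made up for illustration. Because every auction is routed to exactly one fragment, the per-fragment maxima can simply be unioned to obtain the result of the unfragmented aggregation.

```python
import zlib


def max_price_per_auction(bids):
    """Group (auction, price) pairs by auction and keep the maximum price."""
    result = {}
    for auction, price in bids:
        result[auction] = max(price, result.get(auction, price))
    return result


def fragment_by_auction(bids, n):
    """Hash fragmentation with the grouping attribute 'auction' as hash key."""
    fragments = [[] for _ in range(n)]
    for auction, price in bids:
        h = zlib.crc32(str(auction).encode("utf-8"))  # stand-in hash function
        fragments[h % n].append((auction, price))
    return fragments


bids = [(1, 10), (2, 20), (1, 30), (3, 5), (2, 15)]
fragments = fragment_by_auction(bids, 2)

# Aggregate inside each fragment, then union the partial results.
merged = {}
for fragment in fragments:
    merged.update(max_price_per_auction(fragment))  # keys never collide
```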

 

The range fragmentation

 

The usage of the range fragmentation is shown in listing 3.


Odysseus Script (short) example for range fragmentation.
#PEER_MODIFICATION FRAGMENTATION_HORIZONTAL_RANGE nexmark:bid.price 50 

 

The line shown replaces the corresponding line in listing 1. Unlike the other fragmentation algorithms, the given source of fragmentation must be a fully qualified attribute. It determines the attribute for which ranges are used. All other arguments define those ranges as lower bounds, and the number of fragments is the number of lower bounds plus one. For example, "nexmark:bid.price 50" builds ranges for the attribute price. The only lower bound is 50, resulting in two fragments. The first fragment receives all elements with price >= 50. All other elements are routed to the second fragment.
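
A minimal Python sketch of this routing follows. The single-bound case matches the example above. How Odysseus orders the fragments for several lower bounds is not stated here, so the sketch assumes that the bounds are checked from highest to lowest and that the last fragment receives all remaining elements; the function name is hypothetical.

```python
def range_outport(value, lower_bounds):
    """Return the output port for a value, given the lower bounds of the ranges.

    Assumption: bounds are checked from highest to lowest; elements below
    every bound go to the last fragment (index len(lower_bounds)).
    """
    for port, bound in enumerate(sorted(lower_bounds, reverse=True)):
        if value >= bound:
            return port
    return len(lower_bounds)


# "nexmark:bid.price 50": two fragments, split at price 50.
high = range_outport(75, [50])      # price >= 50, first fragment (port 0)
boundary = range_outport(50, [50])  # boundary value, first fragment (port 0)
low = range_outport(49, [50])       # all other elements, second fragment (port 1)
```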

 

Partial fragmentation

 

Besides fragmenting a complete query (more precisely, one source stream), the user can decide to fragment just a substream. Listing 4 shows the query definition in PQL. It is the same query as in listing 1, except that the first argument for the fragmentation mechanism has changed. It now defines the start and end point of the fragmentation (the fragmentation begins after the start and ends before the end). Values for start and end can be either a source name (e.g., nexmark:bid; only allowed for start) or the unique ID of an operator.

Odysseus Script example for partial fragmentation.
#PARSER PQL
#CONFIG distribute true
#PEER_PARTITION QUERYCLOUD
#PEER_MODIFICATION FRAGMENTATION_HORIZONTAL_ROUNDROBIN [start,end] 2
#PEER_ALLOCATE ROUNDROBIN

#ADDQUERY
proj = PROJECT({
		  attributes=['auction', 'price'],
		  ID = 'start'
		}, nexmark:bid)
max = AGGREGATE({
		  aggregations=[['max', 'price', 'maxPrice']],
		  group_by=['auction']
		}, proj)
out = SINK({
		  Sink='MaxPrice',
		  ID = 'end'
		}, max)

