pig-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Pig Wiki] Update of "PigSkewedJoinSpec" by SriranjanManjunath
Date Thu, 07 May 2009 19:43:57 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The following page has been changed by SriranjanManjunath:
http://wiki.apache.org/pig/PigSkewedJoinSpec

New page:
= Skewed Join =
[[Anchor(Intro)]]
== Introduction ==

Parallel joins are vulnerable to the presence of skew in the underlying data. If the underlying
data is sufficiently skewed, load imbalances will swamp any of the parallelism gains (1).
In order to counteract this problem, skewed join computes a histogram of the key space and
uses this data to allocate reducers for a given key. Skewed join does not place a restriction
on the size of the input tables. It accomplishes this by splitting one of the input tables
on the join predicate and streaming the other table.
[[Anchor(Use_cases)]]
== Use cases ==

Skewed join can be used when the underlying data is sufficiently skewed and the user needs
finer control over the allocation of reducers to counteract the skew. It should also be
used when the tables are too large to fit in memory.

{{{
big = LOAD 'big_data' AS (b1,b2,b3);

massive = LOAD 'massive_data' AS (m1,m2,m3);

C = JOIN big BY b1, massive BY m1 USING "skewed";
}}}
[[Anchor(Requirements)]]
== Requirements ==

   * Support a 'skewed' condition for the join command - Modify Join operator to have a "skewed"
option.
   * Handle considerably large skew in the input data efficiently
   * Join tables whose keys are too big to fit in memory
[[Anchor(Implementation)]]
== Implementation ==

Skewed join translates into two map/reduce jobs: Sample and Join. The first job samples the
input records and computes a histogram of the underlying key space. The second map/reduce
job partitions the input table and performs a join on the predicate. In order to join the
two tables, one of the tables is partitioned and the other is streamed to the map tasks. The map
task of this job uses the =pig.quantiles= file to determine the number of reducers per key.
It then sends the records for a key to each of that key's reducers in a round-robin fashion. The
join itself happens in the reduce phase.

%ATTACHURL%/Slide1.jpg
[[Anchor(Sampler_phase)]]
=== Sampler phase ===
If the underlying data is sufficiently skewed, load imbalances will result in a few reducers
getting a disproportionate share of the records. As a first task, the sampler creates a histogram of the key distribution
and stores it in the =pig.keydist= file. This key distribution will be used to allocate the
right number of reducers for a key. For the table which is partitioned, the partitioner uses
the key distribution to copy the output to the reducer buffer regions in a round robin fashion.
For the table which is streamed, the mapper task uses the =pig.keydist= file to copy the data
to each of the reduce partitions. 

As a first stab at the implementation, we will be using the uniform random sampler used by
ORDER BY. The sampler currently does not output the key distribution. It will be modified
to do so.
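
As a rough illustration of the sampling step described above, the sketch below draws a uniform random sample and counts keys to estimate each key's share of the input. It is written in Python rather than Pig's Java, and the helper name `sample_key_distribution`, its arguments and its return shape are assumptions made for illustration; they do not reflect the actual sampler or the layout of the =pig.keydist= file.

```python
import random
from collections import Counter


def sample_key_distribution(records, key_fn, sample_rate, seed=0):
    """Estimate each key's share of the input from a uniform random sample.

    Hypothetical helper: the name, arguments and return shape are assumptions
    for illustration and do not mirror Pig's sampler or the pig.keydist format.
    """
    rng = random.Random(seed)
    # Keep each record independently with probability sample_rate.
    sample = [r for r in records if rng.random() < sample_rate]
    counts = Counter(key_fn(r) for r in sample)
    total = sum(counts.values())
    if total == 0:
        return {}
    # Fraction of sampled records that carry each key.
    return {key: n / total for key, n in counts.items()}


# Nine records with key "a" and one with key "b"; sampling everything
# (sample_rate=1.0) makes the estimate exact.
records = [("a", i) for i in range(9)] + [("b", 0)]
dist = sample_key_distribution(records, key_fn=lambda r: r[0], sample_rate=1.0)
```

With a sample rate below 1.0 the shares are only estimates, which is why the quality of the sampler matters for how well reducers are allocated.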
[[Anchor(Sort_phase)]]
=== Sort phase ===
The keys are sorted based on the input predicate.
[[Anchor(Join_phase)]]
=== Join phase ===
Skewed join happens in the reduce phase. As a convention, the first table in the join command
is partitioned and sent to the various reducers. Partitioning allows us to support massive
tables without having to worry about the memory limitations. The partitioner is overridden
to send the data in a round robin fashion to each of the reducers associated with a key. The
partitioner obtains the reducer information from the key distribution file. To counteract
reducer starvation (i.e. keys that require more than one reducer are granted reducers while
the other keys are starved of them), the user is allowed to set the config parameter
pig.mapreduce.skewedjoin.uniqreducers. The value is the percentage of unique reducers the
partitioner should use; the remaining reducers are reserved for highly skewed keys. For example,
if the value is 90, 10% of the total reducers will be used for highly skewed data.
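
The partitioning behaviour described above can be sketched as follows. This is a minimal Python sketch under stated assumptions, not the Pig partitioner: `split_reducers` and `RoundRobinPartitioner` are hypothetical names, `uniq_percent` stands in for pig.mapreduce.skewedjoin.uniqreducers, and how the reserved reducers are divided among several skewed keys is not specified in this page, so the mapping is simply passed in.

```python
def split_reducers(total_reducers, uniq_percent):
    """Split reducer indexes into a pool for ordinary keys and a pool for skewed keys.

    uniq_percent stands in for pig.mapreduce.skewedjoin.uniqreducers.
    """
    n_uniq = total_reducers * uniq_percent // 100
    if not 0 < n_uniq <= total_reducers:
        raise ValueError("need at least one reducer for ordinary keys")
    return list(range(n_uniq)), list(range(n_uniq, total_reducers))


class RoundRobinPartitioner:
    """Hypothetical partitioner: round robin for skewed keys, hashing otherwise."""

    def __init__(self, uniq_pool, key_to_reducers):
        self.uniq_pool = uniq_pool
        # Assumed input: skewed key -> list of reducer indexes granted to it.
        self.key_to_reducers = key_to_reducers
        self.next_slot = {key: 0 for key in key_to_reducers}

    def get_partition(self, key):
        reducers = self.key_to_reducers.get(key)
        if reducers is None:
            # Ordinary key: always the same single reducer, chosen by hash.
            return self.uniq_pool[hash(key) % len(self.uniq_pool)]
        # Skewed key: cycle through the reducers associated with it.
        slot = self.next_slot[key]
        self.next_slot[key] = (slot + 1) % len(reducers)
        return reducers[slot]


# 20 reducers with a value of 90: 18 for ordinary keys, 2 kept for skewed keys.
uniq_pool, skewed_pool = split_reducers(20, 90)
partitioner = RoundRobinPartitioner(uniq_pool, {"hot": skewed_pool})
hot_targets = [partitioner.get_partition("hot") for _ in range(4)]
```

In this sketch successive records for the skewed key alternate between the two reserved reducers, while every other key keeps landing on a single reducer from the remaining pool.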

For the streamed table, since more than one reducer can be associated with a key, the streamed
table records (that match the key) need to be copied over to each of these reducers. The
mapper function uses the key distribution in the =pig.keydist= file to copy the records over to
each of the partitions. It accomplishes this by inserting a PRop into the logical plan. The PRop
sets a partition index on each key/value pair, which is then used by the partitioner
to send the pair to the right reducer.
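
The replication of streamed records can be pictured with the small Python sketch below. The function `stream_side_map`, the `key_to_reducers` mapping (standing in for whatever is read from =pig.keydist=) and the `default_partition` callback are all hypothetical names introduced for illustration.

```python
def stream_side_map(record, key_fn, key_to_reducers, default_partition):
    """Emit (partition_index, key, record) once per reducer that holds the key.

    Hypothetical sketch: key_to_reducers stands in for the key distribution,
    default_partition for the normal placement of a key that is not skewed.
    """
    key = key_fn(record)
    targets = key_to_reducers.get(key)
    if targets is None:
        # Key is not skewed: a single copy goes to its one reducer.
        targets = [default_partition(key)]
    # One copy of the record per target reducer, tagged with its index.
    return [(index, key, record) for index in targets]


key_to_reducers = {"hot": [18, 19]}
hot_copies = stream_side_map(("hot", "x"), lambda r: r[0], key_to_reducers, lambda k: 0)
cold_copies = stream_side_map(("cold", "y"), lambda r: r[0], key_to_reducers, lambda k: 0)
```

Copying the streamed record to every reducer of a skewed key is what lets each reducer join its slice of the partitioned table against the full set of matching streamed records.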

[[Anchor(PRop)]]
==== Partition Rearrange operator ====
The partition rearrange operator (PRop) is an overloaded version of the local rearrange operator.
Similar to local rearrange, it takes an input tuple and outputs a key/value pair with the
tuple being the value. PRop, however, also outputs the reducer index along with the tuple. The reducer
index is represented as a 1 byte field. This index is used by the partitioner to copy the
streaming input record to multiple reducers.
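
To make the shape of the PRop output concrete, here is a hedged Python sketch that tags each emitted pair with a one-byte reducer index. The names `partition_rearrange` and `partition_of` are hypothetical, and treating the byte as unsigned (0 to 255) is an assumption; this page only says the index is a 1 byte field.

```python
import struct


def partition_rearrange(tup, key_fn, reducer_indexes):
    """Yield (index_byte, key, tuple) for every reducer the tuple must reach.

    Hypothetical sketch of PRop's output; the unsigned one-byte encoding
    is an assumption.
    """
    key = key_fn(tup)
    for index in reducer_indexes:
        # struct.pack("B", ...) raises struct.error outside 0..255.
        yield struct.pack("B", index), key, tup


def partition_of(index_byte):
    """Decode the one-byte reducer index, as the partitioner would."""
    return struct.unpack("B", index_byte)[0]


pairs = list(partition_rearrange(("hot", 42), lambda t: t[0], [3, 7]))
decoded = [partition_of(index_byte) for index_byte, _, _ in pairs]
```

Under this assumption a single byte caps the number of addressable reducers at 256, which is worth keeping in mind when sizing a job.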

[[Anchor(number_of_reducers)]]
== Determining the number of reducers per key ==
The number of reducers for a key is obtained from the key distribution file. Along with the
distribution, the sampler estimates the number of reducers needed for a key by calculating
the number of records that fit in a reducer. It computes this by estimating the size of the
sample and the amount of heap available to the JVM for the join operation. The amount of heap
is given by the user as the config parameter pig.mapred.skewedjoin.heapsize. Knowing the number
of records per reducer helps minimize disk spillage.
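
The arithmetic behind this estimate can be sketched in Python as below. Both function names are hypothetical, `heap_bytes` stands in for the value of pig.mapred.skewedjoin.heapsize, and the exact formula is an assumption: the page states only that the estimate is derived from the sample size and the available heap.

```python
import math


def records_per_reducer(heap_bytes, sample_bytes, sample_records):
    """Estimate how many records fit in one reducer's heap.

    Hypothetical formula: available heap divided by the average record size
    observed in the sample.
    """
    if sample_records <= 0 or sample_bytes <= 0:
        raise ValueError("sample must contain data")
    avg_record_bytes = sample_bytes / sample_records
    return max(1, int(heap_bytes // avg_record_bytes))


def reducers_for_key(estimated_key_records, per_reducer):
    """Number of reducers needed so that no reducer exceeds its capacity."""
    return max(1, math.ceil(estimated_key_records / per_reducer))


# 10 sampled records totalling 500 bytes: about 50 bytes per record,
# so a 1000 byte heap holds 20 records.
capacity = records_per_reducer(heap_bytes=1000, sample_bytes=500, sample_records=10)
hot_key_reducers = reducers_for_key(45, capacity)   # 45 records -> 3 reducers
cold_key_reducers = reducers_for_key(5, capacity)   # fits in 1 reducer
```

Rounding up keeps each reducer's share of a key within the estimated capacity, which is the point of avoiding disk spillage.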

[[Anchor(3_way_join)]]
== Handling 3-way joins ==
Currently we do not support more than two tables for skewed join. Specifying a join of three or
more tables will fail validation. For such joins, we rely on the user to break them up into 2-way joins.


[[Anchor(Implementation_stages)]]
== Implementation stages ==

The implementation of skewed join is split into two phases:
   * In the first phase, skewed join uses the ORDER BY sampler to compute a histogram
of the records. It then relies on user configs to pass the intermediate keys to the right
reducers.
   * In the second phase, the uniform random sampling currently used by ORDER BY will be replaced
by a block-level sampler, which avoids the problem of over-sampling the data for large
inputs.

[[Anchor(References)]]
== References ==
   (1) "Practical Skew Handling in Parallel Joins" - David J. DeWitt, Jeffrey F. Naughton,
Donovan A. Schneider, S. Seshadri
