hadoop-pig-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ankur (JIRA)" <j...@apache.org>
Subject [jira] Commented: (PIG-871) Improve distribution of keys in reduce phase
Date Wed, 15 Jul 2009 11:17:14 GMT

    [ https://issues.apache.org/jira/browse/PIG-871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12731389#action_12731389

Ankur commented on PIG-871:

Sorry for coming back late in this. 

The problem mostly manifest itself in case of large datasets which is skewed in favour of
few hundred group keys. The typical approach that user adopts in this case is to generate
an additional group key ( typically an integer) in a random or round-robin fashion, group
on original + generated and increase the parallelism and task heap size to see if that works.

A use case is that user wants to separate out the data into 5 - 10 major buckets (directories)
and for each major bucket he wants to have data separated out into 20 - 100 minor buckets
(again directories).

An example script would look like this

data1 = LOAD 'myfile_1' Using PigStorage() as (f1, f2, f3);
data2 = LOAD 'myfile_2' Using PigStorage() as (f1, f4, f5);
filtered = FILTER data By <some condition>
joined_data = JOIN data1 By f1, data2 By f1 Parallel 100
projected_data = FOREACH joined_data generate f1, GenerateRandomIntUpto(100) as part, f2,f3,f4,f5
SPLIT projected_data INTO major_bucket_1 IF <some_condition>, major_bucket_2 IF <some_condition>,
major_bucket_3 IF <some_condition>;

group1= Group major_bucket_1 By (f1, part) Parallel 100;
result1 = FOREACH group1 generate FLATTEN(group),  FLATTEN($1);
STORE result1 INTO 'major_bucket_1' Using CustomStore();

group2= Group major_bucket_2 By (f2, part) Parallel 100;
result2 = FOREACH group2 generate FLATTEN(group),  FLATTEN($1);
STORE result2 INTO 'major_bucket_2' Using CustomStore();

group3= Group major_bucket_3 By (f3, part) Parallel 100;
result3 = FOREACH group3 generate FLATTEN(group),  FLATTEN($1);
STORE result3 INTO 'major_bucket_3' Using CustomStore();

The expectation here is to get the final result in a directory structure like this



Note:- The minor_bucket directory name is derived from the group key.

> Improve distribution of keys in reduce phase
> --------------------------------------------
>                 Key: PIG-871
>                 URL: https://issues.apache.org/jira/browse/PIG-871
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.3.0
>            Reporter: Ankur
> The default hashing scheme used to distribute keys in reduce phase sometimes results
in an uneven distribution of keys resulting in 5 - 10 % of reducers being overloaded with
data. This bottleneck makes the PIG jobs really slow and gives users a bad impression.
> While there is no bullet proof solution to the problem in general, the hashing can certainly
be improved for better distribution. The proposal here is to evaluate and incorporate other
hashing schemes that give high avalanche and more even distribution. We can start by evaluating
MurmurHash which is Apache 2.0 licensed and freely available here - http://www.getopt.org/murmur/MurmurHash.java

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.

View raw message