From: Apache Wiki
To: pig-commits@incubator.apache.org
Reply-To: pig-dev@hadoop.apache.org
Date: Thu, 07 May 2009 19:53:30 -0000
Message-ID: <20090507195330.28466.71017@eos.apache.org>
Subject: [Pig Wiki] Update of "PigSkewedJoinSpec" by SriranjanManjunath

Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The following page has been changed by SriranjanManjunath:
http://wiki.apache.org/pig/PigSkewedJoinSpec

------------------------------------------------------------------------------
  [[Anchor(Intro)]]
  == Introduction ==
- Parallel joins are vulnerable to the presence of skew in the underlying data. If the underlying data is sufficiently skewed, load imbalances will swamp any of the parallelism gains (1). In order to counteract this problem, skewed join computes a histogram of the key space and uses this data to allocate reducers for a given key. Skewed join does not place a restriction on the size of the input tables. It accomplishes this by splitting one of the input tables on the join predicate and streaming the other table.
+ Parallel joins are vulnerable to the presence of skew in the underlying data. If the underlying data is sufficiently skewed, load imbalances will swamp any of the parallelism gains [#References (1)]. In order to counteract this problem, skewed join computes a histogram of the key space and uses this data to allocate reducers for a given key. Skewed join does not place a restriction on the size of the input tables. It accomplishes this by splitting one of the input tables on the join predicate and streaming the other table.
  [[Anchor(Use_cases)]]
  == Use cases ==
@@ -26, +26 @@
  [[Anchor(Implementation)]]
  == Implementation ==
- Skewed join translates into two map/reduce jobs - Sample and Join. The first job samples the input records and computes a histogram of the underlying key space. The second map/reduce job partitions the input table and performs a join on the predicate. In order to join the two tables, one of the tables is partitioned and the other is streamed to the map tasks. The map task of this job uses the =pig.quantiles= file to determine the number of reducers per key. It then sends the key to each of the reducers in a round robin fashion. Skewed joins happen in the reduce phase.
+ Skewed join translates into two map/reduce jobs - Sample and Join. The first job samples the input records and computes a histogram of the underlying key space. The second map/reduce job partitions the input table and performs a join on the predicate. In order to join the two tables, one of the tables is partitioned and the other is streamed to the map tasks. The map task of this job uses the ~-pig.quantiles-~ file to determine the number of reducers per key. It then sends the key to each of the reducers in a round robin fashion. Skewed joins happen in the reduce phase.
- %ATTACHURL%/Slide1.jpg
+ attachment:partition.jpg
+
  [[Anchor(Sampler_phase)]]
  === Sampler phase ===
- If the underlying data is sufficiently skewed, load imbalances will result in a few reducers getting a lot of keys. As a first task, the sampler creates a histogram of the key distribution and stores it in the =pig.keydist= file. This key distribution will be used to allocate the right number of reducers for a key. For the table which is partitioned, the partitioner uses the key distribution to copy the output to the reducer buffer regions in a round robin fashion. For the table which is streamed, the mapper task uses the =pig.keydist= file to copy the data to each of the reduce partitions.
+ If the underlying data is sufficiently skewed, load imbalances will result in a few reducers getting a lot of keys. As a first task, the sampler creates a histogram of the key distribution and stores it in the ~-pig.keydist-~ file. This key distribution will be used to allocate the right number of reducers for a key. For the table which is partitioned, the partitioner uses the key distribution to copy the output to the reducer buffer regions in a round robin fashion. For the table which is streamed, the mapper task uses the ~-pig.keydist-~ file to copy the data to each of the reduce partitions.
  As a first stab at the implementation, we will be using the uniform random sampler used by ORDER BY.
The sampler currently does not output the key distribution. It will be modified to do so.
  [[Anchor(Sort_phase)]]
@@ -41, +42 @@
  === Join Phase ===
  Skewed join happens in the reduce phase. As a convention, the first table in the join command is partitioned and sent to the various reducers. Partitioning allows us to support massive tables without having to worry about memory limitations. The partitioner is overridden to send the data in a round robin fashion to each of the reducers associated with a key. The partitioner obtains the reducer information from the key distribution file. To counteract reducer starvation (i.e. the keys that require more than 1 reducer are granted the reducers whereas the other keys are starved for reducers), the user is allowed to set a config parameter pig.mapreduce.skewedjoin.uniqreducers. The value is the percentage of unique reducers the partitioner should use. For example, if the value is 90, 10% of the total reducers will be used for highly skewed data.
- For the streaming table, since more than one reducer can be associated with a key, the streamed table records (that match the key) need to be copied over to each of these reducers. The mapper function uses the key distribution in the =pig.keydist= file to copy the records over to each of the partitions. It accomplishes this by inserting a PRop into the logical plan. The PRop sets a partition index on each key/value pair, which is then used by the partitioner to send the pair to the right reducer.
+ For the streaming table, since more than one reducer can be associated with a key, the streamed table records (that match the key) need to be copied over to each of these reducers. The mapper function uses the key distribution in the ~-pig.keydist-~ file to copy the records over to each of the partitions. It accomplishes this by inserting a [#PRop PRop] into the logical plan.
The [#PRop PRop] sets a partition index on each key/value pair, which is then used by the partitioner to send the pair to the right reducer.
  [[Anchor(PRop)]]
  ==== Partition Rearrange operator ====
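
The routing described in the join phase above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not Pig's implementation: the `key_reducers` mapping is a hypothetical stand-in for the contents of the key distribution file, and the function names `route_partitioned`, `route_streamed` and `skewed_join` are invented for this example. Records of the partitioned table are dealt round robin to the reducers assigned to their key, while each record of the streamed table is copied to every one of those reducers, so each reducer can join its share independently.

```python
from collections import defaultdict


def route_partitioned(records, key_reducers):
    """Spread the partitioned table's records round robin over each key's reducers."""
    next_slot = defaultdict(int)   # per-key round-robin cursor
    buckets = defaultdict(list)    # reducer index -> records sent to it
    for key, value in records:
        reducers = key_reducers[key]
        target = reducers[next_slot[key] % len(reducers)]
        next_slot[key] += 1
        buckets[target].append((key, value))
    return buckets


def route_streamed(records, key_reducers):
    """Copy each streamed record to every reducer associated with its key."""
    buckets = defaultdict(list)
    for key, value in records:
        # Keys absent from the distribution have no partner rows, so they are skipped.
        for target in key_reducers.get(key, ()):
            buckets[target].append((key, value))
    return buckets


def skewed_join(left, right, key_reducers):
    """Join each reducer's share separately and concatenate the results."""
    left_buckets = route_partitioned(left, key_reducers)
    right_buckets = route_streamed(right, key_reducers)
    joined = []
    for reducer, left_rows in left_buckets.items():
        by_key = defaultdict(list)
        for key, value in right_buckets.get(reducer, []):
            by_key[key].append(value)
        for key, left_value in left_rows:
            for right_value in by_key.get(key, []):
                joined.append((key, left_value, right_value))
    return joined


# Hypothetical key distribution: the hot key "a" gets three reducers, "b" gets one.
key_reducers = {"a": [0, 1, 2], "b": [3]}
left = [("a", i) for i in range(6)] + [("b", 100)]
right = [("a", "x"), ("b", "y"), ("c", "z")]
rows = skewed_join(left, right, key_reducers)
```

With this input the six "a" records are spread two per reducer across reducers 0 to 2, and the single streamed "a" record is replicated to all three, so the union of the per-reducer joins equals the ordinary inner join.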