pig-dev mailing list archives

From "liyunzhang_intel (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (PIG-5029) Optimize sort case when data is skewed
Date Tue, 20 Sep 2016 04:55:21 GMT

    [ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15505394#comment-15505394 ]

liyunzhang_intel edited comment on PIG-5029 at 9/20/16 4:54 AM:
----------------------------------------------------------------

[~knoguchi]:
{quote}
If node goes down after reducer0_attempt0 pulled map output but before reducer1_attempt0 started
pulling, then map output needs to be re-computed.
{quote}
 It seems that Hadoop will delete the output before [recovering|https://github.com/apache/hadoop/blob/2e1d0ff4e901b8313c8d71869735b94ed8bc40a0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java#L658]
from a failed task.
 




was (Author: kellyzly):
[~knoguchi]:
{quote}
If node goes down after reducer0_attempt0 pulled map output but before reducer1_attempt0 started
pulling, then map output needs to be re-computed.
{quote}
 Will Hadoop not delete the outputs before recovery (creating a new task attempt to recompute them)?
 



> Optimize sort case when data is skewed
> --------------------------------------
>
>                 Key: PIG-5029
>                 URL: https://issues.apache.org/jira/browse/PIG-5029
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-5029.patch, SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The Pig physical plan is converted to a Spark plan, which produces the following RDD lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
>     |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
>     |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
>     |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
> to implement the sort feature. RDD.sortByKey uses [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106],
> which samples the data and splits the keys into roughly equal ranges. Even so, the test
> results (see the attached document) show that one partition receives most of the keys
> and takes a long time to finish.
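
To illustrate why sampled range bounds do not help when a single key dominates, here is a
minimal sketch in plain Python. It is a simplified model of range partitioning, not Spark's
RangePartitioner code; the helper names (range_bounds, partition_of, partition_sizes) and the
key distribution (90% of rows sharing an empty query_term) are assumptions made up for the
example, not measurements from the attached document.

```python
import bisect
import random


def range_bounds(sample, num_partitions):
    """Pick up to num_partitions - 1 upper bounds from a sample of keys."""
    ordered = sorted(sample)
    step = len(ordered) / num_partitions
    bounds = []
    for i in range(1, num_partitions):
        candidate = ordered[min(int(i * step), len(ordered) - 1)]
        # Skip duplicate bounds: rows with the same key cannot be split
        # across ranges without breaking the sort order.
        if not bounds or candidate > bounds[-1]:
            bounds.append(candidate)
    return bounds


def partition_of(key, bounds):
    """Index of the range holding key; a key equal to a bound goes left."""
    return bisect.bisect_left(bounds, key)


def partition_sizes(keys, num_partitions, sample_size=1000, seed=0):
    """Sample the keys, derive range bounds, and count rows per partition."""
    rng = random.Random(seed)
    sample = rng.sample(keys, min(sample_size, len(keys)))
    bounds = range_bounds(sample, num_partitions)
    sizes = [0] * num_partitions
    for key in keys:
        sizes[partition_of(key, bounds)] += 1
    return sizes


# Hypothetical skewed input: 9000 rows with an empty query_term,
# 1000 rows with distinct terms.
keys = [""] * 9000 + ["term%04d" % i for i in range(1000)]
sizes = partition_sizes(keys, num_partitions=4)
print(sizes)
```

In this model every row with the hot key lands in the first partition no matter how many
partitions are requested, because equal keys must stay in the same range. Splitting the hot
key across several partitions would need a different partitioning strategy.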



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
