pig-dev mailing list archives

From "Thomas Graves (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
Date Wed, 28 Sep 2016 16:01:20 GMT

    [ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530065#comment-15530065 ]

Thomas Graves commented on PIG-5029:

Is the question whether Spark supports maps that aren't idempotent, i.e. maps that could
generate different output if they run again? If so, I think that is a bad assumption even if
it did. Maps should be idempotent because things like speculative execution and the failure
cases you have talked about rely, or could rely, on this.

Spark does try to be intelligent about rerunning stages, so if you get a fetchFailure of a
map after one of the reducers has already finished, it will go rerun that map and then only
rerun the reducers that haven't finished.  So if the map output is different you could end
up with different results across those reducers.

The external shuffle service doesn't necessarily solve this problem. It helps, but if the node
that holds the map output goes away, that output has to be recomputed.

I'm assuming that in the presentation they are using a salt key that generates the same
random value if the map task is rerun, as Koji mentioned. For instance, can you use the task id
(not the attempt id) so that the key is always the same if it re-runs? [~rohini], how is Pig
handling skew for MR/Tez?
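
To make the task-id idea concrete, here is a minimal sketch in plain Python (not Spark or Pig code). The names salted_partition and simulate_rerun are hypothetical, and the "rerun" is only a simulation of the fetch-failure case described above: reducer 0 has already read the first attempt, and the remaining reducers read the rerun.

```python
import random

NUM_REDUCERS = 4


def salted_partition(records, task_id, num_reducers=NUM_REDUCERS, deterministic=True):
    """Assign each record of one map task to a reducer using a random salt.

    With deterministic=True the generator is seeded with the task id
    (not the attempt id), so every attempt of the same task produces
    the same salts. With deterministic=False each attempt differs.
    """
    rng = random.Random(task_id) if deterministic else random.Random()
    return [(rng.randrange(num_reducers), rec) for rec in records]


def simulate_rerun(records, task_id, deterministic):
    """Reducer 0 reads the first attempt; the other reducers read the rerun."""
    first = salted_partition(records, task_id, deterministic=deterministic)
    second = salted_partition(records, task_id, deterministic=deterministic)
    seen = [rec for reducer, rec in first if reducer == 0]
    seen += [rec for reducer, rec in second if reducer != 0]
    return sorted(seen)


if __name__ == "__main__":
    records = list(range(100))
    # Seeded by task id: every record is seen exactly once after the rerun.
    print(simulate_rerun(records, task_id=7, deterministic=True) == records)
    # Unseeded: records are usually dropped or duplicated across reducers.
    print(simulate_rerun(records, task_id=7, deterministic=False) == records)
```

With the seeded generator the rerun reproduces the same record-to-reducer assignment, so the reducers that finished early and the ones that read the rerun still cover each record exactly once. Without the seed, that is no longer guaranteed.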

> Optimize sort case when data is skewed
> --------------------------------------
>                 Key: PIG-5029
>                 URL: https://issues.apache.org/jira/browse/PIG-5029
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>         Attachments: PIG-5029.patch, PIG-5029_2.patch, SkewedData_L9.docx
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The Pig physical plan is converted to a Spark plan, which produces this RDD lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
>     |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
>     |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
>     |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
> to implement the sort feature. RDD.sortByKey uses [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106],
> which samples the data and splits the keys into ranges of roughly equal size. Even so, the
> test results (see the attached document) show that one partition receives most of the keys
> and takes a long time to finish.
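
The skew described in the issue can be reproduced with a simplified range partitioner in plain Python. This is a sketch under stated assumptions, not Spark's RangePartitioner: the helpers range_bounds and partition_for are hypothetical, the "sample" is the full key list, and the key distribution (70% of rows sharing one query_term) is invented for illustration.

```python
import bisect
from collections import Counter


def range_bounds(sample, num_partitions):
    """Pick upper bounds at evenly spaced quantiles of the sorted sample.

    Duplicate bounds are dropped, so a heavily repeated key can leave
    fewer ranges than requested.
    """
    ordered = sorted(sample)
    bounds = []
    for i in range(1, num_partitions):
        candidate = ordered[len(ordered) * i // num_partitions]
        if not bounds or candidate > bounds[-1]:
            bounds.append(candidate)
    return bounds


def partition_for(key, bounds):
    """Keys less than or equal to bounds[i] go to partition i."""
    return bisect.bisect_left(bounds, key)


# Assumed data: one hot query_term makes up 70% of 1000 rows.
keys = ["hot"] * 700 + [f"k{i:03d}" for i in range(300)]
bounds = range_bounds(keys, num_partitions=4)
counts = Counter(partition_for(k, bounds) for k in keys)
print(bounds)
print(sorted(counts.items()))
```

Because every row with the same key must land in the same range, no choice of bounds can split the hot key, and the partition that holds it ends up with at least 70% of the rows. This is why the discussion above turns to salting the key.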

This message was sent by Atlassian JIRA