pig-dev mailing list archives

From "liyunzhang_intel (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PIG-4675) FR+Limit case fails when enable MultiQuery because the predecessor information is wrongly calculated in current code.
Date Fri, 04 Dec 2015 06:33:11 GMT

    [ https://issues.apache.org/jira/browse/PIG-4675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15040806#comment-15040806 ]

liyunzhang_intel commented on PIG-4675:
---------------------------------------

[~mohitsabharwal]:
  {quote}
   this looks like a pretty critical issue. It is potentially affecting many other query plans,
not just FRJoin with Limit, right ?
  {quote}
  Yes. Query plans containing a physical operator with more than one predecessor may fail
when multiquery optimization is enabled. The physical operators that can have more than one predecessor are
FRJoin, MergeJoin, SkewedJoin and Union.

*Why was the predecessor information calculated wrongly?*
Let's use the query script above as an example:
{code}
SSN = load '/test/ssn.txt' using PigStorage() as (ssn:long);
SSN_NAME = load '/test/name.txt' using PigStorage() as (ssn:long, name:chararray);
X = JOIN SSN by ssn LEFT OUTER, SSN_NAME by ssn USING 'replicated';
R1 = limit SSN_NAME 10;
store R1 into '/tmp/test1_r1'; 
store X into '/tmp/test1_x';
{code}

The Spark plan: before multiquery optimization there are 3 Spark nodes (scope-29, scope-32, scope-36).
After multiquery optimization there is only 1 Spark node (scope-29), because the unnecessary Store (scope-30), Load (scope-31)
and Load (scope-34) are removed. Before multiquery optimization, FRJoin (scope-22) has two predecessors (scope-17
and scope-34). After multiquery optimization, FRJoin (scope-22) has *only 1* predecessor (scope-17),
although it really has two: scope-17 and scope-7. The predecessor info is calculated wrongly because we only use
[sparkPlan#getPredecessors|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java#L563]
to calculate the predecessors of a physical operator. sparkPlan#getPredecessors only returns the predecessors that are
directly connected in the graph, and when multiquery optimization is enabled the edge from scope-7 to scope-22 no longer
exists there: it ran through the removed Store (scope-30) and Load (scope-34).
{code}
before multiquery optimization:
scope-29->scope-32 scope-36
scope-32
scope-36
#--------------------------------------------------
# Spark Plan                                 
#--------------------------------------------------

Spark node scope-29
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1300541843/tmp1035780073:org.apache.pig.impl.io.InterStorage) - scope-30
|
|---SSN_NAME: New For Each(false,false)[bag] - scope-7
    |   |
    |   Cast[long] - scope-2
    |   |
    |   |---Project[bytearray][0] - scope-1
    |   |
    |   Cast[chararray] - scope-5
    |   |
    |   |---Project[bytearray][1] - scope-4
    |
    |---SSN_NAME: Load(hdfs://zly1.sh.intel.com:8020/user/root/name.txt:PigStorage) - scope-0--------

Spark node scope-32
R1: Store(hdfs://zly1.sh.intel.com:8020/user/root/test1_r1.out:org.apache.pig.builtin.PigStorage) - scope-12
|
|---R1: Limit - scope-11
    |
    |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1300541843/tmp1035780073:org.apache.pig.impl.io.InterStorage) - scope-31--------

Spark node scope-36
X: Store(hdfs://zly1.sh.intel.com:8020/user/root/test1_x.out:org.apache.pig.builtin.PigStorage) - scope-28
|
|---X: FRJoin[tuple] - scope-22
    |   |
    |   Project[long][0] - scope-20
    |   |
    |   Project[long][0] - scope-21
    |
    |---SSN: New For Each(false)[bag] - scope-17
    |   |   |
    |   |   Cast[long] - scope-15
    |   |   |
    |   |   |---Project[bytearray][0] - scope-14
    |   |
    |   |---SSN: Load(hdfs://zly1.sh.intel.com:8020/user/root/ssn.txt:PigStorage) - scope-13
    |
    |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1300541843/tmp1035780073:org.apache.pig.impl.io.InterStorage) - scope-34--------

after multiquery optimization:
scope-29
#--------------------------------------------------
# Spark Plan                                 
#--------------------------------------------------

Spark node scope-29
Split - scope-37
|   |
|   R1: Store(hdfs://zly1.sh.intel.com:8020/user/root/test1_r1.out:org.apache.pig.builtin.PigStorage) - scope-12
|   |
|   |---R1: Limit - scope-11
|   |
|   X: Store(hdfs://zly1.sh.intel.com:8020/user/root/test1_x.out:org.apache.pig.builtin.PigStorage) - scope-28
|   |
|   |---X: FRJoin[tuple] - scope-22
|       |   |
|       |   Project[long][0] - scope-20
|       |   |
|       |   Project[long][0] - scope-21
|       |
|       |---SSN: New For Each(false)[bag] - scope-17
|           |   |
|           |   Cast[long] - scope-15
|           |   |
|           |   |---Project[bytearray][0] - scope-14
|           |
|           |---SSN: Load(hdfs://zly1.sh.intel.com:8020/user/root/ssn.txt:PigStorage) - scope-13
|
|---SSN_NAME: New For Each(false,false)[bag] - scope-7
    |   |
    |   Cast[long] - scope-2
    |   |
    |   |---Project[bytearray][0] - scope-1
    |   |
    |   Cast[chararray] - scope-5
    |   |
    |   |---Project[bytearray][1] - scope-4
    |
    |---SSN_NAME: Load(hdfs://zly1.sh.intel.com:8020/user/root/name.txt:PigStorage) - scope-0--------
{code}
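To make the failure mode concrete, here is a minimal Java sketch that models the plan as a toy graph of operator keys rather than Pig's real PhysicalPlan/SparkOperator classes. ToyPlan, PredecessorDemo and its assertPredecessorSizeGreaterThan method are hypothetical; the last one only imitates the error message from the stack trace below. It shows that once the unnecessary Load (scope-34) is removed, a graph-only predecessor lookup returns just scope-17 for FRJoin (scope-22).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy plan: operator keys mapped to their predecessor keys.
// This is NOT Pig's PhysicalPlan/SparkOperator API, only a model of it.
class ToyPlan {
    private final Map<String, List<String>> preds = new LinkedHashMap<>();

    // Adds a directed edge from -> to.
    void connect(String from, String to) {
        preds.computeIfAbsent(to, k -> new ArrayList<>()).add(from);
        preds.computeIfAbsent(from, k -> new ArrayList<>());
    }

    // Drops an operator and every edge that touches it.
    void remove(String op) {
        preds.remove(op);
        for (List<String> p : preds.values()) {
            p.remove(op);
        }
    }

    // Graph-only lookup, analogous to relying solely on getPredecessors.
    List<String> getPredecessors(String op) {
        return new ArrayList<>(preds.getOrDefault(op, Collections.emptyList()));
    }
}

class PredecessorDemo {
    // Hypothetical check that imitates the converter's error message.
    static void assertPredecessorSizeGreaterThan(List<String> preds, String op, int size) {
        if (preds.size() <= size) {
            throw new RuntimeException("Should have greater than " + size
                    + " predecessors for " + op + ". Got : " + preds.size());
        }
    }

    // Spark node scope-36 before multiquery optimization (simplified).
    static ToyPlan beforeMultiQuery() {
        ToyPlan plan = new ToyPlan();
        plan.connect("scope-13", "scope-17"); // SSN Load -> SSN ForEach
        plan.connect("scope-17", "scope-22"); // SSN ForEach -> FRJoin
        plan.connect("scope-34", "scope-22"); // tmp-file Load -> FRJoin
        plan.connect("scope-22", "scope-28"); // FRJoin -> Store X
        return plan;
    }

    // After optimization the tmp-file Load (scope-34) is gone, and nothing in
    // the graph records that its data really came from scope-7.
    static ToyPlan afterMultiQuery() {
        ToyPlan plan = beforeMultiQuery();
        plan.remove("scope-34");
        return plan;
    }

    public static void main(String[] args) {
        List<String> before = beforeMultiQuery().getPredecessors("scope-22");
        System.out.println("before: " + before); // before: [scope-17, scope-34]
        assertPredecessorSizeGreaterThan(before, "POFRJoin", 1); // passes

        List<String> after = afterMultiQuery().getPredecessors("scope-22");
        System.out.println("after: " + after); // after: [scope-17]
        try {
            assertPredecessorSizeGreaterThan(after, "POFRJoin", 1);
        } catch (RuntimeException e) {
            // Should have greater than 1 predecessors for POFRJoin. Got : 1
            System.out.println(e.getMessage());
        }
    }
}
```

The model deliberately keeps only direct edges, which is the property that makes the lookup lose scope-7 once the intermediate Store/Load pair disappears.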

*The approach to fix the problem*
After optimization, the predecessors of scope-22 should be scope-17 and scope-7. When MultiQueryOptimize#visitSparkOp
removes the unnecessary Store (scope-30) and Load (scope-34), we can record the relationship (from scope-7 to scope-22)
in sparkOperator#multiQueryOptimizeConnectionMap (created in PIG-4675_1.patch). Then we can use sparkPlan#getPredecessors
together with that relationship map (sparkOperator#multiQueryOptimizeConnectionMap) to calculate the predecessors of scope-22.
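A minimal sketch of this approach, again on a toy graph of operator keys. OptimizedPlan, PredecessorFix, replaceLoad and getAllPredecessors are hypothetical names, and connectionMap only stands in for sparkOperator#multiQueryOptimizeConnectionMap; its real key/value types in PIG-4675_1.patch may differ. When the unnecessary Load is dropped, the sketch records which operator really fed each successor, and the predecessor lookup merges graph edges with those recorded connections.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy plan; not Pig's real PhysicalPlan/SparkOperator classes.
class OptimizedPlan {
    private final Map<String, List<String>> preds = new LinkedHashMap<>();
    // Stand-in for sparkOperator#multiQueryOptimizeConnectionMap (assumed shape):
    // operator key -> real predecessors whose edges were removed by the optimizer.
    private final Map<String, List<String>> connectionMap = new HashMap<>();

    // Adds a directed edge from -> to.
    void connect(String from, String to) {
        preds.computeIfAbsent(to, k -> new ArrayList<>()).add(from);
        preds.computeIfAbsent(from, k -> new ArrayList<>());
    }

    // Removes an unnecessary Load and records, for each operator it fed,
    // the operator that actually produces the data (realSource).
    void replaceLoad(String load, String realSource) {
        for (Map.Entry<String, List<String>> e : preds.entrySet()) {
            if (e.getValue().remove(load)) {
                connectionMap.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(realSource);
            }
        }
        preds.remove(load);
    }

    // Graph-only predecessors (what the current code relies on).
    List<String> getPredecessors(String op) {
        return new ArrayList<>(preds.getOrDefault(op, Collections.emptyList()));
    }

    // Graph predecessors plus the connections recorded during optimization.
    List<String> getAllPredecessors(String op) {
        List<String> result = getPredecessors(op);
        result.addAll(connectionMap.getOrDefault(op, Collections.emptyList()));
        return result;
    }
}

class PredecessorFix {
    static OptimizedPlan buildAndOptimize() {
        OptimizedPlan plan = new OptimizedPlan();
        plan.connect("scope-13", "scope-17"); // SSN Load -> SSN ForEach
        plan.connect("scope-17", "scope-22"); // SSN ForEach -> FRJoin
        plan.connect("scope-34", "scope-22"); // tmp-file Load -> FRJoin
        plan.connect("scope-22", "scope-28"); // FRJoin -> Store X
        // Multiquery optimization drops Load (scope-34); its data really
        // comes from the SSN_NAME ForEach (scope-7), so record that link.
        plan.replaceLoad("scope-34", "scope-7");
        return plan;
    }

    public static void main(String[] args) {
        OptimizedPlan plan = buildAndOptimize();
        System.out.println(plan.getPredecessors("scope-22"));    // [scope-17]
        System.out.println(plan.getAllPredecessors("scope-22")); // [scope-17, scope-7]
    }
}
```

Running main prints [scope-17] for the graph-only lookup and [scope-17, scope-7] for the combined lookup, i.e. the two inputs a converter such as FRJoinConverter expects.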



> FR+Limit case fails when enable MultiQuery because the predecessor information is wrongly calculated in current code.
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: PIG-4675
>                 URL: https://issues.apache.org/jira/browse/PIG-4675
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Peter Lin
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4675_1.patch, name.txt, ssn.txt, test.pig
>
>
> We have recently been testing the Pig spark branch with mapr3 and Spark 1.5. It turns out that if
we use more than one store command in a Pig script, the second store command throws an exception.
>  SSN = load '/test/ssn.txt' using PigStorage() as (ssn:long);
>  SSN_NAME = load '/test/name.txt' using PigStorage() as (ssn:long, name:chararray);
>  X = JOIN SSN by ssn LEFT OUTER, SSN_NAME by ssn USING 'replicated';
>  R1 = limit SSN_NAME 10;
>  store R1 into '/tmp/test1_r1'; 
>  store X into '/tmp/test1_x';
> Exception Details:
> 15/09/11 13:37:00 INFO storage.MemoryStore: ensureFreeSpace(114448) called with curMem=359237, maxMem=503379394
> 15/09/11 13:37:00 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 111.8 KB, free 479.6 MB)
> 15/09/11 13:37:00 INFO storage.MemoryStore: ensureFreeSpace(32569) called with curMem=473685, maxMem=503379394
> 15/09/11 13:37:00 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 31.8 KB, free 479.6 MB)
> 15/09/11 13:37:00 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.51.2.82:55960 (size: 31.8 KB, free: 479.9 MB)
> 15/09/11 13:37:00 INFO spark.SparkContext: Created broadcast 2 from newAPIHadoopRDD at LoadConverter.java:88
> 15/09/11 13:37:00 WARN util.ClosureCleaner: Expected a closure; got org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter$ToTupleFunction
> 15/09/11 13:37:00 INFO spark.SparkLauncher: Converting operator POForEach (Name: SSN: New For Each(false)[bag] - scope-17 Operator Key: scope-17)
> 15/09/11 13:37:00 INFO spark.SparkLauncher: Converting operator POFRJoin (Name: X: FRJoin[tuple] - scope-22 Operator Key: scope-22)
> 15/09/11 13:37:00 ERROR spark.SparkLauncher: throw exception in sparkOperToRDD:
> java.lang.RuntimeException: Should have greater than1 predecessors for class org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin. Got : 1
>         at org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil.assertPredecessorSizeGreaterThan(SparkUtil.java:93)
>         at org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter.convert(FRJoinConverter.java:55)
>         at org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter.convert(FRJoinConverter.java:46)
>         at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:633)
>         at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:600)
>         at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:621)
>         at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.sparkOperToRDD(SparkLauncher.java:552)
>         at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.sparkPlanToRDD(SparkLauncher.java:501)
>         at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.launchPig(SparkLauncher.java:204)
>         at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.launchPig(HExecutionEngine.java:301)
>         at org.apache.pig.PigServer.launchPlan(PigServer.java:1390)
>         at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1375)
>         at org.apache.pig.PigServer.execute(PigServer.java:1364)
>         at org.apache.pig.PigServer.executeBatch(PigServer.java:415)
>         at org.apache.pig.PigServer.executeBatch(PigServer.java:398)
>         at org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:171)
>         at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:234)
>         at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
>         at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
>         at org.apache.pig.Main.run(Main.java:624)
>         at org.apache.pig.Main.main(Main.java:170)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
