[ https://issues.apache.org/jira/browse/PIG-4675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15040806#comment-15040806 ]
liyunzhang_intel commented on PIG-4675:
---------------------------------------
[~mohitsabharwal]:
{quote}
this looks like a pretty critical issue. It is potentially affecting many other query plans, not just FRJoin with Limit, right ?
{quote}
Yes, any query plan containing a physical operator with more than one predecessor may fail when multiquery optimization is enabled. The physical operators that have more than one predecessor are: FRJoin, MergeJoin, SkewedJoin and Union.
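For context, the failure surfaces in the converters for these operators: for example FRJoinConverter#convert calls SparkUtil#assertPredecessorSizeGreaterThan (see the stack trace below). A simplified sketch of that check, reconstructed from the exception message; the exact signature in SparkUtil.java may differ:
{code}
// Simplified sketch of SparkUtil#assertPredecessorSizeGreaterThan,
// reconstructed from the exception message quoted below; the actual
// parameter types in SparkUtil.java may differ.
public static void assertPredecessorSizeGreaterThan(List<RDD<Tuple>> predecessors,
        PhysicalOperator physicalOperator, int size) {
    if (predecessors.size() <= size) {
        // The missing space after "than" matches the message in the log below.
        throw new RuntimeException("Should have greater than" + size
                + " predecessors for " + physicalOperator.getClass()
                + ". Got : " + predecessors.size());
    }
}
{code}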
*Why was the predecessor information wrongly calculated?*
Let's use the query script from the issue as an example:
{code}
SSN = load '/test/ssn.txt' using PigStorage() as (ssn:long);
SSN_NAME = load '/test/name.txt' using PigStorage() as (ssn:long, name:chararray);
X = JOIN SSN by ssn LEFT OUTER, SSN_NAME by ssn USING 'replicated';
R1 = limit SSN_NAME 10;
store R1 into '/tmp/test1_r1';
store X into '/tmp/test1_x';
{code}
The spark plan: there are 3 spark plans (scope-29, scope-32, scope-36) before multiquery optimization. After multiquery optimization, there is only 1 spark plan (scope-29): the unnecessary Store (scope-30), Load (scope-31) and Load (scope-34) are removed. Before multiquery optimization, FRJoin (scope-22) has two predecessors (scope-17 and scope-34); after multiquery optimization, FRJoin (scope-22) has *only 1* predecessor (scope-17), although it actually still has two: scope-17 and scope-7. The predecessor info is wrongly calculated because we only use [sparkPlan#getPredecessors|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java#L563]
to calculate the predecessors of a physical operator, and sparkPlan#getPredecessors only returns the predecessors present in the current plan graph, which no longer contains the removed connections once multiquery optimization is enabled.
{code}
before multiquery optimization:
scope-29->scope-32 scope-36
scope-32
scope-36
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------
Spark node scope-29
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1300541843/tmp1035780073:org.apache.pig.impl.io.InterStorage) - scope-30
|
|---SSN_NAME: New For Each(false,false)[bag] - scope-7
| |
| Cast[long] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Cast[chararray] - scope-5
| |
| |---Project[bytearray][1] - scope-4
|
|---SSN_NAME: Load(hdfs://zly1.sh.intel.com:8020/user/root/name.txt:PigStorage) - scope-0--------
Spark node scope-32
R1: Store(hdfs://zly1.sh.intel.com:8020/user/root/test1_r1.out:org.apache.pig.builtin.PigStorage) - scope-12
|
|---R1: Limit - scope-11
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1300541843/tmp1035780073:org.apache.pig.impl.io.InterStorage) - scope-31--------
Spark node scope-36
X: Store(hdfs://zly1.sh.intel.com:8020/user/root/test1_x.out:org.apache.pig.builtin.PigStorage) - scope-28
|
|---X: FRJoin[tuple] - scope-22
| |
| Project[long][0] - scope-20
| |
| Project[long][0] - scope-21
|
|---SSN: New For Each(false)[bag] - scope-17
| | |
| | Cast[long] - scope-15
| | |
| | |---Project[bytearray][0] - scope-14
| |
| |---SSN: Load(hdfs://zly1.sh.intel.com:8020/user/root/ssn.txt:PigStorage) - scope-13
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1300541843/tmp1035780073:org.apache.pig.impl.io.InterStorage) - scope-34--------
after multiquery optimization:
scope-29
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------
Spark node scope-29
Split - scope-37
| |
| R1: Store(hdfs://zly1.sh.intel.com:8020/user/root/test1_r1.out:org.apache.pig.builtin.PigStorage) - scope-12
| |
| |---R1: Limit - scope-11
| |
| X: Store(hdfs://zly1.sh.intel.com:8020/user/root/test1_x.out:org.apache.pig.builtin.PigStorage) - scope-28
| |
| |---X: FRJoin[tuple] - scope-22
| | |
| | Project[long][0] - scope-20
| | |
| | Project[long][0] - scope-21
| |
| |---SSN: New For Each(false)[bag] - scope-17
| | |
| | Cast[long] - scope-15
| | |
| | |---Project[bytearray][0] - scope-14
| |
| |---SSN: Load(hdfs://zly1.sh.intel.com:8020/user/root/ssn.txt:PigStorage) - scope-13
|
|---SSN_NAME: New For Each(false,false)[bag] - scope-7
| |
| Cast[long] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Cast[chararray] - scope-5
| |
| |---Project[bytearray][1] - scope-4
|
|---SSN_NAME: Load(hdfs://zly1.sh.intel.com:8020/user/root/name.txt:PigStorage) - scope-0--------
{code}
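In code terms, the current predecessor lookup is just a single graph query against the (already optimized) plan, roughly like this minimal sketch of the logic around the SparkLauncher line linked above:
{code}
// Minimal sketch: predecessors are taken only from the current plan graph.
// After multiquery optimization, scope-7 reaches the FRJoin through the new
// Split (scope-37) instead of through the removed Store(scope-30)/Load(scope-34)
// pair, so for scope-22 this returns only [scope-17].
List<PhysicalOperator> predecessors = sparkPlan.getPredecessors(physicalOperator);
{code}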
*The approach to fix the problem*
After optimization, the predecessors of scope-22 should be scope-17 and scope-7. We can store the relationship (from scope-7 to scope-22) in sparkOperator#multiQueryOptimizeConnectionMap (created in PIG-4675_1.patch) inside MultiQueryOptimizer#visitSparkOp, at the point where the unnecessary Store (scope-30) and Load (scope-34) are removed. Then we can use both sparkPlan#getPredecessors and the relationship map (sparkOperator#multiQueryOptimizeConnectionMap) to calculate the predecessors of scope-22. A minimal sketch of that calculation follows (the map name comes from the description above; the key direction and exact types in PIG-4675_1.patch are assumptions):
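{code}
// Sketch of the fixed predecessor calculation. The map is assumed to be keyed
// by the successor's OperatorKey (e.g. scope-22) and to hold the OperatorKeys
// of predecessors whose Store/Load bridge was removed (e.g. scope-7).
List<PhysicalOperator> predecessors = new ArrayList<PhysicalOperator>();
List<PhysicalOperator> graphPredecessors = sparkPlan.getPredecessors(physicalOperator);
if (graphPredecessors != null) {
    predecessors.addAll(graphPredecessors);
}
List<OperatorKey> removedConnections =
        sparkOperator.multiQueryOptimizeConnectionMap.get(physicalOperator.getOperatorKey());
if (removedConnections != null) {
    for (OperatorKey key : removedConnections) {
        // Re-add the predecessor that the optimizer disconnected, e.g. scope-7.
        predecessors.add(sparkPlan.getOperator(key));
    }
}
{code}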
> FR+Limit case fails when enable MultiQuery because the predecessor information is wrongly calculated in current code.
> ---------------------------------------------------------------------------------------------------------------------
>
> Key: PIG-4675
> URL: https://issues.apache.org/jira/browse/PIG-4675
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Peter Lin
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4675_1.patch, name.txt, ssn.txt, test.pig
>
>
> We have been testing the spark branch of Pig recently with MapR 3 and Spark 1.5. It turns out that if we use more than one store command in the Pig script, we get an exception from the second store command.
> SSN = load '/test/ssn.txt' using PigStorage() as (ssn:long);
> SSN_NAME = load '/test/name.txt' using PigStorage() as (ssn:long, name:chararray);
> X = JOIN SSN by ssn LEFT OUTER, SSN_NAME by ssn USING 'replicated';
> R1 = limit SSN_NAME 10;
> store R1 into '/tmp/test1_r1';
> store X into '/tmp/test1_x';
> Exception Details:
> 15/09/11 13:37:00 INFO storage.MemoryStore: ensureFreeSpace(114448) called with curMem=359237, maxMem=503379394
> 15/09/11 13:37:00 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 111.8 KB, free 479.6 MB)
> 15/09/11 13:37:00 INFO storage.MemoryStore: ensureFreeSpace(32569) called with curMem=473685, maxMem=503379394
> 15/09/11 13:37:00 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 31.8 KB, free 479.6 MB)
> 15/09/11 13:37:00 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.51.2.82:55960 (size: 31.8 KB, free: 479.9 MB)
> 15/09/11 13:37:00 INFO spark.SparkContext: Created broadcast 2 from newAPIHadoopRDD at LoadConverter.java:88
> 15/09/11 13:37:00 WARN util.ClosureCleaner: Expected a closure; got org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter$ToTupleFunction
> 15/09/11 13:37:00 INFO spark.SparkLauncher: Converting operator POForEach (Name: SSN: New For Each(false)[bag] - scope-17 Operator Key: scope-17)
> 15/09/11 13:37:00 INFO spark.SparkLauncher: Converting operator POFRJoin (Name: X: FRJoin[tuple] - scope-22 Operator Key: scope-22)
> 15/09/11 13:37:00 ERROR spark.SparkLauncher: throw exception in sparkOperToRDD:
> java.lang.RuntimeException: Should have greater than1 predecessors for class org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin. Got : 1
> at org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil.assertPredecessorSizeGreaterThan(SparkUtil.java:93)
> at org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter.convert(FRJoinConverter.java:55)
> at org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter.convert(FRJoinConverter.java:46)
> at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:633)
> at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:600)
> at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:621)
> at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.sparkOperToRDD(SparkLauncher.java:552)
> at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.sparkPlanToRDD(SparkLauncher.java:501)
> at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.launchPig(SparkLauncher.java:204)
> at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.launchPig(HExecutionEngine.java:301)
> at org.apache.pig.PigServer.launchPlan(PigServer.java:1390)
> at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1375)
> at org.apache.pig.PigServer.execute(PigServer.java:1364)
> at org.apache.pig.PigServer.executeBatch(PigServer.java:415)
> at org.apache.pig.PigServer.executeBatch(PigServer.java:398)
> at org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:171)
> at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:234)
> at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
> at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
> at org.apache.pig.Main.run(Main.java:624)
> at org.apache.pig.Main.main(Main.java:170)