pig-dev mailing list archives

From "liyunzhang_intel (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PIG-4594) Enable "TestMultiQuery" in spark mode
Date Fri, 03 Jul 2015 12:26:04 GMT

    [ https://issues.apache.org/jira/browse/PIG-4594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14613163#comment-14613163
] 

liyunzhang_intel commented on PIG-4594:
---------------------------------------

[~mohitsabharwal]:
Let's use an example to explain why we need to add PhysicalPlan#forceConnect and OperatorPlan#forceConnect.
cat bin/testMultiQueryJiraPig983_2.pig
{code}
a = load './passwd' using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,
gid:int);
b = filter a by uid < 5;
c = filter a by uid >= 5;
d = join b by uname, c by uname;
{code}

{code}
#--------------------------------------------------
# Spark Plan                                  
#--------------------------------------------------

Spark node scope-67
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp1820070054:org.apache.pig.impl.io.InterStorage)
- scope-68
|
|---a: New For Each(false,false,false,false)[bag] - scope-13
    |   |
    |   Cast[chararray] - scope-2
    |   |
    |   |---Project[bytearray][0] - scope-1
    |   |
    |   Cast[chararray] - scope-5
    |   |
    |   |---Project[bytearray][1] - scope-4
    |   |
    |   Cast[int] - scope-8
    |   |
    |   |---Project[bytearray][2] - scope-7
    |   |
    |   Cast[int] - scope-11
    |   |
    |   |---Project[bytearray][3] - scope-10
    |
    |---a: Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) - scope-0--------

Spark node scope-73
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp-2075734880:org.apache.pig.impl.io.InterStorage)
- scope-74
|
|---d: New For Each(true,true)[tuple] - scope-37
    |   |
    |   Project[bag][1] - scope-35
    |   |
    |   Project[bag][2] - scope-36
    |
    |---d: Package(Packager)[tuple]{chararray} - scope-30
        |
        |---d: Global Rearrange[tuple] - scope-29
            |
            |---d: Local Rearrange[tuple]{chararray}(false) - scope-31
            |   |   |
            |   |   Project[chararray][0] - scope-32
            |   |
            |   |---b: Filter[bag] - scope-17
            |       |   |
            |       |   Less Than[boolean] - scope-20
            |       |   |
            |       |   |---Project[int][2] - scope-18
            |       |   |
            |       |   |---Constant(5) - scope-19
            |       |
            |       |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp1820070054:org.apache.pig.impl.io.InterStorage)
- scope-69
            |
            |---d: Local Rearrange[tuple]{chararray}(false) - scope-33
                |   |
                |   Project[chararray][0] - scope-34
                |
                |---c: Filter[bag] - scope-23
                    |   |
                    |   Greater Than or Equal[boolean] - scope-26
                    |   |
                    |   |---Project[int][2] - scope-24
                    |   |
                    |   |---Constant(5) - scope-25
                    |
                    |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp1820070054:org.apache.pig.impl.io.InterStorage)
- scope-71--------
{code}

If multiquery optimization is enabled, SparkOperator(scope-67) and SparkOperator(scope-73)
need to be merged: remove scope-68, scope-69 and scope-71, then connect scope-13 to scope-17
and scope-13 to scope-23. The problem appears when org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan#connect
is used instead of org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan#forceConnect
in org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark#visitSparkOp.
 


After scope-13 and scope-17 are connected, connecting scope-13 and scope-23 throws an
exception, because POForEach#supportsMultipleOutputs() returns false for scope-13. The exception
info:
{code}
org.apache.pig.impl.plan.VisitorException: ERROR 0: org.apache.pig.impl.plan.PlanException:
ERROR 0: Attempt to give operator of type org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach
multiple outputs.  This operator does not support multiple outputs.
        at org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark.visitSparkOp(MultiQueryOptimizerSpark.java:131)
{code}
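The connect-vs-forceConnect behavior can be illustrated with a small self-contained sketch. Note that `Op`, `SimplePlan`, and `ForceConnectDemo` below are simplified stand-ins of my own, not Pig's actual classes: `connect` refuses to give a second outgoing edge to an operator whose `supportsMultipleOutputs()` is false, while a hypothetical `forceConnect` skips that check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for Pig's OperatorPlan/PhysicalOperator; the real
// classes live in org.apache.pig.impl.plan and are considerably richer.
class Op {
    final String name;
    final boolean supportsMultipleOutputs;
    Op(String name, boolean supportsMultipleOutputs) {
        this.name = name;
        this.supportsMultipleOutputs = supportsMultipleOutputs;
    }
}

class SimplePlan {
    private final Map<Op, List<Op>> successors = new HashMap<>();

    // Mirrors the multiple-outputs check in connect: reject a second
    // outgoing edge from an operator that does not support multiple outputs.
    void connect(Op from, Op to) {
        List<Op> succs = successors.computeIfAbsent(from, k -> new ArrayList<>());
        if (!succs.isEmpty() && !from.supportsMultipleOutputs) {
            throw new IllegalStateException("Attempt to give operator " + from.name
                    + " multiple outputs. This operator does not support multiple outputs.");
        }
        succs.add(to);
    }

    // Hypothetical forceConnect: add the edge without the check.
    void forceConnect(Op from, Op to) {
        successors.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
    }

    int outDegree(Op op) {
        return successors.getOrDefault(op, new ArrayList<>()).size();
    }
}

public class ForceConnectDemo {
    public static void main(String[] args) {
        Op forEach = new Op("scope-13 (POForEach)", false); // single output only
        Op filterB = new Op("scope-17 (b: Filter)", true);
        Op filterC = new Op("scope-23 (c: Filter)", true);

        SimplePlan plan = new SimplePlan();
        plan.connect(forEach, filterB);          // first edge: fine
        boolean threw = false;
        try {
            plan.connect(forEach, filterC);      // second edge: rejected
        } catch (IllegalStateException e) {
            threw = true;
        }
        plan.forceConnect(forEach, filterC);     // forceConnect bypasses the check
        System.out.println(threw + " " + plan.outDegree(forEach));  // prints "true 2"
    }
}
```

This mirrors the situation above: scope-13 already has one successor (scope-17), so a plain connect of scope-23 is rejected, while forceConnect lets the merged plan keep both edges.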

org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark#visitSparkOp
{code}
@Override
public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
    …..
    if (splittees.size() == 1) {
        // We don't need a POSplit here, we can merge the splittee into the splitter
        SparkOperator singleSplitee = splittees.get(0);
        POStore poStore = null;
        PhysicalOperator firstNodeLeaf = sparkOp.physicalPlan.getLeaves().get(0);
        if (firstNodeLeaf instanceof POStore) {
            poStore = (POStore) firstNodeLeaf;
        }
        PhysicalOperator firstNodeLeafPred = sparkOp.physicalPlan.getPredecessors(firstNodeLeaf).get(0);
        sparkOp.physicalPlan.remove(poStore);  // remove unnecessary store
        List<PhysicalOperator> firstNodeRoots = singleSplitee.physicalPlan.getRoots();
        sparkOp.physicalPlan.merge(singleSplitee.physicalPlan);
        for (int j = 0; j < firstNodeRoots.size(); j++) {
            PhysicalOperator firstNodeRoot = firstNodeRoots.get(j);
            POLoad poLoad = null;
            if (firstNodeRoot instanceof POLoad && poStore != null) {
                poLoad = (POLoad) firstNodeRoot;
                if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
                    PhysicalOperator firstNodeRootSucc = sparkOp.physicalPlan.getSuccessors(firstNodeRoot).get(0);
                    sparkOp.physicalPlan.remove(poLoad); // remove unnecessary load
                    // Using sparkOp.physicalPlan.connect(firstNodeLeafPred, firstNodeRootSucc) here throws:
                    // POForEach multiple outputs. This operator does not support multiple outputs.
                    sparkOp.physicalPlan.forceConnect(firstNodeLeafPred, firstNodeRootSucc);
                }
            }
        }
        addSubPlanPropertiesToParent(sparkOp, singleSplitee);
        removeSplittee(getPlan(), sparkOp, singleSplitee);
    } else {
        …..
    }
}
{code}
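The store/load elimination inside the loop above can be sketched at a high level. This is a hypothetical model of my own (each plan is reduced to a simple chain of named nodes; the real plans are DAGs and the real merge is `PhysicalPlan#merge`): the splitter's trailing store and the splittee's leading load are cut out when they refer to the same temp file, and the remaining operators are spliced together.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simplification, not Pig's PhysicalPlan: each plan is a chain
// of (name, tmpFile) nodes, where tmpFile is the temp file written by a
// Store or read by a Load, and null for other operators.
class Node {
    final String name;
    final String tmpFile;
    Node(String name, String tmpFile) { this.name = name; this.tmpFile = tmpFile; }
}

public class MergeSketch {
    // Drop the splitter's trailing Store and the splittee's leading Load when
    // they refer to the same temp file, then splice the chains together.
    static List<Node> merge(List<Node> splitter, List<Node> splittee) {
        Node store = splitter.get(splitter.size() - 1);
        Node load = splittee.get(0);
        List<Node> merged = new ArrayList<>();
        if (store.tmpFile != null && store.tmpFile.equals(load.tmpFile)) {
            merged.addAll(splitter.subList(0, splitter.size() - 1)); // drop Store
            merged.addAll(splittee.subList(1, splittee.size()));     // drop Load
        } else {
            merged.addAll(splitter);
            merged.addAll(splittee);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Node> splitter = Arrays.asList(
                new Node("a: Load passwd", null),
                new Node("a: ForEach (scope-13)", null),
                new Node("Store (scope-68)", "tmp1820070054"));
        List<Node> splittee = Arrays.asList(
                new Node("Load (scope-69)", "tmp1820070054"),
                new Node("b: Filter (scope-17)", null));
        for (Node n : merge(splitter, splittee)) {
            System.out.println(n.name);
        }
    }
}
```

Running the sketch leaves only the Load-ForEach-Filter chain, which is the point of the optimization: the intermediate Store/Load round trip through HDFS disappears.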




> Enable "TestMultiQuery" in spark mode
> -------------------------------------
>
>                 Key: PIG-4594
>                 URL: https://issues.apache.org/jira/browse/PIG-4594
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4594.patch, PIG-4594_1.patch
>
>
> in https://builds.apache.org/job/Pig-spark/211/#showFailuresLink, it shows that
> the following unit tests fail:
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1068
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1157
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1252
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1438



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
