Date: Fri, 3 Jul 2015 12:26:04 +0000 (UTC)
From: "liyunzhang_intel (JIRA)"
To: pig-dev@hadoop.apache.org
Subject: [jira] [Commented] (PIG-4594) Enable "TestMultiQuery" in spark mode

[ https://issues.apache.org/jira/browse/PIG-4594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14613163#comment-14613163 ]

liyunzhang_intel commented on PIG-4594:
---------------------------------------

[~mohitsabharwal]: Let's walk through an example that explains why we need to add PhysicalPlan#forceConnect and OperatorPlan#forceConnect.
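The core difference can be sketched in isolation before looking at the real plans: connect() refuses to give a second output to an operator whose supportsMultipleOutputs() is false (as POForEach's is), while forceConnect() adds the edge unconditionally. This is a hypothetical simplification, not Pig's actual classes; only the method names mirror the real API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative-only sketch of Pig's OperatorPlan connect vs forceConnect.
public class ForceConnectSketch {
    static class Op {
        final String name;
        final boolean supportsMultipleOutputs;
        Op(String name, boolean multi) {
            this.name = name;
            this.supportsMultipleOutputs = multi;
        }
    }

    final Map<Op, List<Op>> edges = new HashMap<>();

    List<Op> successors(Op op) {
        return edges.computeIfAbsent(op, k -> new ArrayList<>());
    }

    // Validating connect: rejects a second output on an operator whose
    // supportsMultipleOutputs flag is false (like POForEach).
    void connect(Op from, Op to) {
        if (!successors(from).isEmpty() && !from.supportsMultipleOutputs) {
            throw new IllegalStateException("Attempt to give operator " + from.name
                    + " multiple outputs. This operator does not support multiple outputs.");
        }
        successors(from).add(to);
    }

    // forceConnect: adds the edge unconditionally.
    void forceConnect(Op from, Op to) {
        successors(from).add(to);
    }

    public static void main(String[] args) {
        ForceConnectSketch plan = new ForceConnectSketch();
        Op forEach = new Op("scope-13 (POForEach)", false);
        Op filterB = new Op("scope-17 (Filter b)", true);
        Op filterC = new Op("scope-23 (Filter c)", true);

        plan.connect(forEach, filterB);       // first output: accepted
        try {
            plan.connect(forEach, filterC);   // second output: rejected
        } catch (IllegalStateException expected) {
            System.out.println(expected.getMessage());
        }
        plan.forceConnect(forEach, filterC);  // forceConnect succeeds
        System.out.println(plan.successors(forEach).size()); // 2
    }
}
```

With this picture in mind, the real example follows.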
cat bin/testMultiQueryJiraPig983_2.pig
{code}
a = load './passwd' using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);
b = filter a by uid < 5;
c = filter a by uid >= 5;
d = join b by uname, c by uname;
{code}
{code}
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------
Spark node scope-67
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp1820070054:org.apache.pig.impl.io.InterStorage) - scope-68
|
|---a: New For Each(false,false,false,false)[bag] - scope-13
|   |
|   Cast[chararray] - scope-2
|   |
|   |---Project[bytearray][0] - scope-1
|   |
|   Cast[chararray] - scope-5
|   |
|   |---Project[bytearray][1] - scope-4
|   |
|   Cast[int] - scope-8
|   |
|   |---Project[bytearray][2] - scope-7
|   |
|   Cast[int] - scope-11
|   |
|   |---Project[bytearray][3] - scope-10
|
|---a: Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) - scope-0--------

Spark node scope-73
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp-2075734880:org.apache.pig.impl.io.InterStorage) - scope-74
|
|---d: New For Each(true,true)[tuple] - scope-37
|   |
|   Project[bag][1] - scope-35
|   |
|   Project[bag][2] - scope-36
|
|---d: Package(Packager)[tuple]{chararray} - scope-30
|
|---d: Global Rearrange[tuple] - scope-29
|
|---d: Local Rearrange[tuple]{chararray}(false) - scope-31
|   |   |
|   |   Project[chararray][0] - scope-32
|   |
|   |---b: Filter[bag] - scope-17
|   |   |
|   |   Less Than[boolean] - scope-20
|   |   |
|   |   |---Project[int][2] - scope-18
|   |   |
|   |   |---Constant(5) - scope-19
|   |
|   |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp1820070054:org.apache.pig.impl.io.InterStorage) - scope-69
|
|---d: Local Rearrange[tuple]{chararray}(false) - scope-33
|   |
|   Project[chararray][0] - scope-34
|
|---c: Filter[bag] - scope-23
|   |
|   Greater Than or Equal[boolean] - scope-26
|   |
|   |---Project[int][2] - scope-24
|   |
|   |---Constant(5) - scope-25
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp1820070054:org.apache.pig.impl.io.InterStorage) - scope-71--------
{code}

If multiquery optimization is enabled, SparkOperator(scope-67) and SparkOperator(scope-73) need to be merged: remove scope-68, scope-69 and scope-71, then connect scope-13 to scope-17 and scope-13 to scope-23. This is why org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan#forceConnect is used instead of org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan#connect in org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark#visitSparkOp.

After scope-13 and scope-17 are connected, connecting scope-13 and scope-23 throws an exception, because scope-13 is a POForEach, whose supportsMultipleOutputs() is false. The exception info:
{code}
org.apache.pig.impl.plan.VisitorException: ERROR 0: org.apache.pig.impl.plan.PlanException: ERROR 0: Attempt to give operator of type org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach multiple outputs. This operator does not support multiple outputs.
	at org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark.visitSparkOp(MultiQueryOptimizerSpark.java:131)
{code}

org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark#visitSparkOp
{code}
    @Override
    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
        ......
        if (splittees.size() == 1) {
            // We don't need a POSplit here, we can merge the splittee into the splitter
            SparkOperator singleSplitee = splittees.get(0);
            POStore poStore = null;
            PhysicalOperator firstNodeLeaf = sparkOp.physicalPlan.getLeaves().get(0);
            if (firstNodeLeaf instanceof POStore) {
                poStore = (POStore) firstNodeLeaf;
            }
            PhysicalOperator firstNodeLeafPred = sparkOp.physicalPlan.getPredecessors(firstNodeLeaf).get(0);
            sparkOp.physicalPlan.remove(poStore); // remove unnecessary store
            List<PhysicalOperator> firstNodeRoots = singleSplitee.physicalPlan.getRoots();
            sparkOp.physicalPlan.merge(singleSplitee.physicalPlan);
            for (int j = 0; j < firstNodeRoots.size(); j++) {
                PhysicalOperator firstNodeRoot = firstNodeRoots.get(j);
                POLoad poLoad = null;
                if (firstNodeRoot instanceof POLoad && poStore != null) {
                    poLoad = (POLoad) firstNodeRoot;
                    if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
                        PhysicalOperator firstNodeRootSucc = sparkOp.physicalPlan.getSuccessors(firstNodeRoot).get(0);
                        sparkOp.physicalPlan.remove(poLoad); // remove unnecessary load
                        sparkOp.physicalPlan.forceConnect(firstNodeLeafPred, firstNodeRootSucc);
                        // If we used sparkOp.physicalPlan.connect(firstNodeLeafPred, firstNodeRootSucc) here,
                        // it would throw: "POForEach multiple outputs. This operator does not
                        // support multiple outputs."
                    }
                }
            }
            addSubPlanPropertiesToParent(sparkOp, singleSplitee);
            removeSplittee(getPlan(), sparkOp, singleSplitee);
        } else {
            ......
        }
    }
{code}

> Enable "TestMultiQuery" in spark mode
> -------------------------------------
>
>                 Key: PIG-4594
>                 URL: https://issues.apache.org/jira/browse/PIG-4594
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4594.patch, PIG-4594_1.patch
>
>
> In https://builds.apache.org/job/Pig-spark/211/#showFailuresLink, the following unit tests fail:
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1068
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1157
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1252
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1438

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
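As a footnote to the visitSparkOp excerpt quoted in the comment above: the merge step (drop the splitter's temporary POStore, drop the splittee's matching POLoad, then splice the store's predecessor onto the load's successor) can be sketched on plain lists of operator names. This is an illustrative simplification with hypothetical names, not Pig's DAG-based PhysicalPlan.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative-only sketch: each plan is a linear list of operator names,
// the splitter ends in a store of a temp file, and the splittee begins
// with a load of the same temp file. Removing the store/load pair and
// concatenating the remainders models what remove() + forceConnect()
// achieve on the real plan.
public class MergeSketch {
    static List<String> mergeSplittee(List<String> splitter, List<String> splittee, String tmpFile) {
        List<String> merged = new ArrayList<>(splitter);
        if (!merged.isEmpty() && merged.get(merged.size() - 1).equals("Store(" + tmpFile + ")")) {
            merged.remove(merged.size() - 1);   // remove unnecessary store
        }
        List<String> rest = new ArrayList<>(splittee);
        if (!rest.isEmpty() && rest.get(0).equals("Load(" + tmpFile + ")")) {
            rest.remove(0);                     // remove unnecessary load
        }
        merged.addAll(rest);                    // splice predecessor onto successor
        return merged;
    }

    public static void main(String[] args) {
        List<String> splitter = List.of("Load(passwd)", "ForEach", "Store(tmp1)");
        List<String> splittee = List.of("Load(tmp1)", "Filter", "LocalRearrange");
        System.out.println(mergeSplittee(splitter, splittee, "tmp1"));
        // [Load(passwd), ForEach, Filter, LocalRearrange]
    }
}
```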