From: szita@apache.org
To: commits@pig.apache.org
Subject: svn commit: r1796639 [12/12] - in /pig/trunk: ./ bin/ ivy/ src/META-INF/services/ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executio...
Date: Mon, 29 May 2017 15:00:41 -0000
Message-Id: <20170529150043.5B93F3A2563@svn01-us-west.apache.org>

Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Mon May 29 15:00:39 2017 @@ -41,11 +41,13 @@ import org.apache.commons.lang3.ArrayUti import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.Counters; +import org.apache.pig.ExecType; import org.apache.pig.PigConfiguration; import org.apache.pig.PigRunner; import org.apache.pig.PigRunner.ReturnCode; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.plan.OperatorPlan; @@ -58,6 +60,7 @@ import org.apache.pig.tools.pigstats.Pig import org.apache.pig.tools.pigstats.PigStatsUtil; import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; +import org.apache.pig.tools.pigstats.spark.SparkJobStats; import org.junit.AfterClass; import org.junit.Assume; import org.junit.Before; @@ -207,12 +210,13 @@ public class TestPigRunner { PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful()); - if (execType.toString().startsWith("tez")) { - assertEquals(1, stats.getNumberJobs()); - assertEquals(stats.getJobGraph().size(), 1); - } else { + if (execType.equals("mapreduce")) { assertEquals(2, stats.getNumberJobs()); assertEquals(stats.getJobGraph().size(), 2); + } else { + // Tez and Spark + assertEquals(1, stats.getNumberJobs()); + assertEquals(stats.getJobGraph().size(), 1); } Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties()); @@ -274,6 +278,10 @@ public class TestPigRunner { assertEquals(stats.getJobGraph().size(), 1); // 5 vertices assertEquals(stats.getJobGraph().getSources().get(0).getPlan().size(), 5); + } else if (execType.equals("spark")) { + // In spark mode, the number of spark jobs is determined by the number of POStores. + // 1 POStore generates 1 spark job. + assertEquals(stats.getJobGraph().size(), 1); } else { assertEquals(stats.getJobGraph().size(), 4); } @@ -294,7 +302,12 @@ public class TestPigRunner { // Need to investigate // assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors( // js).get(0)).getAlias()); + } else if (execType.equals("spark")) { + assertEquals("A,B", ((JobStats) stats.getJobGraph().getSources().get( + 0)).getAlias()); + // TODO: alias is not set for the sample-aggregation/partition/sort job. } else { + assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get( 0)).getAlias()); assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors( @@ -323,7 +336,14 @@ public class TestPigRunner { String[] args = { "-x", execType, PIG_FILE }; PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); - assertTrue(stats.getJobGraph().size() == 1); + if (execType.equals("spark")) { + // In spark mode, the number of spark jobs is determined by the number of POStores. + // 2 POStores generate 2 spark jobs. + assertTrue(stats.getJobGraph().size() == 2); + } else { + assertTrue(stats.getJobGraph().size() == 1); + } + // Each output file should include the following: // output: // 1\t2\t3\n @@ -372,7 +392,13 @@ public class TestPigRunner { String[] args = { "-x", execType, PIG_FILE }; PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); - assertEquals(stats.getJobGraph().size(), 1); + if (execType.equals("spark")) { + // In spark mode, the number of spark jobs is determined by the number of POStores. + // 2 POStores generate 2 spark jobs. + assertEquals(stats.getJobGraph().size(), 2); + } else { + assertEquals(stats.getJobGraph().size(), 1); + } // Each output file should include the following: // output: @@ -430,11 +456,15 @@ public class TestPigRunner { w.close(); try { - String[] args = { "-x", execType, PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); + String[] args = new String[]{"-x", execType, PIG_FILE}; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); if (Util.isMapredExecType(cluster.getExecType())) { assertEquals(3, stats.getJobGraph().size()); + } else if (Util.isSparkExecType(cluster.getExecType())) { + // One for each store and 3 for the join.
+ assertEquals(4, stats.getJobGraph().size()); } else { assertEquals(1, stats.getJobGraph().size()); } @@ -475,10 +505,14 @@ public class TestPigRunner { }); assertEquals(5, inputStats.get(0).getNumberRecords()); assertEquals(3, inputStats.get(1).getNumberRecords()); - // For mapreduce, since hdfs bytes read includes replicated tables bytes read is wrong // Since Tez has only one load per job its values are correct + // the result of inputStats in spark mode is also correct if (!Util.isMapredExecType(cluster.getExecType())) { assertEquals(30, inputStats.get(0).getBytes()); + } + + // TODO PIG-5240: Fix TestPigRunner#simpleMultiQueryTest3 in spark mode for wrong inputStats + if (!Util.isMapredExecType(cluster.getExecType()) && !Util.isSparkExecType(cluster.getExecType())) { assertEquals(18, inputStats.get(1).getBytes()); } } finally { @@ -504,17 +538,17 @@ public class TestPigRunner { PigStats stats = PigRunner.run(args, null); Iterator<JobStats> iter = stats.getJobGraph().iterator(); while (iter.hasNext()) { - JobStats js=iter.next(); - if (execType.equals("tez")) { - assertEquals(js.getState().name(), "FAILED"); - } else { - if(js.getState().name().equals("FAILED")) { - List<Operator> ops=stats.getJobGraph().getSuccessors(js); - for(Operator op : ops ) { - assertEquals(((JobStats)op).getState().toString(), "UNKNOWN"); - } - } - } + JobStats js = iter.next(); + if (execType.equals("mapreduce")) { + if (js.getState().name().equals("FAILED")) { + List<Operator> ops = stats.getJobGraph().getSuccessors(js); + for (Operator op : ops) { + assertEquals(((JobStats) op).getState().toString(), "UNKNOWN"); + } + } + } else { + assertEquals(js.getState().name(), "FAILED"); + } } } finally { new File(PIG_FILE).delete(); @@ -723,8 +757,14 @@ public class TestPigRunner { PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); - - assertEquals(1, stats.getNumberJobs()); + // In spark mode, one POStore will generate one spark action (spark job). + // In this case, the spark plan has 1 SparkOperator (after multiquery optimization) but has 2 POStores, + // which generate 2 spark actions (spark jobs).
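// The Spark-mode branch below relies on the rule spelled out in the comment
// above: each POStore becomes one Spark action, so the job count follows the
// number of STORE statements. A minimal standalone sketch of that rule (not
// part of this commit; the script name two_stores.pig is hypothetical and is
// assumed to store one relation into two outputs; PigRunner.run and the
// PigStats accessors are the same ones these tests use):

import org.apache.pig.PigRunner;
import org.apache.pig.tools.pigstats.PigStats;

public class StoresPerSparkJobSketch {
    public static void main(String[] cmdline) {
        // two_stores.pig (hypothetical):
        //   A = load 'input' as (x:int, y:int);
        //   store A into 'out1';
        //   store A into 'out2';
        String[] args = { "-x", "spark", "two_stores.pig" };
        PigStats stats = PigRunner.run(args, null);
        // Prints 2: one Spark job per POStore, even though multiquery
        // optimization leaves a single SparkOperator in the plan.
        System.out.println(stats.getNumberJobs());
        System.out.println(stats.getJobGraph().size());
    }
}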
+ if (execType.equals("spark")) { + assertEquals(2, stats.getNumberJobs()); + } else { + assertEquals(1, stats.getNumberJobs()); + } List outputs = stats.getOutputStats(); assertEquals(2, outputs.size()); for (OutputStats outstats : outputs) { @@ -835,7 +875,14 @@ public class TestPigRunner { assertTrue(!stats.isSuccessful()); assertTrue(stats.getReturnCode() != 0); - assertTrue(stats.getOutputStats().size() == 0); + if (execType.equals("spark")) { + //Currently, even if failed, spark engine will add a failed OutputStats, + // see: SparkPigStats.addFailJobStats() + assertTrue(stats.getOutputStats().size() == 1); + assertTrue(stats.getOutputStats().get(0).isSuccessful() == false); + } else { + assertTrue(stats.getOutputStats().size() == 0); + } } finally { new File(PIG_FILE).delete(); @@ -858,7 +905,14 @@ public class TestPigRunner { assertTrue(!stats.isSuccessful()); assertTrue(stats.getReturnCode() != 0); - assertTrue(stats.getOutputStats().size() == 0); + //Currently, even if failed, spark engine will add a failed OutputStats, + // see: SparkPigStats.addFailJobStats() + if (execType.equals("spark")) { + assertTrue(stats.getOutputStats().size() == 1); + assertTrue(stats.getOutputStats().get(0).isSuccessful() == false); + } else { + assertTrue(stats.getOutputStats().size() == 0); + } } finally { new File(PIG_FILE).delete(); @@ -919,8 +973,14 @@ public class TestPigRunner { PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); - - assertEquals(1, stats.getNumberJobs()); + //In spark mode, one POStore will generate a spark action(spark job). + //In this case, the sparkplan has 1 sparkOperator(after multiquery optimization) but has 2 POStores + //which generate 2 spark actions(spark jobs). + if (execType.equals("spark")) { + assertEquals(2, stats.getNumberJobs()); + } else { + assertEquals(1, stats.getNumberJobs()); + } List outputs = stats.getOutputStats(); assertEquals(2, outputs.size()); for (OutputStats outstats : outputs) { @@ -968,7 +1028,13 @@ public class TestPigRunner { List outputs = stats.getOutputStats(); assertEquals(1, outputs.size()); OutputStats outstats = outputs.get(0); - assertEquals(9, outstats.getNumberRecords()); + //In spark mode, if pig.disable.counter = true, the number of records of the + //output are not calculated. + if (execType.equals("spark")) { + assertEquals(-1, outstats.getNumberRecords()); + } else { + assertEquals(9, outstats.getNumberRecords()); + } } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); @@ -1008,6 +1074,28 @@ public class TestPigRunner { MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue()); assertEquals(new File(INPUT_FILE).length(),counter.getGroup(FS_COUNTER_GROUP).getCounterForName( MRPigStatsUtil.HDFS_BYTES_READ).getValue()); + } else if (execType.equals("spark")) { + /** Uncomment code until changes of PIG-4788 are merged to master + //There are 2 spark jobs because of 2 POStore although the spark plan is optimized by multiquery optimization. 
+ List<JobStats> jobs = stats.getJobGraph().getJobList(); + JobStats firstJob = jobs.get(0); + JobStats secondJob = jobs.get(1); + // the hdfs_bytes_read of the two spark jobs is the same (because both jobs have the same POLoad), so we only + // use one of them to compare with the expected hdfs_bytes_read (30) + // we sum the hdfs_bytes_written of the two spark jobs to calculate the total hdfs_bytes_written + long hdfs_bytes_read = 0; + long hdfs_bytes_written = 0; + + hdfs_bytes_read += firstJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.HDFS_BYTES_READ).getValue(); + hdfs_bytes_written += firstJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue(); + hdfs_bytes_written += secondJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue(); + + assertEquals(30, hdfs_bytes_read); + assertEquals(20, hdfs_bytes_written); + **/ } else { Counters counter = ((MRJobStats) stats.getJobGraph().getSinks().get(0)).getHadoopCounters(); assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName( @@ -1050,8 +1138,12 @@ public class TestPigRunner { PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); - - assertEquals(1, stats.getNumberJobs()); + if (execType.equals("spark")) { + // 2 POStores generate 2 spark jobs + assertEquals(2, stats.getNumberJobs()); + } else { + assertEquals(1, stats.getNumberJobs()); + } List<OutputStats> outputs = stats.getOutputStats(); assertEquals(2, outputs.size()); if (execType.equals("tez")) { @@ -1072,7 +1164,11 @@ public class TestPigRunner { List<InputStats> inputs = stats.getInputStats(); assertEquals(1, inputs.size()); InputStats instats = inputs.get(0); - assertEquals(5, instats.getNumberRecords()); + if (execType.equals("spark")) { + assertEquals(-1, instats.getNumberRecords()); + } else { + assertEquals(5, instats.getNumberRecords()); + } } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); Modified: pig/trunk/test/org/apache/pig/test/TestPigServer.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServer.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPigServer.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPigServer.java Mon May 29 15:00:39 2017 @@ -62,6 +62,7 @@ import org.apache.pig.impl.util.Properti import org.apache.pig.impl.util.Utils; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -547,9 +548,8 @@ public class TestPigServer { public void testExplainXmlComplex() throws Throwable { // TODO: Explain XML output is not supported in non-MR mode. Remove the // following condition once it's implemented in Tez.
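// The hunk below replaces a silent early return with a JUnit assumption, so
// configurations that cannot run the test are reported as skipped rather
// than passed. A minimal standalone sketch of the pattern (not part of this
// commit; the execType value is a stand-in for cluster.getExecType()):

import org.junit.Assume;
import org.junit.Test;

public class AssumeSketch {
    @Test
    public void onlyOnMapReduceOrSpark() {
        String execType = "tez"; // stand-in for the cluster's exec type
        // When the condition is false, JUnit marks the test "skipped", not "passed".
        Assume.assumeTrue("Skip this test for TEZ",
                execType.equals("mapreduce") || execType.equals("spark"));
        // ... the test body runs only for the accepted exec types ...
    }
}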
- if (cluster.getExecType() != ExecType.MAPREDUCE) { - return; - } + String execType = cluster.getExecType().toString().toLowerCase(); + Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType()) || Util.isSparkExecType(cluster.getExecType())); PigServer pig = new PigServer(cluster.getExecType(), properties); pig.registerQuery("a = load 'a' as (site: chararray, count: int, itemCounts: bag { itemCountsTuple: tuple (type: chararray, typeCount: int, f: float, m: map[]) } ) ;") ; pig.registerQuery("b = foreach a generate site, count, FLATTEN(itemCounts);") ; @@ -574,6 +574,55 @@ public class TestPigServer { assertEquals(1, physicalPlan.getLength()); assertTrue(physicalPlan.item(0).getTextContent().contains("Not Supported")); + + if (execType.equals(ExecType.MAPREDUCE.name().toLowerCase())) { + verifyExplainXmlComplexMR(doc); + } else if (execType.equals(MiniGenericCluster.EXECTYPE_SPARK)) { + verifyExplainXmlComplexSpark(doc); + } + + + } + + private void verifyExplainXmlComplexSpark(Document doc) { + NodeList stores = doc.getElementsByTagName("POStore"); + assertEquals(1, stores.getLength()); + + NodeList groups = doc.getElementsByTagName("POJoinGroupSpark"); + assertEquals(2, groups.getLength()); + + Node innerGroup = groups.item(1); + + NodeList groupChildren = innerGroup.getChildNodes(); + + int foreachCount = 0; + int castCount = 0; + int loadCount = 0; + + for (int i = 0; i < groupChildren.getLength(); i++) { + Node node = groupChildren.item(i); + if (node.getNodeName().equals("POForEach")) { + ++foreachCount; + NodeList foreachNodes = node.getChildNodes(); + for (int j = 0; j < foreachNodes.getLength(); j++) { + Node innerNode = foreachNodes.item(j); + if (innerNode.getNodeName().equals("alias")) { + assertEquals("b", innerNode.getTextContent()); + } else if (innerNode.getNodeName().equals("POCast")) { + ++castCount; + } else if (innerNode.getNodeName().equals("POLoad")) { + ++loadCount; + } + } + } + } + + assertEquals(1, foreachCount); + assertEquals(3, castCount); + assertEquals(1, loadCount); + } + + private void verifyExplainXmlComplexMR(Document doc) { //Verify we have two loads and one is temporary NodeList loads = doc.getElementsByTagName("POLoad"); assertEquals(2, loads.getLength()); Modified: pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java Mon May 29 15:00:39 2017 @@ -255,6 +255,16 @@ public class TestPigServerLocal { _testSkipParseInRegisterForBatch(false, 8, 4); _testSkipParseInRegisterForBatch(true, 5, 1); _testParseBatchWithScripting(5, 1); + } else if (Util.getLocalTestMode().toString().startsWith("SPARK")) { + // 6 = 4 (Once per registerQuery) + 2 (SortConverter, PigRecordReader) + // 4 (Once per registerQuery) + _testSkipParseInRegisterForBatch(false, 6, 4); + + // 3 = 1 (registerQuery) + 2 (SortConverter, PigRecordReader) + // 1 (registerQuery) + _testSkipParseInRegisterForBatch(true, 3, 1); + + _testParseBatchWithScripting(3, 1); } else { // numTimesInitiated = 10.
4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader, // InputSizeReducerEstimator, getSplits->RandomSampleLoader, Modified: pig/trunk/test/org/apache/pig/test/TestProjectRange.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestProjectRange.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestProjectRange.java (original) +++ pig/trunk/test/org/apache/pig/test/TestProjectRange.java Mon May 29 15:00:39 2017 @@ -644,13 +644,14 @@ public class TestProjectRange { Iterator it = pigServer.openIterator("f"); - List expectedRes = - Util.getTuplesFromConstantTupleStrings( - new String[] { - "(10,{(10,20,30,40,50)})", - "(11,{(11,21,31,41,51)})", - }); - Util.checkQueryOutputs(it, expectedRes); + String[] expectedRes = + new String[] { + "(10,{(10,20,30,40,50)})", + "(11,{(11,21,31,41,51)})", + }; + Schema s = pigServer.dumpSchema("f"); + Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } /** @@ -732,12 +733,14 @@ public class TestProjectRange { Iterator it = pigServer.openIterator("f"); - List expectedRes = - Util.getTuplesFromConstantTupleStrings( - new String[] { - "(1,{(11,21,31,41,51),(10,20,30,40,50)})", - }); - Util.checkQueryOutputs(it, expectedRes); + String[] expectedRes = + new String[] { + "(1,{(11,21,31,41,51),(10,20,30,40,50)})", + }; + Schema s = pigServer.dumpSchema("f"); + Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); + } private LOSort checkNumExpressionPlansForSort(LogicalPlan lp, int numPlans, boolean[] isAsc) { @@ -925,8 +928,8 @@ public class TestProjectRange { " g = group l1 by .. c, l2 by .. 
c;" ; String expectedSchStr = "grp: (a: int,b: long,c: int)," + - "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," + - "l2: {t : (a: int,b: long,c: int,d: int,e: int)}"; + "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," + + "l2: {t : (a: int,b: long,c: int,d: int,e: int)}"; Schema expectedSch = getCleanedGroupSchema(expectedSchStr); compileAndCompareSchema(expectedSch, query, "g"); @@ -938,14 +941,15 @@ public class TestProjectRange { Util.registerMultiLineQuery(pigServer, query); - List expectedRes = - Util.getTuplesFromConstantTupleStrings( - new String[] { - "((10,20,30),{(10,20,30,40,50)},{(10,20,30,40,50)})", - "((11,21,31),{(11,21,31,41,51)},{(11,21,31,41,51)})", - }); + String[] expectedRes = + new String[] { + "((10,20,30),{(10,20,30,40,50)},{(10,20,30,40,50)})", + "((11,21,31),{(11,21,31,41,51)},{(11,21,31,41,51)})", + }; Iterator it = pigServer.openIterator("g"); - Util.checkQueryOutputs(it, expectedRes); + Schema s = pigServer.dumpSchema("g"); + Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } /** @@ -1005,14 +1009,15 @@ public class TestProjectRange { Util.registerMultiLineQuery(pigServer, query); - List expectedRes = - Util.getTuplesFromConstantTupleStrings( - new String[] { - "((30,30,40,50),{(10,20,30,40,50)},{(10,20,30,40,50)})", - "((32,31,41,51),{(11,21,31,41,51)},{(11,21,31,41,51)})", - }); + String[] expectedRes = + new String[] { + "((30,30,40,50),{(10,20,30,40,50)},{(10,20,30,40,50)})", + "((32,31,41,51),{(11,21,31,41,51)},{(11,21,31,41,51)})", + }; Iterator it = pigServer.openIterator("g"); - Util.checkQueryOutputs(it, expectedRes); + Schema s = pigServer.dumpSchema("g"); + Util.checkQueryOutputs(it,expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } @Test @@ -1056,14 +1061,15 @@ public class TestProjectRange { Util.registerMultiLineQuery(pigServer, query); - List expectedRes = - Util.getTuplesFromConstantTupleStrings( - new String[] { - "((30,40,50),{(10,20L,30,40,50)})", - "((31,41,51),{(11,21L,31,41,51)})", - }); + String[] expectedRes = + new String[] { + "((30,40,50),{(10,20L,30,40,50)})", + "((31,41,51),{(11,21L,31,41,51)})", + }; Iterator it = pigServer.openIterator("lim"); - Util.checkQueryOutputsAfterSort(it, expectedRes); + Schema s = pigServer.dumpSchema("lim"); + Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } @@ -1118,14 +1124,15 @@ public class TestProjectRange { Util.registerMultiLineQuery(pigServer, query); - List expectedRes = - Util.getTuplesFromConstantTupleStrings( - new String[] { - "((30,40,50),{(10,20,30,40,50)})", - "((31,41,51),{(11,21,31,41,51)})", - }); + String[] expectedRes = + new String[] { + "((30,40,50),{(10,20,30,40,50)})", + "((31,41,51),{(11,21,31,41,51)})", + }; Iterator it = pigServer.openIterator("g"); - Util.checkQueryOutputs(it, expectedRes); + Schema s = pigServer.dumpSchema("g"); + Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } private void setAliasesToNull(Schema schema) { @@ -1157,14 +1164,15 @@ public class TestProjectRange { Util.registerMultiLineQuery(pigServer, query); - List expectedRes = - Util.getTuplesFromConstantTupleStrings( - new String[] { - "(10,20,30,40,50,10,20,30,40,50)", - "(11,21,31,41,51,11,21,31,41,51)", - }); + String[] expectedRes = + new 
String[] { + "(10,20,30,40,50,10,20,30,40,50)", + "(11,21,31,41,51,11,21,31,41,51)", + }; Iterator it = pigServer.openIterator("j"); - Util.checkQueryOutputs(it, expectedRes); + Schema s = pigServer.dumpSchema("j"); + Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } @Test @@ -1185,14 +1193,15 @@ public class TestProjectRange { Util.registerMultiLineQuery(pigServer, query); - List expectedRes = - Util.getTuplesFromConstantTupleStrings( - new String[] { - "(10,20,30,40,50,10,20,30,40,50)", - "(11,21,31,41,51,11,21,31,41,51)", - }); + String[] expectedRes = + new String[] { + "(10,20,30,40,50,10,20,30,40,50)", + "(11,21,31,41,51,11,21,31,41,51)", + }; Iterator it = pigServer.openIterator("j"); - Util.checkQueryOutputs(it, expectedRes); + Schema s = pigServer.dumpSchema("j"); + Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } @Test @@ -1204,7 +1213,7 @@ public class TestProjectRange { " l2 = load '" + INP_FILE_5FIELDS + "';" + " g = cogroup l1 by ($0 .. ), l2 by ($0 .. );"; Util.checkExceptionMessage(query, "g", "Cogroup/Group by '*' or 'x..' " + - "(range of columns to the end) " + + "(range of columns to the end) " + "is only allowed if the input has a schema"); } Modified: pig/trunk/test/org/apache/pig/test/TestPruneColumn.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Mon May 29 15:00:39 2017 @@ -554,27 +554,9 @@ public class TestPruneColumn { pigServer.registerQuery("C = cogroup A by $1, B by $1;"); pigServer.registerQuery("D = foreach C generate AVG($1.$1);"); Iterator iter = pigServer.openIterator("D"); - - assertTrue(iter.hasNext()); - Tuple t = iter.next(); - - assertEquals(1, t.size()); - assertNull(t.get(0)); - - assertTrue(iter.hasNext()); - t = iter.next(); - - assertEquals(1, t.size()); - assertEquals("2.0", t.get(0).toString()); - - assertTrue(iter.hasNext()); - t = iter.next(); - - assertEquals(1, t.size()); - assertEquals("5.0", t.get(0).toString()); - - assertFalse(iter.hasNext()); - + String[] expectedRes = new String[]{"()","(2.0)","(5.0)"}; + Schema s = pigServer.dumpSchema("D"); + Util.checkQueryOutputsAfterSortRecursive(iter,expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s)); assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"})); } @@ -596,26 +578,19 @@ public class TestPruneColumn { @Test public void testCoGroup3() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); + pigServer.registerQuery("A = load '" + Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); pigServer.registerQuery("B = group A by $1;"); pigServer.registerQuery("C = foreach B generate $1, '1';"); Iterator iter = pigServer.openIterator("C"); assertTrue(iter.hasNext()); - Tuple t = iter.next(); - - assertEquals(2, t.size()); - assertEquals("{(1,2,3)}", t.get(0).toString()); - assertEquals("1", t.get(1).toString()); - - assertTrue(iter.hasNext()); - t = iter.next(); - assertEquals(2, t.size()); - assertEquals("{(2,5,2)}", t.get(0).toString()); 
- assertEquals("1", t.get(1).toString()); - - assertFalse(iter.hasNext()); + String[] expected = new String[] { + "({(1,2,3)},1)", + "({(2,5,2)},1)" + }; + Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")), + Util.isSparkExecType(Util.getLocalTestMode())); assertTrue(emptyLogFileMessage()); } @@ -629,27 +604,14 @@ public class TestPruneColumn { Iterator iter = pigServer.openIterator("D"); assertTrue(iter.hasNext()); - Tuple t = iter.next(); - - assertEquals(2, t.size()); - assertEquals("{}", t.get(0).toString()); - assertEquals("{(1)}", t.get(1).toString()); - - assertTrue(iter.hasNext()); - t = iter.next(); - - assertEquals(2, t.size()); - assertEquals("{(2)}", t.get(0).toString()); - assertEquals("{(2)}", t.get(1).toString()); - - assertTrue(iter.hasNext()); - t = iter.next(); - assertEquals(2, t.size()); - assertEquals("{(5)}", t.get(0).toString()); - assertEquals("{}", t.get(1).toString()); - - assertFalse(iter.hasNext()); + String[] expected = new String[] { + "({},{(1)})", + "({(2)},{(2)})", + "({(5)},{})" + }; + Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")), + Util.isSparkExecType(Util.getLocalTestMode())); assertTrue(emptyLogFileMessage()); } @@ -755,22 +717,12 @@ public class TestPruneColumn { Iterator iter = pigServer.openIterator("D"); assertTrue(iter.hasNext()); - Tuple t = iter.next(); - - assertEquals(3, t.size()); - assertEquals("{}", t.get(0).toString()); - assertEquals("1", t.get(1).toString()); - assertEquals("1", t.get(2).toString()); - - assertTrue(iter.hasNext()); - t = iter.next(); - assertEquals(3, t.size()); - assertEquals("{(1,2,3)}", t.get(0).toString()); - assertEquals("2", t.get(1).toString()); - assertEquals("2", t.get(2).toString()); - - assertFalse(iter.hasNext()); + String[] expected = new String[] { + "({},1,1)", + "({(1,2,3)},2,2)" + }; + Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D"))); assertTrue(emptyLogFileMessage()); } @@ -784,24 +736,13 @@ public class TestPruneColumn { Iterator iter = pigServer.openIterator("D"); assertTrue(iter.hasNext()); - Tuple t = iter.next(); - - assertEquals(4, t.size()); - assertEquals("1", t.get(0).toString()); - assertEquals("2", t.get(1).toString()); - assertEquals("3", t.get(2).toString()); - assertEquals("{(2)}", t.get(3).toString()); - assertTrue(iter.hasNext()); - t = iter.next(); - - assertEquals(4, t.size()); - assertEquals("2", t.get(0).toString()); - assertEquals("5", t.get(1).toString()); - assertEquals("2", t.get(2).toString()); - assertEquals("{}", t.get(3).toString()); - - assertFalse(iter.hasNext()); + String[] expected = new String[] { + "(1,2,3,{(2)})", + "(2,5,2,{})" + }; + Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")), + Util.isSparkExecType(Util.getLocalTestMode())); assertTrue(emptyLogFileMessage()); } @@ -900,20 +841,13 @@ public class TestPruneColumn { Iterator iter = pigServer.openIterator("D"); assertTrue(iter.hasNext()); - Tuple t = iter.next(); - - assertEquals(2, t.size()); - assertEquals("1", t.get(0).toString()); - assertEquals("1", t.get(1).toString()); - - assertTrue(iter.hasNext()); - t = iter.next(); - assertEquals(2, t.size()); - assertEquals("2", t.get(0).toString()); - assertEquals("2", t.get(1).toString()); - - assertFalse(iter.hasNext()); + String[] expected = new String[] { + "(1,1)", + 
"(2,2)" + }; + Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")), + Util.isSparkExecType(Util.getLocalTestMode())); assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2", "Columns pruned for B: $1"})); @@ -1022,20 +956,14 @@ public class TestPruneColumn { pigServer.registerQuery("B = group A by *;"); pigServer.registerQuery("C = foreach B generate $0;"); Iterator iter = pigServer.openIterator("C"); - assertTrue(iter.hasNext()); - Tuple t = iter.next(); - assertEquals(1, t.size()); - assertEquals("(1,2,3)", t.get(0).toString()); - - assertTrue(iter.hasNext()); - t = iter.next(); - - assertEquals(1, t.size()); - assertEquals("(2,5,2)", t.get(0).toString()); - - assertFalse(iter.hasNext()); + String[] expected = new String[] { + "((1,2,3))", + "((2,5,2))" + }; + Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")), + Util.isSparkExecType(Util.getLocalTestMode())); assertTrue(emptyLogFileMessage()); } @@ -1578,21 +1506,16 @@ public class TestPruneColumn { Iterator iter = pigServer.openIterator("G"); assertTrue(iter.hasNext()); - Tuple t = iter.next(); - assertEquals("({(1)},1)", t.toString()); - - assertTrue(iter.hasNext()); - t = iter.next(); - assertEquals("({(2),(2)},2)", t.toString()); - - assertTrue(iter.hasNext()); - t = iter.next(); - assertEquals("({(3),(3),(3)},3)", t.toString()); - assertFalse(iter.hasNext()); + String[] expected = new String[] { + "({(1)},1)", + "({(2),(2)},2)", + "({(3),(3),(3)},3)" + }; + Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("G")), + Util.isSparkExecType(Util.getLocalTestMode())); assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"})); - pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY); } @@ -1604,26 +1527,15 @@ public class TestPruneColumn { pigServer.registerQuery("D = foreach C generate $0, $1;"); Iterator iter = pigServer.openIterator("D"); - assertTrue(iter.hasNext()); - Tuple t = iter.next(); - assertEquals(2, t.size()); - assertEquals("1", t.get(0).toString()); - assertEquals("{}", t.get(1).toString()); - assertTrue(iter.hasNext()); - t = iter.next(); - assertEquals(2, t.size()); - assertEquals("2", t.get(0).toString()); - assertEquals("{(1,2,3)}", t.get(1).toString()); - - assertTrue(iter.hasNext()); - t = iter.next(); - assertEquals(2, t.size()); - assertEquals("5", t.get(0).toString()); - assertEquals("{(2,5,2)}", t.get(1).toString()); - - assertFalse(iter.hasNext()); + String[] expected = new String[] { + "(1,{})", + "(2,{(1,2,3)})", + "(5,{(2,5,2)})" + }; + Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")) + , Util.isSparkExecType(Util.getLocalTestMode())); assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"})); } @@ -1947,12 +1859,14 @@ public class TestPruneColumn { Iterator iter = pigServer.openIterator("C"); assertTrue(iter.hasNext()); - Tuple t = iter.next(); - assertEquals("(1,2,3,1)", t.toString()); - assertTrue(iter.hasNext()); - t = iter.next(); - assertEquals("(2,5,2,2)", t.toString()); + String[] expected = new String[] { + "(1,2,3,1)", + "(2,5,2,2)" + }; + Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")), + Util.isSparkExecType(Util.getLocalTestMode())); + assertTrue(emptyLogFileMessage()); } Modified: 
pig/trunk/test/org/apache/pig/test/TestRank1.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestRank1.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestRank1.java (original) +++ pig/trunk/test/org/apache/pig/test/TestRank1.java Mon May 29 15:00:39 2017 @@ -23,7 +23,6 @@ import static org.junit.Assert.assertTru import java.io.IOException; import java.util.List; -import java.util.Set; import org.apache.pig.PigServer; import org.apache.pig.builtin.mock.Storage.Data; Modified: pig/trunk/test/org/apache/pig/test/TestRank2.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestRank2.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestRank2.java (original) +++ pig/trunk/test/org/apache/pig/test/TestRank2.java Mon May 29 15:00:39 2017 @@ -23,7 +23,6 @@ import static org.junit.Assert.assertTru import java.io.IOException; import java.util.List; -import java.util.Set; import org.apache.pig.PigServer; import org.apache.pig.builtin.mock.Storage.Data; Modified: pig/trunk/test/org/apache/pig/test/TestRank3.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestRank3.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestRank3.java (original) +++ pig/trunk/test/org/apache/pig/test/TestRank3.java Mon May 29 15:00:39 2017 @@ -23,7 +23,6 @@ import static org.junit.Assert.assertTru import java.io.IOException; import java.util.List; -import java.util.Set; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; Modified: pig/trunk/test/org/apache/pig/test/TestSecondarySort.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSecondarySort.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestSecondarySort.java (original) +++ pig/trunk/test/org/apache/pig/test/TestSecondarySort.java Mon May 29 15:00:39 2017 @@ -18,7 +18,6 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.File; @@ -37,6 +36,7 @@ import org.apache.pig.impl.io.FileLocali import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.VisitorException; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -237,11 +237,11 @@ public abstract class TestSecondarySort pigServer .registerQuery("D = foreach C { E = limit A 10; F = E.a1; G = DISTINCT F; generate group, COUNT(G);};"); Iterator iter = pigServer.openIterator("D"); - assertTrue(iter.hasNext()); - assertEquals("(2,1)", iter.next().toString()); - assertTrue(iter.hasNext()); - assertEquals("(1,2)", iter.next().toString()); - assertFalse(iter.hasNext()); + String[] expectedRes = new String[]{"(2,1)","(1,2)"}; + Schema s = pigServer.dumpSchema("D"); + Util.checkQueryOutputs(iter,expectedRes,org.apache + .pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); Util.deleteFile(cluster, file1ClusterPath); Util.deleteFile(cluster, 
file2ClusterPath); } @@ -265,11 +265,14 @@ public abstract class TestSecondarySort pigServer.registerQuery("B = group A by $0 parallel 2;"); pigServer.registerQuery("C = foreach B { D = distinct A; generate group, D;};"); Iterator iter = pigServer.openIterator("C"); - assertTrue(iter.hasNext()); - assertEquals("(2,{(2,3,4)})", iter.next().toString()); - assertTrue(iter.hasNext()); - assertEquals("(1,{(1,2,3),(1,2,4),(1,3,4)})", iter.next().toString()); - assertFalse(iter.hasNext()); + Schema s = pigServer.dumpSchema("C"); + String expected[] = { + "(2,{(2,3,4)})", + "(1,{(1,2,3),(1,2,4),(1,3,4)})" + }; + Util.checkQueryOutputs(iter, expected, org.apache + .pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(Util.getLocalTestMode())); Util.deleteFile(cluster, clusterPath); } @@ -359,15 +362,10 @@ public abstract class TestSecondarySort pigServer.registerQuery("D = ORDER C BY group;"); pigServer.registerQuery("E = foreach D { F = limit A 10; G = ORDER F BY a2; generate group, COUNT(G);};"); Iterator iter = pigServer.openIterator("E"); - assertTrue(iter.hasNext()); - assertEquals("((1,2),4)", iter.next().toString()); - assertTrue(iter.hasNext()); - assertEquals("((1,3),1)", iter.next().toString()); - assertTrue(iter.hasNext()); - assertEquals("((1,4),0)", iter.next().toString()); - assertTrue(iter.hasNext()); - assertEquals("((2,3),1)", iter.next().toString()); - assertFalse(iter.hasNext()); + Schema s = pigServer.dumpSchema("E"); + String[] expectedRes = new String[]{"((1,2),4)","((1,3),1)","((1,4),0)","((2,3),1)"}; + Util.checkQueryOutputs(iter, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); Util.deleteFile(cluster, clusterPath1); Util.deleteFile(cluster, clusterPath2); } @@ -515,6 +513,8 @@ public abstract class TestSecondarySort @Test // Once custom partitioner is used, we cannot use secondary key optimizer, see PIG-3827 public void testCustomPartitionerWithSort() throws Exception { + Assume.assumeFalse("Skip this test for Spark", Util.isSparkExecType(cluster.getExecType())); + File tmpFile1 = Util.createTempFileDelOnExit("test", "txt"); PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1)); ps1.println("1\t2\t3"); Modified: pig/trunk/test/org/apache/pig/test/TestStoreBase.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreBase.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestStoreBase.java (original) +++ pig/trunk/test/org/apache/pig/test/TestStoreBase.java Mon May 29 15:00:39 2017 @@ -144,24 +144,66 @@ public abstract class TestStoreBase { String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; Map filesToVerify = new HashMap(); - filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE); - filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE); - filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE); - filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE); - 
filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE); + if (mode.toString().startsWith("SPARK")) { + filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE); + filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE); + filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE); + filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE); + /* A = load xx; + store A into '1.out' using DummyStore('true','1'); -- first job should fail + store A into '2.out' using DummyStore('false','1'); -- second job should succeed + After multiQuery optimization the spark plan will be: + Split - scope-14 + | | + | a: Store(hdfs://1.out:myudfs.DummyStore('true','1')) - scope-4 + | | + | a: Store(hdfs://2.out:myudfs.DummyStore('false','1')) - scope-7 + | + |---a: Load(hdfs://zly2.sh.intel.com:8020/user/root/multiStore.txt:org.apache.pig.builtin.PigStorage) - scope-0------ + In the current code base, once the first job fails, the second job will not be executed, + so the FILE_SETUPJOB_CALLED file of the second job will not exist. + See PIG-4243 for more detail. + */ + filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE); + /* + In the current code base, once the first job fails, the second job will not be executed, + so the FILE_SETUPTASK_CALLED file of the second job will not exist. + */ + filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE); + // OutputCommitter.abortTask will not be invoked in spark mode. See SPARK-7953 for details. + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE); + // OutputCommitter.abortJob will not be invoked in spark mode.
See SPARK-7953 for details. + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE); + } else { + filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE); + filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE); + filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE); + filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE); + } String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; Modified: pig/trunk/test/org/apache/pig/test/TezMiniCluster.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TezMiniCluster.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TezMiniCluster.java (original) +++ pig/trunk/test/org/apache/pig/test/TezMiniCluster.java Mon May 29 15:00:39 2017 @@ -41,19 +41,12 @@ import org.apache.tez.dag.api.TezConfigu import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; -public class TezMiniCluster extends MiniGenericCluster { - private static final File CONF_DIR = new File("build/classes"); +public class TezMiniCluster extends YarnMiniCluster { + private static final File TEZ_LIB_DIR = new File("build/ivy/lib/Pig"); private static final File TEZ_CONF_FILE = new File(CONF_DIR, "tez-site.xml"); - private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml"); - private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml"); - private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml"); - private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml"); - private static final ExecType TEZ = new TezExecType(); - protected MiniMRYarnCluster m_mr = null; - private Configuration m_dfs_conf = null; - private Configuration m_mr_conf = null; + private static final ExecType TEZ = new TezExecType(); @Override public ExecType getExecType() { @@ -61,66 +54,9
public class TezMiniCluster extends Mini } @Override - public void setupMiniDfsAndMrClusters() { + protected void setupMiniDfsAndMrClusters() { + super.setupMiniDfsAndMrClusters(); try { - deleteConfFiles(); - CONF_DIR.mkdirs(); - - // Build mini DFS cluster - Configuration hdfsConf = new Configuration(); - m_dfs = new MiniDFSCluster.Builder(hdfsConf) - .numDataNodes(2) - .format(true) - .racks(null) - .build(); - m_fileSys = m_dfs.getFileSystem(); - m_dfs_conf = m_dfs.getConfiguration(0); - //Create user home directory - m_fileSys.mkdirs(m_fileSys.getWorkingDirectory()); - - // Write core-site.xml - Configuration core_site = new Configuration(false); - core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); - core_site.writeXml(new FileOutputStream(CORE_CONF_FILE)); - - Configuration hdfs_site = new Configuration(false); - for (Entry conf : m_dfs_conf) { - if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) { - hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey())); - } - } - hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE)); - - // Build mini YARN cluster - m_mr = new MiniMRYarnCluster("PigMiniCluster", 2); - m_mr.init(m_dfs_conf); - m_mr.start(); - m_mr_conf = m_mr.getConfig(); - File libDir = new File(System.getProperty("ivy.lib.dir", "build/ivy/lib/Pig")); - File classesDir = new File(System.getProperty("build.classes", "build/classes")); - File testClassesDir = new File(System.getProperty("test.build.classes", "test/build/classes")); - String classpath = libDir.getAbsolutePath() + "/*" - + File.pathSeparator + classesDir.getAbsolutePath() - + File.pathSeparator + testClassesDir.getAbsolutePath(); - m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, classpath); - m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx512m"); - m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx512m"); - - Configuration mapred_site = new Configuration(false); - Configuration yarn_site = new Configuration(false); - for (Entry conf : m_mr_conf) { - if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) { - if (conf.getKey().contains("yarn")) { - yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey())); - } else if (!conf.getKey().startsWith("dfs")){ - mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey())); - } - } - } - - mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE)); - yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE)); - // Write tez-site.xml Configuration tez_conf = new Configuration(false); // TODO PIG-3659 - Remove this once memory management is fixed @@ -150,12 +86,9 @@ public class TezMiniCluster extends Mini } } - m_conf = m_mr_conf; // Turn FetchOptimizer off so that we can actually test Tez m_conf.set(PigConfiguration.PIG_OPT_FETCH, System.getProperty("test.opt.fetch", "false")); - System.setProperty("junit.hadoop.conf", CONF_DIR.getPath()); - System.setProperty("hadoop.log.dir", "build/test/logs"); } catch (IOException e) { throw new RuntimeException(e); } @@ -169,29 +102,15 @@ public class TezMiniCluster extends Mini @Override protected void shutdownMiniMrClusters() { - deleteConfFiles(); - if (m_mr != null) { - m_mr.stop(); - m_mr = null; - } + super.shutdownMiniMrClusters(); } - private void deleteConfFiles() { + @Override + protected void deleteConfFiles() { + super.deleteConfFiles(); if(TEZ_CONF_FILE.exists()) { TEZ_CONF_FILE.delete(); } - if(CORE_CONF_FILE.exists()) { - CORE_CONF_FILE.delete(); - } - if(HDFS_CONF_FILE.exists()) { - 
HDFS_CONF_FILE.delete(); - } - if(MAPRED_CONF_FILE.exists()) { - MAPRED_CONF_FILE.delete(); - } - if(YARN_CONF_FILE.exists()) { - YARN_CONF_FILE.delete(); - } } static public Launcher getLauncher() { Modified: pig/trunk/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/Util.java (original) +++ pig/trunk/test/org/apache/pig/test/Util.java Mon May 29 15:00:39 2017 @@ -585,6 +585,29 @@ public class Util { checkQueryOutputsAfterSort(actualResList, expectedResList); } + /** + * Helper function to check if the result of a Pig query is in line with the expected results. + * It sorts actual and expected results before comparison. + * The tuple size in the tuple list can vary. Pass in a two-dimensional array; it will be converted to a tuple list. + * e.g. expectedTwoDimensionObjects is [{{10, "will_join", 10, "will_join"}, {11, "will_not_join", null}, {null, 12, "will_not_join"}}], + * the field sizes of these 3 tuples are [4, 3, 3] + * + * @param actualResultsIt + * @param expectedTwoDimensionObjects represents a tuple list, in which the tuple can have variable size. + */ + static public void checkQueryOutputsAfterSort(Iterator<Tuple> actualResultsIt, + Object[][] expectedTwoDimensionObjects) { + List<Tuple> expectedResTupleList = new ArrayList<Tuple>(); + for (int i = 0; i < expectedTwoDimensionObjects.length; ++i) { + Tuple t = TupleFactory.getInstance().newTuple(); + for (int j = 0; j < expectedTwoDimensionObjects[i].length; ++j) { + t.append(expectedTwoDimensionObjects[i][j]); + } + expectedResTupleList.add(t); + } + checkQueryOutputsAfterSort(actualResultsIt, expectedResTupleList); + } + static public void checkQueryOutputsAfterSort( List<Tuple> actualResList, List<Tuple> expectedResList) { Collections.sort(actualResList); @@ -592,7 +615,7 @@ public class Util { Assert.assertEquals("Comparing actual and expected results. 
", expectedResList, actualResList); - + } /** @@ -757,7 +780,8 @@ public class Util { if(Util.WINDOWS){ filename = filename.replace('\\','/'); } - if (context.getExecType() == ExecType.MAPREDUCE || context.getExecType().name().equals("TEZ")) { + if (context.getExecType() == ExecType.MAPREDUCE || context.getExecType().name().equals("TEZ") || + context.getExecType().name().equals("SPARK")) { return FileLocalizer.hadoopify(filename, context); } else if (context.getExecType().isLocal()) { return filename; @@ -1195,7 +1219,7 @@ public class Util { } - static private void convertBagToSortedBag(Tuple t) { + static public void convertBagToSortedBag(Tuple t) { for (int i=0;i actualResList, boolean toSort){ + if( toSort == true) { + for (Tuple t : actualResList) { + Util.convertBagToSortedBag(t); + } + Collections.sort(actualResList); + } + } + + public static void checkQueryOutputs(Iterator actualResults, List expectedResults, boolean checkAfterSort) { + if (checkAfterSort) { + checkQueryOutputsAfterSort(actualResults, expectedResults); + } else { + checkQueryOutputs(actualResults, expectedResults); + } + } + + static public void checkQueryOutputs(Iterator actualResultsIt, + String[] expectedResArray, LogicalSchema schema, boolean + checkAfterSort) throws IOException { + if (checkAfterSort) { + checkQueryOutputsAfterSortRecursive(actualResultsIt, + expectedResArray, schema); + } else { + checkQueryOutputs(actualResultsIt, + expectedResArray, schema); + } + } + + static void checkQueryOutputs(Iterator actualResultsIt, + String[] expectedResArray, LogicalSchema schema) throws IOException { + LogicalFieldSchema fs = new LogicalFieldSchema("tuple", schema, DataType.TUPLE); + ResourceFieldSchema rfs = new ResourceFieldSchema(fs); + + LoadCaster caster = new Utf8StorageConverter(); + List actualResList = new ArrayList(); + while (actualResultsIt.hasNext()) { + actualResList.add(actualResultsIt.next()); + } + + List expectedResList = new ArrayList(); + for (String str : expectedResArray) { + Tuple newTuple = caster.bytesToTuple(str.getBytes(), rfs); + expectedResList.add(newTuple); + } + + for (Tuple t : actualResList) { + convertBagToSortedBag(t); + } + + for (Tuple t : expectedResList) { + convertBagToSortedBag(t); + } + + Assert.assertEquals("Comparing actual and expected results. 
", + expectedResList, actualResList); + } + public static void assertParallelValues(long defaultParallel, long requestedParallel, long estimatedParallel, @@ -1399,11 +1489,15 @@ public class Util { public static ExecType getLocalTestMode() throws Exception { String execType = System.getProperty("test.exec.type"); - if (execType!=null && execType.equals("tez")) { - return ExecTypeProvider.fromString("tez_local"); - } else { - return ExecTypeProvider.fromString("local"); + if (execType != null) { + if (execType.equals("tez")) { + return ExecTypeProvider.fromString("tez_local"); + } else if (execType.equals("spark")) { + return ExecTypeProvider.fromString("spark_local"); + } } + + return ExecTypeProvider.fromString("local"); } public static void createLogAppender(String appenderName, Writer writer, Class...clazzes) { Added: pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java (added) +++ pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java Mon May 29 15:00:39 2017 @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
Added: pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java (added)
+++ pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java Mon May 29 15:00:39 2017
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.ExecType;
+
+public abstract class YarnMiniCluster extends MiniGenericCluster {
+    protected static final File CONF_DIR = new File("build/classes");
+    protected static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
+    protected static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
+    protected static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
+    protected static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
+
+    protected Configuration m_dfs_conf = null;
+    protected MiniMRYarnCluster m_mr = null;
+    protected Configuration m_mr_conf = null;
+
+    @Override
+    protected void setupMiniDfsAndMrClusters() {
+        try {
+            deleteConfFiles();
+            CONF_DIR.mkdirs();
+
+            // Build mini DFS cluster
+            Configuration hdfsConf = new Configuration();
+            m_dfs = new MiniDFSCluster.Builder(hdfsConf)
+                    .numDataNodes(2)
+                    .format(true)
+                    .racks(null)
+                    .build();
+            m_fileSys = m_dfs.getFileSystem();
+            m_dfs_conf = m_dfs.getConfiguration(0);
+
+            //Create user home directory
+            m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
+            // Write core-site.xml
+            Configuration core_site = new Configuration(false);
+            core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+            core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
+
+            Configuration hdfs_site = new Configuration(false);
+            for (Map.Entry<String, String> conf : m_dfs_conf) {
+                if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
+                    hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
+                }
+            }
+            hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
+
+            // Build mini YARN cluster
+            m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
+            m_mr.init(m_dfs_conf);
+            m_mr.start();
+            m_mr_conf = m_mr.getConfig();
+            File libDir = new File(System.getProperty("ivy.lib.dir", "build/ivy/lib/Pig"));
+            File classesDir = new File(System.getProperty("build.classes", "build/classes"));
+            File testClassesDir = new File(System.getProperty("test.build.classes", "test/build/classes"));
+            String classpath = libDir.getAbsolutePath() + "/*"
+                    + File.pathSeparator + classesDir.getAbsolutePath()
+                    + File.pathSeparator + testClassesDir.getAbsolutePath();
+            m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, classpath);
+            m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx512m");
+            m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx512m");
+
+            Configuration mapred_site = new Configuration(false);
+            Configuration yarn_site = new Configuration(false);
+            for (Map.Entry<String, String> conf : m_mr_conf) {
+                if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
+                    if (conf.getKey().contains("yarn")) {
+                        yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    } else if (!conf.getKey().startsWith("dfs")) {
+                        mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    }
+                }
+            }
+
+            mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
+            yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
+
+            m_conf = m_mr_conf;
+            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+            System.setProperty("hadoop.log.dir", "build/test/logs");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+
+        }
+    }
+
+    protected void deleteConfFiles() {
+        if(CORE_CONF_FILE.exists()) {
+            CORE_CONF_FILE.delete();
+        }
+        if(HDFS_CONF_FILE.exists()) {
+            HDFS_CONF_FILE.delete();
+        }
+        if(MAPRED_CONF_FILE.exists()) {
+            MAPRED_CONF_FILE.delete();
+        }
+        if(YARN_CONF_FILE.exists()) {
+            YARN_CONF_FILE.delete();
+        }
+    }
+
+    @Override
+    protected void shutdownMiniMrClusters() {
+        deleteConfFiles();
+        if (m_mr != null) {
+            m_mr.stop();
+            m_mr = null;
+        }
+    }
+}
\ No newline at end of file
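YarnMiniCluster factors the DFS/YARN bring-up and the generation of the four *-site.xml files out of the engine-specific mini clusters; a concrete subclass mainly has to report its ExecType. A hedged sketch with an invented class name, assuming MiniGenericCluster declares an abstract getExecType() (the real subclasses in this commit differ in detail):

    // Invented example, not the commit's code: an engine-specific mini
    // cluster inherits all setup/teardown and only declares its ExecType.
    public class ExampleMiniCluster extends YarnMiniCluster {
        @Override
        public ExecType getExecType() {
            return ExecType.MAPREDUCE; // a real subclass returns its own type
        }
    }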
Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld Mon May 29 15:00:39 2017
@@ -0,0 +1,71 @@
+digraph plan {
+compound=true;
+node [shape=rect];
+s487399236_in [label="", style=invis, height=0, width=0];
+s487399236_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399236 {
+label="Spark(-1,PigStorage)"; style="filled"; fillcolor="#EEEEEE"labelloc=b;
+s0_in [label="", style=invis, height=0, width=0];
+s0_out [label="", style=invis, height=0, width=0];
+subgraph cluster_0 {
+label="spark"; style="filled"; fillcolor="white"labelloc=b;
+487399275 [label="a: Load(file:///tmp/input,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+s487399268_in [label="", style=invis, height=0, width=0];
+s487399268_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399268 {
+label="a: New For Each(false,false)[bag]"labelloc=b;
+487399274 [label="Project[bytearray][0]"];
+487399273 [label="Cast[int]"];
+487399274 -> 487399273
+s487399268_in -> 487399274 [style=invis];
+487399270 [label="Cast[int]"];
+487399271 [label="Project[bytearray][1]"];
+487399271 -> 487399270
+s487399268_in -> 487399271 [style=invis];
+};
+487399273 -> s487399268_out [style=invis];
+487399270 -> s487399268_out [style=invis];
+487399267 [label="a: Store(file:///tmp/pigoutput,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+487399275 -> s487399268_in [lhead=cluster_487399268]
+s487399268_out -> 487399267
+s0_in -> 487399275 [style=invis];
+};
+487399267 -> s0_out [style=invis];
+s487399236_in -> s0_in [style=invis];
+};
+s0_out -> s487399236_out [style=invis];
+s487399235_in [label="", style=invis, height=0, width=0];
+s487399235_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399235 {
+label="Spark(-1,PigStorage)"; style="filled"; fillcolor="#EEEEEE"labelloc=b;
+s1_in [label="", style=invis, height=0, width=0];
+s1_out [label="", style=invis, height=0, width=0];
+subgraph cluster_1 {
+label="spark"; style="filled"; fillcolor="white"labelloc=b;
+s487399238_in [label="", style=invis, height=0, width=0];
+s487399238_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399238 {
+label="b: New For Each(false,false)[bag]"labelloc=b;
+487399244 [label="Project[bytearray][0]"];
+487399243 [label="Cast[int]"];
+487399244 -> 487399243
+s487399238_in -> 487399244 [style=invis];
+487399241 [label="Project[bytearray][1]"];
+487399240 [label="Cast[int]"];
+487399241 -> 487399240
+s487399238_in -> 487399241 [style=invis];
+};
+487399243 -> s487399238_out [style=invis];
+487399240 -> s487399238_out [style=invis];
+487399237 [label="b: Store(file:///tmp/pigoutput1,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+487399266 [label="b: Load(file:///tmp/pigoutput,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+s487399238_out -> 487399237
+487399266 -> s487399238_in [lhead=cluster_487399238]
+s1_in -> 487399266 [style=invis];
+};
+487399237 -> s1_out [style=invis];
+s487399235_in -> s1_in [style=invis];
+};
+s1_out -> s487399235_out [style=invis];
+s487399236_out -> s487399235_in [lhead=cluster_487399235]
+}

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld Mon May 29 15:00:39 2017
@@ -0,0 +1,33 @@
+#--------------------------------------------------
+# Spark Plan
+#--------------------------------------------------
+
+Spark node scope-18
+a: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-8
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0--------
+
+Spark node scope-19
+b: Store(file:///tmp/pigoutput1:org.apache.pig.builtin.PigStorage) - scope-17
+|
+|---b: New For Each(false,false)[bag] - scope-16
+    |   |
+    |   Cast[int] - scope-11
+    |   |
+    |   |---Project[bytearray][0] - scope-10
+    |   |
+    |   Cast[int] - scope-14
+    |   |
+    |   |---Project[bytearray][1] - scope-13
+    |
+    |---b: Load(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-9--------

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld Mon May 29 15:00:39 2017
@@ -0,0 +1,49 @@
[XML golden file: the markup was stripped by the mail archiver and cannot be
reconstructed from this message. The recoverable content mirrors the dot/text
plans above: a store of alias "a" to file:///tmp/pigoutput (fields x, y;
flag false) fed by a load from file:///tmp/input, and a store of alias "b" to
file:///tmp/pigoutput1 (fields x, y; flag false) fed by a load from
file:///tmp/pigoutput. See the file at the URL above for the full 49-line XML.]
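A practical note on the golden plans above: the -dot variant is ordinary Graphviz input, so the expected two-job Spark plan (store "a", then store "b" reading a's output) can be inspected visually with the standard Graphviz CLI, e.g. dot -Tpng SPARKC-LoadStore-1-dot.gld -o plan.png. The -text variant is the same plan as printed by explain, and both must stay in sync with the planner output for the golden-file comparisons to pass.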