pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xu...@apache.org
Subject svn commit: r1733627 [16/18] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/ contrib/piggybank/java/src/mai...
Date Fri, 04 Mar 2016 18:17:47 GMT
Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java Fri Mar  4 18:17:39 2016
@@ -28,6 +28,7 @@ import java.util.Properties;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -38,7 +39,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.apache.pig.builtin.mock.Storage;
 
 @RunWith(JUnit4.class)
 public class TestMultiQuery {
@@ -811,6 +811,75 @@ public class TestMultiQuery {
         iter.next().toString().equals("(world,{(2,world)})");
     }
 
+    @Test
+    public void testMultiQueryJiraPig4480() throws Exception {
+
+        Storage.Data data = Storage.resetData(myPig);
+        data.set("inputLocation",
+                Storage.tuple(1, Storage.bag(Storage.tuple("hello"), Storage.tuple("world"), Storage.tuple("program"))),
+                Storage.tuple(2, Storage.bag(Storage.tuple("my"), Storage.tuple("world"))));
+
+        myPig.setBatchOn();
+        myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (a:int, b:bag{(c:chararray)});");
+        myPig.registerQuery("A = foreach A generate a, flatten(b);");
+        myPig.registerQuery("A1 = foreach A generate a;");
+        myPig.registerQuery("A1 = distinct A1;");
+        myPig.registerQuery("A2 = filter A by c == 'world';");
+        myPig.registerQuery("A2 = ORDER A2 by a parallel 2;");
+        myPig.registerQuery("store A1 into 'output1' using mock.Storage();");
+        myPig.registerQuery("store A2 into 'output2' using mock.Storage();");
+
+        myPig.executeBatch();
+
+        List<Tuple> actualResults = data.get("output1");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"(1)", "(2)"});
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+
+        actualResults = data.get("output2");
+        expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"(1, 'world')", "(2, 'world')"});
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
+
+    @Test
+    public void testMultiQueryJiraPig4493() throws Exception {
+
+        // Union followed by Split
+        Storage.Data data = Storage.resetData(myPig);
+        data.set("inputLocation",
+                Storage.tuple("1", "Dyson"),
+                Storage.tuple("2", "Miele"),
+                Storage.tuple("3", "Black & Decker")
+                );
+
+        myPig.setBatchOn();
+        myPig.registerQuery("A = load 'inputLocation' using mock.Storage();");
+        myPig.registerQuery("A = foreach A generate (int)$0 as a, (chararray)$1 as b;");
+        myPig.registerQuery("A1 = FILTER A by b matches '.*[a-zA-Z] *& *[a-zA-Z].*';");
+        myPig.registerQuery("A1 = FOREACH A1 generate a, REPLACE(b,'&','and')  as b;");
+        myPig.registerQuery("A = UNION A1, A;");
+        myPig.registerQuery("A = FOREACH A generate a, LOWER(b) as b;");
+        myPig.registerQuery("A2 = GROUP A by a;");
+        myPig.registerQuery("A2 = FOREACH A2 generate group, COUNT(A) as cnt;");
+        myPig.registerQuery("store A2 into 'output1' using mock.Storage();");
+        myPig.registerQuery("A = FILTER A BY b is not null and b != '';");
+        myPig.registerQuery("store A into 'output2' using mock.Storage();");
+
+        myPig.executeBatch();
+
+        List<Tuple> actualResults = data.get("output1");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"(1, 1L)", "(2, 1L)", "(3, 2L)"});
+        Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);
+
+        actualResults = data.get("output2");
+        expectedResults = Util.getTuplesFromConstantTupleStrings(new String[] {
+                "(1, 'dyson')", "(2, 'miele')", "(3, 'black & decker')",
+                "(3, 'black and decker')" });
+        Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);
+    }
+
     // --------------------------------------------------------------------------
     // Helper methods
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java Fri Mar  4 18:17:39 2016
@@ -27,8 +27,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Properties;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -58,6 +56,7 @@ import org.apache.pig.tools.pigscript.pa
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
@@ -84,7 +83,7 @@ public class TestMultiQueryLocal {
     }
 
     @Test
-    public void testMultiQueryWithTwoStores() {
+    public void testMultiQueryWithTwoStores() throws Exception {
 
         System.out.println("===== test multi-query with 2 stores =====");
 
@@ -106,32 +105,23 @@ public class TestMultiQueryLocal {
 
             Assert.assertTrue(executePlan(pp));
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
         }
     }
 
     @Test
-    public void testEmptyExecute() {
+    public void testEmptyExecute() throws Exception {
         System.out.println("=== test empty execute ===");
 
-        try {
-            myPig.setBatchOn();
-            myPig.executeBatch();
-            myPig.executeBatch();
-            myPig.discardBatch();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        }
+        myPig.setBatchOn();
+        myPig.executeBatch();
+        myPig.executeBatch();
+        myPig.discardBatch();
     }
 
     @Test
-    public void testMultiQueryWithTwoStores2() {
+    public void testMultiQueryWithTwoStores2() throws Exception {
 
         System.out.println("===== test multi-query with 2 stores (2) =====");
 
@@ -147,16 +137,13 @@ public class TestMultiQueryLocal {
 
             myPig.executeBatch();
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
         }
     }
 
     @Test
-    public void testMultiQueryWithTwoStores2Execs() {
+    public void testMultiQueryWithTwoStores2Execs() throws Exception {
 
         System.out.println("===== test multi-query with 2 stores (2) =====");
 
@@ -175,16 +162,13 @@ public class TestMultiQueryLocal {
             myPig.executeBatch();
             myPig.discardBatch();
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
         }
     }
 
     @Test
-    public void testMultiQueryWithThreeStores() {
+    public void testMultiQueryWithThreeStores() throws Exception {
 
         System.out.println("===== test multi-query with 3 stores =====");
 
@@ -206,16 +190,13 @@ public class TestMultiQueryLocal {
 
             Assert.assertTrue(executePlan(pp));
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
         }
     }
 
     @Test
-    public void testMultiQueryWithThreeStores2() {
+    public void testMultiQueryWithThreeStores2() throws Exception {
 
         System.out.println("===== test multi-query with 3 stores (2) =====");
 
@@ -234,16 +215,13 @@ public class TestMultiQueryLocal {
             myPig.executeBatch();
             myPig.discardBatch();
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
         }
     }
 
     @Test
-    public void testMultiQueryWithTwoLoads() {
+    public void testMultiQueryWithTwoLoads() throws Exception {
 
         System.out.println("===== test multi-query with two loads =====");
 
@@ -268,16 +246,13 @@ public class TestMultiQueryLocal {
 
             Assert.assertTrue(executePlan(pp));
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
         }
     }
 
     @Test
-    public void testMultiQueryWithTwoLoads2() {
+    public void testMultiQueryWithTwoLoads2() throws Exception {
 
         System.out.println("===== test multi-query with two loads (2) =====");
 
@@ -298,58 +273,45 @@ public class TestMultiQueryLocal {
             myPig.executeBatch();
             myPig.discardBatch();
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
         }
     }
 
     @Test
-    public void testMultiQueryWithNoStore() {
+    public void testMultiQueryWithNoStore() throws Exception {
 
         System.out.println("===== test multi-query with no store =====");
 
-        try {
-            myPig.setBatchOn();
+        myPig.setBatchOn();
 
-            myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " +
-                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 5;");
-            myPig.registerQuery("group b by gid;");
+        myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " +
+                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
+        myPig.registerQuery("b = filter a by uid > 5;");
+        myPig.registerQuery("group b by gid;");
 
-            LogicalPlan lp = checkLogicalPlan(0, 0, 0);
+        LogicalPlan lp = checkLogicalPlan(0, 0, 0);
 
-            // XXX Physical plan has one less node in the local case
-            PhysicalPlan pp = checkPhysicalPlan(lp, 0, 0, 0);
+        // XXX Physical plan has one less node in the local case
+        checkPhysicalPlan(lp, 0, 0, 0);
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        }
     }
 
     @Test
-    public void testMultiQueryWithNoStore2() {
+    public void testMultiQueryWithNoStore2() throws Exception {
 
         System.out.println("===== test multi-query with no store (2) =====");
 
-        try {
-            myPig.setBatchOn();
+        myPig.setBatchOn();
 
-            myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " +
-                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 5;");
-            myPig.registerQuery("group b by gid;");
+        myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " +
+                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
+        myPig.registerQuery("b = filter a by uid > 5;");
+        myPig.registerQuery("group b by gid;");
 
-            myPig.executeBatch();
-            myPig.discardBatch();
+        myPig.executeBatch();
+        myPig.discardBatch();
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        }
     }
 
     public static class PigStorageWithConfig extends PigStorage {
@@ -428,43 +390,38 @@ public class TestMultiQueryLocal {
 
     // See PIG-2912
     @Test
-    public void testMultiStoreWithConfig() {
+    public void testMultiStoreWithConfig() throws Exception {
 
         System.out.println("===== test multi-query with competing config =====");
 
-        try {
-            myPig.setBatchOn();
+        myPig.setBatchOn();
 
-            myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " +
-                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid < 5;");
-            myPig.registerQuery("c = filter a by uid > 5;");
-            myPig.registerQuery("store b into '" + TMP_DIR + "/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + "('test.key1', 'a');");
-            myPig.registerQuery("store c into '" + TMP_DIR + "/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + "('test.key2', 'b');");
+        myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " +
+                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
+        myPig.registerQuery("b = filter a by uid < 5;");
+        myPig.registerQuery("c = filter a by uid > 5;");
+        myPig.registerQuery("store b into '" + TMP_DIR + "/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + "('test.key1', 'a');");
+        myPig.registerQuery("store c into '" + TMP_DIR + "/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + "('test.key2', 'b');");
 
-            myPig.executeBatch();
-            myPig.discardBatch();
-            FileSystem fs = FileSystem.getLocal(new Configuration());
-            BufferedReader reader = new BufferedReader(new InputStreamReader
-                    (fs.open(Util.getFirstPartFile(new Path(TMP_DIR + "/Pig-TestMultiQueryLocal1")))));
-            String line;
-            while ((line = reader.readLine())!=null) {
-                Assert.assertTrue(line.endsWith("a"));
-            }
-            reader = new BufferedReader(new InputStreamReader
-                    (fs.open(Util.getFirstPartFile(new Path(TMP_DIR + "/Pig-TestMultiQueryLocal2")))));
-            while ((line = reader.readLine())!=null) {
-                Assert.assertTrue(line.endsWith("b"));
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
+        myPig.executeBatch();
+        myPig.discardBatch();
+        FileSystem fs = FileSystem.getLocal(new Configuration());
+        BufferedReader reader = new BufferedReader(new InputStreamReader
+                (fs.open(Util.getFirstPartFile(new Path("file:///" + TMP_DIR + "/Pig-TestMultiQueryLocal1")))));
+        String line;
+        while ((line = reader.readLine())!=null) {
+            Assert.assertTrue(line.endsWith("a"));
+        }
+        reader = new BufferedReader(new InputStreamReader
+                (fs.open(Util.getFirstPartFile(new Path("file:///" + TMP_DIR + "/Pig-TestMultiQueryLocal2")))));
+        while ((line = reader.readLine())!=null) {
+            Assert.assertTrue(line.endsWith("b"));
         }
+
     }
 
     @Test
-    public void testMultiQueryWithExplain() {
+    public void testMultiQueryWithExplain() throws Exception {
 
         System.out.println("===== test multi-query with explain =====");
 
@@ -479,16 +436,13 @@ public class TestMultiQueryLocal {
             parser.setInteractive(false);
             parser.parseStopOnError();
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
         }
     }
 
     @Test
-    public void testMultiQueryWithDump() {
+    public void testMultiQueryWithDump() throws Exception {
 
         System.out.println("===== test multi-query with dump =====");
 
@@ -503,16 +457,13 @@ public class TestMultiQueryLocal {
             parser.setInteractive(false);
             parser.parseStopOnError();
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
         }
     }
 
     @Test
-    public void testMultiQueryWithDescribe() {
+    public void testMultiQueryWithDescribe() throws Exception {
 
         System.out.println("===== test multi-query with describe =====");
 
@@ -527,9 +478,6 @@ public class TestMultiQueryLocal {
             parser.setInteractive(false);
             parser.parseStopOnError();
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
         }
@@ -554,9 +502,6 @@ public class TestMultiQueryLocal {
             myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "true");
             parser.parseStopOnError();
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
             myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "false");
@@ -564,7 +509,7 @@ public class TestMultiQueryLocal {
     }
 
     @Test
-    public void testStoreOrder() {
+    public void testStoreOrder() throws Exception {
         System.out.println("===== multi-query store order =====");
 
         try {
@@ -583,7 +528,7 @@ public class TestMultiQueryLocal {
             myPig.registerQuery("store c into '" + TMP_DIR + "/Pig-TestMultiQueryLocal5';");
 
             LogicalPlan lp = checkLogicalPlan(1, 3, 12);
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 15);
+            checkPhysicalPlan(lp, 1, 3, 15);
 
             myPig.executeBatch();
             myPig.discardBatch();
@@ -594,10 +539,6 @@ public class TestMultiQueryLocal {
             Assert.assertTrue(new File(TMP_DIR + "/Pig-TestMultiQueryLocal4").exists());
             Assert.assertTrue(new File(TMP_DIR + "/Pig-TestMultiQueryLocal5").exists());
 
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
         } finally {
             deleteOutputFiles();
         }
@@ -657,7 +598,7 @@ public class TestMultiQueryLocal {
             int expectedLeaves, int expectedSize) throws IOException {
 
         lp.optimize(myPig.getPigContext());
-        System.out.println("===== check physical plan =====");        
+        System.out.println("===== check physical plan =====");
 
         PhysicalPlan pp = ((HExecutionEngine)myPig.getPigContext().getExecutionEngine()).compile(
                 lp, null);

Modified: pig/branches/spark/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPOCast.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPOCast.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPOCast.java Fri Mar  4 18:17:39 2016
@@ -1777,12 +1777,11 @@ public class TestPOCast {
 			plan.attachInput(t);
 			DataByteArray dba = (DataByteArray) t.get(0);
 			Result res = op.getNextDataByteArray();
-			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+			assertEquals(POStatus.STATUS_OK, res.returnStatus);
 
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNextDataByteArray();
-			if(res.returnStatus == POStatus.STATUS_OK)
-				assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+			assertEquals(POStatus.STATUS_OK, res.returnStatus);
 		}
 
 		{

Modified: pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java Fri Mar  4 18:17:39 2016
@@ -77,8 +77,12 @@ public class TestPOPartialAgg {
     }
 
     private void createPOPartialPlan(int valueCount) throws PlanException {
+        createPOPartialPlan(valueCount, false);
+    }
+
+    private void createPOPartialPlan(int valueCount, boolean isGroupAll) throws PlanException {
         parentPlan = new PhysicalPlan();
-        partAggOp = GenPhyOp.topPOPartialAgg();
+        partAggOp = new POPartialAgg(GenPhyOp.getOK(), isGroupAll);
         partAggOp.setParentPlan(parentPlan);
 
         // setup key plan
@@ -357,6 +361,25 @@ public class TestPOPartialAgg {
         assertEquals(new Long(1), spilled.get());
     }
 
+    @Test
+    public void testGroupAll() throws Exception {
+        createPOPartialPlan(1, true);
+        Result res;
+        for (long i=1; i <= 10010; i ++) {
+            Tuple t = tuple("all", tuple(i));
+            partAggOp.attachInput(t);
+            res = partAggOp.getNextTuple();
+            assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+        }
+        // end of all input, now expecting all tuples
+        parentPlan.endOfAllInput = true;
+        res = partAggOp.getNextTuple();
+        assertEquals(tuple("all", tuple(50105055L)), res.result);
+        assertEquals(POStatus.STATUS_OK, res.returnStatus);
+        res = partAggOp.getNextTuple();
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+    }
+
     private static class Spill implements Callable<Long> {
 
         private Spillable spillable;

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Fri Mar  4 18:17:39 2016
@@ -26,6 +26,9 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -423,6 +426,89 @@ public class TestPigRunner {
     }
 
     @Test
+    public void simpleMultiQueryTest3() throws Exception {
+        final String INPUT_FILE_2 = "input2";
+        final String OUTPUT_FILE_2 = "output2";
+
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE_2));
+        w.println("3\t4\t5");
+        w.println("5\t6\t7");
+        w.println("3\t7\t8");
+        w.close();
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE_2, INPUT_FILE_2);
+        new File(INPUT_FILE_2).delete();
+
+        w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+        w.println("A1 = load '" + INPUT_FILE_2 + "' as (a0:int, a1:int, a2:int);");
+        w.println("B = filter A by a0 == 3;");
+        w.println("C = filter A by a1 <=5;");
+        w.println("D = join C by a0, B by a0, A1 by a0 using 'replicated';");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.println("store D into '" + OUTPUT_FILE_2 + "';");
+        w.close();
+
+        try {
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
+            assertTrue(stats.isSuccessful());
+            if (Util.isMapredExecType(cluster.getExecType())) {
+                assertEquals(3, stats.getJobGraph().size());
+            } else {
+                assertEquals(1, stats.getJobGraph().size());
+            }
+
+            // Each output file should include the following:
+            // output:
+            //   1\t2\t3\n
+            //   5\t3\t4\n
+            //   3\t4\t5\n
+            // output2:
+            //   3\t4\t5\t3\t4\t5\t3\t4\t5\n
+            //   3\t4\t5\t3\t4\t5\t3\t7\t8\n
+            //   3\t4\t5\t3\t7\t8\t3\t4\t5\n
+            //   3\t4\t5\t3\t4\t5\t3\t7\t8\n
+            final int numOfRecords1 = 3;
+            final int numOfRecords2 = 4;
+            final int numOfBytesWritten1 = 18;
+            final int numOfBytesWritten2 = 72;
+
+            assertEquals(numOfRecords1 + numOfRecords2, stats.getRecordWritten());
+            assertEquals(numOfBytesWritten1 + numOfBytesWritten2, stats.getBytesWritten());
+
+            List<String> outputNames = new ArrayList<String>(stats.getOutputNames());
+            assertTrue(outputNames.size() == 2);
+            Collections.sort(outputNames);
+            assertEquals(OUTPUT_FILE, outputNames.get(0));
+            assertEquals(OUTPUT_FILE_2, outputNames.get(1));
+            assertEquals(3, stats.getNumberRecords(OUTPUT_FILE));
+            assertEquals(4, stats.getNumberRecords(OUTPUT_FILE_2));
+
+            List<InputStats> inputStats = new ArrayList<InputStats>(stats.getInputStats());
+            assertTrue(inputStats.size() == 2);
+            Collections.sort(inputStats, new Comparator<InputStats>() {
+                @Override
+                public int compare(InputStats o1, InputStats o2) {
+                    return o1.getLocation().compareTo(o2.getLocation());
+                }
+            });
+            assertEquals(5, inputStats.get(0).getNumberRecords());
+            assertEquals(3, inputStats.get(1).getNumberRecords());
+            // For mapreduce, since hdfs bytes read includes replicated tables bytes read is wrong
+            // Since Tez does has only one load per job its values are correct
+            if (!Util.isMapredExecType(cluster.getExecType())) {
+                assertEquals(30, inputStats.get(0).getBytes());
+                assertEquals(18, inputStats.get(1).getBytes());
+            }
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, INPUT_FILE_2);
+            Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, OUTPUT_FILE_2);
+        }
+    }
+
+    @Test
     public void MQDepJobFailedTest() throws Exception {
         final String OUTPUT_FILE_2 = "output2";
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -974,7 +1060,7 @@ public class TestPigRunner {
                         "OUTPUT_RECORDS").getValue());
                 assertEquals(20,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
                         MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
-                assertEquals(30,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
+                assertEquals(new File(INPUT_FILE).length(),counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
                         MRPigStatsUtil.HDFS_BYTES_READ).getValue());
             } else if (execType.equals("spark")) {
                 /** Uncomment code until changes of PIG-4788 are merged to master

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java Fri Mar  4 18:17:39 2016
@@ -50,8 +50,10 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -249,13 +251,10 @@ public class TestPigServerLocal {
 
     @Test
     public void testSkipParseInRegisterForBatch() throws Throwable {
-        // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader,
-        // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
-        // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
-        // numTimesSchemaCalled = 4 (once per registerQuery)
         if (Util.getLocalTestMode().toString().startsWith("TEZ")) {
-            _testSkipParseInRegisterForBatch(false, 6, 4);
-            _testSkipParseInRegisterForBatch(true, 3, 1);
+            _testSkipParseInRegisterForBatch(false, 8, 4);
+            _testSkipParseInRegisterForBatch(true, 5, 1);
+            _testParseBatchWithScripting(5, 1);
         } else if (Util.getLocalTestMode().toString().startsWith("SPARK")) {
             // 6 = 4 (Once per registerQuery) + 2 (SortConverter , PigRecordReader)
             // 4 (Once per registerQuery)
@@ -265,13 +264,18 @@ public class TestPigServerLocal {
             // 1 (registerQuery)
             _testSkipParseInRegisterForBatch(true, 3, 1);
         } else {
+            // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader,
+            // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
+            // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
+            // numTimesSchemaCalled = 4 (once per registerQuery)
             _testSkipParseInRegisterForBatch(false, 10, 4);
+            // numTimesInitiated = 7 (parseAndBuild, launchPlan->RandomSampleLoader,
+            // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
+            // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
+            // numTimesSchemaCalled = 1 (parseAndBuild)
             _testSkipParseInRegisterForBatch(true, 7, 1);
+            _testParseBatchWithScripting(7, 1);
         }
-        // numTimesInitiated = 7 (parseAndBuild, launchPlan->RandomSampleLoader,
-        // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
-        // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
-        // numTimesSchemaCalled = 1 (parseAndBuild)
     }
 
     @Test
@@ -330,6 +334,53 @@ public class TestPigServerLocal {
         }
 
         assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated);
+        assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled);
+        List<Tuple> out = data.get("bar");
+        assertEquals(2, out.size());
+        assertEquals(tuple("a", 1, "b"), out.get(0));
+        assertEquals(tuple("b", 2, "c"), out.get(1));
+    }
+
+    private void _testParseBatchWithScripting(int numTimesInitiated, int numTimesSchemaCalled) throws Throwable {
+        MockTrackingStorage.numTimesInitiated = 0;
+        MockTrackingStorage.numTimesSchemaCalled = 0;
+
+        String[] script = {
+                "#!/usr/bin/python",
+                "from org.apache.pig.scripting import *",
+                "P = Pig.compile(\"\"\"" +
+                        "A = load 'foo' USING org.apache.pig.test.TestPigServerLocal\\$MockTrackingStorage();" +
+                        "B = order A by $0,$1,$2;" +
+                        "C = LIMIT B 2;" +
+                        "store C into 'bar' USING mock.Storage();" +
+                        "\"\"\")",
+                "Q = P.bind()",
+                "stats = Q.runSingle()",
+                "if stats.isSuccessful():",
+                "\tprint 'success!'",
+                "else:",
+                "\traise 'failed'"
+        };
+
+        Properties properties = new Properties();
+        properties.setProperty("io.sort.mb", "2");
+        PigContext pigContext = new PigContext(Util.getLocalTestMode(), properties);
+        PigServer pigServer = new PigServer(pigContext);
+        Data data = resetData(pigContext);
+        data.set("foo", tuple("a", 1, "b"), tuple("b", 2, "c"), tuple("c", 3, "d"));
+
+        String scriptFile = tempDir + File.separator + "_testParseBatchWithScripting.py";
+        Util.createLocalInputFile(scriptFile , script);
+        ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+        Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), scriptFile);
+
+        for (List<PigStats> stats : statsMap.values()) {
+            for (PigStats s : stats) {
+                assertTrue(s.isSuccessful());
+            }
+        }
+
+        assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated);
         assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled);
         List<Tuple> out = data.get("bar");
         assertEquals(2, out.size());

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigStats.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigStats.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigStats.java Fri Mar  4 18:17:39 2016
@@ -26,9 +26,6 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 
-import junit.framework.Assert;
-
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +35,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.Ignore;
@@ -46,10 +44,10 @@ import org.junit.Test;
 @Ignore
 abstract public class TestPigStats  {
 
-    private static final Log LOG = LogFactory.getLog(TestPigStats.class);
+    protected static final Log LOG = LogFactory.getLog(TestPigStats.class);
+
+    abstract public void addSettingsToConf(Configuration conf, String scriptFileName) throws IOException;
 
-    abstract public void addSettingsToConf(Configuration conf, String scriptFileName);
-    
     @Test
     public void testPigScriptInConf() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter("test.pig"));
@@ -58,22 +56,22 @@ abstract public class TestPigStats  {
         w.println("register /mydir/lib/jackson-core-asl-1.4.2.jar");
         w.println("register /mydir/lib/jackson-mapper-asl-1.4.2.jar");
         w.close();
-        
+
         Configuration conf = new Configuration();
         addSettingsToConf(conf, "test.pig");
-        
+
         String s = conf.get("pig.script");
-        String script = new String(Base64.decodeBase64(s.getBytes()));
-        
-        String expected = 
+        String script = (String) ObjectSerializer.deserialize(s);
+
+        String expected =
             "register /mydir/sath.jar\n" +
             "register /mydir/lib/hadoop-tools-0.20.201.0-SNAPSHOT.jar\n" +
             "register /mydir/lib/jackson-core-asl-1.4.2.jar\n"  +
             "register /mydir/lib/jackson-mapper-asl-1.4.2.jar\n";
-        
-        Assert.assertEquals(expected, script);
+
+        assertEquals(expected, script);
     }
-    
+
     @Test
     public void testJythonScriptInConf() throws Exception {
         String[] script = {
@@ -90,16 +88,16 @@ abstract public class TestPigStats  {
                 "else:",
                 "\traise 'failed'"
         };
-        
+
         Util.createLocalInputFile( "testScript.py", script);
-        
+
         Configuration conf = new Configuration();
         addSettingsToConf(conf, "testScript.py");
-        
+
         String s = conf.get("pig.script");
-        String actual = new String(Base64.decodeBase64(s.getBytes()));
-        
-        String expected = 
+        String actual = (String) ObjectSerializer.deserialize(s);
+
+        String expected =
             "#!/usr/bin/python\n" +
             "from org.apache.pig.scripting import *\n" +
             "Pig.fs(\"rmr simple_out\")\n" +
@@ -112,10 +110,10 @@ abstract public class TestPigStats  {
             "\tprint 'success!'\n" +
             "else:\n" +
             "\traise 'failed'\n";
-        
-        Assert.assertEquals(expected, actual);
+
+        assertEquals(expected, actual);
     }
-    
+
     @Test
     public void testBytesWritten_JIRA_1027() throws Exception {
 
@@ -157,7 +155,7 @@ abstract public class TestPigStats  {
             pig.registerQuery("D = order C by $1;");
             pig.registerQuery("E = limit D 10;");
             pig.registerQuery("store E into 'alias_output';");
-            
+
             LogicalPlan lp = getLogicalPlan(pig);
             lp.optimize(pig.getPigContext());
             PhysicalPlan pp = ((HExecutionEngine)pig.getPigContext().getExecutionEngine()).compile(lp,
@@ -201,7 +199,7 @@ abstract public class TestPigStats  {
             }
         }
     }
-    
+
     private void deleteDirectory(File dir) {
         try {
             FileUtils.deleteDirectory(dir);
@@ -215,5 +213,5 @@ abstract public class TestPigStats  {
         buildLp.setAccessible(true);
         return (LogicalPlan ) buildLp.invoke( pig );
     }
-     
+
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java Fri Mar  4 18:17:39 2016
@@ -18,24 +18,62 @@
 package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
+import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
+import org.junit.Test;
 
 public class TestPigStatsMR extends TestPigStats {
+
+    @Override
+    @Test
+    public void testBytesWritten_JIRA_1027() throws Exception {
+
+        FileLocalizer.setInitialized(false);
+        // This test cannot be run in MR local mode due to lack of counters
+        MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+        try {
+            String filePath = "/tmp/" + this.getClass().getName() + "_"
+                    + "testBytesWritten_JIRA_1027";
+
+            PigServer pig = new PigServer(cluster.getExecType(), cluster.getProperties());
+            String inputFile = "test/org/apache/pig/test/data/passwd";
+            Util.copyFromLocalToCluster(cluster, inputFile, inputFile);
+            pig.registerQuery("A = load '" + inputFile + "';");
+            ExecJob job = pig.store("A", filePath);
+            PigStats stats = job.getStatistics();
+            Path dataFile = Util.getFirstPartFile(new Path(filePath));
+            FileStatus fs = cluster.getFileSystem().getFileStatus(dataFile);
+            assertEquals(fs.getLen(), stats.getBytesWritten());
+        } catch (IOException e) {
+            LOG.error("Error while generating file", e);
+            fail("Encountered IOException");
+        } finally {
+            FileLocalizer.setInitialized(false);
+            cluster.shutDown();
+        }
+    }
+
     @Override
-    public void addSettingsToConf(Configuration conf, String scriptFileName) {
+    public void addSettingsToConf(Configuration conf, String scriptFileName) throws IOException {
         MRScriptState ss = MRScriptState.get();
         ss.setScript(new File(scriptFileName));
         MapReduceOper mro = new MapReduceOper(new OperatorKey());
@@ -71,7 +109,7 @@ public class TestPigStatsMR extends Test
         compile.setAccessible(true);
         return (MROperPlan) compile.invoke(launcher, new Object[] { pp, ctx });
     }
-           
+
     private static String getAlias(MapReduceOper mro) throws Exception {
         ScriptState ss = ScriptState.get();
         java.lang.reflect.Method getAlias = ss.getClass()

Modified: pig/branches/spark/test/org/apache/pig/test/TestRank1.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRank1.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestRank1.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestRank1.java Fri Mar  4 18:17:39 2016
@@ -19,6 +19,7 @@ package org.apache.pig.test;
 
 import static org.apache.pig.builtin.mock.Storage.resetData;
 import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
@@ -30,6 +31,12 @@ import org.apache.pig.data.TupleFactory;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.TreeMultiset;
+import com.google.common.collect.Multiset;
+
 public class TestRank1 {
     private static TupleFactory tf = TupleFactory.getInstance();
     private static PigServer pigServer;
@@ -70,20 +77,21 @@ public class TestRank1 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'A',1,'N')",
-                "(2L,'B',2,'N')",
-                "(3L,'C',3,'M')",
-                "(4L,'D',4,'P')",
-                "(5L,'E',4,'Q')",
-                "(6L,'E',4,'Q')",
-                "(7L,'F',8,'Q')",
-                "(8L,'F',7,'Q')",
-                "(9L,'F',8,'T')",
-                "(10L,'F',8,'Q')",
-                "(11L,'G',10,'V')"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "A", 1, "N")),
+                tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")),
+                tf.newTuple(ImmutableList.of((long) 3, "C", 3, "M")),
+                tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")),
+                tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 6, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 7, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 8, "F", 7, "Q")),
+                tf.newTuple(ImmutableList.of((long) 9, "F", 8, "T")),
+                tf.newTuple(ImmutableList.of((long) 10, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 11, "G", 10, "V")));
+
+        verifyExpected(data.get("result"), expected);
     }
 
     @Test
@@ -93,22 +101,23 @@ public class TestRank1 {
             + "store B into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'Michael', 'Blythe', 1,1, 1, 1, 4557045.046, 98027)",
-                "(2L,'Linda','Mitchell', 2, 1, 1, 1, 5200475.231, 98027)",
-                "(3L,'Jillian', 'Carson', 3,1, 1, 1, 3857163.633, 98027)",
-                "(4L,'Garrett','Vargas', 4, 1, 1, 1, 1764938.986, 98027)",
-                "(5L,'Tsvi', 'Reiter',5, 1, 1, 2, 2811012.715, 98027)",
-                "(6L,'Shu', 'Ito', 6,6, 2, 2, 3018725.486, 98055)",
-                "(7L,'Jose', 'Saraiva',7, 6, 2, 2, 3189356.247, 98055)",
-                "(8L,'David','Campbell', 8, 6, 2, 3, 3587378.426, 98055)",
-                "(9L,'Tete', 'Mensa-Annan',9, 6, 2, 3, 1931620.184, 98055)",
-                "(10L, 'Lynn','Tsoflias', 10, 6, 2, 3, 1758385.926, 98055)",
-                "(11L, 'Rachel', 'Valdez', 11,6, 2, 4, 2241204.042, 98055)",
-                "(12L, 'Jae', 'Pak', 12,6, 2, 4, 5015682.375, 98055)",
-                "(13L, 'Ranjit','Varkey Chudukatil', 13, 6, 2, 4, 3827950.238,98055)"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "Michael", "Blythe", 1,1, 1, 1, 4557045.046, 98027)),
+                tf.newTuple(ImmutableList.of((long) 2, "Linda","Mitchell", 2, 1, 1, 1, 5200475.231, 98027)),
+                tf.newTuple(ImmutableList.of((long) 3, "Jillian", "Carson", 3,1, 1, 1, 3857163.633, 98027)),
+                tf.newTuple(ImmutableList.of((long) 4, "Garrett","Vargas", 4, 1, 1, 1, 1764938.986, 98027)),
+                tf.newTuple(ImmutableList.of((long) 5, "Tsvi", "Reiter",5, 1, 1, 2, 2811012.715, 98027)),
+                tf.newTuple(ImmutableList.of((long) 6, "Shu", "Ito", 6,6, 2, 2, 3018725.486, 98055)),
+                tf.newTuple(ImmutableList.of((long) 7, "Jose", "Saraiva",7, 6, 2, 2, 3189356.247, 98055)),
+                tf.newTuple(ImmutableList.of((long) 8, "David","Campbell", 8, 6, 2, 3, 3587378.426, 98055)),
+                tf.newTuple(ImmutableList.of((long) 9, "Tete", "Mensa-Annan",9, 6, 2, 3, 1931620.184, 98055)),
+                tf.newTuple(ImmutableList.of((long) 10, "Lynn","Tsoflias", 10, 6, 2, 3, 1758385.926, 98055)),
+                tf.newTuple(ImmutableList.of((long) 11, "Rachel", "Valdez", 11,6, 2, 4, 2241204.042, 98055)),
+                tf.newTuple(ImmutableList.of((long) 12, "Jae", "Pak", 12,6, 2, 4, 5015682.375, 98055)),
+                tf.newTuple(ImmutableList.of((long) 13, "Ranjit","Varkey Chudukatil", 13, 6, 2, 4, 3827950.238,98055)));
+
+        verifyExpected(data.get("result"), expected);
     }
 
     @Test
@@ -118,20 +127,21 @@ public class TestRank1 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'C',3,'M')",
-                "(2L,'A',1,'N')",
-                "(2L,'B',2,'N')",
-                "(4L,'D',4,'P')",
-                "(5L,'E',4,'Q')",
-                "(5L,'E',4,'Q')",
-                "(5L,'F',8,'Q')",
-                "(5L,'F',7,'Q')",
-                "(5L,'F',8,'Q')",
-                "(10L,'F',8,'T')",
-                "(11L,'G',10,'V')"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "C", 3, "M")),
+                tf.newTuple(ImmutableList.of((long) 2, "A", 1, "N")),
+                tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")),
+                tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")),
+                tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 5, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 5, "F", 7, "Q")),
+                tf.newTuple(ImmutableList.of((long) 5, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 10, "F", 8, "T")),
+                tf.newTuple(ImmutableList.of((long) 11, "G", 10, "V")));
+
+        verifyExpected(data.get("result"), expected);
     }
 
     @Test
@@ -141,20 +151,21 @@ public class TestRank1 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'A',1,'N')",
-                "(2L,'B',2,'N')",
-                "(3L,'C',3,'M')",
-                "(4L,'D',4,'P')",
-                "(4L,'E',4,'Q')",
-                "(4L,'E',4,'Q')",
-                "(7L,'F',7,'Q')",
-                "(8L,'F',8,'Q')",
-                "(8L,'F',8,'Q')",
-                "(8L,'F',8,'T')",
-                "(11L,'G',10,'V')"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "A", 1, "N")),
+                tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")),
+                tf.newTuple(ImmutableList.of((long) 3, "C", 3, "M")),
+                tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")),
+                tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 7, "F", 7, "Q")),
+                tf.newTuple(ImmutableList.of((long) 8, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 8, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 8, "F", 8, "T")),
+                tf.newTuple(ImmutableList.of((long) 11, "G", 10, "V")));
+
+        verifyExpected(data.get("result"), expected);
     }
 
     @Test
@@ -164,20 +175,21 @@ public class TestRank1 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'G',10,'V')",
-                "(2L,'F',8,'T')",
-                "(2L,'F',8,'Q')",
-                "(2L,'F',8,'Q')",
-                "(2L,'F',7,'Q')",
-                "(6L,'E',4,'Q')",
-                "(6L,'E',4,'Q')",
-                "(8L,'D',4,'P')",
-                "(9L,'C',3,'M')",
-                "(10L,'B',2,'N')",
-                "(11L,'A',1,'N')"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "G", 10, "V")),
+                tf.newTuple(ImmutableList.of((long) 2, "F", 8, "T")),
+                tf.newTuple(ImmutableList.of((long) 2, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 2, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 2, "F", 7, "Q")),
+                tf.newTuple(ImmutableList.of((long) 6, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 6, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 8, "D", 4, "P")),
+                tf.newTuple(ImmutableList.of((long) 9, "C", 3, "M")),
+                tf.newTuple(ImmutableList.of((long) 10, "B", 2, "N")),
+                tf.newTuple(ImmutableList.of((long) 11, "A", 1, "N")));
+
+        verifyExpected(data.get("result"), expected);
     }
 
     @Test
@@ -187,22 +199,23 @@ public class TestRank1 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'Michael','Blythe',1,1,1,1,4557045.046,98027)",
-                "(1L,'Linda','Mitchell',2,1,1,1,5200475.231,98027)",
-                "(1L,'Jillian','Carson',3,1,1,1,3857163.633,98027)",
-                "(1L,'Garrett','Vargas',4,1,1,1,1764938.986,98027)",
-                "(1L,'Tsvi','Reiter',5,1,1,2,2811012.715,98027)",
-                "(6L,'Shu','Ito',6,6,2,2,3018725.486,98055)",
-                "(6L,'Jose','Saraiva',7,6,2,2,3189356.247,98055)",
-                "(6L,'David','Campbell',8,6,2,3,3587378.426,98055)",
-                "(6L,'Tete','Mensa-Annan',9,6,2,3,1931620.184,98055)",
-                "(6L,'Lynn','Tsoflias',10,6,2,3,1758385.926,98055)",
-                "(6L,'Rachel','Valdez',11,6,2,4,2241204.042,98055)",
-                "(6L,'Jae','Pak',12,6,2,4,5015682.375,98055)",
-                "(6L,'Ranjit','Varkey Chudukatil',13,6,2,4,3827950.238,98055)",
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "Michael", "Blythe", 1,1, 1, 1, 4557045.046, 98027)),
+                tf.newTuple(ImmutableList.of((long) 1, "Linda","Mitchell", 2, 1, 1, 1, 5200475.231, 98027)),
+                tf.newTuple(ImmutableList.of((long) 1, "Jillian", "Carson", 3,1, 1, 1, 3857163.633, 98027)),
+                tf.newTuple(ImmutableList.of((long) 1, "Garrett","Vargas", 4, 1, 1, 1, 1764938.986, 98027)),
+                tf.newTuple(ImmutableList.of((long) 1, "Tsvi", "Reiter",5, 1, 1, 2, 2811012.715, 98027)),
+                tf.newTuple(ImmutableList.of((long) 6, "Shu", "Ito", 6,6, 2, 2, 3018725.486, 98055)),
+                tf.newTuple(ImmutableList.of((long) 6, "Jose", "Saraiva",7, 6, 2, 2, 3189356.247, 98055)),
+                tf.newTuple(ImmutableList.of((long) 6, "David","Campbell", 8, 6, 2, 3, 3587378.426, 98055)),
+                tf.newTuple(ImmutableList.of((long) 6, "Tete", "Mensa-Annan",9, 6, 2, 3, 1931620.184, 98055)),
+                tf.newTuple(ImmutableList.of((long) 6, "Lynn","Tsoflias", 10, 6, 2, 3, 1758385.926, 98055)),
+                tf.newTuple(ImmutableList.of((long) 6, "Rachel", "Valdez", 11,6, 2, 4, 2241204.042, 98055)),
+                tf.newTuple(ImmutableList.of((long) 6, "Jae", "Pak", 12,6, 2, 4, 5015682.375, 98055)),
+                tf.newTuple(ImmutableList.of((long) 6, "Ranjit","Varkey Chudukatil", 13, 6, 2, 4, 3827950.238,98055)));
+
+        verifyExpected(data.get("result"), expected);
     }
 
     @Test
@@ -212,22 +225,23 @@ public class TestRank1 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'David','Campbell',8,6,2,3,3587378.426,98055)",
-                "(2L,'Garrett','Vargas',4,1,1,1,1764938.986,98027)",
-                "(3L,'Jae','Pak',12,6,2,4,5015682.375,98055)",
-                "(4L,'Jillian','Carson',3,1,1,1,3857163.633,98027)",
-                "(5L,'Jose','Saraiva',7,6,2,2,3189356.247,98055)",
-                "(6L,'Linda','Mitchell',2,1,1,1,5200475.231,98027)",
-                "(7L,'Lynn','Tsoflias',10,6,2,3,1758385.926,98055)",
-                "(8L,'Michael','Blythe',1,1,1,1,4557045.046,98027)",
-                "(9L,'Rachel','Valdez',11,6,2,4,2241204.042,98055)",
-                "(10L,'Ranjit','Varkey Chudukatil',13,6,2,4,3827950.238,98055)",
-                "(11L,'Shu','Ito',6,6,2,2,3018725.486,98055)",
-                "(12L,'Tete','Mensa-Annan',9,6,2,3,1931620.184,98055)",
-                "(13L,'Tsvi','Reiter',5,1,1,2,2811012.715,98027)"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "David", "Campbell", 8,6, 2, 3, 3587378.426, 98055)),
+                tf.newTuple(ImmutableList.of((long) 2, "Garrett","Vargas", 4, 1, 1, 1, 1764938.986, 98027)),
+                tf.newTuple(ImmutableList.of((long) 3, "Jae", "Pak", 12,6, 2, 4, 5015682.375, 98055)),
+                tf.newTuple(ImmutableList.of((long) 4, "Jillian","Carson", 3, 1, 1, 1, 3857163.633, 98027)),
+                tf.newTuple(ImmutableList.of((long) 5, "Jose", "Saraiva",7, 6, 2, 2, 3189356.247, 98055)),
+                tf.newTuple(ImmutableList.of((long) 6, "Linda","Mitchell", 2, 1, 1, 1, 5200475.231, 98027)),
+                tf.newTuple(ImmutableList.of((long) 7, "Lynn", "Tsoflias", 10,6, 2, 3, 1758385.926, 98055)),
+                tf.newTuple(ImmutableList.of((long) 8, "Michael","Blythe", 1, 1, 1, 1, 4557045.046, 98027)),
+                tf.newTuple(ImmutableList.of((long) 9, "Rachel","Valdez", 11, 6, 2, 4, 2241204.042, 98055)),
+                tf.newTuple(ImmutableList.of((long) 10, "Ranjit","Varkey Chudukatil", 13, 6, 2, 4, 3827950.238, 98055)),
+                tf.newTuple(ImmutableList.of((long) 11, "Shu", "Ito", 6, 6, 2, 2, 3018725.486,98055)),
+                tf.newTuple(ImmutableList.of((long) 12, "Tete", "Mensa-Annan", 9, 6, 2, 3,1931620.184, 98055)),
+                tf.newTuple(ImmutableList.of((long) 13, "Tsvi", "Reiter", 5, 1, 1, 2, 2811012.715,98027)));
+
+        verifyExpected(data.get("result"), expected);
     }
 
     @Test
@@ -237,22 +251,23 @@ public class TestRank1 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'David','Campbell',8,6,2,3,3587378.426,98055)",
-                "(2L,'Garrett','Vargas',4,1,1,1,1764938.986,98027)",
-                "(3L,'Jae','Pak',12,6,2,4,5015682.375,98055)",
-                "(4L,'Jillian','Carson',3,1,1,1,3857163.633,98027)",
-                "(5L,'Jose','Saraiva',7,6,2,2,3189356.247,98055)",
-                "(6L,'Linda','Mitchell',2,1,1,1,5200475.231,98027)",
-                "(7L,'Lynn','Tsoflias',10,6,2,3,1758385.926,98055)",
-                "(8L,'Michael','Blythe',1,1,1,1,4557045.046,98027)",
-                "(9L,'Rachel','Valdez',11,6,2,4,2241204.042,98055)",
-                "(10L,'Ranjit','Varkey Chudukatil',13,6,2,4,3827950.238,98055)",
-                "(11L,'Shu','Ito',6,6,2,2,3018725.486,98055)",
-                "(12L,'Tete','Mensa-Annan',9,6,2,3,1931620.184,98055)",
-                "(13L,'Tsvi','Reiter',5,1,1,2,2811012.715,98027)"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "David", "Campbell", 8, 6, 2, 3, 3587378.426, 98055)),
+                tf.newTuple(ImmutableList.of((long) 2, "Garrett","Vargas", 4, 1, 1, 1, 1764938.986, 98027)),
+                tf.newTuple(ImmutableList.of((long) 3, "Jae", "Pak", 12,6, 2, 4, 5015682.375, 98055)),
+                tf.newTuple(ImmutableList.of((long) 4, "Jillian","Carson", 3, 1, 1, 1, 3857163.633, 98027)),
+                tf.newTuple(ImmutableList.of((long) 5, "Jose", "Saraiva",7, 6, 2, 2, 3189356.247, 98055)),
+                tf.newTuple(ImmutableList.of((long) 6, "Linda","Mitchell", 2, 1, 1, 1, 5200475.231, 98027)),
+                tf.newTuple(ImmutableList.of((long) 7, "Lynn", "Tsoflias", 10,6, 2, 3, 1758385.926, 98055)),
+                tf.newTuple(ImmutableList.of((long) 8, "Michael","Blythe", 1, 1, 1, 1, 4557045.046, 98027)),
+                tf.newTuple(ImmutableList.of((long) 9, "Rachel","Valdez", 11, 6, 2, 4, 2241204.042, 98055)),
+                tf.newTuple(ImmutableList.of((long) 10, "Ranjit","Varkey Chudukatil", 13, 6, 2, 4, 3827950.238, 98055)),
+                tf.newTuple(ImmutableList.of((long) 11, "Shu", "Ito", 6, 6, 2, 2, 3018725.486,98055)),
+                tf.newTuple(ImmutableList.of((long) 12, "Tete", "Mensa-Annan", 9, 6, 2, 3,1931620.184, 98055)),
+                tf.newTuple(ImmutableList.of((long) 13, "Tsvi", "Reiter", 5, 1, 1, 2, 2811012.715,98027)));
+
+        verifyExpected(data.get("result"), expected);
     }
 
     @Test
@@ -262,20 +277,38 @@ public class TestRank1 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'A',1,'N')",
-                "(2L,'B',2,'N')",
-                "(3L,'C',3,'M')",
-                "(4L,'D',4,'P')",
-                "(5L,'E',4,'Q')",
-                "(5L,'E',4,'Q')",
-                "(7L,'F',7,'Q')",
-                "(8L,'F',8,'Q')",
-                "(8L,'F',8,'Q')",
-                "(10L,'F',8,'T')",
-                "(11L,'G',10,'V')"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "A", 1, "N")),
+                tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")),
+                tf.newTuple(ImmutableList.of((long) 3, "C", 3, "M")),
+                tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")),
+                tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 7, "F", 7, "Q")),
+                tf.newTuple(ImmutableList.of((long) 8, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 8, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 10, "F", 8, "T")),
+                tf.newTuple(ImmutableList.of((long) 11, "G", 10, "V")));
+
+        verifyExpected(data.get("result"), expected);
     }
 
+    public void verifyExpected(List<Tuple> out, Multiset<Tuple> expected) {
+        Multiset<Tuple> resultMultiset = TreeMultiset.create();
+        for (Tuple tup : out) {
+          resultMultiset.add(tup);
+        }
+
+        StringBuilder error = new StringBuilder("Result does not match.\nActual result:\n");
+        for (Tuple tup : resultMultiset.elementSet() ) {
+            error.append(tup).append(" x ").append(resultMultiset.count(tup)).append("\n");
+        }
+        error.append("Expceted result:\n");
+        for (Tuple tup : ImmutableSortedSet.copyOf(expected) ) {
+            error.append(tup).append(" x ").append(expected.count(tup)).append("\n");
+        }
+
+        assertTrue(error.toString(), resultMultiset.equals(expected));
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestRank2.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRank2.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestRank2.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestRank2.java Fri Mar  4 18:17:39 2016
@@ -19,6 +19,7 @@ package org.apache.pig.test;
 
 import static org.apache.pig.builtin.mock.Storage.resetData;
 import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
@@ -30,6 +31,11 @@ import org.apache.pig.data.TupleFactory;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.TreeMultiset;
+import com.google.common.collect.Multiset;
 
 public class TestRank2 {
     private static PigServer pigServer;
@@ -71,20 +77,21 @@ public class TestRank2 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'C',3,'M')",
-                "(2L,'A',1,'N')",
-                "(2L,'B',2,'N')",
-                "(3L,'D',4,'P')",
-                "(4L,'E',4,'Q')",
-                "(4L,'E',4,'Q')",
-                "(4L,'F',8,'Q')",
-                "(4L,'F',7,'Q')",
-                "(4L,'F',8,'Q')",
-                "(5L,'F',8,'T')",
-                "(6L,'G',10,'V')"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "C", 3, "M")),
+                tf.newTuple(ImmutableList.of((long) 2, "A", 1, "N")),
+                tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")),
+                tf.newTuple(ImmutableList.of((long) 3, "D", 4, "P")),
+                tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 4, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 4, "F", 7, "Q")),
+                tf.newTuple(ImmutableList.of((long) 4, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 5, "F", 8, "T")),
+                tf.newTuple(ImmutableList.of((long) 6, "G", 10, "V")));
+
+        verifyExpected(data.get("result"), expected);
     }
 
     @Test
@@ -94,20 +101,21 @@ public class TestRank2 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'A',1,'N')",
-                "(2L,'B',2,'N')",
-                "(3L,'C',3,'M')",
-                "(4L,'D',4,'P')",
-                "(4L,'E',4,'Q')",
-                "(4L,'E',4,'Q')",
-                "(5L,'F',7,'Q')",
-                "(6L,'F',8,'Q')",
-                "(6L,'F',8,'Q')",
-                "(6L,'F',8,'T')",
-                "(7L,'G',10,'V')"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "A", 1, "N")),
+                tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")),
+                tf.newTuple(ImmutableList.of((long) 3, "C", 3, "M")),
+                tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")),
+                tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 5, "F", 7, "Q")),
+                tf.newTuple(ImmutableList.of((long) 6, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 6, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 6, "F", 8, "T")),
+                tf.newTuple(ImmutableList.of((long) 7, "G", 10, "V")));
+
+        verifyExpected(data.get("result"), expected);
     }
 
     @Test
@@ -117,20 +125,21 @@ public class TestRank2 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'G',10,'V')",
-                "(2L,'F',8,'T')",
-                "(2L,'F',8,'Q')",
-                "(2L,'F',8,'Q')",
-                "(2L,'F',7,'Q')",
-                "(3L,'E',4,'Q')",
-                "(3L,'E',4,'Q')",
-                "(4L,'D',4,'P')",
-                "(5L,'C',3,'M')",
-                "(6L,'B',2,'N')",
-                "(7L,'A',1,'N')"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "G", 10, "V")),
+                tf.newTuple(ImmutableList.of((long) 2, "F", 8, "T")),
+                tf.newTuple(ImmutableList.of((long) 2, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 2, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 2, "F", 7, "Q")),
+                tf.newTuple(ImmutableList.of((long) 3, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 3, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")),
+                tf.newTuple(ImmutableList.of((long) 5, "C", 3, "M")),
+                tf.newTuple(ImmutableList.of((long) 6, "B", 2, "N")),
+                tf.newTuple(ImmutableList.of((long) 7, "A", 1, "N")));
+
+        verifyExpected(data.get("result"), expected);
     }
 
     @Test
@@ -140,20 +149,40 @@ public class TestRank2 {
             + "store C into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,'A',1,'N')",
-                "(2L,'B',2,'N')",
-                "(3L,'C',3,'M')",
-                "(4L,'D',4,'P')",
-                "(5L,'E',4,'Q')",
-                "(5L,'E',4,'Q')",
-                "(6L,'F',8,'Q')",
-                "(6L,'F',8,'Q')",
-                "(6L,'F',8,'T')",
-                "(7L,'F',7,'Q')",
-                "(8L,'G',10,'V')"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of((long) 1, "A", 1, "N")),
+                tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")),
+                tf.newTuple(ImmutableList.of((long) 3, "C", 3, "M")),
+                tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")),
+                tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")),
+                tf.newTuple(ImmutableList.of((long) 6, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 6, "F", 8, "Q")),
+                tf.newTuple(ImmutableList.of((long) 6, "F", 8, "T")),
+                tf.newTuple(ImmutableList.of((long) 7, "F", 7, "Q")),
+                tf.newTuple(ImmutableList.of((long) 8, "G", 10, "V")));
+
+        verifyExpected(data.get("result"), expected);
+    }
+
+    public void verifyExpected(List<Tuple> out, Multiset<Tuple> expected) {
+        Multiset<Tuple> resultMultiset = TreeMultiset.create();
+        for (Tuple tup : out) {
+          resultMultiset.add(tup);
+        }
+
+        StringBuilder error = new StringBuilder("Result does not match.\nActual result:\n");
+        for (Tuple tup : resultMultiset.elementSet() ) {
+            error.append(tup).append(" x ").append(resultMultiset.count(tup)).append("\n");
+        }
+        error.append("Expceted result:\n");
+        for (Tuple tup : ImmutableSortedSet.copyOf(expected) ) {
+            error.append(tup).append(" x ").append(expected.count(tup)).append("\n");
+        }
+
+        //This one line test should be sufficient but adding the above
+        //for-loop for better error messages
+        assertTrue(error.toString(), resultMultiset.equals(expected));
     }
-    
-}
\ No newline at end of file
+}

Modified: pig/branches/spark/test/org/apache/pig/test/TestRank3.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRank3.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestRank3.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestRank3.java Fri Mar  4 18:17:39 2016
@@ -19,6 +19,7 @@ package org.apache.pig.test;
 
 import static org.apache.pig.builtin.mock.Storage.resetData;
 import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
@@ -32,6 +33,11 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.TreeMultiset;
+import com.google.common.collect.Multiset;
 
 public class TestRank3 {
     private static PigServer pigServer;
@@ -108,39 +114,41 @@ public class TestRank3 {
             + "store R8 into 'result' using mock.Storage();";
 
         Util.registerMultiLineQuery(pigServer, query);
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{
-                "(1L,21L,5L,7L,1L,1L,0L,8L,8L)",
-                "(2L,26L,2L,3L,2L,5L,1L,9L,10L)",
-                "(3L,30L,24L,21L,2L,3L,1L,3L,10L)",
-                "(4L,6L,10L,8L,3L,4L,1L,7L,2L)",
-                "(5L,8L,28L,25L,3L,2L,1L,0L,2L)",
-                "(6L,28L,11L,12L,4L,6L,2L,7L,10L)",
-                "(7L,9L,26L,22L,5L,7L,3L,2L,3L)",
-                "(8L,5L,6L,5L,6L,8L,3L,8L,1L)",
-                "(9L,29L,16L,15L,7L,9L,4L,6L,10L)",
-                "(10L,18L,12L,10L,8L,11L,5L,7L,6L)",
-                "(11L,14L,17L,14L,9L,10L,5L,6L,5L)",
-                "(12L,6L,12L,8L,10L,11L,5L,7L,2L)",
-                "(13L,2L,17L,13L,11L,10L,5L,6L,0L)",
-                "(14L,26L,3L,3L,12L,14L,6L,9L,10L)",
-                "(15L,15L,20L,18L,13L,13L,6L,4L,5L)",
-                "(16L,3L,29L,24L,14L,12L,6L,0L,0L)",
-                "(17L,23L,21L,19L,15L,16L,7L,4L,8L)",
-                "(18L,19L,19L,16L,16L,17L,7L,5L,6L)",
-                "(19L,20L,30L,26L,16L,15L,7L,0L,6L)",
-                "(20L,12L,21L,17L,17L,16L,7L,4L,4L)",
-                "(21L,4L,1L,1L,18L,19L,7L,10L,1L)",
-                "(22L,1L,7L,4L,19L,18L,7L,8L,0L)",
-                "(23L,24L,14L,11L,20L,21L,8L,7L,9L)",
-                "(24L,16L,25L,20L,21L,20L,8L,3L,5L)",
-                "(25L,25L,27L,23L,22L,22L,9L,1L,9L)",
-                "(26L,21L,8L,7L,23L,25L,9L,8L,8L)",
-                "(27L,17L,4L,2L,24L,26L,9L,9L,6L)",
-                "(28L,10L,8L,6L,25L,25L,9L,8L,4L)",
-                "(29L,11L,15L,9L,25L,24L,9L,7L,4L)",
-                "(30L,12L,23L,17L,25L,23L,9L,4L,4L)"
-        });
-        Util.checkQueryOutputsAfterSort(data.get("result"), expected);
+
+        Multiset<Tuple> expected = ImmutableMultiset.of(
+                tf.newTuple(ImmutableList.of(1L,21L,5L,7L,1L,1L,0L,8L,8L)),
+                tf.newTuple(ImmutableList.of(2L,26L,2L,3L,2L,5L,1L,9L,10L)),
+                tf.newTuple(ImmutableList.of(3L,30L,24L,21L,2L,3L,1L,3L,10L)),
+                tf.newTuple(ImmutableList.of(4L,6L,10L,8L,3L,4L,1L,7L,2L)),
+                tf.newTuple(ImmutableList.of(5L,8L,28L,25L,3L,2L,1L,0L,2L)),
+                tf.newTuple(ImmutableList.of(6L,28L,11L,12L,4L,6L,2L,7L,10L)),
+                tf.newTuple(ImmutableList.of(7L,9L,26L,22L,5L,7L,3L,2L,3L)),
+                tf.newTuple(ImmutableList.of(8L,5L,6L,5L,6L,8L,3L,8L,1L)),
+                tf.newTuple(ImmutableList.of(9L,29L,16L,15L,7L,9L,4L,6L,10L)),
+                tf.newTuple(ImmutableList.of(10L,18L,12L,10L,8L,11L,5L,7L,6L)),
+                tf.newTuple(ImmutableList.of(11L,14L,17L,14L,9L,10L,5L,6L,5L)),
+                tf.newTuple(ImmutableList.of(12L,6L,12L,8L,10L,11L,5L,7L,2L)),
+                tf.newTuple(ImmutableList.of(13L,2L,17L,13L,11L,10L,5L,6L,0L)),
+                tf.newTuple(ImmutableList.of(14L,26L,3L,3L,12L,14L,6L,9L,10L)),
+                tf.newTuple(ImmutableList.of(15L,15L,20L,18L,13L,13L,6L,4L,5L)),
+                tf.newTuple(ImmutableList.of(16L,3L,29L,24L,14L,12L,6L,0L,0L)),
+                tf.newTuple(ImmutableList.of(17L,23L,21L,19L,15L,16L,7L,4L,8L)),
+                tf.newTuple(ImmutableList.of(18L,19L,19L,16L,16L,17L,7L,5L,6L)),
+                tf.newTuple(ImmutableList.of(19L,20L,30L,26L,16L,15L,7L,0L,6L)),
+                tf.newTuple(ImmutableList.of(20L,12L,21L,17L,17L,16L,7L,4L,4L)),
+                tf.newTuple(ImmutableList.of(21L,4L,1L,1L,18L,19L,7L,10L,1L)),
+                tf.newTuple(ImmutableList.of(22L,1L,7L,4L,19L,18L,7L,8L,0L)),
+                tf.newTuple(ImmutableList.of(23L,24L,14L,11L,20L,21L,8L,7L,9L)),
+                tf.newTuple(ImmutableList.of(24L,16L,25L,20L,21L,20L,8L,3L,5L)),
+                tf.newTuple(ImmutableList.of(25L,25L,27L,23L,22L,22L,9L,1L,9L)),
+                tf.newTuple(ImmutableList.of(26L,21L,8L,7L,23L,25L,9L,8L,8L)),
+                tf.newTuple(ImmutableList.of(27L,17L,4L,2L,24L,26L,9L,9L,6L)),
+                tf.newTuple(ImmutableList.of(28L,10L,8L,6L,25L,25L,9L,8L,4L)),
+                tf.newTuple(ImmutableList.of(29L,11L,15L,9L,25L,24L,9L,7L,4L)),
+                tf.newTuple(ImmutableList.of(30L,12L,23L,17L,25L,23L,9L,4L,4L))
+        );
+
+        verifyExpected(data.get("result"), expected);
     }
 
     // See PIG-3726
@@ -151,8 +159,9 @@ public class TestRank3 {
         + "store A into 'empty_result' using mock.Storage();";
 
       Util.registerMultiLineQuery(pigServer, query);
-      List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{});
-      Util.checkQueryOutputsAfterSort(data.get("empty_result"), expected);
+
+      Multiset<Tuple> expected = ImmutableMultiset.of();
+      verifyExpected(data.get("empty_result"), expected);
     }
 
     @Test
@@ -188,4 +197,24 @@ public class TestRank3 {
         Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults);
     }
 
+    public void verifyExpected(List<Tuple> out, Multiset<Tuple> expected) {
+        Multiset<Tuple> resultMultiset = TreeMultiset.create();
+        for (Tuple tup : out) {
+          resultMultiset.add(tup);
+        }
+
+        StringBuilder error = new StringBuilder("Result does not match.\nActual result:\n");
+        for (Tuple tup : resultMultiset.elementSet() ) {
+            error.append(tup).append(" x ").append(resultMultiset.count(tup)).append("\n");
+        }
+        error.append("Expceted result:\n");
+        for (Tuple tup : ImmutableSortedSet.copyOf(expected) ) {
+            error.append(tup).append(" x ").append(expected.count(tup)).append("\n");
+        }
+
+        //This one line test should be sufficient but adding the above
+        //for-loop for better error messages
+        assertTrue(error.toString(), resultMultiset.equals(expected));
+    }
+
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestRegisteredJarVisibility.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRegisteredJarVisibility.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestRegisteredJarVisibility.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestRegisteredJarVisibility.java Fri Mar  4 18:17:39 2016
@@ -24,12 +24,9 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.jar.JarFile;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
@@ -38,17 +35,9 @@ import javax.tools.JavaFileObject;
 import javax.tools.StandardJavaFileManager;
 import javax.tools.ToolProvider;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.log4j.FileAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.SimpleLayout;
-import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.JarManager;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -73,16 +62,15 @@ public class TestRegisteredJarVisibility
 
     private static MiniGenericCluster cluster;
     private static File jarFile;
+    private static File testDataDir;
 
     @BeforeClass()
     public static void setUp() throws IOException {
 
         String testResourcesDir =  "test/resources/" + PACKAGE_NAME.replace(".", "/");
 
-        String testBuildDataDir = "build/test/data";
         // Create the test data directory if needed
-        File testDataDir = new File(testBuildDataDir,
-                TestRegisteredJarVisibility.class.getCanonicalName());
+        testDataDir = new File(Util.getTestDirectory(TestRegisteredJarVisibility.class));
         testDataDir.mkdirs();
 
         jarFile = new File(testDataDir, JAR_FILE_NAME);
@@ -111,6 +99,7 @@ public class TestRegisteredJarVisibility
     @AfterClass()
     public static void tearDown() {
         cluster.shutDown();
+        Util.deleteDirectory(testDataDir);
     }
 
     @Before
@@ -163,7 +152,7 @@ public class TestRegisteredJarVisibility
         // When jackson jar is not registered, jackson-core from the first jar in
         // classpath (pig.jar) should be picked up (version 1.8.8 in this case).
         String jacksonJar = JarManager.findContainingJar(org.codehaus.jackson.JsonParser.class);
-        Assert.assertTrue(new File(jacksonJar).getName().contains("1.8.8"));
+        Assert.assertTrue(new File(jacksonJar).getName().contains("1.9.13"));
 
         PigServer pigServer = new PigServer(Util.getLocalTestMode(), new Properties());
         pigServer.registerJar("test/resources/jackson-core-asl-1.9.9.jar");

Modified: pig/branches/spark/test/org/apache/pig/test/TestScalarAliases.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestScalarAliases.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestScalarAliases.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestScalarAliases.java Fri Mar  4 18:17:39 2016
@@ -28,7 +28,6 @@ import java.util.Iterator;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
 import org.junit.AfterClass;
-import org.junit.Assume;
 import org.junit.Test;
 
 public class TestScalarAliases  {
@@ -93,7 +92,6 @@ public class TestScalarAliases  {
 
     @Test
     public void testScalarErrMultipleRowsInInput() throws Exception{
-        Assume.assumeTrue("Skip this test for TEZ. See PIG-3994", Util.isMapredExecType(cluster.getExecType()));
         Util.resetStateForExecModeSwitch();
         pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         String[] input = {

Modified: pig/branches/spark/test/org/apache/pig/test/TestScalarAliasesLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestScalarAliasesLocal.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestScalarAliasesLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestScalarAliasesLocal.java Fri Mar  4 18:17:39 2016
@@ -25,8 +25,10 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -556,4 +558,39 @@ public class TestScalarAliasesLocal {
         );
     }
 
+    @Test
+    public void testScalarNullValue() throws Exception{
+        Storage.Data data = Storage.resetData(pigServer);
+        data.set("input", Storage.tuple("a", 1), Storage.tuple("b", 2));
+
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = load 'input' using mock.Storage() as (a:chararray, b:int);");
+        pigServer.registerQuery("B = FILTER A by a == 'c';");
+        pigServer.registerQuery("C = FOREACH A generate a, b + B.b;");
+        pigServer.registerQuery("store C into 'output' using mock.Storage();");
+
+        pigServer.executeBatch();
+
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"('a', null)", "('b', null)"});
+        Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);
+
+    }
+
+    @Test
+    public void testScalarNullValue2() throws Exception{
+        Storage.Data data = Storage.resetData(pigServer);
+        data.set("input", Storage.tuple("a", 1), Storage.tuple("b", 2));
+
+        pigServer.registerQuery("A = load 'input' using mock.Storage() as (a:chararray, b:int);");
+        pigServer.registerQuery("B = FILTER A by a == 'c';");
+        pigServer.registerQuery("C = GROUP B ALL;");
+        pigServer.registerQuery("D = FOREACH C GENERATE COUNT(B.b) as count;");
+        pigServer.registerQuery("E = FOREACH A GENERATE (D.count IS NOT NULL? D.count : 0l);;");
+
+        Iterator<Tuple> iter = pigServer.openIterator("E");
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(0)"));
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java Fri Mar  4 18:17:39 2016
@@ -442,6 +442,38 @@ public abstract class TestSecondarySort
 
     }
 
+    @Test
+    public void testNestedSortMultiQueryEndToEnd3() throws Exception {
+        File input1 = Util.createTempFileDelOnExit("test", "txt");
+        PrintStream ps1 = new PrintStream(new FileOutputStream(input1));
+        ps1.println("a\t0");
+        ps1.println("a\t2");
+        ps1.println("a\t1");
+        ps1.close();
+        Util.copyFromLocalToCluster(cluster, input1.getCanonicalPath(), "testNestedSortMultiQueryEndToEnd3-input-1.txt");
+
+        File input2 = Util.createTempFileDelOnExit("test", "txt");
+        PrintStream ps2 = new PrintStream(new FileOutputStream(input2));
+        ps2.println("a");
+        ps2.close();
+        Util.copyFromLocalToCluster(cluster, input2.getCanonicalPath(), "testNestedSortMultiQueryEndToEnd3-input-2.txt");
+
+        try {
+            pigServer.setBatchOn();
+            pigServer.registerQuery("a = load 'testNestedSortMultiQueryEndToEnd3-input-1.txt' as (a0:chararray, a1:chararray);");
+            pigServer.registerQuery("b = load 'testNestedSortMultiQueryEndToEnd3-input-2.txt' as (b0);");
+            pigServer.registerQuery("c = cogroup b by b0, a by a0;");
+            pigServer.registerQuery("d = foreach c {a_sorted = order a by a1 desc;generate group, a_sorted, b;}");
+            Iterator<Tuple> iter = pigServer.openIterator("d");
+
+            assertEquals(iter.next().toString(), "(a,{(a,2),(a,1),(a,0)},{(a)})");
+        } finally {
+            Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd3-input-1.txt");
+            Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd3-input-2.txt");
+        }
+
+    }
+
     // See PIG-1978
     @Test
     public void testForEachTwoInput() throws Exception {




Mime
View raw message