pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From z..@apache.org
Subject svn commit: r1784224 [16/17] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Date Fri, 24 Feb 2017 03:34:40 GMT
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java Fri Feb 24 03:34:37 2017
@@ -36,7 +36,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
@@ -48,7 +47,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -64,23 +62,12 @@ public class TestTezAutoParallelism {
     private static Properties properties;
     private static MiniGenericCluster cluster;
 
-    private static final PathFilter PART_FILE_FILTER = new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-            if (path.getName().startsWith("part")) {
-                return true;
-            }
-            return false;
-        }
-    };
-
     @BeforeClass
     public static void oneTimeSetUp() throws Exception {
         cluster = MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ);
         properties = cluster.getProperties();
         //Test spilling to disk as tests here have multiple splits
         properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, "10");
-        properties.setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false");
         createFiles();
     }
 
@@ -97,11 +84,6 @@ public class TestTezAutoParallelism {
 
     @After
     public void tearDown() throws Exception {
-        removeProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION);
-        removeProperty(MRConfiguration.MAX_SPLIT_SIZE);
-        removeProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
-        removeProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
-        removeProperty(TezConfiguration.TEZ_AM_LOG_LEVEL);
         pigServer.shutdown();
         pigServer = null;
     }
@@ -149,53 +131,32 @@ public class TestTezAutoParallelism {
     @Test
     public void testGroupBy() throws IOException{
         // parallelism is 3 originally, reduce to 1
-        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name;");
         pigServer.store("B", "output1");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER);
+        FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
         assertEquals(files.length, 1);
-        fs.delete(new Path("output1"), true);
-    }
-
-    @Test
-    public void testBytesPerReducer() throws IOException{
-
-        NodeIdGenerator.reset();
-        PigServer.resetScope();
-
-        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
-
-        StringWriter writer = new StringWriter();
-        Util.createLogAppender("testAutoParallelism", writer, TezDagBuilder.class);
-        try {
-            pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
-            pigServer.registerQuery("B = group A by name;");
-            pigServer.store("B", "output1");
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER);
-            assertEquals(files.length, 10);
-            String log = writer.toString();
-            assertTrue(log.contains("For vertex - scope-13: parallelism=3"));
-            assertTrue(log.contains("For vertex - scope-14: parallelism=10"));
-        } finally {
-            Util.removeLogAppender("testAutoParallelism", TezDagBuilder.class);
-            Util.deleteFile(cluster, "output1");
-        }
     }
 
     @Test
     public void testOrderbyDecreaseParallelism() throws IOException{
         // order by parallelism is 3 originally, reduce to 1
-        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name parallel 3;");
@@ -203,54 +164,86 @@ public class TestTezAutoParallelism {
         pigServer.registerQuery("D = order C by age;");
         pigServer.store("D", "output2");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output2"), PART_FILE_FILTER);
+        FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
         assertEquals(files.length, 1);
     }
 
     @Test
     public void testOrderbyIncreaseParallelism() throws IOException{
         // order by parallelism is 3 originally, increase to 4
-        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name parallel 3;");
         pigServer.registerQuery("C = foreach B generate group as name, AVG(A.age) as age;");
         pigServer.registerQuery("D = order C by age;");
         pigServer.store("D", "output3");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output3"), PART_FILE_FILTER);
+        FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
         assertEquals(files.length, 4);
     }
 
     @Test
     public void testSkewedJoinDecreaseParallelism() throws IOException{
         // skewed join parallelism is 4 originally, reduce to 1
-        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
         pigServer.store("C", "output4");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output4"), PART_FILE_FILTER);
+        FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
         assertEquals(files.length, 1);
     }
 
     @Test
     public void testSkewedJoinIncreaseParallelism() throws IOException{
         // skewed join parallelism is 3 originally, increase to 5
-        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
         pigServer.store("C", "output5");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER);
+        FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
         assertEquals(files.length, 5);
     }
 
@@ -258,15 +251,23 @@ public class TestTezAutoParallelism {
     public void testSkewedFullJoinIncreaseParallelism() throws IOException{
         // skewed full join parallelism take the initial setting, since the join vertex has a broadcast(sample) dependency,
         // which prevent it changing parallelism
-        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name full, B by name using 'skewed';");
         pigServer.store("C", "output6");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER);
+        FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
         assertEquals(files.length, 5);
     }
 
@@ -274,9 +275,9 @@ public class TestTezAutoParallelism {
     public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{
         // skewed join parallelism take the initial setting, since the join vertex has a broadcast(scalar) dependency,
         // which prevent it changing parallelism
-        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
@@ -286,29 +287,19 @@ public class TestTezAutoParallelism {
         pigServer.registerQuery("G = foreach C generate age/F.count, gender;");
         pigServer.store("G", "output7");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output7"), PART_FILE_FILTER);
+        FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
         assertEquals(files.length, 4);
     }
 
     @Test
-    public void testSkewedJoinRightInputAutoParallelism() throws IOException{
-        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
-        setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "1.0");
-        setProperty(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
-        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
-        pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
-        pigServer.registerQuery("B = FILTER B by name == 'Noah';");
-        pigServer.registerQuery("B1 = group B by name;");
-        pigServer.registerQuery("C = join A by name, B1 by group using 'skewed';");
-        pigServer.store("C", "output8");
-        FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output8"), PART_FILE_FILTER);
-        assertEquals(5, files.length);
-    }
-
-    @Test
     public void testFlattenParallelism() throws IOException{
         String outputDir = "/tmp/testFlattenParallelism";
         String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"
@@ -395,9 +386,9 @@ public class TestTezAutoParallelism {
         // When there is a combiner operation involved user specified parallelism is overriden
         Util.createLogAppender("testAutoParallelism", writer, classesToLog);
         try {
-            setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-            setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
-            setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
+            pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+            pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
+            pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
             pigServer.setBatchOn();
             pigServer.registerScript(new ByteArrayInputStream(script.getBytes()));
             pigServer.executeBatch();
@@ -425,12 +416,4 @@ public class TestTezAutoParallelism {
             Util.deleteFile(cluster, outputDir);
         }
     }
-
-    private void setProperty(String property, String value) {
-        pigServer.getPigContext().getProperties().setProperty(property, value);
-    }
-
-    private void removeProperty(String property) {
-        pigServer.getPigContext().getProperties().remove(property);
-    }
 }

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Fri Feb 24 03:34:37 2017
@@ -20,21 +20,13 @@ package org.apache.pig.tez;
 import static org.junit.Assert.assertEquals;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Properties;
-import java.util.Random;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
-import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
@@ -43,9 +35,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
 import org.apache.pig.builtin.OrcStorage;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
 import org.apache.pig.test.Util;
@@ -76,14 +66,11 @@ public class TestTezCompiler {
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
-        resetFileLocalizer();
         pc = new PigContext(new TezLocalExecType(), new Properties());
-        FileUtils.deleteDirectory(new File("/tmp/pigoutput"));
     }
 
     @AfterClass
     public static void tearDownAfterClass() throws Exception {
-        resetFileLocalizer();
     }
 
     @Before
@@ -92,7 +79,6 @@ public class TestTezCompiler {
         pc.getProperties().remove(PigConfiguration.PIG_OPT_MULTIQUERY);
         pc.getProperties().remove(PigConfiguration.PIG_TEZ_OPT_UNION);
         pc.getProperties().remove(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
-        pc.getProperties().remove(PigConfiguration.PIG_BLOOMJOIN_STRATEGY);
         pigServer = new PigServer(pc);
     }
 
@@ -102,20 +88,13 @@ public class TestTezCompiler {
         TezPlanContainer.resetScope();
     }
 
-    private static void resetFileLocalizer() {
-        FileLocalizer.deleteTempFiles();
-        FileLocalizer.setInitialized(false);
-        // Set random seed to generate deterministic temporary paths
-        FileLocalizer.setR(new Random(1331L));
-    }
-
     @Test
     public void testStoreLoad() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
-                "store a into 'file:///tmp/pigoutput';" +
-                "b = load 'file:///tmp/pigoutput' as (x:int, y:int);" +
-                "store b into 'file:///tmp/pigoutput1';";
+                "store a into 'file:///tmp/output';" +
+                "b = load 'file:///tmp/output' as (x:int, y:int);" +
+                "store b into 'file:///tmp/output1';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-1.gld");
     }
@@ -124,85 +103,25 @@ public class TestTezCompiler {
     public void testStoreLoadMultiple() throws Exception {
         String query =
                 "a = load 'file:///tmp/input';" +
-                "store a into 'file:///tmp/pigoutput/Dir1';" +
-                "a = load 'file:///tmp/pigoutput/Dir1';" +
-                "store a into 'file:///tmp/pigoutput/Dir2' using BinStorage();" +
-                "a = load 'file:///tmp/pigoutput/Dir1';" +
-                "store a into 'file:///tmp/pigoutput/Dir3';" +
-                "a = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" +
-                "store a into 'file:///tmp/pigoutput/Dir4';" +
-                "a = load 'file:///tmp/pigoutput/Dir3';" +
-                "b = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" +
-                "c = load 'file:///tmp/pigoutput/Dir1';" +
+                "store a into 'file:///tmp/output/Dir1';" +
+                "a = load 'file:///tmp/output/Dir1';" +
+                "store a into 'file:///tmp/output/Dir2' using BinStorage();" +
+                "a = load 'file:///tmp/output/Dir1';" +
+                "store a into 'file:///tmp/output/Dir3';" +
+                "a = load 'file:///tmp/output/Dir2' using BinStorage();" +
+                "store a into 'file:///tmp/output/Dir4';" +
+                "a = load 'file:///tmp/output/Dir3';" +
+                "b = load 'file:///tmp/output/Dir2' using BinStorage();" +
+                "c = load 'file:///tmp/output/Dir1';" +
                 "d = cogroup a by $0, b by $0, c by $0;" +
-                "store d into 'file:///tmp/pigoutput/Dir5';";
-
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld");
-    }
-
-    @Test
-    public void testStoreLoadJoinMultiple() throws Exception {
-        // Case where different store load statements are used in a single join
-        String query =
-                "a = load 'file:///tmp/pigoutput/Dir1';" +
-                "b = filter a by $0 == 1;" +
-                "c = filter a by $0 == 2;" +
-                "store b into 'file:///tmp/pigoutput/Dir2';" +
-                "store c into 'file:///tmp/pigoutput/Dir3';" +
-                "d = load 'file:///tmp/pigoutput/Dir2';" +
-                "e = load 'file:///tmp/pigoutput/Dir3';" +
-                "f = join d by $0, e by $0;" +
-                "store f into 'file:///tmp/pigoutput/Dir5';";
-
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-3.gld");
-
-        resetScope();
-        query =
-                "a = load 'file:///tmp/pigoutput/Dir1';" +
-                "b = distinct a;" +
-                "c = group a by $0;" +
-                "store b into 'file:///tmp/pigoutput/Dir2';" +
-                "store c into 'file:///tmp/pigoutput/Dir3';" +
-                "d = load 'file:///tmp/pigoutput/Dir2';" +
-                "e = load 'file:///tmp/pigoutput/Dir3';" +
-                "f = load 'file:///tmp/pigoutput/Dir4';" +
-                "g = join d by $0, f by $0 using 'repl';" +
-                "h = join e by $0, f by $0 using 'repl';" +
-                "store g into 'file:///tmp/pigoutput/Dir4';" +
-                "store h into 'file:///tmp/pigoutput/Dir5';";
-
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-4.gld");
-    }
-
-    @Test
-    public void testStoreLoadSplit() throws Exception {
-        // Cases where segmenting into two DAGs is not straight forward due to Split.
-        // The Split operator is required in both the segments.
+                "store d into 'file:///tmp/output/Dir5';";
 
-        resetFileLocalizer();
-        // Split operator as root vertex
-        String query =
-                "a = load 'file:///tmp/input';" +
-                "a1 = filter a by $0 == 5;" +
-                "store a1 into 'file:///tmp/pigoutput/Dir1';" +
-                "b = load 'file:///tmp/pigoutput/Dir1';" +
-                "c = join a by $0, b by $0;" +
-                "store c into 'file:///tmp/pigoutput/Dir2';";
-
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-5.gld");
-
-        // Split operator as intermediate vertex
-        query =
-                "a = load 'file:///tmp/input';" +
-                "a = distinct a;" +
-                "store a into 'file:///tmp/pigoutput/Dir1';" +
-                "b = load 'file:///tmp/pigoutput/Dir1';" +
-                "c = join a by $0, b by $0;" +
-                "store c into 'file:///tmp/pigoutput/Dir2';";
-
-        resetScope();
-        resetFileLocalizer();
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld");
+        // To get around difference in ordering of operators in plan due to JDK7 and JDK8
+        if (System.getProperties().getProperty("java.version").startsWith("1.8")) {
+            run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld");
+        } else {
+            run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2-JDK7.gld");
+        }
     }
 
     @Test
@@ -210,7 +129,7 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = native 'hadoop-examples.jar' Store a into '/tmp/table_testNativeMRJobSimple_input' Load '/tmp/table_testNativeMRJobSimple_output' `wordcount /tmp/table_testNativeMRJobSimple_input /tmp/table_testNativeMRJobSimple_output`;" +
-                "store b into 'file:///tmp/pigoutput';";
+                "store b into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Native-1.gld");
     }
@@ -221,7 +140,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = filter a by x > 0;" +
                 "c = foreach b generate y;" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld");
     }
@@ -232,7 +151,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = group a by x;" +
                 "c = foreach b generate group, COUNT(a.x);" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld");
     }
@@ -244,131 +163,12 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = join a by x, b by x;" +
                 "d = foreach c generate a::x as x, y, z;" +
-                "store d into 'file:///tmp/pigoutput';";
+                "store d into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld");
     }
 
     @Test
-    public void testBloomJoin() throws Exception {
-        String query =
-                "a = load 'file:///tmp/input1' as (x, y:int);" +
-                "b = load 'file:///tmp/input2' as (x, z:int);" +
-                "c = load 'file:///tmp/input2' as (x, w:int);" +
-                "d = join b by x, a by x, c by x using 'bloom';" +
-                "e = foreach d generate a::x as x, y, z, w;" +
-                "store e into 'file:///tmp/pigoutput';";
-
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld");
-    }
-
-    @Test
-    public void testBloomJoinLeftOuter() throws Exception {
-        String query =
-                "a = load 'file:///tmp/input1' as (x:chararray, y:int);" +
-                "b = load 'file:///tmp/input2' as (x:chararray, z:int);" +
-                "d = join a by x left, b by x using 'bloom';" +
-                "e = foreach d generate a::x as x, y, z;" +
-                "store e into 'file:///tmp/pigoutput';";
-
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld");
-    }
-
-    @Test
-    public void testBloomJoinUnion() throws Exception {
-        // Left input from a union
-        String query =
-                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
-                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
-                "c = load 'file:///tmp/input3' as (x:int, z:int);" +
-                "b = union b, c;" +
-                "d = join a by x, b by x using 'bloom';" +
-                "e = foreach d generate a::x as x, y, z;" +
-                "store e into 'file:///tmp/pigoutput';";
-
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld");
-        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null);
-
-        resetScope();
-        // Right input from a union
-        query =
-                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
-                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
-                "c = load 'file:///tmp/input3' as (x:int, z:int);" +
-                "b = union b, c;" +
-                "d = join b by x, a by x using 'bloom';" +
-                "e = foreach d generate a::x as x, y, z;" +
-                "store e into 'file:///tmp/pigoutput';";
-
-        // Needs shared edges and PIG-3856 to be a more optimial plan
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld");
-    }
-
-    @Test
-    public void testBloomJoinSplit() throws Exception {
-        // Left input from a split
-        String query =
-                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
-                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
-                "a1 = filter a by x == 3;" +
-                "a2 = filter a by x == 4;" +
-                "d = join a1 by x, a2 by x, b by x using 'bloom';" +
-                "e = foreach d generate a1::x as x, a1::y as y1, a2::y as y2, z;" +
-                "store e into 'file:///tmp/pigoutput';";
-
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld");
-        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null);
-
-        resetScope();
-        // Right input from a split
-        query =
-                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
-                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
-                "a1 = filter a by x == 3;" +
-                "a2 = filter a by x == 4;" +
-                "d = join b by x, a1 by x using 'bloom';" +
-                "e = foreach d generate a1::x as x, y, z;" +
-                "store a2 into 'file:///tmp/pigoutput/a2';" +
-                "store e into 'file:///tmp/pigoutput/e';";
-
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld");
-    }
-
-    @Test
-    public void testBloomSelfJoin() throws Exception {
-        String query =
-                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
-                "b = filter a by x < 5;" +
-                "c = filter a by x == 10;" +
-                "d = filter a by x > 10;" +
-                "e = join b by x, c by x, d by x using 'bloom';" +
-                "store e into 'file:///tmp/pigoutput';";
-
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld");
-    }
-
-    @Test
     public void testSelfJoin() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
@@ -376,7 +176,7 @@ public class TestTezCompiler {
                 "c = filter a by x == 10;" +
                 "d = filter a by x > 10;" +
                 "e = join b by x, c by x, d by x;" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-1.gld");
     }
@@ -388,7 +188,7 @@ public class TestTezCompiler {
                 "b = filter a by x < 5;" +
                 "c = filter a by x == 10;" +
                 "d = join b by x, c by x using 'skewed';" +
-                "store d into 'file:///tmp/pigoutput';";
+                "store d into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld");
     }
@@ -401,7 +201,7 @@ public class TestTezCompiler {
                 "c = filter a by x == 10;" +
                 "d = filter a by x > 10;" +
                 "e = join b by x, c by x, d by x using 'replicated';" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-3.gld");
     }
@@ -413,7 +213,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = union a, b;" +
                 "d = join b by x, c by x using 'replicated';" +
-                "store d into 'file:///tmp/pigoutput';";
+                "store d into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld");
     }
@@ -426,7 +226,7 @@ public class TestTezCompiler {
                 "a2 = filter a by x < 2;" +
                 "b = union a1, a2;" +
                 "c = join b by x, a by x;" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld");
     }
@@ -442,7 +242,7 @@ public class TestTezCompiler {
                 "a5 = foreach a4 generate a2::x as x, a3::y as y;" +
                 "b = union a1, a5;" +
                 "c = join b by x, a by x;" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld");
     }
@@ -454,7 +254,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = cross a, b;" +
                 "d = foreach c generate a::x as x, y, z;" +
-                "store d into 'file:///tmp/pigoutput';";
+                "store d into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-1.gld");
     }
@@ -466,7 +266,7 @@ public class TestTezCompiler {
                 "b = filter a by x < 5;" +
                 "c = filter a by x == 10;" +
                 "d = cross b, c;" +
-                "store d into 'file:///tmp/pigoutput';";
+                "store d into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-2.gld");
     }
@@ -478,7 +278,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = cross b, a;" +
                 "d = foreach c generate a.x, a.y, z;" + //Scalar
-                "store d into 'file:///tmp/pigoutput';";
+                "store d into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld");
     }
@@ -490,7 +290,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = join a by x, b by x using 'skewed';" +
                 "d = foreach c generate a::x as x, y, z;" +
-                "store d into 'file:///tmp/pigoutput';";
+                "store d into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld");
     }
@@ -503,7 +303,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = join a by x, b by x using 'skewed';" +
                 "d = foreach c generate a::x as x, y, z;" +
-                "store d into 'file:///tmp/pigoutput';";
+                "store d into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld");
     }
@@ -514,7 +314,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = limit a 10;" +
                 "c = foreach b generate y;" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld");
     }
@@ -525,7 +325,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = order a by x, y;" +
                 "c = limit b 10;" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld");
     }
@@ -538,31 +338,18 @@ public class TestTezCompiler {
                 "g = group a all;" +
                 "h = foreach g generate COUNT(a) as sum;" +
                 "c = limit b h.sum/2;" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld");
     }
 
     @Test
-    public void testLimitReplJoin() throws Exception {
-        String query =
-                "a = load 'file:///tmp/input' as (x:int, y:int);" +
-                "b = load 'file:///tmp/input' as (x:int, y:int);" +
-                "c = limit a 1;" +
-                "d = join c by x, b by x using 'replicated';" +
-                "store a into 'file:///tmp/pigoutput/a';" +
-                "store d into 'file:///tmp/pigoutput/d';";
-
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-4.gld");
-    }
-
-    @Test
     public void testDistinct() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = distinct a;" +
                 "c = foreach b generate y;" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld");
     }
@@ -573,7 +360,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = group a by x;" +
                 "c = foreach b { d = distinct a; generate COUNT(d); };" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld");
     }
@@ -587,7 +374,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = load 'file:///tmp/input3' as (x:int, z:int);" +
                 "d = join a by x, b by x, c by x using 'replicated';" +
-                "store d into 'file:///tmp/pigoutput/d';";
+                "store d into 'file:///tmp/output/d';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld");
     }
@@ -600,7 +387,7 @@ public class TestTezCompiler {
                 "b1 = foreach b generate group, COUNT(a.y);" +
                 "c = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "d = join b1 by group, c by x using 'replicated';" +
-                "store d into 'file:///tmp/pigoutput/e';";
+                "store d into 'file:///tmp/output/e';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld");
     }
@@ -610,7 +397,7 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" +
                 "b = stream a through `stream.pl -n 5`;" +
-                "STORE b INTO 'file:///tmp/pigoutput';";
+                "STORE b INTO 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Stream-1.gld");
     }
@@ -621,7 +408,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int, z:int);" +
                 "b = group a by $0;" +
                 "c = foreach b { d = limit a 10; e = order d by $1; f = order e by $0; generate group, f;};"+
-                "store c INTO 'file:///tmp/pigoutput';";
+                "store c INTO 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-1.gld");
 
@@ -635,7 +422,7 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" +
                 "b = order a by x;" +
-                "STORE b INTO 'file:///tmp/pigoutput';";
+                "STORE b INTO 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld");
     }
@@ -646,7 +433,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" +
                 "b = filter a by x == 1;" +
                 "c = order b by x;" +
-                "STORE c INTO 'file:///tmp/pigoutput';";
+                "STORE c INTO 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld");
     }
@@ -657,7 +444,7 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' using org.apache.pig.backend.hadoop.hbase.HBaseStorage(',') as (x:int, y:int);" +
                 "b = order a by x;" +
-                "STORE b INTO 'file:///tmp/pigoutput';";
+                "STORE b INTO 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld");
         setProperty("pig.sort.readonce.loadfuncs", null);
@@ -672,7 +459,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = cogroup a by x, b by x;" +
                 "d = foreach c generate group, COUNT(a.y), COUNT(b.z);" +
-                "store d into 'file:///tmp/pigoutput/d';";
+                "store d into 'file:///tmp/output/d';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld");
     }
@@ -682,9 +469,9 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "split a into b if x <= 5, c if x <= 10, d if x >10;" +
-                "store b into 'file:///tmp/pigoutput/b';" +
-                "store c into 'file:///tmp/pigoutput/c';" +
-                "store d into 'file:///tmp/pigoutput/d';";
+                "store b into 'file:///tmp/output/b';" +
+                "store c into 'file:///tmp/output/c';" +
+                "store d into 'file:///tmp/output/d';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld");
@@ -713,14 +500,14 @@ public class TestTezCompiler {
                 // Needs to be removed in Tez plan as well.
                 "f1 = limit f 1;" +
                 "f2 = union d1, f1;" +
-                "store b1 into 'file:///tmp/pigoutput/b1';" +
-                "store b2 into 'file:///tmp/pigoutput/b2';" +
-                "store c1 into 'file:///tmp/pigoutput/c1';" +
-                "store c3 into 'file:///tmp/pigoutput/c1';" +
-                "store d1 into 'file:///tmp/pigoutput/d1';" +
-                "store e1 into 'file:///tmp/pigoutput/e1';" +
-                "store f1 into 'file:///tmp/pigoutput/f1';" +
-                "store f2 into 'file:///tmp/pigoutput/f2';";
+                "store b1 into 'file:///tmp/output/b1';" +
+                "store b2 into 'file:///tmp/output/b2';" +
+                "store c1 into 'file:///tmp/output/c1';" +
+                "store c3 into 'file:///tmp/output/c1';" +
+                "store d1 into 'file:///tmp/output/d1';" +
+                "store e1 into 'file:///tmp/output/e1';" +
+                "store f1 into 'file:///tmp/output/f1';" +
+                "store f2 into 'file:///tmp/output/f2';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld");
@@ -736,8 +523,8 @@ public class TestTezCompiler {
                 "b = foreach b generate group, COUNT(a.x);" +
                 "c = group a by (x,y);" +
                 "c = foreach c generate group, COUNT(a.y);" +
-                "store b into 'file:///tmp/pigoutput/b';" +
-                "store c into 'file:///tmp/pigoutput/c';";
+                "store b into 'file:///tmp/output/b';" +
+                "store c into 'file:///tmp/output/c';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld");
@@ -753,9 +540,9 @@ public class TestTezCompiler {
                 "c = join a by x, b by x;" +
                 "d = foreach c generate $0, $1, $3;" +
                 "e = foreach c generate $0, $1, $2, $3;" +
-                "store c into 'file:///tmp/pigoutput/c';" +
-                "store d into 'file:///tmp/pigoutput/d';" +
-                "store e into 'file:///tmp/pigoutput/e';";
+                "store c into 'file:///tmp/output/c';" +
+                "store d into 'file:///tmp/output/d';" +
+                "store e into 'file:///tmp/output/e';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld");
@@ -768,13 +555,13 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = group a by x;" + //b: {group: int,a: {(x: int,y: int)}}
-                "store b into 'file:///tmp/pigoutput/b';" +
+                "store b into 'file:///tmp/output/b';" +
                 "c = foreach b generate a.x, a.y;" + //c: {{(x: int)},{(y: int)}}
-                "store c into 'file:///tmp/pigoutput/c';" +
+                "store c into 'file:///tmp/output/c';" +
                 "d = foreach b GENERATE FLATTEN(a);" + //d: {a::x: int,a::y: int}
-                "store d into 'file:///tmp/pigoutput/d';" +
+                "store d into 'file:///tmp/output/d';" +
                 "e = foreach d GENERATE a::x, a::y;" +
-                "store e into 'file:///tmp/pigoutput/e';";
+                "store e into 'file:///tmp/output/e';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld");
@@ -789,8 +576,8 @@ public class TestTezCompiler {
                 "b = group a by x;" +
                 "c = foreach b generate group, COUNT(a) as cnt;" +
                 "SPLIT a into d if (2 * c.cnt) < y, e OTHERWISE;" +
-                "store d into 'file:///tmp/pigoutput1';" +
-                "store e into 'file:///tmp/pigoutput2';";
+                "store d into 'file:///tmp/output1';" +
+                "store e into 'file:///tmp/output2';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6.gld");
@@ -807,7 +594,7 @@ public class TestTezCompiler {
                 "c = join a by $0, b by $0 using 'replicated';" +
                 "d = join a by $1, b by $1 using 'replicated';" +
                 "e = union c,d;" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7.gld");
@@ -826,7 +613,7 @@ public class TestTezCompiler {
                 "c = foreach c generate $0 as c1;" +
                 "d = group a by x;" +
                 "e = foreach d generate group, b.b1, c.c1;" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld");
@@ -836,67 +623,18 @@ public class TestTezCompiler {
     }
 
     @Test
-    public void testMultiQueryMultipleReplicateJoinWithUnion() throws Exception {
-        // Replicate joins are from a split
-        String query =
-                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
-                "b = load 'file:///tmp/input2' as (x:int, y:int);" +
-                "c = load 'file:///tmp/input3' as (x:int, y:int);" +
-                "d = union a, b;" +
-                "e = filter c by y < 2;" +
-                "f = filter c by y > 5;" +
-                "g = join d by x, e by x using 'replicated';" +
-                "h = join g by d::x, f by x using 'replicated';" +
-                "store h into 'file:///tmp/pigoutput';";
-
-        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9-OPTOFF.gld");
-
-        // Union is also from a split
-        query =
-                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
-                "b = filter a by x == 2;" +
-                "c = load 'file:///tmp/input3' as (x:int, y:int);" +
-                "d = union a, b;" +
-                "e = filter c by y < 2;" +
-                "f = filter c by y > 5;" +
-                "g = join d by x, e by x using 'replicated';" +
-                "h = join g by d::x, f by x using 'replicated';" +
-                "store h into 'file:///tmp/pigoutput';";
-
-        resetScope();
-        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10-OPTOFF.gld");
-    }
-
-    @Test
     public void testUnionStore() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld");
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
-        query =
-                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
-                "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
-                "c = union onschema a, b PARALLEL 15;" +
-                "store c into 'file:///tmp/pigoutput';";
-        // Union optimization should be turned off if PARALLEL clause is specified
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
     }
 
     @Test
@@ -905,15 +643,14 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         String oldSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS);
         String oldUnSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
-        // Plan should not have union optimization applied as PigStorage is unsupported
+        // Plan should not have union optimization applied
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
-
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, null);
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, OrcStorage.class.getName());
@@ -921,37 +658,27 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
-                "store c into 'file:///tmp/pigoutput' using " + DummyStoreWithOutputFormat.class.getName() + "();";
-        // Plan should not have union optimization applied as only ORC is supported
+                "store c into 'file:///tmp/output' using " + DummyStoreWithOutputFormat.class.getName() + "();";
+        // Plan should not have union optimization applied
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld");
 
         resetScope();
-        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null);
-        query =
-                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
-                "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
-                "c = union onschema a, b;" +
-                "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();";
-        // Plan should not have union optimization applied as supportsParallelWriteToStoreLocation returns false
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld");
-
-        resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null);
         query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "split a into b if x > 5, c if x == 7, d if x == 8, e otherwise;" +
                 "u1 = union onschema b, c;" +
-                "store u1 into 'file:///tmp/pigoutput/u1';" +
+                "store u1 into 'file:///tmp/output/u1';" +
                 //TODO: multiple levels of split not merged
                 "u2 = union onschema a, b, c;" +
-                "store u2 into 'file:///tmp/pigoutput/u2';" +
+                "store u2 into 'file:///tmp/output/u2';" +
                 "u3 = union onschema d, e;" +
-                "store u3 into 'file:///tmp/pigoutput/u3';" +
+                "store u3 into 'file:///tmp/output/u3';" +
                 "j1 = join d by x, a by x using 'replicated';" +
                 "j1 = foreach j1 generate d::x as x, d::y as y;" +
                 "u4 = union onschema j1, a;" +
-                "store u4 into 'file:///tmp/pigoutput/u4';";
+                "store u4 into 'file:///tmp/output/u4';";
 
         // Plan should have union optimization applied even for unsupported storefunc
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld");
@@ -969,7 +696,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = group c by x;" +
                 "e = foreach d generate group, SUM(c.y);" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld");
@@ -985,7 +712,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join c by x, d by x;" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld");
@@ -1002,7 +729,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join c by x, d by x using 'replicated';" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         //TODO: PIG-3856 Not optimized
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -1016,7 +743,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join d by x, c by x using 'replicated';" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         // Optimized
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -1035,7 +762,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join c by x, d by x using 'skewed';" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld");
@@ -1052,7 +779,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
                 "d = order c by x;" +
-                "store d into 'file:///tmp/pigoutput';";
+                "store d into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld");
@@ -1068,7 +795,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
                 "d = limit c 1;" +
-                "store d into 'file:///tmp/pigoutput';";
+                "store d into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld");
@@ -1084,9 +811,9 @@ public class TestTezCompiler {
                 "split a into a1 if x > 100, a2 otherwise;" +
                 "c = union onschema a1, a2, b;" +
                 "split c into d if x > 500, e otherwise;" +
-                "store a2 into 'file:///tmp/pigoutput/a2';" +
-                "store d into 'file:///tmp/pigoutput/d';" +
-                "store e into 'file:///tmp/pigoutput/e';";
+                "store a2 into 'file:///tmp/output/a2';" +
+                "store d into 'file:///tmp/output/d';" +
+                "store e into 'file:///tmp/output/e';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld");
@@ -1104,8 +831,8 @@ public class TestTezCompiler {
                 "d = load 'file:///tmp/input1' as (x:int, y:chararray);" +
                 "e = union onschema c, d;" +
                 "f = group e by x;" +
-                "store e into 'file:///tmp/pigoutput1';" +
-                "store f into 'file:///tmp/pigoutput2';";
+                "store e into 'file:///tmp/output1';" +
+                "store f into 'file:///tmp/output2';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld");
@@ -1123,7 +850,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, y:chararray);" +
                 "e = union onschema c, d;" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11.gld");
@@ -1145,10 +872,10 @@ public class TestTezCompiler {
                 "c2 = foreach c generate y, x;" +
                 "c3 = union c1, c2;" +
                 "a1 = union onschema b3, c3;" +
-                "store a1 into 'file:///tmp/pigoutput1';" +
+                "store a1 into 'file:///tmp/output1';" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join a1 by x, d by x using 'skewed';" +
-                "store e into 'file:///tmp/pigoutput2';";
+                "store e into 'file:///tmp/output2';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12.gld");
@@ -1165,7 +892,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join c by x, d by x using 'replicated';" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld");
@@ -1179,7 +906,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join d by x, c by x using 'replicated';" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -1197,7 +924,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join c by x, d by x using 'skewed';" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld");
@@ -1211,7 +938,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join d by x, c by x using 'skewed';" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -1229,7 +956,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = filter c by x == d.x;" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-17.gld");
@@ -1243,7 +970,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = filter d by x == c.x;" +
-                "store e into 'file:///tmp/pigoutput';";
+                "store e into 'file:///tmp/output';";
 
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -1254,76 +981,11 @@ public class TestTezCompiler {
     }
 
     @Test
-    public void testUnionSplitUnionStore() throws Exception {
-        String query =
-                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
-                "b = load 'file:///tmp/input1' as (y:chararray, x:int);" +
-                "c = union onschema a, b;" +
-                "split c into d if x <= 5, e if x <= 10, f if x >10, g if y == '6';" +
-                "h = union onschema d, e;" +
-                "i = union onschema f, g;" +
-                "store h into 'file:///tmp/pigoutput/1';" +
-                "store i into 'file:///tmp/pigoutput/2';";
-
-        resetScope();
-        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19-OPTOFF.gld");
-
-        // With a join in between
-        query =
-                "a = load 'file:///tmp/input' as (x:chararray);" +
-                "b = load 'file:///tmp/input' as (x:chararray);" +
-                "c = load 'file:///tmp/input' as (y:chararray);" +
-                "u1 = union onschema a, b;" +
-                "SPLIT u1 INTO r IF x != '', s OTHERWISE;" +
-                "d = JOIN r BY x LEFT, c BY y;" +
-                "u2 = UNION ONSCHEMA d, s;" +
-                "e = FILTER u2 BY x == '';" +
-                "f = FILTER u2 BY x == 'm';" +
-                "u3 = UNION ONSCHEMA e, f;" +
-                "store u3 into 'file:///tmp/pigoutput';";
-
-        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20-OPTOFF.gld");
-    }
-
-    @Test
-    public void testUnionSplitUnionLimitStore() throws Exception {
-        // Similar to previous testcase but a LIMIT at the end to test a non-store vertex group
-        String query =
-                "a = load 'file:///tmp/input' as (x:chararray);" +
-                "b = load 'file:///tmp/input' as (x:chararray);" +
-                "c = load 'file:///tmp/input' as (y:chararray);" +
-                "u1 = union onschema a, b;" +
-                "SPLIT u1 INTO r IF x != '', s OTHERWISE;" +
-                "d = JOIN r BY x LEFT, c BY y;" +
-                "u2 = UNION ONSCHEMA d, s;" +
-                "e = FILTER u2 BY x == '';" +
-                "f = FILTER u2 BY x == 'm';" +
-                "u3 = UNION ONSCHEMA e, f;" +
-                "SPLIT u3 INTO t if x != '', u OTHERWISE;" +
-                "v = LIMIT t 10;" +
-                "store v into 'file:///tmp/pigoutput';";
-
-        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21.gld");
-        resetScope();
-        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21-OPTOFF.gld");
-    }
-
-    @Test
     public void testRank() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
                 "b = rank a;" +
-                "store b into 'file:///tmp/pigoutput/d';";
+                "store b into 'file:///tmp/output/d';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-1.gld");
     }
@@ -1334,7 +996,7 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
                 "b = rank a by x;" +
-                "store b into 'file:///tmp/pigoutput/d';";
+                "store b into 'file:///tmp/output/d';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld");
     }
@@ -1390,32 +1052,5 @@ public class TestTezCompiler {
         assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)),
                 TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean)));
     }
-
-    public static class TestDummyStoreFunc extends StoreFunc {
-
-        @Override
-        public OutputFormat getOutputFormat() throws IOException {
-            return null;
-        }
-
-        @Override
-        public void setStoreLocation(String location, Job job)
-                throws IOException {
-        }
-
-        @Override
-        public void prepareToWrite(RecordWriter writer) throws IOException {
-        }
-
-        @Override
-        public void putNext(Tuple t) throws IOException {
-        }
-
-        @Override
-        public Boolean supportsParallelWriteToStoreLocation() {
-            return false;
-        }
-
-    }
 }
 

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java Fri Feb 24 03:34:37 2017
@@ -117,15 +117,15 @@ public class TestTezGraceParallelism {
         Util.createLogAppender("testDecreaseParallelism", writer, new Class[]{PigGraceShuffleVertexManager.class, ShuffleVertexManager.class});
         try {
             // DAG: 47 \
-            //           -> 49(join) -> 52(distinct) -> 56(group)
+            //           -> 49(join) -> 52(distinct) -> 61(group)
             //      48 /
             // Parallelism at compile time:
             // DAG: 47(1) \
-            //              -> 49(2) -> 52(20) -> 56(200)
+            //              -> 49(2) -> 52(20) -> 61(200)
             //      48(1) /
             // However, when 49 finishes, the actual output of 49 only justify parallelism 1.
-            // We adjust the parallelism for 56 to 7 based on this.
-            // At runtime, ShuffleVertexManager still kick in and further reduce parallelism from 7 to 1.
+            // We adjust the parallelism for 61 to 100 based on this.
+            // At runtime, ShuffleVertexManager still kicks in and further reduces parallelism from 100 to 1.
             //
             pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
             pigServer.registerQuery("B = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
@@ -140,10 +140,10 @@ public class TestTezGraceParallelism {
                             "('F',1349L)", "('M',1373L)"});
             Util.checkQueryOutputsAfterSort(iter, expectedResults);
             assertTrue(writer.toString().contains("Initialize parallelism for scope-52 to 18"));
-            assertTrue(writer.toString().contains("Initialize parallelism for scope-56 to 7"));
+            assertTrue(writer.toString().contains("Initialize parallelism for scope-61 to 7"));
             assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-49 to 1 from 2"));
             assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-52 to 1 from 18"));
-            assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-56 to 1 from 7"));
+            assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-61 to 1 from 7"));
         } finally {
             Util.removeLogAppender("testDecreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
         }
@@ -217,8 +217,8 @@ public class TestTezGraceParallelism {
                 count++;
             }
             assertEquals(count, 20);
-            assertTrue(writer.toString().contains("All predecessors for scope-79 are finished, time to set parallelism for scope-80"));
-            assertTrue(writer.toString().contains("Initialize parallelism for scope-80 to 10"));
+            assertTrue(writer.toString().contains("All predecessors for scope-84 are finished, time to set parallelism for scope-85"));
+            assertTrue(writer.toString().contains("Initialize parallelism for scope-85 to 10"));
         } finally {
             Util.removeLogAppender("testJoinWithDifferentDepth", PigGraceShuffleVertexManager.class);
         }
@@ -262,9 +262,9 @@ public class TestTezGraceParallelism {
         StringWriter writer = new StringWriter();
         Util.createLogAppender("testJoinWithUnion", writer, PigGraceShuffleVertexManager.class);
         try {
-            // DAG: 29 -> 32 -> 36 \
-            //                       -> 55 (vertex group) -> 51
-            //      37 -> 40 -> 44 /
+            // DAG: 29 -> 32 -> 41 \
+            //                       -> 70 (vertex group) -> 61
+            //      42 -> 45 -> 54 /
             pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
             pigServer.registerQuery("B = distinct A;");
             pigServer.registerQuery("C = group B by name;");
@@ -280,8 +280,8 @@ public class TestTezGraceParallelism {
                 count++;
             }
             assertEquals(count, 20);
-            assertTrue(writer.toString().contains("time to set parallelism for scope-36"));
-            assertTrue(writer.toString().contains("time to set parallelism for scope-44"));
+            assertTrue(writer.toString().contains("time to set parallelism for scope-41"));
+            assertTrue(writer.toString().contains("time to set parallelism for scope-54"));
         } finally {
             Util.removeLogAppender("testJoinWithUnion", PigGraceShuffleVertexManager.class);
         }
@@ -322,33 +322,4 @@ public class TestTezGraceParallelism {
             super.setStoreLocation(location, job);
         }
     }
-
-    @Test
-    // See PIG-4786
-    public void testCross() throws IOException{
-        // scope-90 is the cross vertex. It should not use PigGraceShuffleVertexManager
-        NodeIdGenerator.reset();
-        PigServer.resetScope();
-        StringWriter writer = new StringWriter();
-        Util.createLogAppender("testCross", writer, PigGraceShuffleVertexManager.class);
-        File outputDir = File.createTempFile("intemediate", "txt");
-        outputDir.delete();
-        pigServer.getPigContext().getProperties().setProperty("mapreduce.input.fileinputformat.split.maxsize", "3000");
-        pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
-        pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
-        pigServer.registerQuery("B = order A by name;");
-        pigServer.registerQuery("C = distinct B;");
-        pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
-        pigServer.registerQuery("E = group D by name;");
-        pigServer.registerQuery("F = foreach E generate group as name, AVG(D.age) as avg_age;");
-        pigServer.registerQuery("G = cross C, F;");
-        Iterator<Tuple> iter = pigServer.openIterator("G");
-        int count = 0;
-        while (iter.hasNext()) {
-            iter.next();
-            count++;
-        }
-        assertEquals(count, 400);
-        assertFalse(writer.toString().contains("scope-90"));
-    }
 }

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java Fri Feb 24 03:34:37 2017
@@ -21,9 +21,7 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,20 +29,17 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
@@ -53,7 +48,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
-import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -63,11 +57,8 @@ import org.apache.pig.test.junit.Ordered
 import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
-import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -86,8 +77,7 @@ import org.junit.runner.RunWith;
     "testTezParallelismEstimatorFilterFlatten",
     "testTezParallelismEstimatorHashJoin",
     "testTezParallelismEstimatorSplitBranch",
-    "testTezParallelismDefaultParallelism",
-    "testShuffleVertexManagerConfig"
+    "testTezParallelismDefaultParallelism"
 })
 public class TestTezJobControlCompiler {
     private static PigContext pc;
@@ -99,7 +89,6 @@ public class TestTezJobControlCompiler {
     public static void setUpBeforeClass() throws Exception {
         input1 = Util.createTempFileDelOnExit("input1", "txt").toURI();
         input2 = Util.createTempFileDelOnExit("input2", "txt").toURI();
-        FileUtils.deleteDirectory(new File("/tmp/pigoutput"));
     }
 
     @AfterClass
@@ -118,7 +107,7 @@ public class TestTezJobControlCompiler {
                 "a = load '" + input1 +"' as (x:int, y:int);" +
                 "b = filter a by x > 0;" +
                 "c = foreach b generate y;" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         Pair<TezOperPlan, DAG> compiledPlan = compile(query);
 
@@ -138,7 +127,7 @@ public class TestTezJobControlCompiler {
                 "a = load '" + input1 +"' as (x:int, y:int);" +
                 "b = group a by x;" +
                 "c = foreach b generate group, a;" +
-                "store c into 'file:///tmp/pigoutput';";
+                "store c into 'file:///tmp/output';";
 
         Pair<TezOperPlan, DAG> compiledPlan = compile(query);
 
@@ -170,7 +159,7 @@ public class TestTezJobControlCompiler {
                 "b = load '" + input2 +"' as (x:int, z:int);" +
                 "c = join a by x, b by x;" +
                 "d = foreach c generate a::x as x, y, z;" +
-                "store d into 'file:///tmp/pigoutput';";
+                "store d into 'file:///tmp/output';";
 
         Pair<TezOperPlan, DAG> compiledPlan = compile(query);
 
@@ -300,72 +289,6 @@ public class TestTezJobControlCompiler {
         TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
         Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
         assertEquals(leafVertex.getParallelism(), 5);
-        pc.defaultParallel = -1;
-    }
-
-    @Test
-    public void testShuffleVertexManagerConfig() throws Exception{
-        pc.getProperties().setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "0.3");
-        pc.getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "500");
-
-        try {
-
-            String query = "a = load '10' using " + ArbitarySplitsLoader.class.getName()
-                    + "() as (name:chararray, age:int, gpa:double);"
-                    + "b = limit a 5;"
-                    + "c = group b by name;"
-                    + "store c into 'output';";
-
-            VertexManagerPluginDescriptor vmPlugin = getLeafVertexVMPlugin(query);
-            Configuration vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
-
-            // Case of grace auto parallelism (PigGraceShuffleVertexManager)
-            assertEquals(PigGraceShuffleVertexManager.class.getName(), vmPlugin.getClassName());
-            // min and max src fraction, auto parallel, desired size, bytes.per.reducer, pig.tez.plan and pigcontext
-            assertEquals(7, vmPluginConf.size());
-            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
-            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
-            assertEquals("true", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL));
-            assertEquals("500", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE));
-            assertEquals("500", vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM));
-
-            // Case of auto parallelism (ShuffleVertexManager)
-            pc.getProperties().setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false");
-            vmPlugin = getLeafVertexVMPlugin(query);
-            vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
-            assertEquals(ShuffleVertexManager.class.getName(), vmPlugin.getClassName());
-            // min and max src fraction, auto parallel, desired size
-            assertEquals(4, vmPluginConf.size());
-            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
-            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
-            assertEquals("true", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL));
-            assertEquals("500", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE));
-
-            // Case of default parallel or PARALLEL (ShuffleVertexManager)
-            pc.defaultParallel = 2;
-            vmPlugin = getLeafVertexVMPlugin(query);
-            vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
-            assertEquals(ShuffleVertexManager.class.getName(), vmPlugin.getClassName());
-            // min and max src fraction
-            assertEquals(2, vmPluginConf.size());
-            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
-            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
-        } finally {
-            pc.getProperties().remove(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
-            pc.getProperties().remove(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
-            pc.getProperties().remove(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM);
-            pc.defaultParallel = -1;
-        }
-    }
-
-    private VertexManagerPluginDescriptor getLeafVertexVMPlugin(String query) throws Exception {
-        Pair<TezOperPlan, DAG> compiledPlan = compile(query);
-        TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
-        Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
-        Field vmPluginField = Vertex.class.getDeclaredField("vertexManagerPlugin");
-        vmPluginField.setAccessible(true);
-        VertexManagerPluginDescriptor vmPlugin = (VertexManagerPluginDescriptor) vmPluginField.get(leafVertex);
-        return vmPlugin;
     }
 
     private Pair<TezOperPlan, DAG> compile(String query) throws Exception {



Mime
View raw message