pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1736916 [1/2] - in /pig/branches/branch-0.15: ./ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ src/org/apache/pig/builtin/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/ test/org/apache/pi...
Date Mon, 28 Mar 2016 18:54:01 GMT
Author: rohini
Date: Mon Mar 28 18:54:01 2016
New Revision: 1736916

URL: http://svn.apache.org/viewvc?rev=1736916&view=rev
Log:
PIG-4851: Null not padded when input has less fields than declared schema for some loader (rohini)

Modified:
    pig/branches/branch-0.15/CHANGES.txt
    pig/branches/branch-0.15/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
    pig/branches/branch-0.15/src/org/apache/pig/builtin/PigStorage.java
    pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
    pig/branches/branch-0.15/test/org/apache/pig/test/TestMergeForEachOptimization.java
    pig/branches/branch-0.15/test/org/apache/pig/test/TestMultiQueryCompiler.java
    pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
    pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanFilterRule.java
    pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java
    pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
    pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanPushUpFilter.java
    pig/branches/branch-0.15/test/org/apache/pig/test/TestPigStorage.java
    pig/branches/branch-0.15/test/org/apache/pig/test/data/DotFiles/explain1.dot

Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1736916&r1=1736915&r2=1736916&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Mon Mar 28 18:54:01 2016
@@ -28,6 +28,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-4851: Null not padded when input has less fields than declared schema for some loader (rohini)
+
 PIG-4812: Register Groovy UDF with relative path does not work (daijy)
 
 PIG-4808: PluckTuple overwrites regex if used more than once in the same script (eval via daijy)

Modified: pig/branches/branch-0.15/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java?rev=1736916&r1=1736915&r2=1736916&view=diff
==============================================================================
--- pig/branches/branch-0.15/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java (original)
+++ pig/branches/branch-0.15/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java Mon Mar 28 18:54:01 2016
@@ -29,17 +29,19 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.test.MiniCluster;
 import org.apache.pig.test.Util;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestCSVStorage {
     protected static final Log LOG = LogFactory.getLog(TestCSVStorage.class);
-    
+
     private PigServer pigServer;
     private MiniCluster cluster;
-    
+
     public TestCSVStorage() throws ExecException, IOException {
         cluster = MiniCluster.buildCluster();
         pigServer = new PigServer(ExecType.LOCAL, new Properties());
@@ -59,8 +61,8 @@ public class TestCSVStorage {
         Iterator<Tuple> it = pigServer.openIterator("a");
         assertEquals(Util.createTuple(new String[] {"foo", "bar", "baz"}), it.next());
     }
-   
-    @Test 
+
+    @Test
     public void testQuotedCommas() throws IOException {
         String inputFileName = "TestCSVLoader-quotedcommas.txt";
         Util.createLocalInputFile(inputFileName, new String[] {"\"foo,bar,baz\"", "fee,foe,fum"});
@@ -71,11 +73,11 @@ public class TestCSVStorage {
         assertEquals(Util.createTuple(new String[] {"foo,bar,baz", null, null}), it.next());
         assertEquals(Util.createTuple(new String[] {"fee", "foe", "fum"}), it.next());
     }
-    
+
     @Test
     public void testQuotedQuotes() throws IOException {
         String inputFileName = "TestCSVLoader-quotedquotes.txt";
-        Util.createLocalInputFile(inputFileName, 
+        Util.createLocalInputFile(inputFileName,
                 new String[] {"\"foo,\"\"bar\"\",baz\"", "\"\"\"\"\"\"\"\""});
         String script = "a = load '" + inputFileName + "' using org.apache.pig.piggybank.storage.CSVLoader() " +
         "   as (a:chararray); ";
@@ -84,5 +86,21 @@ public class TestCSVStorage {
         assertEquals(Util.createTuple(new String[] {"foo,\"bar\",baz"}), it.next());
         assertEquals(Util.createTuple(new String[] {"\"\"\""}), it.next());
     }
-    
+
+    @Test
+    public void testNullPadding() throws IOException {
+        String inputFileName = "TestCSVLoader-nullpadding.txt";
+        Util.createLocalInputFile(inputFileName, new String[] { "a", "b,", "c,d", ",e"});
+        String script = "a = load '" + inputFileName + "' using org.apache.pig.piggybank.storage.CSVLoader() " +
+        "   as (field1, field2); dump a;";
+        Util.registerMultiLineQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("a");
+        assertEquals(Util.createTuple(new DataByteArray[] {new DataByteArray("a"), null}), it.next());
+        assertEquals(Util.createTuple(new DataByteArray[] {new DataByteArray("b"), null}), it.next());
+        assertEquals(Util.createTuple(new DataByteArray[] {new DataByteArray("c"), new DataByteArray("d")}), it.next());
+        assertEquals(Util.createTuple(new DataByteArray[] {new DataByteArray(""), new DataByteArray("e")}), it.next());
+        Assert.assertFalse(it.hasNext());
+    }
+
+
 }

Modified: pig/branches/branch-0.15/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/builtin/PigStorage.java?rev=1736916&r1=1736915&r2=1736916&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/builtin/PigStorage.java Mon Mar 28 18:54:01 2016
@@ -71,7 +71,6 @@ import org.apache.pig.bzip2r.Bzip2TextIn
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.CastUtils;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -172,7 +171,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
         overwrite.setArgs(1);
         overwrite.setArgName("overwrite");
         validOptions.addOption(overwrite);
-        
+
         return validOptions;
     }
 
@@ -219,7 +218,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
                 if ("true".equalsIgnoreCase(value)) {
                     overwriteOutput = true;
                 }
-            }       
+            }
             dontLoadSchema = configuredOptions.hasOption("noschema");
             tagFile = configuredOptions.hasOption(TAG_SOURCE_FILE);
             tagPath = configuredOptions.hasOption(TAG_SOURCE_PATH);
@@ -296,31 +295,18 @@ LoadPushDown, LoadMetadata, StoreMetadat
             Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
                     new String[] {signature});
             String serializedSchema = p.getProperty(signature+".schema");
-            if (serializedSchema != null) {
-                try {
-                    schema = new ResourceSchema(Utils.getSchemaFromString(serializedSchema));
-                } catch (ParserException e) {
-                    mLog.error("Unable to parse serialized schema " + serializedSchema, e);
-                    // all bets are off - there's no guarantee that we'll return
-                    // either the fields in the data or the fields in the schema
-                    // the user specified (or required)
-                }
+            if (serializedSchema == null) return tup;
+            try {
+                schema = new ResourceSchema(Utils.getSchemaFromString(serializedSchema));
+            } catch (ParserException e) {
+                mLog.error("Unable to parse serialized schema " + serializedSchema, e);
+                // all bets are off - there's no guarantee that we'll return
+                // either the fields in the data or the fields in the schema
+                // the user specified (or required)
             }
         }
 
-        if (schema == null) {
-            // if the number of required fields are less than or equal to 
-            // the number of fields in the data then we're OK as we've already
-            // read only the required number of fields into the tuple. If 
-            // more fields are required than are in the data then we'll pad
-            // with nulls:
-            int numRequiredColumns = 0;
-            for (int i = 0; mRequiredColumns != null && i < mRequiredColumns.length; i++)
-                if(mRequiredColumns[i])
-                    ++numRequiredColumns;
-            for (int i = tup.size();i < numRequiredColumns; ++i)
-                tup.append(null);
-        } else {
+        if (schema != null) {
             ResourceFieldSchema[] fieldSchemas = schema.getFields();
             int tupleIdx = 0;
             // If some fields have been projected out, the tuple
@@ -332,7 +318,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
                     if (tupleIdx >= tup.size()) {
                         tup.append(null);
                     }
-                    
+
                     Object val = null;
                     if(tup.get(tupleIdx) != null){
                         byte[] bytes = ((DataByteArray) tup.get(tupleIdx)).get();

Modified: pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java?rev=1736916&r1=1736915&r2=1736916&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java Mon Mar 28 18:54:01 2016
@@ -18,18 +18,13 @@
 package org.apache.pig.newplan.logical.rules;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.pig.FuncSpec;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
-import org.apache.pig.impl.util.Pair;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.logical.expression.CastExpression;
@@ -58,7 +53,7 @@ public abstract class TypeCastInserter e
     public Transformer getNewTransformer() {
         return new TypeCastInserterTransformer();
     }
-    
+
     public class TypeCastInserterTransformer extends Transformer {
         @Override
         public boolean check(OperatorPlan matched) throws FrontendException {
@@ -77,18 +72,15 @@ public abstract class TypeCastInserter e
             }
 
             // Now that we've narrowed it down to an operation that *can* have casts added,
-            // (because the user specified some types which might not match the data) let's 
+            // (because the user specified some types which might not match the data) let's
             // see if they're actually needed:
             LogicalSchema determinedSchema = determineSchema(op);
-            if(atLeastOneCastNeeded(determinedSchema, s)) {
-                return true;
-            }
-
             if(determinedSchema == null || determinedSchema.size() != s.size()) {
                 // we don't know what the data looks like, but the user has specified
-                // that they want a certain number of fields loaded. We'll use a 
-                // projection (or pruning) to make sure the columns show up (with NULL
-                // values) or are truncated from the right hand side of the input data.
+                // that they want a certain number of fields loaded.
+                return true;
+            }
+            if(atLeastOneCastNeeded(determinedSchema, s)) {
                 return true;
             }
 
@@ -98,7 +90,7 @@ public abstract class TypeCastInserter e
         private boolean atLeastOneCastNeeded(LogicalSchema determinedSchema, LogicalSchema s) {
             for (int i = 0; i < s.size(); i++) {
                 LogicalSchema.LogicalFieldSchema fs = s.getField(i);
-                if (fs.type != DataType.BYTEARRAY && (determinedSchema == null || (!fs.isEqual(determinedSchema.getField(i))))) {
+                if (fs.type != DataType.BYTEARRAY && !fs.isEqual(determinedSchema.getField(i))) {
                     // we have to cast this field from the default BYTEARRAY type to
                     // whatever the user specified in the 'AS' clause of the LOAD
                     // statement (the fs.type).
@@ -120,64 +112,36 @@ public abstract class TypeCastInserter e
                 return;
             }
 
-            if(!atLeastOneCastNeeded(determinedSchema, s) && op instanceof LOLoad) {
-                // we're not going to insert any casts, but we might reduce or increase
-                // the number of columns coming out of the LOAD. If the loader supports
-                // it we'll use the 'requiredColumns' functionality rather than bolting
-                // on a FOREACH
-                Set<Integer> required = new TreeSet<Integer>();
-                for(int i = 0; i < s.size(); ++i) {
-                    // if we know the data source's schema, pick out the columns we need,
-                    // otherwise take the first n
-                    int index = determinedSchema == null ? i : determinedSchema.findField(s.getField(i).uid);
-                    if(index >= 0)
-                        required.add(index);
-                }
-
-                // pass the indices of the fields we need to a pruner, and fire it off
-                // so it configures the LOLoad (and the LoadFunc it contains)
-                Map<LOLoad, Pair<Map<Integer, Set<String>>, Set<Integer>>> requiredMap = 
-                        new HashMap<LOLoad, Pair<Map<Integer,Set<String>>,Set<Integer>>>(1);
-                Pair<Map<Integer, Set<String>>, Set<Integer>> pair = 
-                        new Pair<Map<Integer,Set<String>>, Set<Integer>>(null, required);
-                requiredMap.put((LOLoad) op, pair);
-                new ColumnPruneVisitor(currentPlan, requiredMap , true).visit((LOLoad) op);
-
-                // we only want to process this node once, so mark it:
-                markCastNoNeed(op);
-                return;
-            }
-
             // For every field, build a logical plan.  If the field has a type
             // other than byte array, then the plan will be cast(project).  Else
             // it will just be project.
             LogicalPlan innerPlan = new LogicalPlan();
-            
+
             LOForEach foreach = new LOForEach(currentPlan);
             foreach.setInnerPlan(innerPlan);
             foreach.setAlias(op.getAlias());
             // Insert the foreach into the plan and patch up the plan.
             Operator next = currentPlan.getSuccessors(op).get(0);
             currentPlan.insertBetween(op, foreach, next);
-            
+
             List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
             LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[s.size()]);
             innerPlan.add(gen);
 
             for (int i = 0; i < s.size(); i++) {
                 LogicalSchema.LogicalFieldSchema fs = s.getField(i);
-                
+
                 LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, i);
-                innerPlan.add(innerLoad);          
+                innerPlan.add(innerLoad);
                 innerPlan.connect(innerLoad, gen);
-                
+
                 LogicalExpressionPlan exp = new LogicalExpressionPlan();
-                
+
                 ProjectExpression prj = new ProjectExpression(exp, i, -1, gen);
                 exp.add(prj);
-                
+
                 if (fs.type != DataType.BYTEARRAY && (determinedSchema == null || (!fs.isEqual(determinedSchema.getField(i))))) {
-                    // Either no schema was determined by loader OR the type 
+                    // Either no schema was determined by loader OR the type
                     // from the "determinedSchema" is different
                     // from the type specified - so we need to cast
                     CastExpression cast = new CastExpression(exp, prj, new LogicalSchema.LogicalFieldSchema(fs));
@@ -187,7 +151,7 @@ public abstract class TypeCastInserter e
                         loadFuncSpec = ((LOLoad)op).getFileSpec().getFuncSpec();
                     } else if (op instanceof LOStream) {
                         StreamingCommand command = ((LOStream)op).getStreamingCommand();
-                        HandleSpec streamOutputSpec = command.getOutputSpec(); 
+                        HandleSpec streamOutputSpec = command.getOutputSpec();
                         loadFuncSpec = new FuncSpec(streamOutputSpec.getSpec());
                     } else {
                         String msg = "TypeCastInserter invoked with an invalid operator class name: " + innerPlan.getClass().getSimpleName();
@@ -199,7 +163,7 @@ public abstract class TypeCastInserter e
             }
             markCastInserted(op);
         }
-        
+
         @Override
         public OperatorPlan reportChanges() {
             return currentPlan;

Modified: pig/branches/branch-0.15/test/org/apache/pig/test/TestMergeForEachOptimization.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/TestMergeForEachOptimization.java?rev=1736916&r1=1736915&r2=1736916&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/test/TestMergeForEachOptimization.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/test/TestMergeForEachOptimization.java Mon Mar 28 18:54:01 2016
@@ -55,37 +55,37 @@ public class TestMergeForEachOptimizatio
     LogicalPlan plan = null;
     PigContext pc = new PigContext( ExecType.LOCAL, new Properties() );
     PigServer pigServer = null;
-  
+
     @Before
     public void setup() throws ExecException {
         pigServer = new PigServer( pc );
     }
-    
+
     @After
     public void tearDown() {
-        
+
     }
-    
+
     /**
      * Basic test case. Two simple FOREACH statements can be merged to one.
-     * @throws Exception 
+     * @throws Exception
      */
-    @Test   
+    @Test
     public void testSimple() throws Exception  {
         String query = "A = load 'file.txt' as (a, b, c);" +
          "B = foreach A generate a+b, c-b;" +
          "C = foreach B generate $0+5, $1;" +
-         "store C into 'empty';";  
+         "store C into 'empty';";
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        
+
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
         int outputExprCount1 = getOutputExprCount( newLogicalPlan );
         LOForEach foreach1 = getForEachOperator( newLogicalPlan );
         Assert.assertTrue( foreach1.getAlias().equals( "C" ) );
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
-        
+
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
         Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
         int outputExprCount2 = getOutputExprCount( newLogicalPlan );
@@ -93,26 +93,26 @@ public class TestMergeForEachOptimizatio
         LOForEach foreach2 = getForEachOperator( newLogicalPlan );
         Assert.assertTrue( foreach2.getAlias().equals( "C" ) );
     }
-    
+
     /**
      * Test more complex case where the first for each in the script has inner plan.
-     * @throws Exception 
+     * @throws Exception
      */
     @Test
     public void testComplex() throws Exception {
         String query = "A = load 'file.txt' as (a:int, b, c:bag{t:tuple(c0:int,c1:int)});" +
          "B = foreach A { S = ORDER c BY $0; generate $0, COUNT(S), SIZE(S); };" +
-         "C = foreach B generate $2+5 as x, $0-$1/2 as y;" + "store C into 'empty';" ;  
+         "C = foreach B generate $2+5 as x, $0-$1/2 as y;" + "store C into 'empty';" ;
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        
+
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
         int outputExprCount1 = getOutputExprCount( newLogicalPlan );
         LOForEach foreach1 = getForEachOperator( newLogicalPlan );
         Assert.assertTrue( foreach1.getAlias().equals( "C" ) );
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
-        
+
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
         // The number of FOREACHes didn't change because one is genereated because of type cast and
         // one is reduced because of the merge.
@@ -125,10 +125,10 @@ public class TestMergeForEachOptimizatio
         Assert.assertTrue(newSchema.getField(0).alias.equals("x"));
         Assert.assertTrue(newSchema.getField(1).alias.equals("y"));
     }
-    
+
     /**
      * One output of first foreach was referred more than once in the second foreach
-     * @throws Exception 
+     * @throws Exception
      */
     @Test
     public void testDuplicateInputs() throws Exception {
@@ -136,84 +136,84 @@ public class TestMergeForEachOptimizatio
          "A1 = foreach A generate (int)a0 as a0, (double)a1 as a1;" +
          "B = group A1 all;" +
          "C = foreach B generate A1;" +
-         "D = foreach C generate SUM(A1.a0), AVG(A1.a1);" + "store D into 'empty';" ;  
+         "D = foreach C generate SUM(A1.a0), AVG(A1.a1);" + "store D into 'empty';" ;
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        
+
         Operator store = newLogicalPlan.getSinks().get(0);
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
         LOForEach foreach1 = (LOForEach)newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue( foreach1.getAlias().equals( "D" ) );
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
-        
+
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
         // The number of FOREACHes didn't change because one is genereated because of type cast and
         // one is reduced because of the merge.
         Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
-        
+
         LOForEach foreach2 = (LOForEach)newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue( foreach2.getAlias().equals( "D" ) );
     }
-    
+
     /**
      * Not all consecutive FOREACHes can be merged. In this case, the second FOREACH statment
      * has inner plan, which cannot be merged with one before it.
-     * @throws Exception 
+     * @throws Exception
      */
     @Test
     public void testNegative1() throws Exception {
         String query = "A = LOAD 'file.txt' as (a, b, c, d:bag{t:tuple(c0:int,c1:int)});" +
          "B = FOREACH A GENERATE a+5 AS u, b-c/2 AS v, d AS w;" +
          "C = FOREACH B { S = ORDER w BY $0; GENERATE $0 as x, COUNT(S) as y; };" +
-         "store C into 'empty';";  
+         "store C into 'empty';";
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        
+
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
-        
+
         // Actually MergeForEach optimization is happening here. A new foreach will be inserted after A because
-        // of typ casting. The inserted one and the one in B can be merged due to this optimization. However, 
+        // of typ casting. The inserted one and the one in B can be merged due to this optimization. However,
         // the plan cannot be further optimized because C has inner plan.
         Assert.assertEquals( forEachCount1, forEachCount2 );
     }
-    
+
     /**
      * MergeForEach Optimization is off if the first statement has a FLATTEN operator.
-     * @throws Exception 
+     * @throws Exception
      */
     @Test
     public void testNegative2() throws Exception {
         String query = "A = LOAD 'file.txt' as (a, b, c);" +
          "B = FOREACH A GENERATE FLATTEN(a), b, c;" +
-         "C = FOREACH B GENERATE $0, $1+$2;" + "store C into 'empty';" ;  
+         "C = FOREACH B GENERATE $0, $1+$2;" + "store C into 'empty';" ;
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        
+
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
-        
+
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
         Assert.assertEquals( 2, forEachCount1 );
         Assert.assertEquals( 2, forEachCount2 );
     }
-    
-    
+
+
     /**
      * Ensure that join input order does not get reversed (PIG-1672)
-     * @throws Exception 
+     * @throws Exception
      */
-    @Test   
+    @Test
     public void testJoinInputOrder() throws Exception  {
         String query = "l1 = load 'y' as (a);" +
          "l2 = load 'z' as (a1,b1,c1,d1);" +
          "f1 = foreach l2 generate a1, b1, c1, d1;" +
          "f2 = foreach f1 generate a1, b1, c1;" +
-         "j1 = join f2 by a1, l1 by a using 'replicated';" + "store j1 into 'empty';" ;  
+         "j1 = join f2 by a1, l1 by a using 'replicated';" + "store j1 into 'empty';" ;
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
 
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
@@ -225,15 +225,15 @@ public class TestMergeForEachOptimizatio
         }
         LOForEach foreachL2 = (LOForEach)newLogicalPlan.getSuccessors(l2).get(0);
         foreachL2 = (LOForEach)newLogicalPlan.getSuccessors(foreachL2).get(0);
-        
+
         int outputExprCount1 = ((LOGenerate)foreachL2.getInnerPlan().getSinks().get(0)).getOutputPlans().size();
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
-        
+
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
-        Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
-        
+        Assert.assertEquals( 0, forEachCount1 - forEachCount2 );
+
         loads = newLogicalPlan.getSources();
         l2 = null;
         for (Operator load : loads) {
@@ -241,21 +241,21 @@ public class TestMergeForEachOptimizatio
                 l2 = load;
         }
         foreachL2 = (LOForEach)newLogicalPlan.getSuccessors(l2).get(0);
-        
+
         int outputExprCount2 = ((LOGenerate)foreachL2.getInnerPlan().getSinks().get(0)).getOutputPlans().size();
-        
+
         Assert.assertTrue( outputExprCount1 == outputExprCount2 );
         Assert.assertTrue( foreachL2.getAlias().equals( "f2" ) );
-        
+
         LOJoin join = (LOJoin)getOperator(newLogicalPlan, LOJoin.class);
         LogicalRelationalOperator leftInp =
             (LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(0);
-        assertEquals("join child left", leftInp.getAlias(), "f2"); 
-        
+        assertEquals("join child left", leftInp.getAlias(), "f2");
+
         LogicalRelationalOperator rightInp =
             (LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(1);
-        assertEquals("join child right", rightInp.getAlias(), "l1"); 
-        
+        assertEquals("join child right", rightInp.getAlias(), "l1");
+
     }
 
     private int getForEachOperatorCount(LogicalPlan plan) {
@@ -268,7 +268,7 @@ public class TestMergeForEachOptimizatio
         }
         return count;
     }
-       
+
     private int getOutputExprCount(LogicalPlan plan) throws IOException {
         LOForEach foreach = getForEachOperator( plan );
         LogicalPlan inner = foreach.getInnerPlan();
@@ -276,7 +276,7 @@ public class TestMergeForEachOptimizatio
         LOGenerate gen = (LOGenerate)ops.get( 0 );
         return gen.getOutputPlans().size();
     }
-    
+
     private LOForEach getForEachOperator(LogicalPlan plan) throws IOException {
         Iterator<Operator> ops = plan.getOperators();
         while( ops.hasNext() ) {
@@ -290,7 +290,7 @@ public class TestMergeForEachOptimizatio
         }
         return null;
     }
-    
+
     /**
      * returns first operator that is an instance of given class c
      * @param plan
@@ -303,41 +303,42 @@ public class TestMergeForEachOptimizatio
         while( ops.hasNext() ) {
             Operator op = ops.next();
             if( op.getClass().equals(c)) {
-                return op;          
+                return op;
             }
         }
         return null;
     }
-    
+
 
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
             super(p, iterations, new HashSet<String>());
         }
-        
-        protected List<Set<Rule>> buildRuleSets() {            
+
+        @Override
+        protected List<Set<Rule>> buildRuleSets() {
             List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
-            
+
             Set<Rule> s = new HashSet<Rule>();
             // add split filter rule
             Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
             s.add(r);
             ls.add(s);
-             
+
             // Split Set
             // This set of rules does splitting of operators only.
             // It does not move operators
             s = new HashSet<Rule>();
             r = new AddForEach( "AddForEach" );
-            s.add(r);            
+            s.add(r);
             ls.add(s);
-            
+
             s = new HashSet<Rule>();
             r = new MergeForEach("MergeForEach");
-            s.add(r);            
+            s.add(r);
             ls.add(s);
 
             return ls;
         }
-    }    
+    }
 }

Modified: pig/branches/branch-0.15/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1736916&r1=1736915&r2=1736916&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/test/TestMultiQueryCompiler.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/test/TestMultiQueryCompiler.java Mon Mar 28 18:54:01 2016
@@ -575,7 +575,7 @@ public class TestMultiQueryCompiler {
 
             LogicalPlan lp = checkLogicalPlan(2, 1, 7);
 
-            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 1, 11);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 1, 13);
 
             checkMRPlan(pp, 1, 1, 2);
 

Modified: pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=1736916&r1=1736915&r2=1736916&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Mon Mar 28 18:54:01 2016
@@ -318,6 +318,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( filter instanceof LOFilter );
         Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }
@@ -335,6 +337,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( load instanceof LOLoad );
         Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( filter instanceof LOFilter );
         Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 );
@@ -354,6 +358,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( load instanceof LOLoad );
         Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( filter instanceof LOFilter );
         Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 );
@@ -375,6 +381,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( filter instanceof LOFilter );
         Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }
@@ -395,6 +403,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( filter instanceof LOFilter );
         Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }
@@ -414,6 +424,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( filter instanceof LOFilter );
         Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }
@@ -433,6 +445,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( filter instanceof LOFilter );
         Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }

Modified: pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanFilterRule.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=1736916&r1=1736915&r2=1736916&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanFilterRule.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanFilterRule.java Mon Mar 28 18:54:01 2016
@@ -488,7 +488,9 @@ public class TestNewPlanFilterRule {
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
-        Operator group = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
+        Operator group = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( group instanceof LOCogroup );
         Operator filter = newLogicalPlan.getSuccessors( group ).get( 0 );
         Assert.assertTrue( filter instanceof LOFilter );

Modified: pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java?rev=1736916&r1=1736915&r2=1736916&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java Mon Mar 28 18:54:01 2016
@@ -21,6 +21,7 @@ import static org.apache.pig.newplan.log
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
 import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
 import org.apache.pig.newplan.logical.relational.LOLoad;
@@ -172,6 +174,11 @@ public class TestNewPlanLogicalOptimizer
             expected.add(DA);
             expected.connect(A, DA);
 
+            // A = foreach
+            LOForEach foreachA = org.apache.pig.newplan.logical.Util.addForEachAfter(expected, DA, 0, new HashSet<Integer>());
+            foreachA.setAlias("A");
+            foreachA.neverUseForRealSetSchema(aschema);
+
             // B = load
             LogicalSchema bschema = new LogicalSchema();
             bschema.addField(new LogicalSchema.LogicalFieldSchema(
@@ -193,6 +200,11 @@ public class TestNewPlanLogicalOptimizer
             expected.add(DB);
             expected.connect(B, DB);
 
+            // B = foreach
+            LOForEach foreachB = org.apache.pig.newplan.logical.Util.addForEachAfter(expected, DB, 0, new HashSet<Integer>());
+            foreachB.setAlias("B");
+            foreachB.neverUseForRealSetSchema(bschema);
+
             // C = join
             LogicalSchema cschema = new LogicalSchema();
             cschema.addField(new LogicalSchema.LogicalFieldSchema(
@@ -221,8 +233,8 @@ public class TestNewPlanLogicalOptimizer
             mm.put(1, bprojplan);
             C.neverUseForRealSetSchema(cschema);
             expected.add(C);
-            expected.connect(DA, C);
-            expected.connect(DB, C);
+            expected.connect(foreachA, C);
+            expected.connect(foreachB, C);
 
             // D = filter
             LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();

Modified: pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1736916&r1=1736915&r2=1736916&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java Mon Mar 28 18:54:01 2016
@@ -64,25 +64,25 @@ public class TestNewPlanPushDownForeachF
     }
 
     /**
-     * 
+     *
      * A simple filter UDF for testing
      *
      */
     static public class MyFilterFunc extends FilterFunc {
-        
+
         @Override
         public Boolean exec(Tuple input) {
             return false;
         }
     }
-    
+
     /**
      * Old plan is empty, so is the optimized new plan.
      */
     @Test
     public void testErrorEmptyInput() throws Exception {
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( "" );
-        
+
         Assert.assertTrue( newLogicalPlan.getOperators().hasNext() ==  false );
     }
 
@@ -100,7 +100,7 @@ public class TestNewPlanPushDownForeachF
         List<Operator> nexts = newLogicalPlan.getSuccessors( load );
         Assert.assertTrue( nexts != null && nexts.size() == 1 );
 }
-    
+
     @Test
     public void testForeachNoFlatten() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -108,28 +108,30 @@ public class TestNewPlanPushDownForeachF
         "C = order B by $0, $1;" +
          "D = store C into 'dummy';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
+
     @Test
     public void testForeachNoSuccessors() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
                        "B = foreach A generate flatten($1);" +
                        "Store B into 'output';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
     }
-    
+
     @Test
     public void testForeachStreaming() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -137,61 +139,65 @@ public class TestNewPlanPushDownForeachF
         "C = stream B through `" + "pc -l" + "`;" +
         "Store C into 'output';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
     }
-    
+
     @Test
     public void testForeachDistinct() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
         "B = foreach A generate flatten($1);" +
         "C = distinct B;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
     }
-    
+
     @Test
     public void testForeachForeach() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
-        "B = foreach A generate $0, $1, flatten(1);" +        
+        "B = foreach A generate $0, $1, flatten(1);" +
         "C = foreach B generate $0;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
         foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( !OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
-    
+
 
     @Test
     public void testForeachFilter() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
-        "B = foreach A generate $0, $1, flatten($2);" +        
+        "B = foreach A generate $0, $1, flatten($2);" +
         "C = filter B by $1 < 18;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
 
@@ -200,15 +206,17 @@ public class TestNewPlanPushDownForeachF
         String query = "A = load 'myfile' as (name, age, gpa);" +
         "B = foreach A generate $0, $1, flatten($2);" +
         "split B into C if $1 < 18, D if $1 >= 18;" +
-        "store C into 'output1';" + 
+        "store C into 'output1';" +
         "store D into 'output2';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
 
@@ -218,13 +226,15 @@ public class TestNewPlanPushDownForeachF
         "B = foreach A generate $0, $1, flatten($2);" +
         "C = limit B 10;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
 
@@ -235,24 +245,26 @@ public class TestNewPlanPushDownForeachF
         "C = load 'anotherfile' as (name, age, preference);" +
         "D = union B, C;" +
         "store D into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator load = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             load = loads.get( 0 );
         else
             load = loads.get( 1 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
-    
+
     @Test
     public void testForeachCogroup() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -260,7 +272,7 @@ public class TestNewPlanPushDownForeachF
         "C = load 'anotherfile' as (name, age, preference);" +
         "D = cogroup B by $0, C by $0;" +
         "store D into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -268,32 +280,36 @@ public class TestNewPlanPushDownForeachF
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator load = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             load = loads.get( 0 );
         else
             load = loads.get( 1 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
-    
+
     @Test
     public void testForeachGroupBy() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
         "B = foreach A generate $0, $1, flatten($2);" +
         "C = group B by $0;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
-    
+
     @Test
     public void testForeachSort() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -301,16 +317,18 @@ public class TestNewPlanPushDownForeachF
         "C = order B by $0, $1;" +
         "D = store C into 'dummy';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
-        Operator sort = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
+        Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
-        Operator foreach = newLogicalPlan.getSuccessors( sort ).get( 0 );
+        foreach = newLogicalPlan.getSuccessors( sort ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
-    
+
     /**
      * Non-pure-projection, not optimizable.
      */
@@ -321,16 +339,18 @@ public class TestNewPlanPushDownForeachF
         "C = order B by $0, $1;" +
          "D = store C into 'dummy';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
-    
+
+
     /**
      * If the flattened field is referenced in the sort condition, then no optimization can be done.
      */
@@ -341,7 +361,7 @@ public class TestNewPlanPushDownForeachF
         "C = order B by $0, $3;" +
         "D = store C into 'dummy';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
@@ -360,49 +380,55 @@ public class TestNewPlanPushDownForeachF
         "store C into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
+
     @Test
     public void testForeachUDFSort() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
         "B = foreach A generate $0, $1, " + Identity.class.getName() + "($2) ;" +
         "C = order B by $0, $1;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
+
     @Test
     public void testForeachCastSort() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
-        "B = foreach A generate (chararray)$0, $1, flatten($2);" +        
+        "B = foreach A generate (chararray)$0, $1, flatten($2);" +
         "C = order B by $0, $1;" +
         "store C into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
+
     @Test
     public void testForeachCross() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
@@ -413,13 +439,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -454,7 +480,7 @@ public class TestNewPlanPushDownForeachF
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -487,16 +513,16 @@ public class TestNewPlanPushDownForeachF
         "store F into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         // No optimization about foreach flatten.
         Operator store = newLogicalPlan.getSinks().get( 0 );
         Operator limit = newLogicalPlan.getPredecessors(store).get(0);
         Operator cross = newLogicalPlan.getPredecessors(limit).get(0);
         Assert.assertTrue( cross instanceof LOCross );
     }
-    
+
     /**
-     * This actually is a valid case, even though the optimization may not provide any performance benefit. However, detecting 
+     * This actually is a valid case, even though the optimization may not provide any performance benefit. However, detecting
      * such a case requires more coding. Thus, we allow optimization to go thru in this case.
      */
     @Test
@@ -509,13 +535,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -547,13 +573,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -571,7 +597,7 @@ public class TestNewPlanPushDownForeachF
         op = newLogicalPlan.getSuccessors( op ).get( 0 );
         Assert.assertTrue( op instanceof LOLimit );
     }
-    
+
     /**
      * Cast should NOT matter to cross. This is a valid positive test case.
      */
@@ -585,13 +611,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -609,7 +635,7 @@ public class TestNewPlanPushDownForeachF
         op = newLogicalPlan.getSuccessors( op ).get( 0 );
         Assert.assertTrue( op instanceof LOLimit );
     }
-    
+
     @Test
     public void testForeachFRJoin() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
@@ -619,7 +645,7 @@ public class TestNewPlanPushDownForeachF
         "E = limit D 10;" +
         "store E into 'output';";
 
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -627,7 +653,7 @@ public class TestNewPlanPushDownForeachF
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -662,7 +688,7 @@ public class TestNewPlanPushDownForeachF
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -693,7 +719,7 @@ public class TestNewPlanPushDownForeachF
         "E = join B by $0, D by $0 using 'replicated';" +
         "F = limit E 10;" +
         "store F into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         // No optimization about foreach flatten.
@@ -702,7 +728,7 @@ public class TestNewPlanPushDownForeachF
         Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
         Assert.assertTrue( join instanceof LOJoin );
     }
-    
+
     /**
      * Valid positive test case, even though the benefit from the optimization is questionable. However, putting in additinal check for
      * this condition requires extra coding.
@@ -717,13 +743,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -756,13 +782,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -795,13 +821,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -830,13 +856,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -854,7 +880,7 @@ public class TestNewPlanPushDownForeachF
         op = newLogicalPlan.getSuccessors( op ).get( 0 );
         Assert.assertTrue( op instanceof LOLimit );
     }
-    
+
     @Test
     public void testForeachInnerJoin1() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
@@ -865,13 +891,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -911,9 +937,9 @@ public class TestNewPlanPushDownForeachF
         Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
         Assert.assertTrue( join instanceof LOJoin );
     }
-    
+
     /**
-     * This is actually a valid positive test case, even though the benefit of such optimization is questionable. However, 
+     * This is actually a valid positive test case, even though the benefit of such optimization is questionable. However,
      * checking for such condition requires additional coding effort.
      */
     @Test
@@ -926,13 +952,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -964,13 +990,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -1002,13 +1028,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -1045,7 +1071,7 @@ public class TestNewPlanPushDownForeachF
         Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
         Assert.assertTrue( join instanceof LOJoin );
     }
-    
+
     // See PIG-1374
     @Test
     public void testForeachRequiredField() throws Exception {
@@ -1055,7 +1081,7 @@ public class TestNewPlanPushDownForeachF
         "store C into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
@@ -1065,7 +1091,7 @@ public class TestNewPlanPushDownForeachF
         Operator sort = newLogicalPlan.getSuccessors( foreach1 ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
+
     // See PIG-1706
     @Test
     public void testForeachWithUserDefinedSchema() throws Exception {
@@ -1076,13 +1102,13 @@ public class TestNewPlanPushDownForeachF
         "store d into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator store = newLogicalPlan.getSinks().get( 0 );
         LOForEach foreach = (LOForEach)newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue(foreach.getSchema().getField(1).alias.equals("q1"));
         Assert.assertTrue(foreach.getSchema().getField(2).alias.equals("q2"));
     }
-    
+
     // See PIG-1751
     @Test
     public void testForeachWithUserDefinedSchema2() throws Exception {
@@ -1093,7 +1119,7 @@ public class TestNewPlanPushDownForeachF
         "store d into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator store = newLogicalPlan.getSinks().get( 0 );
         Operator op = newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue(op instanceof LOJoin);
@@ -1112,7 +1138,7 @@ public class TestNewPlanPushDownForeachF
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
-        Assert.assertTrue( "Field \"a1\" is dropped by ColumnMapKeyPrune" + 
+        Assert.assertTrue( "Field \"a1\" is dropped by ColumnMapKeyPrune" +
                   "even though it should be stored",
                   ((LOLoad)load).getSchema().getField("a1") != null );
     }
@@ -1143,6 +1169,7 @@ public class TestNewPlanPushDownForeachF
             addPlanTransformListener(new ProjectionPatcher());
         }
 
+        @Override
         protected List<Set<Rule>> buildRuleSets() {
             List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
 
@@ -1178,24 +1205,25 @@ public class TestNewPlanPushDownForeachF
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
             super(p, iterations, new HashSet<String>());
         }
-        
-        protected List<Set<Rule>> buildRuleSets() {            
+
+        @Override
+        protected List<Set<Rule>> buildRuleSets() {
             List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
-            
+
             Set<Rule> s = new HashSet<Rule>();
             // add split filter rule
             Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
             s.add(r);
             ls.add(s);
-             
+
             s = new HashSet<Rule>();
             r = new PushDownForEachFlatten( "PushDownForEachFlatten" );
-            s.add(r);            
+            s.add(r);
             ls.add(s);
-            
+
             return ls;
         }
-    }    
+    }
 
     private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
     	PigServer pigServer = new PigServer( pc );
@@ -1212,16 +1240,18 @@ public class TestNewPlanPushDownForeachF
         "C = order B by $0, $1;" +
         "D = store C into 'dummy';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
-        
+
     }
-    
+
     @Test
     // See PIG-3826
     public void testOuterJoin() throws Exception {
@@ -1232,7 +1262,7 @@ public class TestNewPlanPushDownForeachF
         "t3 = join B by id LEFT OUTER, t2 by id;" +
         "store t3 into 'output';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator store = newLogicalPlan.getSinks().get( 0 );
         Operator join = newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue( join instanceof LOJoin );



Mime
View raw message