pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xu...@apache.org
Subject svn commit: r1099123 [4/16] - in /pig/branches/branch-0.9: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/impl/ src/org/apache/pig/impl/logi...
Date Tue, 03 May 2011 16:58:21 GMT
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java Tue May  3 16:58:19 2011
@@ -17,14 +17,9 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.assertTrue;
-
 import java.io.ByteArrayOutputStream;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -34,6 +29,7 @@ import java.util.Random;
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.AVG;
 import org.apache.pig.builtin.COUNT;
@@ -42,7 +38,6 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.GFCross;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
@@ -54,13 +49,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.test.TestPOSort.WeirdComparator;
 import org.apache.pig.test.utils.GenPhyOp;
-import org.apache.pig.test.utils.LogicalPlanTester;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -99,8 +89,8 @@ public class TestMRCompiler extends juni
         r = new Random(SEED);
     }
     
-    LogicalPlanTester planTester = new LogicalPlanTester(pc) ;
-    LogicalPlanTester planTesterMR = new LogicalPlanTester(pcMR) ;
+    PigServer pigServer = null;
+    PigServer pigServerMR = null;
 
     // if for some reason, the golden files need
     // to be regenerated, set this to true - THIS
@@ -116,6 +106,8 @@ public class TestMRCompiler extends juni
         
         GenPhyOp.setPc(pc);
         NodeIdGenerator.getGenerator().reset("");
+        pigServer = new PigServer( pc );
+        pigServerMR = new PigServer( pcMR );
     }
 
     @Override
@@ -835,10 +827,11 @@ public class TestMRCompiler extends juni
     
     @Test
     public void testMRCompilerErr() throws Exception {
-    	planTester.buildPlan("a = load 'input';");
-    	LogicalPlan lp = planTester.buildPlan("b = filter a by $0 > 5;");
+    	String query = "a = load 'input';" +
+    	"b = filter a by $0 > 5;" +
+    	"store b into 'output';";
     	
-    	PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+    	PhysicalPlan pp = Util.buildPp(pigServer, query);
     	pp.remove(pp.getRoots().get(0));
     	try {
     		Util.buildMRPlan(new PhysicalPlan(), pc);
@@ -873,12 +866,12 @@ public class TestMRCompiler extends juni
      */
     @Test
     public void testNumReducersInLimit() throws Exception {
-    	planTester.buildPlan("a = load 'input';");
-    	planTester.buildPlan("b = order a by $0;");
-    	planTester.buildPlan("c = limit b 10;");
-    	LogicalPlan lp = planTester.buildPlan("store c into '/tmp';");
+    	String query = "a = load 'input';" +
+    	"b = order a by $0;" +
+    	"c = limit b 10;" +
+    	"store c into 'output';";
     	
-    	PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+    	PhysicalPlan pp = Util.buildPp(pigServer, query);
     	MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
     	MapReduceOper mrOper = mrPlan.getRoots().get(0);
     	int count = 1;
@@ -896,12 +889,11 @@ public class TestMRCompiler extends juni
      */
     @Test
     public void testNumReducersInLimitWithParallel() throws Exception {
-    	planTesterMR.buildPlan("a = load 'input';");
-    	planTesterMR.buildPlan("b = order a by $0 parallel 2;");
-    	planTesterMR.buildPlan("c = limit b 10;");
-    	LogicalPlan lp = planTesterMR.buildPlan("store c into '/tmp';");
+    	String query = "a = load 'input';" + 
+    	"b = order a by $0 parallel 2;" +
+    	"c = limit b 10;" + "store c into 'output';";
     	
-    	PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+    	PhysicalPlan pp = Util.buildPp(pigServerMR, query);
     	MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
     	MapReduceOper mrOper = mrPlan.getRoots().get(0);
     	int count = 1;
@@ -915,12 +907,11 @@ public class TestMRCompiler extends juni
 
     @Test
     public void testUDFInJoin() throws Exception {
-        planTester.buildPlan("a = load 'input1' using BinStorage();");
-        planTester.buildPlan("b = load 'input2';");
-        planTester.buildPlan("c = join a by $0, b by $0;");
-        LogicalPlan lp = planTester.buildPlan("store c into '/tmp';");
+        String query = "a = load 'input1' using BinStorage();" +
+        "b = load 'input2';" +
+        "c = join a by $0, b by $0;" + "store c into 'output';";
         
-        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
         MapReduceOper mrOper = mrPlan.getRoots().get(0);
         
@@ -932,14 +923,12 @@ public class TestMRCompiler extends juni
 
     @Test
     public void testMergeJoin() throws Exception{
-
-        //generate = true;
-        planTester.buildPlan("a = load '/tmp/input1';");
-        planTester.buildPlan("b = load '/tmp/input2';");
-        planTester.buildPlan("c = join a by $0, b by $0 using \"merge\";");
-        LogicalPlan lp = planTester.buildPlan("store c into '/tmp';");
+        String query = "a = load '/tmp/input1';" +
+        "b = load '/tmp/input2';" +
+        "c = join a by $0, b by $0 using 'merge';" +
+        "store c into '/tmp/output1';";
         
-        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
         run(pp, "test/org/apache/pig/test/data/GoldenFiles/MRC18.gld");
     }
     
@@ -964,15 +953,12 @@ public class TestMRCompiler extends juni
     
     @Test
     public void testMergeJoinWithIndexableLoadFunc() throws Exception{
-
-        //generate = true;
-        planTester.buildPlan("a = load '/tmp/input1';");
-        planTester.buildPlan("b = load '/tmp/input2' using " +
-            TestMergeJoin.DummyIndexableLoader.class.getName() + ";");
-        planTester.buildPlan("c = join a by $0, b by $0 using \"merge\";");
-        LogicalPlan lp = planTester.buildPlan("store c into '/tmp';");
+        String query = "a = load 'input1';" +
+        "b = load 'input2' using " +
+            TestMergeJoin.DummyIndexableLoader.class.getName() + ";" +
+        "c = join a by $0, b by $0 using 'merge';" + "store c into 'output';";
         
-        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
         MROperPlan mp = Util.buildMRPlan(pp, pc);
         assertEquals("Checking number of MR Jobs for merge join with " +
         		"IndexableLoadFunc:", 1, mp.size());
@@ -981,17 +967,14 @@ public class TestMRCompiler extends juni
     
     @Test
     public void testCastFuncShipped() throws Exception{
-        
-        planTester.buildPlan("a = load '/tmp/input1' using " + PigStorageNoDefCtor.class.getName() + 
-                "('\t') as (a0, a1, a2);");
-        planTester.buildPlan("b = group a by a0;");
-        planTester.buildPlan("c = foreach b generate flatten(a);");
-        planTester.buildPlan("d = order c by a0;");
-        planTester.buildPlan("e = foreach d generate a1+a2;");
-        LogicalPlan lp = planTester.buildPlan("store e into '/tmp';");
-        planTester.typeCheckPlan(lp);
-        
-        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        String query = "a = load 'input1' using " + PigStorageNoDefCtor.class.getName() + 
+                "('\t') as (a0, a1, a2);" +
+        "b = group a by a0;" +
+        "c = foreach b generate flatten(a);" +
+        "d = order c by a0;" +
+        "e = foreach d generate a1+a2;" +
+        "store e into 'output';";
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
         MROperPlan mp = Util.buildMRPlan(pp, pc);
         MapReduceOper op = mp.getLeaves().get(0);
         assertTrue(op.UDFs.contains(new FuncSpec(PigStorageNoDefCtor.class.getName())+"('\t')"));
@@ -999,14 +982,12 @@ public class TestMRCompiler extends juni
     
     @Test
     public void testLimitAdjusterFuncShipped() throws Exception{
- 
-        planTesterMR.buildPlan("a = load 'input';");
-        planTesterMR.buildPlan("b = order a by $0 parallel 2;");
-        planTesterMR.buildPlan("c = limit b 7;");
-        LogicalPlan lp = planTesterMR.buildPlan("store c into '/tmp' using "
-                + PigStorageNoDefCtor.class.getName() + "('\t');");
+        String query = "a = load 'input';" + 
+        "b = order a by $0 parallel 2;" +
+        "c = limit b 7;" + "store c into 'output' using "
+                + PigStorageNoDefCtor.class.getName() + "('\t');";
          
-        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        PhysicalPlan pp = Util.buildPp(pigServerMR, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
         MapReduceOper mrOper = mrPlan.getRoots().get(0);
         int count = 1;

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestMapSideCogroup.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestMapSideCogroup.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestMapSideCogroup.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestMapSideCogroup.java Tue May  3 16:58:19 2011
@@ -37,25 +37,26 @@ import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.test.utils.LogicalPlanTester;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-public class TestMapSideCogroup {
 
+public class TestMapSideCogroup {
     private static final String INPUT_FILE1 = "testCogrpInput1.txt";
     private static final String INPUT_FILE2 = "testCogrpInput2.txt";
     private static final String INPUT_FILE3 = "testCogrpInput3.txt";
@@ -144,21 +145,26 @@ public class TestMapSideCogroup {
     @Test
     public void testCompilation(){
         try{
-            LogicalPlanTester lpt = new LogicalPlanTester();
-            lpt.buildPlan("A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
-            lpt.buildPlan("B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);");
-            lpt.buildPlan("D = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);");
-            LogicalPlan lp = lpt.buildPlan("C = cogroup A by id, B by id, D by id using 'merge';");
-            assertEquals(LOCogroup.GROUPTYPE.MERGE, ((LOCogroup)lp.getLeaves().get(0)).getGroupType());
+            PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+            String query = "A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);" + 
+            "B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
+            "D = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
+            "C = cogroup A by id, B by id, D by id using 'merge';" +
+            "store C into 'output';";
+            LogicalPlan lp = Util.buildLp(pigServer, query);
+            Operator op = lp.getSinks().get(0);
+            LOCogroup cogrp = (LOCogroup)lp.getPredecessors(op).get(0);
+            assertEquals(LOCogroup.GROUPTYPE.MERGE, cogrp.getGroupType());
 
             PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
             pc.connect();
-            PhysicalPlan phyP = Util.buildPhysicalPlan(lp, pc);
+            PhysicalPlan phyP = Util.buildPp(pigServer, query);
             PhysicalOperator phyOp = phyP.getLeaves().get(0);
+            assertTrue(phyOp instanceof POStore);
+            phyOp = phyOp.getInputs().get(0);
             assertTrue(phyOp instanceof POMergeCogroup);
 
-            lp = lpt.buildPlan("store C into 'out';");
-            MROperPlan mrPlan = Util.buildMRPlan(Util.buildPhysicalPlan(lp, pc),pc);            
+            MROperPlan mrPlan = Util.buildMRPlan(phyP,pc);            
             assertEquals(2,mrPlan.size());
 
             Iterator<MapReduceOper> itr = mrPlan.iterator();
@@ -169,8 +175,6 @@ public class TestMapSideCogroup {
             oper = itr.next();
             assertFalse(oper.reducePlan.isEmpty());
             assertFalse(oper.mapPlan.isEmpty());
-
-
         } catch(Exception e){
             e.printStackTrace();
             fail("Compilation of merged cogroup failed.");
@@ -178,43 +182,52 @@ public class TestMapSideCogroup {
 
     }
 
-    @Test
-    public void testFailure1() throws Exception{
-        LogicalPlanTester lpt = new LogicalPlanTester();
-        lpt.buildPlan("A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
-        lpt.buildPlan("E = group A by id;");
-        lpt.buildPlan("B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);");
-        lpt.buildPlan("D = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);");
-        LogicalPlan lp = lpt.buildPlan("C = cogroup E by A.id, B by id, D by id using 'merge';");
-        assertEquals(LOCogroup.GROUPTYPE.MERGE, ((LOCogroup)lp.getLeaves().get(0)).getGroupType());
-
-        PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
-        pc.connect();
-        boolean exceptionCaught = false;
-        try{
-            Util.buildPhysicalPlan(lp, pc);   
-        }catch (LogicalToPhysicalTranslatorException e){
-            assertEquals(1103,e.getErrorCode());
-            exceptionCaught = true;
-        }
-        assertTrue(exceptionCaught);
-    }
+//    @Test // PIG-2018
+//    public void testFailure1() throws Exception{
+//        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+//        String query = "A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);" +
+//        "E = group A by id;" +
+//        "B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
+//        "D = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
+//        "C = cogroup E by A.id, B by id, D by id using 'merge';" +
+//        "store C into 'output';";
+//        LogicalPlan lp = Util.buildLp(pigServer, query);
+//        Operator op = lp.getSinks().get(0);
+//        LOCogroup cogrp = (LOCogroup)lp.getPredecessors(op).get(0);
+//        assertEquals(LOCogroup.GROUPTYPE.MERGE, cogrp.getGroupType());
+//
+//        PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
+//        pc.connect();
+//        boolean exceptionCaught = false;
+//        try{
+//            Util.buildPp(pigServer, query);   
+//        }catch (java.lang.reflect.InvocationTargetException e){
+//        	FrontendException ex = (FrontendException)e.getTargetException();
+//            assertEquals(1103,ex.getErrorCode());
+//            exceptionCaught = true;
+//        }
+//        assertTrue(exceptionCaught);
+//    }
     
     @Test
     public void testFailure2() throws Exception{
-        LogicalPlanTester lpt = new LogicalPlanTester();
-        lpt.buildPlan("A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
-        lpt.buildPlan("B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);");
-        lpt.buildPlan("D = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);");
-        LogicalPlan lp = lpt.buildPlan("C = cogroup A by id inner, B by id, D by id inner using 'merge';");
-        assertEquals(LOCogroup.GROUPTYPE.MERGE, ((LOCogroup)lp.getLeaves().get(0)).getGroupType());
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        String query = "A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);" +
+        "B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
+        "D = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
+        "C = cogroup A by id inner, B by id, D by id inner using 'merge';" +
+        "store C into 'output';";
+        LogicalPlan lp = Util.buildLp(pigServer, query);
+        Operator op = lp.getSinks().get(0);
+        LOCogroup cogrp = (LOCogroup)lp.getPredecessors(op).get(0);
+        assertEquals(LOCogroup.GROUPTYPE.MERGE, cogrp.getGroupType());
 
         PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
         pc.connect();
         boolean exceptionCaught = false;
         try{
-            Util.buildPhysicalPlan(lp, pc);   
-        }catch (LogicalToPhysicalTranslatorException e){
+            Util.buildPp(pigServer, query);   
+        }catch (java.lang.reflect.InvocationTargetException e){
             exceptionCaught = true;
         }
         assertTrue(exceptionCaught);

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeForEachOptimization.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeForEachOptimization.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeForEachOptimization.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeForEachOptimization.java Tue May  3 16:58:19 2011
@@ -24,14 +24,13 @@ import java.io.IOException;
 import java.util.*;
 
 import org.apache.pig.ExecType;
-import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.PigServer;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
-import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.newplan.logical.rules.AddForEach;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
@@ -40,50 +39,41 @@ import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
 import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.test.utils.LogicalPlanTester;
 
 import junit.framework.Assert;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestMergeForEachOptimization {
     LogicalPlan plan = null;
     PigContext pc = new PigContext( ExecType.LOCAL, new Properties() );
+    PigServer pigServer = null;
   
-    private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
-        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
-        visitor.visit();
-        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
-        return newPlan;
+    @Before
+    public void setup() throws ExecException {
+        pigServer = new PigServer( pc );
     }
     
-    @BeforeClass
-    public static void setup() {
-        
-    }
-    
-    @AfterClass
-    public static void tearDown() {
+    @After
+    public void tearDown() {
         
     }
     
     /**
      * Basic test case. Two simple FOREACH statements can be merged to one.
-     * 
-     * @throws IOException
+     * @throws Exception 
      */
     @Test   
-    public void testSimple() throws IOException  {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = load 'file.txt' as (a, b, c);" );
-        lpt.buildPlan( "B = foreach A generate a+b, c-b;" );
-        lpt.buildPlan( "C = foreach B generate $0+5, $1;" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );  
-        LogicalPlan newLogicalPlan = migratePlan( plan );
+    public void testSimple() throws Exception  {
+        String query = "A = load 'file.txt' as (a, b, c);" +
+         "B = foreach A generate a+b, c-b;" +
+         "C = foreach B generate $0+5, $1;" +
+         "store C into 'empty';";  
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
         
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
         int outputExprCount1 = getOutputExprCount( newLogicalPlan );
@@ -103,17 +93,14 @@ public class TestMergeForEachOptimizatio
     
     /**
      * Test more complex case where the first for each in the script has inner plan.
-     * 
-     * @throws IOException
+     * @throws Exception 
      */
     @Test
-    public void testComplex() throws IOException {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = load 'file.txt' as (a:int, b, c:bag{t:tuple(c0:int,c1:int)});" );
-        lpt.buildPlan( "B = foreach A { S = ORDER c BY $0; generate $0, COUNT(S), SUM(S); };" );
-        lpt.buildPlan( "C = foreach B generate $2+5 as x, $0-$1/2 as y;" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );  
-        LogicalPlan newLogicalPlan = migratePlan( plan );
+    public void testComplex() throws Exception {
+        String query = "A = load 'file.txt' as (a:int, b, c:bag{t:tuple(c0:int,c1:int)});" +
+         "B = foreach A { S = ORDER c BY $0; generate $0, COUNT(S), SIZE(S); };" +
+         "C = foreach B generate $2+5 as x, $0-$1/2 as y;" + "store C into 'empty';" ;  
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
         
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
         int outputExprCount1 = getOutputExprCount( newLogicalPlan );
@@ -138,19 +125,16 @@ public class TestMergeForEachOptimizatio
     
     /**
      * One output of first foreach was referred more than once in the second foreach
-     * 
-     * @throws IOException
+     * @throws Exception 
      */
     @Test
-    public void testDuplicateInputs() throws IOException {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = load 'file.txt' as (a0:int, a1:double);" );
-        lpt.buildPlan( "A1 = foreach A generate (int)a0 as a0, (double)a1 as a1;" );
-        lpt.buildPlan( "B = group A1 all;" );
-        lpt.buildPlan( "C = foreach B generate A1;" );
-        lpt.buildPlan( "D = foreach C generate SUM(A1.a0), AVG(A1.a1);" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store D into 'empty';" );  
-        LogicalPlan newLogicalPlan = migratePlan( plan );
+    public void testDuplicateInputs() throws Exception {
+        String query = "A = load 'file.txt' as (a0:int, a1:double);" +
+         "A1 = foreach A generate (int)a0 as a0, (double)a1 as a1;" +
+         "B = group A1 all;" +
+         "C = foreach B generate A1;" +
+         "D = foreach C generate SUM(A1.a0), AVG(A1.a1);" + "store D into 'empty';" ;  
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
         
         Operator store = newLogicalPlan.getSinks().get(0);
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
@@ -172,17 +156,15 @@ public class TestMergeForEachOptimizatio
     /**
      * Not all consecutive FOREACHes can be merged. In this case, the second FOREACH statment
      * has inner plan, which cannot be merged with one before it.
-     * 
-     * @throws IOException
+     * @throws Exception 
      */
     @Test
-    public void testNegative1() throws IOException {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' as (a, b, c, d:bag{t:tuple(c0:int,c1:int)});" );
-        lpt.buildPlan( "B = FOREACH A GENERATE a+5 AS u, b-c/2 AS v, d AS w;" );
-        lpt.buildPlan( "C = FOREACH B { S = ORDER w BY $0; GENERATE $0 as x, COUNT(S) as y; };" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );  
-        LogicalPlan newLogicalPlan = migratePlan( plan );
+    public void testNegative1() throws Exception {
+        String query = "A = LOAD 'file.txt' as (a, b, c, d:bag{t:tuple(c0:int,c1:int)});" +
+         "B = FOREACH A GENERATE a+5 AS u, b-c/2 AS v, d AS w;" +
+         "C = FOREACH B { S = ORDER w BY $0; GENERATE $0 as x, COUNT(S) as y; };" +
+         "store C into 'empty';";  
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
         
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
                
@@ -198,17 +180,14 @@ public class TestMergeForEachOptimizatio
     
     /**
      * MergeForEach Optimization is off if the first statement has a FLATTEN operator.
-     * 
-     * @throws IOException
+     * @throws Exception 
      */
     @Test
-    public void testNegative2() throws IOException {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' as (a, b, c);" );
-        lpt.buildPlan( "B = FOREACH A GENERATE FLATTEN(a), b, c;" );
-        lpt.buildPlan( "C = FOREACH B GENERATE $0, $1+$2;" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );  
-        LogicalPlan newLogicalPlan = migratePlan( plan );
+    public void testNegative2() throws Exception {
+        String query = "A = LOAD 'file.txt' as (a, b, c);" +
+         "B = FOREACH A GENERATE FLATTEN(a), b, c;" +
+         "C = FOREACH B GENERATE $0, $1+$2;" + "store C into 'empty';" ;  
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
         
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
                
@@ -223,20 +202,16 @@ public class TestMergeForEachOptimizatio
     
     /**
      * Ensure that join input order does not get reversed (PIG-1672)
-     * 
-     * @throws IOException
+     * @throws Exception 
      */
     @Test   
-    public void testJoinInputOrder() throws IOException  {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "l1 = load 'y' as (a);" );
-        lpt.buildPlan( "l2 = load 'z' as (a1,b1,c1,d1);" );
-        lpt.buildPlan( "f1 = foreach l2 generate a1, b1, c1, d1;" );
-        lpt.buildPlan( "f2 = foreach f1 generate a1, b1, c1;" );
-        lpt.buildPlan( "j1 = join f2 by a1, l1 by a using 'replicated';" );
-        
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store j1 into 'empty';" );  
-        LogicalPlan newLogicalPlan = migratePlan( plan );
+    public void testJoinInputOrder() throws Exception  {
+        String query = "l1 = load 'y' as (a);" +
+         "l2 = load 'z' as (a1,b1,c1,d1);" +
+         "f1 = foreach l2 generate a1, b1, c1, d1;" +
+         "f2 = foreach f1 generate a1, b1, c1;" +
+         "j1 = join f2 by a1, l1 by a using 'replicated';" + "store j1 into 'empty';" ;  
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
 
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
         List<Operator> loads = newLogicalPlan.getSources();

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeJoin.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeJoin.java Tue May  3 16:58:19 2011
@@ -40,10 +40,8 @@ import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.test.utils.LogicalPlanTester;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -458,15 +456,13 @@ public class TestMergeJoin {
 
     @Test
     public void testParallelism() throws Exception{
-
-        LogicalPlanTester tester = new LogicalPlanTester();
-        tester.buildPlan("A = LOAD '" + INPUT_FILE + "';");
-        tester.buildPlan("B = LOAD '" + INPUT_FILE + "';");
-        tester.buildPlan("C = join A by $0, B by $0 using 'merge' parallel 50;");
-        LogicalPlan lp = tester.buildPlan("store C into 'out';");
+        String query = "A = LOAD '" + INPUT_FILE + "';" +
+                       "B = LOAD '" + INPUT_FILE + "';" +
+                       "C = join A by $0, B by $0 using 'merge' parallel 50;" + 
+                       "store C into 'out';";
 	PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
-        pc.connect();
-	MROperPlan mro = Util.buildMRPlan(Util.buildPhysicalPlan(lp, pc),pc);
+    pc.connect();
+	MROperPlan mro = Util.buildMRPlan(Util.buildPp(pigServer, query),pc);
         Assert.assertEquals(1,mro.getRoots().get(0).getRequestedParallelism());
     }
 

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeJoinOuter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeJoinOuter.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeJoinOuter.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestMergeJoinOuter.java Tue May  3 16:58:19 2011
@@ -32,18 +32,20 @@ import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.LOJoin;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.test.TestMapSideCogroup.DummyCollectableLoader;
 import org.apache.pig.test.TestMapSideCogroup.DummyIndexableLoader;
-import org.apache.pig.test.utils.LogicalPlanTester;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -93,22 +95,26 @@ public class TestMergeJoinOuter {
     @Test
     public void testCompilation(){
         try{
-            LogicalPlanTester lpt = new LogicalPlanTester();
-            lpt.buildPlan("A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
-            lpt.buildPlan("B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);");
-            LogicalPlan lp = lpt.buildPlan("C = join A by id left, B by id using 'merge';");
-            assertEquals(LOJoin.JOINTYPE.MERGE, ((LOJoin)lp.getLeaves().get(0)).getJoinType());
+            String query = "A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);" + 
+            "B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
+            "C = join A by id left, B by id using 'merge';" +
+            "store C into 'out';";
+            LogicalPlan lp = Util.buildLp(pigServer, query);
+            LOStore store = (LOStore)lp.getSinks().get(0);
+            LOJoin join = (LOJoin)lp.getPredecessors(store).get(0);
+            assertEquals(LOJoin.JOINTYPE.MERGE, join.getJoinType());
 
             PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
             pc.connect();
-            PhysicalPlan phyP = Util.buildPhysicalPlan(lp, pc);
+            PhysicalPlan phyP = Util.buildPp(pigServer, query);
             PhysicalOperator phyOp = phyP.getLeaves().get(0);
+            assertTrue(phyOp instanceof POStore);
+            phyOp = phyOp.getInputs().get(0);
             assertTrue(phyOp instanceof POForEach);
             assertEquals(1,phyOp.getInputs().size());
             assertTrue(phyOp.getInputs().get(0) instanceof POMergeCogroup);
             
-            lp = lpt.buildPlan("store C into 'out';");
-            MROperPlan mrPlan = Util.buildMRPlan(Util.buildPhysicalPlan(lp, pc),pc);            
+            MROperPlan mrPlan = Util.buildMRPlan(phyP,pc);            
             assertEquals(2,mrPlan.size());
 
             Iterator<MapReduceOper> itr = mrPlan.iterator();
@@ -130,20 +136,24 @@ public class TestMergeJoinOuter {
 
     @Test
     public void testFailure() throws Exception{
-        LogicalPlanTester lpt = new LogicalPlanTester();
-        lpt.buildPlan("A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
-        lpt.buildPlan("E = group A by id;");
-        lpt.buildPlan("B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);");
-        LogicalPlan lp = lpt.buildPlan("C = join E by A.id, B by id using 'merge';");
-        assertEquals(LOJoin.JOINTYPE.MERGE, ((LOJoin)lp.getLeaves().get(0)).getJoinType());
+        String query = "A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);" +
+        "E = group A by id;" +
+        "B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
+        "C = join E by A.id, B by id using 'merge';" +
+        "store C into 'output';";
+        LogicalPlan lp = Util.buildLp(pigServer, query);
+        Operator op = lp.getSinks().get(0);
+        LOJoin join = (LOJoin)lp.getPredecessors(op).get(0);
+        assertEquals(LOJoin.JOINTYPE.MERGE, join.getJoinType());
 
         PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
         pc.connect();
         boolean exceptionCaught = false;
         try{
-            Util.buildPhysicalPlan(lp, pc);   
-        }catch (LogicalToPhysicalTranslatorException e){
-            assertEquals(1103,e.getErrorCode());
+            Util.buildPp(pigServer, query);   
+        }catch (java.lang.reflect.InvocationTargetException e){
+        	FrontendException ex = (FrontendException)e.getTargetException();
+            assertEquals(1103,ex.getErrorCode());
             exceptionCaught = true;
         }
         assertTrue(exceptionCaught);

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanColumnPrune.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanColumnPrune.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanColumnPrune.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanColumnPrune.java Tue May  3 16:58:19 2011
@@ -4,7 +4,7 @@
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
+ * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
@@ -28,14 +28,11 @@ import java.util.Set;
 import junit.framework.TestCase;
 
 import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
-import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
-import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
@@ -44,260 +41,229 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.logical.rules.MapKeysPruneHelper;
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
 import org.apache.pig.newplan.optimizer.Rule;
-import org.apache.pig.test.utils.LogicalPlanTester;
 
 public class TestNewPlanColumnPrune extends TestCase {
 
     LogicalPlan plan = null;
     PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
   
-    private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws FrontendException{
-        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
-        visitor.visit();
-        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
-        
-        SchemaResetter schemaResetter = new SchemaResetter(newPlan);
-        schemaResetter.visit();
-        
-        return newPlan;
+    private LogicalPlan buildPlan(String query) throws Exception{
+        PigServer pigServer = new PigServer( pc );
+        return Util.buildLp(pigServer, query);
     }
     
    
     public void testNoPrune() throws Exception  {
         // no foreach
-        LogicalPlanTester lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
-        lpt.buildPlan("b = filter a by v1==NULL;");        
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
-        LogicalPlan newLogicalPlan = migratePlan(plan);
+        String query = "a = load 'd.txt' as (id, v1, v2);" +
+        "b = filter a by v1==NULL;" +        
+        "store b into 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan(query);
                
         PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
-        lpt.buildPlan("b = filter a by v1==NULL;");        
-        plan = lpt.buildPlan("store b into 'empty';");  
-        LogicalPlan expected = migratePlan(plan);
+        query  = "a = load 'd.txt' as (id, v1, v2);" +
+        "b = filter a by v1==NULL;"  +       
+        "store b into 'empty';";  
+        LogicalPlan expected = buildPlan(query);
         
         assertTrue(expected.isEqual(newLogicalPlan));
         
         // no schema
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt';");
-        lpt.buildPlan("b = foreach a generate $0, $1;");
-        plan = lpt.buildPlan("store b into 'empty';");  
-        newLogicalPlan = migratePlan(plan);
+        query = "a = load 'd.txt';" +
+        "b = foreach a generate $0, $1;" +
+        "store b into 'empty';";  
+        newLogicalPlan = buildPlan(query);
                
         optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt';");
-        lpt.buildPlan("b = foreach a generate $0, $1;");
-        plan = lpt.buildPlan("store b into 'empty';");  
-        expected = migratePlan(plan);
+        query = "a = load 'd.txt';"+
+        "b = foreach a generate $0, $1;"+
+        "store b into 'empty';";  
+        expected = buildPlan(query);
         assertTrue(expected.isEqual(newLogicalPlan));
     }
        
     public void testPrune() throws Exception  {
         // only foreach
-        LogicalPlanTester lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
-        lpt.buildPlan("b = foreach a generate id;");        
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
-        LogicalPlan newLogicalPlan = migratePlan(plan);
+        String query = "a = load 'd.txt' as (id, v1, v2);" +
+        "b = foreach a generate id;"+        
+        "store b into 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan(query);
                
         PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id);");
-        lpt.buildPlan("b = foreach a generate id;");        
-        plan = lpt.buildPlan("store b into 'empty';");  
-        LogicalPlan expected = migratePlan(plan);
+        query = "a = load 'd.txt' as (id);" +
+        "b = foreach a generate id;"+        
+        "store b into 'empty';";  
+        LogicalPlan expected = buildPlan(query);
         
         assertTrue(expected.isEqual(newLogicalPlan));
         
         // with filter
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
-        lpt.buildPlan("b = filter a by v1 != NULL AND (v2+v3)<100;");
-        lpt.buildPlan("c = foreach b generate id;");
-        plan = lpt.buildPlan("store c into 'empty';");  
-        newLogicalPlan = migratePlan(plan);
+        query = "a = load 'd.txt' as (id, v1, v5, v3, v4, v2);"+
+        "b = filter a by v1 != NULL AND (v2+v3)<100;"+
+        "c = foreach b generate id;"+
+        "store c into 'empty';";  
+        newLogicalPlan = buildPlan(query);
                
         optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, v3, v2);");
-        lpt.buildPlan("b = filter a by v1 != NULL AND (v2+v3)<100;");
-        lpt.buildPlan("c = foreach b generate id;");
-        plan = lpt.buildPlan("store c into 'empty';"); 
-        expected = migratePlan(plan);
+        query = "a = load 'd.txt' as (id, v1, v3, v2);" +
+        "b = filter a by v1 != NULL AND (v2+v3)<100;" +
+        "c = foreach b generate id;" +
+        "store c into 'empty';"; 
+        expected = buildPlan(query);
         assertTrue(expected.isEqual(newLogicalPlan));
         
         // with 2 foreach
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
-        lpt.buildPlan("b = foreach a generate v2, v5, v4;");
-        lpt.buildPlan("c = foreach b generate v5, v4;");
-        plan = lpt.buildPlan("store c into 'empty';");  
-        newLogicalPlan = migratePlan(plan);
+        query = "a = load 'd.txt' as (id, v1, v5, v3, v4, v2);" +
+        "b = foreach a generate v2, v5, v4;" +
+        "c = foreach b generate v5, v4;" +
+        "store c into 'empty';";  
+        newLogicalPlan = buildPlan(query);
                
         optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (v5, v4);");
-        lpt.buildPlan("b = foreach a generate v5, v4;");
-        lpt.buildPlan("c = foreach b generate v5, v4;");
-        plan = lpt.buildPlan("store c into 'empty';"); 
-        expected = migratePlan(plan);
+        query = "a = load 'd.txt' as (v5, v4);" +
+        "b = foreach a generate v5, v4;" +
+        "c = foreach b generate v5, v4;" +
+        "store c into 'empty';"; 
+        expected = buildPlan(query);
         assertTrue(expected.isEqual(newLogicalPlan));
         
         // with 2 foreach
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
-        lpt.buildPlan("b = foreach a generate id, v1, v5, v3, v4;");
-        lpt.buildPlan("c = foreach b generate v5, v4;");
-        plan = lpt.buildPlan("store c into 'empty';");  
-        newLogicalPlan = migratePlan(plan);
+        query = "a = load 'd.txt' as (id, v1, v5, v3, v4, v2);" +
+        "b = foreach a generate id, v1, v5, v3, v4;" +
+        "c = foreach b generate v5, v4;" +
+        "store c into 'empty';";  
+        newLogicalPlan = buildPlan(query);
                
         optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (v5, v4);");
-        lpt.buildPlan("b = foreach a generate v5, v4;");
-        lpt.buildPlan("c = foreach b generate v5, v4;");
-        plan = lpt.buildPlan("store c into 'empty';"); 
-        expected = migratePlan(plan);
+        query = "a = load 'd.txt' as (v5, v4);" +
+        "b = foreach a generate v5, v4;" +
+        "c = foreach b generate v5, v4;" +
+        "store c into 'empty';"; 
+        expected = buildPlan(query);
         assertTrue(expected.isEqual(newLogicalPlan));
         
         // with 2 foreach and filter in between
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
-        lpt.buildPlan("b = foreach a generate v2, v5, v4;");
-        lpt.buildPlan("c = filter b by v2 != NULL;");
-        lpt.buildPlan("d = foreach c generate v5, v4;");
-        plan = lpt.buildPlan("store d into 'empty';");  
-        newLogicalPlan = migratePlan(plan);
+        query = "a =load 'd.txt' as (id, v1, v5, v3, v4, v2);" +
+        "b = foreach a generate v2, v5, v4;" +
+        "c = filter b by v2 != NULL;" +
+        "d = foreach c generate v5, v4;" +
+        "store d into 'empty';";  
+        newLogicalPlan = buildPlan(query);
                
         optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (v5, v4, v2);");
-        lpt.buildPlan("b = foreach a generate v2, v5, v4;");
-        lpt.buildPlan("c = filter b by v2 != NULL;");
-        lpt.buildPlan("d = foreach c generate v5, v4;");
-        plan = lpt.buildPlan("store d into 'empty';");  
-        expected = migratePlan(plan);
+        query = "a =load 'd.txt' as (v5, v4, v2);" +
+        "b = foreach a generate v2, v5, v4;" +
+        "c = filter b by v2 != NULL;" +
+        "d = foreach c generate v5, v4;" +
+        "store d into 'empty';";  
+        expected = buildPlan(query);
         assertTrue(expected.isEqual(newLogicalPlan));
         
         // with 2 foreach after join
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2, v3);");
-        lpt.buildPlan("b = load 'c.txt' as (id, v4, v5, v6);");
-        lpt.buildPlan("c = join a by id, b by id;");       
-        lpt.buildPlan("d = foreach c generate a::id, v5, v3, v4;");
-        plan = lpt.buildPlan("store d into 'empty';");  
-        newLogicalPlan = migratePlan(plan);
+        query = "a =load 'd.txt' as (id, v1, v2, v3);" +
+        "b = load 'c.txt' as (id, v4, v5, v6);" +
+        "c = join a by id, b by id;" +       
+        "d = foreach c generate a::id, v5, v3, v4;" +
+        "store d into 'empty';";  
+        newLogicalPlan = buildPlan(query);
                
         optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v3);");
-        lpt.buildPlan("b = load 'c.txt' as (id, v4, v5);");
-        lpt.buildPlan("c = join a by id, b by id;");  
-        lpt.buildPlan("d = foreach c generate a::id, v5, v3, v4;");
-        plan = lpt.buildPlan("store d into 'empty';");  
-        expected = migratePlan(plan);
+        query = "a =load 'd.txt' as (id, v3);" +
+        "b = load 'c.txt' as (id, v4, v5);" +
+        "c = join a by id, b by id;" +  
+        "d = foreach c generate a::id, v5, v3, v4;" +
+        "store d into 'empty';";  
+        expected = buildPlan(query);
         assertTrue(expected.isEqual(newLogicalPlan));
         
         // with BinStorage, insert foreach after load
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");        
-        lpt.buildPlan("c = filter a by v2 != NULL;");
-        lpt.buildPlan("d = foreach c generate v5, v4;");
-        plan = lpt.buildPlan("store d into 'empty';");  
-        newLogicalPlan = migratePlan(plan);
+        query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +        
+        "c = filter a by v2 != NULL;" +
+        "d = foreach c generate v5, v4;" +
+        "store d into 'empty';";  
+        newLogicalPlan = buildPlan(query);
                
         optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
-        lpt.buildPlan("b = foreach a generate v5, v4, v2;");
-        lpt.buildPlan("c = filter b by v2 != NULL;");
-        lpt.buildPlan("d = foreach c generate v5, v4;");
-        plan = lpt.buildPlan("store d into 'empty';");  
-        expected = migratePlan(plan);
+        query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +
+        "b = foreach a generate v5, v4, v2;" +
+        "c = filter b by v2 != NULL;" +
+        "d = foreach c generate v5, v4;" +
+        "store d into 'empty';";  
+        expected = buildPlan(query);
         assertTrue(expected.isEqual(newLogicalPlan));
         
        // with BinStorage, not to insert foreach after load if there is already one
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");    
-        lpt.buildPlan("b = foreach a generate v5, v4, v2;");
-        lpt.buildPlan("c = filter b by v2 != NULL;");
-        lpt.buildPlan("d = foreach c generate v5;");
-        plan = lpt.buildPlan("store d into 'empty';");  
-        newLogicalPlan = migratePlan(plan);
+        query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +    
+        "b = foreach a generate v5, v4, v2;" +
+        "c = filter b by v2 != NULL;" +
+        "d = foreach c generate v5;" +
+        "store d into 'empty';";  
+        newLogicalPlan = buildPlan(query);
                
         optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
-        lpt.buildPlan("b = foreach a generate v5, v2;");
-        lpt.buildPlan("c = filter b by v2 != NULL;");
-        lpt.buildPlan("d = foreach c generate v5;");
-        plan = lpt.buildPlan("store d into 'empty';");  
-        expected = migratePlan(plan);
+        query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +
+        "b = foreach a generate v5, v2;" +
+        "c = filter b by v2 != NULL;" +
+        "d = foreach c generate v5;" +
+        "store d into 'empty';";  
+        expected = buildPlan(query);
         assertTrue(expected.isEqual(newLogicalPlan));
         
        // with BinStorage, not to insert foreach after load if there is already one
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");    
-        lpt.buildPlan("b = foreach a generate v5, v4, v2, 10;");
-        lpt.buildPlan("c = filter b by v2 != NULL;");
-        lpt.buildPlan("d = foreach c generate v5;");
-        plan = lpt.buildPlan("store d into 'empty';");  
-        newLogicalPlan = migratePlan(plan);
+        query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +    
+        "b = foreach a generate v5, v4, v2, 10;" +
+        "c = filter b by v2 != NULL;" +
+        "d = foreach c generate v5;" +
+        "store d into 'empty';";  
+        newLogicalPlan = buildPlan(query);
                
         optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
-        lpt.buildPlan("b = foreach a generate v5, v2, 10;");
-        lpt.buildPlan("c = filter b by v2 != NULL;");
-        lpt.buildPlan("d = foreach c generate v5;");
-        plan = lpt.buildPlan("store d into 'empty';");  
-        expected = migratePlan(plan);
+        query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +
+        "b = foreach a generate v5, v2, 10;" +
+        "c = filter b by v2 != NULL;" +
+        "d = foreach c generate v5;" +
+        "store d into 'empty';";  
+        expected = buildPlan(query);
         assertTrue(expected.isEqual(newLogicalPlan));
     }
     
     @SuppressWarnings("unchecked")
     public void testPruneWithMapKey() throws Exception {
          // only foreach
-        LogicalPlanTester lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, m:map[]);");
-        lpt.buildPlan("b = foreach a generate id, m#'path';");        
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
-        LogicalPlan newLogicalPlan = migratePlan(plan);
+        String query = "a =load 'd.txt' as (id, v1, m:map[]);" +
+        "b = foreach a generate id, m#'path';" +        
+        "store b into 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan(query);
                
         PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, m:map[]);");
-        lpt.buildPlan("b = foreach a generate id, m#'path';");        
-        plan = lpt.buildPlan("store b into 'empty';");  
-        LogicalPlan expected = migratePlan(plan);
+        query = "a =load 'd.txt' as (id, m:map[]);" +
+        "b = foreach a generate id, m#'path';" +        
+        "store b into 'empty';";  
+        LogicalPlan expected = buildPlan(query);
         
         assertTrue(expected.isEqual(newLogicalPlan));
         
@@ -310,26 +276,24 @@ public class TestNewPlanColumnPrune exte
         assertEquals(annotation.get(2), s);
         
         // foreach with join
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, m:map[]);");
-        lpt.buildPlan("b = load 'd.txt' as (id, v1, m:map[]);");
-        lpt.buildPlan("c = join a by id, b by id;");
-        lpt.buildPlan("d = filter c by a::m#'path' != NULL;");
-        lpt.buildPlan("e = foreach d generate a::id, b::id, b::m#'path', a::m;");        
-        plan = lpt.buildPlan("store e into 'empty';");  
-        newLogicalPlan = migratePlan(plan);
+        query = "a =load 'd.txt' as (id, v1, m:map[]);" +
+        "b = load 'd.txt' as (id, v1, m:map[]);" +
+        "c = join a by id, b by id;" +
+        "d = filter c by a::m#'path' != NULL;" +
+        "e = foreach d generate a::id, b::id, b::m#'path', a::m;" +        
+        "store e into 'empty';";  
+        newLogicalPlan = buildPlan(query);
                
         optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, m:map[]);");
-        lpt.buildPlan("b = load 'd.txt' as (id, m:map[]);");
-        lpt.buildPlan("c = join a by id, b by id;");
-        lpt.buildPlan("d = filter c by a::m#'path' != NULL;");
-        lpt.buildPlan("e = foreach d generate a::id, b::id, b::m#'path', a::m;");        
-        plan = lpt.buildPlan("store e into 'empty';");  
-        expected = migratePlan(plan);
+        query = "a =load 'd.txt' as (id, m:map[]);" +
+        "b = load 'd.txt' as (id, m:map[]);" +
+        "c = join a by id, b by id;" +
+        "d = filter c by a::m#'path' != NULL;" +
+        "e = foreach d generate a::id, b::id, b::m#'path', a::m;" +        
+        "store e into 'empty';";  
+        expected = buildPlan(query);
         
         assertTrue(expected.isEqual(newLogicalPlan));
         
@@ -364,71 +328,65 @@ public class TestNewPlanColumnPrune exte
     
     public void testPruneWithBag() throws Exception  {
         // filter above foreach
-        LogicalPlanTester lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v:bag{t:(s1,s2,s3)});");
-        lpt.buildPlan("b = filter a by id>10;");
-        lpt.buildPlan("c = foreach b generate id, FLATTEN(v);");    
-        lpt.buildPlan("d = foreach c generate id, v::s2;");    
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store d into 'empty';");  
-        LogicalPlan newLogicalPlan = migratePlan(plan);
+        String query = "a =load 'd.txt' as (id, v:bag{t:(s1,s2,s3)});" +
+        "b = filter a by id>10;" +
+        "c = foreach b generate id, FLATTEN(v);" +    
+        "d = foreach c generate id, v::s2;" +    
+        "store d into 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan(query);
                
         PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v:bag{t:(s1,s2,s3)});");
-        lpt.buildPlan("b = filter a by id>10;");
-        lpt.buildPlan("c = foreach b generate id, FLATTEN(v);");    
-        lpt.buildPlan("d = foreach c generate id, v::s2;");    
-        plan = lpt.buildPlan("store d into 'empty';");
-        LogicalPlan expected = migratePlan(plan);
+        query = "a =load 'd.txt' as (id, v:bag{t:(s1,s2,s3)});" +
+        "b = filter a by id>10;" +
+        "c = foreach b generate id, FLATTEN(v);" +    
+        "d = foreach c generate id, v::s2;" +    
+        "store d into 'empty';";
+        LogicalPlan expected = buildPlan(query);
         
         assertTrue(expected.isEqual(newLogicalPlan));
     }
     
     public void testAddForeach() throws Exception  {
         // filter above foreach
-        LogicalPlanTester lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
-        lpt.buildPlan("b = filter a by v1>10;");
-        lpt.buildPlan("c = foreach b generate id;");        
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store c into 'empty';");  
-        LogicalPlan newLogicalPlan = migratePlan(plan);
+    	String query = "a =load 'd.txt' as (id, v1, v2);" +
+        "b = filter a by v1>10;" +
+        "c = foreach b generate id;" +        
+        "store c into 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan(query);
                
         PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1);");
-        lpt.buildPlan("b = filter a by v1>10;"); 
-        lpt.buildPlan("c = foreach b generate id;");      
-        plan = lpt.buildPlan("store c into 'empty';");  
-        LogicalPlan expected = migratePlan(plan);
+        query = "a =load 'd.txt' as (id, v1);" +
+        "b = filter a by v1>10;" + 
+        "c = foreach b generate id;" +      
+        "store c into 'empty';";  
+        LogicalPlan expected = buildPlan(query);
         
         assertTrue(expected.isEqual(newLogicalPlan));
         
         // join with foreach
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
-        lpt.buildPlan("b = load 'd.txt' as (id, v1, v2);");
-        lpt.buildPlan("c = join a by id, b by id;");
-        lpt.buildPlan("d = filter c by a::v1>b::v1;");
-        lpt.buildPlan("e = foreach d generate a::id;");        
-        plan = lpt.buildPlan("store e into 'empty';");  
-        newLogicalPlan = migratePlan(plan);
+        query = "a =load 'd.txt' as (id, v1, v2);" +
+        "b = load 'd.txt' as (id, v1, v2);" +
+        "c = join a by id, b by id;" +
+        "d = filter c by a::v1>b::v1;" +
+        "e = foreach d generate a::id;" +        
+        "store e into 'empty';";  
+        newLogicalPlan = buildPlan(query);
                
         optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
         optimizer.optimize();
         
-        lpt = new LogicalPlanTester(pc);
-        lpt.buildPlan("a = load 'd.txt' as (id, v1);");
-        lpt.buildPlan("b = load 'd.txt' as (id, v1);");
-        lpt.buildPlan("c = join a by id, b by id;");
-        lpt.buildPlan("d = foreach c generate a::id, a::v1, b::v1;");        
-        lpt.buildPlan("e = filter d by a::v1>b::v1;");  
-        lpt.buildPlan("f = foreach e generate a::id;");        
-        plan = lpt.buildPlan("store f into 'empty';");  
-        expected = migratePlan(plan);
+        query = "a =load 'd.txt' as (id, v1);" +
+        "b = load 'd.txt' as (id, v1);" +
+        "c = join a by id, b by id;" +
+        "d = foreach c generate a::id, a::v1, b::v1;" +        
+        "e = filter d by a::v1>b::v1;" +  
+        "f = foreach e generate a::id;" +        
+        "store f into 'empty';";  
+        expected = buildPlan(query);
         
         assertTrue(expected.isEqual(newLogicalPlan));
     }

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Tue May  3 16:58:19 2011
@@ -25,12 +25,12 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.COUNT;
+import org.apache.pig.builtin.SIZE;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
-import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOForEach;
@@ -42,24 +42,21 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
 import org.apache.pig.newplan.optimizer.PlanTransformListener;
 import org.apache.pig.newplan.optimizer.Rule;
-import org.apache.pig.test.utils.Identity;
-import org.apache.pig.test.utils.LogicalPlanTester;
+import org.apache.pig.test.TestNewPlanPushDownForeachFlatten.MyFilterFunc;
 import org.junit.Assert;
 import org.junit.Test;
 
 
 public class TestNewPlanFilterAboveForeach {
     PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
-    LogicalPlanTester planTester = new LogicalPlanTester(pc) ;
     
     @Test
     public void testSimple() throws Exception {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) } );" );
-        lpt.buildPlan( "B = FOREACH A GENERATE name, flatten(cuisines);" );
-        lpt.buildPlan( "C = FILTER B BY name == 'joe';" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "D = STORE C INTO 'empty';" );  
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        String query = "A =LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) } );" +
+        "B = FOREACH A GENERATE name, flatten(cuisines);" +
+        "C = FILTER B BY name == 'joe';" +
+        "D = STORE C INTO 'empty';" ;  
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -73,13 +70,12 @@ public class TestNewPlanFilterAboveForea
     
     @Test
     public void testMultipleFilter() throws Exception {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' AS (name, cuisines : bag{ t : ( cuisine ) } );" );
-        lpt.buildPlan( "B = FOREACH A GENERATE name, flatten(cuisines);" );
-        lpt.buildPlan( "C = FILTER B BY $1 == 'french';" );
-        lpt.buildPlan( "D = FILTER C BY name == 'joe';" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "E = STORE D INTO 'empty';" );  
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        String query = "A =LOAD 'file.txt' AS (name, cuisines : bag{ t : ( cuisine ) } );" +
+        "B = FOREACH A GENERATE name, flatten(cuisines);" +
+        "C = FILTER B BY $1 == 'french';" +
+        "D = FILTER C BY name == 'joe';" +
+        "E = STORE D INTO 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -95,13 +91,12 @@ public class TestNewPlanFilterAboveForea
     
     @Test
     public void testMultipleFilter2() throws Exception {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' AS (name, age, cuisines : bag{ t : ( cuisine ) } );" );
-        lpt.buildPlan( "B = FOREACH A GENERATE name, age, flatten(cuisines);" );
-        lpt.buildPlan( "C = FILTER B BY name == 'joe';" );
-        lpt.buildPlan( "D = FILTER C BY age == 30;" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "E = STORE D INTO 'empty';" );  
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        String query = "A =LOAD 'file.txt' AS (name, age, cuisines : bag{ t : ( cuisine ) } );" +
+        "B = FOREACH A GENERATE name, age, flatten(cuisines);" +
+        "C = FILTER B BY name == 'joe';" +
+        "D = FILTER C BY age == 30;" +
+        "E = STORE D INTO 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -117,13 +112,12 @@ public class TestNewPlanFilterAboveForea
     
     @Test
     public void testMultipleFilterNotPossible() throws Exception {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' AS (name, cuisines : bag{ t : ( cuisine, region ) } );" );
-        lpt.buildPlan( "B = FOREACH A GENERATE name, flatten(cuisines);" );
-        lpt.buildPlan( "C = FILTER B BY $1 == 'French';" );
-        lpt.buildPlan( "D = FILTER C BY $2 == 'Europe';" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "E = STORE D INTO 'empty';" );  
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        String query = "A =LOAD 'file.txt' AS (name, cuisines : bag{ t : ( cuisine, region ) } );" +
+        "B = FOREACH A GENERATE name, flatten(cuisines);" +
+        "C = FILTER B BY $1 == 'French';" +
+        "D = FILTER C BY $2 == 'Europe';" +
+        "E = STORE D INTO 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -139,12 +133,11 @@ public class TestNewPlanFilterAboveForea
     
     @Test
     public void testNotPossibleFilter() throws Exception {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) } );" );
-        lpt.buildPlan( "B = FOREACH A GENERATE name, flatten(cuisines);" );
-        lpt.buildPlan( "C = FILTER B BY cuisine == 'French';" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "D = STORE C INTO 'empty';" );  
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        String query = "A =LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) } );" +
+        "B = FOREACH A GENERATE name, flatten(cuisines);" +
+        "C = FILTER B BY cuisine == 'French';" +
+        "D = STORE C INTO 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -158,12 +151,11 @@ public class TestNewPlanFilterAboveForea
     
     @Test
     public void testSimple2() throws Exception {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) } );" );
-        lpt.buildPlan( "B = FOREACH A GENERATE name, cuisines;" );
-        lpt.buildPlan( "C = FILTER B BY name == 'joe';" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "D = STORE C INTO 'empty';" );  
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        String query = "A =LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) } );" +
+        "B = FOREACH A GENERATE name, cuisines;" +
+        "C = FILTER B BY name == 'joe';" +
+        "D = STORE C INTO 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -178,15 +170,15 @@ public class TestNewPlanFilterAboveForea
     /**
      * Normal test case: all fields from Foreach are used by exhaustive list.
      * Optimization should kick in.
+     * @throws Exception 
      */
     @Test
-    public void test1() throws FrontendException {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' AS (a(u,v), b, c);" );
-        lpt.buildPlan( "B = FOREACH A GENERATE $0, b;" );
-        lpt.buildPlan( "C = FILTER B BY " + Identity.class.getName() +"($0, $1) > 5;" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "STORE C INTO 'empty';" );  
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+    public void test1() throws Exception {
+        String query = "A =LOAD 'file.txt' AS (a:bag{(u,v)}, b, c);" +
+        "B = FOREACH A GENERATE $0, b;" +
+        "C = FILTER B BY " + COUNT.class.getName() +"($0) > 5;" +
+        "STORE C INTO 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -200,15 +192,15 @@ public class TestNewPlanFilterAboveForea
     
     /**
      * Identical to test1() except that it use project *.
+     * @throws Exception 
      */
     @Test
-    public void test2() throws FrontendException {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' AS (a(u,v), b, c);" );
-        lpt.buildPlan( "B = FOREACH A GENERATE $0, b;" );
-        lpt.buildPlan( "C = FILTER B BY " + Identity.class.getName() +"(*) > 5;" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "STORE C INTO 'empty';" );  
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+    public void test2() throws Exception {
+        String query = "A =LOAD 'file.txt' AS (a:(u,v), b, c);" +
+        "B = FOREACH A GENERATE $0, b;" +
+        "C = FILTER B BY " + SIZE.class.getName() +"(*) > 5;" +
+        "STORE C INTO 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -222,15 +214,15 @@ public class TestNewPlanFilterAboveForea
 
     /**
      * No fields are used in filter condition at all.
+     * @throws Exception 
      */
     @Test
-    public void test3() throws FrontendException {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' AS (a(u,v), b, c);" );
-        lpt.buildPlan( "B = FOREACH A GENERATE $0, b;" );
-        lpt.buildPlan( "C = FILTER B BY 8 > 5;" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "STORE C INTO 'empty';" );  
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+    public void test3() throws Exception {
+        String query = "A =LOAD 'file.txt' AS (a:(u,v), b, c);" +
+        "B = FOREACH A GENERATE $0, b;" +
+        "C = FILTER B BY 8 > 5;" +
+        "STORE C INTO 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -245,15 +237,15 @@ public class TestNewPlanFilterAboveForea
     /**
      * Similar to test2, but not all fields are available from the operator before foreach.
      * Optimziation doesn't kick in.
+     * @throws Exception 
      */
     @Test
-    public void test4() throws FrontendException {
-        LogicalPlanTester lpt = new LogicalPlanTester( pc );
-        lpt.buildPlan( "A = LOAD 'file.txt' AS (a(u,v), b, c);" );
-        lpt.buildPlan( "B = FOREACH A GENERATE $0, b, flatten(1);" );
-        lpt.buildPlan( "C = FILTER B BY " + Identity.class.getName() +"(*) > 5;" );
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "STORE C INTO 'empty';" );  
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+    public void test4() throws Exception {
+        String query = "A =LOAD 'file.txt' AS (a:(u,v), b, c);" +
+        "B = FOREACH A GENERATE $0, b, flatten(1);" +
+        "C = FILTER B BY " + SIZE.class.getName() +"(*) > 5;" +
+        "STORE C INTO 'empty';";  
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -267,12 +259,12 @@ public class TestNewPlanFilterAboveForea
 
     @Test
     public void testFilterForeach() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $1, $2;");        
-        planTester.buildPlan("C = filter B by $0 < 18;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+        String query = "A =load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $1, $2;" +        
+        "C = filter B by $0 < 18;" +
+         "D = STORE C INTO 'empty';"; 
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -288,12 +280,12 @@ public class TestNewPlanFilterAboveForea
 
     @Test
     public void testFilterForeachAddedField() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $1, $2, COUNT({(1)});");        
-        planTester.buildPlan("C = filter B by $2 < 18;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+        String query = "A =load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $1, $2, COUNT({(1)});" +        
+        "C = filter B by $2 < 18;" +
+         "D = STORE C INTO 'empty';"; 
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -309,12 +301,12 @@ public class TestNewPlanFilterAboveForea
 
     @Test
     public void testFilterForeachCast() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate (int)$1, $2;");        
-        planTester.buildPlan("C = filter B by $0 < 18;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+        String query = "A =load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate (int)$1, $2;" +        
+        "C = filter B by $0 < 18;" +
+         "D = STORE C INTO 'empty';"; 
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -330,12 +322,12 @@ public class TestNewPlanFilterAboveForea
 
     @Test
     public void testFilterCastForeach() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $1, $2;");        
-        planTester.buildPlan("C = filter B by (int)$0 < 18;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+        String query = "A =load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $1, $2;" +        
+        "C = filter B by (int)$0 < 18;" +
+         "D = STORE C INTO 'empty';"; 
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -352,12 +344,12 @@ public class TestNewPlanFilterAboveForea
 
     @Test
     public void testFilterConstantConditionForeach() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $1, $2;");        
-        planTester.buildPlan("C = filter B by 1 == 1;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+        String query = "A =load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $1, $2;" +        
+        "C = filter B by 1 == 1;" +
+         "D = STORE C INTO 'empty';"; 
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -373,12 +365,12 @@ public class TestNewPlanFilterAboveForea
 
     @Test
     public void testFilterUDFForeach() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $1, $2;");        
-        planTester.buildPlan("C = filter B by " + Identity.class.getName() + "($1) ;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+        String query = "A =load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $1, $2;" +        
+        "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;" +
+         "D = STORE C INTO 'empty';"; 
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -394,12 +386,12 @@ public class TestNewPlanFilterAboveForea
     
     @Test
     public void testFilterForeachFlatten() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $1, flatten($2);");        
-        planTester.buildPlan("C = filter B by $0 < 18;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+        String query = "A =load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $1, flatten($2);" +        
+        "C = filter B by $0 < 18;" +
+         "D = STORE C INTO 'empty';"; 
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -416,14 +408,14 @@ public class TestNewPlanFilterAboveForea
     // See PIG-1669
     @Test
     public void testPushUpFilterWithScalar() throws Exception {
-        planTester.buildPlan("a = load 'studenttab10k' as (name, age, gpa);");
-        planTester.buildPlan("b = group a all;");
-        planTester.buildPlan("c = foreach b generate AVG(a.age) as age;");
-        planTester.buildPlan("d = foreach a generate name, age;");
-        planTester.buildPlan("e = filter d by age > c.age;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("f = store e into 'empty';");
+        String query = "a = load 'studenttab10k' as (name, age, gpa);" +
+        "b = group a all;" +
+        "c = foreach b generate AVG(a.age) as age;" +
+        "d = foreach a generate name, age;" +
+        "e = filter d by age > c.age;" +
+        "f = store e into 'empty';";
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator store = newLogicalPlan.getSinks().get( 0 );
         Operator foreach = newLogicalPlan.getPredecessors(store).get(0);
@@ -433,32 +425,26 @@ public class TestNewPlanFilterAboveForea
     // See PIG-1935
     @Test
     public void testPushUpFilterAboveBinCond() throws Exception {
-        planTester.buildPlan("data = LOAD 'data.txt' as (referrer:chararray, canonical_url:chararray, ip:chararray);");
-        planTester.buildPlan("best_url = FOREACH data GENERATE ((canonical_url != '' and canonical_url is not null) ? canonical_url : referrer) AS url, ip;");
-        planTester.buildPlan("filtered = FILTER best_url BY url == 'badsite.com';");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("store filtered into 'empty';");
+        String query = "data = LOAD 'data.txt' as (referrer:chararray, canonical_url:chararray, ip:chararray);" +
+        "best_url = FOREACH data GENERATE ((canonical_url != '' and canonical_url is not null) ? canonical_url : referrer) AS url, ip;" +
+        "filtered = FILTER best_url BY url == 'badsite.com';" +
+        "store filtered into 'empty';";
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+        LogicalPlan newLogicalPlan = buildPlan( query );
 
         Operator store = newLogicalPlan.getSinks().get( 0 );
         Operator filter = newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue( filter instanceof LOFilter );
     }
 
-    private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws FrontendException {
-        LogicalPlan newLogicalPlan = migratePlan( plan );
+    private LogicalPlan buildPlan(String query) throws Exception {
+        PigServer pigServer = new PigServer( pc );
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
         return newLogicalPlan;
     }
     
-    private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
-        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
-        visitor.visit();
-        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
-        return newPlan;
-    }
-
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
             super(p, iterations, new HashSet<String>());



Mime
View raw message