pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r982345 [11/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/lo...
Date Wed, 04 Aug 2010 17:46:48 GMT
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,995 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pig.data.DataType;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher;
+import org.apache.pig.newplan.logical.optimizer.SchemaPatcher;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.PlanTransformListener;
+import org.apache.pig.newplan.optimizer.Rule;
+
+import junit.framework.TestCase;
+
+public class TestNewPlanFilterAboveForeach extends TestCase {
+    
+    /**
+     * Checks that a filter on a column the foreach merely passes through is
+     * moved above the foreach.
+     * <p>
+     * Plan before: load (name, cuisines{t:(name)}) -> foreach gen name, flatten(cuisines)
+     * -> filter name == 'joe' -> store.<br>
+     * Plan asserted after: load -> filter -> foreach -> store.
+     */
+    public void testSimple() throws Exception {
+        LogicalPlan plan = new LogicalPlan();
+
+        // Load schema: (name:chararray, cuisines:bag{t:(name:chararray)})
+        LogicalSchema loadSchema = new LogicalSchema();
+        loadSchema.addField(new LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema cuisinesBagSchema = new LogicalSchema();
+        LogicalSchema cuisinesTupleSchema = new LogicalSchema();
+        cuisinesTupleSchema.addField(new LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        cuisinesBagSchema.addField(new LogicalFieldSchema("t", cuisinesTupleSchema, DataType.TUPLE));
+        loadSchema.addField(new LogicalFieldSchema("cuisines", cuisinesBagSchema, DataType.BAG));
+
+        LOLoad load = new LOLoad(null, loadSchema, plan, null);
+        load.setAlias("A");
+        plan.add(load);
+
+        // foreach with two generate expressions; only the second one is flattened
+        LOForEach foreach = new LOForEach(plan);
+        LogicalPlan innerPlan = new LogicalPlan();
+        List<LogicalExpressionPlan> generatePlans = new ArrayList<LogicalExpressionPlan>();
+        boolean[] flatten = { false, true };
+        LOGenerate generate = new LOGenerate(innerPlan, generatePlans, flatten);
+
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(innerLoad);
+        LOInnerLoad innerLoad2 = new LOInnerLoad(innerPlan, foreach, 1);
+        innerPlan.add(innerLoad2);
+
+        LogicalExpressionPlan nameGenPlan = new LogicalExpressionPlan();
+        ProjectExpression nameProject = new ProjectExpression(nameGenPlan, 0, 0, generate);
+        nameGenPlan.add(nameProject);
+
+        LogicalExpressionPlan cuisinesGenPlan = new LogicalExpressionPlan();
+        ProjectExpression cuisinesProject = new ProjectExpression(cuisinesGenPlan, 1, 0, generate);
+        cuisinesGenPlan.add(cuisinesProject);
+
+        generatePlans.add(nameGenPlan);
+        generatePlans.add(cuisinesGenPlan);
+
+        innerPlan.add(generate);
+        innerPlan.connect(innerLoad, generate);
+        innerPlan.connect(innerLoad2, generate);
+
+        foreach.setInnerPlan(innerPlan);
+        foreach.setAlias("B");
+        plan.add(foreach);
+        plan.connect(load, foreach);
+
+        // filter: $0 == 'joe'
+        LOFilter filter = new LOFilter(plan);
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        ProjectExpression filterNameProject = new ProjectExpression(filterPlan, 0, 0, filter);
+        filterPlan.add(filterNameProject);
+        ConstantExpression joeConst = new ConstantExpression(
+                filterPlan, "joe", new LogicalFieldSchema(null, null, DataType.CHARARRAY));
+        filterPlan.add(joeConst);
+        EqualExpression nameEqualsJoe = new EqualExpression(filterPlan, filterNameProject, joeConst);
+        filterPlan.add(nameEqualsJoe);
+
+        filter.setFilterPlan(filterPlan);
+        filter.setAlias("C");
+        plan.add(filter);
+        plan.connect(foreach, filter);
+
+        LOStore store = new LOStore(plan);
+        store.setAlias("D");
+        plan.add(store);
+        plan.connect(filter, store);
+
+        // Single rule set containing only the rule under test
+        Set<Rule> ruleSet = new HashSet<Rule>();
+        ruleSet.add(new FilterAboveForeach("FilterAboveFlatten"));
+        List<Set<Rule>> ruleSets = new ArrayList<Set<Rule>>();
+        ruleSets.add(ruleSet);
+
+        // Shape before optimization: load -> foreach -> filter -> store
+        assertTrue(plan.getSinks().contains(store));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(store).contains(filter));
+        assertEquals(1, plan.getPredecessors(store).size());
+
+        assertTrue(plan.getPredecessors(filter).contains(foreach));
+        assertEquals(1, plan.getPredecessors(filter).size());
+
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+
+        // Optimize, with listeners patching schemas and projections after each transform
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ruleSets, 3);
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.optimize();
+
+        // Shape after optimization: load -> filter -> foreach -> store
+        assertTrue(plan.getSinks().contains(store));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(store).contains(foreach));
+        assertEquals(1, plan.getPredecessors(store).size());
+
+        assertTrue(plan.getPredecessors(filter).contains(load));
+        assertEquals(1, plan.getPredecessors(filter).size());
+        // The filter's projection and the generate's projection both carry the load's "name" uid
+        assertEquals(load.getSchema().getField(0).uid, filterNameProject.getFieldSchema().uid);
+        assertEquals(filterNameProject.getFieldSchema().uid, nameProject.getFieldSchema().uid);
+
+        assertTrue(plan.getPredecessors(foreach).contains(filter));
+        assertEquals(1, plan.getPredecessors(foreach).size());
+
+        // The foreach's inner plan is untouched
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+    }
+    
+    /**
+     * Two stacked filters follow the foreach; the assertions expect only the
+     * filter on the pass-through column to be moved above it.
+     * <p>
+     * Plan before: load (name, cuisines{t:(name)}) -> foreach gen name, flatten(cuisines)
+     * -> filter2 ($1 == 'joe') -> filter (name == 'joe') -> store.<br>
+     * Plan asserted after: load -> filter -> foreach -> filter2 -> store.
+     */
+    public void testMultipleFilter() throws Exception {
+        LogicalPlan plan = new LogicalPlan();
+
+        // Load schema: (name:chararray, cuisines:bag{t:(name:chararray)})
+        LogicalSchema schema = new LogicalSchema();
+        schema.addField(new LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema bagSchema = new LogicalSchema();
+        LogicalSchema bagTupleSchema = new LogicalSchema();
+        bagTupleSchema.addField(new LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        bagSchema.addField(new LogicalFieldSchema("t", bagTupleSchema, DataType.TUPLE));
+        schema.addField(new LogicalFieldSchema("cuisines", bagSchema, DataType.BAG));
+
+        LOLoad load = new LOLoad(null, schema, plan, null);
+        load.setAlias("A");
+        plan.add(load);
+
+        // foreach with two generate expressions; only the second one is flattened
+        LOForEach foreach = new LOForEach(plan);
+        LogicalPlan innerPlan = new LogicalPlan();
+        List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+        boolean[] flatten = { false, true };
+        LOGenerate generate = new LOGenerate(innerPlan, expPlans, flatten);
+
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(innerLoad);
+        LOInnerLoad innerLoad2 = new LOInnerLoad(innerPlan, foreach, 1);
+        innerPlan.add(innerLoad2);
+
+        LogicalExpressionPlan namePrj = new LogicalExpressionPlan();
+        ProjectExpression prjName = new ProjectExpression(namePrj, 0, 0, generate);
+        namePrj.add(prjName);
+
+        LogicalExpressionPlan cuisinesPrj = new LogicalExpressionPlan();
+        ProjectExpression prjCuisines = new ProjectExpression(cuisinesPrj, 1, 0, generate);
+        cuisinesPrj.add(prjCuisines);
+
+        expPlans.add(namePrj);
+        expPlans.add(cuisinesPrj);
+
+        innerPlan.add(generate);
+        innerPlan.connect(innerLoad, generate);
+        innerPlan.connect(innerLoad2, generate);
+
+        foreach.setInnerPlan(innerPlan);
+        foreach.setAlias("B");
+        plan.add(foreach);
+        plan.connect(load, foreach);
+
+        // filter: $0 == 'joe'
+        LOFilter filter = new LOFilter(plan);
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        ProjectExpression namePrj2 = new ProjectExpression(filterPlan, 0, 0, filter);
+        filterPlan.add(namePrj2);
+        ConstantExpression constExp = new ConstantExpression(filterPlan, "joe",
+                new LogicalFieldSchema(null, null, DataType.CHARARRAY));
+        filterPlan.add(constExp);
+        EqualExpression equal = new EqualExpression(filterPlan, namePrj2, constExp);
+        filterPlan.add(equal);
+
+        filter.setFilterPlan(filterPlan);
+        filter.setAlias("C");
+        plan.add(filter);
+
+        // filter2: $1 == 'joe'
+        LOFilter filter2 = new LOFilter(plan);
+        LogicalExpressionPlan filter2Plan = new LogicalExpressionPlan();
+        ProjectExpression name2Prj2 = new ProjectExpression(filter2Plan, 0, 1, filter2);
+        filter2Plan.add(name2Prj2);
+        ConstantExpression const2Exp = new ConstantExpression(filter2Plan, "joe",
+                new LogicalFieldSchema(null, null, DataType.CHARARRAY));
+        filter2Plan.add(const2Exp);
+        // The comparison must use filter2Plan's own operands; the original wired it to
+        // namePrj2/constExp, which belong to filterPlan.
+        EqualExpression equal2 = new EqualExpression(filter2Plan, name2Prj2, const2Exp);
+        filter2Plan.add(equal2);
+
+        filter2.setFilterPlan(filter2Plan);
+        filter2.setAlias("C1");
+        plan.add(filter2);
+
+        plan.connect(foreach, filter2);
+        plan.connect(filter2, filter);
+
+        LOStore stor = new LOStore(plan);
+        stor.setAlias("D");
+        plan.add(stor);
+        plan.connect(filter, stor);
+
+        // Single rule set containing only the rule under test
+        Set<Rule> rules = new HashSet<Rule>();
+        rules.add(new FilterAboveForeach("FilterAboveFlatten"));
+        List<Set<Rule>> ruleSets = new ArrayList<Set<Rule>>();
+        ruleSets.add(rules);
+
+        // Shape before optimization: load -> foreach -> filter2 -> filter -> store
+        assertTrue(plan.getSinks().contains(stor));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(stor).contains(filter));
+        assertEquals(1, plan.getPredecessors(stor).size());
+
+        assertTrue(plan.getPredecessors(filter2).contains(foreach));
+        assertEquals(1, plan.getPredecessors(filter2).size());
+
+        assertTrue(plan.getPredecessors(foreach).contains(load));
+        assertEquals(1, plan.getPredecessors(foreach).size());
+
+        assertTrue(plan.getPredecessors(filter).contains(filter2));
+        assertEquals(1, plan.getPredecessors(filter).size());
+
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+
+        // Optimize, with listeners patching schemas and projections after each transform
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ruleSets, 3);
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.optimize();
+
+        // Shape after optimization: load -> filter -> foreach -> filter2 -> store
+        assertTrue(plan.getSinks().contains(stor));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(stor).contains(filter2));
+        assertEquals(1, plan.getPredecessors(stor).size());
+
+        assertTrue(plan.getPredecessors(filter2).contains(foreach));
+        assertEquals(1, plan.getPredecessors(filter2).size());
+
+        assertTrue(plan.getPredecessors(foreach).contains(filter));
+        assertEquals(1, plan.getPredecessors(foreach).size());
+
+        assertTrue(plan.getPredecessors(filter).contains(load));
+        assertEquals(1, plan.getPredecessors(filter).size());
+        // The moved filter's projection and the generate's projection both carry the load's "name" uid
+        assertEquals(load.getSchema().getField(0).uid, namePrj2.getFieldSchema().uid);
+        assertEquals(namePrj2.getFieldSchema().uid, prjName.getFieldSchema().uid);
+
+        // The foreach's inner plan is untouched
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+    }
+    
+    /**
+     * Two stacked filters follow the foreach, both on the pass-through column;
+     * the assertions expect both to end up above the foreach.
+     * <p>
+     * Plan before: load (name, cuisines{t:(name)}) -> foreach gen name, flatten(cuisines)
+     * -> filter2 (name == 'joe2') -> filter (name == 'joe') -> store.<br>
+     * Plan asserted after: load -> filter2 -> filter -> foreach -> store.
+     */
+    public void testMultipleFilter2() throws Exception {
+        LogicalPlan plan = new LogicalPlan();
+
+        // Load schema: (name:chararray, cuisines:bag{t:(name:chararray)})
+        LogicalSchema schema = new LogicalSchema();
+        schema.addField(new LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema bagSchema = new LogicalSchema();
+        LogicalSchema bagTupleSchema = new LogicalSchema();
+        bagTupleSchema.addField(new LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        bagSchema.addField(new LogicalFieldSchema("t", bagTupleSchema, DataType.TUPLE));
+        schema.addField(new LogicalFieldSchema("cuisines", bagSchema, DataType.BAG));
+
+        LOLoad load = new LOLoad(null, schema, plan, null);
+        load.setAlias("A");
+        plan.add(load);
+
+        // foreach with two generate expressions; only the second one is flattened
+        LOForEach foreach = new LOForEach(plan);
+        LogicalPlan innerPlan = new LogicalPlan();
+        List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+        boolean[] flatten = { false, true };
+        LOGenerate generate = new LOGenerate(innerPlan, expPlans, flatten);
+
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(innerLoad);
+        LOInnerLoad innerLoad2 = new LOInnerLoad(innerPlan, foreach, 1);
+        innerPlan.add(innerLoad2);
+
+        LogicalExpressionPlan namePrj = new LogicalExpressionPlan();
+        ProjectExpression prjName = new ProjectExpression(namePrj, 0, 0, generate);
+        namePrj.add(prjName);
+
+        LogicalExpressionPlan cuisinesPrj = new LogicalExpressionPlan();
+        ProjectExpression prjCuisines = new ProjectExpression(cuisinesPrj, 1, 0, generate);
+        cuisinesPrj.add(prjCuisines);
+
+        expPlans.add(namePrj);
+        expPlans.add(cuisinesPrj);
+
+        innerPlan.add(generate);
+        innerPlan.connect(innerLoad, generate);
+        innerPlan.connect(innerLoad2, generate);
+
+        foreach.setInnerPlan(innerPlan);
+        foreach.setAlias("B");
+        plan.add(foreach);
+        plan.connect(load, foreach);
+
+        // filter: $0 == 'joe'
+        LOFilter filter = new LOFilter(plan);
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        ProjectExpression namePrj2 = new ProjectExpression(filterPlan, 0, 0, filter);
+        filterPlan.add(namePrj2);
+        ConstantExpression constExp = new ConstantExpression(filterPlan, "joe",
+                new LogicalFieldSchema(null, null, DataType.CHARARRAY));
+        filterPlan.add(constExp);
+        EqualExpression equal = new EqualExpression(filterPlan, namePrj2, constExp);
+        filterPlan.add(equal);
+
+        filter.setFilterPlan(filterPlan);
+        filter.setAlias("C");
+        plan.add(filter);
+
+        // filter2: $0 == 'joe2'
+        LOFilter filter2 = new LOFilter(plan);
+        LogicalExpressionPlan filter2Plan = new LogicalExpressionPlan();
+        ProjectExpression name2Prj2 = new ProjectExpression(filter2Plan, 0, 0, filter2);
+        filter2Plan.add(name2Prj2);
+        ConstantExpression const2Exp = new ConstantExpression(filter2Plan, "joe2",
+                new LogicalFieldSchema(null, null, DataType.CHARARRAY));
+        filter2Plan.add(const2Exp);
+        // The comparison must use filter2Plan's own operands; the original wired it to
+        // namePrj2/constExp from filterPlan, so 'joe2' was never compared.
+        EqualExpression equal2 = new EqualExpression(filter2Plan, name2Prj2, const2Exp);
+        filter2Plan.add(equal2);
+
+        filter2.setFilterPlan(filter2Plan);
+        filter2.setAlias("C1");
+        plan.add(filter2);
+
+        plan.connect(foreach, filter2);
+        plan.connect(filter2, filter);
+
+        LOStore stor = new LOStore(plan);
+        stor.setAlias("D");
+        plan.add(stor);
+        plan.connect(filter, stor);
+
+        // Single rule set containing only the rule under test
+        Set<Rule> rules = new HashSet<Rule>();
+        rules.add(new FilterAboveForeach("FilterAboveFlatten"));
+        List<Set<Rule>> ruleSets = new ArrayList<Set<Rule>>();
+        ruleSets.add(rules);
+
+        // Shape before optimization: load -> foreach -> filter2 -> filter -> store
+        assertTrue(plan.getSinks().contains(stor));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(stor).contains(filter));
+        assertEquals(1, plan.getPredecessors(stor).size());
+
+        assertTrue(plan.getPredecessors(filter2).contains(foreach));
+        assertEquals(1, plan.getPredecessors(filter2).size());
+
+        assertTrue(plan.getPredecessors(foreach).contains(load));
+        assertEquals(1, plan.getPredecessors(foreach).size());
+
+        assertTrue(plan.getPredecessors(filter).contains(filter2));
+        assertEquals(1, plan.getPredecessors(filter).size());
+
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+
+        // Optimize, with listeners patching schemas and projections after each transform
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ruleSets, 3);
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.optimize();
+
+        // Shape after optimization: load -> filter2 -> filter -> foreach -> store
+        assertTrue(plan.getSinks().contains(stor));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(stor).contains(foreach));
+        assertEquals(1, plan.getPredecessors(stor).size());
+
+        assertTrue(plan.getPredecessors(filter2).contains(load));
+        assertEquals(1, plan.getPredecessors(filter2).size());
+
+        assertTrue(plan.getPredecessors(foreach).contains(filter));
+        assertEquals(1, plan.getPredecessors(foreach).size());
+
+        assertTrue(plan.getPredecessors(filter).contains(filter2));
+        assertEquals(1, plan.getPredecessors(filter).size());
+
+        // Both filter projections and the generate's projection carry the load's "name" uid
+        assertEquals(load.getSchema().getField(0).uid, namePrj2.getFieldSchema().uid);
+        assertEquals(namePrj2.getFieldSchema().uid, name2Prj2.getFieldSchema().uid);
+        assertEquals(name2Prj2.getFieldSchema().uid, prjName.getFieldSchema().uid);
+
+        // The foreach's inner plan is untouched
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+    }
+    
+    /**
+     * Two stacked filters follow the foreach, both on the flattened column;
+     * the assertions expect the plan shape to stay unchanged.
+     * <p>
+     * Plan before and asserted after: load (name, cuisines{t:(name)})
+     * -> foreach gen name, flatten(cuisines) -> filter2 ($1 == 'joe2')
+     * -> filter ($1 == 'joe') -> store.
+     */
+    public void testMultipleFilterNotPossible() throws Exception {
+        LogicalPlan plan = new LogicalPlan();
+
+        // Load schema: (name:chararray, cuisines:bag{t:(name:chararray)})
+        LogicalSchema schema = new LogicalSchema();
+        schema.addField(new LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema bagSchema = new LogicalSchema();
+        LogicalSchema bagTupleSchema = new LogicalSchema();
+        bagTupleSchema.addField(new LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        bagSchema.addField(new LogicalFieldSchema("t", bagTupleSchema, DataType.TUPLE));
+        schema.addField(new LogicalFieldSchema("cuisines", bagSchema, DataType.BAG));
+
+        LOLoad load = new LOLoad(null, schema, plan, null);
+        load.setAlias("A");
+        plan.add(load);
+
+        // foreach with two generate expressions; only the second one is flattened
+        LOForEach foreach = new LOForEach(plan);
+        LogicalPlan innerPlan = new LogicalPlan();
+        List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+        boolean[] flatten = { false, true };
+        LOGenerate generate = new LOGenerate(innerPlan, expPlans, flatten);
+
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(innerLoad);
+        LOInnerLoad innerLoad2 = new LOInnerLoad(innerPlan, foreach, 1);
+        innerPlan.add(innerLoad2);
+
+        LogicalExpressionPlan namePrj = new LogicalExpressionPlan();
+        ProjectExpression prjName = new ProjectExpression(namePrj, 0, 0, generate);
+        namePrj.add(prjName);
+
+        LogicalExpressionPlan cuisinesPrj = new LogicalExpressionPlan();
+        ProjectExpression prjCuisines = new ProjectExpression(cuisinesPrj, 1, 0, generate);
+        cuisinesPrj.add(prjCuisines);
+
+        expPlans.add(namePrj);
+        expPlans.add(cuisinesPrj);
+
+        innerPlan.add(generate);
+        innerPlan.connect(innerLoad, generate);
+        innerPlan.connect(innerLoad2, generate);
+
+        foreach.setInnerPlan(innerPlan);
+        foreach.setAlias("B");
+        plan.add(foreach);
+        plan.connect(load, foreach);
+
+        // filter: $1 == 'joe'
+        LOFilter filter = new LOFilter(plan);
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        ProjectExpression namePrj2 = new ProjectExpression(filterPlan, 0, 1, filter);
+        filterPlan.add(namePrj2);
+        ConstantExpression constExp = new ConstantExpression(filterPlan, "joe",
+                new LogicalFieldSchema(null, null, DataType.CHARARRAY));
+        filterPlan.add(constExp);
+        EqualExpression equal = new EqualExpression(filterPlan, namePrj2, constExp);
+        filterPlan.add(equal);
+
+        filter.setFilterPlan(filterPlan);
+        filter.setAlias("C");
+        plan.add(filter);
+
+        // filter2: $1 == 'joe2'
+        LOFilter filter2 = new LOFilter(plan);
+        LogicalExpressionPlan filter2Plan = new LogicalExpressionPlan();
+        ProjectExpression name2Prj2 = new ProjectExpression(filter2Plan, 0, 1, filter2);
+        filter2Plan.add(name2Prj2);
+        ConstantExpression const2Exp = new ConstantExpression(filter2Plan, "joe2",
+                new LogicalFieldSchema(null, null, DataType.CHARARRAY));
+        filter2Plan.add(const2Exp);
+        // The comparison must use filter2Plan's own operands; the original wired it to
+        // namePrj2/constExp from filterPlan, so 'joe2' was never compared.
+        EqualExpression equal2 = new EqualExpression(filter2Plan, name2Prj2, const2Exp);
+        filter2Plan.add(equal2);
+
+        filter2.setFilterPlan(filter2Plan);
+        filter2.setAlias("C1");
+        plan.add(filter2);
+
+        plan.connect(foreach, filter2);
+        plan.connect(filter2, filter);
+
+        LOStore stor = new LOStore(plan);
+        stor.setAlias("D");
+        plan.add(stor);
+        plan.connect(filter, stor);
+
+        // Single rule set containing only the rule under test
+        Set<Rule> rules = new HashSet<Rule>();
+        rules.add(new FilterAboveForeach("FilterAboveFlatten"));
+        List<Set<Rule>> ruleSets = new ArrayList<Set<Rule>>();
+        ruleSets.add(rules);
+
+        // Shape before optimization: load -> foreach -> filter2 -> filter -> store
+        assertTrue(plan.getSinks().contains(stor));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(stor).contains(filter));
+        assertEquals(1, plan.getPredecessors(stor).size());
+
+        assertTrue(plan.getPredecessors(filter2).contains(foreach));
+        assertEquals(1, plan.getPredecessors(filter2).size());
+
+        assertTrue(plan.getPredecessors(foreach).contains(load));
+        assertEquals(1, plan.getPredecessors(foreach).size());
+
+        assertTrue(plan.getPredecessors(filter).contains(filter2));
+        assertEquals(1, plan.getPredecessors(filter).size());
+
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+
+        // Optimize, with listeners patching schemas and projections after each transform
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ruleSets, 3);
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.optimize();
+
+        // Shape after optimization is the same as before
+        assertTrue(plan.getSinks().contains(stor));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(stor).contains(filter));
+        assertEquals(1, plan.getPredecessors(stor).size());
+
+        assertTrue(plan.getPredecessors(filter2).contains(foreach));
+        assertEquals(1, plan.getPredecessors(filter2).size());
+
+        assertTrue(plan.getPredecessors(foreach).contains(load));
+        assertEquals(1, plan.getPredecessors(foreach).size());
+
+        // Neither filter projection shares a uid with the generate's cuisines projection
+        assertFalse(prjCuisines.getFieldSchema().uid == namePrj2.getFieldSchema().uid);
+        assertFalse(prjCuisines.getFieldSchema().uid == name2Prj2.getFieldSchema().uid);
+
+        assertTrue(plan.getPredecessors(filter).contains(filter2));
+        assertEquals(1, plan.getPredecessors(filter).size());
+
+        // The foreach's inner plan is untouched
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+    }
+    
+    public void testNotPossibleFilter() throws Exception {
+        // Negative case: the assertions after optimization expect the same
+        // operator order as before it (presumably because the filter reads
+        // the flattened column). Plan under test:
+        //   Load (name, cuisines{t:(name)}) -> foreach gen name,flatten(cuisines)
+        //   -> filter $1 == 'joe' --> stor
+
+        LogicalPlan plan = new LogicalPlan();
+
+        // load schema: (name:chararray, cuisines:bag{t:(name:chararray)})
+        LogicalSchema loadSchema = new LogicalSchema();
+        loadSchema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema cuisinesBag = new LogicalSchema();
+        LogicalSchema cuisineTuple = new LogicalSchema();
+        cuisineTuple.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        cuisinesBag.addField(new LogicalSchema.LogicalFieldSchema("t", cuisineTuple, DataType.TUPLE));
+        loadSchema.addField(new LogicalSchema.LogicalFieldSchema("cuisines", cuisinesBag, DataType.BAG));
+
+        LOLoad load = new LOLoad(null, loadSchema, plan, null);
+        load.setAlias("A");
+        plan.add(load);
+
+        LOForEach foreach = new LOForEach(plan);
+
+        // nested plan of the foreach: two inner loads feeding one generate,
+        // with the flatten flag set only for the second output
+        LogicalPlan nested = new LogicalPlan();
+        boolean[] flattenFlags = { false, true };
+        List<LogicalExpressionPlan> genPlans = new ArrayList<LogicalExpressionPlan>();
+        LOGenerate generate = new LOGenerate(nested, genPlans, flattenFlags);
+
+        LOInnerLoad innerLoad = new LOInnerLoad(nested, foreach, 0);
+        nested.add(innerLoad);
+        LOInnerLoad innerLoad2 = new LOInnerLoad(nested, foreach, 1);
+        nested.add(innerLoad2);
+
+        LogicalExpressionPlan nameExpr = new LogicalExpressionPlan();
+        ProjectExpression nameProj = new ProjectExpression(nameExpr, 0, 0, generate);
+        nameExpr.add(nameProj);
+
+        LogicalExpressionPlan cuisinesExpr = new LogicalExpressionPlan();
+        ProjectExpression cuisinesProj = new ProjectExpression(cuisinesExpr, 1, 0, generate);
+        cuisinesExpr.add(cuisinesProj);
+
+        genPlans.add(nameExpr);
+        genPlans.add(cuisinesExpr);
+
+        nested.add(generate);
+        nested.connect(innerLoad, generate);
+        nested.connect(innerLoad2, generate);
+
+        foreach.setInnerPlan(nested);
+        foreach.setAlias("B");
+        plan.add(foreach);
+        plan.connect(load, foreach);
+
+        // filter condition: column 1 == 'joe'
+        LOFilter filter = new LOFilter(plan);
+        LogicalExpressionPlan condition = new LogicalExpressionPlan();
+        ProjectExpression filterProj = new ProjectExpression(condition, 0, 1, filter);
+        condition.add(filterProj);
+        ConstantExpression joe = new ConstantExpression(condition, "joe",
+                new LogicalFieldSchema(null, null, DataType.CHARARRAY));
+        condition.add(joe);
+        EqualExpression isJoe = new EqualExpression(condition, filterProj, joe);
+        condition.add(isJoe);
+
+        filter.setFilterPlan(condition);
+        filter.setAlias("C");
+        plan.add(filter);
+        plan.connect(foreach, filter);
+
+        LOStore store = new LOStore(plan);
+        store.setAlias("D");
+        plan.add(store);
+        plan.connect(filter, store);
+
+        // a single batch holding only the rule under test
+        Set<Rule> ruleSet = new HashSet<Rule>();
+        ruleSet.add(new FilterAboveForeach("FilterAboveFlatten"));
+        List<Set<Rule>> ruleBatches = new ArrayList<Set<Rule>>();
+        ruleBatches.add(ruleSet);
+
+        // shape before optimizing: load -> foreach -> filter -> store
+        assertTrue(plan.getSinks().contains(store));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(store).contains(filter));
+        assertEquals(1, plan.getPredecessors(store).size());
+
+        assertTrue(plan.getPredecessors(filter).contains(foreach));
+        assertEquals(1, plan.getPredecessors(filter).size());
+
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+
+        // optimize with the schema and projection patchers listening
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ruleBatches, 3);
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.optimize();
+
+        // shape after optimizing: the operator order must be unchanged
+        assertTrue(plan.getSinks().contains(store));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(store).contains(filter));
+        assertEquals(1, plan.getPredecessors(store).size());
+
+        // the filter's projection must not share a uid with the cuisines projection
+        assertTrue(cuisinesProj.getFieldSchema().uid != filterProj.getFieldSchema().uid);
+
+        assertTrue(plan.getPredecessors(filter).contains(foreach));
+        assertEquals(1, plan.getPredecessors(filter).size());
+
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+    }
+    
+    public void testSimple2() throws Exception {
+        // Positive case: nothing is flattened and the filter reads column 0,
+        // and the assertions after optimization expect the filter between
+        // the load and the foreach. Plan under test:
+        //   Load (name, cuisines{t:(name)}) -> foreach gen name,cuisines
+        //   -> filter name == 'joe' --> stor
+
+        LogicalPlan plan = new LogicalPlan();
+
+        // load schema: (name:chararray, cuisines:bag{t:(name:chararray)})
+        LogicalSchema loadSchema = new LogicalSchema();
+        loadSchema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema cuisinesBag = new LogicalSchema();
+        LogicalSchema cuisineTuple = new LogicalSchema();
+        cuisineTuple.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        cuisinesBag.addField(new LogicalSchema.LogicalFieldSchema("t", cuisineTuple, DataType.TUPLE));
+        loadSchema.addField(new LogicalSchema.LogicalFieldSchema("cuisines", cuisinesBag, DataType.BAG));
+
+        LOLoad load = new LOLoad(null, loadSchema, plan, null);
+        load.setAlias("A");
+        plan.add(load);
+
+        LOForEach foreach = new LOForEach(plan);
+
+        // nested plan of the foreach: two inner loads feeding one generate,
+        // with neither flatten flag set
+        LogicalPlan nested = new LogicalPlan();
+        List<LogicalExpressionPlan> genPlans = new ArrayList<LogicalExpressionPlan>();
+        boolean[] flattenFlags = { false, false };
+        LOGenerate generate = new LOGenerate(nested, genPlans, flattenFlags);
+
+        LOInnerLoad innerLoad = new LOInnerLoad(nested, foreach, 0);
+        nested.add(innerLoad);
+        LOInnerLoad innerLoad2 = new LOInnerLoad(nested, foreach, 1);
+        nested.add(innerLoad2);
+
+        LogicalExpressionPlan nameExpr = new LogicalExpressionPlan();
+        ProjectExpression nameProj = new ProjectExpression(nameExpr, 0, 0, generate);
+        nameExpr.add(nameProj);
+
+        LogicalExpressionPlan cuisinesExpr = new LogicalExpressionPlan();
+        ProjectExpression cuisinesProj = new ProjectExpression(cuisinesExpr, 1, 0, generate);
+        cuisinesExpr.add(cuisinesProj);
+
+        genPlans.add(nameExpr);
+        genPlans.add(cuisinesExpr);
+
+        nested.add(generate);
+        nested.connect(innerLoad, generate);
+        nested.connect(innerLoad2, generate);
+
+        foreach.setInnerPlan(nested);
+        foreach.setAlias("B");
+        plan.add(foreach);
+        plan.connect(load, foreach);
+
+        // filter condition: column 0 == 'joe'
+        LOFilter filter = new LOFilter(plan);
+        LogicalExpressionPlan condition = new LogicalExpressionPlan();
+        ProjectExpression filterProj = new ProjectExpression(condition, 0, 0, filter);
+        condition.add(filterProj);
+        ConstantExpression joe = new ConstantExpression(condition, "joe",
+                new LogicalFieldSchema(null, null, DataType.CHARARRAY));
+        condition.add(joe);
+        EqualExpression isJoe = new EqualExpression(condition, filterProj, joe);
+        condition.add(isJoe);
+
+        filter.setFilterPlan(condition);
+        filter.setAlias("C");
+        plan.add(filter);
+        plan.connect(foreach, filter);
+
+        LOStore store = new LOStore(plan);
+        store.setAlias("D");
+        plan.add(store);
+        plan.connect(filter, store);
+
+        // a single batch holding only the rule under test
+        Set<Rule> ruleSet = new HashSet<Rule>();
+        ruleSet.add(new FilterAboveForeach("FilterAboveFlatten"));
+        List<Set<Rule>> ruleBatches = new ArrayList<Set<Rule>>();
+        ruleBatches.add(ruleSet);
+
+        // shape before optimizing: load -> foreach -> filter -> store
+        assertTrue(plan.getSinks().contains(store));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(store).contains(filter));
+        assertEquals(1, plan.getPredecessors(store).size());
+
+        assertTrue(plan.getPredecessors(filter).contains(foreach));
+        assertEquals(1, plan.getPredecessors(filter).size());
+
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+
+        // optimize with the schema and projection patchers listening
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ruleBatches, 3);
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.optimize();
+
+        // shape after optimizing: load -> filter -> foreach -> store
+        assertTrue(plan.getSinks().contains(store));
+        assertTrue(plan.getSources().contains(load));
+
+        assertTrue(plan.getPredecessors(store).contains(foreach));
+        assertEquals(1, plan.getPredecessors(store).size());
+
+        assertTrue(plan.getPredecessors(filter).contains(load));
+        assertEquals(1, plan.getPredecessors(filter).size());
+
+        assertTrue(plan.getPredecessors(foreach).contains(filter));
+        assertEquals(1, plan.getPredecessors(foreach).size());
+
+        // load field 0, the filter's projection and the foreach's name
+        // projection must all carry the same uid
+        assertEquals(load.getSchema().getField(0).uid, filterProj.getFieldSchema().uid);
+        assertEquals(filterProj.getFieldSchema().uid, nameProj.getFieldSchema().uid);
+
+        assertTrue(foreach.getInnerPlan().getSinks().contains(generate));
+        assertEquals(1, foreach.getInnerPlan().getSinks().size());
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad));
+        assertTrue(foreach.getInnerPlan().getSources().contains(innerLoad2));
+        assertEquals(2, foreach.getInnerPlan().getSources().size());
+    }
+    
+    /**
+     * Test-only PlanOptimizer subclass. It adds no behavior of its own: the
+     * constructor forwards to the superclass and addPlanTransformListener is
+     * re-declared as public so the tests can register listeners.
+     *
+     * Declared static because it never touches the enclosing test instance.
+     */
+    public static class MyPlanOptimizer extends PlanOptimizer {
+
+        /**
+         * @param p          plan to optimize
+         * @param rs         rule sets handed to the superclass unchanged
+         * @param iterations iteration count handed to the superclass unchanged
+         */
+        protected MyPlanOptimizer(OperatorPlan p, List<Set<Rule>> rs,
+                int iterations) {
+            super(p, rs, iterations);
+        }
+
+        /** Public pass-through to the superclass method of the same name. */
+        @Override
+        public void addPlanTransformListener(PlanTransformListener listener) {
+            super.addPlanTransformListener(listener);
+        }
+
+    }
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.*;
+import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher;
+import org.apache.pig.newplan.logical.optimizer.SchemaPatcher;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.rules.MergeFilter;
+import org.apache.pig.newplan.logical.rules.PushUpFilter;
+import org.apache.pig.newplan.logical.rules.SplitFilter;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.PlanTransformListener;
+import org.apache.pig.newplan.optimizer.Rule;
+
+import junit.framework.TestCase;
+
+public class TestNewPlanFilterRule extends TestCase {
+
+    LogicalPlan plan = null;
+    LogicalRelationalOperator load1 = null;
+    LogicalRelationalOperator load2 = null;
+    LogicalRelationalOperator filter = null;
+    LogicalRelationalOperator join = null;
+    LogicalRelationalOperator store = null;    
+    
+    /**
+     * Builds the plan every test in this class starts from and publishes its
+     * operators through the instance fields (plan, load1, load2, join, filter,
+     * store):
+     *
+     *   load A (id, name, age)    --|
+     *                               |-- join C -- filter D -- store
+     *   load B (id, dept, salary) --|
+     *
+     * The field uids are fixed to 1..6. The initial filter condition is a
+     * single equality with no AND: column 2 == 3. Tests that need another
+     * condition replace it through LOFilter.setFilterPlan.
+     */
+    private void prep() {
+        plan = new LogicalPlan();
+
+        // load A: (id:int, name:chararray, age:int) with uids 1..3
+        LogicalSchema schema = new LogicalSchema();
+        schema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.INTEGER));
+        schema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        schema.addField(new LogicalSchema.LogicalFieldSchema("age", null, DataType.INTEGER));
+        schema.getField(0).uid = 1;
+        schema.getField(1).uid = 2;
+        schema.getField(2).uid = 3;
+        LogicalRelationalOperator l1 = new LOLoad(null, schema, plan, null);
+        l1.setAlias("A");
+        plan.add(l1);
+
+        // load B: (id:int, dept:int, salary:float) with uids 4..6
+        schema = new LogicalSchema();
+        schema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.INTEGER));
+        schema.addField(new LogicalSchema.LogicalFieldSchema("dept", null, DataType.INTEGER));
+        schema.addField(new LogicalSchema.LogicalFieldSchema("salary", null, DataType.FLOAT));
+        schema.getField(0).uid = 4;
+        schema.getField(1).uid = 5;
+        schema.getField(2).uid = 6;
+        LogicalRelationalOperator l2 = new LOLoad(null, schema, plan, null);
+        l2.setAlias("B");
+        plan.add(l2);
+
+        // hash join with one key plan per input, each projecting column 1
+        MultiMap<Integer, LogicalExpressionPlan> joinPlans = new MultiMap<Integer, LogicalExpressionPlan>();
+
+        LogicalRelationalOperator j1 = new LOJoin(plan, joinPlans, LOJoin.JOINTYPE.HASH, new boolean[]{true, true});
+        LogicalExpressionPlan p1 = new LogicalExpressionPlan();
+        ProjectExpression lp1 = new ProjectExpression(p1, 0, 1, j1);
+        p1.add(lp1);
+        joinPlans.put(0, p1);
+
+        LogicalExpressionPlan p2 = new LogicalExpressionPlan();
+        ProjectExpression lp2 = new ProjectExpression(p2, 1, 1, j1);
+        p2.add(lp2);
+        joinPlans.put(1, p2);
+
+        j1.setAlias("C");
+        plan.add(j1);
+
+        // build an expression with no AND: column 2 == 3
+        LogicalExpressionPlan p3 = new LogicalExpressionPlan();
+        LogicalRelationalOperator f1 = new LOFilter(plan, p3);
+
+        LogicalExpression lp3 = new ProjectExpression(p3, 0, 2, f1);
+        LogicalSchema.LogicalFieldSchema fs = new LogicalSchema.LogicalFieldSchema(null, null, DataType.INTEGER);
+        LogicalExpression cont = new ConstantExpression(p3, Integer.valueOf(3), fs);
+        p3.add(lp3);
+        p3.add(cont);
+        // Only constructed, never referenced again and never passed to
+        // p3.add(); presumably the constructor attaches it to p3.
+        // NOTE(review): confirm against EqualExpression.
+        new EqualExpression(p3, lp3, cont);
+
+        f1.setAlias("D");
+        plan.add(f1);
+
+        LogicalRelationalOperator s1 = new LOStore(plan);
+        plan.add(s1);
+
+        // load --|-join - filter - store
+        // load --|
+        plan.connect(l1, j1);
+        plan.connect(l2, j1);
+        plan.connect(j1, f1);
+        plan.connect(f1, s1);
+
+        filter = f1;
+        store = s1;
+        join = j1;
+        load1 = l1;
+        load2 = l2;
+    }
+    
+    /**
+     * Runs SplitFilter, PushUpFilter and MergeFilter in turn on the plan from
+     * prep(), whose filter condition has no AND. Expected: the split changes
+     * nothing, the push-up moves the filter between load1 and the join, and
+     * the merge changes nothing.
+     */
+    public void testFilterRule() throws Exception  {
+        prep();
+        // run split filter rule
+        Rule r = new SplitFilter("SplitFilter");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+
+        // the filter should still sit between join and store
+        // (assertEquals takes the expected value first)
+        assertEquals(join, plan.getPredecessors(filter).get(0));
+        assertEquals(store, plan.getSuccessors(filter).get(0));
+
+        // run push up filter rule
+        r = new PushUpFilter("PushUpFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+
+        // the filter should be moved up to be after load
+        assertEquals(filter, plan.getSuccessors(load1).get(0));
+        assertEquals(join, plan.getSuccessors(filter).get(0));
+        assertEquals(store, plan.getSuccessors(join).get(0));
+
+        // run merge filter rule
+        r = new MergeFilter("MergeFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+
+        // the filter should be the same as before, nothing to merge
+        assertEquals(filter, plan.getSuccessors(load1).get(0));
+        assertEquals(join, plan.getSuccessors(filter).get(0));
+        assertEquals(store, plan.getSuccessors(join).get(0));
+    }
+        
+    // build an expression with 1 AND, it should split into 2 filters;
+    // after the push-up each load should be followed by one of them
+    public void testFilterRuleWithAnd() throws Exception  {
+        prep();
+
+        // condition: (column 2 == 3) AND (column 5 == 100.0f)
+        LogicalExpressionPlan p4 = new LogicalExpressionPlan();
+        LogicalExpression lp3 = new ProjectExpression(p4, 0, 2, filter);
+        LogicalSchema.LogicalFieldSchema fs = new LogicalSchema.LogicalFieldSchema(null, null, DataType.INTEGER);
+        LogicalExpression cont = new ConstantExpression(p4, Integer.valueOf(3), fs);
+        p4.add(lp3);
+        p4.add(cont);
+        LogicalExpression eq = new EqualExpression(p4, lp3, cont);
+
+        LogicalExpression lp4 = new ProjectExpression(p4, 0, 5, filter);
+        fs = new LogicalSchema.LogicalFieldSchema(null, null, DataType.FLOAT);
+        LogicalExpression cont2 = new ConstantExpression(p4, Float.valueOf(100f), fs);
+        p4.add(lp4);
+        p4.add(cont2);
+        LogicalExpression eq2 = new EqualExpression(p4, lp4, cont2);
+
+        // root of the condition; only constructed, never referenced again
+        new AndExpression(p4, eq, eq2);
+
+        ((LOFilter)filter).setFilterPlan(p4);
+
+        // run split filter rule
+        Rule r = new SplitFilter("SplitFilter");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        PlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+
+        // join -> filter -> (new) filter -> store
+        // (assertEquals takes the expected value first)
+        assertEquals(join, plan.getPredecessors(filter).get(0));
+        Operator next = plan.getSuccessors(filter).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOStore.class, next.getClass());
+
+        // run push up filter rule
+        r = new PushUpFilter("PushUpFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+
+        // both filters should be moved up to be after each load
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        assertEquals(join, plan.getSuccessors(next).get(0));
+
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        assertEquals(join, plan.getSuccessors(next).get(0));
+
+        assertEquals(store, plan.getSuccessors(join).get(0));
+
+        // run merge filter rule
+        r = new MergeFilter("MergeFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+
+        // the filters should be the same as before, nothing to merge
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        assertEquals(join, plan.getSuccessors(next).get(0));
+
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        assertEquals(join, plan.getSuccessors(next).get(0));
+
+        assertEquals(store, plan.getSuccessors(join).get(0));
+    }
+    
+    /**
+     * Condition with two ANDs:
+     * (column 2 == 3) AND (column 3 == 3) AND (column 0 == column 3).
+     * Expected: SplitFilter yields three chained filters, PushUpFilter moves
+     * one after each load and leaves one after the join, and MergeFilter
+     * performs no transformation.
+     */
+    public void testFilterRuleWith2And() throws Exception  {
+        prep();
+        // build an expression with 2 AND, it should split into 3 filters
+        LogicalExpressionPlan p5 = new LogicalExpressionPlan();
+
+        LogicalExpression lp3 = new ProjectExpression(p5, 0, 2, filter);
+        LogicalSchema.LogicalFieldSchema fs = new LogicalSchema.LogicalFieldSchema(null, null, DataType.INTEGER);
+        LogicalExpression cont = new ConstantExpression(p5, Integer.valueOf(3), fs);
+        p5.add(lp3);
+        p5.add(cont);
+        LogicalExpression eq = new EqualExpression(p5, lp3, cont);
+
+        LogicalExpression lp4 = new ProjectExpression(p5, 0, 3, filter);
+        LogicalExpression cont2 = new ConstantExpression(p5, Integer.valueOf(3), fs);
+        p5.add(lp4);
+        p5.add(cont2);
+        LogicalExpression eq2 = new EqualExpression(p5, lp4, cont2);
+
+        LogicalExpression and1 = new AndExpression(p5, eq, eq2);
+
+        lp3 = new ProjectExpression(p5, 0, 0, filter);
+        lp4 = new ProjectExpression(p5, 0, 3, filter);
+        p5.add(lp3);
+        p5.add(lp4);
+        eq2 = new EqualExpression(p5, lp3, lp4);
+
+        // root of the condition; only constructed, never referenced again
+        new AndExpression(p5, and1, eq2);
+
+        ((LOFilter)filter).setFilterPlan(p5);
+
+        // run split filter rule, recording the last transformed sub-plan
+        Rule r = new SplitFilter("SplitFilter");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        MyPlanTransformListener listener = new MyPlanTransformListener();
+        optimizer.addPlanTransformListener(listener);
+        optimizer.optimize();
+
+        // join -> filter -> filter -> filter -> store
+        // (assertEquals takes the expected value first)
+        assertEquals(join, plan.getPredecessors(filter).get(0));
+        Operator next = plan.getSuccessors(filter).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOStore.class, next.getClass());
+
+        OperatorPlan transformed = listener.getTransformed();
+        assertEquals(3, transformed.size());
+
+        // run push up filter rule
+        r = new PushUpFilter("PushUpFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        listener = new MyPlanTransformListener();
+        optimizer.addPlanTransformListener(listener);
+        optimizer.optimize();
+
+        // 2 filters should be moved up to be after each load, and one filter should remain
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        assertEquals(join, plan.getSuccessors(next).get(0));
+
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        assertEquals(join, plan.getSuccessors(next).get(0));
+
+        next = plan.getSuccessors(join).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOStore.class, next.getClass());
+
+        transformed = listener.getTransformed();
+        assertEquals(4, transformed.size());
+        assertEquals(LOFilter.class, transformed.getSinks().get(0).getClass());
+        assertEquals(LOLoad.class, transformed.getSources().get(0).getClass());
+
+        // run merge filter rule
+        r = new MergeFilter("MergeFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        listener = new MyPlanTransformListener();
+        optimizer.addPlanTransformListener(listener);
+        optimizer.optimize();
+
+        // the filters should be the same as before, nothing to merge
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        assertEquals(join, plan.getSuccessors(next).get(0));
+
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        assertEquals(join, plan.getSuccessors(next).get(0));
+
+        next = plan.getSuccessors(join).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOStore.class, next.getClass());
+
+        // the listener was never called, so no transformation took place
+        transformed = listener.getTransformed();
+        assertNull(transformed);
+    }
+    
+    /**
+     * Condition with two ANDs:
+     * (column 2 == 3) AND (column 0 == column 3) AND (column 2 == column 5).
+     * Expected: SplitFilter yields three chained filters, PushUpFilter moves
+     * only one of them (after load1) and leaves two after the join, and
+     * MergeFilter then merges those two into one.
+     */
+    public void testFilterRuleWith2And2() throws Exception  {
+        prep();
+        // build an expression with 2 AND, it should split into 3 filters
+        LogicalExpressionPlan p5 = new LogicalExpressionPlan();
+
+        LogicalExpression lp3 = new ProjectExpression(p5, 0, 2, filter);
+        LogicalSchema.LogicalFieldSchema fs = new LogicalSchema.LogicalFieldSchema(null, null, DataType.INTEGER);
+        LogicalExpression cont = new ConstantExpression(p5, Integer.valueOf(3), fs);
+        p5.add(lp3);
+        p5.add(cont);
+        LogicalExpression eq = new EqualExpression(p5, lp3, cont);
+
+        lp3 = new ProjectExpression(p5, 0, 0, filter);
+        LogicalExpression lp4 = new ProjectExpression(p5, 0, 3, filter);
+        p5.add(lp4);
+        p5.add(lp3);
+        LogicalExpression eq2 = new EqualExpression(p5, lp3, lp4);
+
+        LogicalExpression and1 = new AndExpression(p5, eq, eq2);
+
+        // third conjunct compares two columns, so no constant schema is needed
+        lp3 = new ProjectExpression(p5, 0, 2, filter);
+        lp4 = new ProjectExpression(p5, 0, 5, filter);
+        p5.add(lp3);
+        p5.add(lp4);
+        eq2 = new EqualExpression(p5, lp3, lp4);
+
+        // root of the condition; only constructed, never referenced again
+        new AndExpression(p5, and1, eq2);
+
+        ((LOFilter)filter).setFilterPlan(p5);
+
+        // run split filter rule
+        Rule r = new SplitFilter("SplitFilter");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+
+        // join -> filter -> filter -> filter -> store
+        // (assertEquals takes the expected value first)
+        assertEquals(join, plan.getPredecessors(filter).get(0));
+        Operator next = plan.getSuccessors(filter).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOStore.class, next.getClass());
+
+        // run push up filter rule
+        r = new PushUpFilter("PushUpFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+
+        // 1 filter should be moved up to be after a load, and 2 filters should remain
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        assertEquals(join, plan.getSuccessors(next).get(0));
+
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(join, next);
+
+        next = plan.getSuccessors(join).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOStore.class, next.getClass());
+
+        // run merge filter rule
+        r = new MergeFilter("MergeFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        MyPlanTransformListener listener = new MyPlanTransformListener();
+        optimizer.addPlanTransformListener(listener);
+        optimizer.optimize();
+
+        // the 2 filters after join should merge
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+        assertEquals(join, plan.getSuccessors(next).get(0));
+
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(join, next);
+
+        next = plan.getSuccessors(join).get(0);
+        assertEquals(LOFilter.class, next.getClass());
+
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOStore.class, next.getClass());
+
+        OperatorPlan transformed = listener.getTransformed();
+        assertEquals(2, transformed.size());
+    }
+    
+    /**
+     * Test-only PlanOptimizer subclass. Every instance registers a
+     * SchemaPatcher and then a ProjectionPatcher as transform listeners, and
+     * addPlanTransformListener is re-declared as public so the tests can add
+     * their own listeners afterwards.
+     *
+     * Declared static because it never touches the enclosing test instance.
+     */
+    public static class MyPlanOptimizer extends PlanOptimizer {
+
+        /**
+         * @param p          plan to optimize
+         * @param rs         rule sets handed to the superclass unchanged
+         * @param iterations iteration count handed to the superclass unchanged
+         */
+        protected MyPlanOptimizer(OperatorPlan p, List<Set<Rule>> rs,
+                int iterations) {
+            super(p, rs, iterations);
+            addPlanTransformListener(new SchemaPatcher());
+            addPlanTransformListener(new ProjectionPatcher());
+        }
+
+        /** Public pass-through to the superclass method of the same name. */
+        @Override
+        public void addPlanTransformListener(PlanTransformListener listener) {
+            super.addPlanTransformListener(listener);
+        }
+
+    }
+    
+    /**
+     * Listener that remembers the second argument of the most recent
+     * transformed() callback so a test can inspect it. getTransformed()
+     * returns null until the first callback arrives.
+     *
+     * Declared static because it never touches the enclosing test instance.
+     */
+    public static class MyPlanTransformListener implements PlanTransformListener {
+
+        // plan passed as tp in the latest callback; null if none happened yet
+        private OperatorPlan tp;
+
+        /**
+         * Stores tp, overwriting any earlier value; fp is ignored.
+         */
+        @Override
+        public void transformed(OperatorPlan fp, OperatorPlan tp)
+                throws IOException {
+            this.tp = tp;
+        }
+
+        /**
+         * @return the plan recorded by the latest callback, or null if the
+         *         listener was never called
+         */
+        public OperatorPlan getTransformed() {
+            return tp;
+        }
+    }
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanListener.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanListener.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanListener.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanListener.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.DepthFirstWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
+import org.apache.pig.newplan.logical.optimizer.AllSameRalationalNodesVisitor;
+import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher;
+import org.apache.pig.newplan.logical.optimizer.SchemaPatcher;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for PlanTransformListeners (SchemaPatcher, ProjectionPatcher) and for the
+ * AllSameRalationalNodesVisitor and AllExpressionVisitor visitors.
+ */
+public class TestNewPlanListener extends TestCase {
+    // Fixtures rebuilt by setUp(): lp is the whole plan, changedPlan holds only the filter and the join.
+    private LogicalPlan lp;
+    private LogicalPlan changedPlan;
+
+    /* (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+        
+        // Build a plan that looks like it has just been transformed.
+        // It is roughly the logical plan for
+        // A = load 'bla' as (x);
+        // B = load 'morebla' as (y);
+        // C = join A on x, B on y;
+        // D = filter C by y == 0;   (the code below builds an EqualExpression against constant 0)
+        // The plan is built with the filter having been pushed above the join
+        // but the listeners not yet having been called.
+        // A = load
+        lp = new LogicalPlan();
+        LogicalSchema aschema = new LogicalSchema();
+        aschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        aschema.getField(0).uid = 1; // uid 1 is used for x everywhere in this fixture
+        LOLoad A = new LOLoad(null, null, lp, null);
+        A.neverUseForRealSetSchema(aschema);
+        lp.add(A);
+        
+        // B = load
+        LogicalSchema bschema = new LogicalSchema();
+        bschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        bschema.getField(0).uid = 2; // uid 2 is used for y everywhere in this fixture
+        LOLoad B = new LOLoad(null, null, lp, null);
+        B.neverUseForRealSetSchema(bschema);
+        lp.add(B);
+        
+        // C = join
+        LogicalSchema cschema = new LogicalSchema();
+        cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        cschema.getField(0).uid = 1;
+        cschema.getField(1).uid = 2;
+
+        MultiMap<Integer, LogicalExpressionPlan> mm = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true});
+        
+        LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+        ProjectExpression x = new ProjectExpression(aprojplan, 0, 0, C); // presumably (plan, input, column, operator) - confirm
+        x.neverUseForRealSetFieldSchema(new LogicalFieldSchema(null, null,
+                DataType.INTEGER, 1));
+        LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+        ProjectExpression y = new ProjectExpression(bprojplan, 1, 0, C);
+        y.neverUseForRealSetFieldSchema(new LogicalFieldSchema(null, null,
+                DataType.INTEGER, 2));
+        mm.put(0, aprojplan); // presumably keyed by join input index (0 = A side, 1 = B side) - confirm against LOJoin
+        mm.put(1, bprojplan);
+        
+        C.neverUseForRealSetSchema(cschema);
+        // Don't add it to the plan quite yet
+        
+        // D = filter
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        LOFilter D = new LOFilter(lp, filterPlan);
+        ProjectExpression fy = new ProjectExpression(filterPlan, 0, 1, D); // built with 1 where testProjectionPatcher expects 0
+        fy.neverUseForRealSetFieldSchema(new LogicalFieldSchema(null, null,
+                DataType.INTEGER, 2));
+        ConstantExpression fc = new ConstantExpression(filterPlan, new Integer(0), new LogicalFieldSchema(null, null, DataType.INTEGER));
+        new EqualExpression(filterPlan, fy, fc); // result unused; presumably the constructor adds the node to filterPlan - confirm
+        
+        D.neverUseForRealSetSchema(cschema); // deliberately the two-column join schema; testSchemaPatcher expects it patched
+        // Connect D to B, since the transform has happened.
+        lp.add(D);
+        lp.connect(B, D);
+        
+        // Now add in C, connected to A and D.
+        lp.add(C);
+        lp.connect(A, C);
+        lp.connect(D, C);
+        
+        changedPlan = new LogicalPlan(); // the sub-plan handed to the listeners as "what changed": D -> C only
+        changedPlan.add(D);
+        changedPlan.add(C);
+        changedPlan.connect(D, C);
+    }
+    // Visitor that appends each visited relational operator's name, followed by a space, to buf.
+    private static class SillySameVisitor extends AllSameRalationalNodesVisitor {
+        StringBuffer buf = new StringBuffer();
+
+        SillySameVisitor(OperatorPlan plan) {
+            super(plan, new DepthFirstWalker(plan));
+        }
+        
+        @Override
+        protected void execute(LogicalRelationalOperator op) throws IOException {
+            buf.append(op.getName());
+            buf.append(" ");
+        }
+        
+        @Override
+        public String toString() {
+            return buf.toString();
+        }
+        
+    }
+    
+    // Test that AllSameRalationalNodesVisitor calls execute() on every node
+    // in the plan; either of two visit orders is accepted.
+    @Test
+    public void testAllSameVisitor() throws IOException {
+        SillySameVisitor v = new SillySameVisitor(lp);
+        v.visit();
+        assertTrue("LOLoad LOJoin LOLoad LOFilter ".equals(v.toString()) ||
+            "LOLoad LOFilter LOJoin LOLoad ".equals(v.toString()));
+        
+    }
+    // Expression visitor that appends a short tag ("and ", "equal ", "proj ", "const ") per visited node to a caller-supplied buffer.
+    private static class SillyExpressionVisitor extends LogicalExpressionVisitor {
+        StringBuffer buf;
+
+        protected SillyExpressionVisitor(OperatorPlan p, StringBuffer b) {
+            super(p, new DepthFirstWalker(p));
+            buf = b;
+        }
+        
+        @Override
+        public void visit(AndExpression andExpr) throws IOException {
+            buf.append("and ");
+        }
+        
+        @Override
+        public void visit(EqualExpression equal) throws IOException {
+            buf.append("equal ");
+        }
+        
+        @Override
+        public void visit(ProjectExpression p) throws IOException {
+            buf.append("proj ");
+        }
+        
+        @Override
+        public void visit(ConstantExpression c) throws IOException {
+            buf.append("const ");
+        }
+    }
+    // Hands out a SillyExpressionVisitor for each expression plan, all writing into the same buf.
+    private static class SillyAllExpressionVisitor extends AllExpressionVisitor {
+        StringBuffer buf = new StringBuffer();
+
+        public SillyAllExpressionVisitor(OperatorPlan plan) {
+            super(plan, new DepthFirstWalker(plan));
+        }
+     
+
+        @Override
+        protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+            return new SillyExpressionVisitor(expr, buf);
+        }   
+        
+        @Override
+        public String toString() {
+            return buf.toString();
+        }
+    }
+    
+    // Test that AllExpressionVisitor runs the supplied visitor over every
+    // expression in the plan (join keys and filter); two orders are accepted.
+    @Test
+    public void testAllExpressionVisitor() throws IOException {
+        SillyAllExpressionVisitor v = new SillyAllExpressionVisitor(lp);
+        v.visit();
+        assertTrue("proj proj equal proj const ".equals(v.toString()) ||
+            "equal proj const proj proj ".equals(v.toString()));
+    }
+    
+    // Test that SchemaPatcher reduces the filter's two-column schema to just y (uid 2)
+    @Test
+    public void testSchemaPatcher() throws IOException {
+        SchemaPatcher patcher = new SchemaPatcher();
+        patcher.transformed(lp, changedPlan);
+        
+        // Check that the filter now has the proper schema.
+        List<Operator> roots = changedPlan.getSources();
+        assertEquals(1, roots.size());
+        LOFilter D = (LOFilter)roots.get(0);
+        assertNotNull(D);
+        LogicalSchema dschema = D.getSchema();
+        assertEquals(1, dschema.size());
+        LogicalSchema.LogicalFieldSchema y = dschema.getField(0);
+        assertEquals("y", y.alias);
+        assertEquals(2, y.uid);
+    }
+    
+    // Test that ProjectionPatcher leaves the filter's projection at input 0, column 0
+    @Test
+    public void testProjectionPatcher() throws IOException {
+        ProjectionPatcher patcher = new ProjectionPatcher();
+        patcher.transformed(lp, changedPlan);
+        
+        // Check that the projections in filter are now set properly
+        List<Operator> roots = changedPlan.getSources();
+        assertEquals(1, roots.size());
+        LOFilter D = (LOFilter)roots.get(0);
+        assertNotNull(D);
+        LogicalExpressionPlan filterPlan = D.getFilterPlan();
+        List<Operator> leaves = filterPlan.getSinks();
+        assertEquals(2, leaves.size()); // presumably the projection and the constant built in setUp
+        ProjectExpression proj = null;
+        for (Operator leaf : leaves) {
+            if (leaf instanceof ProjectExpression) {
+                proj = (ProjectExpression)leaf;
+                break;
+            }
+        }
+        assertNotNull(proj);
+        assertEquals(0, proj.getInputNum());
+        assertEquals(0, proj.getColNum()); // setUp passed 0, 1 to the constructor; 0, 0 is expected after patching
+    }
+
+}
+



Mime
View raw message