pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r909165 [4/6] - in /hadoop/pig/trunk: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logica...
Date Thu, 11 Feb 2010 22:12:43 GMT
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java Thu Feb 11 22:12:36 2010
@@ -60,9 +60,9 @@
      * @return all operators in the plan that have no predecessors, or
      * an empty list if the plan is empty.
      */
-    public List<Operator> getRoots() {
+    public List<Operator> getSources() {
         if (roots.size() == 0 && ops.size() > 0) {
-            for (Operator op : ops) {
+            for (Operator op : ops) {               
                 if (toEdges.get(op) == null) {
                     roots.add(op);
                 }
@@ -76,7 +76,7 @@
      * @return all operators in the plan that have no successors, or
      * an empty list if the plan is empty.
      */
-    public List<Operator> getLeaves() {
+    public List<Operator> getSinks() {
         if (leaves.size() == 0 && ops.size() > 0) {
             for (Operator op : ops) {
                 if (fromEdges.get(op) == null) {
@@ -200,5 +200,62 @@
     public Iterator<Operator> getOperators() {
         return ops.iterator();
     }
-
+   
+    public boolean isEqual(OperatorPlan other) {
+        return isEqual(this, other);
+    }
+    
+    private static boolean checkPredecessors(Operator op1,
+                                      Operator op2) {
+        try {
+            List<Operator> preds = op1.getPlan().getPredecessors(op1);
+            List<Operator> otherPreds = op2.getPlan().getPredecessors(op2);
+            if (preds == null && otherPreds == null) {
+                // intentionally blank
+            } else if (preds == null || otherPreds == null) {
+                return false;
+            } else {
+                if (preds.size() != otherPreds.size()) return false;
+                for (int i = 0; i < preds.size(); i++) {
+                    Operator p1 = preds.get(i);
+                    Operator p2 = otherPreds.get(i);
+                    if (!p1.isEqual(p2)) return false;
+                    if (!checkPredecessors(p1, p2)) return false;
+                }
+            }
+            return true;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }   
+    
+    protected static boolean isEqual(OperatorPlan p1, OperatorPlan p2) {
+        if (p1 == p2) {
+            return true;
+        }
+        
+        if (p1 != null && p2 != null) {
+            List<Operator> leaves = p1.getSinks();
+            List<Operator> otherLeaves = p2.getSinks();
+            if (leaves.size() != otherLeaves.size()) return false;
+            // Must find some leaf that is equal to each leaf.  There is no
+            // guarantee leaves will be returned in any particular order.
+            boolean foundAll = true;
+            for (Operator op1 : leaves) {
+                boolean foundOne = false;
+                for (Operator op2 : otherLeaves) {
+                    if (op1.isEqual(op2) && checkPredecessors(op1, op2)) {
+                        foundOne = true;
+                        break;
+                    }
+                }
+                foundAll &= foundOne;
+                if (!foundAll) return false;
+            }
+            return foundAll;
+        }
+        
+        return false;
+    }
+    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java Thu Feb 11 22:12:36 2010
@@ -64,7 +64,7 @@
 
         List<Operator> fifo = new ArrayList<Operator>();
         Set<Operator> seen = new HashSet<Operator>();
-        List<Operator> leaves = plan.getLeaves();
+        List<Operator> leaves = plan.getSinks();
         if (leaves == null) return;
         for (Operator op : leaves) {
             doAllPredecessors(op, seen, fifo);

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DepthFirstWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DepthFirstWalker.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DepthFirstWalker.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DepthFirstWalker.java Thu Feb 11 22:12:36 2010
@@ -45,7 +45,7 @@
      */
     @Override
     public void walk(PlanVisitor visitor) throws IOException {
-        List<Operator> roots = plan.getRoots();
+        List<Operator> roots = plan.getSources();
         Set<Operator> seen = new HashSet<Operator>();
 
         depthFirst(null, roots, seen, visitor);

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java Thu Feb 11 22:12:36 2010
@@ -18,6 +18,7 @@
 
 package org.apache.pig.experimental.plan;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -26,6 +27,7 @@
     protected String name;
     protected OperatorPlan plan; // plan that contains this operator
     protected Map<String, Object> annotations;
+    protected final int hashPrime = 31;
 
     public Operator(String n, OperatorPlan p) {
         name = n;
@@ -36,8 +38,9 @@
     /**
      * Accept a visitor at this node in the graph.
      * @param v Visitor to accept.
+     * @throws IOException 
      */
-    public abstract void accept(PlanVisitor v);
+    public abstract void accept(PlanVisitor v) throws IOException;
 
     public String getName() {
         return name;
@@ -70,4 +73,12 @@
         return annotations.get(key);
     }
 
+    /**
+     * This is like a shallow equals comparison.
+     * It returns true if two operators have equivalent properties even if they are 
+     * different objects. Here properties mean equivalent plan and equivalent name.
+     * @param operator
+     * @return true if two object have equivalent properties, else false
+     */
+    public abstract boolean isEqual(Operator operator);
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java Thu Feb 11 22:12:36 2010
@@ -19,14 +19,9 @@
 package org.apache.pig.experimental.plan;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.pig.impl.util.Pair;
 
 public interface OperatorPlan {
@@ -41,20 +36,20 @@
      * @return all operators in the plan that have no predecessors, or
      * an empty list if the plan is empty.
      */
-    public List<Operator> getRoots();
+    public List<Operator> getSources();
 
     /**
      * Get all operators in the plan that have no successors.
      * @return all operators in the plan that have no successors, or
      * an empty list if the plan is empty.
      */
-    public List<Operator> getLeaves();
+    public List<Operator> getSinks();
 
     /**
      * For a given operator, get all operators immediately before it in the
      * plan.
      * @param op operator to fetch predecessors of
-     * @return list of all operators imeediately before op, or an empty list
+     * @return list of all operators immediately before op, or an empty list
      * if op is a root.
      * @throws IOException if op is not in the plan.
      */
@@ -63,7 +58,7 @@
     /**
      * For a given operator, get all operators immediately after it.
      * @param op operator to fetch successors of
-     * @return list of all operators imeediately after op, or an empty list
+     * @return list of all operators immediately after op, or an empty list
      * if op is a leaf.
      * @throws IOException if op is not in the plan.
      */
@@ -117,4 +112,13 @@
      * @return an iterator of all operators in this plan
      */
     public Iterator<Operator> getOperators();
+    
+    /**
+     * This is like a shallow comparison.
+     * Two plans are equal if they have equivalent operators and equivalent 
+     * structure.
+     * @param other object to compare
+     * @return boolean if both the plans are equivalent
+     */
+    public boolean isEqual( OperatorPlan other );
 }

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Class to represent a view of a plan. The view contains a subset of the plan.
+ * All the operators returned from the view are the same objects to the operators
+ * in its base plan. It is used to represent match results. 
+ *
+ */
+public class OperatorSubPlan implements OperatorPlan {
+
+    private OperatorPlan basePlan;
+    private List<Operator> roots;
+    private List<Operator> leaves;
+    private Set<Operator> operators;
+
+    public OperatorSubPlan(OperatorPlan base) {
+        basePlan = base;
+        roots = new ArrayList<Operator>();
+        leaves = new ArrayList<Operator>();
+        operators = new HashSet<Operator>();
+    }    	    	
+    
+    public OperatorPlan getBasePlan() {
+        return basePlan;
+    }
+    
+    public void add(Operator op) {
+        operators.add(op);
+    }
+
+    public void connect(Operator from, int fromPos, Operator to, int toPos) {
+        throw new UnsupportedOperationException("connect() can not be called on OperatorSubPlan");
+    }
+
+    public void connect(Operator from, Operator to) {
+        throw new UnsupportedOperationException("connect() can not be called on OperatorSubPlan");
+    }
+
+    public Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException {
+        throw new UnsupportedOperationException("disconnect() can not be called on OperatorSubPlan");
+    }
+
+    public List<Operator> getSinks() {
+        if (leaves.size() == 0 && operators.size() > 0) {
+            for (Operator op : operators) {       
+                try {
+                    if (getSuccessors(op) == null) {
+                        leaves.add(op);
+                    }
+                }catch(Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return leaves;
+    }
+
+    public Iterator<Operator> getOperators() {
+        return operators.iterator();
+    }
+
+    public List<Operator> getPredecessors(Operator op) throws IOException {
+        List<Operator> l = basePlan.getPredecessors(op);
+        List<Operator> list = null;
+        if (l != null) {
+            for(Operator oper: l) {
+                if (operators.contains(oper)) {
+                    if (list == null) {
+                        list = new ArrayList<Operator>();
+                    }
+                    list.add(oper);
+                }
+            }
+        }
+        
+        return list;
+    }
+
+    public List<Operator> getSources() {
+        if (roots.size() == 0 && operators.size() > 0) {
+            for (Operator op : operators) {       
+                try {
+                    if (getPredecessors(op) == null) {
+                        roots.add(op);
+                    }
+                }catch(Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return roots;
+    }
+
+    public List<Operator> getSuccessors(Operator op) throws IOException {
+        List<Operator> l = basePlan.getSuccessors(op);
+        List<Operator> list = null;
+        if (l != null) {
+            for(Operator oper: l) {
+                if (operators.contains(oper)) {
+                    if (list == null) {
+                        list = new ArrayList<Operator>();
+                    }
+                    list.add(oper);
+                }
+            }
+        }
+        
+        return list;
+    }
+
+    public void remove(Operator op) throws IOException {
+        operators.remove(op);
+        leaves.clear();
+        roots.clear();
+    }
+
+    public int size() {
+        return operators.size();
+    }
+
+    @Override
+    public boolean isEqual(OperatorPlan other) {		
+        return BaseOperatorPlan.isEqual(this, other);
+    }    
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanEdge.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanEdge.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanEdge.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanEdge.java Thu Feb 11 22:12:36 2010
@@ -83,7 +83,8 @@
         Operator keeper = null;
         for (int j = 0; i.hasNext(); j++) {
             keeper = i.next();
-            if (keeper.equals(value)) {
+            //if (keeper.equals(value)) {
+            if (keeper == value) {
                 i.remove();
                 index = j;
                 break;

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanVisitor.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanVisitor.java Thu Feb 11 22:12:36 2010
@@ -31,6 +31,9 @@
  */
 public abstract class PlanVisitor {
 
+    // TODO Remove this scope value
+    final protected static String DEFAULT_SCOPE = "scope";
+    
     protected OperatorPlan plan;
 
     /**

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java Thu Feb 11 22:12:36 2010
@@ -58,7 +58,7 @@
 
         List<Operator> fifo = new ArrayList<Operator>();
         Set<Operator> seen = new HashSet<Operator>();
-        List<Operator> roots = plan.getRoots();
+        List<Operator> roots = plan.getSources();
         if (roots == null) return;
         for (Operator op : roots) {
             doAllSuccessors(op, seen, fifo);

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class SubtreeDependencyOrderWalker extends DependencyOrderWalker {
+    private Operator startNode;
+    
+    public SubtreeDependencyOrderWalker(OperatorPlan plan) {
+        super(plan);            
+    }
+    
+    public SubtreeDependencyOrderWalker(OperatorPlan plan, Operator startNode) {
+        super(plan);            
+        this.startNode = startNode;
+    }
+    
+    public void walk(PlanVisitor visitor) throws IOException {          
+        List<Operator> fifo = new ArrayList<Operator>();
+        Set<Operator> seen = new HashSet<Operator>();
+
+        // get all predecessors of startNode
+        doAllPredecessors(startNode, seen, fifo);           
+
+        for (Operator op: fifo) {
+            op.accept(visitor);
+        }
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java Thu Feb 11 22:12:36 2010
@@ -37,7 +37,7 @@
  * Each rule is has two parts:  a pattern and and associated transformer.
  * Transformers have two important functions:   check(), and transform().
  * The pattern describes a pattern of node types that the optimizer will
- * look ot match.  If that match is found anywhere in the plan, then check()
+ * look to match.  If that match is found anywhere in the plan, then check()
  * will be called.  check() allows the rule to look more in depth at the 
  * matched pattern and decide whether the rule should be run or not.  For
  * example, one might design a rule to push filters above join that would
@@ -74,7 +74,13 @@
         maxIter = (iterations < 1 ? defaultIterations : iterations);
     }
     
-    public void addPlanTransformListener(PlanTransformListener listener) {
+    /**
+     * Adds a listener to the optimization.  This listener will be fired 
+     * after each rule transforms a plan.  Listeners are guaranteed to
+     * be fired in the order they are added.
+     * @param listener
+     */
+    protected void addPlanTransformListener(PlanTransformListener listener) {
         listeners.add(listener);
     }
     
@@ -87,6 +93,7 @@
      * @throws OptimizerException
      */
     public void optimize() throws IOException {
+
         for (Set<Rule> rs : ruleSets) {
             boolean sawMatch = false;
             int numIterations = 0;
@@ -101,7 +108,7 @@
                                 sawMatch = true;
                                 transformer.transform(m);
                                 for(PlanTransformListener l: listeners) {
-                                    l.transformed(plan, transformer);
+                                    l.transformed(plan, transformer.reportChanges());
                                 }
                             }
                         }
@@ -109,5 +116,5 @@
                 }
             } while(sawMatch && ++numIterations < maxIter);
         }
-    }
+    }    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java Thu Feb 11 22:12:36 2010
@@ -18,6 +18,8 @@
 
 package org.apache.pig.experimental.plan.optimizer;
 
+import java.io.IOException;
+
 import org.apache.pig.experimental.plan.OperatorPlan;
 
 /**
@@ -26,9 +28,10 @@
 public interface PlanTransformListener {
     /**
      * the listener that is notified after a plan is transformed
-     * @param plan  the plan that is transformed
-     * @param transformer the transformer that transforms this plan
+     * @param fp  the full plan that has been transformed
+     * @param tp  a plan containing only the operators that have been transformed
+     * @throws IOException 
      */
-    public void transformed(OperatorPlan plan, Transformer transformer);
+    public void transformed(OperatorPlan fp, OperatorPlan tp) throws IOException;
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Rule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Rule.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Rule.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Rule.java Thu Feb 11 22:12:36 2010
@@ -28,33 +28,15 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.experimental.plan.BaseOperatorPlan;
 import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.OperatorPlan;
-import org.apache.pig.experimental.plan.PlanVisitor;
-import org.apache.pig.impl.util.Pair;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
 
 /**
  * Rules describe a pattern of operators.  They also reference a Transformer.
  * If the pattern of operators is found one or more times in the provided plan,
  * then the optimizer will use the associated Transformer to transform the
  * plan.
- *
- * The syntax for rules is  (x(y, z)) where x is a base node, and y and z are
- * node that precede x.  So the graph for the Pig Latin script:
- * A = load;
- * B = load;
- * C = join A, B;
- * D = filter C
- * would be:  filter(join(load, load));
- * 
- * Rules with multiple end points (leaves) are expressed as (x(), y()) where
- * both x and y are leaves.
- * 
- * It is expected that the name given to each node in the pattern exactly
- * matches the name of the class of the node in the Plan to be matched.  So
- * to build a rule that matched a join followed by a filter in the logical
- * plan, the pattern would be LOFilter(LOJoin).
  */
 public abstract class Rule {
 
@@ -121,7 +103,7 @@
     public List<OperatorPlan> match(OperatorPlan plan) {
         currentPlan = plan;
         
-        List<Operator> leaves = pattern.getLeaves();
+        List<Operator> leaves = pattern.getSinks();
         
         Iterator<Operator> iter = plan.getOperators();
         List<OperatorPlan> matchedList = new ArrayList<OperatorPlan>();       
@@ -162,11 +144,11 @@
                                 siblings = plan.getSuccessors(s);
                             }else{
                                 // for a root, we get its siblings by getting all roots
-                                siblings = plan.getRoots();
+                                siblings = plan.getSources();
                             }
                         }catch(IOException e) {
                             // not going to happen
-			    throw new RuntimeException(e);
+                            throw new RuntimeException(e);
                         }
                         int index = siblings.indexOf(op);
                         if (siblings.size()-index < leaves.size()) {
@@ -234,21 +216,14 @@
     }
     
  
-    private class PatternMatchOperatorPlan implements OperatorPlan {
-        OperatorPlan parent;
-        List<Operator> roots;
-        List<Operator> leaves;
-        Set<Operator> operators;
-
-        public PatternMatchOperatorPlan(OperatorPlan parent) {
-            this.parent = parent;
-            roots = new ArrayList<Operator>();
-            leaves = new ArrayList<Operator>();
-            operators = new HashSet<Operator>();
+    private class PatternMatchOperatorPlan extends OperatorSubPlan {
+        
+        public PatternMatchOperatorPlan(OperatorPlan basePlan) {
+            super(basePlan);
         }    	    	
         
         protected boolean check(List<Operator> planOps) throws IOException {
-            List<Operator> patternOps = pattern.getLeaves();
+            List<Operator> patternOps = pattern.getSinks();
             if (planOps.size() != patternOps.size()) {
                 return false;
             }
@@ -258,10 +233,13 @@
                 if (!check(planOps.get(i), patternOps.get(i), s)) {
                     return false;
                 }
-                operators.addAll(s);
+                Iterator<Operator> iter = s.iterator();
+                while(iter.hasNext()) {
+                    add(iter.next());
+                }
             }
             
-            if (operators.size() == pattern.size()) {
+            if (size() == pattern.size()) {
                 return true;
             }
             
@@ -283,13 +261,9 @@
             if (!match(planOp, patternOp)) {
                 return false;
             }
-            
-            if (pattern.getLeaves().contains(patternOp) && !leaves.contains(planOp)) {
-                leaves.add(planOp);
-            }
-            
+                 
             // check if their predecessors match
-            List<Operator> preds1 = parent.getPredecessors(planOp);
+            List<Operator> preds1 = getBasePlan().getPredecessors(planOp);
             List<Operator> preds2 = pattern.getPredecessors(patternOp);
             if (preds1 == null && preds2 != null) {
                 return false;
@@ -300,27 +274,25 @@
             }
             
             // we've reached the root of the pattern, so a match is found
-            if (preds2 == null || preds2.size() == 0) {
-                if (!roots.contains(planOp)) {
-                    roots.add(planOp);
-                }
+            if (preds2 == null || preds2.size() == 0) {       
                 opers.push(planOp);
                 return true;
             }
             
-            int index = 0;
-            boolean match = true;
+            int index = 0;            
             // look for predecessors 
             while(index < preds1.size()) {
+                boolean match = true;
                 if (match(preds1.get(index), preds2.get(0))) {
                     if ( (preds1.size() - index) < preds2.size()) {
                         return false;
                     }
-                                        
+                             
+                    int oldSize = opers.size();
                     for(int i=0; i<preds2.size(); i++) {
                         if (!check(preds1.get(index+i), preds2.get(i), opers)) {
-                            for(int j=0; j<i; j++) {
-                                opers.pop();
+                            for(int j=opers.size(); j>oldSize; j--) {
+                                opers.pop();                                
                             }
                             match = false;
                             break;
@@ -335,67 +307,6 @@
             }
             
             return false;
-        }
-        
-        public void add(Operator op) {
-            throw new UnsupportedOperationException("add() can not be called on PatternMatchOperatorPlan");
-        }
-
-        public void connect(Operator from, int fromPos, Operator to, int toPos) {
-            throw new UnsupportedOperationException("connect() can not be called on PatternMatchOperatorPlan");
-        }
-
-        public void connect(Operator from, Operator to) {
-            throw new UnsupportedOperationException("connect() can not be called on PatternMatchOperatorPlan");
-        }
-
-        public Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException {
-            throw new UnsupportedOperationException("disconnect() can not be called on PatternMatchOperatorPlan");
-        }
-
-        public List<Operator> getLeaves() {
-            return leaves;
-        }
-
-        public Iterator<Operator> getOperators() {
-            return operators.iterator();
-        }
-
-        public List<Operator> getPredecessors(Operator op) throws IOException {
-            List<Operator> l = parent.getPredecessors(op);
-            List<Operator> list = new ArrayList<Operator>();
-            for(Operator oper: l) {
-                if (operators.contains(oper)) {
-                    list.add(oper);
-                }
-            }
-            
-            return list;
-        }
-
-        public List<Operator> getRoots() {
-            return roots;
-        }
-
-        public List<Operator> getSuccessors(Operator op) throws IOException {
-            List<Operator> l = parent.getSuccessors(op);
-            List<Operator> list = new ArrayList<Operator>();
-            for(Operator oper: l) {
-                if (operators.contains(oper)) {
-                    list.add(oper);
-                }
-            }
-            
-            return list;
-        }
-
-        public void remove(Operator op) throws IOException {
-            throw new UnsupportedOperationException("remove() can not be called on PatternMatchOperatorPlan");
-        }
-
-        public int size() {
-            return operators.size();
-        }
-        
+        }    
     }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Transformer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Transformer.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Transformer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Transformer.java Thu Feb 11 22:12:36 2010
@@ -43,5 +43,14 @@
      * @throws IOException
      */
     public abstract void transform(OperatorPlan matched) throws IOException;
+    
+    /**
+     * Report which parts of the tree were transformed, so that
+     * listeners know which part of the tree to visit to update
+     * schemas, annotations, etc.  Nodes that were removed are not
+     * included in this plan; it contains only nodes that were added or moved.
+     * @return OperatorPlan that describes just the changed nodes.
+     */
+    public abstract OperatorPlan reportChanges();
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/FileSpec.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/FileSpec.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/FileSpec.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/FileSpec.java Thu Feb 11 22:12:36 2010
@@ -20,7 +20,6 @@
 import java.io.Serializable;
 
 import org.apache.pig.FuncSpec;
-import org.apache.pig.impl.PigContext;
 
 
 /**
@@ -59,4 +58,20 @@
     public int getSize() {
         throw new UnsupportedOperationException("File Size not implemented yet");
     }
+    
+    @Override
+    public boolean equals(Object other) {
+        // instanceof yields false for null, so it also serves as the null check
+        if (!(other instanceof FileSpec)) {
+            return false;
+        }
+        FileSpec that = (FileSpec) other;
+        // Equal only when both the file name and the function spec are equal.
+        return fileName.equals(that.fileName) && funcSpec.equals(that.funcSpec);
+    }
+    
+    @Override
+    public int hashCode() { // sum of the file-name and function-name hash codes
+        return fileName.hashCode() + getFuncName().hashCode();
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java Thu Feb 11 22:12:36 2010
@@ -20,11 +20,10 @@
 
 import java.util.List;
 
-import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
 
 /**
  * This abstract class represents the logical Binary Expression Operator
@@ -35,7 +34,7 @@
 
 public abstract class BinaryExpressionOperator extends ExpressionOperator {
     private static final long serialVersionUID = 2L;
-    private static Log log = LogFactory.getLog(BinaryExpressionOperator.class);
+    // private static Log log = LogFactory.getLog(BinaryExpressionOperator.class);
 
     /**
      * @param plan

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,1204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.EqualExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.optimizer.ProjectionPatcher;
+import org.apache.pig.experimental.logical.optimizer.SchemaPatcher;
+import org.apache.pig.experimental.logical.optimizer.UidStamper;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LOStore;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.rules.FilterAboveForeach;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.optimizer.PlanOptimizer;
+import org.apache.pig.experimental.plan.optimizer.PlanTransformListener;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import junit.framework.TestCase;
+
+public class TestExperimentalFilterAboveForeach extends TestCase {
+    
+    public void testSimple() throws Exception {
+        // Checks that FilterAboveForeach moves a filter on the non-flattened name column above the foreach.
+        // Plan here is 
+        // Load (name, cuisines{t:(name)}) -> foreach gen name,flatten(cuisines) 
+        // -> filter name == 'joe' --> stor
+        
+        LogicalPlan plan = null;
+        LOLoad load = null;
+        LOForEach foreach = null;
+        LOFilter filter = null;
+        LOStore stor = null;
+        
+        plan = new LogicalPlan();
+        // Load schema: (name:chararray, cuisines:bag{t:(name:chararray)})
+        LogicalSchema schema = new LogicalSchema();
+        schema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema bagSchema = new LogicalSchema();
+        LogicalSchema bagTupleSchema = new LogicalSchema();
+        bagTupleSchema.addField( new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY) );
+        bagSchema.addField( new LogicalSchema.LogicalFieldSchema( "t", bagTupleSchema, DataType.TUPLE ) );
+        schema.addField(new LogicalSchema.LogicalFieldSchema("cuisines", bagSchema, DataType.BAG));
+        
+        load = new LOLoad(null, schema, plan);
+        load.setAlias("A");
+        plan.add(load);
+        // Foreach B: an inner plan where two inner loads feed a single generate.
+        foreach = new LOForEach(plan);
+        
+        LogicalPlan innerPlan = new LogicalPlan();
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(innerLoad);
+        
+        LOInnerLoad innerLoad2 = new LOInnerLoad(innerPlan, foreach, 1);
+        innerPlan.add(innerLoad2);
+        
+        LogicalExpressionPlan namePrj = new LogicalExpressionPlan();        
+        ProjectExpression prjName = new ProjectExpression(namePrj, DataType.CHARARRAY, 0, 0);
+        namePrj.add(prjName);
+        
+        LogicalExpressionPlan cuisinesPrj = new LogicalExpressionPlan();
+        ProjectExpression prjCuisines = new ProjectExpression(cuisinesPrj, DataType.BAG, 1, 0);
+        cuisinesPrj.add(prjCuisines);
+        
+        List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+        expPlans.add(namePrj);
+        expPlans.add(cuisinesPrj);
+        // The generate leaves name as-is and flattens its second expression (cuisines).
+        boolean flatten[] = new boolean[2];
+        flatten[0] = false;
+        flatten[1] = true;
+        
+        LOGenerate generate = new LOGenerate(innerPlan, expPlans, flatten);        
+        innerPlan.add(generate);
+        innerPlan.connect(innerLoad, generate);
+        innerPlan.connect(innerLoad2, generate);
+        
+        foreach.setInnerPlan(innerPlan);
+        foreach.setAlias("B");
+        plan.add(foreach);
+        
+        plan.connect(load, foreach);
+        // Filter C: name == 'joe'.
+        filter = new LOFilter(plan);
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        ProjectExpression namePrj2 = new ProjectExpression(filterPlan, DataType.CHARARRAY, 0, 0);
+        filterPlan.add(namePrj2);
+        ConstantExpression constExp = new ConstantExpression(filterPlan, DataType.CHARARRAY, "joe");
+        filterPlan.add(constExp);
+        EqualExpression equal = new EqualExpression(filterPlan, namePrj2, constExp);
+        filterPlan.add(equal);
+        
+        filter.setFilterPlan(filterPlan);
+        filter.setAlias("C");
+        plan.add(filter);
+        
+        plan.connect(foreach, filter);
+        
+        stor = new LOStore(plan);
+        stor.setAlias("D");
+        plan.add(stor);
+        plan.connect(filter,stor);
+        
+        try {
+            // Stamp everything with a Uid
+            UidStamper stamper = new UidStamper(plan);
+            stamper.visit();
+        }catch(Exception e) {
+            fail("Failed to set a valid uid: " + e);
+        }
+        
+        
+        // run filter rule
+        Rule r = new FilterAboveForeach("FilterAboveFlatten");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        
+        // Test Plan before optimizing
+        List<Operator> list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+        
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+        
+        assertTrue( plan.getPredecessors(stor).contains(filter) ); 
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+        
+        assertTrue( plan.getPredecessors(filter).contains(foreach) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+        
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );
+        
+        // Run the optimizer
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.optimize();
+        // Expected: load -> filter -> foreach -> stor; the inner plan is unchanged.
+        // Test after optimization
+        list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+        
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+        
+        assertTrue( plan.getPredecessors(stor).contains(foreach) ); 
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+        
+        assertTrue( plan.getPredecessors(filter).contains(load) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+        assertEquals( load.getSchema().getField(0).uid, namePrj2.getUid() );
+        assertEquals( namePrj2.getUid(), prjName.getUid() );
+        
+        assertTrue( plan.getPredecessors(foreach).contains(filter) );
+        assertEquals( 1, plan.getPredecessors(foreach).size() );
+        
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );
+    }
+    
+    public void testMultipleFilter() throws Exception {
+        // Checks that only the filter on name moves above the foreach; the filter on $1 stays below it.
+        // Plan here is 
+        // Load (name, cuisines{t:(name)}) -> foreach gen name,flatten(cuisines) 
+        // -> filter $1 == 'joe' --> filter name == 'joe' --> stor
+        
+        LogicalPlan plan = null;
+        LOLoad load = null;
+        LOForEach foreach = null;
+        LOFilter filter = null;
+        LOStore stor = null;
+        
+        plan = new LogicalPlan();
+        
+        LogicalSchema schema = new LogicalSchema();
+        schema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema bagSchema = new LogicalSchema();
+        LogicalSchema bagTupleSchema = new LogicalSchema();
+        bagTupleSchema.addField( new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY) );
+        bagSchema.addField( new LogicalSchema.LogicalFieldSchema( "t", bagTupleSchema, DataType.TUPLE ) );
+        schema.addField(new LogicalSchema.LogicalFieldSchema("cuisines", bagSchema, DataType.BAG));
+        
+        load = new LOLoad(null, schema, plan);
+        load.setAlias("A");
+        plan.add(load);
+        
+        foreach = new LOForEach(plan);
+        
+        LogicalPlan innerPlan = new LogicalPlan();
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(innerLoad);
+        
+        LOInnerLoad innerLoad2 = new LOInnerLoad(innerPlan, foreach, 1);
+        innerPlan.add(innerLoad2);
+        
+        LogicalExpressionPlan namePrj = new LogicalExpressionPlan();        
+        ProjectExpression prjName = new ProjectExpression(namePrj, DataType.CHARARRAY, 0, 0);
+        namePrj.add(prjName);
+        
+        LogicalExpressionPlan cuisinesPrj = new LogicalExpressionPlan();
+        ProjectExpression prjCuisines = new ProjectExpression(cuisinesPrj, DataType.BAG, 1, 0);
+        cuisinesPrj.add(prjCuisines);
+        
+        List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+        expPlans.add(namePrj);
+        expPlans.add(cuisinesPrj);
+        
+        boolean flatten[] = new boolean[2];
+        flatten[0] = false;
+        flatten[1] = true;
+        
+        LOGenerate generate = new LOGenerate(innerPlan, expPlans, flatten);        
+        innerPlan.add(generate);
+        innerPlan.connect(innerLoad, generate);
+        innerPlan.connect(innerLoad2, generate);
+        
+        foreach.setInnerPlan(innerPlan);
+        foreach.setAlias("B");
+        plan.add(foreach);
+        
+        plan.connect(load, foreach);
+        // Filter C: name == 'joe' (column 0).
+        filter = new LOFilter(plan);
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        ProjectExpression namePrj2 = new ProjectExpression(filterPlan, DataType.CHARARRAY, 0, 0);
+        filterPlan.add(namePrj2);
+        ConstantExpression constExp = new ConstantExpression(filterPlan, DataType.CHARARRAY, "joe");
+        filterPlan.add(constExp);
+        EqualExpression equal = new EqualExpression(filterPlan, namePrj2, constExp);
+        filterPlan.add(equal);
+        
+        filter.setFilterPlan(filterPlan);
+        filter.setAlias("C");
+        plan.add(filter);
+        // Filter C1: $1 == 'joe' (column 1, the flattened cuisines).
+        LOFilter filter2 = new LOFilter(plan);
+        LogicalExpressionPlan filter2Plan = new LogicalExpressionPlan();
+        ProjectExpression name2Prj2 = new ProjectExpression(filter2Plan, DataType.CHARARRAY, 0, 1);
+        filter2Plan.add(name2Prj2);
+        ConstantExpression const2Exp = new ConstantExpression(filter2Plan, DataType.CHARARRAY, "joe");
+        filter2Plan.add(const2Exp);
+        EqualExpression equal2 = new EqualExpression(filter2Plan, name2Prj2, const2Exp);
+        filter2Plan.add(equal2);
+        // equal2 is built from filter2Plan's own operands, not from filter C's plan.
+        filter2.setFilterPlan(filter2Plan);
+        filter2.setAlias("C1");
+        plan.add(filter2);
+        // Chain: foreach -> C1 -> C -> stor.
+        plan.connect(foreach, filter2);
+        plan.connect(filter2, filter);
+        
+        stor = new LOStore(plan);
+        stor.setAlias("D");
+        plan.add(stor);
+        plan.connect(filter,stor);
+        
+        try {
+            // Stamp everything with a Uid
+            UidStamper stamper = new UidStamper(plan);
+            stamper.visit();
+        }catch(Exception e) {
+            fail("Failed to set a valid uid: " + e);
+        }
+        
+        
+        // run filter rule
+        Rule r = new FilterAboveForeach("FilterAboveFlatten");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        
+        // Test Plan before optimizing
+        List<Operator> list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+        
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+        
+        assertTrue( plan.getPredecessors(stor).contains(filter) ); 
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+        
+        assertTrue( plan.getPredecessors(filter2).contains(foreach) );
+        assertEquals( 1, plan.getPredecessors(filter2).size() );
+        
+        assertTrue( plan.getPredecessors(foreach).contains(load) );
+        assertEquals( 1, plan.getPredecessors(foreach).size() );
+        
+        assertTrue( plan.getPredecessors(filter).contains(filter2) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+        
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );
+        
+        // Run the optimizer
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.optimize();
+        // Expected: load -> C -> foreach -> C1 -> stor; the inner plan is unchanged.
+        // Test after optimization
+        list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+        
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+        
+        assertTrue( plan.getPredecessors(stor).contains(filter2) ); 
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+        
+        assertTrue( plan.getPredecessors(filter2).contains(foreach) );
+        assertEquals( 1, plan.getPredecessors(filter2).size() );
+        
+        assertTrue( plan.getPredecessors(foreach).contains(filter) );
+        assertEquals( 1, plan.getPredecessors(foreach).size() );
+        
+        assertTrue( plan.getPredecessors(filter).contains(load) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+        
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );
+        
+    }
+    
+    public void testMultipleFilter2() throws Exception {
+        // Checks that two filters on the name column both move above the foreach.
+        // Plan here is 
+        // Load (name, cuisines{t:(name)}) -> foreach gen name,flatten(cuisines) 
+        // -> filter name == 'joe2' --> filter name == 'joe' --> stor
+        
+        LogicalPlan plan = null;
+        LOLoad load = null;
+        LOForEach foreach = null;
+        LOFilter filter = null;
+        LOStore stor = null;
+        
+        plan = new LogicalPlan();
+        
+        LogicalSchema schema = new LogicalSchema();
+        schema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema bagSchema = new LogicalSchema();
+        LogicalSchema bagTupleSchema = new LogicalSchema();
+        bagTupleSchema.addField( new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY) );
+        bagSchema.addField( new LogicalSchema.LogicalFieldSchema( "t", bagTupleSchema, DataType.TUPLE ) );
+        schema.addField(new LogicalSchema.LogicalFieldSchema("cuisines", bagSchema, DataType.BAG));
+        
+        load = new LOLoad(null, schema, plan);
+        load.setAlias("A");
+        plan.add(load);
+        
+        foreach = new LOForEach(plan);
+        
+        LogicalPlan innerPlan = new LogicalPlan();
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(innerLoad);
+        
+        LOInnerLoad innerLoad2 = new LOInnerLoad(innerPlan, foreach, 1);
+        innerPlan.add(innerLoad2);
+        
+        LogicalExpressionPlan namePrj = new LogicalExpressionPlan();        
+        ProjectExpression prjName = new ProjectExpression(namePrj, DataType.CHARARRAY, 0, 0);
+        namePrj.add(prjName);
+        
+        LogicalExpressionPlan cuisinesPrj = new LogicalExpressionPlan();
+        ProjectExpression prjCuisines = new ProjectExpression(cuisinesPrj, DataType.BAG, 1, 0);
+        cuisinesPrj.add(prjCuisines);
+        
+        List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+        expPlans.add(namePrj);
+        expPlans.add(cuisinesPrj);
+        
+        boolean flatten[] = new boolean[2];
+        flatten[0] = false;
+        flatten[1] = true;
+        
+        LOGenerate generate = new LOGenerate(innerPlan, expPlans, flatten);        
+        innerPlan.add(generate);
+        innerPlan.connect(innerLoad, generate);
+        innerPlan.connect(innerLoad2, generate);
+        
+        foreach.setInnerPlan(innerPlan);
+        foreach.setAlias("B");
+        plan.add(foreach);
+        
+        plan.connect(load, foreach);
+        // Filter C: name == 'joe'.
+        filter = new LOFilter(plan);
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        ProjectExpression namePrj2 = new ProjectExpression(filterPlan, DataType.CHARARRAY, 0, 0);
+        filterPlan.add(namePrj2);
+        ConstantExpression constExp = new ConstantExpression(filterPlan, DataType.CHARARRAY, "joe");
+        filterPlan.add(constExp);
+        EqualExpression equal = new EqualExpression(filterPlan, namePrj2, constExp);
+        filterPlan.add(equal);
+        
+        filter.setFilterPlan(filterPlan);
+        filter.setAlias("C");
+        plan.add(filter);
+        // Filter C1: name == 'joe2'.
+        LOFilter filter2 = new LOFilter(plan);
+        LogicalExpressionPlan filter2Plan = new LogicalExpressionPlan();
+        ProjectExpression name2Prj2 = new ProjectExpression(filter2Plan, DataType.CHARARRAY, 0, 0);
+        filter2Plan.add(name2Prj2);
+        ConstantExpression const2Exp = new ConstantExpression(filter2Plan, DataType.CHARARRAY, "joe2");
+        filter2Plan.add(const2Exp);
+        EqualExpression equal2 = new EqualExpression(filter2Plan, name2Prj2, const2Exp);
+        filter2Plan.add(equal2);
+        // equal2 is built from filter2Plan's own operands, not from filter C's plan.
+        filter2.setFilterPlan(filter2Plan);
+        filter2.setAlias("C1");
+        plan.add(filter2);
+        // Chain: foreach -> C1 -> C -> stor.
+        plan.connect(foreach, filter2);
+        plan.connect(filter2, filter);
+        
+        stor = new LOStore(plan);
+        stor.setAlias("D");
+        plan.add(stor);
+        plan.connect(filter,stor);
+        
+        try {
+            // Stamp everything with a Uid
+            UidStamper stamper = new UidStamper(plan);
+            stamper.visit();
+        }catch(Exception e) {
+            fail("Failed to set a valid uid: " + e);
+        }
+        
+        
+        // run filter rule
+        Rule r = new FilterAboveForeach("FilterAboveFlatten");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        
+        // Test Plan before optimizing
+        List<Operator> list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+        
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+        
+        assertTrue( plan.getPredecessors(stor).contains(filter) ); 
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+        
+        assertTrue( plan.getPredecessors(filter2).contains(foreach) );
+        assertEquals( 1, plan.getPredecessors(filter2).size() );
+        
+        assertTrue( plan.getPredecessors(foreach).contains(load) );
+        assertEquals( 1, plan.getPredecessors(foreach).size() );
+        
+        assertTrue( plan.getPredecessors(filter).contains(filter2) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+        
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );
+        
+        // Run the optimizer
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.optimize();
+        // Expected: load -> C1 -> C -> foreach -> stor; the inner plan is unchanged.
+        // Test after optimization
+        list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+        
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+        
+        assertTrue( plan.getPredecessors(stor).contains(foreach) ); 
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+        
+        assertTrue( plan.getPredecessors(filter2).contains(load) );
+        assertEquals( 1, plan.getPredecessors(filter2).size() );
+        
+        assertTrue( plan.getPredecessors(foreach).contains(filter) );
+        assertEquals( 1, plan.getPredecessors(foreach).size() );
+        
+        assertTrue( plan.getPredecessors(filter).contains(filter2) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+        
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );        
+    }
+    
+public void testMultipleFilterNotPossible() throws Exception {
+        
+        // Plan here is 
+        // Load (name, cuisines{t:(name)}) -> foreach gen name,cuisines 
+        // -> filter $1 == 'joe2' --> filter $1 == 'joe' --> stor
+        
+        LogicalPlan plan = null;
+        LOLoad load = null;
+        LOForEach foreach = null;
+        LOFilter filter = null;
+        LOStore stor = null;
+        
+        plan = new LogicalPlan();
+        
+        LogicalSchema schema = new LogicalSchema();
+        schema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema bagSchema = new LogicalSchema();
+        LogicalSchema bagTupleSchema = new LogicalSchema();
+        bagTupleSchema.addField( new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY) );
+        bagSchema.addField( new LogicalSchema.LogicalFieldSchema( "t", bagTupleSchema, DataType.TUPLE ) );
+        schema.addField(new LogicalSchema.LogicalFieldSchema("cuisines", bagSchema, DataType.BAG));
+        
+        load = new LOLoad(null, schema, plan);
+        load.setAlias("A");
+        plan.add(load);
+        
+        foreach = new LOForEach(plan);
+        
+        LogicalPlan innerPlan = new LogicalPlan();
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(innerLoad);
+        
+        LOInnerLoad innerLoad2 = new LOInnerLoad(innerPlan, foreach, 1);
+        innerPlan.add(innerLoad2);
+        
+        LogicalExpressionPlan namePrj = new LogicalExpressionPlan();        
+        ProjectExpression prjName = new ProjectExpression(namePrj, DataType.CHARARRAY, 0, 0);
+        namePrj.add(prjName);
+        
+        LogicalExpressionPlan cuisinesPrj = new LogicalExpressionPlan();
+        ProjectExpression prjCuisines = new ProjectExpression(cuisinesPrj, DataType.BAG, 1, 0);
+        cuisinesPrj.add(prjCuisines);
+        
+        List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+        expPlans.add(namePrj);
+        expPlans.add(cuisinesPrj);
+        
+        boolean flatten[] = new boolean[2];
+        flatten[0] = false;
+        flatten[1] = true;
+        
+        LOGenerate generate = new LOGenerate(innerPlan, expPlans, flatten);        
+        innerPlan.add(generate);
+        innerPlan.connect(innerLoad, generate);
+        innerPlan.connect(innerLoad2, generate);
+        
+        foreach.setInnerPlan(innerPlan);
+        foreach.setAlias("B");
+        plan.add(foreach);
+        
+        plan.connect(load, foreach);
+        
+        filter = new LOFilter(plan);
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        ProjectExpression namePrj2 = new ProjectExpression(filterPlan, DataType.CHARARRAY, 0, 1);
+        filterPlan.add(namePrj2);
+        ConstantExpression constExp = new ConstantExpression(filterPlan, DataType.CHARARRAY, "joe");
+        filterPlan.add(constExp);
+        EqualExpression equal = new EqualExpression(filterPlan, namePrj2, constExp);
+        filterPlan.add(equal);
+        
+        filter.setFilterPlan(filterPlan);
+        filter.setAlias("C");
+        plan.add(filter);
+        
+        LOFilter filter2 = new LOFilter(plan);
+        LogicalExpressionPlan filter2Plan = new LogicalExpressionPlan();
+        ProjectExpression name2Prj2 = new ProjectExpression(filter2Plan, DataType.CHARARRAY, 0, 1);
+        filter2Plan.add(name2Prj2);
+        ConstantExpression const2Exp = new ConstantExpression(filter2Plan, DataType.CHARARRAY, "joe2");
+        filter2Plan.add(const2Exp);
+        EqualExpression equal2 = new EqualExpression(filter2Plan, namePrj2, constExp);
+        filter2Plan.add(equal2);
+        
+        filter2.setFilterPlan(filter2Plan);
+        filter2.setAlias("C1");
+        plan.add(filter2);
+        
+        plan.connect(foreach, filter2);
+        plan.connect(filter2, filter);
+        
+        stor = new LOStore(plan);
+        stor.setAlias("D");
+        plan.add(stor);
+        plan.connect(filter,stor);
+        
+        try {
+            // Stamp everything with a Uid
+            UidStamper stamper = new UidStamper(plan);
+            stamper.visit();
+        }catch(Exception e) {
+            assertTrue("Failed to set a valid uid", false );
+        }
+        
+        
+        // run filter rule
+        Rule r = new FilterAboveForeach("FilterAboveFlatten");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        
+        // Test Plan before optimizing
+        List<Operator> list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+        
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+        
+        assertTrue( plan.getPredecessors(stor).contains(filter) ); 
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+        
+        assertTrue( plan.getPredecessors(filter2).contains(foreach) );
+        assertEquals( 1, plan.getPredecessors(filter2).size() );
+        
+        assertTrue( plan.getPredecessors(foreach).contains(load) );
+        assertEquals( 1, plan.getPredecessors(foreach).size() );
+        
+        assertTrue( plan.getPredecessors(filter).contains(filter2) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+        
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );
+        
+        // Run the optimizer
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.optimize();
+        
+        // Test after optimization
+        list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+        
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+        
+        assertTrue( plan.getPredecessors(stor).contains(filter) ); 
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+        
+        assertTrue( plan.getPredecessors(filter2).contains(foreach) );
+        assertEquals( 1, plan.getPredecessors(filter2).size() );
+        
+        assertTrue( plan.getPredecessors(foreach).contains(load) );
+        assertEquals( 1, plan.getPredecessors(foreach).size() );
+        
+        assertTrue( plan.getPredecessors(filter).contains(filter2) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+        
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );    
+    }
+    
+    /**
+     * Checks that FilterAboveForeach leaves the plan unchanged when the
+     * filter condition references a column produced by a flattened bag.
+     */
+    public void testNotPossibleFilter() throws Exception {
+        // Plan here is 
+        // Load (name, cuisines{t:(name)}) -> foreach gen name,flatten(cuisines) 
+        // -> filter $1 == 'joe' --> stor
+        //
+        // $1 is the output of flatten(cuisines), so the filter depends on the
+        // foreach; the assertions after optimization expect no change.
+
+        LogicalPlan plan = new LogicalPlan();
+
+        // Load schema: (name:chararray, cuisines:bag{t:(name:chararray)})
+        LogicalSchema schema = new LogicalSchema();
+        schema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema bagSchema = new LogicalSchema();
+        LogicalSchema bagTupleSchema = new LogicalSchema();
+        bagTupleSchema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        bagSchema.addField(new LogicalSchema.LogicalFieldSchema("t", bagTupleSchema, DataType.TUPLE));
+        schema.addField(new LogicalSchema.LogicalFieldSchema("cuisines", bagSchema, DataType.BAG));
+
+        LOLoad load = new LOLoad(null, schema, plan);
+        load.setAlias("A");
+        plan.add(load);
+
+        // foreach: generate name, flatten(cuisines)
+        LOForEach foreach = new LOForEach(plan);
+
+        LogicalPlan innerPlan = new LogicalPlan();
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(innerLoad);
+
+        LOInnerLoad innerLoad2 = new LOInnerLoad(innerPlan, foreach, 1);
+        innerPlan.add(innerLoad2);
+
+        LogicalExpressionPlan namePrj = new LogicalExpressionPlan();
+        ProjectExpression prjName = new ProjectExpression(namePrj, DataType.CHARARRAY, 0, 0);
+        namePrj.add(prjName);
+
+        LogicalExpressionPlan cuisinesPrj = new LogicalExpressionPlan();
+        ProjectExpression prjCuisines = new ProjectExpression(cuisinesPrj, DataType.BAG, 1, 0);
+        cuisinesPrj.add(prjCuisines);
+
+        List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+        expPlans.add(namePrj);
+        expPlans.add(cuisinesPrj);
+
+        // Only the second generate expression (cuisines) is flattened.
+        boolean[] flatten = { false, true };
+
+        LOGenerate generate = new LOGenerate(innerPlan, expPlans, flatten);
+        innerPlan.add(generate);
+        innerPlan.connect(innerLoad, generate);
+        innerPlan.connect(innerLoad2, generate);
+
+        foreach.setInnerPlan(innerPlan);
+        foreach.setAlias("B");
+        plan.add(foreach);
+
+        plan.connect(load, foreach);
+
+        // filter: $1 == 'joe'
+        LOFilter filter = new LOFilter(plan);
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        ProjectExpression namePrj2 = new ProjectExpression(filterPlan, DataType.CHARARRAY, 0, 1);
+        filterPlan.add(namePrj2);
+        ConstantExpression constExp = new ConstantExpression(filterPlan, DataType.CHARARRAY, "joe");
+        filterPlan.add(constExp);
+        EqualExpression equal = new EqualExpression(filterPlan, namePrj2, constExp);
+        filterPlan.add(equal);
+
+        filter.setFilterPlan(filterPlan);
+        filter.setAlias("C");
+        plan.add(filter);
+
+        plan.connect(foreach, filter);
+
+        LOStore stor = new LOStore(plan);
+        stor.setAlias("D");
+        plan.add(stor);
+        plan.connect(filter, stor);
+
+        // Stamp everything with a Uid. Any exception is allowed to propagate
+        // (the method declares "throws Exception") so the test report shows
+        // the real cause instead of a bare assertion that discards it.
+        UidStamper stamper = new UidStamper(plan);
+        stamper.visit();
+
+        // run filter rule
+        Rule r = new FilterAboveForeach("FilterAboveFlatten");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+
+        // Test Plan before optimizing
+        List<Operator> list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+
+        assertTrue( plan.getPredecessors(stor).contains(filter) );
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+
+        assertTrue( plan.getPredecessors(filter).contains(foreach) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );
+
+        // Run the optimizer
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.optimize();
+
+        // Test after optimization: the plan must be unchanged
+        list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+
+        assertTrue( plan.getPredecessors(stor).contains(filter) );
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+
+        assertTrue( plan.getPredecessors(filter).contains(foreach) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );
+    }
+    
+    public void testSimple2() throws Exception {
+        
+        // Plan here is 
+        // Load (name, cuisines{t:(name)}) -> foreach gen name,cuisines 
+        // -> filter name == 'joe' --> stor
+        
+        LogicalPlan plan = null;
+        LOLoad load = null;
+        LOForEach foreach = null;
+        LOFilter filter = null;
+        LOStore stor = null;
+        
+        plan = new LogicalPlan();
+        
+        LogicalSchema schema = new LogicalSchema();
+        schema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        LogicalSchema bagSchema = new LogicalSchema();
+        LogicalSchema bagTupleSchema = new LogicalSchema();
+        bagTupleSchema.addField( new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY) );
+        bagSchema.addField( new LogicalSchema.LogicalFieldSchema( "t", bagTupleSchema, DataType.TUPLE ) );
+        schema.addField(new LogicalSchema.LogicalFieldSchema("cuisines", bagSchema, DataType.BAG));
+        
+        load = new LOLoad(null, schema, plan);
+        load.setAlias("A");
+        plan.add(load);
+        
+        foreach = new LOForEach(plan);
+        
+        LogicalPlan innerPlan = new LogicalPlan();
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(innerLoad);
+        
+        LOInnerLoad innerLoad2 = new LOInnerLoad(innerPlan, foreach, 1);
+        innerPlan.add(innerLoad2);
+        
+        LogicalExpressionPlan namePrj = new LogicalExpressionPlan();        
+        ProjectExpression prjName = new ProjectExpression(namePrj, DataType.CHARARRAY, 0, 0);
+        namePrj.add(prjName);
+        
+        LogicalExpressionPlan cuisinesPrj = new LogicalExpressionPlan();
+        ProjectExpression prjCuisines = new ProjectExpression(cuisinesPrj, DataType.BAG, 1, 0);
+        cuisinesPrj.add(prjCuisines);
+        
+        List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+        expPlans.add(namePrj);
+        expPlans.add(cuisinesPrj);
+        
+        boolean flatten[] = new boolean[2];
+        flatten[0] = false;
+        flatten[1] = false;
+        
+        LOGenerate generate = new LOGenerate(innerPlan, expPlans, flatten);        
+        innerPlan.add(generate);
+        innerPlan.connect(innerLoad, generate);
+        innerPlan.connect(innerLoad2, generate);
+        
+        foreach.setInnerPlan(innerPlan);
+        foreach.setAlias("B");
+        plan.add(foreach);
+        
+        plan.connect(load, foreach);
+        
+        filter = new LOFilter(plan);
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        ProjectExpression namePrj2 = new ProjectExpression(filterPlan, DataType.CHARARRAY, 0, 0);
+        filterPlan.add(namePrj2);
+        ConstantExpression constExp = new ConstantExpression(filterPlan, DataType.CHARARRAY, "joe");
+        filterPlan.add(constExp);
+        EqualExpression equal = new EqualExpression(filterPlan, namePrj2, constExp);
+        filterPlan.add(equal);
+        
+        filter.setFilterPlan(filterPlan);
+        filter.setAlias("C");
+        plan.add(filter);
+        
+        plan.connect(foreach, filter);
+        
+        stor = new LOStore(plan);
+        stor.setAlias("D");
+        plan.add(stor);
+        plan.connect(filter,stor);
+        
+        try {
+            // Stamp everything with a Uid
+            UidStamper stamper = new UidStamper(plan);
+            stamper.visit();
+        }catch(Exception e) {
+            assertTrue("Failed to set a valid uid", false );
+        }
+        
+        
+        // run filter rule
+        Rule r = new FilterAboveForeach("FilterAboveFlatten");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        
+        // Test Plan before optimizing
+        List<Operator> list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+        
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+        
+        assertTrue( plan.getPredecessors(stor).contains(filter) ); 
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+        
+        assertTrue( plan.getPredecessors(filter).contains(foreach) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+        
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );
+        
+        // Run the optimizer
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.addPlanTransformListener(new ProjectionPatcher());
+        optimizer.addPlanTransformListener(new SchemaPatcher());
+        optimizer.optimize();
+        
+        // Test after optimization
+        list = plan.getSinks();
+        assertTrue( list.contains(stor) );
+        
+        list = plan.getSources();
+        assertTrue( list.contains(load) );
+        
+        assertTrue( plan.getPredecessors(stor).contains(foreach) ); 
+        assertEquals( 1, plan.getPredecessors(stor).size() );
+        
+        assertTrue( plan.getPredecessors(filter).contains(load) );
+        assertEquals( 1, plan.getPredecessors(filter).size() );
+        
+        assertTrue( plan.getPredecessors(foreach).contains(filter) );
+        assertEquals( 1, plan.getPredecessors(foreach).size() );
+        
+        assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
+        assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
+        assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad2) );
+        assertEquals( 2, foreach.getInnerPlan().getSources().size() );
+    }
+    
+    /**
+     * Test-only {@link PlanOptimizer} that re-declares
+     * {@code addPlanTransformListener} as public so the tests can register
+     * listeners such as ProjectionPatcher and SchemaPatcher.
+     * Declared static because it never uses the enclosing test instance,
+     * so it does not need to hold a hidden reference to it.
+     */
+    public static class MyPlanOptimizer extends PlanOptimizer {
+
+        /**
+         * Forwards all arguments unchanged to the {@link PlanOptimizer}
+         * constructor.
+         *
+         * @param p          plan to optimize
+         * @param rs         rule sets handed to the optimizer
+         * @param iterations iteration count handed to the optimizer
+         *                   (presumably a cap on optimization passes; confirm
+         *                   against PlanOptimizer)
+         */
+        protected MyPlanOptimizer(OperatorPlan p, List<Set<Rule>> rs,
+                int iterations) {
+            super(p, rs, iterations);
+        }
+
+        /**
+         * Registers {@code listener} by delegating to the superclass.
+         * NOTE(review): the override presumably exists only to widen the
+         * superclass method's visibility; its declaration is not visible here.
+         */
+        @Override
+        public void addPlanTransformListener(PlanTransformListener listener) {
+            super.addPlanTransformListener(listener);
+        }
+
+    }
+    
+//    public class MyPrintVisitor extends AllExpressionVisitor {
+//
+//        private PrintStream mStream = null;
+//        private String TAB1 = "    ";
+//        private String TABMore = "|   ";
+//        private String LSep = "|\n|---";
+//        private String USep = "|   |\n|   ";
+//        private int levelCntr = -1;
+//        private boolean isVerbose = true;
+//        
+//        public MyPrintVisitor(OperatorPlan plan, PrintStream ps) {
+//            super(plan, new DepthFirstWalker(plan));
+//            mStream = ps;
+//        }
+//
+//        @Override
+//        protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+//            // TODO Auto-generated method stub
+//            return null;
+//        }      
+//        
+//        @Override
+//        public void visit() throws VisitorException {
+//            try {
+//                mStream.write(depthFirstLP().getBytes());
+//            } catch (IOException e) {
+//                throw new VisitorException(e);
+//            }
+//        }
+//
+//        public void setVerbose(boolean verbose) {
+//            isVerbose = verbose;
+//        }
+//
+//        public void print(OutputStream printer) throws VisitorException, IOException {
+//            printer.write(depthFirstLP().getBytes());
+//        }
+//
+//        class LogicalRelationalOperatorCompare implements Comparator<LogicalRelationalOperator> {
+//
+//            @Override
+//            public int compare(LogicalRelationalOperator o1,
+//                    LogicalRelationalOperator o2) {
+//                return 0;
+//            }
+//            
+//        }
+//
+//        protected String depthFirstLP() throws VisitorException, IOException {
+//            StringBuilder sb = new StringBuilder();
+//            List<Operator> leaves = plan.getSinks();
+//            // Collections.sort(leaves, c)
+//            for (Operator leaf : leaves) {
+//                sb.append(depthFirst(leaf));
+//                sb.append("\n");
+//            }
+//            //sb.delete(sb.length() - "\n".length(), sb.length());
+//            //sb.delete(sb.length() - "\n".length(), sb.length());
+//            return sb.toString();
+//        }
+//        
+//        private String planString(LogicalPlan lp) throws VisitorException, IOException {
+//            StringBuilder sb = new StringBuilder();
+//            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+//            if(lp!=null)
+//                lp.explain(baos, mStream);
+//            else
+//                return "";
+//            sb.append(USep);
+//            sb.append(shiftStringByTabs(baos.toString(), 2));
+//            return sb.toString();
+//        }
+//        
+//        private String planString(
+//                List<LogicalPlan> logicalPlanList) throws VisitorException, IOException {
+//            StringBuilder sb = new StringBuilder();
+//            if(logicalPlanList!=null)
+//                for (LogicalPlan lp : logicalPlanList) {
+//                    sb.append(planString(lp));
+//                }
+//            return sb.toString();
+//        }
+//
+//        private String depthFirst(Operator node) throws VisitorException, IOException {
+//            StringBuilder sb = new StringBuilder(node.getName());
+//            if(node instanceof LogicalExpression) {
+//                sb.append(" FieldSchema: ");
+//                try {
+//                    sb.append(((LogicalExpression)node).getUid());
+//                    sb.append(" Type: " + DataType.findTypeName(((LogicalExpression)node).getType()));
+//                } catch (Exception e) {
+//                    sb.append("Caught Exception: " + e.getMessage());
+//                }
+//            } else if( node instanceof LogicalRelationalOperator ){
+//                sb.append(" Schema: ");
+//                try {
+//                    sb.append(((LogicalRelationalOperator)node).getSchema());
+//                } catch (Exception e) {
+//                    sb.append("Caught exception: " + e.getMessage());
+//                }
+//            }
+//
+//            sb.append("\n");
+//
+//            if (isVerbose) {
+//                if(node instanceof LOFilter){
+//                    sb.append(planString(((LOFilter)node).getComparisonPlan()));
+//                }
+//                else if(node instanceof LOForEach){
+//                    sb.append(planString(((LOForEach)node).getForEachPlans()));        
+//                }
+//                else if(node instanceof LOGenerate){
+//                    sb.append(planString(((LOGenerate)node).getGeneratePlans())); 
+//                    
+//                }
+//                else if(node instanceof LOCogroup){
+//                    MultiMap<LogicalOperator, LogicalPlan> plans = ((LOCogroup)node).getGroupByPlans();
+//                    for (LogicalOperator lo : plans.keySet()) {
+//                        // Visit the associated plans
+//                        for (LogicalPlan plan : plans.get(lo)) {
+//                            sb.append(planString(plan));
+//                        }
+//                    }
+//                }
+//                else if(node instanceof LOJoin){
+//                    MultiMap<LogicalOperator, LogicalPlan> plans = ((LOJoin)node).getJoinPlans();
+//                    for (LogicalOperator lo : plans.keySet()) {
+//                        // Visit the associated plans
+//                        for (LogicalPlan plan : plans.get(lo)) {
+//                            sb.append(planString(plan));
+//                        }
+//                    }
+//                }
+//                else if(node instanceof LOJoin){
+//                    MultiMap<LogicalOperator, LogicalPlan> plans = ((LOJoin)node).getJoinPlans();
+//                    for (LogicalOperator lo : plans.keySet()) {
+//                        // Visit the associated plans
+//                        for (LogicalPlan plan : plans.get(lo)) {
+//                            sb.append(planString(plan));
+//                        }
+//                    }
+//                }
+//                else if(node instanceof LOSort){
+//                    sb.append(planString(((LOSort)node).getSortColPlans())); 
+//                }
+//                else if(node instanceof LOSplitOutput){
+//                    sb.append(planString(((LOSplitOutput)node).getConditionPlan()));
+//                }
+//                else if (node instanceof LOProject) {
+//                    sb.append("Input: ");
+//                    sb.append(((LOProject)node).getExpression().name());
+//                }
+//            }
+//            
+//            List<LogicalOperator> originalPredecessors =  mPlan.getPredecessors(node);
+//            if (originalPredecessors == null)
+//                return sb.toString();
+//            
+//            List<LogicalOperator> predecessors =  new ArrayList<LogicalOperator>(originalPredecessors);
+//            
+//            Collections.sort(predecessors);
+//            int i = 0;
+//            for (LogicalOperator pred : predecessors) {
+//                i++;
+//                String DFStr = depthFirst(pred);
+//                if (DFStr != null) {
+//                    sb.append(LSep);
+//                    if (i < predecessors.size())
+//                        sb.append(shiftStringByTabs(DFStr, 2));
+//                    else
+//                        sb.append(shiftStringByTabs(DFStr, 1));
+//                }
+//            }
+//            return sb.toString();
+//        }
+//
+//        private String shiftStringByTabs(String DFStr, int TabType) {
+//            StringBuilder sb = new StringBuilder();
+//            String[] spl = DFStr.split("\n");
+//
+//            String tab = (TabType == 1) ? TAB1 : TABMore;
+//
+//            sb.append(spl[0] + "\n");
+//            for (int i = 1; i < spl.length; i++) {
+//                sb.append(tab);
+//                sb.append(spl[i]);
+//                sb.append("\n");
+//            }
+//            return sb.toString();
+//        }
+//
+//        private void dispTabs() {
+//            for (int i = 0; i < levelCntr; i++)
+//                System.out.print(TAB1);
+//        }
+//    }
+}



Mime
View raw message