pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r665739 [1/2] - in /incubator/pig/branches/types: src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/physicalLayer/expressionOperators/ src/org/apache/pig/impl/physicalLayer/plans/ test/org/apache/pig/test/ test/org/apache/pig/te...
Date Mon, 09 Jun 2008 16:18:43 GMT
Author: gates
Date: Mon Jun  9 09:18:42 2008
New Revision: 665739

URL: http://svn.apache.org/viewvc?rev=665739&view=rev
Log:
PIG-161 Added missing physical operators not, and, or, regexp.  Changed ComparisonOperator to be an interface and added BinaryComparisonOperator and
UnaryComparisonOperator implementations.


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryComparisonOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PORegexp.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/UnaryComparisonOperator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestBoolean.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestRegexp.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ComparisonOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/UnaryExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/IsNull1.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/IsNull2.gld
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java?rev=665739&r1=665738&r2=665739&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java Mon Jun  9 09:18:42 2008
@@ -29,14 +29,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class LORegexp extends ExpressionOperator {
+public class LORegexp extends BinaryExpressionOperator {
     private static final long serialVersionUID = 2L;
 
     /**
      * The expression and the column to be projected.
      */
-    private ExpressionOperator mOperand;
-    private String mRegexp;
     private static Log log = LogFactory.getLog(LORegexp.class);
 
     /**
@@ -52,21 +50,31 @@
      */
     public LORegexp(LogicalPlan plan, OperatorKey key,
             ExpressionOperator operand, String regexp) {
-        super(plan, key);
-        mOperand = operand;
-        mRegexp = regexp;
+        super(plan, key, operand, new LOConst(plan, key, regexp));
     }
 
     public ExpressionOperator getOperand() {
-        return mOperand;
+        return getLhsOperand();
     }
 
     public void setOperand(ExpressionOperator op) {
-        mOperand = op ;
+        setLhsOperand(op) ;
     }
 
     public String getRegexp() {
-        return mRegexp;
+        ExpressionOperator op = getRhsOperand();
+        if (!(op instanceof LOConst)) {
+            throw new RuntimeException(
+                "Regular expression patterns must be a constant.");
+        }
+        Object o = ((LOConst)op).getValue();
+        // better be a string
+        if (!(o instanceof String)) {
+            throw new RuntimeException(
+                "Regular expression patterns must be a string.");
+        }
+
+        return (String)o;
     }
     
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java?rev=665739&r1=665738&r2=665739&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java Mon Jun  9 09:18:42 2008
@@ -45,794 +45,883 @@
 import org.apache.pig.impl.plan.VisitorException;
 
 public class LogToPhyTranslationVisitor extends LOVisitor {
-	
+    
  
-	Map<LogicalOperator, PhysicalOperator> LogToPhyMap;
+    Map<LogicalOperator, PhysicalOperator> LogToPhyMap;
 
-	Random r = new Random();
+    Random r = new Random();
 
-	Stack<PhysicalPlan<? extends PhysicalOperator>> currentPlans;
-	PhysicalPlan currentPlan;
+    Stack<PhysicalPlan<? extends PhysicalOperator>> currentPlans;
+    PhysicalPlan currentPlan;
 
-	NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
-	
-	private Log log = LogFactory.getLog(getClass());
-	PigContext pc;
-	
-	public LogToPhyTranslationVisitor(LogicalPlan plan) {
-		super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
-
-		currentPlans = new Stack<PhysicalPlan<? extends PhysicalOperator>>();
-		currentPlan = new PhysicalPlan<PhysicalOperator>();
-		LogToPhyMap = new HashMap<LogicalOperator, PhysicalOperator>();
-	}
-	
-	public void setPigContext(PigContext pc) {
-		this.pc = pc;
-	}
-	
-	public PhysicalPlan<PhysicalOperator> getPhysicalPlan() {
-
-		return currentPlan;
-	}
-	
-	@Override
-	public void visit(LOGreaterThan op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator exprOp = new GreaterThanExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getLhsOperand().getType());
-		LogicalPlan lp = op.mPlan;
-		
-		currentPlan.add(exprOp);
-		LogToPhyMap.put(op, exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		
-		if(predecessors == null) return;
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				//currentExprPlan.connect(from, exprOp);
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-	
-	@Override
-	public void visit(LOLesserThan op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator exprOp = new LessThanExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getLhsOperand().getType());
-		LogicalPlan lp = op.mPlan;
-		
-		currentPlan.add(exprOp);
-		LogToPhyMap.put(op, exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		
-		if(predecessors == null) return;
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-	
-	@Override
-	public void visit(LOGreaterThanEqual op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator exprOp = new GTOrEqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getLhsOperand().getType());
-		LogicalPlan lp = op.mPlan;
-		
-		currentPlan.add(exprOp);
-		LogToPhyMap.put(op, exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		if(predecessors == null) return;
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-	
-	@Override
-	public void visit(LOLesserThanEqual op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator exprOp = new LTOrEqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getLhsOperand().getType());
-		LogicalPlan lp = op.mPlan;
-		
-		currentPlan.add(exprOp);
-		LogToPhyMap.put(op, exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		if(predecessors == null) return;
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-	
-	@Override
-	public void visit(LOEqual op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator exprOp = new EqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getLhsOperand().getType());
-		LogicalPlan lp = op.mPlan;
-		
-		currentPlan.add(exprOp);
-		LogToPhyMap.put(op, exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		if(predecessors == null) return;
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-	
-	@Override
-	public void visit(LONotEqual op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator exprOp = new NotEqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getLhsOperand().getType());
-		LogicalPlan lp = op.mPlan;
-		
-		currentPlan.add(exprOp);
-		LogToPhyMap.put(op, exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		if(predecessors == null) return;
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-	
-	@Override
-	public void visit(LOAdd op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator exprOp = new Add(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getType());
-		LogicalPlan lp = op.mPlan;
-		
-		currentPlan.add(exprOp);
-		LogToPhyMap.put(op, exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		if(predecessors == null) return;
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-	
-	@Override
-	public void visit(LOSubtract op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator exprOp = new Subtract(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getType());
-		LogicalPlan lp = op.mPlan;
-		
-		currentPlan.add(exprOp);
-		LogToPhyMap.put(op, exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		if(predecessors == null) return;
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-	
-	@Override
-	public void visit(LOMultiply op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator exprOp = new Multiply(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getType());
-		LogicalPlan lp = op.mPlan;
-		
-		currentPlan.add(exprOp);
-		LogToPhyMap.put(op, exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		if(predecessors == null) return;
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-	
-	@Override
-	public void visit(LODivide op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator exprOp = new Divide(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getType());
-		LogicalPlan lp = op.mPlan;
-		
-		currentPlan.add(exprOp);
-		LogToPhyMap.put(op, exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		if(predecessors == null) return;
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-	
-	@Override
-	public void visit(LOMod op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator exprOp = new Mod(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getType());
-		LogicalPlan lp = op.mPlan;
-		
-		currentPlan.add(exprOp);
-		LogToPhyMap.put(op, exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		if(predecessors == null) return;
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-	
-	@Override
-	public void visit(LOCogroup cg) throws VisitorException {
-		boolean currentPhysicalPlan = false;
-		String scope = cg.getOperatorKey().scope;
-		List<LogicalOperator> inputs = cg.getInputs();
-		
-		POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
-		POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
-		
-		currentPlan.add(poGlobal);
-		currentPlan.add(poPackage);
-		
-		try {
-			currentPlan.connect(poGlobal, poPackage);
-		} catch (PlanException e1) {
-			log.error("Invalid physical operators in the physical plan" + e1.getMessage());
-		}
-		
-		int count = 0;
-		Byte type = null;
-		for(LogicalOperator op : inputs) {
-			List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(op);
-			POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
-			List<ExprPlan> exprPlans = new ArrayList<ExprPlan>();
-			currentPlans.push(currentPlan);
-			for(LogicalPlan lp : plans) {
-				currentPlan = new ExprPlan();
-				PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(lp);
-				pushWalker(childWalker);
-				mCurrentWalker.walk(this);
-				exprPlans.add((ExprPlan) currentPlan);
-				popWalker();
-				
-			}
-			currentPlan = currentPlans.pop();
-			physOp.setPlans(exprPlans);
-			physOp.setIndex(count++);
-			if(plans.size() > 1) {
-				type = DataType.TUPLE;
-				physOp.setKeyType(type);
-			} else {
-				type = exprPlans.get(0).getLeaves().get(0).getResultType();
-				physOp.setKeyType(type);
-			}
-			physOp.setResultType(DataType.TUPLE);
-			
-			currentPlan.add(physOp);
-			
-			try {
-				currentPlan.connect(LogToPhyMap.get(op), physOp);
-				currentPlan.connect(physOp, poGlobal);
-			} catch (PlanException e) {
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-			
-		}
-		poPackage.setKeyType(type);
-		poPackage.setResultType(DataType.TUPLE);
+    NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
+    
+    private Log log = LogFactory.getLog(getClass());
+    PigContext pc;
+    
+    public LogToPhyTranslationVisitor(LogicalPlan plan) {
+        super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
+
+        currentPlans = new Stack<PhysicalPlan<? extends PhysicalOperator>>();
+        currentPlan = new PhysicalPlan<PhysicalOperator>();
+        LogToPhyMap = new HashMap<LogicalOperator, PhysicalOperator>();
+    }
+    
+    public void setPigContext(PigContext pc) {
+        this.pc = pc;
+    }
+    
+    public PhysicalPlan<PhysicalOperator> getPhysicalPlan() {
+
+        return currentPlan;
+    }
+    
+    @Override
+    public void visit(LOGreaterThan op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new GreaterThanExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getLhsOperand().getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                //currentExprPlan.connect(from, exprOp);
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LOLesserThan op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new LessThanExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getLhsOperand().getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LOGreaterThanEqual op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new GTOrEqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getLhsOperand().getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LOLesserThanEqual op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new LTOrEqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getLhsOperand().getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LOEqual op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new EqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getLhsOperand().getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LONotEqual op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new NotEqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getLhsOperand().getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public void visit(LORegexp op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new PORegexp(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getLhsOperand().getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LOAdd op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new Add(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LOSubtract op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new Subtract(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LOMultiply op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new Multiply(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LODivide op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new Divide(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LOMod op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new Mod(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getType());
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public void visit(LOAnd op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new POAnd(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(DataType.BOOLEAN);
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public void visit(LOOr op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new POOr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(DataType.BOOLEAN);
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public void visit(LONot op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator exprOp = new PONot(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(DataType.BOOLEAN);
+        LogicalPlan lp = op.mPlan;
+        
+        currentPlan.add(exprOp);
+        LogToPhyMap.put(op, exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        if(predecessors == null) return;
+        PhysicalOperator from = LogToPhyMap.get(predecessors.get(0));
+        try {
+            currentPlan.connect(from, exprOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operators in the physical plan" + e.getMessage());
+        }
+    }
+    
+    @Override
+    public void visit(LOCogroup cg) throws VisitorException {
+        boolean currentPhysicalPlan = false;
+        String scope = cg.getOperatorKey().scope;
+        List<LogicalOperator> inputs = cg.getInputs();
+        
+        POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
+        POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
+        
+        currentPlan.add(poGlobal);
+        currentPlan.add(poPackage);
+        
+        try {
+            currentPlan.connect(poGlobal, poPackage);
+        } catch (PlanException e1) {
+            log.error("Invalid physical operators in the physical plan" + e1.getMessage());
+        }
+        
+        int count = 0;
+        Byte type = null;
+        for(LogicalOperator op : inputs) {
+            List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(op);
+            POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
+            List<ExprPlan> exprPlans = new ArrayList<ExprPlan>();
+            currentPlans.push(currentPlan);
+            for(LogicalPlan lp : plans) {
+                currentPlan = new ExprPlan();
+                PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(lp);
+                pushWalker(childWalker);
+                mCurrentWalker.walk(this);
+                exprPlans.add((ExprPlan) currentPlan);
+                popWalker();
+                
+            }
+            currentPlan = currentPlans.pop();
+            physOp.setPlans(exprPlans);
+            physOp.setIndex(count++);
+            if(plans.size() > 1) {
+                type = DataType.TUPLE;
+                physOp.setKeyType(type);
+            } else {
+                type = exprPlans.get(0).getLeaves().get(0).getResultType();
+                physOp.setKeyType(type);
+            }
+            physOp.setResultType(DataType.TUPLE);
+            
+            currentPlan.add(physOp);
+            
+            try {
+                currentPlan.connect(LogToPhyMap.get(op), physOp);
+                currentPlan.connect(physOp, poGlobal);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+            
+        }
+        poPackage.setKeyType(type);
+        poPackage.setResultType(DataType.TUPLE);
         poPackage.setNumInps(count);
         poPackage.setInner(cg.getInner());
-		LogToPhyMap.put(cg, poPackage);
-	}
+        LogToPhyMap.put(cg, poPackage);
+    }
 
-	@Override
-	public void visit(LOFilter filter) throws VisitorException {
-		String scope = filter.getOperatorKey().scope;
-		POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), filter.getRequestedParallelism());
-		poFilter.setResultType(filter.getType());
-		currentPlan.add(poFilter);
-		LogToPhyMap.put(filter, poFilter);
-		currentPlans.push(currentPlan);
-		
-		currentPlan = new ExprPlan();
-		
-		PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(filter.getComparisonPlan());
-		pushWalker(childWalker);
-		mCurrentWalker.walk(this);
-		popWalker();
-		
-		poFilter.setPlan((ExprPlan) currentPlan);
-		currentPlan = currentPlans.pop();
-		
-		List<LogicalOperator> op = filter.getPlan().getPredecessors(filter);
-		
-		PhysicalOperator from = LogToPhyMap.get(op.get(0));
-		try {
-			currentPlan.connect(from, poFilter);
-		} catch (PlanException e) {
-			log.error("Invalid physical operators in the physical plan" + e.getMessage());
-		}
-	}
-	
-	@Override
-	public void visit(LOProject op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		POProject exprOp = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		exprOp.setResultType(op.getType());
-		exprOp.setColumn(op.getCol());
-		exprOp.setStar(op.isStar());
-		LogicalPlan lp = op.mPlan;
-		LogToPhyMap.put(op, exprOp);
-		currentPlan.add(exprOp);
-		
-		List<LogicalOperator> predecessors = lp.getPredecessors(op);
-		
-		//Project might not have any predecessors
-		if(predecessors == null) return;
-		
-		for(LogicalOperator lo : predecessors) {
-			PhysicalOperator from = LogToPhyMap.get(lo);
-			try {
-				currentPlan.connect(from, exprOp);
-			} catch (PlanException e) {
-				
-				log.error("Invalid physical operators in the physical plan" + e.getMessage());
-			}
-		}
-	}
-
-	@Override
-	public void visit(LOForEach forEach) throws VisitorException {
-		String scope = forEach.getOperatorKey().scope;
-		//This needs to be handled specially.
-		//We need to be able to handle arbitrary levels of nesting
-		
-		//push the current physical plan in the stack.
-		currentPlans.push(currentPlan);
-		
-		//create a new physical plan
-		currentPlan = new PhysicalPlan<PhysicalOperator>();
-		PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(forEach.getForEachPlan());
-		
-		//now populate the physical plan by walking
-		pushWalker(childWalker);
-		mCurrentWalker.walk(this);
-		popWalker();
-		
-		POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), forEach.getRequestedParallelism());
-		fe.setPlan(currentPlan);
-		fe.setResultType(DataType.TUPLE);
-		LogToPhyMap.put(forEach, fe);
-		
-		//now connect foreach to its inputs
-		currentPlan = currentPlans.pop();
-		currentPlan.add(fe);
-		PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(mPlan.getPredecessors(forEach).get(0));
-		try {
-			currentPlan.connect(from, fe);
-		} catch (PlanException e) {
-			log.error("Invalid physical operators in the physical plan" + e.getMessage());
-		
-		}
-		
-	}
-
-	@Override
-	public void visit(LOGenerate g) throws VisitorException {
-		boolean currentPhysicalPlan = false;
-		String scope = g.getOperatorKey().scope;
-		List<ExprPlan> exprPlans = new ArrayList<ExprPlan>();
-		List<LogicalPlan> plans = g.getGeneratePlans();
-		
-		currentPlans.push(currentPlan);
-		for(LogicalPlan plan : plans) {
-			currentPlan = new ExprPlan();
-			PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(plan);
-			pushWalker(childWalker);
-			childWalker.walk(this);
-			exprPlans.add((ExprPlan) currentPlan);
-			popWalker();
-		}
-		currentPlan = currentPlans.pop();
-		
-		//PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
-		PhysicalOperator poGen = new POGenerate(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), g.getRequestedParallelism(), exprPlans, g.getFlatten());
-		poGen.setResultType(DataType.TUPLE);
-		LogToPhyMap.put(g, poGen);
-		currentPlan.add(poGen);
-		
-		//generate cannot have multiple inputs
-		List<LogicalOperator> op = g.getPlan().getPredecessors(g);
-		
-		//generate may not have any predecessors
-		if(op == null)
-			return;
-		
-		PhysicalOperator from = LogToPhyMap.get(op.get(0));
-		try {
-			currentPlan.connect(from, poGen);
-		} catch (PlanException e) {
-			log.error("Invalid physical operators in the physical plan" + e.getMessage());
-		}
-		
-	}
-
-	@Override
-	public void visit(LOSort s) throws VisitorException {
-		String scope = s.getOperatorKey().scope;
-		List<LogicalPlan> logPlans = s.getSortColPlans();
-		List<ExprPlan> sortPlans = new ArrayList<ExprPlan>(logPlans.size());
-		
-		//convert all the logical expression plans to physical expression plans
-		currentPlans.push(currentPlan);
-		for(LogicalPlan plan : logPlans) {
-			currentPlan = new ExprPlan();
-			PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(plan);
-			pushWalker(childWalker);
-			childWalker.walk(this);
-			sortPlans.add((ExprPlan) currentPlan);
-			popWalker();
-		}
-		currentPlan = currentPlans.pop();
-		
-		//get the physical operator for sort
-		POSort sort;
-		if(s.getUserFunc() == null) { 
-			sort = new POSort(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), s.getRequestedParallelism(), null, sortPlans, s.getAscendingCols(), null);
-		} else {
-			POUserFunc comparator = new POUserComparisonFunc(
-					new OperatorKey(scope, nodeGen.getNextNodeId(scope)), s.getRequestedParallelism(), null, s.getUserFunc());
-			sort = new POSort(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), s.getRequestedParallelism(), null,
-					sortPlans, s.getAscendingCols(), comparator);
-		}
-		sort.setRequestedParallelism(s.getType());
-		LogToPhyMap.put(s, sort);
-		currentPlan.add(sort);
-		PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(s.mPlan.getPredecessors(s).get(0));
-		try {
-			currentPlan.connect(from, sort);
-		} catch (PlanException e) {
-			log.error("Invalid physical operator in the plan" + e.getMessage());
-		}
-		
-		sort.setResultType(s.getType());
-		
-	}
-	
-	@Override
-	public void visit(LODistinct op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		//This is simpler. No plans associated with this. Just create the physical operator,
-		//push it in the current plan and make the connections
-		PhysicalOperator physOp = new PODistinct(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		physOp.setResultType(op.getType());
-		LogToPhyMap.put(op, physOp);
-		currentPlan.add(physOp);
-		//Distinct will only have a single input
-		PhysicalOperator from = LogToPhyMap.get(op.mPlan.getPredecessors(op).get(0));
-		try {
-			currentPlan.connect(from, physOp);
-		} catch (PlanException e) {
-			log.error("Invalid physical operator in the plan" + e.getMessage());
-		}
-	}
-	
-/*	public void visit(LOSplit split) throws VisitorException {
-		String scope = split.getKey().scope;
-		PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
-		LogToPhyMap.put(split, physOp);
-		
-		currentPlan.add(physOp);
-		PhysicalOperator from = LogToPhyMap.get(split.getPlan().getPredecessors(split).get(0));
-		try {
-			currentPlan.connect(from, physOp);
-		} catch (PlanException e) {
-			log.error("Invalid physical operator in the plan" + e.getMessage());
-		}
-		
-		Collection<LogicalPlan> plans = split.getConditionPlans();
-		
-		for(LogicalPlan plan : plans) {
-			currentPlans.push(currentPlan);
-			currentPlan = new ExprPlan();
-			PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(plan);
-			pushWalker(childWalker);
-			mCurrentWalker.walk(this);
-			popWalker();
-			PhysicalOperator filter = new POFilter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
-			((POFilter) filter).setPlan((ExprPlan) currentPlan);
-			currentPlan = currentPlans.pop();
-			currentPlan.add(filter);
-			try {
-				currentPlan.connect(physOp, filter);
-			} catch (PlanException e) {
-				log.error("Invalid physical operator in the plan" + e.getMessage());
-			}
-		}
-		
-		
-	}*/
-	
-	@Override
-	public void visit(LOSplit split) throws VisitorException {
-		String scope = split.getOperatorKey().scope;
-		PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
-		LogToPhyMap.put(split, physOp);
-		
-		currentPlan.add(physOp);
-		PhysicalOperator from = LogToPhyMap.get(split.getPlan().getPredecessors(split).get(0));
-		try {
-			currentPlan.connect(from, physOp);
-		} catch (PlanException e) {
-			log.error("Invalid physical operator in the plan" + e.getMessage());
-		}
-	}
-	
-	@Override
-	public void visit(LOSplitOutput split) throws VisitorException {
-		String scope = split.getOperatorKey().scope;
-		PhysicalOperator physOp = new POFilter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
-		LogToPhyMap.put(split, physOp);
-		
-		currentPlan.add(physOp);
-		currentPlans.push(currentPlan);
-		currentPlan = new ExprPlan();
-		PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(split.getConditionPlan());
-		pushWalker(childWalker);
-		mCurrentWalker.walk(this);
-		popWalker();
-		
-		((POFilter) physOp).setPlan((ExprPlan) currentPlan);
-		currentPlan = currentPlans.pop();
-		currentPlan.add(physOp);
-		PhysicalOperator from = LogToPhyMap.get(split.getPlan().getPredecessors(split).get(0));
-		try {
-			currentPlan.connect(from, physOp);
-		} catch (PlanException e) {
-			log.error("Invalid physical operator in the plan" + e.getMessage());
-		}
-	}
-
-	
-	@Override
-	public void visit(LOUserFunc func) throws VisitorException {
-		String scope = func.getOperatorKey().scope;
-		Object f = PigContext.instantiateFuncFromSpec(func.getFuncSpec());
-		PhysicalOperator p;
-		if(f instanceof EvalFunc) { 
-			p = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), func.getRequestedParallelism(), null, func.getFuncSpec(), (EvalFunc)f);
-		} else {
-			p = new POUserComparisonFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), func.getRequestedParallelism(), null, func.getFuncSpec(), (ComparisonFunc)f);
-		}
-		p.setResultType(func.getType());
-		currentPlan.add(p);
-		List<LogicalOperator> fromList = func.getPlan().getPredecessors(func);
-		for(LogicalOperator op : fromList) {
-			PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(op);
-			try {
-				currentPlan.connect(from, p);
-			} catch (PlanException e) {
-				log.error("Invalid physical operator in the plan" + e.getMessage());
-			}	
-		}
-		LogToPhyMap.put(func, p);
-		
-	}
-	
-	@Override
-	public void visit(LOLoad loLoad) throws VisitorException {
-		String scope = loLoad.getOperatorKey().scope;
-		//This would be a root operator. We don't need to worry about finding its predecessors
-		POLoad load = new POLoad(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
-		load.setLFile(loLoad.getInputFile());
-		load.setPc(pc);
-		load.setResultType(loLoad.getType());
-		currentPlan.add(load);
-		LogToPhyMap.put(loLoad, load);
-	}
-	
-	@Override
-	public void visit(LOStore loStore) throws VisitorException {
-		String scope = loStore.getOperatorKey().scope;
-		POStore store = new POStore(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
-		store.setSFile(loStore.getOutputFile());
-		store.setPc(pc);
-		currentPlan.add(store);
-		PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(loStore.getPlan().getPredecessors(loStore).get(0));
-		try {
-			currentPlan.connect(from, store);
-		} catch (PlanException e) {
-			log.error("Invalid physical operator in the plan" + e.getMessage());
-		}
-		LogToPhyMap.put(loStore, store);
-	}
-	
-	@Override
-	public void visit(LOConst op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ConstantExpression ce = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
-		ce.setValue(op.getValue());
-		ce.setResultType(op.getType());
-		//this operator doesn't have any predecessors
-		currentPlan.add(ce);
-		LogToPhyMap.put(op, ce);
-	}
-	
-	@Override
-	public void visit(LOBinCond op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator physOp = new POBinCond(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		LogToPhyMap.put(op, physOp);
-		POBinCond phy = (POBinCond)physOp;
-		ExpressionOperator cond = (ExpressionOperator) LogToPhyMap.get(op.getCond());
-		phy.setCond(cond);
-		ExpressionOperator lhs = (ExpressionOperator) LogToPhyMap.get(op.getLhsOp());
-		phy.setLhs(lhs);
-		ExpressionOperator rhs = (ExpressionOperator) LogToPhyMap.get(op.getRhsOp());
-		phy.setRhs(rhs);
-		
-		currentPlan.add(physOp);
-		
-		List<LogicalOperator> ops = op.getPlan().getPredecessors(op);
-		
-		for(LogicalOperator l : ops) {
-			ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(l);
-			try {
-				currentPlan.connect(from, physOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operator in the plan" + e.getMessage());
-			}
-		}
-		
-	}
-	
-	@Override
-	public void visit(LONegative op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator physOp = new PONegative(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism(), null);
-		currentPlan.add(physOp);
-		
-		LogToPhyMap.put(op, physOp);
-		ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getPlan().getPredecessors(op).get(0));
-		((PONegative)physOp).setInput(from);
-		try {
-			currentPlan.connect(from, physOp);
-		} catch (PlanException e) {
-			log.error("Invalid physical operator in the plan" + e.getMessage());
-		}
-		
-	}
-	
-	@Override
-	public void visit(LOIsNull op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator physOp = new POIsNull(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism(), null);
-		currentPlan.add(physOp);
-		
-		LogToPhyMap.put(op, physOp);
-		ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getPlan().getPredecessors(op).get(0));
-		((POIsNull)physOp).setInput(from);
-		try {
-			currentPlan.connect(from, physOp);
-		} catch (PlanException e) {
-			log.error("Invalid physical operator in the plan" + e.getMessage());
-		}
-		
-	}
-	
-	@Override
-	public void visit(LOMapLookup op) throws VisitorException {
-		String scope = ((OperatorKey)op.getOperatorKey()).scope;
-		ExpressionOperator physOp = new POMapLookUp(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism(), op.getLookUpKey());
-		physOp.setResultType(op.getType());
-		currentPlan.add(physOp);
-		
-		LogToPhyMap.put(op, physOp);
-		
-		ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getMap());
-		try {
-			currentPlan.connect(from, physOp);
-		} catch (PlanException e) {
-			log.error("Invalid physical operator in the plan" + e.getMessage());
-		}
-		
-	}
-	
-	@Override
-	public void visit(LOCast op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		ExpressionOperator physOp = new POCast(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		currentPlan.add(physOp);
-		
-		LogToPhyMap.put(op, physOp);
-		ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getExpression());
-		physOp.setResultType(op.getType());
-		
-		try {
-			currentPlan.connect(from, physOp);
-		} catch (PlanException e) {
-			log.error("Invalid physical operator in the plan" + e.getMessage());
-		}
-		
-	}
-	
-	@Override
-	public void visit(LOUnion op) throws VisitorException {
-		String scope = op.getOperatorKey().scope;
-		POUnion physOp = new POUnion(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-		currentPlan.add(physOp);
-		physOp.setResultType(op.getType());
-		LogToPhyMap.put(op, physOp);
-		List<LogicalOperator> ops = op.getInputs();
-		
-		for(LogicalOperator l : ops) {
-			PhysicalOperator from = LogToPhyMap.get(l);
-			try {
-				currentPlan.connect(from, physOp);
-			} catch (PlanException e) {
-				log.error("Invalid physical operator in the plan" + e.getMessage());
-			}
-		}
-	}
-	
-	
+    @Override
+    public void visit(LOFilter filter) throws VisitorException {
+        String scope = filter.getOperatorKey().scope;
+        POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), filter.getRequestedParallelism());
+        poFilter.setResultType(filter.getType());
+        currentPlan.add(poFilter);
+        LogToPhyMap.put(filter, poFilter);
+        currentPlans.push(currentPlan);
+        
+        currentPlan = new ExprPlan();
+        
+        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(filter.getComparisonPlan());
+        pushWalker(childWalker);
+        mCurrentWalker.walk(this);
+        popWalker();
+        
+        poFilter.setPlan((ExprPlan) currentPlan);
+        currentPlan = currentPlans.pop();
+        
+        List<LogicalOperator> op = filter.getPlan().getPredecessors(filter);
+        
+        PhysicalOperator from = LogToPhyMap.get(op.get(0));
+        try {
+            currentPlan.connect(from, poFilter);
+        } catch (PlanException e) {
+            log.error("Invalid physical operators in the physical plan" + e.getMessage());
+        }
+    }
+    
+    @Override
+    public void visit(LOProject op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        POProject exprOp = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        exprOp.setResultType(op.getType());
+        exprOp.setColumn(op.getCol());
+        exprOp.setStar(op.isStar());
+        LogicalPlan lp = op.mPlan;
+        LogToPhyMap.put(op, exprOp);
+        currentPlan.add(exprOp);
+        
+        List<LogicalOperator> predecessors = lp.getPredecessors(op);
+        
+        //Project might not have any predecessors
+        if(predecessors == null) return;
+        
+        for(LogicalOperator lo : predecessors) {
+            PhysicalOperator from = LogToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                
+                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public void visit(LOForEach forEach) throws VisitorException {
+        String scope = forEach.getOperatorKey().scope;
+        //This needs to be handled specially.
+        //We need to be able to handle arbitrary levels of nesting
+        
+        //push the current physical plan in the stack.
+        currentPlans.push(currentPlan);
+        
+        //create a new physical plan
+        currentPlan = new PhysicalPlan<PhysicalOperator>();
+        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(forEach.getForEachPlan());
+        
+        //now populate the physical plan by walking
+        pushWalker(childWalker);
+        mCurrentWalker.walk(this);
+        popWalker();
+        
+        POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), forEach.getRequestedParallelism());
+        fe.setPlan(currentPlan);
+        fe.setResultType(DataType.TUPLE);
+        LogToPhyMap.put(forEach, fe);
+        
+        //now connect foreach to its inputs
+        currentPlan = currentPlans.pop();
+        currentPlan.add(fe);
+        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(mPlan.getPredecessors(forEach).get(0));
+        try {
+            currentPlan.connect(from, fe);
+        } catch (PlanException e) {
+            log.error("Invalid physical operators in the physical plan" + e.getMessage());
+        
+        }
+        
+    }
+
+    @Override
+    public void visit(LOGenerate g) throws VisitorException {
+        boolean currentPhysicalPlan = false;
+        String scope = g.getOperatorKey().scope;
+        List<ExprPlan> exprPlans = new ArrayList<ExprPlan>();
+        List<LogicalPlan> plans = g.getGeneratePlans();
+        
+        currentPlans.push(currentPlan);
+        for(LogicalPlan plan : plans) {
+            currentPlan = new ExprPlan();
+            PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(plan);
+            pushWalker(childWalker);
+            childWalker.walk(this);
+            exprPlans.add((ExprPlan) currentPlan);
+            popWalker();
+        }
+        currentPlan = currentPlans.pop();
+        
+        //PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+        PhysicalOperator poGen = new POGenerate(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), g.getRequestedParallelism(), exprPlans, g.getFlatten());
+        poGen.setResultType(DataType.TUPLE);
+        LogToPhyMap.put(g, poGen);
+        currentPlan.add(poGen);
+        
+        //generate cannot have multiple inputs
+        List<LogicalOperator> op = g.getPlan().getPredecessors(g);
+        
+        //generate may not have any predecessors
+        if(op == null)
+            return;
+        
+        PhysicalOperator from = LogToPhyMap.get(op.get(0));
+        try {
+            currentPlan.connect(from, poGen);
+        } catch (PlanException e) {
+            log.error("Invalid physical operators in the physical plan" + e.getMessage());
+        }
+        
+    }
+
+    @Override
+    public void visit(LOSort s) throws VisitorException {
+        String scope = s.getOperatorKey().scope;
+        List<LogicalPlan> logPlans = s.getSortColPlans();
+        List<ExprPlan> sortPlans = new ArrayList<ExprPlan>(logPlans.size());
+        
+        //convert all the logical expression plans to physical expression plans
+        currentPlans.push(currentPlan);
+        for(LogicalPlan plan : logPlans) {
+            currentPlan = new ExprPlan();
+            PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(plan);
+            pushWalker(childWalker);
+            childWalker.walk(this);
+            sortPlans.add((ExprPlan) currentPlan);
+            popWalker();
+        }
+        currentPlan = currentPlans.pop();
+        
+        //get the physical operator for sort
+        POSort sort;
+        if(s.getUserFunc() == null) { 
+            sort = new POSort(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), s.getRequestedParallelism(), null, sortPlans, s.getAscendingCols(), null);
+        } else {
+            POUserFunc comparator = new POUserComparisonFunc(
+                    new OperatorKey(scope, nodeGen.getNextNodeId(scope)), s.getRequestedParallelism(), null, s.getUserFunc());
+            sort = new POSort(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), s.getRequestedParallelism(), null,
+                    sortPlans, s.getAscendingCols(), comparator);
+        }
+        sort.setRequestedParallelism(s.getType());
+        LogToPhyMap.put(s, sort);
+        currentPlan.add(sort);
+        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(s.mPlan.getPredecessors(s).get(0));
+        try {
+            currentPlan.connect(from, sort);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+        }
+        
+        sort.setResultType(s.getType());
+        
+    }
+    
+    @Override
+    public void visit(LODistinct op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        //This is simpler. No plans associated with this. Just create the physical operator,
+        //push it in the current plan and make the connections
+        PhysicalOperator physOp = new PODistinct(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        physOp.setResultType(op.getType());
+        LogToPhyMap.put(op, physOp);
+        currentPlan.add(physOp);
+        //Distinct will only have a single input
+        PhysicalOperator from = LogToPhyMap.get(op.mPlan.getPredecessors(op).get(0));
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+        }
+    }
+    
+/*    public void visit(LOSplit split) throws VisitorException {
+        String scope = split.getKey().scope;
+        PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
+        LogToPhyMap.put(split, physOp);
+        
+        currentPlan.add(physOp);
+        PhysicalOperator from = LogToPhyMap.get(split.getPlan().getPredecessors(split).get(0));
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+        }
+        
+        Collection<LogicalPlan> plans = split.getConditionPlans();
+        
+        for(LogicalPlan plan : plans) {
+            currentPlans.push(currentPlan);
+            currentPlan = new ExprPlan();
+            PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(plan);
+            pushWalker(childWalker);
+            mCurrentWalker.walk(this);
+            popWalker();
+            PhysicalOperator filter = new POFilter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
+            ((POFilter) filter).setPlan((ExprPlan) currentPlan);
+            currentPlan = currentPlans.pop();
+            currentPlan.add(filter);
+            try {
+                currentPlan.connect(physOp, filter);
+            } catch (PlanException e) {
+                log.error("Invalid physical operator in the plan" + e.getMessage());
+            }
+        }
+        
+        
+    }*/
+    
+    @Override
+    public void visit(LOSplit split) throws VisitorException {
+        String scope = split.getOperatorKey().scope;
+        PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
+        LogToPhyMap.put(split, physOp);
+        
+        currentPlan.add(physOp);
+        PhysicalOperator from = LogToPhyMap.get(split.getPlan().getPredecessors(split).get(0));
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+        }
+    }
+    
+    @Override
+    public void visit(LOSplitOutput split) throws VisitorException {
+        String scope = split.getOperatorKey().scope;
+        PhysicalOperator physOp = new POFilter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
+        LogToPhyMap.put(split, physOp);
+        
+        currentPlan.add(physOp);
+        currentPlans.push(currentPlan);
+        currentPlan = new ExprPlan();
+        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(split.getConditionPlan());
+        pushWalker(childWalker);
+        mCurrentWalker.walk(this);
+        popWalker();
+        
+        ((POFilter) physOp).setPlan((ExprPlan) currentPlan);
+        currentPlan = currentPlans.pop();
+        currentPlan.add(physOp);
+        PhysicalOperator from = LogToPhyMap.get(split.getPlan().getPredecessors(split).get(0));
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+        }
+    }
+
+    @Override
+    public void visit(LOUserFunc func) throws VisitorException {
+        String scope = func.getOperatorKey().scope;
+        Object f = PigContext.instantiateFuncFromSpec(func.getFuncSpec());
+        PhysicalOperator p;
+        if(f instanceof EvalFunc) { 
+            p = new POUserFunc(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)), func.getRequestedParallelism(),
+                null, func.getFuncSpec(), (EvalFunc)f);
+        } else {
+            p = new POUserComparisonFunc(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)), func.getRequestedParallelism(),
+                null, func.getFuncSpec(), (ComparisonFunc)f);
+        }
+        p.setResultType(func.getType());
+        currentPlan.add(p);
+        List<LogicalOperator> fromList = func.getPlan().getPredecessors(func);
+        for(LogicalOperator op : fromList) {
+            PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(op);
+            try {
+                currentPlan.connect(from, p);
+            } catch (PlanException e) {
+                log.error("Invalid physical operator in the plan" + e.getMessage());
+            }    
+        }
+        LogToPhyMap.put(func, p);
+        
+    }
+    
+    @Override
+    public void visit(LOLoad loLoad) throws VisitorException {
+        String scope = loLoad.getOperatorKey().scope;
+        //This would be a root operator. We don't need to worry about finding its predecessors
+        POLoad load = new POLoad(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        load.setLFile(loLoad.getInputFile());
+        load.setPc(pc);
+        load.setResultType(loLoad.getType());
+        currentPlan.add(load);
+        LogToPhyMap.put(loLoad, load);
+    }
+    
+    @Override
+    public void visit(LOStore loStore) throws VisitorException {
+        String scope = loStore.getOperatorKey().scope;
+        POStore store = new POStore(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        store.setSFile(loStore.getOutputFile());
+        store.setPc(pc);
+        currentPlan.add(store);
+        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(loStore.getPlan().getPredecessors(loStore).get(0));
+        try {
+            currentPlan.connect(from, store);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+        }
+        LogToPhyMap.put(loStore, store);
+    }
+    
+    @Override
+    public void visit(LOConst op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ConstantExpression ce = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        ce.setValue(op.getValue());
+        ce.setResultType(op.getType());
+        //this operator doesn't have any predecessors
+        currentPlan.add(ce);
+        LogToPhyMap.put(op, ce);
+    }
+    
+    @Override
+    public void visit(LOBinCond op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator physOp = new POBinCond(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        LogToPhyMap.put(op, physOp);
+        POBinCond phy = (POBinCond)physOp;
+        ExpressionOperator cond = (ExpressionOperator) LogToPhyMap.get(op.getCond());
+        phy.setCond(cond);
+        ExpressionOperator lhs = (ExpressionOperator) LogToPhyMap.get(op.getLhsOp());
+        phy.setLhs(lhs);
+        ExpressionOperator rhs = (ExpressionOperator) LogToPhyMap.get(op.getRhsOp());
+        phy.setRhs(rhs);
+        
+        currentPlan.add(physOp);
+        
+        List<LogicalOperator> ops = op.getPlan().getPredecessors(op);
+        
+        for(LogicalOperator l : ops) {
+            ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(l);
+            try {
+                currentPlan.connect(from, physOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operator in the plan" + e.getMessage());
+            }
+        }
+        
+    }
+    
+    @Override
+    public void visit(LONegative op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator physOp = new PONegative(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism(), null);
+        currentPlan.add(physOp);
+        
+        LogToPhyMap.put(op, physOp);
+        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getPlan().getPredecessors(op).get(0));
+        ((PONegative)physOp).setInput(from);
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+        }
+        
+    }
+    
+    @Override
+    public void visit(LOIsNull op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator physOp = new POIsNull(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism(), null);
+        currentPlan.add(physOp);
+        
+        LogToPhyMap.put(op, physOp);
+        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getPlan().getPredecessors(op).get(0));
+        ((POIsNull)physOp).setInput(from);
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+        }
+        
+    }
+    
+    @Override
+    public void visit(LOMapLookup op) throws VisitorException {
+        String scope = ((OperatorKey)op.getOperatorKey()).scope;
+        ExpressionOperator physOp = new POMapLookUp(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism(), op.getLookUpKey());
+        physOp.setResultType(op.getType());
+        currentPlan.add(physOp);
+        
+        LogToPhyMap.put(op, physOp);
+        
+        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getMap());
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+        }
+        
+    }
+    
+    @Override
+    public void visit(LOCast op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        ExpressionOperator physOp = new POCast(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        currentPlan.add(physOp);
+        
+        LogToPhyMap.put(op, physOp);
+        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getExpression());
+        physOp.setResultType(op.getType());
+        
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+        }
+        
+    }
+    
+    @Override
+    public void visit(LOUnion op) throws VisitorException {
+        String scope = op.getOperatorKey().scope;
+        POUnion physOp = new POUnion(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        currentPlan.add(physOp);
+        physOp.setResultType(op.getType());
+        LogToPhyMap.put(op, physOp);
+        List<LogicalOperator> ops = op.getInputs();
+        
+        for(LogicalOperator l : ops) {
+            PhysicalOperator from = LogToPhyMap.get(l);
+            try {
+                currentPlan.connect(from, physOp);
+            } catch (PlanException e) {
+                log.error("Invalid physical operator in the plan" + e.getMessage());
+            }
+        }
+    }
+    
+    
 }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryComparisonOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryComparisonOperator.java?rev=665739&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryComparisonOperator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryComparisonOperator.java Mon Jun  9 09:18:42 2008
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.expressionOperators;
+
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * This is a base class for all binary comparison operators. Supports the
+ * use of operand type instead of result type as the result type is
+ * always boolean.
+ * 
+ * All comparison operators fetch the lhs and rhs operands and compare
+ * them for each type using different comparison methods based on what
+ * comparison is being implemented.
+ *
+ */
+public abstract class BinaryComparisonOperator extends BinaryExpressionOperator
+        implements ComparisonOperator {
+    //The result type for comparison operators is always
+    //Boolean. So the plans evaluating these should consider
+    //the type of the operands instead of the result.
+    //The result will be comunicated using the Status object.
+    //This is a slight abuse of the status object.
+    protected byte operandType;
+    
+    public BinaryComparisonOperator(OperatorKey k) {
+        this(k,-1);
+    }
+
+    public BinaryComparisonOperator(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public byte getOperandType() {
+        return operandType;
+    }
+
+    public void setOperandType(byte operandType) {
+        this.operandType = operandType;
+    }
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java?rev=665739&r1=665738&r2=665739&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java Mon Jun  9 09:18:42 2008
@@ -60,4 +60,9 @@
     public void setRhs(ExpressionOperator rhs) {
         this.rhs = rhs;
     }
+
+    // TODO Don't we need something here that hooks lhs and rhs to our inputs in
+    // the plan?  Extenders of this class, such as Add, are depending on lhs and
+    // rhs being set.  LogToPhyTranslator is setting inputs.  I don't see
+    // anywhere connecting them together.
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ComparisonOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ComparisonOperator.java?rev=665739&r1=665738&r2=665739&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ComparisonOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ComparisonOperator.java Mon Jun  9 09:18:42 2008
@@ -17,39 +17,56 @@
  */
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.Result;
 import org.apache.pig.impl.plan.OperatorKey;
 
 /**
- * This is a base class for all comparison operators. Supports the
+ * This is an interface for all comparison operators. Supports the
  * use of operand type instead of result type as the result type is
  * always boolean.
  * 
- * All comparison operators fetch the lhs and rhs operands and compare
- * them for each type using different comparison methods based on what
- * comparison is being implemented.
- *
  */
-public abstract class ComparisonOperator extends BinaryExpressionOperator {
-    //The result type for comparison operators is always
-    //Boolean. So the plans evaluating these should consider
-    //the type of the operands instead of the result.
-    //The result will be comunicated using the Status object.
-    //This is a slight abuse of the status object.
-    protected byte operandType;
+public interface ComparisonOperator {
     
-    public ComparisonOperator(OperatorKey k) {
-        this(k,-1);
-    }
-
-    public ComparisonOperator(OperatorKey k, int rp) {
-        super(k, rp);
-    }
-
-    public byte getOperandType() {
-        return operandType;
-    }
-
-    public void setOperandType(byte operandType) {
-        this.operandType = operandType;
-    }
+    /**
+     * Determine the type of the operand(s) of this comparator.
+     * @return type, as a byte (using DataType types).
+     */
+    byte getOperandType();
+
+    /**
+     * Set the type of the operand(s) of this comparator.
+     * @param operandType Type of the operand(s), as a byte (using DataType
+     * types).
+     */
+    void setOperandType(byte operandType);
+
+    // Stupid java doesn't allow multiple inheritence, so I have to duplicate
+    // all the getNext functions here so that comparitors can have them.
+    public Result getNext(Integer i) throws ExecException;
+
+    public Result getNext(Long l) throws ExecException;
+
+    public Result getNext(Double d) throws ExecException;
+
+    public Result getNext(Float f) throws ExecException;
+
+    public Result getNext(String s) throws ExecException;
+
+    public Result getNext(DataByteArray ba) throws ExecException;
+
+    public Result getNext(Map m) throws ExecException;
+
+    public Result getNext(Boolean b) throws ExecException;
+
+    public Result getNext(Tuple t) throws ExecException;
+
+    public Result getNext(DataBag db) throws ExecException;
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java?rev=665739&r1=665738&r2=665739&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java Mon Jun  9 09:18:42 2008
@@ -32,7 +32,7 @@
 import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
-public class EqualToExpr extends ComparisonOperator {
+public class EqualToExpr extends BinaryComparisonOperator {
 
     /**
      * 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=665739&r1=665738&r2=665739&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java Mon Jun  9 09:18:42 2008
@@ -28,7 +28,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 
 
-public class GTOrEqualToExpr extends ComparisonOperator {
+public class GTOrEqualToExpr extends BinaryComparisonOperator {
 
     /**
      * 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=665739&r1=665738&r2=665739&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java Mon Jun  9 09:18:42 2008
@@ -27,7 +27,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.backend.executionengine.ExecException;
 
-public class GreaterThanExpr extends ComparisonOperator {
+public class GreaterThanExpr extends BinaryComparisonOperator {
 
     /**
      * 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=665739&r1=665738&r2=665739&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java Mon Jun  9 09:18:42 2008
@@ -27,7 +27,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.backend.executionengine.ExecException;
 
-public class LTOrEqualToExpr extends ComparisonOperator {
+public class LTOrEqualToExpr extends BinaryComparisonOperator {
     /**
      * 
      */

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java?rev=665739&r1=665738&r2=665739&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java Mon Jun  9 09:18:42 2008
@@ -27,7 +27,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.backend.executionengine.ExecException;
 
-public class LessThanExpr extends ComparisonOperator {
+public class LessThanExpr extends BinaryComparisonOperator {
 
     /**
      * 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=665739&r1=665738&r2=665739&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java Mon Jun  9 09:18:42 2008
@@ -32,7 +32,7 @@
 import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
-public class NotEqualToExpr extends ComparisonOperator {
+public class NotEqualToExpr extends BinaryComparisonOperator {
 
     /**
      * 

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java?rev=665739&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java Mon Jun  9 09:18:42 2008
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.expressionOperators;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Boolean and operator.
+ */
+
+public class POAnd extends BinaryComparisonOperator {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public POAnd(OperatorKey k) {
+        super(k);
+    }
+
+    public POAnd(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    @Override
+    public void visit(ExprPlanVisitor v) throws VisitorException {
+        v.visitAnd(this);
+    }
+
+    @Override
+    public String name() {
+        return "And - " + mKey.toString();
+    }
+
+    @Override
+    public Result getNext(Boolean b) throws ExecException {
+        byte status;
+        Result res;
+        Boolean left = null, right = null;
+        res = lhs.getNext(left);
+        status = res.returnStatus;
+        if(status != POStatus.STATUS_OK) {
+            return res;
+        }
+        left = (Boolean)res.result;
+        // Short circuit.
+        if (!left) return res;
+        
+        res = rhs.getNext(right);
+        status = res.returnStatus;
+        if(status != POStatus.STATUS_OK) {
+            return res;
+        }
+        right = (Boolean)res.result;
+        
+        res.result = new Boolean(left && right);
+        return res;
+    }
+}



Mime
View raw message