pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: dvrya...@apache.org
Subject: svn commit: r1052127 [2/3] - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine/physi...
Date: Thu, 23 Dec 2010 01:33:45 GMT
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java Thu Dec 23 01:33:44 2010
@@ -27,7 +27,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -36,7 +35,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class NotEqualToExpr extends BinaryComparisonOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
     transient private final Log log = LogFactory.getLog(getClass());
@@ -62,104 +61,47 @@ public class NotEqualToExpr extends Bina
 
     @Override
     public Result getNext(Boolean bool) throws ExecException {
-        byte status;
         Result left, right;
 
         switch (operandType) {
-        case DataType.BYTEARRAY: {
-            Result r = accumChild(null, dummyDBA);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyDBA);
-            right = rhs.getNext(dummyDBA);
-            return doComparison(left, right);
-                            }
-
-        case DataType.DOUBLE: {
-            Result r = accumChild(null, dummyDouble);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyDouble);
-            right = rhs.getNext(dummyDouble);
-            return doComparison(left, right);
-                            }
-
-        case DataType.FLOAT: {
-            Result r = accumChild(null, dummyFloat);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyFloat);
-            right = rhs.getNext(dummyFloat);
-            return doComparison(left, right);
-                            }
-
-        case DataType.INTEGER: {
-            Result r = accumChild(null, dummyInt);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyInt);
-            right = rhs.getNext(dummyInt);
-            return doComparison(left, right);
-                            }
-
-        case DataType.LONG: {
-            Result r = accumChild(null, dummyLong);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyLong);
-            right = rhs.getNext(dummyLong);
-            return doComparison(left, right);
-                            }
-
-        case DataType.CHARARRAY: {
-            Result r = accumChild(null, dummyString);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyString);
-            right = rhs.getNext(dummyString);
-            return doComparison(left, right);
-                            }
-        case DataType.TUPLE: {
-            Result r = accumChild(null, dummyTuple);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyTuple);
-            right = rhs.getNext(dummyTuple);
-            return doComparison(left, right);
-                            }
-        
+        case DataType.BYTEARRAY:
+        case DataType.DOUBLE:
+        case DataType.FLOAT:
+        case DataType.INTEGER:
+        case DataType.LONG:
+        case DataType.CHARARRAY:
+        case DataType.TUPLE:
         case DataType.MAP: {
-            Result r = accumChild(null, dummyMap);
+            Object dummy = getDummy(operandType);
+            Result r = accumChild(null, dummy, operandType);
             if (r != null) {
                 return r;
             }
-            left = lhs.getNext(dummyMap);
-            right = rhs.getNext(dummyMap);
+            left = lhs.getNext(dummy, operandType);
+            right = rhs.getNext(dummy, operandType);
             return doComparison(left, right);
-                            }
-
+        }
         default: {
             int errCode = 2067;
             String msg = this.getClass().getSimpleName() + " does not know how to " +
             "handle type: " + DataType.findTypeName(operandType);
             throw new ExecException(msg, errCode, PigException.BUG);
         }
-        
+
         }
     }
 
     @SuppressWarnings("unchecked")
     private Result doComparison(Result left, Result right) throws ExecException {
-        if (trueRef == null) initializeRefs();
-        if (left.returnStatus != POStatus.STATUS_OK) return left;
-        if (right.returnStatus != POStatus.STATUS_OK) return right;
+        if (trueRef == null) {
+            initializeRefs();
+        }
+        if (left.returnStatus != POStatus.STATUS_OK) {
+            return left;
+        }
+        if (right.returnStatus != POStatus.STATUS_OK) {
+            return right;
+        }
         // if either operand is null, the result should be
         // null
         if(left.result == null || right.result == null) {
@@ -167,9 +109,9 @@ public class NotEqualToExpr extends Bina
             left.returnStatus = POStatus.STATUS_NULL;
             return left;
         }
-        
+
         if (left.result instanceof Comparable && right.result instanceof Comparable){
-            if (((Comparable)left.result).compareTo((Comparable)right.result) != 0) {
+            if (((Comparable)left.result).compareTo(right.result) != 0) {
                 left.result = trueRef;
             } else {
                 left.result = falseRef;
@@ -177,10 +119,11 @@ public class NotEqualToExpr extends Bina
         }else if (left.result instanceof HashMap && right.result instanceof HashMap){
             HashMap leftMap=(HashMap)left.result;
             HashMap rightMap=(HashMap)right.result;
-            if (leftMap.equals(rightMap))
+            if (leftMap.equals(rightMap)) {
                 left.result = falseRef;
-            else
+            } else {
                 left.result = trueRef;
+            }
         }else{
             throw new ExecException("The left side and right side has the different types");
         }
@@ -190,7 +133,7 @@ public class NotEqualToExpr extends Bina
 
     @Override
     public NotEqualToExpr clone() throws CloneNotSupportedException {
-        NotEqualToExpr clone = new NotEqualToExpr(new OperatorKey(mKey.scope, 
+        NotEqualToExpr clone = new NotEqualToExpr(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java Thu Dec 23 01:33:44 2010
@@ -29,254 +29,115 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
 
 public class POBinCond extends ExpressionOperator {
-    
+
     private static final long serialVersionUID = 1L;
     ExpressionOperator cond;
     ExpressionOperator lhs;
     ExpressionOperator rhs;
     private transient List<ExpressionOperator> child;
-    
+
     public POBinCond(OperatorKey k) {
         super(k);
     }
-    
+
     public POBinCond(OperatorKey k, int rp) {
         super(k, rp);
     }
-    
+
     public POBinCond(OperatorKey k, int rp, ExpressionOperator cond, ExpressionOperator lhs, ExpressionOperator rhs) {
         super(k, rp);
         this.cond = cond;
         this.lhs = lhs;
         this.rhs = rhs;
     }
-    
+
+    public Result genericGetNext(Object obj, byte dataType) throws ExecException {
+        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+        list.add(cond);
+        Result r = accumChild(list, dummyBool);
+
+        if (r != null) {
+            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+                return r;
+            }
+            list.clear();
+            list.add(lhs);
+            list.add(rhs);
+            r = accumChild(list, obj, dataType);
+            return r;
+        }
+        Result res = cond.getNext(dummyBool);
+        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) {
+            return res;
+        }
+        Result result = ((Boolean)res.result) == true ? lhs.getNext(obj, dataType) : rhs.getNext(obj, dataType);
+        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+        return result;
+    }
+
     @Override
     public Result getNext(Boolean b) throws ExecException {
         Result r = accumChild(null, b);
         if (r != null) {
             return r;
         }
-        
+
         Result res = cond.getNext(b);
-        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        Result result = ((Boolean)res.result) == true ? lhs.getNext(b) : rhs.getNext(b);
-        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
-        return result;
-        
+        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) {
+            return res;
+        }
+        return ((Boolean)res.result) == true ? lhs.getNext(b) : rhs.getNext(b);
+
     }
 
     @Override
     public Result getNext(DataBag db) throws ExecException {
-        List<ExpressionOperator> l = new ArrayList<ExpressionOperator>();
-        l.add(cond);
-        Result r = accumChild(l, dummyBool);
-        
-        if (r != null) {    	
-            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
-                return r;
-            }
-            l.clear();
-            l.add(lhs);
-            l.add(rhs);
-            r = accumChild(l, db);
-            return r;    		
-        }
-                        
-        Result res = cond.getNext(dummyBool);
-        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        Result result = ((Boolean)res.result) == true ? lhs.getNext(db) : rhs.getNext(db);
-        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
-        return result;
+        return genericGetNext(db, DataType.BAG);
     }
 
     @Override
     public Result getNext(DataByteArray ba) throws ExecException {
-        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
-        list.add(cond);
-        Result r = accumChild(list, dummyBool);
-        
-        if (r != null) {    	
-            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
-                return r;
-            }
-            list.clear();
-            list.add(lhs);
-            list.add(rhs);
-            r = accumChild(list, ba);
-            return r;    		
-        }
-        Result res = cond.getNext(dummyBool);
-        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        Result result = ((Boolean)res.result) == true ? lhs.getNext(ba) : rhs.getNext(ba);
-        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
-        return result;
+        return genericGetNext(ba, DataType.BYTEARRAY);
     }
 
     @Override
     public Result getNext(Double d) throws ExecException {
-        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
-        list.add(cond);
-        Result r = accumChild(list, dummyBool);
-        
-        if (r != null) {    	
-            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
-                return r;
-            }
-            list.clear();
-            list.add(lhs);
-            list.add(rhs);
-            r = accumChild(list, d);
-            return r;    		
-        }
-        Result res = cond.getNext(dummyBool);
-        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        Result result = ((Boolean)res.result) == true ? lhs.getNext(d) : rhs.getNext(d);
-        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
-        return result;
+        return genericGetNext(d, DataType.DOUBLE);
     }
 
     @Override
     public Result getNext(Float f) throws ExecException {
-        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
-        list.add(cond);
-        Result r = accumChild(list, dummyBool);
-        
-        if (r != null) {    	
-            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
-                return r;
-            }
-            list.clear();
-            list.add(lhs);
-            list.add(rhs);
-            r = accumChild(list, f);
-            return r;    		
-        }
-        Result res = cond.getNext(dummyBool);
-        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        Result result = ((Boolean)res.result) == true ? lhs.getNext(f) : rhs.getNext(f);
-        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
-        return result;
+        return genericGetNext(f, DataType.FLOAT);
     }
 
     @Override
     public Result getNext(Integer i) throws ExecException {
-        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
-        list.add(cond);
-        Result r = accumChild(list, dummyBool);    	
-        if (r != null) {    	
-            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
-                return r;
-            }
-            list.clear();
-            list.add(lhs);
-            list.add(rhs);
-            r = accumChild(list, i);
-            return r;    		
-        }
-        Result res = cond.getNext(dummyBool);
-        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        Result result = ((Boolean)res.result) == true ? lhs.getNext(i) : rhs.getNext(i);
-        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
-        return result;
+        return genericGetNext(i, DataType.INTEGER);
     }
 
     @Override
     public Result getNext(Long l) throws ExecException {
-        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
-        list.add(cond);
-        Result r = accumChild(list, dummyBool);
-        
-        if (r != null) {    	
-            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
-                return r;
-            }
-            list.clear();
-            list.add(lhs);
-            list.add(rhs);
-            r = accumChild(list, l);
-            return r;    		
-        }
-        Result res = cond.getNext(dummyBool);
-        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        Result result = ((Boolean)res.result) == true ? lhs.getNext(l) : rhs.getNext(l);
-        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
-        return result;
+        return genericGetNext(l, DataType.LONG);
     }
 
     @Override
     public Result getNext(Map m) throws ExecException {
-        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
-        list.add(cond);
-        Result r = accumChild(list, dummyBool);
-        
-        if (r != null) {    	
-            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
-                return r;
-            }
-            list.clear();
-            list.add(lhs);
-            list.add(rhs);
-            r = accumChild(list, m);
-            return r;    		
-        }
-        Result res = cond.getNext(dummyBool);
-        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        Result result = ((Boolean)res.result) == true ? lhs.getNext(m) : rhs.getNext(m);
-        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
-        return result;
+        return genericGetNext(m, DataType.MAP);
     }
 
     @Override
     public Result getNext(String s) throws ExecException {
-        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
-        list.add(cond);
-        Result r = accumChild(list, dummyBool);
-        
-        if (r != null) {    	
-            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
-                return r;
-            }
-            list.clear();
-            list.add(lhs);
-            list.add(rhs);
-            r = accumChild(list, s);
-            return r;    		
-        }
-        Result res = cond.getNext(dummyBool);
-        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        Result result = ((Boolean)res.result) == true ? lhs.getNext(s) : rhs.getNext(s);
-        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
-        return result;
+        return genericGetNext(s, DataType.CHARARRAY);
     }
 
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
-        list.add(cond);
-        Result r = accumChild(list, dummyBool);
-        
-        if (r != null) {    	
-            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
-                return r;
-            }
-            list.clear();
-            list.add(lhs);
-            list.add(rhs);
-            r = accumChild(list, t);
-            return r;    		
-        }
-        Result res = cond.getNext(dummyBool);
-        if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        Result result = ((Boolean)res.result) == true ? lhs.getNext(t) : rhs.getNext(t);
-        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
-        return result;
+        return genericGetNext(t, DataType.TUPLE);
     }
 
     @Override
@@ -288,22 +149,22 @@ public class POBinCond extends Expressio
     public String name() {
         return "POBinCond" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
-    
+
     @Override
     public void attachInput(Tuple t) {
         cond.attachInput(t);
         lhs.attachInput(t);
         rhs.attachInput(t);
     }
-    
+
     public void setCond(ExpressionOperator condOp) {
         this.cond = condOp;
     }
-    
+
     public void setRhs(ExpressionOperator rhs) {
         this.rhs = rhs;
     }
-    
+
     public void setLhs(ExpressionOperator lhs) {
         this.lhs = lhs;
     }
@@ -314,21 +175,21 @@ public class POBinCond extends Expressio
     public ExpressionOperator getCond() {
         return this.cond;
     }
-    
+
     /**
      * Get right expression
      */
     public ExpressionOperator getRhs() {
         return this.rhs;
     }
-    
+
     /**
      * Get left expression
      */
     public ExpressionOperator getLhs() {
         return this.lhs;
     }
-    
+
     @Override
     public boolean supportsMultipleInputs() {
         return true;
@@ -336,8 +197,8 @@ public class POBinCond extends Expressio
 
     @Override
     public POBinCond clone() throws CloneNotSupportedException {
-        POBinCond clone = new POBinCond(new OperatorKey(mKey.scope, 
-            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
+        POBinCond clone = new POBinCond(new OperatorKey(mKey.scope,
+                NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         clone.cond = cond.clone();
         clone.lhs = lhs.clone();
@@ -362,7 +223,7 @@ public class POBinCond extends Expressio
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         if(illustrator != null) {
-            
+
         }
         return null;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java Thu Dec 23 01:33:44 2010
@@ -17,18 +17,12 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
-import java.util.Map;
-
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
@@ -39,14 +33,14 @@ public class POIsNull extends UnaryCompa
 
     public POIsNull(OperatorKey k, int rp) {
         super(k, rp);
-        
+
     }
 
     public POIsNull(OperatorKey k) {
         super(k);
-        
+
     }
-    
+
     public POIsNull(OperatorKey k, int rp, ExpressionOperator in) {
         super(k, rp);
         this.expr = in;
@@ -65,110 +59,20 @@ public class POIsNull extends UnaryCompa
 
     @Override
     public Result getNext(Boolean b) throws ExecException {
-        
+
         Result res = null;
         switch(operandType) {
         case DataType.BYTEARRAY:
-            res = expr.getNext(dummyDBA);
-            if(res.returnStatus == POStatus.STATUS_OK) {
-                if (res.result == null) {
-                    res.result = true;
-                } else {
-                    res.result = false;
-                }
-                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
-            }
-            return res;
         case DataType.DOUBLE:
-            res = expr.getNext(dummyDouble);
-            if(res.returnStatus == POStatus.STATUS_OK) {
-                if (res.result == null) {
-                    res.result = true;
-                } else {
-                    res.result = false;
-                }
-                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
-            }
-            return res;
         case DataType.INTEGER:
-            res = expr.getNext(dummyInt);
-            if(res.returnStatus == POStatus.STATUS_OK) {
-                if (res.result == null) {
-                    res.result = true;
-                } else {
-                    res.result = false;
-                }
-                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
-            }
-            return res;
         case DataType.CHARARRAY:
-            res = expr.getNext(dummyString);
-            if(res.returnStatus == POStatus.STATUS_OK) {
-                if (res.result == null) {
-                    res.result = true;
-                } else {
-                    res.result = false;
-                }
-                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
-            }
-            return res;
         case DataType.BOOLEAN:
-            res = expr.getNext(dummyBool);
-            if(res.returnStatus == POStatus.STATUS_OK) {
-                if (res.result == null) {
-                    res.result = true;
-                } else {
-                    res.result = false;
-                }
-                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
-            }
-            return res;
         case DataType.LONG:
-            res = expr.getNext(dummyLong);
-            if(res.returnStatus == POStatus.STATUS_OK) {
-                if (res.result == null) {
-                    res.result = true;
-                } else {
-                    res.result = false;
-                }
-                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
-            }
-            return res;
         case DataType.FLOAT:
-            res = expr.getNext(dummyFloat);
-            if(res.returnStatus == POStatus.STATUS_OK) {
-                if (res.result == null) {
-                    res.result = true;
-                } else {
-                    res.result = false;
-                }
-                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
-            }
-            return res;
         case DataType.MAP:
-            res = expr.getNext(dummyMap);
-            if(res.returnStatus == POStatus.STATUS_OK) {
-                if (res.result == null) {
-                    res.result = true;
-                } else {
-                    res.result = false;
-                }
-                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
-            }
-            return res;
         case DataType.TUPLE:
-            res = expr.getNext(dummyTuple);
-            if(res.returnStatus == POStatus.STATUS_OK) {
-                if (res.result == null) {
-                    res.result = true;
-                } else {
-                    res.result = false;
-                }
-                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
-            }
-            return res;
         case DataType.BAG:
-            res = expr.getNext(dummyBag);
+            res = expr.getNext(getDummy(operandType), operandType);
             if(res.returnStatus == POStatus.STATUS_OK) {
                 if (res.result == null) {
                     res.result = true;
@@ -177,20 +81,20 @@ public class POIsNull extends UnaryCompa
                 }
                 illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
             }
-            return res;        
+            return res;
         default: {
             int errCode = 2067;
             String msg = this.getClass().getSimpleName() + " does not know how to " +
             "handle type: " + DataType.findTypeName(operandType);
             throw new ExecException(msg, errCode, PigException.BUG);
         }
-        
+
         }
     }
 
     @Override
     public POIsNull clone() throws CloneNotSupportedException {
-        POIsNull clone = new POIsNull(new OperatorKey(mKey.scope, 
+        POIsNull clone = new POIsNull(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Dec 23 01:33:44 2010
@@ -44,7 +44,6 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -53,12 +52,12 @@ import org.apache.pig.impl.plan.VisitorE
 public class POUserFunc extends ExpressionOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
     transient EvalFunc func;
     transient private String[] cacheFiles = null;
-    
+
     transient private final Log log = LogFactory.getLog(getClass());
     FuncSpec funcSpec;
     FuncSpec origFSpec;
@@ -67,9 +66,9 @@ public class POUserFunc extends Expressi
     public static final byte FINAL = 2;
     private boolean initialized = false;
     private MonitoredUDFExecutor executor = null;
-    
+
     private PhysicalOperator referencedOperator = null;
-    
+
     public PhysicalOperator getReferencedOperator() {
         return referencedOperator;
     }
@@ -91,7 +90,7 @@ public class POUserFunc extends Expressi
             FuncSpec funcSpec) {
         this(k, rp, inp, funcSpec, null);
     }
-    
+
     public POUserFunc(
             OperatorKey k,
             int rp,
@@ -121,7 +120,8 @@ public class POUserFunc extends Expressi
         this.func.setReporter(reporter);
         this.func.setPigLogger(pigLogger);
     }
-    
+
+    @Override
     public Result processInput() throws ExecException {
 
         // Make sure the reporter is set, because it isn't getting carried
@@ -143,9 +143,11 @@ public class POUserFunc extends Expressi
         }
 
         //Should be removed once the model is clear
-        if(reporter!=null) reporter.progress();
+        if(reporter!=null) {
+            reporter.progress();
+        }
+
 
-        
         if(isInputAttached()) {
             res.result = input;
             res.returnStatus = POStatus.STATUS_OK;
@@ -153,52 +155,23 @@ public class POUserFunc extends Expressi
             return res;
         } else {
             res.result = TupleFactory.getInstance().newTuple();
-            
+
             Result temp = null;
             for(PhysicalOperator op : inputs) {
-                switch(op.getResultType()){
-                case DataType.BAG:
-                    temp = op.getNext(dummyBag);
-                    break;
-                case DataType.BOOLEAN:
-                    temp = op.getNext(dummyBool);
-                    break;
-                case DataType.BYTEARRAY:
-                    temp = op.getNext(dummyDBA);
-                    break;
-                case DataType.CHARARRAY:
-                    temp = op.getNext(dummyString);
-                    break;
-                case DataType.DOUBLE:
-                    temp = op.getNext(dummyDouble);
-                    break;
-                case DataType.FLOAT:
-                    temp = op.getNext(dummyFloat);
-                    break;
-                case DataType.INTEGER:
-                    temp = op.getNext(dummyInt);
-                    break;
-                case DataType.LONG:
-                    temp = op.getNext(dummyLong);
-                    break;
-                case DataType.MAP:
-                    temp = op.getNext(dummyMap);
-                    break;
-                case DataType.TUPLE:
-                    temp = op.getNext(dummyTuple);
-                    break;
-                }
-                if(temp.returnStatus!=POStatus.STATUS_OK)
+                temp = op.getNext(getDummy(op.getResultType()), op.getResultType());
+                if(temp.returnStatus!=POStatus.STATUS_OK) {
                     return temp;
-                
+                }
+
                 if(op instanceof POProject &&
                         op.getResultType() == DataType.TUPLE){
                     POProject projOp = (POProject)op;
                     if(projOp.isStar()){
                         Tuple trslt = (Tuple) temp.result;
                         Tuple rslt = (Tuple) res.result;
-                        for(int i=0;i<trslt.size();i++)
+                        for(int i=0;i<trslt.size();i++) {
                             rslt.append(trslt.get(i));
+                        }
                         continue;
                     }
                 }
@@ -210,17 +183,17 @@ public class POUserFunc extends Expressi
     }
 
     private Result getNext() throws ExecException {
-        Result result = processInput();        
+        Result result = processInput();
         String errMsg = "";
-        try {						
+        try {
             if(result.returnStatus == POStatus.STATUS_OK) {
                 if (isAccumulative()) {
-                    if (isAccumStarted()) {							
+                    if (isAccumStarted()) {
                         ((Accumulator)func).accumulate((Tuple)result.result);
                         result.returnStatus = POStatus.STATUS_BATCH_OK;
                         result.result = null;
-                    }else{												
-                        result.result = ((Accumulator)func).getValue();	
+                    }else{
+                        result.result = ((Accumulator)func).getValue();
                         ((Accumulator)func).cleanup();
                     }
                 } else {
@@ -239,15 +212,15 @@ public class POUserFunc extends Expressi
                 }
                 return result;
             }
-                        
+
             return result;
         } catch (ExecException ee) {
             throw ee;
         } catch (IOException ioe) {
             int errCode = 2078;
-            String msg = "Caught error from UDF: " + funcSpec.getClassName(); 
+            String msg = "Caught error from UDF: " + funcSpec.getClassName();
             String footer = " [" + ioe.getMessage() + "]";
-            
+
             if(ioe instanceof PigException) {
                 int udfErrorCode = ((PigException)ioe).getErrorCode();
                 if(udfErrorCode != 0) {
@@ -259,11 +232,11 @@ public class POUserFunc extends Expressi
             } else {
                 msg += footer;
             }
-            
+
             throw new ExecException(msg, errCode, PigException.BUG, ioe);
         } catch (IndexOutOfBoundsException ie) {
             int errCode = 2078;
-            String msg = "Caught error from UDF: " + funcSpec.getClassName() + 
+            String msg = "Caught error from UDF: " + funcSpec.getClassName() +
             ", Out of bounds access [" + ie.getMessage() + "]";
             throw new ExecException(msg, errCode, PigException.BUG, ie);
         }
@@ -448,13 +421,13 @@ public class POUserFunc extends Expressi
     @Override
     public POUserFunc clone() throws CloneNotSupportedException {
         // Inputs will be patched up later by PhysicalPlan.clone()
-        POUserFunc clone = new POUserFunc(new OperatorKey(mKey.scope, 
+        POUserFunc clone = new POUserFunc(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
             requestedParallelism, null, funcSpec.clone());
         clone.setResultType(resultType);
         return clone;
     }
-    
+
     private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
         is.defaultReadObject();
         instantiateFunc(funcSpec);
@@ -464,19 +437,20 @@ public class POUserFunc extends Expressi
      * Get child expression of this expression
      */
     @Override
-    public List<ExpressionOperator> getChildExpressions() {		
+    public List<ExpressionOperator> getChildExpressions() {
         return null;
     }
-    
+
     @SuppressWarnings("unchecked")
     @Override
-    public void setAccumStart() {        
+    public void setAccumStart() {
         if (isAccumulative() && !isAccumStarted()) {
             super.setAccumStart();
             ((Accumulator)func).cleanup();
-        }        
+        }
     }
-    
+
+    @Override
     public void setResultType(byte resultType) {
         this.resultType = resultType;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java Thu Dec 23 01:33:44 2010
@@ -29,7 +29,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class Subtract extends BinaryExpressionOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
 
@@ -51,121 +51,77 @@ public class Subtract extends BinaryExpr
         return "Subtract" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
-    @Override
-    public Result getNext(Double d) throws ExecException {
-        Result r = accumChild(null, d);
+    /*
+     * This method is used to invoke the appropriate subtraction method, as Java does not provide generic
+     * dispatch for it.
+     */
+    @SuppressWarnings("unchecked")
+    protected <T extends Number> T subtract(T a, T b, byte dataType) throws ExecException {
+        switch(dataType) {
+        case DataType.DOUBLE:
+            return (T) Double.valueOf((Double) a - (Double) b);
+        case DataType.INTEGER:
+            return (T) Integer.valueOf((Integer) a - (Integer) b);
+        case DataType.LONG:
+            return (T) Long.valueOf((Long) a - (Long) b);
+        case DataType.FLOAT:
+            return (T) Float.valueOf((Float) a - (Float) b);
+        default:
+            throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T extends Number> Result genericGetNext(T number, byte dataType) throws ExecException {
+        Result r = accumChild(null, number, dataType);
         if (r != null) {
             return r;
         }
-        
+
         byte status;
         Result res;
-        Double left = null, right = null;
-        res = lhs.getNext(left);
+        T left = null, right = null;
+        res = lhs.getNext(left, dataType);
         status = res.returnStatus;
         if(status != POStatus.STATUS_OK || res.result == null) {
             return res;
         }
-        left = (Double) res.result;
-        
-        res = rhs.getNext(right);
+        left = (T) res.result;
+
+        res = rhs.getNext(right, dataType);
         status = res.returnStatus;
         if(status != POStatus.STATUS_OK || res.result == null) {
             return res;
         }
-        right = (Double) res.result;
-        
-        res.result = new Double(left - right);
+        right = (T) res.result;
+
+        res.result = subtract(left, right, dataType);
         return res;
     }
-    
+
+    @Override
+    public Result getNext(Double d) throws ExecException {
+        return genericGetNext(d, DataType.DOUBLE);
+    }
+
     @Override
     public Result getNext(Float f) throws ExecException {
-        Result r = accumChild(null, f);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Float left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Float) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Float) res.result;
-        
-        res.result = new Float(left - right);
-        return res;
+        return genericGetNext(f, DataType.FLOAT);
     }
-    
+
     @Override
     public Result getNext(Integer i) throws ExecException {
-        Result r = accumChild(null, i);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Integer left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Integer) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Integer) res.result;
-        
-        res.result = Integer.valueOf(left - right);
-        return res;
+        return genericGetNext(i, DataType.INTEGER);
     }
-    
+
     @Override
     public Result getNext(Long l) throws ExecException {
-        Result r = accumChild(null, l);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Long left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Long) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Long) res.result;
-        
-        res.result = Long.valueOf(left - right);
-        return res;
+        return genericGetNext(l, DataType.LONG);
     }
 
     @Override
     public Subtract clone() throws CloneNotSupportedException {
-        Subtract clone = new Subtract(new OperatorKey(mKey.scope, 
+        Subtract clone = new Subtract(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Thu Dec 23 01:33:44 2010
@@ -44,10 +44,10 @@ import org.apache.pig.impl.util.Identity
 
 /**
  * The collected group operator is a special operator used when users give
- * the hint 'using "collected"' in a group by clause. It implements a map-side  
- * group that collects all records for a given key into a buffer. When it sees 
- * a key change it will emit the key and bag for records it had buffered. 
- * It will assume that all keys for a given record are collected together 
+ * the hint 'using "collected"' in a group by clause. It implements a map-side
+ * group that collects all records for a given key into a buffer. When it sees
+ * a key change it will emit the key and bag for records it had buffered.
+ * It will assume that all keys for a given record are collected together
  * and thus there is not need to buffer across keys.
  *
  */
@@ -62,19 +62,19 @@ public class POCollectedGroup extends Ph
 //    private Log log = LogFactory.getLog(getClass());
 
     protected List<PhysicalPlan> plans;
-    
+
     protected List<ExpressionOperator> leafOps;
 
     protected byte keyType;
 
-    private Tuple output;
+    private final Tuple output;
 
     private DataBag outputBag = null;
-    
+
     private Object prevKey = null;
-    
+
     private boolean useDefaultBag = false;
-    
+
     public POCollectedGroup(OperatorKey k) {
         this(k, -1, null);
     }
@@ -124,13 +124,13 @@ public class POCollectedGroup extends Ph
     public void attachInput(Tuple t) {
         super.attachInput(t);
     }
-    
+
     @SuppressWarnings("unchecked")
     @Override
     public Result getNext(Tuple t) throws ExecException {
-    
+
         // Since the output is buffered, we need to flush the last
-        // set of records when the close method is called by mapper. 
+        // set of records when the close method is called by mapper.
         if (this.parentPlan.endOfAllInput) {
             if (outputBag != null) {
                 Tuple tup = mTupleFactory.newTuple(2);
@@ -138,8 +138,8 @@ public class POCollectedGroup extends Ph
                 tup.set(1, outputBag);
                 outputBag = null;
                 return new Result(POStatus.STATUS_OK, tup);
-            } 
-                
+            }
+
             return new Result(POStatus.STATUS_EOP, null);
         }
 
@@ -148,66 +148,34 @@ public class POCollectedGroup extends Ph
 
         while (true) {
             inp = processInput();
-            if (inp.returnStatus == POStatus.STATUS_EOP || 
+            if (inp.returnStatus == POStatus.STATUS_EOP ||
                     inp.returnStatus == POStatus.STATUS_ERR) {
                 break;
             }
-            
+
             if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
             }
-            
+
             for (PhysicalPlan ep : plans) {
                 ep.attachInput((Tuple)inp.result);
             }
-            
+
             List<Result> resLst = new ArrayList<Result>();
             for (ExpressionOperator op : leafOps) {
-                
-                switch (op.getResultType()){
-                case DataType.BAG:
-                    res = op.getNext(dummyBag);
-                    break;
-                case DataType.BOOLEAN:
-                    res = op.getNext(dummyBool);
-                    break;
-                case DataType.BYTEARRAY:
-                    res = op.getNext(dummyDBA);
-                    break;
-                case DataType.CHARARRAY:
-                    res = op.getNext(dummyString);
-                    break;
-                case DataType.DOUBLE:
-                    res = op.getNext(dummyDouble);
-                    break;
-                case DataType.FLOAT:
-                    res = op.getNext(dummyFloat);
-                    break;
-                case DataType.INTEGER:
-                    res = op.getNext(dummyInt);
-                    break;
-                case DataType.LONG:
-                    res = op.getNext(dummyLong);
-                    break;
-                case DataType.MAP:
-                    res = op.getNext(dummyMap);
-                    break;
-                case DataType.TUPLE:
-                    res = op.getNext(dummyTuple);
-                    break;
-                }
+                res = op.getNext(getDummy(op.getResultType()), op.getResultType());
                 if (res.returnStatus != POStatus.STATUS_OK) {
                     return new Result();
                 }
                 resLst.add(res);
             }
-            
+
             Tuple tup = constructOutput(resLst,(Tuple)inp.result);
             Object curKey = tup.get(0);
 
             // the first time, just create a new buffer and continue.
             if (prevKey == null && outputBag == null) {
-                
+
                 if (PigMapReduce.sJobConf != null) {
                     String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
                     if (bagType != null && bagType.equalsIgnoreCase("default")) {
@@ -215,40 +183,40 @@ public class POCollectedGroup extends Ph
                     }
                 }
                 prevKey = curKey;
-                outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag() 
-                // In a very rare case if there is a POStream after this 
+                outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+                // In a very rare case if there is a POStream after this
                 // POCollectedGroup in the pipeline and is also blocking the pipeline;
                 // constructor argument should be 2. But for one obscure
                 // case we don't want to pay the penalty all the time.
-                        
+
                 // Additionally, if there is a merge join(on a different key) following POCollectedGroup
                 // default bags should be used. But since we don't allow anything
-                // before Merge Join currently we are good.        
+                // before Merge Join currently we are good.
                         : new InternalCachedBag(1);
                 outputBag.add((Tuple)tup.get(1));
                 continue;
             }
-            
+
             // no key change
             if (prevKey == null && curKey == null) {
                 outputBag.add((Tuple)tup.get(1));
                 continue;
             }
-            
+
             // no key change
             if (prevKey != null && curKey != null && ((Comparable)curKey).compareTo(prevKey) == 0) {
                 outputBag.add((Tuple)tup.get(1));
                 continue;
-            } 
-            
+            }
+
             // key change
             Tuple tup2 = mTupleFactory.newTuple(2);
             tup2.set(0, prevKey);
             tup2.set(1, outputBag);
             res.result = tup2;
-               
+
             prevKey = curKey;
-            outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag() 
+            outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
                     : new InternalCachedBag(1);
             outputBag.add((Tuple)tup.get(1));
             illustratorMarkup(null, tup2, 0);
@@ -257,28 +225,28 @@ public class POCollectedGroup extends Ph
 
         return inp;
     }
-    
+
     protected Tuple constructOutput(List<Result> resLst, Tuple value) throws ExecException{
-        
+
         // Construct key
         Object key;
-        
+
         if (resLst.size() > 1) {
             Tuple t = mTupleFactory.newTuple(resLst.size());
             int i = -1;
             for (Result res : resLst) {
                 t.set(++i, res.result);
             }
-            key = t;           
-        } 
+            key = t;
+        }
         else {
             key = resLst.get(0).result;
         }
-        
+
         // Put key and value in a tuple and return
         output.set(0, key);
         output.set(1, value);
-                
+
         return output;
     }
 
@@ -298,9 +266,9 @@ public class POCollectedGroup extends Ph
         this.plans = plans;
         leafOps.clear();
         for (PhysicalPlan plan : plans) {
-            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); 
+            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
             leafOps.add(leaf);
-        }            
+        }
    }
     
     private void setIllustratorEquivalenceClasses(Tuple tin) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Thu Dec 23 01:33:44 2010
@@ -18,20 +18,15 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -47,7 +42,6 @@ import org.apache.pig.impl.plan.Dependen
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.pen.util.ExampleTuple;
 import org.apache.pig.pen.util.LineageTracer;
 
@@ -56,10 +50,10 @@ import org.apache.pig.pen.util.LineageTr
 public class POForEach extends PhysicalOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
-    
+
     protected List<PhysicalPlan> inputPlans;
     protected List<PhysicalOperator> opsToBeReset;
     transient protected Log log = LogFactory.getLog(getClass());
@@ -68,35 +62,35 @@ public class POForEach extends PhysicalO
     //as the generate can potentially return multiple tuples for
     //same call.
     protected boolean processingPlan = false;
-    
+
     //its holds the iterators of the databags given by the input expressions which need flattening.
     transient protected Iterator<Tuple> [] its = null;
-    
+
     //This holds the outputs given out by the input expressions of any datatype
     protected Object [] bags = null;
-    
+
     //This is the template whcih contains tuples and is flattened out in createTuple() to generate the final output
     protected Object[] data = null;
-    
+
     // store result types of the plan leaves
     protected byte[] resultTypes = null;
-    
+
     // array version of isToBeFlattened - this is purely
     // for optimization - instead of calling isToBeFlattened.get(i)
     // we can do the quicker array access - isToBeFlattenedArray[i].
     // Also we can store "boolean" values rather than "Boolean" objects
     // so we can also save on the Boolean.booleanValue() calls
     protected boolean[] isToBeFlattenedArray;
-    
+
     ExampleTuple tIn = null;
     protected int noItems;
 
     protected PhysicalOperator[] planLeafOps = null;
-    
+
     protected transient AccumulativeTupleBuffer buffer;
-    
+
     protected Tuple inpTuple;
-    
+
     public POForEach(OperatorKey k) {
         this(k,-1,null,null);
     }
@@ -112,7 +106,7 @@ public class POForEach extends PhysicalO
     public POForEach(OperatorKey k, List inp) {
         this(k,-1,inp,null);
     }
-    
+
     public POForEach(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean>  isToBeFlattened){
         super(k, rp);
         setUpFlattens(isToBeFlattened);
@@ -127,15 +121,16 @@ public class POForEach extends PhysicalO
     }
 
     @Override
-    public String name() {      
+    public String name() {
         return getAliasString() + "New For Each" + "(" + getFlatStr() + ")" + "["
                 + DataType.findTypeName(resultType) + "]" + " - "
                 + mKey.toString();
     }
-    
+
     String getFlatStr() {
-        if(isToBeFlattenedArray ==null)
+        if(isToBeFlattenedArray ==null) {
             return "";
+        }
         StringBuilder sb = new StringBuilder();
         for (Boolean b : isToBeFlattenedArray) {
             sb.append(b);
@@ -155,11 +150,12 @@ public class POForEach extends PhysicalO
     @Override
     public boolean supportsMultipleOutputs() {
         return false;
-    }      
-          
+    }
+
+    @Override
     public void setAccumulative() {
         super.setAccumulative();
-        for(PhysicalPlan p : inputPlans) {            
+        for(PhysicalPlan p : inputPlans) {
             Iterator<PhysicalOperator> iter = p.iterator();
             while(iter.hasNext()) {
                 PhysicalOperator po = iter.next();
@@ -170,9 +166,10 @@ public class POForEach extends PhysicalO
         }
     }
 
+    @Override
     public void setAccumStart() {
         super.setAccumStart();
-        for(PhysicalPlan p : inputPlans) {            
+        for(PhysicalPlan p : inputPlans) {
             Iterator<PhysicalOperator> iter = p.iterator();
             while(iter.hasNext()) {
                 PhysicalOperator po = iter.next();
@@ -182,14 +179,15 @@ public class POForEach extends PhysicalO
             }
         }
     }
-    
-    public void setAccumEnd() {    	
+
+    @Override
+    public void setAccumEnd() {
         super.setAccumEnd();
-        for(PhysicalPlan p : inputPlans) {            
+        for(PhysicalPlan p : inputPlans) {
             Iterator<PhysicalOperator> iter = p.iterator();
             while(iter.hasNext()) {
                 PhysicalOperator po = iter.next();
-                if (po instanceof ExpressionOperator || po instanceof PODistinct) {    				
+                if (po instanceof ExpressionOperator || po instanceof PODistinct) {
                     po.setAccumEnd();
                 }
             }
@@ -202,7 +200,7 @@ public class POForEach extends PhysicalO
      * to denote the begin and end of the nested plan processing.
      */
     @Override
-    public Result getNext(Tuple t) throws ExecException {    	
+    public Result getNext(Tuple t) throws ExecException {
         Result res = null;
         Result inp = null;
         //The nested plan is under processing
@@ -210,15 +208,16 @@ public class POForEach extends PhysicalO
         //returns
         if(processingPlan){
             while(true) {
-                res = processPlan();               
-                
+                res = processPlan();
+
                 if(res.returnStatus==POStatus.STATUS_OK) {
                     return res;
                 }
                 if(res.returnStatus==POStatus.STATUS_EOP) {
-                    processingPlan = false; 
-                    for(PhysicalPlan plan : inputPlans)
+                    processingPlan = false;
+                    for(PhysicalPlan plan : inputPlans) {
                         plan.detachInput();
+                    }
                     break;
                 }
                 if(res.returnStatus==POStatus.STATUS_ERR) {
@@ -242,16 +241,16 @@ public class POForEach extends PhysicalO
             if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
             }
-                       
+
             attachInputToPlans((Tuple) inp.result);
             inpTuple = (Tuple)inp.result;
-            
+
             for (PhysicalOperator po : opsToBeReset) {
                 po.reset();
             }
-            
-            if (isAccumulative()) {            	
-                for(int i=0; i<inpTuple.size(); i++) {            		
+
+            if (isAccumulative()) {
+                for(int i=0; i<inpTuple.size(); i++) {
                     if (inpTuple.getType(i) == DataType.BAG) {
                         // we only need to check one bag, because all the bags
                         // share the same buffer
@@ -259,51 +258,51 @@ public class POForEach extends PhysicalO
                         break;
                     }
                 }
-                
-                while(true) {                    		
-                    if (buffer.hasNextBatch()) {        
+
+                while(true) {
+                    if (buffer.hasNextBatch()) {
                         try {
                             buffer.nextBatch();
                         }catch(IOException e) {
                             throw new ExecException(e);
                         }
-                        
-                        setAccumStart();                		
+                        setAccumStart();
                     }else{
                         inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
  //                       buffer.clear();
-                        setAccumEnd();                		
+                        setAccumEnd();
                     }
-                    
-                    res = processPlan();            	
-                    
+
+                    res = processPlan();
+
                     if (res.returnStatus == POStatus.STATUS_BATCH_OK) {
                         // attach same input again to process next batch
                         attachInputToPlans((Tuple) inp.result);
                     } else {
                         break;
                     }
-                } 
-                
-            } else {                        
-                res = processPlan();          
+                }
+
+            } else {
+                res = processPlan();
             }
-            
+
             processingPlan = true;
-            
+
             return res;
         }
     }
 
-    protected Result processPlan() throws ExecException{    	
+    protected Result processPlan() throws ExecException{
         Result res = new Result();
 
         //We check if all the databags have exhausted the tuples. If so we enforce the reading of new data by setting data and its to null
         if(its != null) {
             boolean restartIts = true;
             for(int i = 0; i < noItems; ++i) {
-                if(its[i] != null && isToBeFlattenedArray[i] == true)
+                if(its[i] != null && isToBeFlattenedArray[i] == true) {
                     restartIts &= !its[i].hasNext();
+                }
             }
             //this means that all the databags have reached their last elements. so we need to force reading of fresh databags
             if(restartIts) {
@@ -311,64 +310,44 @@ public class POForEach extends PhysicalO
                 data = null;
             }
         }
-        
- 
+
+
         if(its == null) {
-            //getNext being called for the first time OR starting with a set of new data from inputs 
+            //getNext being called for the first time OR starting with a set of new data from inputs
             its = new Iterator[noItems];
             bags = new Object[noItems];
-            
+
             for(int i = 0; i < noItems; ++i) {
                 //Getting the iterators
                 //populate the input data
                 Result inputData = null;
                 switch(resultTypes[i]) {
                 case DataType.BAG:
-                    inputData = planLeafOps[i].getNext(dummyBag);
-                    break;
-
                 case DataType.TUPLE :
-                inputData = planLeafOps[i].getNext(dummyTuple);
-                break;
                 case DataType.BYTEARRAY :
-                inputData = planLeafOps[i].getNext(dummyDBA);
-                break; 
                 case DataType.MAP :
-                inputData = planLeafOps[i].getNext(dummyMap);
-                break;
                 case DataType.BOOLEAN :
-                inputData = planLeafOps[i].getNext(dummyBool);
-                break;
                 case DataType.INTEGER :
-                inputData = planLeafOps[i].getNext(dummyInt);
-                break;
                 case DataType.DOUBLE :
-                inputData = planLeafOps[i].getNext(dummyDouble);
-                break;
                 case DataType.LONG :
-                inputData = planLeafOps[i].getNext(dummyLong);
-                break;
                 case DataType.FLOAT :
-                inputData = planLeafOps[i].getNext(dummyFloat);
-                break;
                 case DataType.CHARARRAY :
-                inputData = planLeafOps[i].getNext(dummyString);
-                break;
-
+                    inputData = planLeafOps[i].getNext(getDummy(resultTypes[i]), resultTypes[i]);
+                    break;
                 default: {
                     int errCode = 2080;
                     String msg = "Foreach currently does not handle type " + DataType.findTypeName(resultTypes[i]);
                     throw new ExecException(msg, errCode, PigException.BUG);
                 }
-                
+
                 }
 
-                if (inputData.returnStatus == POStatus.STATUS_BATCH_OK) {                	
+                if (inputData.returnStatus == POStatus.STATUS_BATCH_OK) {
                     continue;
                 }
 
                 if(inputData.returnStatus == POStatus.STATUS_EOP) {
-                    //we are done with all the elements. Time to return.                	
+                    //we are done with all the elements. Time to return.
                     its = null;
                     bags = null;
                     return inputData;
@@ -379,22 +358,23 @@ public class POForEach extends PhysicalO
                 }
 
 //                Object input = null;
-                
+
                 bags[i] = inputData.result;
-                
-                if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) 
+
+                if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) {
                     its[i] = ((DataBag)bags[i]).iterator();
-                else 
+                } else {
                     its[i] = null;
+                }
             }
         }
 
         // if accumulating, we haven't got data yet for some fields, just return
         if (isAccumulative() && isAccumStarted()) {
-            res.returnStatus = POStatus.STATUS_BATCH_OK;        	
+            res.returnStatus = POStatus.STATUS_BATCH_OK;
             return res;
         }
-        
+
         while(true) {
             if(data == null) {
                 //getNext being called for the first time or starting on new input data
@@ -416,9 +396,11 @@ public class POForEach extends PhysicalO
                     } else {
                         data[i] = bags[i];
                     }
-                    
+
+                }
+                if(reporter!=null) {
+                    reporter.progress();
                 }
-                if(reporter!=null) reporter.progress();
                 //createTuple(data);
                 res.result = createTuple(data);
                 res.returnStatus = POStatus.STATUS_OK;
@@ -447,12 +429,12 @@ public class POForEach extends PhysicalO
                 }
             }
         }
-        
+
         //return null;
     }
-    
+
     /**
-     * 
+     *
      * @param data array that is the template for the final flattened tuple
      * @return the final flattened tuple
      */
@@ -460,30 +442,32 @@ public class POForEach extends PhysicalO
         Tuple out =  mTupleFactory.newTuple();
         for(int i = 0; i < data.length; ++i) {
             Object in = data[i];
-            
+
             if(isToBeFlattenedArray[i] && in instanceof Tuple) {
                 Tuple t = (Tuple)in;
                 int size = t.size();
                 for(int j = 0; j < size; ++j) {
                     out.append(t.get(j));
                 }
-            } else
+            } else {
                 out.append(in);
+            }
         }
-        if (inpTuple != null)
+        if (inpTuple != null) {
             return illustratorMarkup(inpTuple, out, 0);
-        else
+        } else {
             return illustratorMarkup2(data, out);
+        }
     }
 
-    
+
     protected void attachInputToPlans(Tuple t) {
-        //super.attachInput(t);    	
-        for(PhysicalPlan p : inputPlans) {        	
+        //super.attachInput(t);
+        for(PhysicalPlan p : inputPlans) {
             p.attachInput(t);
         }
     }
-    
+
     public void getLeaves() {
         if (inputPlans != null) {
             int i=-1;
@@ -493,19 +477,20 @@ public class POForEach extends PhysicalO
             planLeafOps = new PhysicalOperator[inputPlans.size()];
             for(PhysicalPlan p : inputPlans) {
                 ++i;
-                PhysicalOperator leaf = p.getLeaves().get(0); 
+                PhysicalOperator leaf = p.getLeaves().get(0);
                 planLeafOps[i] = leaf;
                 if(leaf instanceof POProject &&
                         leaf.getResultType() == DataType.TUPLE &&
-                        ((POProject)leaf).isStar())
+                        ((POProject)leaf).isStar()) {
                     isToBeFlattenedArray[i] = true;
+                }
             }
         }
         // we are calculating plan leaves
         // so lets reinitialize
         reInitialize();
     }
-    
+
     private void reInitialize() {
         if(planLeafOps != null) {
             noItems = planLeafOps.length;
@@ -517,7 +502,7 @@ public class POForEach extends PhysicalO
             noItems = 0;
             resultTypes = null;
         }
-        
+
         if(inputPlans != null) {
             for (PhysicalPlan pp : inputPlans) {
                 try {
@@ -530,7 +515,7 @@ public class POForEach extends PhysicalO
             }
         }
     }
-    
+
     public List<PhysicalPlan> getInputPlans() {
         return inputPlans;
     }
@@ -550,9 +535,9 @@ public class POForEach extends PhysicalO
             newPlanLeafOps[i] = planLeafOps[i];
         }
         // add to the end
-        newPlanLeafOps[planLeafOps.length] = plan.getLeaves().get(0); 
+        newPlanLeafOps[planLeafOps.length] = plan.getLeaves().get(0);
         planLeafOps = newPlanLeafOps;
-        
+
         // add to isToBeFlattenedArray
         // copy existing values
         boolean[] newIsToBeFlattenedArray = new boolean[isToBeFlattenedArray.length + 1];
@@ -562,7 +547,7 @@ public class POForEach extends PhysicalO
         // add to end
         newIsToBeFlattenedArray[isToBeFlattenedArray.length] = flatten;
         isToBeFlattenedArray = newIsToBeFlattenedArray;
-        
+
         // we just added a leaf - reinitialize
         reInitialize();
     }
@@ -583,7 +568,7 @@ public class POForEach extends PhysicalO
     }
 
     /**
-     * Make a deep copy of this operator.  
+     * Make a deep copy of this operator.
      * @throws CloneNotSupportedException
      */
     @Override
@@ -601,12 +586,12 @@ public class POForEach extends PhysicalO
                 flattens.add(b);
             }
         }
-        
+
         List<PhysicalOperator> ops = new ArrayList<PhysicalOperator>(opsToBeReset.size());
         for (PhysicalOperator op : opsToBeReset) {
             ops.add(op);
         }
-        POForEach clone = new POForEach(new OperatorKey(mKey.scope, 
+        POForEach clone = new POForEach(new OperatorKey(mKey.scope,
                 NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
                 requestedParallelism, plans, flattens);
         clone.setOpsToBeReset(ops);
@@ -619,7 +604,7 @@ public class POForEach extends PhysicalO
     {
         return processingPlan;
     }
-    
+
     protected void setUpFlattens(List<Boolean> isToBeFlattened) {
         if(isToBeFlattened == null) {
             isToBeFlattenedArray = null;
@@ -660,7 +645,7 @@ public class POForEach extends PhysicalO
             // FIXME: add only if limit is present
             opsToBeReset.add(sort);
         }
-        
+
         /* (non-Javadoc)
          * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
          */
@@ -685,7 +670,7 @@ public class POForEach extends PhysicalO
     public void setOpsToBeReset(List<PhysicalOperator> opsToBeReset) {
         this.opsToBeReset = opsToBeReset;
     }
-    
+
     private Tuple illustratorMarkup2(Object[] in, Object out) {
       if(illustrator != null) {
           ExampleTuple tOut = new ExampleTuple((Tuple) out);
@@ -698,15 +683,19 @@ public class POForEach extends PhysicalO
           }
           illustrator.addData(tOut);
           int i;
-          for (i = 0; i < noItems; ++i)
-              if (((DataBag)bags[i]).size() < 2)
-                  break;
-          if (i >= noItems && !illustrator.getEqClassesShared())
-              illustrator.getEquivalenceClasses().get(0).add(tOut);
+          for (i = 0; i < noItems; ++i) {
+            if (((DataBag)bags[i]).size() < 2) {
+                break;
+            }
+        }
+          if (i >= noItems && !illustrator.getEqClassesShared()) {
+            illustrator.getEquivalenceClasses().get(0).add(tOut);
+        }
           tOut.synthetic = synthetic;
           return tOut;
-      } else
-          return (Tuple) out;
+      } else {
+        return (Tuple) out;
+    }
     }
 
     @Override
@@ -714,14 +703,16 @@ public class POForEach extends PhysicalO
         if(illustrator != null) {
             ExampleTuple tOut = new ExampleTuple((Tuple) out);
             illustrator.addData(tOut);
-            if (!illustrator.getEqClassesShared())
+            if (!illustrator.getEqClassesShared()) {
                 illustrator.getEquivalenceClasses().get(0).add(tOut);
+            }
             LineageTracer lineageTracer = illustrator.getLineage();
             lineageTracer.insert(tOut);
             tOut.synthetic = ((ExampleTuple) in).synthetic;
             lineageTracer.union((ExampleTuple) in , tOut);
             return tOut;
-        } else
-          return (Tuple) out;
+        } else {
+            return (Tuple) out;
+        }
     }
 }



Mime
View raw message