pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: dvrya...@apache.org
Subject: svn commit: r1214029 [2/2] - in /pig/trunk: ./ src/org/apache/pig/builtin/
Date: Wed, 14 Dec 2011 01:54:09 GMT
Modified: pig/trunk/src/org/apache/pig/builtin/IntMax.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/IntMax.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/IntMax.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/IntMax.java Wed Dec 14 01:54:08 2011
@@ -17,176 +17,27 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
 /**
  * This method should never be used directly, use {@link MAX}.
  */
-public class IntMax extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> {
-
-    @Override
-    public Integer exec(Tuple input) throws IOException {
-         try {
-            return max(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing max in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
+public class IntMax extends AlgebraicIntMathBase {
 
-    public String getInitial() {
-        return Initial.class.getName();
+    public IntMax() {
+        setOp(KNOWN_OP.MAX);
     }
 
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
+    public static class Intermediate extends AlgebraicIntMathBase.Intermediate {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to max on
-                DataBag bg = (DataBag) input.get(0);
-                Integer i = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    i = (Integer)(tp.get(0));
-                }
-                return tfact.newTuple(i);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
             }
         }
-    }
-
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
 
+    public static class Final extends AlgebraicIntMathBase.Final {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(max(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-    static public class Final extends EvalFunc<Integer> {
-        @Override
-        public Integer exec(Tuple input) throws IOException {
-            try {
-                return max(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
         }
     }
-
-    static protected Integer max(Tuple input) throws ExecException {
-        DataBag values = (DataBag)(input.get(0));
-
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        int curMax = Integer.MIN_VALUE;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Integer i = (Integer)(t.get(0));
-                if (i == null) continue;
-                sawNonNull = true;
-                curMax = java.lang.Math.max(curMax, i);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing max of ints.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return Integer.valueOf(curMax);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.INTEGER)); 
-    }
-    
-    /* Accumulator interface */
-    
-    private Integer intermediateMax = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Integer curMax = max(b);
-            if (curMax == null) {
-                return;
-            }
-            /* if bag is not null, initialize intermediateMax to negative infinity */
-            if (intermediateMax == null) {
-                intermediateMax = Integer.MIN_VALUE;
-            }
-            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing max in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateMax = null;
-    }
-
-    @Override
-    public Integer getValue() {
-        return intermediateMax;
-    }
     
 }

Modified: pig/trunk/src/org/apache/pig/builtin/IntMin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/IntMin.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/IntMin.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/IntMin.java Wed Dec 14 01:54:08 2011
@@ -17,176 +17,27 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-
-
 /**
  * This method should never be used directly, use {@link MIN}.
  */
-public class IntMin extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> {
-
-    @Override
-    public Integer exec(Tuple input) throws IOException {
-        try {
-            return min(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
+public class IntMin extends AlgebraicIntMathBase {
 
-    public String getInitial() {
-        return Initial.class.getName();
+    public IntMin() {
+        setOp(KNOWN_OP.MIN);
     }
 
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
+    public static class Intermediate extends AlgebraicIntMathBase.Intermediate {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to min on
-                DataBag bg = (DataBag) input.get(0);
-                Integer i = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    i = (Integer)(tp.get(0));
-                }
-                return tfact.newTuple(i);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
         }
     }
 
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(min(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-    static public class Final extends EvalFunc<Integer> {
+    public static class Final extends AlgebraicIntMathBase.Final {
         @Override
-        public Integer exec(Tuple input) throws IOException {
-            try {
-                return min(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-
-    static protected Integer min(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        int curMin = Integer.MAX_VALUE;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Integer i = (Integer)(t.get(0));
-                if (i == null) continue;
-                sawNonNull = true;
-                curMin = java.lang.Math.min(curMin, i);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing min of floats.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
         }
-    
-        if(sawNonNull) {
-            return Integer.valueOf(curMin);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.INTEGER)); 
-    }
-    
-    /* Accumulator interface implementation */
-    private Integer intermediateMin = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Integer curMin = min(b);
-            if (curMin == null) {
-                return;
-            }
-            /* if bag is not null, initialize intermediateMax to negative infinity */
-            if (intermediateMin == null) {
-                intermediateMin = Integer.MAX_VALUE;
-            }
-            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateMin = null;
     }
 
-    @Override
-    public Integer getValue() {
-        return intermediateMin;
-    }    
 }

Modified: pig/trunk/src/org/apache/pig/builtin/IntSum.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/IntSum.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/IntSum.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/IntSum.java Wed Dec 14 01:54:08 2011
@@ -17,214 +17,10 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
 /**
  * This method should never be used directly, use {@link SUM}.
  */
-public class IntSum extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
-
-    @Override
-    public Long exec(Tuple input) throws IOException {
-        try {
-            return sum(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            // Initial is called in the map - for SUM
-            // we just send the tuple down
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to sum
-                DataBag bg = (DataBag) input.get(0);
-                Integer i = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    i = (Integer)tp.get(0);
-                }
-                return tfact.newTuple(i != null ? 
-                        Long.valueOf(i) : null);
-            }catch(NumberFormatException nfe){
-                // treat this particular input as null
-                Tuple t = tfact.newTuple(1);
-                t.set(0, null);
-                return t;
-            } catch (ExecException e) {
-                throw e;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(sumLongs(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-    static public class Final extends EvalFunc<Long> {
-        @Override
-        public Long exec(Tuple input) throws IOException {
-            try {
-                return sumLongs(input);
-            } catch (ExecException e) {
-                 throw e;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-
-    static protected Long sumLongs(Tuple input) throws ExecException {
-        // Can't just call sum, because the intermediate results are
-        // now Longs insteads of Integers.
-        DataBag values = (DataBag)input.get(0);
-
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        long sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Long l = (Long)(t.get(0));
-                if (l == null) continue;
-                sawNonNull = true;
-                sum += l;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of longs.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        
-        if(sawNonNull) {
-            return Long.valueOf(sum);
-        } else {
-            return null;
-        }
-    }
-
-    static protected  Long sum(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        long sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Integer i = (Integer)(t.get(0));
-                if (i == null) continue;
-                sawNonNull = true;
-                sum += i;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of ints.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        
-        if(sawNonNull) {
-            return Long.valueOf(sum);
-        } else {
-            return null;
-        }
-
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
-    }
-
-    /* Accumulator interface implementation*/
-    private Long intermediateSum = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Long curSum = sum(b);
-            if (curSum == null) {
-                return;
-            }
-            intermediateSum = (intermediateSum == null ? 0L : intermediateSum) + curSum;
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateSum = null;
-    }
-
-    @Override
-    public Long getValue() {
-        return intermediateSum;
-    }    
+public class IntSum extends LongSum {
+    // Just here for backwards compatibility
+    public IntSum() {}
 }

Modified: pig/trunk/src/org/apache/pig/builtin/LongMax.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/LongMax.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/LongMax.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/LongMax.java Wed Dec 14 01:54:08 2011
@@ -17,175 +17,27 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
 /**
  * This method should never be used directly, use {@link MAX}.
  */
-public class LongMax extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
-
-    @Override
-    public Long exec(Tuple input) throws IOException {
-         try {
-            return max(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing max in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
+public class LongMax extends AlgebraicLongMathBase {
 
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to max on
-                DataBag bg = (DataBag) input.get(0);
-                Long l = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    l = (Long)(tp.get(0));
-                }
-                return tfact.newTuple(l);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
+    public LongMax() {
+        setOp(KNOWN_OP.MAX);
     }
 
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
+    public static class Intermediate extends AlgebraicLongMathBase.Intermediate {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(max(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
             }
         }
-    }
-    static public class Final extends EvalFunc<Long> {
-        @Override
-        public Long exec(Tuple input) throws IOException {
-            try {
-                return max(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-
-    static protected Long max(Tuple input) throws ExecException {
-        DataBag values = (DataBag)(input.get(0));
-
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        long curMax = Long.MIN_VALUE;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Long l = (Long)(t.get(0));
-                if (l == null) continue;
-                sawNonNull = true;
-                curMax = java.lang.Math.max(curMax, l);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing max of longs.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return Long.valueOf(curMax);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
-    }
-    
-    /* Accumulator interface */
-    
-    private Long intermediateMax = null;
     
+    public static class Final extends AlgebraicLongMathBase.Final {
     @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Long curMax = max(b);
-            if (curMax == null) {
-                return;
-            }
-            /* if bag is not null, initialize intermediateMax to negative infinity */
-            if (intermediateMax == null) {
-                intermediateMax = Long.MIN_VALUE;
-            }
-            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
         }
     }
 
-    @Override
-    public void cleanup() {
-        intermediateMax = null;
-    }
-
-    @Override
-    public Long getValue() {
-        return intermediateMax;
-    }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/LongMin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/LongMin.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/LongMin.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/LongMin.java Wed Dec 14 01:54:08 2011
@@ -17,174 +17,27 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
 /**
  * This method should never be used directly, use {@link MIN}.
  */
-public class LongMin extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
-
-    @Override
-    public Long exec(Tuple input) throws IOException {
-        try {
-            return min(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
+public class LongMin extends AlgebraicLongMathBase {
 
-    public String getFinal() {
-        return Final.class.getName();
+    public LongMin() {
+        setOp(KNOWN_OP.MIN);
     }
 
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
+    public static class Intermediate extends AlgebraicLongMathBase.Intermediate {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to min on
-                DataBag bg = (DataBag) input.get(0);
-                Long l = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    l = (Long)(tp.get(0)); 
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
                 }
-                return tfact.newTuple(l);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);            
             }
-        }
-    }
-
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
 
+    public static class Final extends AlgebraicLongMathBase.Final {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(min(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);            
-            }
-        }
-    }
-    static public class Final extends EvalFunc<Long> {
-        @Override
-        public Long exec(Tuple input) throws IOException {
-            try {
-                return min(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);            
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
             }
         }
-    }
-
-    static protected Long min(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        long curMin = Long.MAX_VALUE;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Long l = (Long)(t.get(0));
-                if (l == null) continue;
-                sawNonNull = true;
-                curMin = java.lang.Math.min(curMin, l);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing min of longs.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-    
-        if(sawNonNull) {
-            return Long.valueOf(curMin);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
-    }
-    
-    /* Accumulator interface implementation */
-    private Long intermediateMin = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Long curMin = min(b);
-            if (curMin == null) {
-                return;
-            }
-            /* if bag is not null, initialize intermediateMax to negative infinity */
-            if (intermediateMin == null) {
-                intermediateMin = Long.MAX_VALUE;
-            }
-            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateMin = null;
-    }
 
-    @Override
-    public Long getValue() {
-        return intermediateMin;
-    }    
 }

Modified: pig/trunk/src/org/apache/pig/builtin/LongSum.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/LongSum.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/LongSum.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/LongSum.java Wed Dec 14 01:54:08 2011
@@ -17,173 +17,27 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-
 /**
  * This method should never be used directly, use {@link SUM}.
  */
-public class LongSum extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
+public class LongSum extends AlgebraicLongMathBase {
 
-    @Override
-    public Long exec(Tuple input) throws IOException {
-        try {
-            return sum(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
+    public LongSum() {
+        setOp(KNOWN_OP.SUM);
     }
 
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
+    public static class Intermediate extends AlgebraicLongMathBase.Intermediate {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            // Initial is called in the map - for SUM
-            // we just send the tuple down
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to sum
-                DataBag bg = (DataBag) input.get(0);
-                Long l = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    l = (Long)tp.get(0);
-                }
-                return tfact.newTuple(l);
-            } catch (ExecException e) {
-                throw e;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.SUM;
         }
     }
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
 
+    public static class Final extends AlgebraicLongMathBase.Final {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(sum(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-    static public class Final extends EvalFunc<Long> {
-        @Override
-        public Long exec(Tuple input) throws IOException {
-            try {
-                return sum(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.SUM;
             }
         }
-    }
-
-    static protected  Long sum(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        long sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Long l = (Long)(t.get(0));
-                if (l == null) continue;
-                sawNonNull = true;
-                sum += l;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of longs.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return Long.valueOf(sum);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
-    }
-    
-    /* Accumulator interface implementation*/
-    private Long intermediateSum = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Long curSum = sum(b);
-            if (curSum == null) {
-                return;
-            }
-            intermediateSum = (intermediateSum == null ? 0L : intermediateSum) + curSum;
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateSum = null;
-    }
-
-    @Override
-    public Long getValue() {
-        return intermediateSum;
-    }    
 
 }

Modified: pig/trunk/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/MAX.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/MAX.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/MAX.java Wed Dec 14 01:54:08 2011
@@ -17,22 +17,11 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -54,169 +43,26 @@ import org.apache.pig.impl.logicalLayer.
  * the preferred method of usage it is available in case the combiner can not be
  * used for a given calculation.
  */
-public class MAX extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
+public class MAX extends AlgebraicByteArrayMathBase {
 
-    @Override
-    public Double exec(Tuple input) throws IOException {
-         try {
-            return max(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing max in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
+    public MAX() {
+        setOp(KNOWN_OP.MAX);
     }
 
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
+    public static class Intermediate extends AlgebraicByteArrayMathBase.Intermediate {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to max on 
-                DataBag bg = (DataBag) input.get(0);
-                DataByteArray dba = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    dba = (DataByteArray)tp.get(0);
-                }
-                return tfact.newTuple(dba != null ?
-                        Double.valueOf(dba.toString()): null);
-            } catch (NumberFormatException e) {
-                Tuple t = tfact.newTuple(1);
-                t.set (0, null);
-                return t;
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
             }
         }
-    }
-
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
 
+    public static class Final extends AlgebraicByteArrayMathBase.Final {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(maxDoubles(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                return maxDoubles(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-
-    static protected Double max(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double curMax = Double.NEGATIVE_INFINITY;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                DataByteArray dba = (DataByteArray)t.get(0);
-                Double d = dba != null ? Double.valueOf(dba.toString()) : null;
-                if (d == null) continue;
-                sawNonNull = true;
-                curMax = java.lang.Math.max(curMax, d);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing max of doubles.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Double(curMax);
-        } else {
-            return null;
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
         }
     }
     
-    // same as above function except all its inputs are 
-    // always Double - this should be used for better performance
-    // since we don't have to check the type of the object to
-    // decide it is a double. This should be used when the initial,
-    // intermediate and final versions are used.
-    static protected Double maxDoubles(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double curMax = Double.NEGATIVE_INFINITY;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Double d = (Double)t.get(0);
-                if (d == null) continue;
-                sawNonNull = true;
-                curMax = java.lang.Math.max(curMax, d);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing max of doubles.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Double(curMax);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
-    }
-    
     /* (non-Javadoc)
      * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
      */
@@ -231,38 +77,4 @@ public class MAX extends EvalFunc<Double
         funcList.add(new FuncSpec(StringMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
         return funcList;
     }
-
-    /* Accumulator interface implementation */
-    private Double intermediateMax = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Double curMax = max(b);
-            if (curMax == null) {
-                return;
-            }
-            /* if bag is not null, initialize intermediateMax to negative infinity */
-            if (intermediateMax == null) {
-                intermediateMax = Double.NEGATIVE_INFINITY;
-            }
-            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateMax = null;
-    }
-
-    @Override
-    public Double getValue() {
-        return intermediateMax;
-    }    
 }

Modified: pig/trunk/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/MIN.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/MIN.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/MIN.java Wed Dec 14 01:54:08 2011
@@ -17,22 +17,11 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -54,170 +43,26 @@ import org.apache.pig.impl.logicalLayer.
  * the preferred method of usage it is available in case the combiner can not be
  * used for a given calculation.
  */
-public class MIN extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
+public class MIN extends  AlgebraicByteArrayMathBase {
 
-    @Override
-    public Double exec(Tuple input) throws IOException {
-        try {
-            return min(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
+    public MIN() {
+        setOp(KNOWN_OP.MIN);
     }
 
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
+    public static class Intermediate extends AlgebraicByteArrayMathBase.Intermediate {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to min on
-                DataBag bg = (DataBag) input.get(0);
-                DataByteArray dba = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    dba = (DataByteArray)tp.get(0);
-                }
-                return tfact.newTuple(dba != null?
-                        Double.valueOf(dba.toString()) : null);
-            } catch (NumberFormatException e) {
-                // invalid input, send null
-                Tuple t =  tfact.newTuple(1);
-                t.set(0, null);
-                return t;
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
             }
         }
-    }
-
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
 
+    public static class Final extends AlgebraicByteArrayMathBase.Final {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(minDoubles(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                return minDoubles(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-
-    static protected Double min(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double curMin = Double.POSITIVE_INFINITY;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                DataByteArray dba = (DataByteArray)t.get(0);
-                Double d = dba != null ? Double.valueOf(dba.toString()): null;
-                if (d == null) continue;
-                sawNonNull = true;
-                curMin = java.lang.Math.min(curMin, d);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing min of doubles.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-    
-        if(sawNonNull) {
-            return new Double(curMin);
-        } else {
-            return null;
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
         }
     }
     
-    // same as above function except all its inputs are 
-    // always Double - this should be used for better performance
-    // since we don't have to check the type of the object to
-    // decide it is a double. This should be used when the initial,
-    // intermediate and final versions are used.
-    static protected Double minDoubles(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double curMin = Double.POSITIVE_INFINITY;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Double d = (Double)t.get(0);
-                if (d == null) continue;
-                sawNonNull = true;
-                curMin = java.lang.Math.min(curMin, d);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing min of doubles.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-    
-        if(sawNonNull) {
-            return new Double(curMin);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
-    }
-    
     /* (non-Javadoc)
      * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
      */
@@ -232,38 +77,4 @@ public class MIN extends EvalFunc<Double
         funcList.add(new FuncSpec(StringMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
         return funcList;
     }
-
-    /* Accumulator interface implementation */
-    private Double intermediateMin = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Double curMin = min(b);
-            if (curMin == null) {
-                return;
-            }
-            /* if bag is not null, initialize intermediateMax to negative infinity */
-            if (intermediateMin == null) {
-                intermediateMin = Double.POSITIVE_INFINITY;
-            }
-            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateMin = null;
-    }
-
-    @Override
-    public Double getValue() {
-        return intermediateMin;
-    }    
 }

Modified: pig/trunk/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/SUM.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/SUM.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/SUM.java Wed Dec 14 01:54:08 2011
@@ -17,22 +17,11 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -55,175 +44,24 @@ import org.apache.pig.impl.logicalLayer.
  * the preferred method of usage it is available in case the combiner can not be
  * used for a given calculation.
  */
-public class SUM extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
+public class SUM extends AlgebraicByteArrayMathBase {
 
-    @Override
-    public Double exec(Tuple input) throws IOException {
-        try {
-            return sum(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
+    public SUM() {
+        setOp(KNOWN_OP.SUM);
     }
 
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
+    public static class Intermediate extends AlgebraicByteArrayMathBase.Intermediate {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            // Initial is called in the map - for SUM
-            // we just send the tuple down
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to sum
-                DataBag bg = (DataBag) input.get(0);
-                DataByteArray dba = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    dba = (DataByteArray)tp.get(0); 
-                }
-                return tfact.newTuple(dba != null?
-                        Double.valueOf(dba.toString()): null);
-            }catch(NumberFormatException nfe){
-                // treat this particular input as null
-                Tuple t = tfact.newTuple(1);
-                t.set(0, null);
-                return t;
-            } catch (ExecException e) {
-                throw e;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.SUM;
             }
         }
-    }
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
 
+    public static class Final extends AlgebraicByteArrayMathBase.Final {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-        	try {
-                return tfact.newTuple(sumDoubles(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.SUM;
     }
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                return sumDoubles(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-
-    static protected Double sum(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                DataByteArray dba = (DataByteArray)t.get(0);
-                Double d = 
-                    dba != null ? Double.valueOf(dba.toString()): null;
-                if (d == null) continue;
-                sawNonNull = true;
-                sum += d;
-            
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of doubles.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-        
-        
-        if(sawNonNull) {
-            return new Double(sum);
-        } else {
-            return null;
-        }
-    }
-
-    // same as above function except all its inputs are 
-    // always Double - this should be used for better performance
-    // since we don't have to check the type of the object to
-    // decide it is a double. This should be used when the initial,
-    // intermediate and final versions are used.
-    static protected Double sumDoubles(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                // we can cast directly because we SHOULD
-                // only be getting Doubles here
-                Double d = (Double)(t.get(0));
-                if (d == null) continue;
-                sawNonNull = true;
-                sum += d;
-            
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of doubles.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-        
-        if(sawNonNull) {
-            return new Double(sum);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
 
     /* (non-Javadoc)
@@ -233,41 +71,13 @@ public class SUM extends EvalFunc<Double
     public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
         List<FuncSpec> funcList = new ArrayList<FuncSpec>();
         funcList.add(new FuncSpec(this.getClass().getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BYTEARRAY)));
+        // DoubleSum works for both Floats and Doubles
         funcList.add(new FuncSpec(DoubleSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.DOUBLE)));
-        funcList.add(new FuncSpec(FloatSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));
-        funcList.add(new FuncSpec(IntSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
+        funcList.add(new FuncSpec(DoubleSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));
+        // LongSum works for both Ints and Longs.
+        funcList.add(new FuncSpec(LongSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
         funcList.add(new FuncSpec(LongSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
         return funcList;
     }
 
-    /* Accumulator interface implementation*/
-    private Double intermediateSum = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Double curSum = sum(b);
-            if (curSum == null) {
-                return;
-            }
-            intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateSum = null;
-    }
-
-    @Override
-    public Double getValue() {
-        return intermediateSum;
-    }    
-    
 }



Mime
View raw message