pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dvrya...@apache.org
Subject svn commit: r1214029 [1/2] - in /pig/trunk: ./ src/org/apache/pig/builtin/
Date Wed, 14 Dec 2011 01:54:09 GMT
Author: dvryaboy
Date: Wed Dec 14 01:54:08 2011
New Revision: 1214029

URL: http://svn.apache.org/viewvc?rev=1214029&view=rev
Log:
PIG-2403: Reduce code duplication in SUM, MAX, MIN udfs

Added:
    pig/trunk/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java
    pig/trunk/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java
    pig/trunk/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java
    pig/trunk/src/org/apache/pig/builtin/AlgebraicIntMathBase.java
    pig/trunk/src/org/apache/pig/builtin/AlgebraicLongMathBase.java
    pig/trunk/src/org/apache/pig/builtin/AlgebraicMathBase.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/DoubleAbs.java
    pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
    pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
    pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
    pig/trunk/src/org/apache/pig/builtin/FloatMax.java
    pig/trunk/src/org/apache/pig/builtin/FloatMin.java
    pig/trunk/src/org/apache/pig/builtin/FloatSum.java
    pig/trunk/src/org/apache/pig/builtin/IntMax.java
    pig/trunk/src/org/apache/pig/builtin/IntMin.java
    pig/trunk/src/org/apache/pig/builtin/IntSum.java
    pig/trunk/src/org/apache/pig/builtin/LongMax.java
    pig/trunk/src/org/apache/pig/builtin/LongMin.java
    pig/trunk/src/org/apache/pig/builtin/LongSum.java
    pig/trunk/src/org/apache/pig/builtin/MAX.java
    pig/trunk/src/org/apache/pig/builtin/MIN.java
    pig/trunk/src/org/apache/pig/builtin/SUM.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Dec 14 01:54:08 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2403: Reduce code duplication in SUM, MAX, MIN udfs (dvryaboy)
+
 PIG-2245: Add end to end test for tokenize (markroddy via gates)
 
 PIG-2327: bin/pig doesn't have any hooks for picking up ZK installation deployed from tarballs (rvs via hashutosh)

Added: pig/trunk/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java?rev=1214029&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java Wed Dec 14 01:54:08 2011
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+/**
+ * Core logic for applying an accumulative/algebraic math function to a
+ * bag of doubles.
+ */
+public abstract class AlgebraicByteArrayMathBase extends AlgebraicMathBase<Double> implements Accumulator<Double> {
+
+    protected static Double getSeed(KNOWN_OP op) {
+        switch (op) {
+        case SUM: return 0.0;
+        case MIN: return Double.POSITIVE_INFINITY;
+        case MAX: return Double.NEGATIVE_INFINITY;
+        default: return null;
+        }
+    }
+
+    private static Double doWork(Double arg1, Double arg2, KNOWN_OP op) {
+        if (arg1 == null) {
+            return arg2;
+        } else if (arg2 == null) {
+            return arg1;
+        } else {
+            switch (op) {
+            case MAX: return Math.max(arg1, arg2);
+            case MIN: return Math.min(arg1, arg2);
+            case SUM: return arg1 + arg2;
+            default: return null;
+            }
+        }
+    }
+
+    protected static Double doTupleWork(Tuple input, KnownOpProvider opProvider, byte expectedType)
+            throws ExecException {
+        DataBag values = (DataBag)input.get(0);
+        // if we were handed an empty bag, return NULL
+        // this is in compliance with SQL standard
+        if(values.size() == 0) {
+            return null;
+        }
+        double sofar = AlgebraicByteArrayMathBase.getSeed(opProvider.getOp());
+        boolean sawNonNull = false;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            try {
+                Double d;
+                switch (expectedType) {
+                case DataType.BYTEARRAY:
+                    DataByteArray dba = (DataByteArray)t.get(0);
+                    d = dba != null ? Double.valueOf(dba.toString()): null;
+                    break;
+                case DataType.DOUBLE:
+                    d = (Double) t.get(0);
+                    break;
+                default:
+                    throw new ExecException("Unexpected type in AlgebraicByteArrayMath "
+                            + DataType.findTypeName(expectedType));
+                }
+                if (d == null) continue;
+                sawNonNull = true;
+                sofar = doWork(sofar, d, opProvider.getOp());
+            }catch(RuntimeException exp) {
+                int errCode = 2103;
+                throw new ExecException("Problem doing work on Doubles", errCode, PigException.BUG, exp);
+            }
+        }
+        return sawNonNull ? sofar : null;
+    }
+
+    @Override
+    public Double exec(Tuple input) throws IOException {
+        try {
+            return doTupleWork(input, opProvider, DataType.BYTEARRAY);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            throw new ExecException("Error executing function on Doubles", errCode, PigException.BUG, e);
+        }
+    }
+
+    static public abstract class Intermediate extends AlgebraicMathBase.Intermediate {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                return tfact.newTuple(doTupleWork(input, this, DataType.DOUBLE));
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing function on Doubles", errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    @Override
+    public String getInitial() {
+        return Initial.class.getName();
+    }
+
+    static public class Initial extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            // Initial is called in the map.
+            // we just send the tuple down
+            try {
+                // input is a bag with one tuple containing
+                // the column we are trying to operate on
+                DataBag bg = (DataBag) input.get(0);
+                if (bg.iterator().hasNext()) {
+                    DataByteArray dba = (DataByteArray) bg.iterator().next().get(0);
+                    Double d = dba != null ? Double.valueOf(dba.toString()): null;
+                    return tfact.newTuple(d);
+                } else {
+                    // make sure that we call the object constructor, not the list constructor
+                    return tfact.newTuple((Object) null);
+                }
+            } catch (ExecException e) {
+                throw e;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing an algebraic function", errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    static public abstract class Final extends AlgebraicMathBase.Final<Double> {
+        @Override
+        public Double exec(Tuple input) throws IOException {
+            try {
+                return doTupleWork(input, this, DataType.DOUBLE);
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing function on Doubles", errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
+    }
+
+    /* Accumulator interface implementation*/
+    private Double intermediateVal = null;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curVal = doTupleWork(b, opProvider, DataType.BYTEARRAY);
+            if (curVal == null) {
+                return;
+            }
+            if (intermediateVal == null) {
+                intermediateVal = getSeed(opProvider.getOp());
+            }
+            intermediateVal = doWork(intermediateVal, curVal, opProvider.getOp());
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            throw new ExecException("Error executing function on Doubles", errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateVal = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateVal;
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java?rev=1214029&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java Wed Dec 14 01:54:08 2011
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+/**
+ * Core logic for applying an accumulative/algebraic math function to a
+ * bag of doubles.
+ */
+public abstract class AlgebraicDoubleMathBase extends AlgebraicMathBase<Double> implements Accumulator<Double> {
+
+    protected static Double getSeed(KNOWN_OP op) {
+        switch (op) {
+        case SUM: return 0.0;
+        case MIN: return Double.POSITIVE_INFINITY;
+        case MAX: return Double.NEGATIVE_INFINITY;
+        default: return null;
+        }
+    }
+
+    private static Double doWork(Double arg1, Double arg2, KNOWN_OP op) {
+        if (arg1 == null) {
+            return arg2;
+        } else if (arg2 == null) {
+            return arg1;
+        } else {
+            switch (op) {
+            case MAX: return Math.max(arg1, arg2);
+            case MIN: return Math.min(arg1, arg2);
+            case SUM: return arg1 + arg2;
+            default: return null;
+            }
+        }
+    }
+
+    protected static Double doTupleWork(Tuple input, KnownOpProvider opProvider) throws ExecException {
+        DataBag values = (DataBag)input.get(0);
+        // if we were handed an empty bag, return NULL
+        // this is in compliance with SQL standard
+        if(values.size() == 0) {
+            return null;
+        }
+        double sofar = AlgebraicDoubleMathBase.getSeed(opProvider.getOp());
+        boolean sawNonNull = false;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            try {
+                Number n = (Number)(t.get(0));
+                if (n == null) continue;
+                Double d = n.doubleValue();
+                sawNonNull = true;
+                sofar = doWork(sofar, d, opProvider.getOp());
+            }catch(RuntimeException exp) {
+                int errCode = 2103;
+                throw new ExecException("Problem doing work on Doubles", errCode, PigException.BUG, exp);
+            }
+        }
+        return sawNonNull ? sofar : null;
+    }
+
+    @Override
+    public Double exec(Tuple input) throws IOException {
+        try {
+            return doTupleWork(input, opProvider);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            throw new ExecException("Error executing function on Doubles", errCode, PigException.BUG, e);
+        }
+    }
+
+    static public abstract class Intermediate extends AlgebraicMathBase.Intermediate {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                return tfact.newTuple(doTupleWork(input, this));
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing function on Doubles", errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    static public abstract class Final extends AlgebraicMathBase.Final<Double> {
+        @Override
+        public Double exec(Tuple input) throws IOException {
+            try {
+                return doTupleWork(input, this);
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing function on Doubles", errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
+    }
+
+    /* Accumulator interface implementation*/
+    private Double intermediateVal = null;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curVal = doTupleWork(b, opProvider);
+            if (curVal == null) {
+                return;
+            }
+            if (intermediateVal == null) {
+                intermediateVal = getSeed(opProvider.getOp());
+            }
+            intermediateVal = doWork(intermediateVal, curVal, opProvider.getOp());
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            throw new ExecException("Error executing function on Doubles", errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateVal = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateVal;
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java?rev=1214029&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java Wed Dec 14 01:54:08 2011
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+/**
+ * Core logic for applying an accumulative/algebraic math function to a
+ * bag of Floats.
+ */
+public abstract class AlgebraicFloatMathBase extends AlgebraicMathBase<Float> implements Accumulator<Float> {
+
+    protected static Float getSeed(KNOWN_OP op) {
+        switch (op) {
+        case SUM: return 0.0f;
+        case MIN: return Float.POSITIVE_INFINITY;
+        case MAX: return Float.NEGATIVE_INFINITY;
+        default: return null;
+        }
+    }
+
+    private static Float doWork(Float arg1, Float arg2, KNOWN_OP op) {
+        if (arg1 == null) {
+            return arg2;
+        } else if (arg2 == null) {
+            return arg1;
+        } else {
+            switch (op) {
+            case MAX: return Math.max(arg1, arg2);
+            case MIN: return Math.min(arg1, arg2);
+            case SUM: return arg1 + arg2;
+            default: return null;
+            }
+        }
+    }
+
+    protected static Float doTupleWork(Tuple input, KnownOpProvider opProvider) throws ExecException {
+        DataBag values = (DataBag)input.get(0);
+        // if we were handed an empty bag, return NULL
+        // this is in compliance with SQL standard
+        if(values.size() == 0) {
+            return null;
+        }
+        Float sofar = AlgebraicFloatMathBase.getSeed(opProvider.getOp());
+        boolean sawNonNull = false;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            try {
+                Float d = (Float)(t.get(0));
+                if (d == null) continue;
+                sawNonNull = true;
+                sofar = doWork(sofar, d, opProvider.getOp());
+            }catch(RuntimeException exp) {
+                int errCode = 2103;
+                throw new ExecException("Problem doing work on Floats", errCode, PigException.BUG, exp);
+            }
+        }
+        return sawNonNull ? sofar : null;
+    }
+
+    @Override
+    public Float exec(Tuple input) throws IOException {
+        try {
+            return doTupleWork(input, opProvider);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            throw new ExecException("Error executing function on Floats", errCode, PigException.BUG, e);
+        }
+    }
+
+    static public abstract class Intermediate extends AlgebraicMathBase.Intermediate {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                return tfact.newTuple(doTupleWork(input, this));
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing function on Floats", errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    static public abstract class Final extends AlgebraicMathBase.Final<Float> {
+        @Override
+        public Float exec(Tuple input) throws IOException {
+            try {
+                return doTupleWork(input, this);
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing function on Floats", errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(null, DataType.FLOAT));
+    }
+
+    /* Accumulator interface implementation*/
+    private Float intermediateVal = null;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Float curVal = doTupleWork(b, opProvider);
+            if (curVal == null) {
+                return;
+            }
+            if (intermediateVal == null) {
+                intermediateVal = getSeed(opProvider.getOp());
+            }
+            intermediateVal = doWork(intermediateVal, curVal, opProvider.getOp());
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            throw new ExecException("Error executing function on Floats", errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateVal = null;
+    }
+
+    @Override
+    public Float getValue() {
+        return intermediateVal;
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/AlgebraicIntMathBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AlgebraicIntMathBase.java?rev=1214029&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AlgebraicIntMathBase.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/AlgebraicIntMathBase.java Wed Dec 14 01:54:08 2011
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+/**
+ * Core logic for applying an accumulative/algebraic math function to a
+ * bag of doubles.
+ */
+public abstract class AlgebraicIntMathBase extends AlgebraicMathBase<Integer> implements Accumulator<Integer> {
+
+    protected static Integer getSeed(KNOWN_OP op) {
+        switch (op) {
+        case SUM: return 0;
+        case MIN: return Integer.MAX_VALUE;
+        case MAX: return Integer.MIN_VALUE;
+        default: return null;
+        }
+    }
+
+    private static Integer doWork(Integer arg1, Integer arg2, KNOWN_OP op) {
+        if (arg1 == null) {
+            return arg2;
+        } else if (arg2 == null) {
+            return arg1;
+        } else {
+            switch (op) {
+            case MAX: return Math.max(arg1, arg2);
+            case MIN: return Math.min(arg1, arg2);
+            case SUM: return arg1 + arg2;
+            default: return null;
+            }
+        }
+    }
+
+    protected static Integer doTupleWork(Tuple input, KnownOpProvider opProvider) throws ExecException {
+        DataBag values = (DataBag)input.get(0);
+        // if we were handed an empty bag, return NULL
+        // this is in compliance with SQL standard
+        if(values.size() == 0) {
+            return null;
+        }
+        int sofar = AlgebraicIntMathBase.getSeed(opProvider.getOp());
+        boolean sawNonNull = false;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            try {
+                Integer d = (Integer)(t.get(0));
+                if (d == null) continue;
+                sawNonNull = true;
+                sofar = doWork(sofar, d, opProvider.getOp());
+            }catch(RuntimeException exp) {
+                int errCode = 2103;
+                throw new ExecException("Problem doing work on Doubles", errCode, PigException.BUG, exp);
+            }
+        }
+        return sawNonNull ? sofar : null;
+    }
+
+    @Override
+    public Integer exec(Tuple input) throws IOException {
+        try {
+            return doTupleWork(input, opProvider);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            throw new ExecException("Error executing function on Doubles", errCode, PigException.BUG, e);
+        }
+    }
+
+
+    static public abstract class Intermediate extends AlgebraicMathBase.Intermediate {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                return tfact.newTuple(doTupleWork(input, this));
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing function ", errCode, PigException.BUG, e);
+            }
+
+        }
+    }
+    static public abstract class Final extends AlgebraicMathBase.Final<Integer> {
+        @Override
+        public Integer exec(Tuple input) throws IOException {
+            try {
+                return doTupleWork(input, this);
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing function ", errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(null, DataType.INTEGER));
+    }
+
+    /* Accumulator interface implementation*/
+    private Integer intermediateVal = null;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Integer curVal = doTupleWork(b, opProvider);
+            if (curVal == null) {
+                return;
+            }
+            if (intermediateVal == null) {
+                intermediateVal = getSeed(opProvider.getOp());
+            }
+            intermediateVal = doWork(intermediateVal, curVal, opProvider.getOp());
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            throw new ExecException("Error executing function on Doubles", errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateVal = null;
+    }
+
+    @Override
+    public Integer getValue() {
+        return intermediateVal;
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/builtin/AlgebraicLongMathBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AlgebraicLongMathBase.java?rev=1214029&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AlgebraicLongMathBase.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/AlgebraicLongMathBase.java Wed Dec 14 01:54:08 2011
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+/**
+ * Core logic for applying an accumulative/algebraic math function to a
+ * bag of Longs.
+ */
+public abstract class AlgebraicLongMathBase extends AlgebraicMathBase<Long> implements Accumulator<Long> {
+
+    protected static Long getSeed(KNOWN_OP op) {
+        switch (op) {
+        case SUM: return 0L;
+        case MIN: return Long.MAX_VALUE;
+        case MAX: return Long.MIN_VALUE;
+        default: return null;
+        }
+    }
+
+    private static Long doWork(Long arg1, Long arg2, KNOWN_OP op) {
+        if (arg1 == null) {
+            return arg2;
+        } else if (arg2 == null) {
+            return arg1;
+        } else {
+            switch (op) {
+            case MAX: return Math.max(arg1, arg2);
+            case MIN: return Math.min(arg1, arg2);
+            case SUM: return arg1 + arg2;
+            default: return null;
+            }
+        }
+    }
+
+    protected static Long doTupleWork(Tuple input, KnownOpProvider opProvider) throws ExecException {
+        DataBag values = (DataBag)input.get(0);
+        // if we were handed an empty bag, return NULL
+        // this is in compliance with SQL standard
+        if(values.size() == 0) {
+            return null;
+        }
+        Long sofar = AlgebraicLongMathBase.getSeed(opProvider.getOp());
+        boolean sawNonNull = false;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            try {
+                // Note: use Number.longValue() instead of casting to Long
+                // to allow Integers, too.
+                Number n = ((Number)(t.get(0)));
+                if (n == null) continue;
+                Long d = n.longValue();
+                sawNonNull = true;
+                sofar = doWork(sofar, d, opProvider.getOp());
+            }catch(RuntimeException exp) {
+                int errCode = 2103;
+                throw new ExecException("Problem doing work on Longs", errCode, PigException.BUG, exp);
+            }
+        }
+        return sawNonNull ? sofar : null;
+    }
+
+    @Override
+    public Long exec(Tuple input) throws IOException {
+        try {
+            return doTupleWork(input, opProvider);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            throw new ExecException("Error executing function on Longs", errCode, PigException.BUG, e);
+        }
+    }
+
+    static public abstract class Intermediate extends AlgebraicMathBase.Intermediate {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                return tfact.newTuple(doTupleWork(input, this));
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing function on Longs", errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    static public abstract class Final extends AlgebraicMathBase.Final<Long> {
+        @Override
+        public Long exec(Tuple input) throws IOException {
+            try {
+                return doTupleWork(input, this);
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing function on Longs", errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(null, DataType.LONG));
+    }
+
+    /* Accumulator interface implementation*/
+    private Long intermediateVal = null;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curVal = doTupleWork(b, opProvider);
+            if (curVal == null) {
+                return;
+            }
+            if (intermediateVal == null) {
+                intermediateVal = getSeed(opProvider.getOp());
+            }
+            intermediateVal = doWork(intermediateVal, curVal, opProvider.getOp());
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            throw new ExecException("Error executing function on Longs", errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateVal = null;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateVal;
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/AlgebraicMathBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AlgebraicMathBase.java?rev=1214029&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AlgebraicMathBase.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/AlgebraicMathBase.java Wed Dec 14 01:54:08 2011
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * The purpose of this class is to hold some of the common code shared among
+ * the typed Basic*Funcs (BasicDoubleFunc, BasicIntegerFunc, etc).
+ */
+abstract class AlgebraicMathBase<T> extends EvalFunc<T> implements Algebraic {
+
+    protected static enum KNOWN_OP {
+        SUM, MIN, MAX;
+    }
+
+    protected interface KnownOpProvider {
+        public KNOWN_OP getOp();
+    }
+
+    protected KnownOpProvider opProvider;
+
+    protected void setOp(final KNOWN_OP op) {
+        opProvider = new KnownOpProvider() {
+            @Override
+            public KNOWN_OP getOp() {
+                return op;
+            }};
+    }
+
+    @Override
+    public String getInitial() {
+        return Initial.class.getName();
+    }
+
+
+    static public class Initial extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            // Initial is called in the map.
+            // we just send the tuple down
+            try {
+                // input is a bag with one tuple containing
+                // the column we are trying to operate on
+                DataBag bg = (DataBag) input.get(0);
+                if (bg.iterator().hasNext()) {
+                    return bg.iterator().next();
+                } else {
+                    // make sure that we call the object constructor, not the list constructor
+                    return tfact.newTuple((Object) null);
+                }
+            } catch (ExecException e) {
+                throw e;
+            } catch (Exception e) {
+                int errCode = 2106;
+                throw new ExecException("Error executing an algebraic function", errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    @Override
+    public String getIntermed() {
+        for (@SuppressWarnings("rawtypes")
+        Class c : this.getClass().getDeclaredClasses()) {
+            if (Intermediate.class.isAssignableFrom(c)) {
+                return c.getName();
+            }
+        }
+        // Try inheritance
+        for (@SuppressWarnings("rawtypes")
+        Class c : this.getClass().getClasses()) {
+            if (Intermediate.class.isAssignableFrom(c)) {
+                return c.getName();
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public String getFinal() {
+        for (@SuppressWarnings("rawtypes")
+        Class c : this.getClass().getDeclaredClasses()) {
+            if (Final.class.isAssignableFrom(c)) {
+                return c.getName();
+            }
+        }
+        for (@SuppressWarnings("rawtypes")
+        Class c : this.getClass().getClasses()) {
+            if (Final.class.isAssignableFrom(c)) {
+                return c.getName();
+            }
+        }
+        return null;
+    }
+
+    static public abstract class Intermediate extends EvalFunc<Tuple> implements KnownOpProvider {
+    }
+    static public abstract class Final<T> extends EvalFunc<T> implements KnownOpProvider {
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/builtin/DoubleAbs.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/DoubleAbs.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/DoubleAbs.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/DoubleAbs.java Wed Dec 14 01:54:08 2011
@@ -18,35 +18,10 @@
 
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.data.DataType;
-
-public class DoubleAbs extends EvalFunc<Double>{
-	/**
-	 * java level API
-	 * @param input expects a single numeric value
-	 * @return output returns a single numeric value, absolute value of the argument
-	 */
-	public Double exec(Tuple input) throws IOException {
-        if (input == null || input.size() == 0)
-            return null;
-
-        Double d;
-        try{
-            d = (Double)input.get(0);
-        } catch (Exception e){
-            throw new IOException("Caught exception processing input row ", e);
-        }
-
-		return Math.abs(d);
-	}
+public class DoubleAbs extends DoubleBase {
 	
 	@Override
-	public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), DataType.DOUBLE));
+    Double compute(Double input) {
+        return Math.abs(input);
 	}
 }

Modified: pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/DoubleMax.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/DoubleMax.java Wed Dec 14 01:54:08 2011
@@ -17,178 +17,27 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
 /**
  * This method should never be used directly, use {@link MAX}.
  */
-public class DoubleMax extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-
-    @Override
-    public Double exec(Tuple input) throws IOException {
-         try {
-            return max(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing max in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
+public class DoubleMax extends AlgebraicDoubleMathBase {
 
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to max on
-                DataBag bg = (DataBag) input.get(0);
-                Double d = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    d = (Double)(tp.get(0));
-                }
-                return tfact.newTuple(d);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);            
-            }
-        }
+    public DoubleMax() {
+        setOp(KNOWN_OP.MAX);
     }
 
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(max(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                return max(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-
-    static protected Double max(Tuple input) throws ExecException {
-        
-        DataBag values = (DataBag)input.get(0);
-
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double curMax = Double.NEGATIVE_INFINITY;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Double d = (Double)(t.get(0));
-                if (d == null) continue;
-                sawNonNull = true;
-                curMax = java.lang.Math.max(curMax, d);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing max of doubles.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Double(curMax);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
-    }
-
-    /* Accumulator interface */
-    
-    private Double intermediateMax = null;
-    
+    public static class Intermediate extends AlgebraicDoubleMathBase.Intermediate {
     @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Double curMax = max(b);
-            if (curMax == null) {
-                return;
-            }
-            /* if bag is not null, initialize intermediateMax to negative infinity */
-            if (intermediateMax == null) {
-                intermediateMax = Double.NEGATIVE_INFINITY;
-            }
-            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing max in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
         }
     }
 
+    public static class Final extends AlgebraicDoubleMathBase.Final {
     @Override
-    public void cleanup() {
-        intermediateMax = null;
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
     }
-
-    @Override
-    public Double getValue() {
-        return intermediateMax;
     }
     
 }

Modified: pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/DoubleMin.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/DoubleMin.java Wed Dec 14 01:54:08 2011
@@ -17,174 +17,27 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
 /**
  * This method should never be used directly, use {@link MIN}.
  */
-public class DoubleMin extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-
-    @Override
-    public Double exec(Tuple input) throws IOException {
-        try {
-            return min(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
+public class DoubleMin extends AlgebraicDoubleMathBase {
 
-    public String getFinal() {
-        return Final.class.getName();
+    public DoubleMin() {
+        setOp(KNOWN_OP.MIN);
     }
 
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
+    public static class Intermediate extends AlgebraicDoubleMathBase.Intermediate {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to min on
-                DataBag bg = (DataBag) input.get(0);
-                Double d = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    d = (Double)(tp.get(0));
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
                 }
-                return tfact.newTuple(d);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);            
             }
-        }
-    }
-
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
 
+    public static class Final extends AlgebraicDoubleMathBase.Final {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(min(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);            
-            }
-        }
-    }
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                return min(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);            
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
             }
         }
-    }
-
-    static protected Double min(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double curMin = Double.POSITIVE_INFINITY;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Double d = (Double)(t.get(0));
-                if (d == null) continue;
-                sawNonNull = true;
-                curMin = java.lang.Math.min(curMin, d);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing min of doubles.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-    
-        if(sawNonNull) {
-            return new Double(curMin);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
-    }
-    
-    /* Accumulator interface implementation */
-    private Double intermediateMin = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Double curMin = min(b);
-            if (curMin == null) {
-                return;
-            }
-            /* if bag is not null, initialize intermediateMax to negative infinity */
-            if (intermediateMin == null) {
-                intermediateMin = Double.POSITIVE_INFINITY;
-            }
-            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateMin = null;
-    }
 
-    @Override
-    public Double getValue() {
-        return intermediateMin;
-    }    
 }

Modified: pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/DoubleSum.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/DoubleSum.java Wed Dec 14 01:54:08 2011
@@ -17,177 +17,27 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-
 /**
  * This method should never be used directly, use {@link SUM}.
  */
-public class DoubleSum extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-
-    @Override
-    public Double exec(Tuple input) throws IOException {
-        try {
-            return sum(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            // Initial is called in the map - for SUM
-            // we just send the tuple down
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to sum
-                DataBag bg = (DataBag) input.get(0);
-                Double d = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    d = (Double)( tp.get(0));
-                }
-                return tfact.newTuple(d);
-            } catch (ExecException e) {
-                throw e;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(sum(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-            
-        }
-    }
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                return sum(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-
-    static protected Double sum(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
+public class DoubleSum extends AlgebraicDoubleMathBase {
 
-        double sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Double d = (Double)(t.get(0));
-                if (d == null) continue;
-                sawNonNull = true;
-                sum += d;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of doubles.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-        
-        if(sawNonNull) {
-            return new Double(sum);
-        } else {
-            return null;
-        }
+    public DoubleSum() {
+        setOp(KNOWN_OP.SUM);
     }
 
+    public static class Intermediate extends AlgebraicDoubleMathBase.Intermediate {
     @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
-    }
-    
-    /* Accumulator interface implementation*/
-    private Double intermediateSum = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Double curSum = sum(b);
-            if (curSum == null) {
-                return;
-            }
-            intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.SUM;
         }
     }
 
+    public static class Final extends AlgebraicDoubleMathBase.Final {
     @Override
-    public void cleanup() {
-        intermediateSum = null;
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.SUM;
     }
-
-    @Override
-    public Double getValue() {
-        return intermediateSum;
     }    
     
 }

Modified: pig/trunk/src/org/apache/pig/builtin/FloatMax.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/FloatMax.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/FloatMax.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/FloatMax.java Wed Dec 14 01:54:08 2011
@@ -17,175 +17,27 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
 /**
  * This method should never be used directly, use {@link MAX}.
  */
-public class FloatMax extends EvalFunc<Float> implements Algebraic, Accumulator<Float> {
-
-    @Override
-    public Float exec(Tuple input) throws IOException {
-         try {
-            return max(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing max in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
+public class FloatMax extends AlgebraicFloatMathBase {
 
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to max on
-                DataBag bg = (DataBag) input.get(0);
-                Float f = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    f = (Float)(tp.get(0));
-                }
-                return tfact.newTuple(f);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
+    public FloatMax() {
+        setOp(KNOWN_OP.MAX);
     }
 
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
+    public static class Intermediate extends AlgebraicFloatMathBase.Intermediate {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(max(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
             }
         }
-    }
-    static public class Final extends EvalFunc<Float> {
-        @Override
-        public Float exec(Tuple input) throws IOException {
-            try {
-                return max(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing max in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-
-    static protected Float max(Tuple input) throws ExecException {
-        DataBag values = (DataBag)(input.get(0));
-
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        float curMax = Float.NEGATIVE_INFINITY;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Float f = (Float)(t.get(0));
-                if (f == null) continue;
-                sawNonNull = true;
-                curMax = java.lang.Math.max(curMax, f);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing max of floats.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Float(curMax);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.FLOAT)); 
-    }
-    
-    /* Accumulator interface */
-    
-    private Float intermediateMax = null;
     
+    public static class Final extends AlgebraicFloatMathBase.Final {
     @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Float curMax = max(b);
-            if (curMax == null) {
-                return;
-            }
-            /* if bag is not null, initialize intermediateMax to negative infinity */
-            if (intermediateMax == null) {
-                intermediateMax = Float.NEGATIVE_INFINITY;
-            }
-            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
         }
     }
 
-    @Override
-    public void cleanup() {
-        intermediateMax = null;
-    }
-
-    @Override
-    public Float getValue() {
-        return intermediateMax;
-    }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/FloatMin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/FloatMin.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/FloatMin.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/FloatMin.java Wed Dec 14 01:54:08 2011
@@ -17,174 +17,27 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
 /**
  * This method should never be used directly, use {@link MIN}.
  */
-public class FloatMin extends EvalFunc<Float> implements Algebraic, Accumulator<Float> {
-
-    @Override
-    public Float exec(Tuple input) throws IOException {
-        try {
-            return min(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
+public class FloatMin extends AlgebraicFloatMathBase {
 
-    public String getFinal() {
-        return Final.class.getName();
+    public FloatMin() {
+        setOp(KNOWN_OP.MIN);
     }
 
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
+    public static class Intermediate extends AlgebraicFloatMathBase.Intermediate {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to min on
-                DataBag bg = (DataBag) input.get(0);
-                Float f = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    f = (Float)(tp.get(0));
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
                 }
-                return tfact.newTuple(f);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
             }
-        }
-    }
-
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
 
+    public static class Final extends AlgebraicFloatMathBase.Final {
         @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(min(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            }
-        }
-    }
-    static public class Final extends EvalFunc<Float> {
-        @Override
-        public Float exec(Tuple input) throws IOException {
-            try {
-                return min(input);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing min in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
             }
         }
-    }
-
-    static protected Float min(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        float curMin = Float.POSITIVE_INFINITY;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Float f = (Float)(t.get(0));
-                if (f == null) continue;
-                sawNonNull = true;
-                curMin = java.lang.Math.min(curMin, f);
-            } catch (RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing min of floats.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-    
-        if(sawNonNull) {
-            return new Float(curMin);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.FLOAT)); 
-    }
-    
-    /* Accumulator interface implementation */
-    private Float intermediateMin = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Float curMin = min(b);
-            if (curMin == null) {
-                return;
-            }
-            /* if bag is not null, initialize intermediateMax to negative infinity */
-            if (intermediateMin == null) {
-                intermediateMin = Float.POSITIVE_INFINITY;
-            }
-            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateMin = null;
-    }
 
-    @Override
-    public Float getValue() {
-        return intermediateMin;
-    }    
 }

Modified: pig/trunk/src/org/apache/pig/builtin/FloatSum.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/FloatSum.java?rev=1214029&r1=1214028&r2=1214029&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/FloatSum.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/FloatSum.java Wed Dec 14 01:54:08 2011
@@ -17,214 +17,10 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-
 /**
  * This method should never be used directly, use {@link SUM}.
  */
-public class FloatSum extends EvalFunc<Double> implements Algebraic, Accumulator<Double>{
-
-    @Override
-    public Double exec(Tuple input) throws IOException {
-        try {
-            return sum(input);
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            // Initial is called in the map - for SUM
-            // we just send the tuple down
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to sum
-                DataBag bg = (DataBag) input.get(0);
-                Float f = null;
-                if(bg.iterator().hasNext()) {
-                    Tuple tp = bg.iterator().next();
-                    f = (Float)tp.get(0);
-                }
-                // send down a double since intermediate
-                // would  be sending a double
-                return tfact.newTuple(f != null ? 
-                        new Double(f) : null);
-            } catch (ExecException e) {
-                throw e;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-    static public class Intermediate extends EvalFunc<Tuple> {
-        private static TupleFactory tfact = TupleFactory.getInstance();
-
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                return tfact.newTuple(sumDoubles(input));
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                return sumDoubles(input);                
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing sum in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-    
-    static protected Double sumDoubles(Tuple input) throws ExecException {
-        // Can't just call sum, because the intermediate results are
-        // now Doubles insteads of Floats.
-        DataBag values = (DataBag)input.get(0);
-
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Double d = (Double)(t.get(0));
-                if (d == null) continue;
-                sawNonNull = true;
-                sum += d;
-            }catch(RuntimeException exp) {
-                ExecException newE =  new ExecException("Error processing: " +
-                        t.toString() + exp.getMessage(), exp);
-                    throw newE;
-            }
-        }
-
-        
-        if(sawNonNull) {
-            return new Double(sum);
-        } else {
-            return null;
-        }
-
-    }
-
-    static protected  Double sum(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        // this is in compliance with SQL standard
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double sum = 0.0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try {
-                Float f = (Float)(t.get(0));
-                if (f == null) continue;
-                sawNonNull = true;
-                sum += f;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of floats.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        
-        if(sawNonNull) {
-            return new Double(sum);
-        } else {
-            return null;
-        }
-
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
-    }
-    
-    /* Accumulator interface implementation*/
-    private Double intermediateSum = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Double curSum = sum(b);
-            if (curSum == null) {
-                return;
-            }
-            intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing min in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateSum = null;
-    }
-
-    @Override
-    public Double getValue() {
-        return intermediateSum;
-    }    
-
+public class FloatSum extends DoubleSum {
+    // just here for backwards compatibility
+    public FloatSum() {}
 }



Mime
View raw message