hadoop-hive-commits mailing list archives

From: na...@apache.org
Subject: svn commit: r794129 [1/8] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/apache/hadoop/hive/ql/plan/ ql/src/java/org/...
Date: Wed, 15 Jul 2009 01:52:45 GMT
Author: namit
Date: Wed Jul 15 01:52:41 2009
New Revision: 794129

URL: http://svn.apache.org/viewvc?rev=794129&view=rev
Log:
HIVE-522. Extend GenericUDAF to support complex types
(Zheng Shao via namit)
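
[Editor's note, not part of the patch] For orientation, the sketch below pieces together how the new GenericUDAF classes added by this commit are driven, using only method names that appear in the diff (getGenericUDAFResolver, getEvaluator, init, getNewAggregationBuffer, reset, aggregate, evaluate). The wrapper class, method name, and driver loop are hypothetical and for illustration only.

    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

    // Hypothetical driver, not part of r794129: shows the new resolver/evaluator life cycle.
    public class GenericUDAFSketch {
      static Object runAggregate(String name, TypeInfo[] argTypes,
          GenericUDAFEvaluator.Mode mode, ObjectInspector[] argOIs,
          List<Object[]> rows) throws SemanticException, HiveException {
        // Look up the resolver registered for this function name.
        GenericUDAFResolver resolver = FunctionRegistry.getGenericUDAFResolver(name);
        if (resolver == null) {
          return null;                          // unknown aggregate function
        }
        // The resolver picks an evaluator for the argument types (may throw SemanticException).
        GenericUDAFEvaluator eval = resolver.getEvaluator(argTypes);
        // init() returns the ObjectInspector of the result for this aggregation mode.
        ObjectInspector resultOI = eval.init(mode, argOIs);
        // One AggregationBuffer holds the state for one group.
        AggregationBuffer buf = eval.getNewAggregationBuffer();
        eval.reset(buf);
        for (Object[] parameters : rows) {
          eval.aggregate(buf, parameters);      // one call per input row of the group
        }
        return eval.evaluate(buf);              // result object, described by resultOI
      }
    }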


Added:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFResolver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AmbiguousMethodException.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticException.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/aggregationDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/exprNodeFuncDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/exprNodeGenericFuncDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/groupByDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFUtils.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExpressionEvaluator.java
    hadoop/hive/trunk/ql/src/test/results/clientpositive/binarysortable_1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/cluster.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby1_limit.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby1_map.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby1_map_nomap.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby1_map_skew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby1_noskew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby2_limit.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby2_map.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby2_noskew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby3_map.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby3_map_skew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby3_noskew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map_skew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_noskew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby6.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby6_map_skew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby7_map.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby7_map_skew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby7_noskew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby8.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby8_map.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby8_map_skew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby8_noskew.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby9.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input14_limit.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input1_limit.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input25.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input26.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input3_limit.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input4_limit.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join0.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join13.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join15.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join18.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join19.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join20.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join21.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join22.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join23.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join24.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join29.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join31.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join32.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join33.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join35.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join_hive_626.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/no_hooks.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/nullgroup.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/nullgroup2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/nullgroup4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_clusterby.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_gby2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_gby_join.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_join2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_udf_case.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/regex_col.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample8.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/udf3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/udf8.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_case_column_pruning.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union10.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union11.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union12.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union14.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union15.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union17.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union18.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union19.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union20.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union6.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union7.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/union.q.xml
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ColumnarStructObjectInspector.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StructObjectInspector.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/UnionStructObjectInspector.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorFactory.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorUtils.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Jul 15 01:52:41 2009
@@ -71,6 +71,9 @@
     HIVE-438. Make Hive work with apache thrift
     (Raghu Murthy via namit)
 
+    HIVE-522. Extend GenericUDAF to support complex types
+    (Zheng Shao via namit)
+
   IMPROVEMENTS
     HIVE-389. Option to build without ivy (jssarma)
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AmbiguousMethodException.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AmbiguousMethodException.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AmbiguousMethodException.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AmbiguousMethodException.java Wed Jul 15 01:52:41 2009
@@ -20,13 +20,15 @@
 
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 /**
  * Exception thrown by the UDF and UDAF method resolvers in case a unique method is not found.
  *
  */
-public class AmbiguousMethodException extends Exception {
+public class AmbiguousMethodException extends SemanticException {
 
   /**
    * 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Jul 15 01:52:41 2009
@@ -339,8 +339,6 @@
     job.setMapperClass(ExecMapper.class);
 
     job.setMapOutputKeyClass(HiveKey.class);
-    // LazySimpleSerDe writes to Text
-    // Revert to DynamicSerDe: job.setMapOutputValueClass(BytesWritable.class); 
     job.setMapOutputValueClass(Text.class);
 
     job.setNumReduceTasks(work.getNumReduceTasks().intValue());

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java Wed Jul 15 01:52:41 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 
 public class FunctionInfo {
@@ -29,33 +30,33 @@
   private boolean isOperator;
   
   private Class<? extends UDF> udfClass;
-
-  private Class<? extends UDAF> udafClass;
   
   private Class<? extends GenericUDF> genericUDFClass;
 
+  private GenericUDAFResolver genericUDAFResolver;
+
   public static enum OperatorType { NO_OP, PREFIX, INFIX, POSTFIX };
 
-  public FunctionInfo(String displayName, Class<? extends UDF> udfClass, Class<? extends UDAF> udafClass,
+  public FunctionInfo(String displayName, Class<? extends UDF> udfClass,
       Class<? extends GenericUDF> genericUdfClass) {
-    assert(udfClass == null || udafClass == null);
     this.displayName = displayName;
     opType = OperatorType.NO_OP;
     isOperator = false;
     this.udfClass = udfClass;
-    this.udafClass = udafClass;
     this.genericUDFClass = genericUdfClass;
+    this.genericUDAFResolver = null;
   }
 
-  public FunctionInfo(String displayName, OperatorType opType, Class<? extends UDF> udfClass) {
+  public FunctionInfo(String displayName, GenericUDAFResolver genericUDAFResolver) {
     this.displayName = displayName;
-    this.opType = opType;
-    this.udfClass = udfClass;
-    this.udafClass = null;
+    this.opType = OperatorType.NO_OP;
+    this.udfClass = null;
+    this.genericUDFClass = null;
+    this.genericUDAFResolver = genericUDAFResolver;
   }
 
   public boolean isAggFunction() {
-    return (udafClass != null && udfClass == null);
+    return genericUDAFResolver != null;
   }
 
   public boolean isOperator() {
@@ -78,14 +79,14 @@
     return udfClass;
   }
 
-  public Class<? extends UDAF> getUDAFClass() {
-    return udafClass;
-  }
-  
   public Class<? extends GenericUDF> getGenericUDFClass() {
     return genericUDFClass;
   }
   
+  public GenericUDAFResolver getGenericUDAFResolver() {
+    return genericUDAFResolver;
+  }
+  
   public String getDisplayName() {
     return displayName;
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Wed Jul 15 01:52:41 2009
@@ -30,6 +30,7 @@
 
 import org.apache.hadoop.hive.ql.exec.FunctionInfo.OperatorType;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.groupByDesc;
 import org.apache.hadoop.hive.ql.udf.*;
 import org.apache.hadoop.hive.ql.udf.generic.*;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 
 public class FunctionRegistry {
 
@@ -154,11 +156,11 @@
                 UDFToString.class.getSimpleName());
 
     // Aggregate functions
-    registerUDAF("sum", UDAFSum.class);
-    registerUDAF("count", UDAFCount.class);
+    registerGenericUDAF("sum", new GenericUDAFSum());
+    registerGenericUDAF("count", new GenericUDAFCount());
+    registerGenericUDAF("avg", new GenericUDAFAverage());
     registerUDAF("max", UDAFMax.class);
     registerUDAF("min", UDAFMin.class);
-    registerUDAF("avg", UDAFAvg.class);
     
     // Generic UDFs
     registerGenericUDF("case", GenericUDFCase.class);
@@ -168,24 +170,12 @@
     registerGenericUDF("index", GenericUDFIndex.class);
   }
 
-  public static FunctionInfo getInfo(Class<?> fClass) {
+  public static FunctionInfo getUDFInfo(Class<?> fClass) {
     for(Map.Entry<String, FunctionInfo> ent: mFunctions.entrySet()) {
       FunctionInfo val = ent.getValue();
       if (val.getUDFClass() == fClass) {
         return val;
       }
-      // Otherwise this is potentially an aggregate evaluator
-      if (val.getUDAFClass() == fClass) {
-        return val;
-      }
-      // Otherwise check if the aggregator is one of the classes within the UDAF
-      if (val.getUDAFClass() != null) {
-        for(Class<?> c: val.getUDAFClass().getClasses()) {
-          if (c == fClass) {
-            return val;
-          }
-        }
-      }
     }
 
     return null;
@@ -194,7 +184,7 @@
   public static void registerUDF(String functionName, Class<? extends UDF> UDFClass,
                                  FunctionInfo.OperatorType opt, boolean isOperator) {
     if (UDF.class.isAssignableFrom(UDFClass)) {
-      FunctionInfo fI = new FunctionInfo(functionName.toLowerCase(), UDFClass, null, null);
+      FunctionInfo fI = new FunctionInfo(functionName.toLowerCase(), UDFClass, null);
       fI.setIsOperator(isOperator);
       fI.setOpType(opt);
       mFunctions.put(functionName.toLowerCase(), fI);
@@ -207,7 +197,7 @@
                                  FunctionInfo.OperatorType opt, boolean isOperator,
                                  String displayName) {
     if (UDF.class.isAssignableFrom(UDFClass)) {
-      FunctionInfo fI = new FunctionInfo(displayName, UDFClass, null, null);
+      FunctionInfo fI = new FunctionInfo(displayName, UDFClass, null);
       fI.setIsOperator(isOperator);
       fI.setOpType(opt);
       mFunctions.put(functionName.toLowerCase(), fI);
@@ -218,7 +208,7 @@
 
   public static void registerGenericUDF(String functionName, Class<? extends GenericUDF> genericUDFClass) {
     if (GenericUDF.class.isAssignableFrom(genericUDFClass)) {
-      FunctionInfo fI = new FunctionInfo(functionName, null, null, genericUDFClass);
+      FunctionInfo fI = new FunctionInfo(functionName, null, genericUDFClass);
       mFunctions.put(functionName.toLowerCase(), fI);
     } else {
       throw new RuntimeException("Registering GenericUDF Class " + genericUDFClass
@@ -343,25 +333,21 @@
   }
 
   /**
-   * Get the UDAF evaluator for the name and argumentClasses.
+   * Get the GenericUDAF evaluator for the name and argumentClasses.
    * @param name the name of the UDAF
    * @param argumentTypeInfos
    * @return The UDAF evaluator
    */
-  public static Class<? extends UDAFEvaluator> getUDAFEvaluator(String name, List<TypeInfo> argumentTypeInfos) {
-    Class<? extends UDAF> udf = getUDAF(name);
-    if (udf == null) return null;
-
-    Class<? extends UDAFEvaluator> evalClass = null;
-    try {
-      evalClass = udf.newInstance().getResolver().getEvaluatorClass(argumentTypeInfos);
+  public static GenericUDAFEvaluator getGenericUDAFEvaluator(String name, List<TypeInfo> argumentTypeInfos) 
+      throws SemanticException {
+    GenericUDAFResolver udaf = getGenericUDAFResolver(name);
+    if (udaf == null) return null;
+
+    TypeInfo[] parameters = new TypeInfo[argumentTypeInfos.size()];
+    for(int i=0; i<parameters.length; i++) {
+      parameters[i] = argumentTypeInfos.get(i);
     }
-    catch (AmbiguousMethodException e) {
-    }
-    catch (Exception e) {
-      throw new RuntimeException("Cannot get UDAF for " + name + argumentTypeInfos, e);
-    }
-    return evalClass;
+    return udaf.getEvaluator(parameters);
   }
 
   /**
@@ -385,66 +371,28 @@
     return getUDFMethod(name, Arrays.asList(argumentClasses));
   }
 
-  public static void registerUDAF(String functionName, Class<? extends UDAF> UDAFClass) {
+  public static void registerGenericUDAF(String functionName, GenericUDAFResolver genericUDAFResolver) {
+    mFunctions.put(functionName.toLowerCase(), 
+        new FunctionInfo(functionName.toLowerCase(), genericUDAFResolver));
+  }
 
-    if (UDAF.class.isAssignableFrom(UDAFClass)) {
-      mFunctions.put(functionName.toLowerCase(), new FunctionInfo(functionName
-                                                                  .toLowerCase(), null, UDAFClass, null));
-    } else {
-      throw new RuntimeException("Registering UDAF Class " + UDAFClass
-                                 + " which does not extends " + UDAF.class);
-    }
-    mFunctions.put(functionName.toLowerCase(), new FunctionInfo(functionName
-                                                                .toLowerCase(), null, UDAFClass, null));
+  public static void registerUDAF(String functionName, Class<? extends UDAF> udafClass) {
+    mFunctions.put(functionName.toLowerCase(), 
+        new FunctionInfo(functionName.toLowerCase(), 
+            new GenericUDAFBridge((UDAF)ReflectionUtils.newInstance(udafClass, null))));
   }
 
-  public static Class<? extends UDAF> getUDAF(String functionName) {
-    LOG.debug("Looking up UDAF: " + functionName);
+  public static GenericUDAFResolver getGenericUDAFResolver(String functionName) {
+    LOG.debug("Looking up GenericUDAF: " + functionName);
     FunctionInfo finfo = mFunctions.get(functionName.toLowerCase());
     if (finfo == null) {
       return null;
     }
-    Class<? extends UDAF> result = finfo.getUDAFClass();
+    GenericUDAFResolver result = finfo.getGenericUDAFResolver();
     return result;
   }
 
-  /**
-   * Returns the "iterate" method of the UDAF.
-   */
-  public static Method getUDAFMethod(String name, List<TypeInfo> argumentClasses) {
-    Class<? extends UDAF> udaf = getUDAF(name);
-    if (udaf == null)
-      return null;
-    return FunctionRegistry.getMethodInternal(udaf, "iterate", false,
-                                         argumentClasses);
-  }
-
-  /**
-   * Returns the evaluate method for the UDAF based on the aggregation mode.
-   * See groupByDesc.Mode for details.
-   *
-   * @param name  name of the UDAF
-   * @param mode  the mode of the aggregation
-   * @return      null if no such UDAF is found
-   */
-  public static Method getUDAFEvaluateMethod(String name, groupByDesc.Mode mode) {
-    Class<? extends UDAF> udaf = getUDAF(name);
-    if (udaf == null)
-      return null;
-    return FunctionRegistry.getMethodInternal(udaf,
-        (mode == groupByDesc.Mode.COMPLETE || mode == groupByDesc.Mode.FINAL)
-        ? "terminate" : "terminatePartial", true,
-        new ArrayList<TypeInfo>() );
-  }
-
-  /**
-   * Returns the "aggregate" method of the UDAF.
-   */
-  public static Method getUDAFMethod(String name, TypeInfo... argumentClasses) {
-    return getUDAFMethod(name, Arrays.asList(argumentClasses));
-  }
-
-  public static Object invoke(Method m, Object thisObject, Object[] arguments) throws HiveException {
+  public static Object invoke(Method m, Object thisObject, Object... arguments) throws HiveException {
     Object o;
     try {
       o = m.invoke(thisObject, arguments);
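
[Editor's note, not part of the patch] A brief recap of the two registration paths that coexist after the FunctionRegistry change above; the wrapper class and main method are illustrative, while the two registry calls are the ones the diff itself makes.

    import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
    import org.apache.hadoop.hive.ql.udf.UDAFMax;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage;

    // Illustrative only: both paths end up storing a GenericUDAFResolver in the registry.
    public class RegistrationSketch {
      public static void main(String[] args) {
        // New-style aggregates register a resolver instance directly.
        FunctionRegistry.registerGenericUDAF("avg", new GenericUDAFAverage());

        // Old-style UDAF classes are still accepted; registerUDAF() now wraps the
        // instance in a GenericUDAFBridge, so the rest of the engine only sees resolvers.
        FunctionRegistry.registerUDAF("max", UDAFMax.class);
      }
    }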

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java Wed Jul 15 01:52:41 2009
@@ -57,7 +57,7 @@
                                               (Class<? extends GenericUDF>) udfClass);
           return 0;
         } else if(UDAF.class.isAssignableFrom(udfClass)) {
-          FunctionRegistry.registerUDAF(createFunctionDesc.getFunctionName(), 
+          FunctionRegistry.registerUDAF(createFunctionDesc.getFunctionName(),
                                         (Class<? extends UDAF>) udfClass);
           return 0;
         } 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed Jul 15 01:52:41 2009
@@ -25,7 +25,6 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.io.Serializable;
-import java.lang.reflect.Method;
 import java.lang.reflect.Field;
 import java.lang.IllegalAccessException;
 
@@ -36,14 +35,13 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -74,22 +72,20 @@
   // so aggregationIsDistinct is a boolean array instead of a single number. 
   transient protected boolean[] aggregationIsDistinct;
 
-  transient Class<? extends UDAFEvaluator>[] aggregationClasses; 
-  transient protected Method[] aggregationsAggregateMethods;
-  transient protected Method[] aggregationsEvaluateMethods;
-
+  transient GenericUDAFEvaluator[] aggregationEvaluators;
+  
   transient protected ArrayList<ObjectInspector> objectInspectors;
   transient protected ObjectInspector outputObjectInspector;
   transient ArrayList<String> fieldNames;
 
-  // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2
+  // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2, MERGEPARTIAL
   transient protected ArrayList<Object> currentKeys;
   transient protected ArrayList<Object> newKeys;  
-  transient protected UDAFEvaluator[] aggregations;
+  transient protected AggregationBuffer[] aggregations;
   transient protected Object[][] aggregationsParametersLastInvoke;
 
-  // Used by hash-based GroupBy: Mode = HASH
-  transient protected HashMap<ArrayList<Object>, UDAFEvaluator[]> hashAggregations;
+  // Used by hash-based GroupBy: Mode = HASH, PARTIALS
+  transient protected HashMap<ArrayList<Object>, AggregationBuffer[]> hashAggregations;
   
   transient boolean firstRow;
   transient long    totalMemory;
@@ -144,19 +140,23 @@
     numRowsInput = 0;
     numRowsHashTbl = 0;
 
+    assert(inputObjInspector.length == 1);
+    ObjectInspector rowInspector = inputObjInspector[0];
+
     // init keyFields
     keyFields = new ExprNodeEvaluator[conf.getKeys().size()];
     keyObjectInspectors = new ObjectInspector[conf.getKeys().size()];
+    currentKeyObjectInspectors = new ObjectInspector[conf.getKeys().size()];
     keyObjects = new Object[conf.getKeys().size()];
     for (int i = 0; i < keyFields.length; i++) {
       keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i));
-      keyObjectInspectors[i] = null;
+      keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
+      currentKeyObjectInspectors[i] = ObjectInspectorUtils.getStandardObjectInspector(keyObjectInspectors[i], 
+          ObjectInspectorCopyOption.WRITABLE);
       keyObjects[i] = null;
     }
     newKeys = new ArrayList<Object>(keyFields.length);
     
-    currentKeyObjectInspectors = new ObjectInspector[conf.getKeys().size()];
-    
     // init aggregationParameterFields
     aggregationParameterFields = new ExprNodeEvaluator[conf.getAggregators().size()][];
     aggregationParameterObjectInspectors = new ObjectInspector[conf.getAggregators().size()][];
@@ -170,8 +170,10 @@
       aggregationParameterObjects[i] = new Object[parameters.size()];
       for (int j = 0; j < parameters.size(); j++) {
         aggregationParameterFields[i][j] = ExprNodeEvaluatorFactory.get(parameters.get(j));
-        aggregationParameterObjectInspectors[i][j] = null;
-        aggregationParameterStandardObjectInspectors[i][j] = null;
+        aggregationParameterObjectInspectors[i][j] = aggregationParameterFields[i][j].initialize(rowInspector);
+        aggregationParameterStandardObjectInspectors[i][j] = 
+            ObjectInspectorUtils.getStandardObjectInspector(aggregationParameterObjectInspectors[i][j], 
+                ObjectInspectorCopyOption.WRITABLE);
         aggregationParameterObjects[i][j] = null;
       }
     }
@@ -182,47 +184,10 @@
     }
 
     // init aggregationClasses  
-    aggregationClasses = (Class<? extends UDAFEvaluator>[]) new Class[conf.getAggregators().size()];
-    for (int i = 0; i < conf.getAggregators().size(); i++) {
+    aggregationEvaluators = new GenericUDAFEvaluator[conf.getAggregators().size()];
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
       aggregationDesc agg = conf.getAggregators().get(i);
-      aggregationClasses[i] = agg.getAggregationClass();
-    }
-
-    // init aggregations, aggregationsAggregateMethods,
-    // aggregationsEvaluateMethods
-    aggregationsAggregateMethods = new Method[aggregationClasses.length];
-    aggregationsEvaluateMethods = new Method[aggregationClasses.length];
-
-    for(int i=0; i<aggregationClasses.length; i++) {
-      String evaluateMethodName = conf.getEvalMethods().get(i);
-      String aggregateMethodName = conf.getAggMethods().get(i);
-
-      // aggregationsAggregateMethods
-      for( Method m : aggregationClasses[i].getMethods() ){
-        if( m.getName().equals( aggregateMethodName ) 
-            && m.getParameterTypes().length == aggregationParameterFields[i].length) {              
-          aggregationsAggregateMethods[i] = m;
-          break;
-        }
-      }
-      if (null == aggregationsAggregateMethods[i]) {
-        throw new HiveException("Cannot find " + aggregateMethodName + " method of UDAF class "
-                                 + aggregationClasses[i].getName() + " that accepts "
-                                 + aggregationParameterFields[i].length + " parameters!");
-      }
-      // aggregationsEvaluateMethods
-      try {
-        aggregationsEvaluateMethods[i] = aggregationClasses[i].getMethod(evaluateMethodName);
-      } catch (Exception e) {
-        throw new HiveException("Unable to get the method named " + evaluateMethodName + " from " 
-            + aggregationClasses[i] + ": " + e.getMessage());
-      }
-
-      if (null == aggregationsEvaluateMethods[i]) {
-        throw new HiveException("Cannot find " + evaluateMethodName + " method of UDAF class "
-                                 + aggregationClasses[i].getName() + "!");
-      }
-      assert(aggregationsEvaluateMethods[i] != null);
+      aggregationEvaluators[i] = agg.createGenericUDAFEvaluator();
     }
 
     aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
@@ -230,7 +195,7 @@
       aggregations = newAggregations();
       hashAggr = false;
     } else {
-      hashAggregations = new HashMap<ArrayList<Object>, UDAFEvaluator[]>();
+      hashAggregations = new HashMap<ArrayList<Object>, AggregationBuffer[]>();
       aggregations = newAggregations();
       hashAggr = true;
       keyPositionsSize = new ArrayList<Integer>();
@@ -243,26 +208,21 @@
     }
 
     // init objectInspectors
-    int totalFields = keyFields.length + aggregationClasses.length;
+    int totalFields = keyFields.length + aggregationEvaluators.length;
     objectInspectors = new ArrayList<ObjectInspector>(totalFields);
     for(int i=0; i<keyFields.length; i++) {
       objectInspectors.add(null);
     }
-    for(int i=0; i<aggregationClasses.length; i++) {
-      objectInspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
-          PrimitiveObjectInspectorUtils.getTypeEntryFromPrimitiveWritableClass(
-          aggregationsEvaluateMethods[i].getReturnType()).primitiveCategory));
+    for(int i=0; i<aggregationEvaluators.length; i++) {
+      ObjectInspector roi = aggregationEvaluators[i].init(
+          conf.getAggregators().get(i).getMode(), aggregationParameterObjectInspectors[i]);
+      objectInspectors.add(roi);
     }
 
     fieldNames = conf.getOutputColumnNames();
 
     for (int i = 0; i < keyFields.length; i++) {
-      if (keyObjectInspectors[i] == null) {
-        keyObjectInspectors[i] = keyFields[i].initialize(inputObjInspector[0]);
-        currentKeyObjectInspectors[i] = ObjectInspectorUtils.getStandardObjectInspector(keyObjectInspectors[i], 
-            ObjectInspectorCopyOption.WRITABLE);
-      }
-      objectInspectors.set(i, currentKeyObjectInspectors[i]);
+      objectInspectors.set(i, currentKeyObjectInspectors[i]);      
     }
     
     // Generate key names
@@ -271,9 +231,9 @@
       keyNames.add(fieldNames.get(i));
     }
     newKeyObjectInspector = 
-      ObjectInspectorFactory.getStandardStructObjectInspector(keyNames, Arrays.asList(keyObjectInspectors));
+        ObjectInspectorFactory.getStandardStructObjectInspector(keyNames, Arrays.asList(keyObjectInspectors));
     currentKeyObjectInspector = 
-      ObjectInspectorFactory.getStandardStructObjectInspector(keyNames, Arrays.asList(currentKeyObjectInspectors));
+        ObjectInspectorFactory.getStandardStructObjectInspector(keyNames, Arrays.asList(currentKeyObjectInspectors));
     
     outputObjectInspector = 
       ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, objectInspectors);
@@ -294,7 +254,7 @@
    * based on that. 
    * @return number of entries that can fit in hash table - useful for map-side aggregation only
    **/
-  private void computeMaxEntriesHashAggr(Configuration hconf) {
+  private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException {
     maxHashTblMemory = (long)(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY) * Runtime.getRuntime().maxMemory());
     estimateRowSize();
   }
@@ -389,7 +349,7 @@
   /**
    * @return the size of each row
    **/
-  private void estimateRowSize() {
+  private void estimateRowSize() throws HiveException {
     // estimate the size of each entry - 
     // a datatype with unknown size (String/Struct etc. - is assumed to be 256 bytes for now).
     // 64 bytes is the overhead for a reference
@@ -403,10 +363,10 @@
 
     // Go over all the aggregation classes and and get the size of the fields of fixed length. Keep track of the variable length
     // fields in these aggregation classes.
-    for(int i=0; i < aggregationClasses.length; i++) {
+    for(int i=0; i < aggregationEvaluators.length; i++) {
 
       fixedRowSize += javaObjectOverHead;
-      Class<? extends UDAFEvaluator> agg = aggregationClasses[i];
+      Class<? extends AggregationBuffer> agg = aggregationEvaluators[i].getNewAggregationBuffer().getClass();
       Field[] fArr = ObjectInspectorUtils.getDeclaredNonStaticFields(agg);
       for (Field f : fArr) {
         fixedRowSize += getSize(i, f.getType(), f);
@@ -414,22 +374,23 @@
     }
   }
 
-  protected UDAFEvaluator[] newAggregations() throws HiveException {      
-    UDAFEvaluator[] aggs = new UDAFEvaluator[aggregationClasses.length];
-    for(int i=0; i<aggregationClasses.length; i++) {
-      try {
-        aggs[i] = aggregationClasses[i].newInstance();
-      } catch (Exception e) {
-        e.printStackTrace();
-        throw new HiveException("Unable to create an instance of class " + aggregationClasses[i] + ": " + e.getMessage());
-      }
-      aggs[i].init();
+  protected AggregationBuffer[] newAggregations() throws HiveException {      
+    AggregationBuffer[] aggs = new AggregationBuffer[aggregationEvaluators.length];
+    for(int i=0; i<aggregationEvaluators.length; i++) {
+      aggs[i] = aggregationEvaluators[i].getNewAggregationBuffer();
+      // aggregationClasses[i].reset(aggs[i]);
     }
     return aggs;
   }
 
+  protected void resetAggregations(AggregationBuffer[] aggs) throws HiveException {      
+    for(int i=0; i<aggs.length; i++) {
+      aggregationEvaluators[i].reset(aggs[i]);
+    }
+  }
+
   
-  protected void updateAggregations(UDAFEvaluator[] aggs, Object row, ObjectInspector rowInspector, boolean hashAggr, boolean newEntry,
+  protected void updateAggregations(AggregationBuffer[] aggs, Object row, ObjectInspector rowInspector, boolean hashAggr, boolean newEntry,
                                     Object[][] lastInvoke) throws HiveException {
     
     for(int ai=0; ai<aggs.length; ai++) {
@@ -437,58 +398,32 @@
       // Calculate the parameters 
       Object[] o = new Object[aggregationParameterFields[ai].length];
       for(int pi=0; pi<aggregationParameterFields[ai].length; pi++) {
-        ObjectInspector oi = aggregationParameterObjectInspectors[ai][pi];
-        if (oi == null) {
-          oi = aggregationParameterFields[ai][pi].initialize(rowInspector);
-          aggregationParameterObjectInspectors[ai][pi] = oi;
-          aggregationParameterStandardObjectInspectors[ai][pi]
-              = ObjectInspectorUtils.getStandardObjectInspector(oi, ObjectInspectorCopyOption.WRITABLE);
-        }
-        PrimitiveObjectInspector poi = (PrimitiveObjectInspector)oi;
-        o[pi] = poi.getPrimitiveWritableObject(
-            aggregationParameterFields[ai][pi].evaluate(row));
+        o[pi] = aggregationParameterFields[ai][pi].evaluate(row);
       }
 
       // Update the aggregations.
       if (aggregationIsDistinct[ai]) {
         if (hashAggr) {
           if (newEntry) {
-            FunctionRegistry.invoke(aggregationsAggregateMethods[ai], aggs[ai], o);
+            aggregationEvaluators[ai].aggregate(aggs[ai], o);
           }
         }
         else {
-          boolean differentParameters = false;
-          if ((lastInvoke == null) || (lastInvoke[ai] == null))
-            differentParameters = true;
-          else {
-            for(int pi=0; pi<o.length; pi++) {
-              if (o[pi] == null) {
-                if (lastInvoke[ai][pi] != null) {
-                  differentParameters = true;
-                  break;
-                }
-              }
-              else if (!o[pi].equals(lastInvoke[ai][pi])) {
-                differentParameters = true;
-                break;
-              }
-            }  
+          if (lastInvoke[ai] == null) {
+            lastInvoke[ai] = new Object[o.length];
           }
-
-          if (differentParameters) {
-            FunctionRegistry.invoke(aggregationsAggregateMethods[ai], aggs[ai], o);
-            if (lastInvoke[ai] == null) {
-              lastInvoke[ai] = new Object[o.length];
-            }
+          if (ObjectInspectorUtils.compare(o, aggregationParameterObjectInspectors[ai],
+                lastInvoke[ai], aggregationParameterStandardObjectInspectors[ai]) != 0) {
+            aggregationEvaluators[ai].aggregate(aggs[ai], o);
             for (int pi=0; pi<o.length; pi++) {
               lastInvoke[ai][pi] = ObjectInspectorUtils.copyToStandardObject(o[pi],
-                  aggregationParameterStandardObjectInspectors[ai][pi], ObjectInspectorCopyOption.WRITABLE);
+                  aggregationParameterObjectInspectors[ai][pi], ObjectInspectorCopyOption.WRITABLE);
             }
           }
         }
       }
       else {
-        FunctionRegistry.invoke(aggregationsAggregateMethods[ai], aggs[ai], o);
+        aggregationEvaluators[ai].aggregate(aggs[ai], o);
       }
     }
   }
@@ -554,7 +489,7 @@
   
   private void processHashAggr(Object row, ObjectInspector rowInspector, ArrayList<Object> newKeys) throws HiveException {
     // Prepare aggs for updating
-    UDAFEvaluator[] aggs = null;
+    AggregationBuffer[] aggs = null;
     boolean newEntry = false;
 
     // hash-based aggregations
@@ -576,9 +511,10 @@
     }
   }
 
+  // Non-hash aggregation
   private void processAggr(Object row, ObjectInspector rowInspector, ArrayList<Object> newKeys) throws HiveException {
     // Prepare aggs for updating
-    UDAFEvaluator[] aggs = null;
+    AggregationBuffer[] aggs = null;
     Object[][] lastInvoke = null;
     boolean keysAreEqual = ObjectInspectorUtils.compare(
         newKeys, newKeyObjectInspector,
@@ -595,9 +531,8 @@
       }
       deepCopyElements(keyObjects, keyObjectInspectors, currentKeys, ObjectInspectorCopyOption.WRITABLE);
       
-      // Init aggregations
-      for(UDAFEvaluator aggregation: aggregations)
-        aggregation.init();
+      // Reset the aggregations
+      resetAggregations(aggregations);
       
       // clear parameters in last-invoke
       for(int i=0; i<aggregationsParametersLastInvoke.length; i++)
@@ -608,6 +543,7 @@
     
     lastInvoke = aggregationsParametersLastInvoke;
     // Update the aggs
+    
     updateAggregations(aggs, row, rowInspector, false, false, lastInvoke);
   }
 
@@ -632,14 +568,14 @@
         }
       }
 
-      UDAFEvaluator[] aggs = null;
+      AggregationBuffer[] aggs = null;
       if (aggrPositions.size() > 0)
         aggs = hashAggregations.get(newKeys);
 
       for (varLenFields v : aggrPositions) {
         int     aggrPos          = v.getAggrPos();
         List<Field> fieldsVarLen = v.getFields();
-        UDAFEvaluator    agg              = aggs[aggrPos];
+        AggregationBuffer    agg = aggs[aggrPos];
 
         try 
         {
@@ -669,9 +605,10 @@
     // changed in the future
 
     if (complete) {
-      Iterator iter = hashAggregations.entrySet().iterator();
+      Iterator<Map.Entry<ArrayList<Object>, AggregationBuffer[]>> 
+          iter = hashAggregations.entrySet().iterator();
       while (iter.hasNext()) {
-        Map.Entry<ArrayList<Object>, UDAFEvaluator[]> m = (Map.Entry)iter.next();
+        Map.Entry<ArrayList<Object>, AggregationBuffer[]> m = iter.next();
         forward(m.getKey(), m.getValue());
       }
       hashAggregations.clear();
@@ -682,10 +619,11 @@
 
     int oldSize = hashAggregations.size();
     LOG.warn("Hash Tbl flush: #hash table = " + oldSize);
-    Iterator iter = hashAggregations.entrySet().iterator();
+    Iterator<Map.Entry<ArrayList<Object>, AggregationBuffer[]>>
+        iter = hashAggregations.entrySet().iterator();
     int numDel = 0;
     while (iter.hasNext()) {
-      Map.Entry<ArrayList<Object>, UDAFEvaluator[]> m = (Map.Entry)iter.next();
+      Map.Entry<ArrayList<Object>, AggregationBuffer[]> m = iter.next();
       forward(m.getKey(), m.getValue());
       iter.remove();
       numDel++;
@@ -705,7 +643,7 @@
    *          The keys in the record
    * @throws HiveException
    */
-  protected void forward(ArrayList<Object> keys, UDAFEvaluator[] aggs) throws HiveException {
+  protected void forward(ArrayList<Object> keys, AggregationBuffer[] aggs) throws HiveException {
     int totalFields = keys.size() + aggs.length;
     if (forwardCache == null) {
       forwardCache = new Object[totalFields];
@@ -714,12 +652,7 @@
       forwardCache[i] = keys.get(i);
     }
     for(int i=0; i<aggs.length; i++) {
-      try {
-        forwardCache[keys.size() + i] = aggregationsEvaluateMethods[i].invoke(aggs[i]);
-      } catch (Exception e) {
-        throw new HiveException("Unable to execute UDAF function " + aggregationsEvaluateMethods[i] + " " 
-            + " on object " + "(" + aggs[i] + ") " + ": " + e.getMessage());
-      }
+      forwardCache[keys.size() + i] = aggregationEvaluators[i].evaluate(aggs[i]);
     }
     forward(forwardCache, outputObjectInspector);
   }
@@ -740,9 +673,10 @@
           for(int ai=0; ai<aggregations.length; ai++) {
             // Calculate the parameters 
             Object[] o = new Object[aggregationParameterFields[ai].length];
-            for(int pi=0; pi<aggregationParameterFields[ai].length; pi++) 
+            for(int pi=0; pi<aggregationParameterFields[ai].length; pi++) { 
               o[pi] = null;
-            FunctionRegistry.invoke(aggregationsAggregateMethods[ai], aggregations[ai], o);
+            }
+            aggregationEvaluators[ai].aggregate(aggregations[ai], o);
           }
           
           // create dummy keys - size 0
@@ -753,7 +687,7 @@
             LOG.warn("Begin Hash Table flush at close: size = " + hashAggregations.size());
             Iterator iter = hashAggregations.entrySet().iterator();
             while (iter.hasNext()) {
-              Map.Entry<ArrayList<Object>, UDAFEvaluator[]> m = (Map.Entry)iter.next();
+              Map.Entry<ArrayList<Object>, AggregationBuffer[]> m = (Map.Entry)iter.next();
               forward(m.getKey(), m.getValue());
               iter.remove();
             }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Wed Jul 15 01:52:41 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -204,8 +205,9 @@
     }
     
     try {
-      if (out != null)
+      if (out != null) {
         out.collect(keyWritable, value);
+      }
     } catch (IOException e) {
       throw new HiveException (e);
     }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java Wed Jul 15 01:52:41 2009
@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 /**                                                                                     
  * exception class, thrown when udf argument have something wrong.
  */
-public class UDFArgumentException extends HiveException {
+public class UDFArgumentException extends SemanticException {
 
   public UDFArgumentException() {
     super();

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Wed Jul 15 01:52:41 2009
@@ -404,7 +404,7 @@
     reduce.getSchema().setSignature(sig);
     reduceConf.setOutputValueColumnNames(newOutputColNames);
     reduceConf.setValueCols(newValueEval);
-    tableDesc newValueTable = PlanUtils.getLazySimpleSerDeTableDesc(PlanUtils.getFieldSchemasFromColumnList(
+    tableDesc newValueTable = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(
         reduceConf.getValueCols(), newOutputColNames, 0, ""));
     reduceConf.setValueSerializeInfo(newValueTable);
   }
@@ -508,7 +508,7 @@
         }
 
         tableDesc valueTableDesc = PlanUtils
-            .getLazySimpleSerDeTableDesc(PlanUtils
+            .getMapJoinValueTableDesc(PlanUtils
                 .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
 
         valueTableDescs.add(valueTableDesc);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Wed Jul 15 01:52:41 2009
@@ -113,8 +113,8 @@
       uPlan = (mapredWork)uTask.getWork();
     }
 
-    tableDesc tt_desc = 
-      PlanUtils.getBinaryTableDesc(PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol")); 
+    tableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(
+          PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol")); 
     
     // generate the temporary file
     Context baseCtx = parseCtx.getContext();

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Jul 15 01:52:41 2009
@@ -615,7 +615,7 @@
     
     Operator<? extends Serializable> parent = op.getParentOperators().get(posn);
     tableDesc tt_desc = 
-      PlanUtils.getBinaryTableDesc(PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol")); 
+      PlanUtils.getIntermediateFileTableDesc(PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol")); 
     
     // Create a file sink operator for this file name
     Operator<? extends Serializable> fs_op =

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Wed Jul 15 01:52:41 2009
@@ -197,7 +197,8 @@
       Task<? extends Serializable> mjTask = TaskFactory.get(mjPlan, parseCtx.getConf());
       
       tableDesc tt_desc = 
-        PlanUtils.getLazySimpleSerDeTableDesc(PlanUtils.sortFieldSchemas(PlanUtils.getFieldSchemasFromRowSchema(mapJoin.getSchema(), "temporarycol"))); 
+        PlanUtils.getIntermediateFileTableDesc(PlanUtils.sortFieldSchemas(
+            PlanUtils.getFieldSchemasFromRowSchema(mapJoin.getSchema(), "temporarycol"))); 
       
       // generate the temporary file
       Context baseCtx = parseCtx.getContext();

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Wed Jul 15 01:52:41 2009
@@ -225,7 +225,7 @@
     }
     
     tableDesc keyTableDesc = 
-      PlanUtils.getLazySimpleSerDeTableDesc(PlanUtils.getFieldSchemasFromColumnList(keyCols, "mapjoinkey"));
+      PlanUtils.getMapJoinKeyTableDesc(PlanUtils.getFieldSchemasFromColumnList(keyCols, "mapjoinkey"));
 
     List<tableDesc> valueTableDescs = new ArrayList<tableDesc>();
     
@@ -237,7 +237,7 @@
       }
               
       tableDesc valueTableDesc = 
-        PlanUtils.getLazySimpleSerDeTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
+        PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
     
       valueTableDescs.add(valueTableDesc);
     }

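MapJoinFactory and MapJoinProcessor get the analogous treatment: the spill-file descriptor and the map-join key/value descriptors now come from dedicated PlanUtils factories rather than from getLazySimpleSerDeTableDesc directly. A hedged sketch of how the key and value descriptors are built, using only the helpers named in the hunks; the column lists are placeholders that the join operator would supply.

    // Sketch only: building map-join key/value serialization descriptors as in
    // the MapJoinProcessor hunk above. keyCols/valueCols would come from the
    // join operator's key and value expressions; nothing here is a new API.
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.plan.PlanUtils;
    import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
    import org.apache.hadoop.hive.ql.plan.tableDesc;

    public class MapJoinDescSketch {
      static tableDesc keyDesc(ArrayList<exprNodeDesc> keyCols) {
        return PlanUtils.getMapJoinKeyTableDesc(
            PlanUtils.getFieldSchemasFromColumnList(keyCols, "mapjoinkey"));
      }

      static List<tableDesc> valueDescs(List<ArrayList<exprNodeDesc>> valueColsPerTable) {
        List<tableDesc> descs = new ArrayList<tableDesc>();
        for (ArrayList<exprNodeDesc> valueCols : valueColsPerTable) {
          descs.add(PlanUtils.getMapJoinValueTableDesc(
              PlanUtils.getFieldSchemasFromColumnList(valueCols, "mapjoinvalue")));
        }
        return descs;
      }
    }
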
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Jul 15 01:52:41 2009
@@ -110,7 +110,10 @@
 import org.apache.hadoop.hive.ql.plan.tableScanDesc;
 import org.apache.hadoop.hive.ql.plan.unionDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -284,7 +287,7 @@
       assert (expressionTree.getChildCount() != 0);
       if (expressionTree.getChild(0).getType() == HiveParser.Identifier) {
         String functionName = unescapeIdentifier(expressionTree.getChild(0).getText());
-        if (FunctionRegistry.getUDAF(functionName) != null) {
+        if (FunctionRegistry.getGenericUDAFResolver(functionName) != null) {
           aggregations.put(expressionTree.toStringTree(), expressionTree);
           return;
         }
@@ -972,7 +975,7 @@
   }
 
   @SuppressWarnings("nls")
-  public Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op, RowResolver rr) 
+  public <T extends Serializable> Operator<T> putOpInsertMap(Operator<T> op, RowResolver rr) 
   {
     OpParseContext ctx = new OpParseContext(rr);
     opParseCtx.put(op, ctx);
@@ -1347,93 +1350,123 @@
   }
 
   /**
-   * Class to store UDAF related information.
+   * Class to store GenericUDAF related information.
    */
-  static class UDAFInfo {
+  static class GenericUDAFInfo {
     ArrayList<exprNodeDesc> convertedParameters;
-    Class<?> retType;
-    Class<? extends UDAFEvaluator> evalClass;
-    Method evalMethod;
-    Method aggMethod;
+    GenericUDAFEvaluator genericUDAFEvaluator;
+    TypeInfo returnType;
   }
 
   /**
-   * Returns the UDAFInfo struct for the aggregation
+   * Convert an exprNodeDesc list to a TypeInfo list.
+   */
+  static ArrayList<TypeInfo> getTypeInfo(ArrayList<exprNodeDesc> exprs) {
+    ArrayList<TypeInfo> result = new ArrayList<TypeInfo>();
+    for(exprNodeDesc expr: exprs) {
+      result.add(expr.getTypeInfo());
+    }
+    return result;
+  }
+  
+  /**
+   * Convert a TypeInfo list to an array of standard writable ObjectInspectors.
+   */
+  static ObjectInspector[] getStandardObjectInspector(ArrayList<TypeInfo> exprs) {
+    ObjectInspector[] result = new ObjectInspector[exprs.size()];
+    for (int i=0; i<exprs.size(); i++) {
+      result[i] = TypeInfoUtils
+          .getStandardWritableObjectInspectorFromTypeInfo(exprs.get(i));
+    }
+    return result;
+  }
+  
+  /**
+   * Returns the GenericUDAFEvaluator for the aggregation.
+   * This is called once for each GroupBy aggregation.
+   */
+  static GenericUDAFEvaluator getGenericUDAFEvaluator(String aggName, 
+      ArrayList<exprNodeDesc> aggParameters, 
+      ASTNode aggTree) throws SemanticException {
+    ArrayList<TypeInfo> originalParameterTypeInfos = getTypeInfo(aggParameters);
+    GenericUDAFEvaluator result = FunctionRegistry.getGenericUDAFEvaluator(
+        aggName, originalParameterTypeInfos);
+    if (null == result) {
+      String reason = "Looking for UDAF Evaluator \"" + aggName + "\" with parameters " 
+          + originalParameterTypeInfos;
+      throw new SemanticException(ErrorMsg.INVALID_FUNCTION_SIGNATURE.
+          getMsg((ASTNode)aggTree.getChild(0), reason));
+    }
+    return result;
+  }
+  
+  /**
+   * Returns the GenericUDAFInfo struct for the aggregation.
    * @param aggName  The name of the UDAF.
-   * @param mode     The mode of the aggregation. This affects the evaluate method.
-   * @param aggParameters  The actual exprNodeDesc of the parameters.
+   * @param aggParameters  The exprNodeDesc of the original parameters 
    * @param aggTree   The ASTNode node of the UDAF in the query.
-   * @return UDAFInfo
+   * @return GenericUDAFInfo 
    * @throws SemanticException when the UDAF is not found or has problems.
    */
-  UDAFInfo getUDAFInfo(String aggName, groupByDesc.Mode mode,
-      ArrayList<exprNodeDesc> aggParameters, ASTNode aggTree) throws SemanticException {
-    UDAFInfo r = new UDAFInfo();
-    ArrayList<TypeInfo> aggTypeInfos = new ArrayList<TypeInfo>();
-    for(exprNodeDesc expr: aggParameters) {
-      aggTypeInfos.add(expr.getTypeInfo());
-    }
-    r.evalClass = FunctionRegistry.getUDAFEvaluator(aggName, aggTypeInfos);
-    if (null == r.evalClass) {
-      String reason = "Looking for UDAF Evaluator\"" + aggName + "\" with parameters " + aggTypeInfos;
-      throw new SemanticException(ErrorMsg.INVALID_FUNCTION_SIGNATURE.getMsg((ASTNode)aggTree.getChild(0), reason));
-    }
+  static GenericUDAFInfo getGenericUDAFInfo(GenericUDAFEvaluator evaluator, 
+      GenericUDAFEvaluator.Mode emode, ArrayList<exprNodeDesc> aggParameters) 
+      throws SemanticException {
     
-    r.aggMethod = null;
-    String funcName = (((mode == groupByDesc.Mode.PARTIAL1) || (mode == groupByDesc.Mode.HASH) ||
-                        (mode == groupByDesc.Mode.COMPLETE)) ? "iterate" : "merge");
-    if (aggTree.getToken().getType() == HiveParser.TOK_FUNCTIONDI && (mode != groupByDesc.Mode.FINAL))
-        funcName = "iterate";
-
-    for(Method m: r.evalClass.getMethods()) {
-      if (m.getName().equalsIgnoreCase(funcName)) {
-        r.aggMethod = m;
-      }
-    }
+    GenericUDAFInfo r = new GenericUDAFInfo();
     
-    if (null == r.aggMethod) {
-      String reason = "Looking for UDAF Evaluator Iterator\"" + aggName + "\" with parameters " + aggTypeInfos;
-      throw new SemanticException(ErrorMsg.INVALID_FUNCTION_SIGNATURE.getMsg((ASTNode)aggTree.getChild(0), reason));
-    }
-
-    r.convertedParameters = convertParameters(r.aggMethod, aggParameters);
+    // set r.genericUDAFEvaluator
+    r.genericUDAFEvaluator = evaluator;
 
-    funcName = ((mode == groupByDesc.Mode.PARTIAL1 || mode == groupByDesc.Mode.HASH ||
-                 mode == groupByDesc.Mode.PARTIAL2) ? "terminatePartial" : "terminate");
-    r.evalMethod = null;
-    for(Method m: r.evalClass.getMethods()) {
-      if (m.getName().equalsIgnoreCase(funcName)) {
-        r.evalMethod = m;
-      }
-    }
-    if (r.evalMethod == null) {
-      String reason = "UDAF \"" + aggName + "\" does not have terminate()/terminatePartial() methods.";
-      throw new SemanticException(ErrorMsg.INVALID_FUNCTION.getMsg((ASTNode)aggTree.getChild(0), reason)); 
+    // set r.returnType
+    ObjectInspector returnOI = null;
+    try {
+      ObjectInspector[] aggObjectInspectors = 
+          getStandardObjectInspector(getTypeInfo(aggParameters));
+      returnOI = r.genericUDAFEvaluator.init(emode, aggObjectInspectors);
+      r.returnType = TypeInfoUtils.getTypeInfoFromObjectInspector(returnOI);
+    } catch (HiveException e) {
+      throw new SemanticException(e);
     }
-    r.retType = r.evalMethod.getReturnType();
+    // set r.convertedParameters
+    // TODO: type conversion
+    r.convertedParameters = aggParameters;
     
     return r;
   }
-  
+
+  private static GenericUDAFEvaluator.Mode groupByDescModeToUDAFMode(groupByDesc.Mode mode, boolean isDistinct) {
+    switch (mode) {
+      case COMPLETE: return GenericUDAFEvaluator.Mode.COMPLETE;
+      case PARTIAL1: return GenericUDAFEvaluator.Mode.PARTIAL1;
+      case PARTIAL2: return GenericUDAFEvaluator.Mode.PARTIAL2;
+      case PARTIALS: return isDistinct ? GenericUDAFEvaluator.Mode.PARTIAL1 : GenericUDAFEvaluator.Mode.PARTIAL2;
+      case FINAL: return GenericUDAFEvaluator.Mode.FINAL;
+      case HASH: return GenericUDAFEvaluator.Mode.PARTIAL1;
+      case MERGEPARTIAL: return isDistinct ? GenericUDAFEvaluator.Mode.COMPLETE : GenericUDAFEvaluator.Mode.FINAL;
+      default:
+        throw new RuntimeException("internal error in groupByDescModeToUDAFMode");
+    }
+  }
   /**
    * Generate the GroupByOperator for the Query Block (parseInfo.getXXX(dest)).
    * The new GroupByOperator will be a child of the reduceSinkOperatorInfo.
    * 
    * @param mode The mode of the aggregation (PARTIAL1 or COMPLETE)
+   * @param genericUDAFEvaluators  If not null, this function will store the mapping
+   *            from Aggregation StringTree to the genericUDAFEvaluator in this parameter,
+   *            so it can be used in the next-stage GroupBy aggregations. 
    * @return the new GroupByOperator
    */
   @SuppressWarnings("nls")
   private Operator genGroupByPlanGroupByOperator(
         QBParseInfo parseInfo, String dest, Operator reduceSinkOperatorInfo,
-        groupByDesc.Mode mode)
+        groupByDesc.Mode mode, Map<String, GenericUDAFEvaluator> genericUDAFEvaluators)
     throws SemanticException {
     RowResolver groupByInputRowResolver = opParseCtx.get(reduceSinkOperatorInfo).getRR();
     RowResolver groupByOutputRowResolver = new RowResolver();
     groupByOutputRowResolver.setIsExprResolver(true);
     ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
     ArrayList<aggregationDesc> aggregations = new ArrayList<aggregationDesc>();
-    ArrayList<String> evalMethods = new ArrayList<String>();
-    ArrayList<String> aggMethods = new ArrayList<String>();
     ArrayList<String> outputColumnNames = new ArrayList<String>();
     Map<String, exprNodeDesc> colExprMap = new HashMap<String, exprNodeDesc>();
     List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);
@@ -1459,9 +1492,11 @@
     assert (aggregationTrees != null);
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ASTNode value = entry.getValue();
+      
+      // This is the GenericUDAF name
       String aggName = value.getChild(0).getText();
-      Class<? extends UDAF> aggClass = FunctionRegistry.getUDAF(aggName);
-      assert (aggClass != null);
+      
+      // Convert children to aggParameters
       ArrayList<exprNodeDesc> aggParameters = new ArrayList<exprNodeDesc>();
       // 0 is the function name
       for (int i = 1; i < value.getChildCount(); i++) {
@@ -1476,22 +1511,27 @@
         assert(paraExpression != null);
         aggParameters.add(new exprNodeColumnDesc(paraExprInfo.getType(), paraExprInfo.getInternalName()));
       }
-
-      UDAFInfo udaf = getUDAFInfo(aggName, mode, aggParameters, value);
       
-      aggregations.add(new aggregationDesc(udaf.evalClass, udaf.convertedParameters,
-          value.getToken().getType() == HiveParser.TOK_FUNCTIONDI));
-      evalMethods.add(udaf.evalMethod.getName());
-      aggMethods.add(udaf.aggMethod.getName());
+      boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI;
+      Mode amode = groupByDescModeToUDAFMode(mode, isDistinct);
+      GenericUDAFEvaluator genericUDAFEvaluator = getGenericUDAFEvaluator(aggName, aggParameters, value);
+      assert(genericUDAFEvaluator != null);
+      GenericUDAFInfo udaf = getGenericUDAFInfo(genericUDAFEvaluator, amode, aggParameters);
+      aggregations.add(new aggregationDesc(aggName.toLowerCase(), udaf.genericUDAFEvaluator, udaf.convertedParameters,
+          isDistinct, amode));
       String field = getColumnInternalName(groupByKeys.size() + aggregations.size() -1);
       outputColumnNames.add(field);
       groupByOutputRowResolver.put("",value.toStringTree(),
                                    new ColumnInfo(field,
-                                       udaf.retType));
+                                       udaf.returnType));
+      // Save the evaluator so that it can be used by the next-stage GroupByOperators 
+      if (genericUDAFEvaluators != null) {
+        genericUDAFEvaluators.put(entry.getKey(), genericUDAFEvaluator);
+      }
     }
 
     Operator op =  
-      putOpInsertMap(OperatorFactory.getAndMakeChild(new groupByDesc(mode, outputColumnNames, groupByKeys, aggregations, evalMethods, aggMethods),
+      putOpInsertMap(OperatorFactory.getAndMakeChild(new groupByDesc(mode, outputColumnNames, groupByKeys, aggregations),
                                                      new RowSchema(groupByOutputRowResolver.getColumnInfos()),
                                                      reduceSinkOperatorInfo),
         groupByOutputRowResolver
@@ -1504,13 +1544,15 @@
    * Generate the GroupByOperator for the Query Block (parseInfo.getXXX(dest)).
    * The new GroupByOperator will be a child of the reduceSinkOperatorInfo.
    * 
-   * @param mode The mode of the aggregation (PARTIAL2)
+   * @param mode The mode of the aggregation (MERGEPARTIAL, PARTIAL2)
+   * @param genericUDAFEvaluators  The mapping from Aggregation StringTree to the 
+   *            genericUDAFEvaluator. 
    * @return the new GroupByOperator
    */
   @SuppressWarnings("nls")
   private Operator genGroupByPlanGroupByOperator1(
         QBParseInfo parseInfo, String dest, Operator reduceSinkOperatorInfo,
-        groupByDesc.Mode mode)
+        groupByDesc.Mode mode, Map<String, GenericUDAFEvaluator> genericUDAFEvaluators)
     throws SemanticException {
     ArrayList<String> outputColumnNames = new ArrayList<String>();
     RowResolver groupByInputRowResolver = opParseCtx.get(reduceSinkOperatorInfo).getRR();
@@ -1518,8 +1560,6 @@
     groupByOutputRowResolver.setIsExprResolver(true);
     ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
     ArrayList<aggregationDesc> aggregations = new ArrayList<aggregationDesc>();
-    ArrayList<String> evalMethods = new ArrayList<String>();
-    ArrayList<String> aggMethods = new ArrayList<String>();
     List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);
     Map<String, exprNodeDesc> colExprMap = new HashMap<String, exprNodeDesc>();
     for (int i = 0; i < grpByExprs.size(); ++i) {
@@ -1544,8 +1584,6 @@
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ASTNode value = entry.getValue();
       String aggName = value.getChild(0).getText();
-      Class<? extends UDAF> aggClass = FunctionRegistry.getUDAF(aggName);
-      assert (aggClass != null);
       ArrayList<exprNodeDesc> aggParameters = new ArrayList<exprNodeDesc>();
 
       if (value.getToken().getType() == HiveParser.TOK_FUNCTIONDI) {
@@ -1574,21 +1612,22 @@
         assert(paraExpression != null);
         aggParameters.add(new exprNodeColumnDesc(paraExprInfo.getType(), paraExpression));
       }
-
-      UDAFInfo udaf = getUDAFInfo(aggName, mode, aggParameters, value);
-      aggregations.add(new aggregationDesc(udaf.evalClass, udaf.convertedParameters, 
-          ((mode == groupByDesc.Mode.FINAL) ? false : (value.getToken().getType() == HiveParser.TOK_FUNCTIONDI))));
-      evalMethods.add(udaf.evalMethod.getName());
-      aggMethods.add(udaf.aggMethod.getName());
+      boolean isDistinct = (value.getType() == HiveParser.TOK_FUNCTIONDI);
+      Mode amode = groupByDescModeToUDAFMode(mode, isDistinct); 
+      GenericUDAFEvaluator genericUDAFEvaluator = genericUDAFEvaluators.get(entry.getKey());
+      assert(genericUDAFEvaluator != null);
+      GenericUDAFInfo udaf = getGenericUDAFInfo(genericUDAFEvaluator, amode, aggParameters);
+      aggregations.add(new aggregationDesc(aggName.toLowerCase(), udaf.genericUDAFEvaluator, udaf.convertedParameters, 
+          (mode != groupByDesc.Mode.FINAL && isDistinct), amode));
       String field = getColumnInternalName(groupByKeys.size() + aggregations.size() - 1);
       outputColumnNames.add(field);
       groupByOutputRowResolver.put("", value.toStringTree(),
                                     new ColumnInfo(field,
-                                        udaf.retType));
+                                        udaf.returnType));
     }
 
     Operator op = putOpInsertMap(
-        OperatorFactory.getAndMakeChild(new groupByDesc(mode, outputColumnNames, groupByKeys, aggregations, evalMethods, aggMethods),
+        OperatorFactory.getAndMakeChild(new groupByDesc(mode, outputColumnNames, groupByKeys, aggregations),
                                         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
                                         reduceSinkOperatorInfo),
         groupByOutputRowResolver);
@@ -1601,11 +1640,14 @@
    * The new GroupByOperator will be a child of the inputOperatorInfo.
    * 
    * @param mode The mode of the aggregation (HASH)
+   * @param genericUDAFEvaluators  If not null, this function will store the mapping
+   *            from Aggregation StringTree to the genericUDAFEvaluator in this parameter,
+   *            so it can be used in the next-stage GroupBy aggregations. 
    * @return the new GroupByOperator
    */
   @SuppressWarnings("nls")
   private Operator genGroupByPlanMapGroupByOperator(QB qb, String dest, Operator inputOperatorInfo, 
-                                                    groupByDesc.Mode mode) throws SemanticException {
+        groupByDesc.Mode mode, Map<String, GenericUDAFEvaluator> genericUDAFEvaluators) throws SemanticException {
 
     RowResolver groupByInputRowResolver = opParseCtx.get(inputOperatorInfo).getRR();
     QBParseInfo parseInfo = qb.getParseInfo();
@@ -1614,8 +1656,6 @@
     ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
     ArrayList<String> outputColumnNames = new ArrayList<String>();
     ArrayList<aggregationDesc> aggregations = new ArrayList<aggregationDesc>();
-    ArrayList<String> evalMethods = new ArrayList<String>();
-    ArrayList<String> aggMethods = new ArrayList<String>();
     Map<String, exprNodeDesc> colExprMap = new HashMap<String, exprNodeDesc>();
     List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);
     for (int i = 0; i < grpByExprs.size(); ++i) {
@@ -1658,8 +1698,6 @@
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ASTNode value = entry.getValue();
       String aggName = value.getChild(0).getText();
-      Class<? extends UDAF> aggClass = FunctionRegistry.getUDAF(aggName);
-      assert (aggClass != null);
       ArrayList<exprNodeDesc> aggParameters = new ArrayList<exprNodeDesc>();
       ArrayList<Class<?>> aggClasses = new ArrayList<Class<?>>();
       // 0 is the function name
@@ -1670,21 +1708,27 @@
         aggParameters.add(paraExprNode);
       }
 
-      UDAFInfo udaf = getUDAFInfo(aggName, mode, aggParameters, value);
-      
-      aggregations.add(new aggregationDesc(udaf.evalClass, udaf.convertedParameters,
-                                           value.getToken().getType() == HiveParser.TOK_FUNCTIONDI));
-      evalMethods.add(udaf.evalMethod.getName());
-      aggMethods.add(udaf.aggMethod.getName());
+      boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI;
+      Mode amode = groupByDescModeToUDAFMode(mode, isDistinct);
+
+      GenericUDAFEvaluator genericUDAFEvaluator = getGenericUDAFEvaluator(aggName, aggParameters, value);
+      assert(genericUDAFEvaluator != null);
+      GenericUDAFInfo udaf = getGenericUDAFInfo(genericUDAFEvaluator, amode, aggParameters);
+      aggregations.add(new aggregationDesc(aggName.toLowerCase(), udaf.genericUDAFEvaluator, udaf.convertedParameters,
+          isDistinct, amode));
       String field = getColumnInternalName(groupByKeys.size() + aggregations.size() -1);
       outputColumnNames.add(field);
       groupByOutputRowResolver.put("",value.toStringTree(),
                                    new ColumnInfo(field,
-                                       udaf.retType));
+                                       udaf.returnType));
+      // Save the evaluator so that it can be used by the next-stage GroupByOperators 
+      if (genericUDAFEvaluators != null) {
+        genericUDAFEvaluators.put(entry.getKey(), genericUDAFEvaluator);
+      }
     }
 
     Operator op = putOpInsertMap(
-      OperatorFactory.getAndMakeChild(new groupByDesc(mode, outputColumnNames, groupByKeys, aggregations, evalMethods, aggMethods),
+      OperatorFactory.getAndMakeChild(new groupByDesc(mode, outputColumnNames, groupByKeys, aggregations),
                                       new RowSchema(groupByOutputRowResolver.getColumnInfos()),
                                       inputOperatorInfo),
       groupByOutputRowResolver);
@@ -1880,20 +1924,21 @@
    * results.
    * 
    * @param mode the mode of aggregation (FINAL)  
+   * @param genericUDAFEvaluators  The mapping from Aggregation StringTree to the 
+   *            genericUDAFEvaluator. 
    * @return the new GroupByOperator
    * @throws SemanticException
    */
   @SuppressWarnings("nls")
   private Operator genGroupByPlanGroupByOperator2MR(
-    QBParseInfo parseInfo, String dest, Operator reduceSinkOperatorInfo2, groupByDesc.Mode mode)
-    throws SemanticException {
+        QBParseInfo parseInfo, String dest, Operator reduceSinkOperatorInfo2, 
+        groupByDesc.Mode mode, Map<String, GenericUDAFEvaluator> genericUDAFEvaluators)
+        throws SemanticException {
     RowResolver groupByInputRowResolver2 = opParseCtx.get(reduceSinkOperatorInfo2).getRR();
     RowResolver groupByOutputRowResolver2 = new RowResolver();
     groupByOutputRowResolver2.setIsExprResolver(true);
     ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
     ArrayList<aggregationDesc> aggregations = new ArrayList<aggregationDesc>();
-    ArrayList<String> evalMethods = new ArrayList<String>();
-    ArrayList<String> aggMethods = new ArrayList<String>();
     Map<String, exprNodeDesc> colExprMap = new HashMap<String, exprNodeDesc>();
     List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);
     ArrayList<String> outputColumnNames = new ArrayList<String>(); 
@@ -1928,23 +1973,24 @@
       aggParameters.add(new exprNodeColumnDesc(paraExprInfo.getType(), paraExpression));
 
       String aggName = value.getChild(0).getText();
-      Class<? extends UDAF> aggClass = FunctionRegistry.getUDAF(aggName);
-      assert (aggClass != null);
 
-      UDAFInfo udaf = getUDAFInfo(aggName, mode, aggParameters, value);      
-      aggregations.add(new aggregationDesc(udaf.evalClass, udaf.convertedParameters, 
-                                           ((mode == groupByDesc.Mode.FINAL) ? false : (value.getToken().getType() == HiveParser.TOK_FUNCTIONDI))));
-      evalMethods.add(udaf.evalMethod.getName());
-      aggMethods.add(udaf.aggMethod.getName());
+      boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI;
+      Mode amode = groupByDescModeToUDAFMode(mode, isDistinct); 
+      GenericUDAFEvaluator genericUDAFEvaluator = genericUDAFEvaluators.get(entry.getKey());
+      assert(genericUDAFEvaluator != null);
+      GenericUDAFInfo udaf = getGenericUDAFInfo(genericUDAFEvaluator, amode, aggParameters);
+      aggregations.add(new aggregationDesc(aggName.toLowerCase(), udaf.genericUDAFEvaluator, udaf.convertedParameters, 
+                                           (mode != groupByDesc.Mode.FINAL && value.getToken().getType() == HiveParser.TOK_FUNCTIONDI),
+                                           amode));
       String field = getColumnInternalName(groupByKeys.size() + aggregations.size() - 1);
       outputColumnNames.add(field);
       groupByOutputRowResolver2.put("", value.toStringTree(),
                                     new ColumnInfo(field,
-                                        udaf.retType));
+                                        udaf.returnType));
     }
 
     Operator op = putOpInsertMap(
-      OperatorFactory.getAndMakeChild(new groupByDesc(mode, outputColumnNames, groupByKeys, aggregations, evalMethods, aggMethods),
+      OperatorFactory.getAndMakeChild(new groupByDesc(mode, outputColumnNames, groupByKeys, aggregations),
                                       new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
                                       reduceSinkOperatorInfo2),
         groupByOutputRowResolver2
@@ -1999,11 +2045,19 @@
 
     // ////// 2. Generate GroupbyOperator
     Operator groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
-        dest, reduceSinkOperatorInfo, groupByDesc.Mode.COMPLETE);
+        dest, reduceSinkOperatorInfo, groupByDesc.Mode.COMPLETE, null);
 
     return groupByOperatorInfo;
   }
 
+  static ArrayList<GenericUDAFEvaluator> getUDAFEvaluators(ArrayList<aggregationDesc> aggs) {
+    ArrayList<GenericUDAFEvaluator> result = new ArrayList<GenericUDAFEvaluator>();
+    for (int i=0; i<aggs.size(); i++) {
+      result.add(aggs.get(i).createGenericUDAFEvaluator());
+    }
+    return result;
+  }
+  
   /**
    * Generate a Group-By plan using a 2 map-reduce jobs (5 operators will be
    * inserted):
@@ -2067,9 +2121,11 @@
                         : Integer.MAX_VALUE), -1, false);
 
     // ////// 2. Generate GroupbyOperator
-    Operator groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
-        dest, reduceSinkOperatorInfo, groupByDesc.Mode.PARTIAL1);
-
+    Map<String, GenericUDAFEvaluator> genericUDAFEvaluators = 
+      new LinkedHashMap<String, GenericUDAFEvaluator>();
+    GroupByOperator groupByOperatorInfo = (GroupByOperator)genGroupByPlanGroupByOperator(parseInfo,
+        dest, reduceSinkOperatorInfo, groupByDesc.Mode.PARTIAL1, genericUDAFEvaluators);
+    
     int numReducers = -1;
     List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);
     if (grpByExprs.isEmpty())
@@ -2081,7 +2137,8 @@
 
     // ////// 4. Generate GroupbyOperator2
     Operator groupByOperatorInfo2 = 
-      genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, groupByDesc.Mode.FINAL);
+      genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, 
+          groupByDesc.Mode.FINAL, genericUDAFEvaluators);
 
     return groupByOperatorInfo2;
   }
@@ -2126,8 +2183,10 @@
     QBParseInfo parseInfo = qb.getParseInfo();
 
     // ////// Generate GroupbyOperator for a map-side partial aggregation
-    Operator groupByOperatorInfo = genGroupByPlanMapGroupByOperator(qb,
-      dest, inputOperatorInfo, groupByDesc.Mode.HASH);
+    Map<String, GenericUDAFEvaluator> genericUDAFEvaluators = 
+      new LinkedHashMap<String, GenericUDAFEvaluator>();
+    GroupByOperator groupByOperatorInfo = (GroupByOperator)genGroupByPlanMapGroupByOperator(qb,
+      dest, inputOperatorInfo, groupByDesc.Mode.HASH, genericUDAFEvaluators);
 
     int numReducers = -1;
 
@@ -2138,13 +2197,16 @@
     
     // ////// Generate ReduceSink Operator
     Operator reduceSinkOperatorInfo = 
-      genGroupByPlanReduceSinkOperator(qb, dest, groupByOperatorInfo, grpByExprs.size(), numReducers, true);
+      genGroupByPlanReduceSinkOperator(qb, dest, groupByOperatorInfo, 
+          grpByExprs.size(), numReducers, true);
     
     // This is a 1-stage map-reduce processing of the groupby. The map-side aggregation was just used to
     // reduce output data. In case of distincts, partial results are not used, and so iterate is again
     // invoked on the reducer. In case of non-distincts, partial results are used, and merge is invoked
     // on the reducer.
-    return genGroupByPlanGroupByOperator1(parseInfo, dest, reduceSinkOperatorInfo, groupByDesc.Mode.MERGEPARTIAL);
+    return genGroupByPlanGroupByOperator1(parseInfo, dest, 
+        reduceSinkOperatorInfo, groupByDesc.Mode.MERGEPARTIAL,
+        genericUDAFEvaluators);
   }
 
   /**
@@ -2172,7 +2234,7 @@
    * 
    *   Reducer: iterate/terminatePartial if DISTINCT
    *            merge/terminatePartial if NO DISTINCT
-   *   (mode = PARTIAL2)
+   *   (mode = MERGEPARTIAL)
    * 
    *   STAGE 2
    *
@@ -2193,9 +2255,11 @@
     QBParseInfo parseInfo = qb.getParseInfo();
 
     // ////// Generate GroupbyOperator for a map-side partial aggregation
-    Operator groupByOperatorInfo = genGroupByPlanMapGroupByOperator(qb,
-      dest, inputOperatorInfo, groupByDesc.Mode.HASH);
-
+    Map<String, GenericUDAFEvaluator> genericUDAFEvaluators = 
+      new LinkedHashMap<String, GenericUDAFEvaluator>();
+    GroupByOperator groupByOperatorInfo = (GroupByOperator)genGroupByPlanMapGroupByOperator(qb,
+      dest, inputOperatorInfo, groupByDesc.Mode.HASH, genericUDAFEvaluators);
+    
     // Optimize the scenario when there are no grouping keys and no distinct - 2 map-reduce jobs are not needed
     // For eg: select count(1) from T where t.ds = ....
     if (!optimizeMapAggrGroupBy(dest, qb)) {
@@ -2207,8 +2271,9 @@
                                           : Integer.MAX_VALUE), -1, true);
       
       // ////// Generate GroupbyOperator for a partial aggregation
-      Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator1(parseInfo, dest, reduceSinkOperatorInfo, 
-                                                                         groupByDesc.Mode.PARTIAL2);
+      Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator1(parseInfo,
+          dest, reduceSinkOperatorInfo, groupByDesc.Mode.PARTIALS,
+          genericUDAFEvaluators);
 
       int numReducers = -1;
       List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);
@@ -2220,14 +2285,14 @@
                                                                              grpByExprs.size(), numReducers);
 
       // ////// Generate GroupbyOperator3
-      return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, groupByDesc.Mode.FINAL);
+      return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, groupByDesc.Mode.FINAL, genericUDAFEvaluators);
     }
     else {
       // ////// Generate ReduceSink Operator
       Operator reduceSinkOperatorInfo = 
         genGroupByPlanReduceSinkOperator(qb, dest, groupByOperatorInfo, getGroupByForClause(parseInfo, dest).size(), 1, true);
       
-      return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo, groupByDesc.Mode.FINAL);
+      return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo, groupByDesc.Mode.FINAL, genericUDAFEvaluators);
     }
   }
 
@@ -2834,7 +2899,7 @@
       reduceSinkDesc now = ((ReduceSinkOperator)(oi)).getConf();
 
       now.setKeySerializeInfo(
-          PlanUtils.getBinarySortableTableDesc(
+          PlanUtils.getReduceKeyTableDesc(
               PlanUtils.getFieldSchemasFromColumnList(now.getKeyCols(), "joinkey"),
               now.getOrder()
           )

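The SemanticAnalyzer changes above replace the reflective UDAF method lookup (picking iterate/merge and terminate/terminatePartial by name) with a GenericUDAFEvaluator that is created once per aggregation, stored in the aggregation-tree-to-evaluator map, and reused by the later-stage GroupBy operators. groupByDescModeToUDAFMode encodes which half of the protocol each plan stage runs: COMPLETE is iterate then terminate in a single stage; PARTIAL1 (and HASH) is iterate then terminatePartial on the map side; PARTIAL2 is merge then terminatePartial; FINAL is merge then terminate; PARTIALS and MERGEPARTIAL fall back to the iterate-based modes when the aggregation is DISTINCT, since partial results cannot be reused there. The sketch below walks one evaluator through the two halves of a 2-stage plan; only the method names come from GenericUDAFEvaluator itself, while the driver class, object inspectors, and row values are placeholders.

    // Sketch only: the call protocol that groupByDescModeToUDAFMode encodes.
    // PARTIAL1 (map side): init -> iterate(raw rows) -> terminatePartial.
    // FINAL (reduce side):  init -> merge(partials)  -> terminate.
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    public class UDAFModeProtocolSketch {

      /** Map-side half: consume raw parameter rows, emit a partial aggregation. */
      static Object partial1(GenericUDAFEvaluator eval,
                             ObjectInspector[] parameterOIs,
                             Iterable<Object[]> rows) throws HiveException {
        eval.init(GenericUDAFEvaluator.Mode.PARTIAL1, parameterOIs);
        GenericUDAFEvaluator.AggregationBuffer buf = eval.getNewAggregationBuffer();
        for (Object[] parameters : rows) {
          eval.iterate(buf, parameters);      // one call per input row
        }
        return eval.terminatePartial(buf);    // partial result shipped to the reducer
      }

      /** Reduce-side half: merge partial results, emit the final value. */
      static Object finalMerge(GenericUDAFEvaluator eval,
                               ObjectInspector[] partialOIs,
                               Iterable<Object> partials) throws HiveException {
        eval.init(GenericUDAFEvaluator.Mode.FINAL, partialOIs);
        GenericUDAFEvaluator.AggregationBuffer buf = eval.getNewAggregationBuffer();
        for (Object partial : partials) {
          eval.merge(buf, partial);           // one call per map-side partial
        }
        return eval.terminate(buf);           // final aggregation result
      }
    }

Because the same evaluator decides its return ObjectInspector in init(), getGenericUDAFInfo can derive returnType from the evaluator instead of from a reflected method's return type, which is what allows complex types to flow through aggregations.
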
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticException.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticException.java?rev=794129&r1=794128&r2=794129&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticException.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticException.java Wed Jul 15 01:52:41 2009
@@ -18,11 +18,13 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
 /**
  * Exception from SemanticAnalyzer
  */
 
-public class SemanticException extends Exception {
+public class SemanticException extends HiveException {
 
     private static final long serialVersionUID = 1L;
 

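With SemanticException extending HiveException, evaluator initialization errors (which surface as HiveException, as in getGenericUDAFInfo above) can be wrapped or propagated without a parallel exception hierarchy. A small sketch of that wrapping; the evaluator and inspectors are placeholders, and the SemanticException(e) constructor call mirrors the usage in the SemanticAnalyzer hunk.

    // Sketch only: rethrowing a HiveException from evaluator setup as a
    // SemanticException, the same pattern used in getGenericUDAFInfo above.
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    public class WrapHiveExceptionSketch {
      static ObjectInspector initOrFail(GenericUDAFEvaluator eval,
                                        GenericUDAFEvaluator.Mode mode,
                                        ObjectInspector[] parameterOIs)
          throws SemanticException {
        try {
          return eval.init(mode, parameterOIs);
        } catch (HiveException e) {
          // Callers that only care about compile-time failures can now also
          // catch HiveException alone, since SemanticException is a subtype.
          throw new SemanticException(e);
        }
      }
    }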

