hive-commits mailing list archives

From: na...@apache.org
Subject: svn commit: r1028072 [1/7] - in /hive/trunk: ./ data/files/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/apache/hadoop/hive/ql/plan/ ql/src/test/queries/clientnegative/ ql/src/test/queries/cli...
Date: Wed, 27 Oct 2010 19:00:05 GMT
Author: namit
Date: Wed Oct 27 19:00:02 2010
New Revision: 1028072

URL: http://svn.apache.org/viewvc?rev=1028072&view=rev
Log:
HIVE-474 Support for distinct selection on two or more columns
(Amareshwari Sriramadasu via namit)
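
Roughly, this lifts the earlier one-DISTINCT-per-query restriction in GROUP BY plans. A minimal sketch of the kind of query that becomes legal (hypothetical table and column names, not taken from the patch):

    -- two DISTINCT aggregates over different columns in a single query
    SELECT ds, COUNT(DISTINCT userid), COUNT(DISTINCT pageid), COUNT(1)
    FROM page_views
    GROUP BY ds;

Previously a second DISTINCT aggregate was rejected at semantic analysis time with UNSUPPORTED_MULTIPLE_DISTINCTS.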


Added:
    hive/trunk/data/files/in4.txt
    hive/trunk/ql/src/test/queries/clientnegative/groupby2_map_skew_multi_distinct.q
    hive/trunk/ql/src/test/queries/clientnegative/groupby2_multi_distinct.q
    hive/trunk/ql/src/test/queries/clientnegative/groupby3_map_skew_multi_distinct.q
    hive/trunk/ql/src/test/queries/clientnegative/groupby3_multi_distinct.q
    hive/trunk/ql/src/test/queries/clientpositive/count.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby2_map_multi_distinct.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby2_noskew_multi_distinct.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby3_map_multi_distinct.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby3_noskew_multi_distinct.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby_map_ppr_multi_distinct.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby_ppr_multi_distinct.q
    hive/trunk/ql/src/test/queries/clientpositive/join18_multi_distinct.q
    hive/trunk/ql/src/test/queries/clientpositive/nullgroup4_multi_distinct.q
    hive/trunk/ql/src/test/results/clientnegative/groupby2_map_skew_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientnegative/groupby2_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientnegative/groupby3_map_skew_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientnegative/groupby3_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientpositive/count.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby2_map_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby2_noskew_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby3_map_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby3_noskew_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_map_ppr_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_ppr_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientpositive/join18_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientpositive/nullgroup4_multi_distinct.q.out
Modified:
    hive/trunk/CHANGES.txt
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin1.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin4.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby2_map.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby2_noskew.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby3.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby3_map.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby3_map_skew.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby3_noskew.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_map_ppr.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_ppr.q.out
    hive/trunk/ql/src/test/results/clientpositive/join18.q.out
    hive/trunk/ql/src/test/results/clientpositive/join25.q.out
    hive/trunk/ql/src/test/results/clientpositive/join26.q.out
    hive/trunk/ql/src/test/results/clientpositive/join27.q.out
    hive/trunk/ql/src/test/results/clientpositive/join28.q.out
    hive/trunk/ql/src/test/results/clientpositive/join29.q.out
    hive/trunk/ql/src/test/results/clientpositive/join30.q.out
    hive/trunk/ql/src/test/results/clientpositive/join31.q.out
    hive/trunk/ql/src/test/results/clientpositive/join32.q.out
    hive/trunk/ql/src/test/results/clientpositive/join33.q.out
    hive/trunk/ql/src/test/results/clientpositive/join34.q.out
    hive/trunk/ql/src/test/results/clientpositive/join35.q.out
    hive/trunk/ql/src/test/results/clientpositive/join36.q.out
    hive/trunk/ql/src/test/results/clientpositive/join37.q.out
    hive/trunk/ql/src/test/results/clientpositive/join38.q.out
    hive/trunk/ql/src/test/results/clientpositive/join39.q.out
    hive/trunk/ql/src/test/results/clientpositive/join40.q.out
    hive/trunk/ql/src/test/results/clientpositive/nullgroup4.q.out
    hive/trunk/ql/src/test/results/clientpositive/udf_count.q.out
    hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml

Modified: hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hive/trunk/CHANGES.txt?rev=1028072&r1=1028071&r2=1028072&view=diff
==============================================================================
--- hive/trunk/CHANGES.txt (original)
+++ hive/trunk/CHANGES.txt Wed Oct 27 19:00:02 2010
@@ -101,6 +101,9 @@ Trunk -  Unreleased
     HIVE-1709. Add Postgres metastore schema migration scripts (0.5 -> 0.6)
     (Yuanjun Li via namit)
 
+    HIVE-474 Support for distinct selection on two or more columns
+    (Amareshwari Sriramadasu via namit)
+
   IMPROVEMENTS
 
     HIVE-1394. Do not update transient_lastDdlTime if the partition is modified by a housekeeping

Added: hive/trunk/data/files/in4.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/in4.txt?rev=1028072&view=auto
==============================================================================
--- hive/trunk/data/files/in4.txt (added)
+++ hive/trunk/data/files/in4.txt Wed Oct 27 19:00:02 2010
@@ -0,0 +1,7 @@
+35236
+101000501
+100100103
+12802
+101005
+10100454
+12100757

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java?rev=1028072&r1=1028071&r2=1028072&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java Wed Oct 27 19:00:02 2010
@@ -23,6 +23,8 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
 
 /**
  * This evaluator gets the column from the row object.
@@ -33,6 +35,7 @@ public class ExprNodeColumnEvaluator ext
 
   transient StructObjectInspector[] inspectors;
   transient StructField[] fields;
+  transient boolean[] unionField;
 
   public ExprNodeColumnEvaluator(ExprNodeColumnDesc expr) {
     this.expr = expr;
@@ -46,15 +49,32 @@ public class ExprNodeColumnEvaluator ext
     String[] names = expr.getColumn().split("\\.");
     inspectors = new StructObjectInspector[names.length];
     fields = new StructField[names.length];
+    unionField = new boolean[names.length];
+    int unionIndex = -1;
 
     for (int i = 0; i < names.length; i++) {
       if (i == 0) {
         inspectors[0] = (StructObjectInspector) rowInspector;
       } else {
-        inspectors[i] = (StructObjectInspector) fields[i - 1]
+        if (unionIndex != -1) {
+          inspectors[i] = (StructObjectInspector) (
+            (UnionObjectInspector)fields[i-1].getFieldObjectInspector()).
+            getObjectInspectors().get(unionIndex);
+        } else {
+          inspectors[i] = (StructObjectInspector) fields[i - 1]
             .getFieldObjectInspector();
+        }
+      }
+      // to support names like _colx:1._coly
+      String[] unionfields = names[i].split("\\:");
+      fields[i] = inspectors[i].getStructFieldRef(unionfields[0]);
+      if (unionfields.length > 1) {
+        unionIndex = Integer.parseInt(unionfields[1]);
+        unionField[i] = true;
+      } else {
+        unionIndex = -1;
+        unionField[i] = false;
       }
-      fields[i] = inspectors[i].getStructFieldRef(names[i]);
     }
     return fields[names.length - 1].getFieldObjectInspector();
   }
@@ -64,6 +84,9 @@ public class ExprNodeColumnEvaluator ext
     Object o = row;
     for (int i = 0; i < fields.length; i++) {
       o = inspectors[i].getStructFieldData(o, fields[i]);
+      if (unionField[i]) {
+        o = ((StandardUnion)o).getObject();
+      }
     }
     return o;
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1028072&r1=1028071&r2=1028072&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed Oct 27 19:00:02 2010
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,22 +36,27 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectsEqualComparer;
 import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyStringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectsEqualComparer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -77,6 +83,16 @@ public class GroupByOperator extends Ope
   // the same SQL clause,
   // so aggregationIsDistinct is a boolean array instead of a single number.
   protected transient boolean[] aggregationIsDistinct;
+  // Map from integer tag to distinct aggrs
+  transient protected Map<Integer, Set<Integer>> distinctKeyAggrs =
+    new HashMap<Integer, Set<Integer>>();
+  // Map from integer tag to non-distinct aggrs with key parameters.
+  transient protected Map<Integer, Set<Integer>> nonDistinctKeyAggrs =
+    new HashMap<Integer, Set<Integer>>();
+  // List of non-distinct aggrs.
+  transient protected List<Integer> nonDistinctAggrs = new ArrayList<Integer>();
+  // Union expr for distinct keys
+  transient ExprNodeEvaluator unionExprEval = null;
 
   transient GenericUDAFEvaluator[] aggregationEvaluators;
 
@@ -187,17 +203,45 @@ public class GroupByOperator extends Ope
     }
     newKeys = new ArrayList<Object>(keyFields.length);
 
+    // initialize unionExpr for reduce-side
+    // reduce KEY has union field as the last field if there are distinct
+    // aggregates in group-by.
+    List<? extends StructField> sfs =
+      ((StandardStructObjectInspector) rowInspector).getAllStructFieldRefs();
+    if (sfs.size() > 0) {
+      StructField keyField = sfs.get(0);
+      if (keyField.getFieldName().toUpperCase().equals(
+          Utilities.ReduceField.KEY.name())) {
+        ObjectInspector keyObjInspector = keyField.getFieldObjectInspector();
+        if (keyObjInspector instanceof StandardStructObjectInspector) {
+          List<? extends StructField> keysfs =
+            ((StandardStructObjectInspector) keyObjInspector).getAllStructFieldRefs();
+          if (keysfs.size() > 0) {
+            // the last field is the union field, if any
+            StructField sf = keysfs.get(keysfs.size() - 1);
+            if (sf.getFieldObjectInspector().getCategory().equals(
+                ObjectInspector.Category.UNION)) {
+              unionExprEval = ExprNodeEvaluatorFactory.get(
+                new ExprNodeColumnDesc(TypeInfoUtils.getTypeInfoFromObjectInspector(
+                sf.getFieldObjectInspector()),
+                keyField.getFieldName() + "." + sf.getFieldName(), null,
+                false));
+              unionExprEval.initialize(rowInspector);
+            }
+          }
+        }
+      }
+    }
     // init aggregationParameterFields
-    aggregationParameterFields = new ExprNodeEvaluator[conf.getAggregators()
-        .size()][];
-    aggregationParameterObjectInspectors = new ObjectInspector[conf
-        .getAggregators().size()][];
-    aggregationParameterStandardObjectInspectors = new ObjectInspector[conf
-        .getAggregators().size()][];
-    aggregationParameterObjects = new Object[conf.getAggregators().size()][];
-    for (int i = 0; i < aggregationParameterFields.length; i++) {
-      ArrayList<ExprNodeDesc> parameters = conf.getAggregators().get(i)
-          .getParameters();
+    ArrayList<AggregationDesc> aggrs = conf.getAggregators();
+    aggregationParameterFields = new ExprNodeEvaluator[aggrs.size()][];
+    aggregationParameterObjectInspectors = new ObjectInspector[aggrs.size()][];
+    aggregationParameterStandardObjectInspectors = new ObjectInspector[aggrs.size()][];
+    aggregationParameterObjects = new Object[aggrs.size()][];
+    aggregationIsDistinct = new boolean[aggrs.size()];
+    for (int i = 0; i < aggrs.size(); i++) {
+      AggregationDesc aggr = aggrs.get(i);
+      ArrayList<ExprNodeDesc> parameters = aggr.getParameters();
       aggregationParameterFields[i] = new ExprNodeEvaluator[parameters.size()];
       aggregationParameterObjectInspectors[i] = new ObjectInspector[parameters
           .size()];
@@ -209,17 +253,55 @@ public class GroupByOperator extends Ope
             .get(parameters.get(j));
         aggregationParameterObjectInspectors[i][j] = aggregationParameterFields[i][j]
             .initialize(rowInspector);
+        if (unionExprEval != null) {
+          String[] names = parameters.get(j).getExprString().split("\\.");
+          // parameters of the form : KEY.colx:t.coly
+          if (Utilities.ReduceField.KEY.name().equals(names[0])) {
+            String name = names[names.length - 2];
+            int tag = Integer.parseInt(name.split("\\:")[1]);
+            if (aggr.getDistinct()) {
+              // is distinct
+              Set<Integer> set = distinctKeyAggrs.get(tag);
+              if (null == set) {
+                set = new HashSet<Integer>();
+                distinctKeyAggrs.put(tag, set);
+              }
+              if (!set.contains(i)) {
+                set.add(i);
+              }
+            } else {
+              Set<Integer> set = nonDistinctKeyAggrs.get(tag);
+              if (null == set) {
+                set = new HashSet<Integer>();
+                nonDistinctKeyAggrs.put(tag, set);
+              }
+              if (!set.contains(i)) {
+                set.add(i);
+              }
+            }
+          } else {
+            // will be VALUE._COLx
+            if (!nonDistinctAggrs.contains(i)) {
+              nonDistinctAggrs.add(i);
+            }
+          }
+        } else {
+          if (aggr.getDistinct()) {
+            aggregationIsDistinct[i] = true;
+          }
+        }
         aggregationParameterStandardObjectInspectors[i][j] = ObjectInspectorUtils
             .getStandardObjectInspector(
             aggregationParameterObjectInspectors[i][j],
             ObjectInspectorCopyOption.WRITABLE);
         aggregationParameterObjects[i][j] = null;
       }
-    }
-    // init aggregationIsDistinct
-    aggregationIsDistinct = new boolean[conf.getAggregators().size()];
-    for (int i = 0; i < aggregationIsDistinct.length; i++) {
-      aggregationIsDistinct[i] = conf.getAggregators().get(i).getDistinct();
+      if (parameters.size() == 0) {
+        // for ex: count(*)
+        if (!nonDistinctAggrs.contains(i)) {
+          nonDistinctAggrs.add(i);
+        }
+      }
     }
 
     // init aggregationClasses
@@ -482,37 +564,108 @@ public class GroupByOperator extends Ope
   protected void updateAggregations(AggregationBuffer[] aggs, Object row,
       ObjectInspector rowInspector, boolean hashAggr,
       boolean newEntryForHashAggr, Object[][] lastInvoke) throws HiveException {
+    if (unionExprEval == null) {
+      for (int ai = 0; ai < aggs.length; ai++) {
+        // Calculate the parameters
+        Object[] o = new Object[aggregationParameterFields[ai].length];
+        for (int pi = 0; pi < aggregationParameterFields[ai].length; pi++) {
+          o[pi] = aggregationParameterFields[ai][pi].evaluate(row);
+        }
 
-    for (int ai = 0; ai < aggs.length; ai++) {
-
-      // Calculate the parameters
-      Object[] o = new Object[aggregationParameterFields[ai].length];
-      for (int pi = 0; pi < aggregationParameterFields[ai].length; pi++) {
-        o[pi] = aggregationParameterFields[ai][pi].evaluate(row);
+        // Update the aggregations.
+        if (aggregationIsDistinct[ai]) {
+          if (hashAggr) {
+            if (newEntryForHashAggr) {
+              aggregationEvaluators[ai].aggregate(aggs[ai], o);
+            }
+          } else {
+            if (lastInvoke[ai] == null) {
+              lastInvoke[ai] = new Object[o.length];
+            }
+            if (ObjectInspectorUtils.compare(o,
+                aggregationParameterObjectInspectors[ai], lastInvoke[ai],
+                aggregationParameterStandardObjectInspectors[ai]) != 0) {
+              aggregationEvaluators[ai].aggregate(aggs[ai], o);
+              for (int pi = 0; pi < o.length; pi++) {
+                lastInvoke[ai][pi] = ObjectInspectorUtils.copyToStandardObject(
+                    o[pi], aggregationParameterObjectInspectors[ai][pi],
+                    ObjectInspectorCopyOption.WRITABLE);
+              }
+            }
+          }
+        } else {
+          aggregationEvaluators[ai].aggregate(aggs[ai], o);
+        }
       }
+      return;
+    }
 
-      // Update the aggregations.
-      if (aggregationIsDistinct[ai]) {
-        if (hashAggr) {
-          if (newEntryForHashAggr) {
-            aggregationEvaluators[ai].aggregate(aggs[ai], o);
+    if (distinctKeyAggrs.size() > 0) {
+      // evaluate union object
+      UnionObject uo = (UnionObject) (unionExprEval.evaluate(row));
+      int unionTag = uo.getTag();
+
+      // update non-distinct key aggregations : "KEY._colx:t._coly"
+      if (nonDistinctKeyAggrs.get(unionTag) != null) {
+        for (int pos : nonDistinctKeyAggrs.get(unionTag)) {
+          Object[] o = new Object[aggregationParameterFields[pos].length];
+          for (int pi = 0; pi < aggregationParameterFields[pos].length; pi++) {
+            o[pi] = aggregationParameterFields[pos][pi].evaluate(row);
           }
-        } else {
-          if (lastInvoke[ai] == null) {
-            lastInvoke[ai] = new Object[o.length];
+          aggregationEvaluators[pos].aggregate(aggs[pos], o);
+        }
+      }
+      // there may be multi distinct clauses for one column
+      // update them all.
+      if (distinctKeyAggrs.get(unionTag) != null) {
+        for (int i : distinctKeyAggrs.get(unionTag)) {
+          Object[] o = new Object[aggregationParameterFields[i].length];
+          for (int pi = 0; pi < aggregationParameterFields[i].length; pi++) {
+            o[pi] = aggregationParameterFields[i][pi].evaluate(row);
           }
-          if (ObjectInspectorUtils.compare(o,
-              aggregationParameterObjectInspectors[ai], lastInvoke[ai],
-              aggregationParameterStandardObjectInspectors[ai]) != 0) {
-            aggregationEvaluators[ai].aggregate(aggs[ai], o);
-            for (int pi = 0; pi < o.length; pi++) {
-              lastInvoke[ai][pi] = ObjectInspectorUtils.copyToStandardObject(
-                  o[pi], aggregationParameterObjectInspectors[ai][pi],
-                  ObjectInspectorCopyOption.WRITABLE);
+
+          if (hashAggr) {
+            if (newEntryForHashAggr) {
+              aggregationEvaluators[i].aggregate(aggs[i], o);
+            }
+          } else {
+            if (lastInvoke[i] == null) {
+              lastInvoke[i] = new Object[o.length];
+            }
+            if (ObjectInspectorUtils.compare(o,
+                aggregationParameterObjectInspectors[i],
+                lastInvoke[i],
+                aggregationParameterStandardObjectInspectors[i]) != 0) {
+              aggregationEvaluators[i].aggregate(aggs[i], o);
+              for (int pi = 0; pi < o.length; pi++) {
+                lastInvoke[i][pi] = ObjectInspectorUtils.copyToStandardObject(
+                    o[pi], aggregationParameterObjectInspectors[i][pi],
+                    ObjectInspectorCopyOption.WRITABLE);
+              }
             }
           }
         }
-      } else {
+      }
+
+      // update non-distinct value aggregations: 'VALUE._colx'
+      // these aggregations should be updated only once.
+      if (unionTag == 0) {
+        for (int pos : nonDistinctAggrs) {
+          Object[] o = new Object[aggregationParameterFields[pos].length];
+          for (int pi = 0; pi < aggregationParameterFields[pos].length; pi++) {
+            o[pi] = aggregationParameterFields[pos][pi].evaluate(row);
+          }
+          aggregationEvaluators[pos].aggregate(aggs[pos], o);
+        }
+      }
+    } else {
+      for (int ai = 0; ai < aggs.length; ai++) {
+        // there is no distinct aggregation,
+        // update all aggregations
+        Object[] o = new Object[aggregationParameterFields[ai].length];
+        for (int pi = 0; pi < aggregationParameterFields[ai].length; pi++) {
+          o[pi] = aggregationParameterFields[ai][pi].evaluate(row);
+        }
         aggregationEvaluators[ai].aggregate(aggs[ai], o);
       }
     }
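
To recap the reduce-side dispatch introduced above: aggregates are bucketed into distinctKeyAggrs (DISTINCT aggregates whose parameters come from the union field at the end of the reduce KEY), nonDistinctKeyAggrs (non-DISTINCT aggregates whose parameters also travel in the KEY), and nonDistinctAggrs (aggregates whose parameters arrive via VALUE and are updated only once per input row, when the union tag is 0). A sketch of a query that exercises all three buckets, written against the standard src test table (illustrative only, not one of the new .q tests):

    SELECT substr(key, 1, 1),
           COUNT(DISTINCT substr(value, 5)),  -- distinct aggregate, parameter in the KEY union
           SUM(substr(value, 5)),             -- non-distinct aggregate over a KEY column
           COUNT(1)                           -- non-distinct aggregate fed from VALUE
    FROM src
    GROUP BY substr(key, 1, 1);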

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1028072&r1=1028071&r2=1028072&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Oct 27 19:00:02 2010
@@ -849,6 +849,20 @@ public abstract class Operator<T extends
   }
 
   /**
+   * Initialize an array of ExprNodeEvaluator from start, for specified length
+   * and return the result ObjectInspectors.
+   */
+  protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals,
+      int start, int length,
+      ObjectInspector rowInspector) throws HiveException {
+    ObjectInspector[] result = new ObjectInspector[length];
+    for (int i = 0; i < length; i++) {
+      result[i] = evals[start + i].initialize(rowInspector);
+    }
+    return result;
+  }
+
+  /**
    * Initialize an array of ExprNodeEvaluator and put the return values into a
    * StructObjectInspector with integer field names.
    */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1028072&r1=1028071&r2=1028072&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Wed Oct 27 19:00:02 2010
@@ -20,9 +20,13 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -33,8 +37,11 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -71,6 +78,8 @@ public class ReduceSinkOperator extends 
   transient Serializer valueSerializer;
   transient int tag;
   transient byte[] tagByte = new byte[1];
+  transient protected int numDistributionKeys;
+  transient protected int numDistinctExprs;
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
@@ -82,6 +91,10 @@ public class ReduceSinkOperator extends 
         keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
       }
 
+      numDistributionKeys = conf.getNumDistributionKeys();
+      distinctColIndices = conf.getDistinctColumnIndices();
+      numDistinctExprs = distinctColIndices.size();
+
       valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
       i = 0;
       for (ExprNodeDesc e : conf.getValueCols()) {
@@ -125,61 +138,76 @@ public class ReduceSinkOperator extends 
   transient StructObjectInspector valueObjectInspector;
   transient ObjectInspector[] partitionObjectInspectors;
 
-  transient Object[] cachedKeys;
+  transient Object[][] cachedKeys;
   transient Object[] cachedValues;
+  transient List<List<Integer>> distinctColIndices;
 
   boolean firstRow;
 
   transient Random random;
 
+  /**
+   * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
+   * column indices for group by.
+   * Puts the return values into a StructObjectInspector with output column
+   * names.
+   *
+   * If distinctColIndices is empty, the object inspector is the same as
+   * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
+   */
+  protected static StructObjectInspector initEvaluatorsAndReturnStruct(
+      ExprNodeEvaluator[] evals, List<List<Integer>> distinctColIndices,
+      List<String> outputColNames,
+      int length, ObjectInspector rowInspector)
+      throws HiveException {
+    int inspectorLen = evals.length > length ? length + 1 : evals.length;
+    List<ObjectInspector> sois = new ArrayList<ObjectInspector>(inspectorLen);
+
+    // keys
+    ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector);
+    sois.addAll(Arrays.asList(fieldObjectInspectors));
+
+    if (evals.length > length) {
+      // union keys
+      List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
+      for (List<Integer> distinctCols : distinctColIndices) {
+        List<String> names = new ArrayList<String>();
+        List<ObjectInspector> eois = new ArrayList<ObjectInspector>();
+        int numExprs = 0;
+        for (int i : distinctCols) {
+          names.add(HiveConf.getColumnInternalName(numExprs));
+          eois.add(evals[i].initialize(rowInspector));
+          numExprs++;
+        }
+        uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
+      }
+      UnionObjectInspector uoi =
+        ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
+      sois.add(uoi);
+    }
+    return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois );
+  }
+
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     try {
       ObjectInspector rowInspector = inputObjInspectors[tag];
       if (firstRow) {
         firstRow = false;
-        keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, conf
-            .getOutputKeyColumnNames(), rowInspector);
+        keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
+            distinctColIndices,
+            conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
         valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf
             .getOutputValueColumnNames(), rowInspector);
         partitionObjectInspectors = initEvaluators(partitionEval, rowInspector);
-
-        cachedKeys = new Object[keyEval.length];
+        int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
+        int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
+          numDistributionKeys;
+        cachedKeys = new Object[numKeys][keyLen];
         cachedValues = new Object[valueEval.length];
       }
 
-      // Evaluate the keys
-      for (int i = 0; i < keyEval.length; i++) {
-        cachedKeys[i] = keyEval[i].evaluate(row);
-      }
-
-      // Serialize the keys and append the tag
-      if (keyIsText) {
-        Text key = (Text) keySerializer.serialize(cachedKeys,
-            keyObjectInspector);
-        if (tag == -1) {
-          keyWritable.set(key.getBytes(), 0, key.getLength());
-        } else {
-          int keyLength = key.getLength();
-          keyWritable.setSize(keyLength + 1);
-          System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
-          keyWritable.get()[keyLength] = tagByte[0];
-        }
-      } else {
-        // Must be BytesWritable
-        BytesWritable key = (BytesWritable) keySerializer.serialize(cachedKeys,
-            keyObjectInspector);
-        if (tag == -1) {
-          keyWritable.set(key.get(), 0, key.getSize());
-        } else {
-          int keyLength = key.getSize();
-          keyWritable.setSize(keyLength + 1);
-          System.arraycopy(key.get(), 0, keyWritable.get(), 0, keyLength);
-          keyWritable.get()[keyLength] = tagByte[0];
-        }
-      }
-
-      // Set the HashCode
+      // Evaluate the HashCode
       int keyHashCode = 0;
       if (partitionEval.length == 0) {
         // If no partition cols, just distribute the data uniformly to provide
@@ -199,7 +227,6 @@ public class ReduceSinkOperator extends 
               + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
         }
       }
-      keyWritable.setHashCode(keyHashCode);
 
       // Evaluate the value
       for (int i = 0; i < valueEval.length; i++) {
@@ -208,23 +235,71 @@ public class ReduceSinkOperator extends 
       // Serialize the value
       value = valueSerializer.serialize(cachedValues, valueObjectInspector);
 
-    } catch (SerDeException e) {
-      throw new HiveException(e);
-    }
+      // Evaluate the keys
+      Object[] distributionKeys = new Object[numDistributionKeys];
+      for (int i = 0; i < numDistributionKeys; i++) {
+        distributionKeys[i] = keyEval[i].evaluate(row);
+      }
 
-    try {
-      if (out != null) {
-        out.collect(keyWritable, value);
-        // Since this is a terminal operator, update counters explicitly -
-        // forward is not called
-        if (counterNameToEnum != null) {
-          ++outputRows;
-          if (outputRows % 1000 == 0) {
-            incrCounter(numOutputRowsCntr, outputRows);
-            outputRows = 0;
+      if (numDistinctExprs > 0) {
+        // with distinct key(s)
+        for (int i = 0; i < numDistinctExprs; i++) {
+          System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys);
+          Object[] distinctParameters =
+            new Object[distinctColIndices.get(i).size()];
+          for (int j = 0; j < distinctParameters.length; j++) {
+            distinctParameters[j] =
+              keyEval[distinctColIndices.get(i).get(j)].evaluate(row);
+          }
+          cachedKeys[i][numDistributionKeys] =
+              new StandardUnion((byte)i, distinctParameters);
+        }
+      } else {
+        // no distinct key
+        System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
+      }
+      // Serialize the keys and append the tag
+      for (int i = 0; i < cachedKeys.length; i++) {
+        if (keyIsText) {
+          Text key = (Text) keySerializer.serialize(cachedKeys[i],
+              keyObjectInspector);
+          if (tag == -1) {
+            keyWritable.set(key.getBytes(), 0, key.getLength());
+          } else {
+            int keyLength = key.getLength();
+            keyWritable.setSize(keyLength + 1);
+            System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+            keyWritable.get()[keyLength] = tagByte[0];
+          }
+        } else {
+          // Must be BytesWritable
+          BytesWritable key = (BytesWritable) keySerializer.serialize(
+              cachedKeys[i], keyObjectInspector);
+          if (tag == -1) {
+            keyWritable.set(key.getBytes(), 0, key.getLength());
+          } else {
+            int keyLength = key.getLength();
+            keyWritable.setSize(keyLength + 1);
+            System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+            keyWritable.get()[keyLength] = tagByte[0];
+          }
+        }
+        keyWritable.setHashCode(keyHashCode);
+        if (out != null) {
+          out.collect(keyWritable, value);
+          // Since this is a terminal operator, update counters explicitly -
+          // forward is not called
+          if (counterNameToEnum != null) {
+            ++outputRows;
+            if (outputRows % 1000 == 0) {
+              incrCounter(numOutputRowsCntr, outputRows);
+              outputRows = 0;
+            }
           }
         }
       }
+    } catch (SerDeException e) {
+      throw new HiveException(e);
     } catch (IOException e) {
       throw new HiveException(e);
     }
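
With DISTINCT expressions present, the map side now emits one reduce key per distinct expression instead of one per row: the distribution keys are copied into each cached key and the last field is a StandardUnion tagged with the index of the distinct expression. A sketch of the effect (illustrative, again using the src test table):

    -- with two DISTINCT expressions, each input row yields two reduce keys:
    --   (group-by keys, union{tag=0: substr(value, 5)}) and (group-by keys, union{tag=1: key})
    SELECT substr(key, 1, 1), COUNT(DISTINCT substr(value, 5)), COUNT(DISTINCT key)
    FROM src
    GROUP BY substr(key, 1, 1);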

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java?rev=1028072&r1=1028071&r2=1028072&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java Wed Oct 27 19:00:02 2010
@@ -59,7 +59,7 @@ public enum ErrorMsg {
   INVALID_JOIN_CONDITION_3("OR not supported in Join currently"),
   INVALID_TRANSFORM("TRANSFORM with Other Select Columns not Supported"),
   DUPLICATE_GROUPBY_KEY("Repeated Key in Group By"),
-  UNSUPPORTED_MULTIPLE_DISTINCTS("DISTINCT on Different Columns not Supported"),
+  UNSUPPORTED_MULTIPLE_DISTINCTS("DISTINCT on Different Columns not Supported with skew in data"),
   NO_SUBQUERY_ALIAS("No Alias For Subquery"),
   NO_INSERT_INSUBQUERY("Cannot insert in a Subquery. Inserting to table "),
   NON_KEY_EXPR_IN_GROUPBY("Expression Not In Group By Key"),
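
The error is kept but narrowed: multiple DISTINCT aggregates are now rejected only when the skewed-data group-by rewrite is enabled, which is what the new clientnegative *_multi_distinct tests appear to exercise. A sketch of a query that is still expected to fail (hypothetical column names; the authoritative cases are the .q files added above):

    set hive.groupby.skewindata=true;
    -- still unsupported: more than one DISTINCT aggregate with skew handling enabled
    SELECT COUNT(DISTINCT userid), COUNT(DISTINCT pageid) FROM page_views;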

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1028072&r1=1028071&r2=1028072&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Wed Oct 27 19:00:02 2010
@@ -81,7 +81,7 @@ public class QBParseInfo {
 
   // used by GroupBy
   private final LinkedHashMap<String, LinkedHashMap<String, ASTNode>> destToAggregationExprs;
-  private final HashMap<String, ASTNode> destToDistinctFuncExpr;
+  private final HashMap<String, List<ASTNode>> destToDistinctFuncExprs;
 
   @SuppressWarnings("unused")
   private static final Log LOG = LogFactory.getLog(QBParseInfo.class.getName());
@@ -100,7 +100,7 @@ public class QBParseInfo {
     destToLimit = new HashMap<String, Integer>();
 
     destToAggregationExprs = new LinkedHashMap<String, LinkedHashMap<String, ASTNode>>();
-    destToDistinctFuncExpr = new HashMap<String, ASTNode>();
+    destToDistinctFuncExprs = new HashMap<String, List<ASTNode>>();
 
     this.alias = alias;
     this.isSubQ = isSubQ;
@@ -120,12 +120,12 @@ public class QBParseInfo {
     return destToAggregationExprs.get(clause);
   }
 
-  public void setDistinctFuncExprForClause(String clause, ASTNode ast) {
-    destToDistinctFuncExpr.put(clause, ast);
+  public void setDistinctFuncExprsForClause(String clause, List<ASTNode> ast) {
+    destToDistinctFuncExprs.put(clause, ast);
   }
 
-  public ASTNode getDistinctFuncExprForClause(String clause) {
-    return destToDistinctFuncExpr.get(clause);
+  public List<ASTNode> getDistinctFuncExprsForClause(String clause) {
+    return destToDistinctFuncExprs.get(clause);
   }
 
   public void setSelExprForClause(String clause, ASTNode ast) {
@@ -340,12 +340,12 @@ public class QBParseInfo {
       }
     }
 
-    if (!destToDistinctFuncExpr.isEmpty()) {
-      Iterator<Map.Entry<String, ASTNode>> distn = destToDistinctFuncExpr
+    if (!destToDistinctFuncExprs.isEmpty()) {
+      Iterator<Map.Entry<String, List<ASTNode>>> distn = destToDistinctFuncExprs
           .entrySet().iterator();
       while (distn.hasNext()) {
-        ASTNode ct = distn.next().getValue();
-        if (ct != null) {
+        List<ASTNode> ct = distn.next().getValue();
+        if (!ct.isEmpty()) {
           return false;
         }
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1028072&r1=1028071&r2=1028072&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Oct 27 19:00:02 2010
@@ -344,22 +344,17 @@ public class SemanticAnalyzer extends Ba
     }
   }
 
-  private ASTNode doPhase1GetDistinctFuncExpr(
+  private List<ASTNode> doPhase1GetDistinctFuncExprs(
       HashMap<String, ASTNode> aggregationTrees) throws SemanticException {
-    ASTNode expr = null;
+    List<ASTNode> exprs = new ArrayList<ASTNode>();
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ASTNode value = entry.getValue();
       assert (value != null);
       if (value.getToken().getType() == HiveParser.TOK_FUNCTIONDI) {
-        if (expr == null) {
-          expr = value;
-        } else {
-          throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS
-              .getMsg());
-        }
+        exprs.add(value);
       }
     }
-    return expr;
+    return exprs;
   }
 
   /**
@@ -591,8 +586,8 @@ public class SemanticAnalyzer extends Ba
 
         LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast);
         qbp.setAggregationExprsForClause(ctx_1.dest, aggregations);
-        qbp.setDistinctFuncExprForClause(ctx_1.dest,
-            doPhase1GetDistinctFuncExpr(aggregations));
+        qbp.setDistinctFuncExprsForClause(ctx_1.dest,
+            doPhase1GetDistinctFuncExprs(aggregations));
         break;
 
       case HiveParser.TOK_WHERE:
@@ -2188,11 +2183,24 @@ public class SemanticAnalyzer extends Ba
     HashMap<String, ASTNode> aggregationTrees = parseInfo
         .getAggregationExprsForClause(dest);
     assert (aggregationTrees != null);
+    // get the last colName for the reduce KEY
+    // it represents the column name corresponding to distinct aggr, if any
+    String lastKeyColName = null;
+    if (reduceSinkOperatorInfo.getConf() instanceof ReduceSinkDesc) {
+      List<String> inputKeyCols = ((ReduceSinkDesc)
+          reduceSinkOperatorInfo.getConf()).getOutputKeyColumnNames();
+      if (inputKeyCols.size() > 0) {
+        lastKeyColName = inputKeyCols.get(inputKeyCols.size()-1);
+      }
+    }
+    int numDistinctUDFs = 0;
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ASTNode value = entry.getValue();
 
       // This is the GenericUDAF name
       String aggName = value.getChild(0).getText();
+      boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI;
+      boolean isAllColumns = value.getType() == HiveParser.TOK_FUNCTIONSTAR;
 
       // Convert children to aggParameters
       ArrayList<ExprNodeDesc> aggParameters = new ArrayList<ExprNodeDesc>();
@@ -2207,13 +2215,22 @@ public class SemanticAnalyzer extends Ba
 
         String paraExpression = paraExprInfo.getInternalName();
         assert (paraExpression != null);
+        if (isDistinct && lastKeyColName != null) {
+          // if aggr is distinct, the parameter name is constructed as
+          // KEY.lastKeyColName:<tag>._colx
+          paraExpression = Utilities.ReduceField.KEY.name() + "." +
+          lastKeyColName + ":" + numDistinctUDFs + "." +
+          getColumnInternalName(i-1);
+
+        }
         aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(),
-            paraExprInfo.getInternalName(), paraExprInfo.getTabAlias(),
+            paraExpression, paraExprInfo.getTabAlias(),
             paraExprInfo.getIsVirtualCol()));
       }
 
-      boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI;
-      boolean isAllColumns = value.getType() == HiveParser.TOK_FUNCTIONSTAR;
+      if (isDistinct) {
+        numDistinctUDFs++;
+      }
       Mode amode = groupByDescModeToUDAFMode(mode, isDistinct);
       GenericUDAFEvaluator genericUDAFEvaluator = getGenericUDAFEvaluator(
           aggName, aggParameters, value, isDistinct, isAllColumns);
@@ -2290,10 +2307,22 @@ public class SemanticAnalyzer extends Ba
 
     HashMap<String, ASTNode> aggregationTrees = parseInfo
         .getAggregationExprsForClause(dest);
+    // get the last colName for the reduce KEY
+    // it represents the column name corresponding to distinct aggr, if any
+    String lastKeyColName = null;
+    if (reduceSinkOperatorInfo.getConf() instanceof ReduceSinkDesc) {
+      List<String> inputKeyCols = ((ReduceSinkDesc)
+          reduceSinkOperatorInfo.getConf()).getOutputKeyColumnNames();
+      if (inputKeyCols.size() > 0) {
+        lastKeyColName = inputKeyCols.get(inputKeyCols.size()-1);
+      }
+    }
+    int numDistinctUDFs = 0;
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ASTNode value = entry.getValue();
       String aggName = value.getChild(0).getText();
       ArrayList<ExprNodeDesc> aggParameters = new ArrayList<ExprNodeDesc>();
+      boolean isDistinct = (value.getType() == HiveParser.TOK_FUNCTIONDI);
 
       // If the function is distinct, partial aggregartion has not been done on
       // the client side.
@@ -2305,8 +2334,7 @@ public class SemanticAnalyzer extends Ba
       // Otherwise, we look for b+c.
       // For distincts, partial aggregation is never performed on the client
       // side, so always look for the parameters: d+e
-      boolean partialAggDone = !(distPartAgg
-          || (value.getToken().getType() == HiveParser.TOK_FUNCTIONDI));
+      boolean partialAggDone = !(distPartAgg || isDistinct);
       if (!partialAggDone) {
         // 0 is the function name
         for (int i = 1; i < value.getChildCount(); i++) {
@@ -2320,8 +2348,16 @@ public class SemanticAnalyzer extends Ba
 
           String paraExpression = paraExprInfo.getInternalName();
           assert (paraExpression != null);
+          if (isDistinct && lastKeyColName != null) {
+            // if aggr is distinct, the parameter name is constructed as
+            // KEY.lastKeyColName:<tag>._colx
+            paraExpression = Utilities.ReduceField.KEY.name() + "." +
+            lastKeyColName + ":" + numDistinctUDFs + "."
+            + getColumnInternalName(i-1);
+
+          }
           aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(),
-              paraExprInfo.getInternalName(), paraExprInfo.getTabAlias(),
+              paraExpression, paraExprInfo.getTabAlias(),
               paraExprInfo.getIsVirtualCol()));
         }
       } else {
@@ -2335,7 +2371,9 @@ public class SemanticAnalyzer extends Ba
             paraExpression, paraExprInfo.getTabAlias(), paraExprInfo
             .getIsVirtualCol()));
       }
-      boolean isDistinct = (value.getType() == HiveParser.TOK_FUNCTIONDI);
+      if (isDistinct) {
+        numDistinctUDFs++;
+      }
       boolean isAllColumns = value.getType() == HiveParser.TOK_FUNCTIONSTAR;
       Mode amode = groupByDescModeToUDAFMode(mode, isDistinct);
       GenericUDAFEvaluator genericUDAFEvaluator = null;
@@ -2414,22 +2452,25 @@ public class SemanticAnalyzer extends Ba
     }
 
     // If there is a distinctFuncExp, add all parameters to the reduceKeys.
-    if (parseInfo.getDistinctFuncExprForClause(dest) != null) {
-      ASTNode value = parseInfo.getDistinctFuncExprForClause(dest);
+    if (!parseInfo.getDistinctFuncExprsForClause(dest).isEmpty()) {
+      List<ASTNode> list = parseInfo.getDistinctFuncExprsForClause(dest);
       int numDistn = 0;
-      // 0 is function name
-      for (int i = 1; i < value.getChildCount(); i++) {
-        ASTNode parameter = (ASTNode) value.getChild(i);
-        if (groupByOutputRowResolver.getExpression(parameter) == null) {
-          ExprNodeDesc distExprNode = genExprNodeDesc(parameter,
-              groupByInputRowResolver);
-          groupByKeys.add(distExprNode);
-          numDistn++;
-          String field = getColumnInternalName(grpByExprs.size() + numDistn - 1);
-          outputColumnNames.add(field);
-          groupByOutputRowResolver.putExpression(parameter, new ColumnInfo(field,
-              distExprNode.getTypeInfo(), "", false));
-          colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1));
+      for(ASTNode value: list) {
+        // 0 is function name
+        for (int i = 1; i < value.getChildCount(); i++) {
+          ASTNode parameter = (ASTNode) value.getChild(i);
+          if (groupByOutputRowResolver.getExpression(parameter) == null) {
+            ExprNodeDesc distExprNode = genExprNodeDesc(parameter,
+                groupByInputRowResolver);
+            groupByKeys.add(distExprNode);
+            numDistn++;
+            String field = getColumnInternalName(grpByExprs.size() + numDistn -
+                1);
+            outputColumnNames.add(field);
+            groupByOutputRowResolver.putExpression(parameter, new ColumnInfo(
+                field, distExprNode.getTypeInfo(), "", false));
+            colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1));
+          }
         }
       }
     }
@@ -2513,7 +2554,8 @@ public class SemanticAnalyzer extends Ba
     ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
     // Pre-compute group-by keys and store in reduceKeys
 
-    List<String> outputColumnNames = new ArrayList<String>();
+    List<String> outputKeyColumnNames = new ArrayList<String>();
+    List<String> outputValueColumnNames = new ArrayList<String>();
     List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);
     for (int i = 0; i < grpByExprs.size(); ++i) {
       ASTNode grpbyExpr = grpByExprs.get(i);
@@ -2521,7 +2563,7 @@ public class SemanticAnalyzer extends Ba
           reduceSinkInputRowResolver);
       reduceKeys.add(inputExpr);
       if (reduceSinkOutputRowResolver.getExpression(grpbyExpr) == null) {
-        outputColumnNames.add(getColumnInternalName(reduceKeys.size() - 1));
+        outputKeyColumnNames.add(getColumnInternalName(reduceKeys.size() - 1));
         String field = Utilities.ReduceField.KEY.toString() + "."
             + getColumnInternalName(reduceKeys.size() - 1);
         ColumnInfo colInfo = new ColumnInfo(field, reduceKeys.get(
@@ -2534,24 +2576,43 @@ public class SemanticAnalyzer extends Ba
       }
     }
 
+    List<List<Integer>> distinctColIndices = new ArrayList<List<Integer>>();
     // If there is a distinctFuncExp, add all parameters to the reduceKeys.
-    if (parseInfo.getDistinctFuncExprForClause(dest) != null) {
-      ASTNode value = parseInfo.getDistinctFuncExprForClause(dest);
-      // 0 is function name
-      for (int i = 1; i < value.getChildCount(); i++) {
-        ASTNode parameter = (ASTNode) value.getChild(i);
-        if (reduceSinkOutputRowResolver.getExpression(parameter) == null) {
-          reduceKeys
-              .add(genExprNodeDesc(parameter, reduceSinkInputRowResolver));
-          outputColumnNames.add(getColumnInternalName(reduceKeys.size() - 1));
-          String field = Utilities.ReduceField.KEY.toString() + "."
-              + getColumnInternalName(reduceKeys.size() - 1);
-          ColumnInfo colInfo = new ColumnInfo(field, reduceKeys.get(
-              reduceKeys.size() - 1).getTypeInfo(), null, false);
+    if (!parseInfo.getDistinctFuncExprsForClause(dest).isEmpty()) {
+      List<ASTNode> distFuncs = parseInfo.getDistinctFuncExprsForClause(dest);
+      String colName = getColumnInternalName(reduceKeys.size());
+      outputKeyColumnNames.add(colName);
+      for (int i = 0; i < distFuncs.size(); i++) {
+        ASTNode value = distFuncs.get(i);
+        int numExprs = 0;
+        List<Integer> distinctIndices = new ArrayList<Integer>();
+        // 0 is function name
+        for (int j = 1; j < value.getChildCount(); j++) {
+          ASTNode parameter = (ASTNode) value.getChild(j);
+          ExprNodeDesc expr = genExprNodeDesc(parameter, reduceSinkInputRowResolver);
+          // see if expr is already present in reduceKeys.
+          // get index of expr in reduceKeys
+          int ri;
+          for (ri = 0; ri < reduceKeys.size(); ri++) {
+            if (reduceKeys.get(ri).getExprString().equals(expr.getExprString())) {
+              break;
+            }
+          }
+          // add the expr to reduceKeys if it is not present
+          if (ri == reduceKeys.size()) {
+            reduceKeys.add(expr);
+          }
+          // add the index of expr in reduceKeys to distinctIndices
+          distinctIndices.add(ri);
+          String name = getColumnInternalName(numExprs);
+          String field = Utilities.ReduceField.KEY.toString() + "." + colName
+          + ":" + i
+          + "." + name;
+          ColumnInfo colInfo = new ColumnInfo(field, expr.getTypeInfo(), null, false);
           reduceSinkOutputRowResolver.putExpression(parameter, colInfo);
-          colExprMap.put(colInfo.getInternalName(), reduceKeys.get(reduceKeys
-              .size() - 1));
+          numExprs++;
         }
+        distinctColIndices.add(distinctIndices);
       }
     }
 
@@ -2569,7 +2630,7 @@ public class SemanticAnalyzer extends Ba
           if (reduceSinkOutputRowResolver.getExpression(parameter) == null) {
             reduceValues.add(genExprNodeDesc(parameter,
                 reduceSinkInputRowResolver));
-            outputColumnNames
+            outputValueColumnNames
                 .add(getColumnInternalName(reduceValues.size() - 1));
             String field = Utilities.ReduceField.VALUE.toString() + "."
                 + getColumnInternalName(reduceValues.size() - 1);
@@ -2590,7 +2651,7 @@ public class SemanticAnalyzer extends Ba
         reduceValues.add(new ExprNodeColumnDesc(type,
             getColumnInternalName(inputField), "", false));
         inputField++;
-        outputColumnNames.add(getColumnInternalName(reduceValues.size() - 1));
+        outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1));
         String field = Utilities.ReduceField.VALUE.toString() + "."
             + getColumnInternalName(reduceValues.size() - 1);
         reduceSinkOutputRowResolver.putExpression(entry.getValue(),
@@ -2600,7 +2661,8 @@ public class SemanticAnalyzer extends Ba
 
     ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
         OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
-        reduceValues, outputColumnNames, true, -1, numPartitionFields,
+        grpByExprs.size(), reduceValues, distinctColIndices,
+        outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields,
         numReducers), new RowSchema(reduceSinkOutputRowResolver
         .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver);
     rsOp.setColumnExprMap(colExprMap);
@@ -2788,7 +2850,7 @@ public class SemanticAnalyzer extends Ba
    *
    *           Generate a Group-By plan using 1 map-reduce job. Spray by the
    *           group by key, and sort by the distinct key (if any), and compute
-   *           aggregates * The agggregation evaluation functions are as
+   *           aggregates * The aggregation evaluation functions are as
    *           follows: Partitioning Key: grouping key
    *
    *           Sorting Key: grouping key if no DISTINCT grouping + distinct key
@@ -2796,7 +2858,7 @@ public class SemanticAnalyzer extends Ba
    *
    *           Reducer: iterate/merge (mode = COMPLETE)
    **/
-  @SuppressWarnings({"unused", "nls"})
+  @SuppressWarnings({"nls"})
   private Operator genGroupByPlan1MR(String dest, QB qb, Operator input)
       throws SemanticException {
 
@@ -2940,8 +3002,8 @@ public class SemanticAnalyzer extends Ba
     // operator. We set the numPartitionColumns to -1 for this purpose. This is
     // captured by WritableComparableHiveObject.hashCode() function.
     Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb,
-        dest, input, (parseInfo.getDistinctFuncExprForClause(dest) == null ? -1
-        : Integer.MAX_VALUE), -1, false);
+        dest, input, (parseInfo.getDistinctFuncExprsForClause(dest).isEmpty() ?
+        -1 : Integer.MAX_VALUE), -1, false);
 
     // ////// 2. Generate GroupbyOperator
     Map<String, GenericUDAFEvaluator> genericUDAFEvaluators =
@@ -2974,7 +3036,7 @@ public class SemanticAnalyzer extends Ba
       return false;
     }
 
-    if (qb.getParseInfo().getDistinctFuncExprForClause(dest) != null) {
+    if (!qb.getParseInfo().getDistinctFuncExprsForClause(dest).isEmpty()) {
       return false;
     }
 
@@ -3093,7 +3155,7 @@ public class SemanticAnalyzer extends Ba
       // ////// Generate ReduceSink Operator
       Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb,
           dest, groupByOperatorInfo, (parseInfo
-          .getDistinctFuncExprForClause(dest) == null ? -1
+          .getDistinctFuncExprsForClause(dest).isEmpty() ? -1
           : Integer.MAX_VALUE), -1, true);
 
       // ////// Generate GroupbyOperator for a partial aggregation
@@ -3436,8 +3498,8 @@ public class SemanticAnalyzer extends Ba
       dest_part = qbm.getDestPartitionForAlias(dest);
       dest_tab = dest_part.getTable();
       Path tabPath = dest_tab.getPath();
-      Path partPath = dest_part.getPartitionPath(); 
-      
+      Path partPath = dest_part.getPartitionPath();
+
         // if the table is in a different dfs than the partition,
         // replace the partition's dfs with the table's dfs.
       dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
@@ -5060,35 +5122,36 @@ public class SemanticAnalyzer extends Ba
       }
 
       // All distinct expressions must be the same
-      ASTNode value = qbp.getDistinctFuncExprForClause(dest);
-      if (value == null) {
+      List<ASTNode> list = qbp.getDistinctFuncExprsForClause(dest);
+      if (list.isEmpty()) {
         return null;
       }
 
       List<ExprNodeDesc> currDestList = new ArrayList<ExprNodeDesc>();
       List<ASTNode> currASTList = new ArrayList<ASTNode>();
-      try {
-        // 0 is function name
-        for (int i = 1; i < value.getChildCount(); i++) {
-          ASTNode parameter = (ASTNode) value.getChild(i);
-          currDestList.add(genExprNodeDesc(parameter, inputRR));
-          currASTList.add(parameter);
-        }
-      } catch (SemanticException e) {
-        return null;
-      }
-
-      if (oldList == null) {
-        oldList = currDestList;
-        oldASTList = currASTList;
-      } else {
-        if (oldList.size() != currDestList.size()) {
+      for (ASTNode value: list) {
+        try {
+          // 0 is function name
+          for (int i = 1; i < value.getChildCount(); i++) {
+            ASTNode parameter = (ASTNode) value.getChild(i);
+            currDestList.add(genExprNodeDesc(parameter, inputRR));
+            currASTList.add(parameter);
+          }
+        } catch (SemanticException e) {
           return null;
         }
-        for (int pos = 0; pos < oldList.size(); pos++) {
-          if (!oldList.get(pos).isSame(currDestList.get(pos))) {
+        if (oldList == null) {
+          oldList = currDestList;
+          oldASTList = currASTList;
+        } else {
+          if (oldList.size() != currDestList.size()) {
             return null;
           }
+          for (int pos = 0; pos < oldList.size(); pos++) {
+            if (!oldList.get(pos).isSame(currDestList.get(pos))) {
+              return null;
+            }
+          }
         }
       }
     }
@@ -5237,6 +5300,13 @@ public class SemanticAnalyzer extends Ba
 
         if (qbp.getAggregationExprsForClause(dest).size() != 0
             || getGroupByForClause(qbp, dest).size() > 0) {
+          // multiple distincts are not supported with skew in data
+          if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
+              .equalsIgnoreCase("true") &&
+             qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
+            throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
+                getMsg());
+          }
           // insert a select operator here used by the ColumnPruner to reduce
           // the data to shuffle
           curr = insertSelectAllPlanForGroupBy(dest, curr);

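For illustration, the key bookkeeping in the hunk above can be read as follows: every parameter of every DISTINCT call is looked up in reduceKeys by its expression string, appended only if absent, and its position is recorded so that distinctColIndices ends up holding one list of key indices per DISTINCT call. A minimal, self-contained sketch of that logic (Expr and buildDistinctIndices are hypothetical stand-ins, not Hive classes; the sketch starts with an empty key list, whereas the analyzer's reduceKeys already holds the group-by expressions, so real distinct indices start after them):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class DistinctKeySketch {
      // Stand-in for ExprNodeDesc; equality is by expression string, mirroring
      // the getExprString() comparison in the hunk above.
      record Expr(String exprString) {}

      // For each DISTINCT call, append unseen parameters to reduceKeys and
      // record the per-call positions of its parameters inside reduceKeys.
      static List<List<Integer>> buildDistinctIndices(
          List<List<Expr>> distinctCalls, List<Expr> reduceKeys) {
        List<List<Integer>> distinctColIndices = new ArrayList<>();
        for (List<Expr> call : distinctCalls) {
          List<Integer> indices = new ArrayList<>();
          for (Expr e : call) {
            int ri = reduceKeys.indexOf(e);   // scan for an equal expression
            if (ri < 0) {                     // not present yet: add it
              ri = reduceKeys.size();
              reduceKeys.add(e);
            }
            indices.add(ri);
          }
          distinctColIndices.add(indices);
        }
        return distinctColIndices;
      }

      public static void main(String[] args) {
        // count(DISTINCT b), count(DISTINCT c), count(DISTINCT b, c)
        List<List<Expr>> calls = Arrays.asList(
            Arrays.asList(new Expr("b")),
            Arrays.asList(new Expr("c")),
            Arrays.asList(new Expr("b"), new Expr("c")));
        List<Expr> reduceKeys = new ArrayList<>();
        System.out.println(buildDistinctIndices(calls, reduceKeys)); // [[0], [1], [0, 1]]
        System.out.println(reduceKeys.size());                       // 2 (b and c stored once)
      }
    }

Note that the SemanticException added above makes this path unreachable when hive.groupby.skewindata=true, which is what the new clientnegative tests cover.
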
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java?rev=1028072&r1=1028071&r2=1028072&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java Wed Oct 27 19:00:02 2010
@@ -29,8 +29,9 @@ public class GroupByDesc implements java
    * PARTIAL1: partial aggregation - first phase: iterate, terminatePartial
    * PARTIAL2: partial aggregation - second phase: merge, terminatePartial
    * PARTIALS: For non-distinct the same as PARTIAL2, for distinct the same as
-   * PARTIAL1 FINAL: partial aggregation - final phase: merge, terminate HASH:
-   * For non-distinct the same as PARTIAL1 but use hash-table-based aggregation
+   *           PARTIAL1
+   * FINAL: partial aggregation - final phase: merge, terminate
+   * HASH: For non-distinct the same as PARTIAL1 but use hash-table-based aggregation
    * MERGEPARTIAL: FINAL for non-distinct aggregations, COMPLETE for distinct
    * aggregations.
    */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1028072&r1=1028071&r2=1028072&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Wed Oct 27 19:00:02 2010
@@ -54,6 +54,8 @@ import org.apache.hadoop.hive.serde2.bin
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -357,6 +359,43 @@ public final class PlanUtils {
 
   /**
    * Convert the ColumnList to FieldSchema list.
+   *
+   * Adds uniontype for distinctColIndices.
+   */
+  public static List<FieldSchema> getFieldSchemasFromColumnListWithLength(
+      List<ExprNodeDesc> cols, List<List<Integer>> distinctColIndices,
+      List<String> outputColumnNames, int length,
+      String fieldPrefix) {
+    // last one for union column.
+    List<FieldSchema> schemas = new ArrayList<FieldSchema>(length + 1);
+    for (int i = 0; i < length; i++) {
+      schemas.add(MetaStoreUtils.getFieldSchemaFromTypeInfo(
+          fieldPrefix + outputColumnNames.get(i), cols.get(i).getTypeInfo()));
+    }
+
+    List<TypeInfo> unionTypes = new ArrayList<TypeInfo>();
+    for (List<Integer> distinctCols : distinctColIndices) {
+      List<String> names = new ArrayList<String>();
+      List<TypeInfo> types = new ArrayList<TypeInfo>();
+      int numExprs = 0;
+      for (int i : distinctCols) {
+        names.add(HiveConf.getColumnInternalName(numExprs));
+        types.add(cols.get(i).getTypeInfo());
+        numExprs++;
+      }
+      unionTypes.add(TypeInfoFactory.getStructTypeInfo(names, types));
+    }
+    if (cols.size() - length > 0) {
+      schemas.add(MetaStoreUtils.getFieldSchemaFromTypeInfo(
+          fieldPrefix + outputColumnNames.get(length),
+          TypeInfoFactory.getUnionTypeInfo(unionTypes)));
+    }
+
+    return schemas;
+  }
+
+  /**
+   * Convert the ColumnList to FieldSchema list.
    */
   public static List<FieldSchema> getFieldSchemasFromColumnList(
       List<ExprNodeDesc> cols, List<String> outputColumnNames, int start,
@@ -446,33 +485,70 @@ public final class PlanUtils {
       ArrayList<ExprNodeDesc> keyCols, ArrayList<ExprNodeDesc> valueCols,
       List<String> outputColumnNames, boolean includeKeyCols, int tag,
       ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers) {
+    return getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
+        new ArrayList<List<Integer>>(),
+        includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) :
+          new ArrayList<String>(),
+        includeKeyCols ? outputColumnNames.subList(keyCols.size(),
+            outputColumnNames.size()) : outputColumnNames,
+        includeKeyCols, tag, partitionCols, order, numReducers);
+  }
+
+  /**
+   * Create the reduce sink descriptor.
+   *
+   * @param keyCols
+   *          The columns to be stored in the key
+   * @param numKeys
+   *          The number of distribution keys; usually equal to the number
+   *          of group-by keys.
+   * @param valueCols
+   *          The columns to be stored in the value
+   * @param distinctColIndices
+   *          column indices for distinct aggregate parameters
+   * @param outputKeyColumnNames
+   *          The output key columns names
+   * @param outputValueColumnNames
+   *          The output value columns names
+   * @param tag
+   *          The tag for this reducesink
+   * @param partitionCols
+   *          The columns for partitioning.
+   * @param numReducers
+   *          The number of reducers, set to -1 for automatic inference based on
+   *          input data size.
+   * @return The reduceSinkDesc object.
+   */
+  public static ReduceSinkDesc getReduceSinkDesc(
+      final ArrayList<ExprNodeDesc> keyCols, int numKeys,
+      ArrayList<ExprNodeDesc> valueCols,
+      List<List<Integer>> distinctColIndices,
+      List<String> outputKeyColumnNames,
+      List<String> outputValueColumnNames,
+      boolean includeKeyCols, int tag,
+      ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers) {
     TableDesc keyTable = null;
     TableDesc valueTable = null;
     ArrayList<String> outputKeyCols = new ArrayList<String>();
     ArrayList<String> outputValCols = new ArrayList<String>();
     if (includeKeyCols) {
-      keyTable = getReduceKeyTableDesc(getFieldSchemasFromColumnList(keyCols,
-          outputColumnNames, 0, ""), order);
-      outputKeyCols.addAll(outputColumnNames.subList(0, keyCols.size()));
-      valueTable = getReduceValueTableDesc(getFieldSchemasFromColumnList(
-          valueCols, outputColumnNames, keyCols.size(), ""));
-      outputValCols.addAll(outputColumnNames.subList(keyCols.size(),
-          outputColumnNames.size()));
+      keyTable = getReduceKeyTableDesc(getFieldSchemasFromColumnListWithLength(
+          keyCols, distinctColIndices, outputKeyColumnNames, numKeys, ""),
+          order);
+      outputKeyCols.addAll(outputKeyColumnNames);
     } else {
-      keyTable = getReduceKeyTableDesc(getFieldSchemasFromColumnList(keyCols,
-          "reducesinkkey"), order);
-      for (int i = 0; i < keyCols.size(); i++) {
+      keyTable = getReduceKeyTableDesc(getFieldSchemasFromColumnList(
+          keyCols, "reducesinkkey"), order);
+      for (int i = 0; i < keyCols.size(); i++) {
         outputKeyCols.add("reducesinkkey" + i);
       }
-      valueTable = getReduceValueTableDesc(getFieldSchemasFromColumnList(
-          valueCols, outputColumnNames, 0, ""));
-      outputValCols.addAll(outputColumnNames);
     }
-    return new ReduceSinkDesc(keyCols, valueCols, outputKeyCols, outputValCols,
+    valueTable = getReduceValueTableDesc(getFieldSchemasFromColumnList(
+        valueCols, outputValueColumnNames, 0, ""));
+    outputValCols.addAll(outputValueColumnNames);
+    return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols,
+        distinctColIndices, outputValCols,
         tag, partitionCols, numReducers, keyTable,
-            // Revert to DynamicSerDe:
-        // getBinaryTableDesc(getFieldSchemasFromColumnList(valueCols,
-        // "reducesinkvalue")));
         valueTable);
   }
 
@@ -499,6 +575,48 @@ public final class PlanUtils {
       ArrayList<ExprNodeDesc> keyCols, ArrayList<ExprNodeDesc> valueCols,
       List<String> outputColumnNames, boolean includeKey, int tag,
       int numPartitionFields, int numReducers) throws SemanticException {
+    return getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
+        new ArrayList<List<Integer>>(),
+        includeKey ? outputColumnNames.subList(0, keyCols.size()) :
+          new ArrayList<String>(),
+        includeKey ?
+            outputColumnNames.subList(keyCols.size(), outputColumnNames.size())
+            : outputColumnNames,
+        includeKey, tag, numPartitionFields, numReducers);
+  }
+
+  /**
+   * Create the reduce sink descriptor.
+   *
+   * @param keyCols
+   *          The columns to be stored in the key
+   * @param numKeys  The number of distribution keys; usually equal to the
+   *        number of group-by keys.
+   * @param valueCols
+   *          The columns to be stored in the value
+   * @param distinctColIndices
+   *          column indices for distinct aggregates
+   * @param outputKeyColumnNames
+   *          The output key columns names
+   * @param outputValueColumnNames
+   *          The output value columns names
+   * @param tag
+   *          The tag for this reducesink
+   * @param numPartitionFields
+   *          The first numPartitionFields of keyCols will be partition columns.
+   *          If numPartitionFields=-1, then partition randomly.
+   * @param numReducers
+   *          The number of reducers, set to -1 for automatic inference based on
+   *          input data size.
+   * @return The reduceSinkDesc object.
+   */
+  public static ReduceSinkDesc getReduceSinkDesc(
+      ArrayList<ExprNodeDesc> keyCols, int numKeys,
+      ArrayList<ExprNodeDesc> valueCols,
+      List<List<Integer>> distinctColIndices,
+      List<String> outputKeyColumnNames, List<String> outputValueColumnNames,
+      boolean includeKey, int tag,
+      int numPartitionFields, int numReducers) throws SemanticException {
     ArrayList<ExprNodeDesc> partitionCols = null;
 
     if (numPartitionFields >= keyCols.size()) {
@@ -519,8 +637,9 @@ public final class PlanUtils {
     for (int i = 0; i < keyCols.size(); i++) {
       order.append("+");
     }
-    return getReduceSinkDesc(keyCols, valueCols, outputColumnNames, includeKey,
-        tag, partitionCols, order.toString(), numReducers);
+    return getReduceSinkDesc(keyCols, numKeys, valueCols, distinctColIndices,
+        outputKeyColumnNames, outputValueColumnNames, includeKey, tag,
+        partitionCols, order.toString(), numReducers);
   }
 
   /**

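To make the effect of getFieldSchemasFromColumnListWithLength concrete: the first numKeys key columns keep plain field schemas, and every column referenced by distinctColIndices is folded into one trailing union column, with one struct per DISTINCT call. A rough, dependency-free sketch that only renders the resulting shape as strings (the real code builds TypeInfo objects via TypeInfoFactory and MetaStoreUtils; keySchema and the _col naming here are illustrative assumptions):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ReduceKeySchemaSketch {

      // numKeys plain columns, then a single union column covering every
      // DISTINCT call, each call modelled as a struct over its parameters.
      static List<String> keySchema(List<String> colTypes, int numKeys,
          List<List<Integer>> distinctColIndices) {
        List<String> schema = new ArrayList<>();
        for (int i = 0; i < numKeys; i++) {
          schema.add("_col" + i + ": " + colTypes.get(i));
        }
        if (colTypes.size() > numKeys) {            // only when distincts exist
          StringBuilder union = new StringBuilder("uniontype<");
          for (int d = 0; d < distinctColIndices.size(); d++) {
            if (d > 0) union.append(", ");
            union.append("struct<");
            List<Integer> cols = distinctColIndices.get(d);
            for (int j = 0; j < cols.size(); j++) {
              if (j > 0) union.append(", ");
              // inner names restart per struct, like getColumnInternalName(numExprs)
              union.append("_col").append(j).append(": ")
                   .append(colTypes.get(cols.get(j)));
            }
            union.append(">");
          }
          union.append(">");
          schema.add("_col" + numKeys + ": " + union);
        }
        return schema;
      }

      public static void main(String[] args) {
        // select a, count(distinct b), count(distinct c) from t group by a
        // key columns: a (string), b (int), c (int); one group-by key
        List<String> types = Arrays.asList("string", "int", "int");
        List<List<Integer>> distinct = Arrays.asList(List.of(1), List.of(2));
        keySchema(types, 1, distinct).forEach(System.out::println);
        // _col0: string
        // _col1: uniontype<struct<_col0: int>, struct<_col0: int>>
      }
    }
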
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1028072&r1=1028071&r2=1028072&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Wed Oct 27 19:00:02 2010
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
+import java.util.List;
 
 /**
  * ReduceSinkDesc.
@@ -32,6 +33,7 @@ public class ReduceSinkDesc implements S
    */
   private java.util.ArrayList<ExprNodeDesc> keyCols;
   private java.util.ArrayList<java.lang.String> outputKeyColumnNames;
+  private List<List<Integer>> distinctColumnIndices;
   /**
    * Value columns are passed to reducer in the "value".
    */
@@ -52,6 +54,11 @@ public class ReduceSinkDesc implements S
   private int tag;
 
   /**
+   * Number of distribution keys.
+   */
+  private int numDistributionKeys;
+
+  /**
    * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language).
    * Partition columns decide the reducer that the current row goes to.
    * Partition columns are not passed to reducer.
@@ -64,20 +71,24 @@ public class ReduceSinkDesc implements S
   }
 
   public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
+      int numDistributionKeys,
       java.util.ArrayList<ExprNodeDesc> valueCols,
       java.util.ArrayList<java.lang.String> outputKeyColumnNames,
-      java.util.ArrayList<java.lang.String> outputValueolumnNames, int tag,
+      List<List<Integer>> distinctColumnIndices,
+      java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
       java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
       final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
     this.keyCols = keyCols;
+    this.numDistributionKeys = numDistributionKeys;
     this.valueCols = valueCols;
     this.outputKeyColumnNames = outputKeyColumnNames;
-    outputValueColumnNames = outputValueolumnNames;
+    this.outputValueColumnNames = outputValueColumnNames;
     this.tag = tag;
     this.numReducers = numReducers;
     this.partitionCols = partitionCols;
     this.keySerializeInfo = keySerializeInfo;
     this.valueSerializeInfo = valueSerializeInfo;
+    this.distinctColumnIndices = distinctColumnIndices;
   }
 
   public java.util.ArrayList<java.lang.String> getOutputKeyColumnNames() {
@@ -107,6 +118,14 @@ public class ReduceSinkDesc implements S
     this.keyCols = keyCols;
   }
 
+  public int getNumDistributionKeys() {
+    return this.numDistributionKeys;
+  }
+
+  public void setNumDistributionKeys(int numKeys) {
+    this.numDistributionKeys = numKeys;
+  }
+
   @Explain(displayName = "value expressions")
   public java.util.ArrayList<ExprNodeDesc> getValueCols() {
     return valueCols;
@@ -184,4 +203,12 @@ public class ReduceSinkDesc implements S
         orderStr);
   }
 
+  public List<List<Integer>> getDistinctColumnIndices() {
+    return distinctColumnIndices;
+  }
+
+  public void setDistinctColumnIndices(
+      List<List<Integer>> distinctColumnIndices) {
+    this.distinctColumnIndices = distinctColumnIndices;
+  }
 }

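The two new fields give downstream consumers enough information to tell the distribution (group-by) part of the key from the per-distinct union part: the first numDistributionKeys key columns are what rows are partitioned and grouped on, and distinctColumnIndices records which key positions feed each DISTINCT aggregate. A purely illustrative sketch of that split (SplitKey and the flat keyRow layout are assumptions for the example, not Hive APIs):

    import java.util.Arrays;
    import java.util.List;

    public class KeySplitSketch {
      record SplitKey(List<Object> distributionKeys, Object distinctUnion) {}

      static SplitKey split(List<Object> keyRow, int numDistributionKeys) {
        // group-by keys come first ...
        List<Object> dist = keyRow.subList(0, numDistributionKeys);
        // ... followed, when DISTINCT aggregates are present, by one union
        // column whose tag says which DISTINCT call the row belongs to.
        Object union = keyRow.size() > numDistributionKeys
            ? keyRow.get(numDistributionKeys) : null;
        return new SplitKey(dist, union);
      }

      public static void main(String[] args) {
        // e.g. "select a, count(distinct b), count(distinct c) ... group by a"
        List<Object> keyRow = Arrays.asList("a0", "union(tag=1, value=[c0])");
        System.out.println(split(keyRow, 1));
        // SplitKey[distributionKeys=[a0], distinctUnion=union(tag=1, value=[c0])]
      }
    }
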
Added: hive/trunk/ql/src/test/queries/clientnegative/groupby2_map_skew_multi_distinct.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/groupby2_map_skew_multi_distinct.q?rev=1028072&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/groupby2_map_skew_multi_distinct.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/groupby2_map_skew_multi_distinct.q Wed Oct 27 19:00:02 2010
@@ -0,0 +1,14 @@
+set hive.map.aggr=true;
+set hive.groupby.skewindata=true;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE dest1(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+SELECT dest1.* FROM dest1;

Added: hive/trunk/ql/src/test/queries/clientnegative/groupby2_multi_distinct.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/groupby2_multi_distinct.q?rev=1028072&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/groupby2_multi_distinct.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/groupby2_multi_distinct.q Wed Oct 27 19:00:02 2010
@@ -0,0 +1,13 @@
+set hive.map.aggr=false;
+set hive.groupby.skewindata=true;
+
+CREATE TABLE dest_g2(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+SELECT dest_g2.* FROM dest_g2;

Added: hive/trunk/ql/src/test/queries/clientnegative/groupby3_map_skew_multi_distinct.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/groupby3_map_skew_multi_distinct.q?rev=1028072&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/groupby3_map_skew_multi_distinct.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/groupby3_map_skew_multi_distinct.q Wed Oct 27 19:00:02 2010
@@ -0,0 +1,36 @@
+set hive.map.aggr=true;
+set hive.groupby.skewindata=true;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE dest1(c1 DOUBLE, c2 DOUBLE, c3 DOUBLE, c4 DOUBLE, c5 DOUBLE, c6 DOUBLE, c7 DOUBLE, c8 DOUBLE, c9 DOUBLE, c10 DOUBLE, c11 DOUBLE) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT
+  sum(substr(src.value,5)),
+  avg(substr(src.value,5)),
+  avg(DISTINCT substr(src.value,5)),
+  max(substr(src.value,5)),
+  min(substr(src.value,5)),
+  std(substr(src.value,5)),
+  stddev_samp(substr(src.value,5)),
+  variance(substr(src.value,5)),
+  var_samp(substr(src.value,5)),
+  sum(DISTINCT substr(src.value, 5)),
+  count(DISTINCT substr(src.value, 5));
+
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT
+  sum(substr(src.value,5)),
+  avg(substr(src.value,5)),
+  avg(DISTINCT substr(src.value,5)),
+  max(substr(src.value,5)),
+  min(substr(src.value,5)),
+  std(substr(src.value,5)),
+  stddev_samp(substr(src.value,5)),
+  variance(substr(src.value,5)),
+  var_samp(substr(src.value,5)),
+  sum(DISTINCT substr(src.value, 5)),
+  count(DISTINCT substr(src.value, 5));
+
+SELECT dest1.* FROM dest1;

Added: hive/trunk/ql/src/test/queries/clientnegative/groupby3_multi_distinct.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/groupby3_multi_distinct.q?rev=1028072&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/groupby3_multi_distinct.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/groupby3_multi_distinct.q Wed Oct 27 19:00:02 2010
@@ -0,0 +1,36 @@
+set hive.map.aggr=false;
+set hive.groupby.skewindata=true;
+
+CREATE TABLE dest1(c1 DOUBLE, c2 DOUBLE, c3 DOUBLE, c4 DOUBLE, c5 DOUBLE, c6 DOUBLE, c7 DOUBLE, c8 DOUBLE, c9 DOUBLE, c10 DOUBLE, c11 DOUBLE) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT 
+  sum(substr(src.value,5)), 
+  avg(substr(src.value,5)), 
+  avg(DISTINCT substr(src.value,5)), 
+  max(substr(src.value,5)),
+  min(substr(src.value,5)), 
+  std(substr(src.value,5)),
+  stddev_samp(substr(src.value,5)),
+  variance(substr(src.value,5)),
+  var_samp(substr(src.value,5)),
+  sum(DISTINCT substr(src.value, 5)),
+  count(DISTINCT substr(src.value, 5));
+  
+
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT 
+  sum(substr(src.value,5)), 
+  avg(substr(src.value,5)), 
+  avg(DISTINCT substr(src.value,5)), 
+  max(substr(src.value,5)), 
+  min(substr(src.value,5)), 
+  std(substr(src.value,5)),
+  stddev_samp(substr(src.value,5)),
+  variance(substr(src.value,5)),
+  var_samp(substr(src.value,5)),
+  sum(DISTINCT substr(src.value, 5)),
+  count(DISTINCT substr(src.value, 5));
+
+SELECT dest1.* FROM dest1;

Added: hive/trunk/ql/src/test/queries/clientpositive/count.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/count.q?rev=1028072&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/count.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/count.q Wed Oct 27 19:00:02 2010
@@ -0,0 +1,17 @@
+create table abcd (a int, b int, c int, d int);
+LOAD DATA LOCAL INPATH '../data/files/in4.txt' INTO TABLE abcd;
+
+select * from abcd;
+set hive.map.aggr=true;
+explain select a, count(distinct b), count(distinct c), sum(d) from abcd group by a;
+select a, count(distinct b), count(distinct c), sum(d) from abcd group by a;
+
+explain select count(1), count(*), count(a), count(b), count(c), count(d), count(distinct a), count(distinct b), count(distinct c), count(distinct d), count(distinct a,b), count(distinct b,c), count(distinct c,d), count(distinct a,d), count(distinct a,c), count(distinct b,d), count(distinct a,b,c), count(distinct b,c,d), count(distinct a,c,d), count(distinct a,b,d), count(distinct a,b,c,d) from abcd;
+select count(1), count(*), count(a), count(b), count(c), count(d), count(distinct a), count(distinct b), count(distinct c), count(distinct d), count(distinct a,b), count(distinct b,c), count(distinct c,d), count(distinct a,d), count(distinct a,c), count(distinct b,d), count(distinct a,b,c), count(distinct b,c,d), count(distinct a,c,d), count(distinct a,b,d), count(distinct a,b,c,d) from abcd;
+
+set hive.map.aggr=false;
+explain select a, count(distinct b), count(distinct c), sum(d) from abcd group by a;
+select a, count(distinct b), count(distinct c), sum(d) from abcd group by a;
+
+explain select count(1), count(*), count(a), count(b), count(c), count(d), count(distinct a), count(distinct b), count(distinct c), count(distinct d), count(distinct a,b), count(distinct b,c), count(distinct c,d), count(distinct a,d), count(distinct a,c), count(distinct b,d), count(distinct a,b,c), count(distinct b,c,d), count(distinct a,c,d), count(distinct a,b,d), count(distinct a,b,c,d) from abcd;
+select count(1), count(*), count(a), count(b), count(c), count(d), count(distinct a), count(distinct b), count(distinct c), count(distinct d), count(distinct a,b), count(distinct b,c), count(distinct c,d), count(distinct a,d), count(distinct a,c), count(distinct b,d), count(distinct a,b,c), count(distinct b,c,d), count(distinct a,c,d), count(distinct a,b,d), count(distinct a,b,c,d) from abcd;

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby2_map_multi_distinct.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby2_map_multi_distinct.q?rev=1028072&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby2_map_multi_distinct.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby2_map_multi_distinct.q Wed Oct 27 19:00:02 2010
@@ -0,0 +1,14 @@
+set hive.map.aggr=true;
+set hive.groupby.skewindata=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE dest1(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+SELECT dest1.* FROM dest1;

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby2_noskew_multi_distinct.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby2_noskew_multi_distinct.q?rev=1028072&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby2_noskew_multi_distinct.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby2_noskew_multi_distinct.q Wed Oct 27 19:00:02 2010
@@ -0,0 +1,14 @@
+set hive.map.aggr=false;
+set hive.groupby.skewindata=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE dest_g2(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+SELECT dest_g2.* FROM dest_g2;

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby3_map_multi_distinct.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby3_map_multi_distinct.q?rev=1028072&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby3_map_multi_distinct.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby3_map_multi_distinct.q Wed Oct 27 19:00:02 2010
@@ -0,0 +1,36 @@
+set hive.map.aggr=true;
+set hive.groupby.skewindata=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE dest1(c1 DOUBLE, c2 DOUBLE, c3 DOUBLE, c4 DOUBLE, c5 DOUBLE, c6 DOUBLE, c7 DOUBLE, c8 DOUBLE, c9 DOUBLE, c10 DOUBLE, c11 DOUBLE) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT
+  sum(substr(src.value,5)),
+  avg(substr(src.value,5)),
+  avg(DISTINCT substr(src.value,5)),
+  max(substr(src.value,5)),
+  min(substr(src.value,5)),
+  std(substr(src.value,5)),
+  stddev_samp(substr(src.value,5)),
+  variance(substr(src.value,5)),
+  var_samp(substr(src.value,5)),
+  sum(DISTINCT substr(src.value, 5)),
+  count(DISTINCT substr(src.value, 5));
+
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT
+  sum(substr(src.value,5)),
+  avg(substr(src.value,5)),
+  avg(DISTINCT substr(src.value,5)),
+  max(substr(src.value,5)),
+  min(substr(src.value,5)),
+  std(substr(src.value,5)),
+  stddev_samp(substr(src.value,5)),
+  variance(substr(src.value,5)),
+  var_samp(substr(src.value,5)),
+  sum(DISTINCT substr(src.value, 5)),
+  count(DISTINCT substr(src.value, 5));
+
+SELECT dest1.* FROM dest1;

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby3_noskew_multi_distinct.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby3_noskew_multi_distinct.q?rev=1028072&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby3_noskew_multi_distinct.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby3_noskew_multi_distinct.q Wed Oct 27 19:00:02 2010
@@ -0,0 +1,38 @@
+set hive.map.aggr=false;
+
+set hive.groupby.skewindata=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE dest1(c1 DOUBLE, c2 DOUBLE, c3 DOUBLE, c4 DOUBLE, c5 DOUBLE, c6 DOUBLE, c7 DOUBLE, c8 DOUBLE, c9 DOUBLE, c10 DOUBLE, c11 DOUBLE) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT
+  sum(substr(src.value,5)),
+  avg(substr(src.value,5)),
+  avg(DISTINCT substr(src.value,5)),
+  max(substr(src.value,5)),
+  min(substr(src.value,5)),
+  std(substr(src.value,5)),
+  stddev_samp(substr(src.value,5)),
+  variance(substr(src.value,5)),
+  var_samp(substr(src.value,5)),
+  sum(DISTINCT substr(src.value, 5)),
+  count(DISTINCT substr(src.value, 5));
+
+FROM src
+INSERT OVERWRITE TABLE dest1 SELECT
+  sum(substr(src.value,5)),
+  avg(substr(src.value,5)),
+  avg(DISTINCT substr(src.value,5)),
+  max(substr(src.value,5)),
+  min(substr(src.value,5)),
+  std(substr(src.value,5)),
+  stddev_samp(substr(src.value,5)),
+  variance(substr(src.value,5)),
+  var_samp(substr(src.value,5)),
+  sum(DISTINCT substr(src.value, 5)),
+  count(DISTINCT substr(src.value, 5));
+
+SELECT dest1.* FROM dest1;
+


