hive-commits mailing list archives

From hashut...@apache.org
Subject svn commit: r1623929 [1/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/ ql/src/java/org/apache/hadoop/hive/ql/plan/ ql/src/te...
Date Wed, 10 Sep 2014 07:41:19 GMT
Author: hashutosh
Date: Wed Sep 10 07:41:19 2014
New Revision: 1623929

URL: http://svn.apache.org/r1623929
Log:
HIVE-7405 : Vectorize GROUP BY on the Reduce-Side (Part 1 – Basic) (Matt McCline via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
    hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q
    hive/trunk/ql/src/test/results/clientpositive/dynpart_sort_opt_vectorization.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/vector_left_outer_join.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/vectorized_nested_mapjoin.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1623929&r1=1623928&r2=1623929&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Sep 10 07:41:19 2014
@@ -1710,6 +1710,9 @@ public class HiveConf extends Configurat
     HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false,
         "This flag should be set to true to enable vectorized mode of query execution.\n" +
         "The default value is false."),
+    HIVE_VECTORIZATION_REDUCE_ENABLED("hive.vectorized.execution.reduce.enabled", true,
+            "This flag should be set to true to enable vectorized mode of the reduce-side of query execution.\n" +
+            "The default value is true."),
     HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL("hive.vectorized.groupby.checkinterval", 100000,
         "Number of entries added to the group by aggregation hash before a recomputation of average entry size is performed."),
     HIVE_VECTORIZATION_GROUPBY_MAXENTRIES("hive.vectorized.groupby.maxentries", 1000000,

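The new flag sits alongside hive.vectorized.execution.enabled and gates only the reduce side. A minimal sketch of reading the setting added above (the class name here is hypothetical; it assumes the standard HiveConf.getBoolVar accessor):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ReduceVectorizationCheck {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Defaults to true per the ConfVars entry above; reduce-side
        // vectorization can be disabled independently of the map-side flag.
        boolean enabled =
            conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED);
        System.out.println("hive.vectorized.execution.reduce.enabled = " + enabled);
      }
    }
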
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java?rev=1623929&r1=1623928&r2=1623929&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java Wed Sep 10 07:41:19 2014
@@ -19,16 +19,20 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 
 class AggregateDefinition {
+
   private String name;
   private VectorExpressionDescriptor.ArgumentType type;
+  private GroupByDesc.Mode mode;
   private Class<? extends VectorAggregateExpression> aggClass;
 
   AggregateDefinition(String name, VectorExpressionDescriptor.ArgumentType type, 
-            Class<? extends VectorAggregateExpression> aggClass) {
+		  GroupByDesc.Mode mode, Class<? extends VectorAggregateExpression> aggClass) {
     this.name = name;
     this.type = type;
+    this.mode = mode;
     this.aggClass = aggClass;
   }
 
@@ -38,6 +42,9 @@ class AggregateDefinition {
   VectorExpressionDescriptor.ArgumentType getType() {
     return type;
   }
+  GroupByDesc.Mode getMode() {
+	return mode;
+  }
   Class<? extends VectorAggregateExpression> getAggClass() {
     return aggClass;
   }

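The new GroupByDesc.Mode column lets one aggregate name map to different vectorized implementations per side. A hypothetical sketch (placed in the same package as the package-private AggregateDefinition, mirroring three rows of the aggregatesDefinition table later in this commit):

    package org.apache.hadoop.hive.ql.exec.vector;

    import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
    import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFMinLong;
    import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFSumLong;
    import org.apache.hadoop.hive.ql.plan.GroupByDesc;

    class ModeExamples {
      // In HASH mode, count tallies raw rows on the map side ...
      static final AggregateDefinition MAP_SIDE_COUNT = new AggregateDefinition(
          "count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY,
          GroupByDesc.Mode.HASH, VectorUDAFCount.class);
      // ... while in MERGEPARTIAL mode the reduce side sums the partial counts.
      static final AggregateDefinition REDUCE_SIDE_COUNT = new AggregateDefinition(
          "count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY,
          GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFSumLong.class);
      // A null mode means the same implementation serves both sides.
      static final AggregateDefinition EITHER_SIDE_MIN = new AggregateDefinition(
          "min", VectorExpressionDescriptor.ArgumentType.INT_FAMILY,
          null, VectorUDAFMinLong.class);
    }
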
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java?rev=1623929&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java Wed Sep 10 07:41:19 2014
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * Class to keep information on a set of typed vector columns.  Used by
+ * other classes to efficiently access the set of columns.
+ */
+public class VectorColumnSetInfo {
+
+  // For simpler access, we make these members protected instead of
+  // providing get methods.
+
+  /**
+   * indices of LONG primitive keys.
+   */
+  protected int[] longIndices;
+
+  /**
+   * indices of DOUBLE primitive keys.
+   */
+  protected int[] doubleIndices;
+
+  /**
+   * indices of string (byte[]) primitive keys.
+   */
+  protected int[] stringIndices;
+
+  /**
+   * indices of decimal primitive keys.
+   */
+  protected int[] decimalIndices;
+
+  /**
+   * Helper class for looking up a key value based on key index.
+   */
+  public class KeyLookupHelper {
+    public int longIndex;
+    public int doubleIndex;
+    public int stringIndex;
+    public int decimalIndex;
+
+    private static final int INDEX_UNUSED = -1;
+
+    private void resetIndices() {
+        this.longIndex = this.doubleIndex = this.stringIndex = this.decimalIndex = INDEX_UNUSED;
+    }
+    public void setLong(int index) {
+      resetIndices();
+      this.longIndex= index;
+    }
+
+    public void setDouble(int index) {
+      resetIndices();
+      this.doubleIndex = index;
+    }
+
+    public void setString(int index) {
+      resetIndices();
+      this.stringIndex = index;
+    }
+
+    public void setDecimal(int index) {
+      resetIndices();
+      this.decimalIndex = index;
+    }
+  }
+
+  /**
+   * Lookup vector to map from key index to primitive type index.
+   */
+  protected KeyLookupHelper[] indexLookup;
+
+  private int keyCount;
+  private int addIndex;
+
+  protected int longIndicesIndex;
+  protected int doubleIndicesIndex;
+  protected int stringIndicesIndex;
+  protected int decimalIndicesIndex;
+
+  protected VectorColumnSetInfo(int keyCount) {
+    this.keyCount = keyCount;
+    this.addIndex = 0;
+
+    // We'll over allocate and then shrink the array for each type
+    longIndices = new int[this.keyCount];
+    longIndicesIndex = 0;
+    doubleIndices = new int[this.keyCount];
+    doubleIndicesIndex  = 0;
+    stringIndices = new int[this.keyCount];
+    stringIndicesIndex = 0;
+    decimalIndices = new int[this.keyCount];
+    decimalIndicesIndex = 0;
+    indexLookup = new KeyLookupHelper[this.keyCount];
+  }
+
+  protected void addKey(String outputType) throws HiveException {
+    indexLookup[addIndex] = new KeyLookupHelper();
+    if (VectorizationContext.isIntFamily(outputType) ||
+        VectorizationContext.isDatetimeFamily(outputType)) {
+      longIndices[longIndicesIndex] = addIndex;
+      indexLookup[addIndex].setLong(longIndicesIndex);
+      ++longIndicesIndex;
+    } else if (VectorizationContext.isFloatFamily(outputType)) {
+      doubleIndices[doubleIndicesIndex] = addIndex;
+      indexLookup[addIndex].setDouble(doubleIndicesIndex);
+      ++doubleIndicesIndex;
+    } else if (VectorizationContext.isStringFamily(outputType)) {
+      stringIndices[stringIndicesIndex]= addIndex;
+      indexLookup[addIndex].setString(stringIndicesIndex);
+      ++stringIndicesIndex;
+    } else if (VectorizationContext.isDecimalFamily(outputType)) {
+        decimalIndices[decimalIndicesIndex]= addIndex;
+        indexLookup[addIndex].setDecimal(decimalIndicesIndex);
+        ++decimalIndicesIndex;
+    }
+    else {
+      throw new HiveException("Unsupported vector output type: " + outputType);
+    }
+    addIndex++;
+  }
+
+  protected void finishAdding() {
+    longIndices = Arrays.copyOf(longIndices, longIndicesIndex);
+    doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex);
+    stringIndices = Arrays.copyOf(stringIndices, stringIndicesIndex);
+    decimalIndices = Arrays.copyOf(decimalIndices, decimalIndicesIndex);
+  }
+}
\ No newline at end of file

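The intended usage pattern (followed by VectorGroupKeyHelper and VectorHashKeyWrapperBatch later in this commit) is: over-allocate in the constructor, classify each key with addKey(), then trim with finishAdding(). A sketch under those assumptions, with a hypothetical subclass name:

    package org.apache.hadoop.hive.ql.exec.vector;

    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class ExampleColumnSetInfo extends VectorColumnSetInfo {

      ExampleColumnSetInfo(String[] keyOutputTypes) throws HiveException {
        super(keyOutputTypes.length);      // over-allocates one slot per key in each type array
        for (String outputType : keyOutputTypes) {
          addKey(outputType);              // routes the key into longIndices, doubleIndices, ...
        }
        finishAdding();                    // shrinks each array to the keys actually of that type
      }
    }
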
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1623929&r1=1623928&r2=1623929&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Wed Sep 10 07:41:19 2014
@@ -32,8 +32,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
+import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -47,13 +50,17 @@ import org.apache.hadoop.hive.ql.plan.ap
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
 
 /**
  * Vectorized GROUP BY operator implementation. Consumes the vectorized input and
  * stores the aggregate operators' intermediate states. Emits row mode output.
  *
  */
-public class VectorGroupByOperator extends GroupByOperator {
+public class VectorGroupByOperator extends GroupByOperator implements VectorizationContextRegion {
 
   private static final Log LOG = LogFactory.getLog(
       VectorGroupByOperator.class.getName());
@@ -70,6 +77,17 @@ public class VectorGroupByOperator exten
    */
   private VectorExpression[] keyExpressions;
 
+  private boolean isVectorOutput;
+
+  // Create a new outgoing vectorization context because column name map will change.
+  private VectorizationContext vOutContext = null;
+
+  private String fileKey;
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
   private transient VectorExpressionWriter[] keyOutputWriters;
 
   /**
@@ -85,11 +103,18 @@ public class VectorGroupByOperator exten
 
   private transient Object[] forwardCache;
 
+  private transient VectorizedRowBatch outputBatch;
+  private transient VectorizedRowBatchCtx vrbCtx;
+
+  private transient VectorColumnAssign[] vectorColumnAssign;
+  
   /**
-   * Interface for processing mode: global, hash or streaming
+   * Interface for processing mode: global, hash, unsorted streaming, or group batch
    */
   private static interface IProcessingMode {
     public void initialize(Configuration hconf) throws HiveException;
+    public void startGroup() throws HiveException;
+    public void endGroup() throws HiveException;
     public void processBatch(VectorizedRowBatch batch) throws HiveException;
     public void close(boolean aborted) throws HiveException;
   }
@@ -98,6 +123,15 @@ public class VectorGroupByOperator exten
    * Base class for all processing modes
    */
   private abstract class ProcessingModeBase implements IProcessingMode {
+
+    // Overridden and used in sorted reduce group batch processing mode.
+    public void startGroup() throws HiveException {
+      // Do nothing.
+    }
+    public void endGroup() throws HiveException {
+      // Do nothing.
+    }
+
     /**
      * Evaluates the aggregators on the current batch.
      * The aggregationBatchInfo must have been prepared
@@ -170,7 +204,7 @@ public class VectorGroupByOperator exten
     @Override
     public void close(boolean aborted) throws HiveException {
       if (!aborted) {
-        flushSingleRow(null, aggregationBuffers);
+        writeSingleRow(null, aggregationBuffers);
       }
     }
   }
@@ -426,7 +460,7 @@ public class VectorGroupByOperator exten
       while(iter.hasNext()) {
         Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair = iter.next();
 
-        flushSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue());
+        writeSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue());
 
         if (!all) {
           iter.remove();
@@ -501,20 +535,21 @@ public class VectorGroupByOperator exten
         if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
           flush(true);
 
-          changeToStreamingMode();
+          changeToUnsortedStreamingMode();
         }
       }
     }
   }
 
   /**
-   * Streaming processing mode. Intermediate values are flushed each time key changes.
-   * In this mode we're relying on the MR shuffle and merge the intermediates in the reduce.
+   * Unsorted streaming processing mode. Each input VectorizedRowBatch may have
+   * a mix of different keys (hence unsorted).  Intermediate values are flushed
+   * each time key changes.
    */
-  private class ProcessingModeStreaming extends ProcessingModeBase {
+  private class ProcessingModeUnsortedStreaming extends ProcessingModeBase {
 
     /** 
-     * The aggreagation buffers used in streaming mode
+     * The aggregation buffers used in streaming mode
      */
     private VectorAggregationBufferRow currentStreamingAggregators;
 
@@ -557,7 +592,7 @@ public class VectorGroupByOperator exten
               // Nothing to do
             }
           });
-      LOG.info("using streaming aggregation processing mode");
+      LOG.info("using unsorted streaming aggregation processing mode");
     }
 
     @Override
@@ -601,7 +636,7 @@ public class VectorGroupByOperator exten
 
       // Now flush/forward all keys/rows, except the last (current) one
       for (int i = 0; i < flushMark; ++i) {
-        flushSingleRow(keysToFlush[i], rowsToFlush[i]);
+        writeSingleRow(keysToFlush[i], rowsToFlush[i]);
         rowsToFlush[i].reset();
         streamAggregationBufferRowPool.putInPool(rowsToFlush[i]);
       }
@@ -610,7 +645,79 @@ public class VectorGroupByOperator exten
     @Override
     public void close(boolean aborted) throws HiveException {
       if (!aborted && null != streamingKey) {
-        flushSingleRow(streamingKey, currentStreamingAggregators);
+        writeSingleRow(streamingKey, currentStreamingAggregators);
+      }
+    }
+  }
+
+  /**
+   * Sorted reduce group batch processing mode. Each input VectorizedRowBatch will have the
+   * same key.  On endGroup (or close), the intermediate values are flushed.
+   */
+  private class ProcessingModeGroupBatches extends ProcessingModeBase {
+
+    private boolean inGroup;
+    private boolean first;
+
+    /**
+     * The group vector key helper.
+     */
+    VectorGroupKeyHelper groupKeyHelper;
+
+    /** 
+     * The group vector aggregation buffers.
+     */
+    private VectorAggregationBufferRow groupAggregators;
+
+    /**
+     * Buffer to hold string values.
+     */
+    private DataOutputBuffer buffer;
+
+    @Override
+    public void initialize(Configuration hconf) throws HiveException {
+      inGroup = false;
+      groupKeyHelper = new VectorGroupKeyHelper(keyExpressions.length);
+      groupKeyHelper.init(keyExpressions);
+      groupAggregators = allocateAggregationBuffer();
+      buffer = new DataOutputBuffer();
+      LOG.info("using sorted group batch aggregation processing mode");
+    }
+
+    @Override
+    public void startGroup() throws HiveException {
+      inGroup = true;
+      first = true;
+    }
+
+    @Override
+    public void endGroup() throws HiveException {
+      if (inGroup && !first) {
+        writeGroupRow(groupAggregators, buffer);
+        groupAggregators.reset();
+      }
+      inGroup = false;
+    }
+
+    @Override
+    public void processBatch(VectorizedRowBatch batch) throws HiveException {
+      assert(inGroup);
+      if (first) {
+        // Copy the group key to output batch now.  We'll copy in the aggregates at the end of the group.
+        first = false;
+        groupKeyHelper.copyGroupKey(batch, outputBatch, buffer);
+      }
+
+      // Aggregate this batch.
+      for (int i = 0; i < aggregators.length; ++i) {
+        aggregators[i].aggregateInput(groupAggregators.getAggregationBuffer(i), batch);
+      }
+    }
+
+    @Override
+    public void close(boolean aborted) throws HiveException {
+      if (!aborted && inGroup && !first) {
+        writeGroupRow(groupAggregators, buffer);
       }
     }
   }
@@ -633,8 +740,20 @@ public class VectorGroupByOperator exten
     aggregators = new VectorAggregateExpression[aggrDesc.size()];
     for (int i = 0; i < aggrDesc.size(); ++i) {
       AggregationDesc aggDesc = aggrDesc.get(i);
-      aggregators[i] = vContext.getAggregatorExpression(aggDesc);
+      aggregators[i] = vContext.getAggregatorExpression(aggDesc, desc.getVectorDesc().isReduce());
     }
+    
+    isVectorOutput = desc.getVectorDesc().isVectorOutput();
+
+    List<String> outColNames = desc.getOutputColumnNames();
+    Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size());
+    int outColIndex = 0;
+    for(String outCol: outColNames) {
+      mapOutCols.put(outCol,  outColIndex++);
+    }
+    vOutContext = new VectorizationContext(mapOutCols, outColIndex);
+    vOutContext.setFileKey(vContext.getFileKey() + "/_GROUPBY_");
+    fileKey = vOutContext.getFileKey();
   }
 
   public VectorGroupByOperator() {
@@ -662,13 +781,23 @@ public class VectorGroupByOperator exten
         objectInspectors.add(aggregators[i].getOutputObjectInspector());
       }
 
-      keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
-      aggregationBatchInfo = new VectorAggregationBufferBatch();
-      aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
-
+      if (!conf.getVectorDesc().isVectorGroupBatches()) {
+        // These data structures are only used by the map-side processing modes.
+        keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
+        aggregationBatchInfo = new VectorAggregationBufferBatch();
+        aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
+      }
+      LOG.warn("VectorGroupByOperator is vector output " + isVectorOutput);
       List<String> outputFieldNames = conf.getOutputColumnNames();
       outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
           outputFieldNames, objectInspectors);
+      if (isVectorOutput) {
+          vrbCtx = new VectorizedRowBatchCtx();
+          vrbCtx.init(hconf, fileKey, (StructObjectInspector) outputObjInspector);
+          outputBatch = vrbCtx.createVectorizedRowBatch();
+          vectorColumnAssign = VectorColumnAssignFactory.buildAssigners(
+              outputBatch, outputObjInspector, vOutContext.getColumnMap(), conf.getOutputColumnNames());
+      }
 
     } catch (HiveException he) {
       throw he;
@@ -678,32 +807,43 @@ public class VectorGroupByOperator exten
 
     initializeChildren(hconf);
 
-    forwardCache =new Object[keyExpressions.length + aggregators.length];
+    forwardCache = new Object[keyExpressions.length + aggregators.length];
 
     if (keyExpressions.length == 0) {
-      processingMode = this.new ProcessingModeGlobalAggregate();
-    }
-    else {
-      //TODO: consider if parent can offer order guarantees
-      // If input is sorted, is more efficient to use the streaming mode
+        processingMode = this.new ProcessingModeGlobalAggregate();
+    } else if (conf.getVectorDesc().isVectorGroupBatches()) {
+      // Sorted GroupBy of vector batches where an individual batch has the same group key (e.g. reduce).
+      processingMode = this.new ProcessingModeGroupBatches();
+    } else {
+      // We start in hash mode and may dynamically switch to unsorted stream mode.
       processingMode = this.new ProcessingModeHashAggregate();
     }
     processingMode.initialize(hconf);
   }
 
   /**
-   * changes the processing mode to streaming
+   * changes the processing mode to unsorted streaming
    * This is done at the request of the hash agg mode, if the number of keys 
    * exceeds the minReductionHashAggr factor
    * @throws HiveException 
    */
-  private void changeToStreamingMode() throws HiveException {
-    processingMode = this.new ProcessingModeStreaming();
+  private void changeToUnsortedStreamingMode() throws HiveException {
+    processingMode = this.new ProcessingModeUnsortedStreaming();
     processingMode.initialize(null);
     LOG.trace("switched to streaming mode");
   }
 
   @Override
+  public void startGroup() throws HiveException {
+    processingMode.startGroup();
+  }
+
+  @Override
+  public void endGroup() throws HiveException {
+    processingMode.endGroup();
+  }
+
+  @Override
   public void processOp(Object row, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) row;
 
@@ -719,26 +859,72 @@ public class VectorGroupByOperator exten
    * @param agg
    * @throws HiveException
    */
-  private void flushSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg)
+  private void writeSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg)
       throws HiveException {
     int fi = 0;
-    for (int i = 0; i < keyExpressions.length; ++i) {
-      forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
-          kw, i, keyOutputWriters[i]);
+    if (!isVectorOutput) {
+      // Output row.
+      for (int i = 0; i < keyExpressions.length; ++i) {
+        forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
+            kw, i, keyOutputWriters[i]);
+      }
+      for (int i = 0; i < aggregators.length; ++i) {
+        forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i));
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("forwarding keys: %s: %s",
+            kw, Arrays.toString(forwardCache)));
+      }
+      forward(forwardCache, outputObjInspector);
+    } else {
+      // Output keys and aggregates into the output batch.
+      for (int i = 0; i < keyExpressions.length; ++i) {
+        vectorColumnAssign[fi++].assignObjectValue(keyWrappersBatch.getWritableKeyValue (
+                  kw, i, keyOutputWriters[i]), outputBatch.size);
+      }
+      for (int i = 0; i < aggregators.length; ++i) {
+        vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput(
+                  agg.getAggregationBuffer(i)), outputBatch.size);
+      }
+      ++outputBatch.size;
+      if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+        flushOutput();
+      }
     }
+  }
+
+  /**
+   * Emits a (reduce) group row, made from the key (copied in at the beginning of the group) and
+   * the row aggregation buffers values
+   * @param agg
+   * @param buffer
+   * @throws HiveException
+   */
+  private void writeGroupRow(VectorAggregationBufferRow agg, DataOutputBuffer buffer)
+      throws HiveException {
+    int fi = keyExpressions.length;   // Start after group keys.
     for (int i = 0; i < aggregators.length; ++i) {
-      forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i));
+      vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput(
+                agg.getAggregationBuffer(i)), outputBatch.size);
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("forwarding keys: %s: %s",
-          kw, Arrays.toString(forwardCache)));
+    ++outputBatch.size;
+    if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+      flushOutput();
+      buffer.reset();
     }
-    forward(forwardCache, outputObjInspector);
+  }
+
+  private void flushOutput() throws HiveException {
+    forward(outputBatch, null);
+    outputBatch.reset();
   }
 
   @Override
   public void closeOp(boolean aborted) throws HiveException {
     processingMode.close(aborted);
+    if (!aborted && isVectorOutput && outputBatch.size > 0) {
+      flushOutput();
+    }
   }
 
   static public String getOperatorName() {
@@ -761,4 +947,8 @@ public class VectorGroupByOperator exten
     this.aggregators = aggregators;
   }
 
+  @Override
+  public VectorizationContext getOuputVectorizationContext() {
+    return vOutContext;
+  }
 }

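In the new ProcessingModeGroupBatches mode, every input batch of a reduce group carries the same key, so the caller brackets each run of batches with startGroup()/endGroup(). A sketch of the assumed driver behavior (not part of this commit; the class and method names are hypothetical):

    package org.apache.hadoop.hive.ql.exec.vector;

    import java.util.List;

    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class GroupBatchDriverSketch {

      static void feedOneGroup(VectorGroupByOperator op,
          List<VectorizedRowBatch> batchesForOneKey) throws HiveException {
        op.startGroup();              // sets first=true; the next batch copies the group key
        for (VectorizedRowBatch batch : batchesForOneKey) {
          op.processOp(batch, 0);     // aggregateInput() into groupAggregators
        }
        op.endGroup();                // writeGroupRow(): key + aggregates land in outputBatch
      }
    }
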
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java?rev=1623929&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java Wed Sep 10 07:41:19 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+/**
+ * Class for copying the group key from an input batch to an output batch.
+ */
+public class VectorGroupKeyHelper extends VectorColumnSetInfo {
+
+  public VectorGroupKeyHelper(int keyCount) {
+    super(keyCount);
+   }
+
+  void init(VectorExpression[] keyExpressions) throws HiveException {
+    // Inspect the output type of each key expression.
+    for(int i=0; i < keyExpressions.length; ++i) {
+      addKey(keyExpressions[i].getOutputType());
+    }
+    finishAdding();
+  }
+
+  public void copyGroupKey(VectorizedRowBatch inputBatch, VectorizedRowBatch outputBatch,
+          DataOutputBuffer buffer) throws HiveException {
+    // Grab the key at index 0.  We don't care about selected or repeating since all keys in the input batch are the same.
+    for(int i = 0; i< longIndices.length; ++i) {
+      int keyIndex = longIndices[i];
+      LongColumnVector inputColumnVector = (LongColumnVector) inputBatch.cols[keyIndex];
+      LongColumnVector outputColumnVector = (LongColumnVector) outputBatch.cols[keyIndex];
+      if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
+        outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0];
+      } else if (inputColumnVector.noNulls ){
+        outputColumnVector.noNulls = false;
+        outputColumnVector.isNull[outputBatch.size] = true;
+      } else {
+        outputColumnVector.isNull[outputBatch.size] = true;
+      }
+    }
+    for(int i=0;i<doubleIndices.length; ++i) {
+      int keyIndex = doubleIndices[i];
+      DoubleColumnVector inputColumnVector = (DoubleColumnVector) inputBatch.cols[keyIndex];
+      DoubleColumnVector outputColumnVector = (DoubleColumnVector) outputBatch.cols[keyIndex];
+      if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
+        outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0];
+      } else if (inputColumnVector.noNulls ){
+        outputColumnVector.noNulls = false;
+        outputColumnVector.isNull[outputBatch.size] = true;
+      } else {
+        outputColumnVector.isNull[outputBatch.size] = true;
+      }
+    }
+    for(int i=0;i<stringIndices.length; ++i) {
+      int keyIndex = stringIndices[i];
+      BytesColumnVector inputColumnVector = (BytesColumnVector) inputBatch.cols[keyIndex];
+      BytesColumnVector outputColumnVector = (BytesColumnVector) outputBatch.cols[keyIndex];
+      if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
+        // Copy bytes into scratch buffer.
+        int start = buffer.getLength();
+        int length = inputColumnVector.length[0];
+        try {
+          buffer.write(inputColumnVector.vector[0], inputColumnVector.start[0], length);
+        } catch (IOException ioe) {
+          throw new IllegalStateException("bad write", ioe);
+        }
+        outputColumnVector.setRef(outputBatch.size, buffer.getData(), start, length);
+      } else if (inputColumnVector.noNulls ){
+        outputColumnVector.noNulls = false;
+        outputColumnVector.isNull[outputBatch.size] = true;
+      } else {
+        outputColumnVector.isNull[outputBatch.size] = true;
+      }
+    }
+    for(int i=0;i<decimalIndices.length; ++i) {
+      int keyIndex = decimalIndices[i];
+      DecimalColumnVector inputColumnVector = (DecimalColumnVector) inputBatch.cols[keyIndex];
+      DecimalColumnVector outputColumnVector = (DecimalColumnVector) outputBatch.cols[keyIndex];
+      if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
+        outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0];
+      } else if (inputColumnVector.noNulls ){
+        outputColumnVector.noNulls = false;
+        outputColumnVector.isNull[outputBatch.size] = true;
+      } else {
+        outputColumnVector.isNull[outputBatch.size] = true;
+      }
+    }
+  }
+}

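A sketch of the assumed call sequence (mirroring ProcessingModeGroupBatches above; the wrapper class is hypothetical and sits in the same package since init() is package-private). The DataOutputBuffer is the scratch store that string keys are copied into:

    package org.apache.hadoop.hive.ql.exec.vector;

    import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class GroupKeyCopySketch {

      static void copyFirstKey(VectorExpression[] keyExpressions,
          VectorizedRowBatch inputBatch, VectorizedRowBatch outputBatch)
          throws HiveException {
        VectorGroupKeyHelper helper = new VectorGroupKeyHelper(keyExpressions.length);
        helper.init(keyExpressions);                     // classify keys by output type
        DataOutputBuffer buffer = new DataOutputBuffer(); // backing store for copied string keys
        // All rows in a reduce group batch share the key, so row 0 is copied:
        helper.copyGroupKey(inputBatch, outputBatch, buffer);
      }
    }
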
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java?rev=1623929&r1=1623928&r2=1623929&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java Wed Sep 10 07:41:19 2014
@@ -60,6 +60,7 @@ public class VectorHashKeyWrapper extend
     byteStarts = new int[byteValuesCount];
     byteLengths = new int[byteValuesCount];
     isNull = new boolean[longValuesCount + doubleValuesCount + byteValuesCount + decimalValuesCount];
+    hashcode = 0;
   }
 
   private VectorHashKeyWrapper() {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java?rev=1623929&r1=1623928&r2=1623929&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java Wed Sep 10 07:41:19 2014
@@ -32,41 +32,10 @@ import org.apache.hadoop.hive.serde2.laz
  * This class stores additional information about keys needed to evaluate and output the key values.
  *
  */
-public class VectorHashKeyWrapperBatch {
+public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
 
-  /**
-   * Helper class for looking up a key value based on key index.
-   */
-  private static class KeyLookupHelper {
-    private int longIndex;
-    private int doubleIndex;
-    private int stringIndex;
-    private int decimalIndex;
-
-    private static final int INDEX_UNUSED = -1;
-
-    private void resetIndices() {
-        this.longIndex = this.doubleIndex = this.stringIndex = this.decimalIndex = INDEX_UNUSED;
-    }
-    public void setLong(int index) {
-        resetIndices();
-        this.longIndex= index;
-    }
-
-    public void setDouble(int index) {
-        resetIndices();
-        this.doubleIndex = index;
-    }
-
-    public void setString(int index) {
-        resetIndices();
-        this.stringIndex = index;
-    }
-
-    public void setDecimal(int index) {
-        resetIndices();
-        this.decimalIndex = index;
-    }
+  public VectorHashKeyWrapperBatch(int keyCount) {
+    super(keyCount);
   }
 
   /**
@@ -80,26 +49,6 @@ public class VectorHashKeyWrapperBatch {
   private VectorExpression[] keyExpressions;
 
   /**
-   * indices of LONG primitive keys.
-   */
-  private int[] longIndices;
-
-  /**
-   * indices of DOUBLE primitive keys.
-   */
-  private int[] doubleIndices;
-
-  /**
-   * indices of string (byte[]) primitive keys.
-   */
-  private int[] stringIndices;
-
-  /**
-   * indices of decimal primitive keys.
-   */
-  private int[] decimalIndices;
-
-  /**
    * Pre-allocated batch size vector of keys wrappers.
    * N.B. these keys are **mutable** and should never be used in a HashMap.
    * Always clone the key wrapper to obtain an immutable keywrapper suitable
@@ -108,11 +57,6 @@ public class VectorHashKeyWrapperBatch {
   private VectorHashKeyWrapper[] vectorHashKeyWrappers;
 
   /**
-   * Lookup vector to map from key index to primitive type index.
-   */
-  private KeyLookupHelper[] indexLookup;
-
-  /**
    * The fixed size of the key wrappers.
    */
   private int keysFixedSize;
@@ -567,53 +511,17 @@ public class VectorHashKeyWrapperBatch {
    */
   public static VectorHashKeyWrapperBatch compileKeyWrapperBatch(VectorExpression[] keyExpressions)
     throws HiveException {
-    VectorHashKeyWrapperBatch compiledKeyWrapperBatch = new VectorHashKeyWrapperBatch();
+    VectorHashKeyWrapperBatch compiledKeyWrapperBatch = new VectorHashKeyWrapperBatch(keyExpressions.length);
     compiledKeyWrapperBatch.keyExpressions = keyExpressions;
 
     compiledKeyWrapperBatch.keysFixedSize = 0;
 
-    // We'll overallocate and then shrink the array for each type
-    int[] longIndices = new int[keyExpressions.length];
-    int longIndicesIndex = 0;
-    int[] doubleIndices = new int[keyExpressions.length];
-    int doubleIndicesIndex  = 0;
-    int[] stringIndices = new int[keyExpressions.length];
-    int stringIndicesIndex = 0;
-    int[] decimalIndices = new int[keyExpressions.length];
-    int decimalIndicesIndex = 0;
-    KeyLookupHelper[] indexLookup = new KeyLookupHelper[keyExpressions.length];
-
     // Inspect the output type of each key expression.
     for(int i=0; i < keyExpressions.length; ++i) {
-      indexLookup[i] = new KeyLookupHelper();
-      String outputType = keyExpressions[i].getOutputType();
-      if (VectorizationContext.isIntFamily(outputType) ||
-          VectorizationContext.isDatetimeFamily(outputType)) {
-        longIndices[longIndicesIndex] = i;
-        indexLookup[i].setLong(longIndicesIndex);
-        ++longIndicesIndex;
-      } else if (VectorizationContext.isFloatFamily(outputType)) {
-        doubleIndices[doubleIndicesIndex] = i;
-        indexLookup[i].setDouble(doubleIndicesIndex);
-        ++doubleIndicesIndex;
-      } else if (VectorizationContext.isStringFamily(outputType)) {
-        stringIndices[stringIndicesIndex]= i;
-        indexLookup[i].setString(stringIndicesIndex);
-        ++stringIndicesIndex;
-      } else if (VectorizationContext.isDecimalFamily(outputType)) {
-          decimalIndices[decimalIndicesIndex]= i;
-          indexLookup[i].setDecimal(decimalIndicesIndex);
-          ++decimalIndicesIndex;
-      }
-      else {
-        throw new HiveException("Unsuported vector output type: " + outputType);
-      }
-    }
-    compiledKeyWrapperBatch.indexLookup = indexLookup;
-    compiledKeyWrapperBatch.longIndices = Arrays.copyOf(longIndices, longIndicesIndex);
-    compiledKeyWrapperBatch.doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex);
-    compiledKeyWrapperBatch.stringIndices = Arrays.copyOf(stringIndices, stringIndicesIndex);
-    compiledKeyWrapperBatch.decimalIndices = Arrays.copyOf(decimalIndices, decimalIndicesIndex);
+      compiledKeyWrapperBatch.addKey(keyExpressions[i].getOutputType());
+    }
+    compiledKeyWrapperBatch.finishAdding();
+
     compiledKeyWrapperBatch.vectorHashKeyWrappers =
         new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE];
     for(int i=0;i<VectorizedRowBatch.DEFAULT_SIZE; ++i) {
@@ -632,11 +540,11 @@ public class VectorHashKeyWrapperBatch {
         model.memoryAlign());
 
     // Now add the key wrapper arrays
-    compiledKeyWrapperBatch.keysFixedSize += model.lengthForLongArrayOfSize(longIndicesIndex);
-    compiledKeyWrapperBatch.keysFixedSize += model.lengthForDoubleArrayOfSize(doubleIndicesIndex);
-    compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(stringIndicesIndex);
-    compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(decimalIndicesIndex);
-    compiledKeyWrapperBatch.keysFixedSize += model.lengthForIntArrayOfSize(longIndicesIndex) * 2;
+    compiledKeyWrapperBatch.keysFixedSize += model.lengthForLongArrayOfSize(compiledKeyWrapperBatch.longIndices.length);
+    compiledKeyWrapperBatch.keysFixedSize += model.lengthForDoubleArrayOfSize(compiledKeyWrapperBatch.doubleIndices.length);
+    compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(compiledKeyWrapperBatch.stringIndices.length);
+    compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(compiledKeyWrapperBatch.decimalIndices.length);
+    compiledKeyWrapperBatch.keysFixedSize += model.lengthForIntArrayOfSize(compiledKeyWrapperBatch.longIndices.length) * 2;
     compiledKeyWrapperBatch.keysFixedSize +=
         model.lengthForBooleanArrayOfSize(keyExpressions.length);
 

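The public entry point is unchanged; only the per-type bookkeeping moved into the shared VectorColumnSetInfo base, so keysFixedSize now reads the trimmed arrays' lengths, which equal the old *IndicesIndex counters. The call site stays as before:

    // Unchanged entry point (see VectorGroupByOperator.initializeOp above); the
    // index arrays are now built by VectorColumnSetInfo.addKey()/finishAdding(),
    // so e.g. longIndices.length plays the role of the old longIndicesIndex in
    // the keysFixedSize accounting.
    VectorHashKeyWrapperBatch keyWrappersBatch =
        VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
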
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1623929&r1=1623928&r2=1623929&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Wed Sep 10 07:41:19 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.UD
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.InputExpressionType;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Mode;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.*;
+import org.apache.hadoop.hive.ql.exec.vector.AggregateDefinition;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
@@ -83,6 +84,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.udf.SettableUDF;
 import org.apache.hadoop.hive.ql.udf.UDFConv;
 import org.apache.hadoop.hive.ql.udf.UDFHex;
@@ -198,7 +200,8 @@ public class VectorizationContext {
 
   protected int getInputColumnIndex(String name) {
     if (!columnMap.containsKey(name)) {
-      LOG.error(String.format("The column %s is not in the vectorization context column map.", name));
+      LOG.error(String.format("The column %s is not in the vectorization context column map %s.", 
+                 name, columnMap.toString()));
     }
     return columnMap.get(name);
   }
@@ -1880,50 +1883,55 @@ public class VectorizationContext {
     }
   }
 
+  // TODO: When we support vectorized STRUCTs and can handle more in the reduce-side (MERGEPARTIAL):
+  // TODO:   Write reduce-side versions of AVG. Currently, only map-side (HASH) versions are in table. 
+  // TODO:   And, investigate if different reduce-side versions are needed for var* and std*, or if the map-side aggregate can be used.  Right now they are conservatively
+  //         marked map-side (HASH).
   static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFMinLong.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFMinDouble.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFMinString.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFMinDecimal.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFMaxLong.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFMaxDouble.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFMaxString.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFMaxDecimal.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.NONE,          VectorUDAFCountStar.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFCount.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFSumLong.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFSumDouble.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFSumDecimal.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFAvgLong.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFAvgDouble.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFAvgDecimal.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFVarPopLong.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFVarPopLong.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFVarPopDouble.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFVarPopDouble.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFVarPopDecimal.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFVarPopDecimal.class));
-    add(new AggregateDefinition("var_samp",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFVarSampLong.class));
-    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFVarSampDouble.class));
-    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFVarSampDecimal.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFStdSampLong.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFStdSampDouble.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFStdSampDecimal.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFMinLong.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFMinDouble.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null,                          VectorUDAFMinString.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFMinDecimal.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFMaxLong.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFMaxDouble.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null,                          VectorUDAFMaxString.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFMaxDecimal.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.NONE,          GroupByDesc.Mode.HASH,         VectorUDAFCountStar.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFSumLong.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFSumLong.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFSumDouble.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFSumDecimal.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFAvgLong.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFAvgDouble.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFAvgDecimal.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
+    add(new AggregateDefinition("var_samp",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarSampLong.class));
+    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarSampDouble.class));
+    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarSampDecimal.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdSampLong.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdSampDouble.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdSampDecimal.class));
   }};
 
-  public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc)
+  public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce)
       throws HiveException {
 
     ArrayList<ExprNodeDesc> paramDescList = desc.getParameters();
@@ -1948,8 +1956,15 @@ public class VectorizationContext {
     for (AggregateDefinition aggDef : aggregatesDefinition) {
       if (aggregateName.equalsIgnoreCase(aggDef.getName()) &&
           ((aggDef.getType() == VectorExpressionDescriptor.ArgumentType.NONE &&
-            inputType == VectorExpressionDescriptor.ArgumentType.NONE) ||
+           inputType == VectorExpressionDescriptor.ArgumentType.NONE) ||
           (aggDef.getType().isSameTypeOrFamily(inputType)))) {
+
+        if (aggDef.getMode() == GroupByDesc.Mode.HASH && isReduce) {
+          continue;
+        } else if (aggDef.getMode() == GroupByDesc.Mode.MERGEPARTIAL && !isReduce) {
+          continue;
+        }
+
         Class<? extends VectorAggregateExpression> aggClass = aggDef.getAggClass();
         try
         {
@@ -1967,7 +1982,7 @@ public class VectorizationContext {
     }
 
     throw new HiveException("Vector aggregate not implemented: \"" + aggregateName +
-        "\" for type: \"" + inputType.name() + "");
+        "\" for type: \"" + inputType.name() + "\" (reduce-side = " + isReduce + ")");
   }
 
   public Map<Integer, String> getOutputColumnTypeMap() {
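
For orientation: the Mode column in the aggregatesDefinition table now doubles as a side selector. HASH entries serve the map side, while MERGEPARTIAL entries (added elsewhere in this patch) serve the reduce side, so getAggregatorExpression skips definitions that do not match its isReduce argument. A minimal standalone sketch of that rule, with the helper name invented here; GroupByDesc.Mode and its constants are real:

    // Hypothetical helper equivalent to the skip logic in getAggregatorExpression.
    static boolean usableOnThisSide(GroupByDesc.Mode defMode, boolean isReduce) {
      if (defMode == GroupByDesc.Mode.HASH && isReduce) {
        return false;  // map-side-only definition, but caller is on the reduce side
      }
      if (defMode == GroupByDesc.Mode.MERGEPARTIAL && !isReduce) {
        return false;  // reduce-side-only definition, but caller is on the map side
      }
      return true;
    }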

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1623929&r1=1623928&r2=1623929&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Wed Sep 10 07:41:19 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.IOPrepareCache;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -124,15 +125,20 @@ public class VectorizedRowBatchCtx {
    * Used by non-tablescan operators when they change the vectorization context 
    * @param hiveConf
    * @param fileKey 
-   *          The key on which to retrieve the extra column mapping from the map scratch
+   *          The key on which to retrieve the extra column mapping from the map/reduce scratch
    * @param rowOI
    *          Object inspector that shapes the column types
    */
   public void init(Configuration hiveConf, String fileKey,
       StructObjectInspector rowOI) {
-    columnTypeMap = Utilities
-        .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes()
-        .get(fileKey);
+    MapredWork mapredWork = Utilities.getMapRedWork(hiveConf);
+    Map<String, Map<Integer, String>> scratchColumnVectorTypes;
+    if (mapredWork.getMapWork() != null) {
+      scratchColumnVectorTypes = mapredWork.getMapWork().getScratchColumnVectorTypes();
+    } else {
+      scratchColumnVectorTypes = mapredWork.getReduceWork().getScratchColumnVectorTypes();
+    }
+    columnTypeMap = scratchColumnVectorTypes.get(fileKey);
     this.rowOI= rowOI;
     this.rawRowOI = rowOI;
   }
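
A hedged sketch of what the fallback above enables: a reduce-side operator, whose plan carries no MapWork, can still initialize its batch context. Only init()'s signature and the class name come from the diff; the surrounding variables are stand-ins:

    // Hypothetical reduce-side caller; hiveConf, fileKey and reduceRowOI are assumed to exist.
    VectorizedRowBatchCtx batchCtx = new VectorizedRowBatchCtx();
    // With getMapWork() == null, init() now reads scratch column types from the ReduceWork.
    batchCtx.init(hiveConf, fileKey, reduceRowOI);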

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1623929&r1=1623928&r2=1623929&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Wed Sep 10 07:41:19 2014
@@ -38,9 +38,11 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.plan.Ba
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -71,6 +74,7 @@ import org.apache.hadoop.hive.ql.plan.SM
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.UDFAcos;
 import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -290,23 +294,26 @@ public class Vectorizer implements Physi
         throws SemanticException {
       Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
       if (currTask instanceof MapRedTask) {
-        convertMapWork(((MapRedTask) currTask).getWork().getMapWork());
+        convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false);
       } else if (currTask instanceof TezTask) {
         TezWork work = ((TezTask) currTask).getWork();
         for (BaseWork w: work.getAllWork()) {
           if (w instanceof MapWork) {
-            convertMapWork((MapWork)w);
+            convertMapWork((MapWork) w, true);
           } else if (w instanceof ReduceWork) {
             // We are only vectorizing Reduce under Tez.
-            convertReduceWork((ReduceWork)w);
+            if (HiveConf.getBoolVar(pctx.getConf(),
+                        HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
+              convertReduceWork((ReduceWork) w);
+            }
           }
         }
       }
       return null;
     }
 
-    private void convertMapWork(MapWork mapWork) throws SemanticException {
-      boolean ret = validateMapWork(mapWork);
+    private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+      boolean ret = validateMapWork(mapWork, isTez);
       if (ret) {
         vectorizeMapWork(mapWork);
       }
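
Because the dispatch above consults HIVE_VECTORIZATION_REDUCE_ENABLED before converting ReduceWork, reduce-side vectorization can be switched off independently of the map side. A minimal sketch, assuming a HiveConf instance is at hand:

    HiveConf conf = new HiveConf();
    // Keep map-side vectorization on, but opt out of the new reduce-side path.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED, false);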
@@ -319,7 +326,8 @@ public class Vectorizer implements Physi
           + ReduceSinkOperator.getOperatorName()), np);
     }
 
-    private boolean validateMapWork(MapWork mapWork) throws SemanticException {
+    private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+      LOG.info("Validating MapWork...");
 
       // Validate the input format
       for (String path : mapWork.getPathToPartitionInfo().keySet()) {
@@ -333,7 +341,7 @@ public class Vectorizer implements Physi
         }
       }
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor();
+      MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(isTez);
       addMapWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -417,9 +425,12 @@ public class Vectorizer implements Physi
     private void addReduceWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
       opRules.put(new RuleRegExp("R1", ExtractOperator.getOperatorName() + ".*"), np);
       opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + ".*"), np);
+      opRules.put(new RuleRegExp("R3", SelectOperator.getOperatorName() + ".*"), np);
     }
 
     private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
+      LOG.info("Validating ReduceWork...");
+
       // Validate input to ReduceWork.
       if (!getOnlyStructObjectInspectors(reduceWork)) {
         return false;
@@ -487,16 +498,21 @@ public class Vectorizer implements Physi
 
   class MapWorkValidationNodeProcessor implements NodeProcessor {
 
+    private boolean isTez;
+
+    public MapWorkValidationNodeProcessor(boolean isTez) {
+      this.isTez = isTez;
+    }
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
       for (Node n : stack) {
         Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
-        if ((op.getType().equals(OperatorType.REDUCESINK) || op.getType().equals(OperatorType.FILESINK)) &&
-            op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
+        if (nonVectorizableChildOfGroupBy(op)) {
           return new Boolean(true);
         }
-        boolean ret = validateMapWorkOperator(op);
+        boolean ret = validateMapWorkOperator(op, isTez);
         if (!ret) {
           LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
           return new Boolean(false);
@@ -513,6 +529,9 @@ public class Vectorizer implements Physi
         Object... nodeOutputs) throws SemanticException {
       for (Node n : stack) {
         Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
+        if (nonVectorizableChildOfGroupBy(op)) {
+          return new Boolean(true);
+        }
         boolean ret = validateReduceWorkOperator(op);
         if (!ret) {
           LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized.");
@@ -579,21 +598,6 @@ public class Vectorizer implements Physi
       return vContext;
     }
 
-    public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) {
-      Operator<? extends OperatorDesc> currentOp = op;
-      while (currentOp.getParentOperators().size() > 0) {
-        currentOp = currentOp.getParentOperators().get(0);
-        if (currentOp.getType().equals(OperatorType.GROUPBY)) {
-          // No need to vectorize
-          if (!opsDone.contains(op)) {
-            opsDone.add(op);
-          }
-          return true;
-        }
-      }
-      return false;
-    }
-
     public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, VectorizationContext vContext)
             throws SemanticException {
       Operator<? extends OperatorDesc> vectorOp = op;
@@ -665,9 +669,13 @@ public class Vectorizer implements Physi
 
       assert vContext != null;
 
-      // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs.  So, don't vectorize
-      // any operators below GROUPBY.
+      // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
+      // vectorize the operators below it.
       if (nonVectorizableChildOfGroupBy(op)) {
+        // No need to vectorize
+        if (!opsDone.contains(op)) {
+          opsDone.add(op);
+        }
         return null;
       }
 
@@ -719,13 +727,22 @@ public class Vectorizer implements Physi
 
       assert vContext != null;
 
-      // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs.  So, don't vectorize
-      // any operators below GROUPBY.
+      // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
+      // vectorize the operators below it.
       if (nonVectorizableChildOfGroupBy(op)) {
+        // No need to vectorize
+        if (!opsDone.contains(op)) {
+          opsDone.add(op);
+        }
         return null;
       }
 
       Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
+      if (vectorOp instanceof VectorGroupByOperator) {
+        VectorGroupByOperator groupBy = (VectorGroupByOperator) vectorOp;
+        VectorGroupByDesc vectorDesc = groupBy.getConf().getVectorDesc();
+        vectorDesc.setVectorGroupBatches(true);
+      }
       if (saveRootVectorOp && op != vectorOp) {
         rootVectorOp = vectorOp;
       }
@@ -772,7 +789,7 @@ public class Vectorizer implements Physi
     return pctx;
   }
 
-  boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op) {
+  boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, boolean isTez) {
     boolean ret = false;
     switch (op.getType()) {
       case MAPJOIN:
@@ -783,7 +800,7 @@ public class Vectorizer implements Physi
         }
         break;
       case GROUPBY:
-        ret = validateGroupByOperator((GroupByOperator) op);
+        ret = validateGroupByOperator((GroupByOperator) op, false, isTez);
         break;
       case FILTER:
         ret = validateFilterOperator((FilterOperator) op);
@@ -814,6 +831,17 @@ public class Vectorizer implements Physi
       case EXTRACT:
         ret = validateExtractOperator((ExtractOperator) op);
         break;
+      case MAPJOIN:
+        // Does MAPJOIN actually get planned in Reduce?
+        if (op instanceof MapJoinOperator) {
+          ret = validateMapJoinOperator((MapJoinOperator) op);
+        } else if (op instanceof SMBMapJoinOperator) {
+          ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
+        }
+        break;
+      case GROUPBY:
+        ret = validateGroupByOperator((GroupByOperator) op, true, true);
+        break;
       case FILTER:
         ret = validateFilterOperator((FilterOperator) op);
         break;
@@ -836,6 +864,23 @@ public class Vectorizer implements Physi
     return ret;
   }
 
+  public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) {
+    Operator<? extends OperatorDesc> currentOp = op;
+    while (currentOp.getParentOperators().size() > 0) {
+      currentOp = currentOp.getParentOperators().get(0);
+      if (currentOp.getType().equals(OperatorType.GROUPBY)) {
+        GroupByDesc desc = (GroupByDesc)currentOp.getConf();
+        boolean isVectorOutput = desc.getVectorDesc().isVectorOutput();
+        if (isVectorOutput) {
+          // This GROUP BY does vectorize its output.
+          return false;
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
   private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) {
     SMBJoinDesc desc = op.getConf();
     // Validation is the same as for map join, since the 'small' tables are not vectorized
@@ -886,16 +931,57 @@ public class Vectorizer implements Physi
     return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER);
   }
 
-  private boolean validateGroupByOperator(GroupByOperator op) {
-    if (op.getConf().isGroupingSetsPresent()) {
-      LOG.warn("Grouping sets not supported in vector mode");
+  private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) {
+    GroupByDesc desc = op.getConf();
+    VectorGroupByDesc vectorDesc = desc.getVectorDesc();
+
+    if (desc.isGroupingSetsPresent()) {
+      LOG.info("Grouping sets not supported in vector mode");
       return false;
     }
-    boolean ret = validateExprNodeDesc(op.getConf().getKeys());
+    boolean ret = validateExprNodeDesc(desc.getKeys());
     if (!ret) {
       return false;
     }
-    return validateAggregationDesc(op.getConf().getAggregators());
+    ret = validateAggregationDesc(desc.getAggregators(), isReduce);
+    if (!ret) {
+      return false;
+    }
+    boolean isVectorOutput = isTez && aggregatorsOutputIsPrimitive(desc.getAggregators(), isReduce);
+    vectorDesc.setVectorOutput(isVectorOutput);
+    if (isReduce) {
+      if (desc.isDistinct()) {
+        LOG.info("Distinct not supported in reduce vector mode");
+        return false;
+      }
+      // Sort-based GroupBy?
+      if (desc.getMode() != GroupByDesc.Mode.COMPLETE &&
+          desc.getMode() != GroupByDesc.Mode.PARTIAL1 &&
+          desc.getMode() != GroupByDesc.Mode.PARTIAL2 &&
+          desc.getMode() != GroupByDesc.Mode.MERGEPARTIAL) {
+        LOG.info("Reduce vector mode not supported when input for GROUP BY is not sorted");
+        return false;
+      }
+      LOG.info("Reduce GROUP BY mode is " + desc.getMode().name());
+      if (desc.getGroupKeyNotReductionKey()) {
+        LOG.info("Reduce vector mode not supported when the group key is not the reduction key");
+        return false;
+      }
+      if (!isVectorOutput) {
+        LOG.info("Reduce vector mode only supported when aggregate outputs are primitive types");
+        return false;
+      }
+      if (desc.getKeys().size() > 0) {
+        LOG.info("Reduce-side GROUP BY will process key groups");
+        vectorDesc.setVectorGroupBatches(true);
+      } else {
+        LOG.info("Reduce-side GROUP BY will do global aggregation");
+      }
+      vectorDesc.setIsReduce(true);
+    } else {
+      LOG.info("Downstream operators of map-side GROUP BY will be vectorized: " + isVectorOutput);
+    }
+    return true;
   }
 
   private boolean validateExtractOperator(ExtractOperator op) {
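
Restating the reduce-side gate above in one place: vectorization is only attempted for GROUP BY modes whose input arrives sorted. A minimal sketch; the predicate name is invented, the mode list comes from the check above:

    // Hypothetical helper equivalent to the mode test in validateGroupByOperator.
    static boolean hasSortedInput(GroupByDesc.Mode mode) {
      return mode == GroupByDesc.Mode.COMPLETE
          || mode == GroupByDesc.Mode.PARTIAL1
          || mode == GroupByDesc.Mode.PARTIAL2
          || mode == GroupByDesc.Mode.MERGEPARTIAL;
    }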
@@ -930,9 +1016,9 @@ public class Vectorizer implements Physi
     return true;
   }
 
-  private boolean validateAggregationDesc(List<AggregationDesc> descs) {
+  private boolean validateAggregationDesc(List<AggregationDesc> descs, boolean isReduce) {
     for (AggregationDesc d : descs) {
-      boolean ret = validateAggregationDesc(d);
+      boolean ret = validateAggregationDesc(d, isReduce);
       if (!ret) {
         return false;
       }
@@ -952,9 +1038,7 @@ public class Vectorizer implements Physi
     String typeName = desc.getTypeInfo().getTypeName();
     boolean ret = validateDataType(typeName);
     if (!ret) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Cannot vectorize " + desc.toString() + " of type " + typeName);
-      }
+      LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
       return false;
     }
     if (desc instanceof ExprNodeGenericFuncDesc) {
@@ -987,12 +1071,11 @@ public class Vectorizer implements Physi
       VectorizationContext vc = new ValidatorVectorizationContext();
       if (vc.getVectorExpression(desc, mode) == null) {
         // TODO: this cannot happen - VectorizationContext throws in such cases.
+        LOG.info("getVectorExpression returned null");
         return false;
       }
     } catch (Exception e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to vectorize", e);
-      }
+      LOG.info("Failed to vectorize", e);
       return false;
     }
     return true;
@@ -1011,16 +1094,56 @@ public class Vectorizer implements Physi
     }
   }
 
-  private boolean validateAggregationDesc(AggregationDesc aggDesc) {
+  private boolean validateAggregationDesc(AggregationDesc aggDesc, boolean isReduce) {
     if (!supportedAggregationUdfs.contains(aggDesc.getGenericUDAFName().toLowerCase())) {
       return false;
     }
     if (aggDesc.getParameters() != null) {
       return validateExprNodeDesc(aggDesc.getParameters());
     }
+    // See if we can vectorize the aggregation.
+    try {
+      VectorizationContext vc = new ValidatorVectorizationContext();
+      if (vc.getAggregatorExpression(aggDesc, isReduce) == null) {
+        // TODO: this cannot happen - VectorizationContext throws in such cases.
+        LOG.info("getAggregatorExpression returned null");
+        return false;
+      }
+    } catch (Exception e) {
+      LOG.info("Failed to vectorize", e);
+      return false;
+    }
+    return true;
+  }
+
+  private boolean aggregatorsOutputIsPrimitive(List<AggregationDesc> descs, boolean isReduce) {
+    for (AggregationDesc d : descs) {
+      boolean ret = aggregatorsOutputIsPrimitive(d, isReduce);
+      if (!ret) {
+        return false;
+      }
+    }
     return true;
   }
 
+  private boolean aggregatorsOutputIsPrimitive(AggregationDesc aggDesc, boolean isReduce) {
+    VectorizationContext vc = new ValidatorVectorizationContext();
+    VectorAggregateExpression vectorAggrExpr;
+    try {
+      vectorAggrExpr = vc.getAggregatorExpression(aggDesc, isReduce);
+    } catch (Exception e) {
+      // We should have already attempted to vectorize in validateAggregationDesc.
+      LOG.info("Vectorization of aggregation should have succeeded", e);
+      return false;
+    }
+
+    ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector();
+    if (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+      return true;
+    }
+    return false;
+  }
+
   private boolean validateDataType(String type) {
     return supportedDataTypesPattern.matcher(type.toLowerCase()).matches();
   }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java?rev=1623929&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java Wed Sep 10 07:41:19 2014
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+public class AbstractVectorDesc implements VectorDesc {
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    throw new CloneNotSupportedException("clone not supported");
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java?rev=1623929&r1=1623928&r2=1623929&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java Wed Sep 10 07:41:19 2014
@@ -69,7 +69,11 @@ public class GroupByDesc extends Abstrac
   transient private boolean isDistinct;
   private boolean dontResetAggrsDistinct;
 
+  // Extra parameters only for vectorization.
+  private VectorGroupByDesc vectorDesc;
+
   public GroupByDesc() {
+    vectorDesc = new VectorGroupByDesc();
   }
 
   public GroupByDesc(
@@ -102,6 +106,7 @@ public class GroupByDesc extends Abstrac
       final boolean groupingSetsPresent,
       final int groupingSetsPosition,
       final boolean isDistinct) {
+    vectorDesc = new VectorGroupByDesc();
     this.mode = mode;
     this.outputColumnNames = outputColumnNames;
     this.keys = keys;
@@ -116,6 +121,14 @@ public class GroupByDesc extends Abstrac
     this.isDistinct = isDistinct;
   }
 
+  public void setVectorDesc(VectorGroupByDesc vectorDesc) {
+    this.vectorDesc = vectorDesc;
+  }
+
+  public VectorGroupByDesc getVectorDesc() {
+    return vectorDesc;
+  }
+
   public Mode getMode() {
     return mode;
   }
@@ -268,6 +281,14 @@ public class GroupByDesc extends Abstrac
     this.groupingSetPosition = groupingSetPosition;
   }
 
+  public boolean isDontResetAggrsDistinct() {
+    return dontResetAggrsDistinct;
+  }
+
+  public void setDontResetAggrsDistinct(boolean dontResetAggrsDistinct) {
+    this.dontResetAggrsDistinct = dontResetAggrsDistinct;
+  }
+
   public boolean isDistinct() {
     return isDistinct;
   }
@@ -276,11 +297,4 @@ public class GroupByDesc extends Abstrac
     this.isDistinct = isDistinct;
   }
 
-  public boolean isDontResetAggrsDistinct() {
-    return dontResetAggrsDistinct;
-  }
-
-  public void setDontResetAggrsDistinct(boolean dontResetAggrsDistinct) {
-    this.dontResetAggrsDistinct = dontResetAggrsDistinct;
-  }
 }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java?rev=1623929&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java Wed Sep 10 07:41:19 2014
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+
+public interface VectorDesc extends Serializable, Cloneable {
+  public Object clone() throws CloneNotSupportedException;
+}
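
VectorDesc pulls clone() into the public contract while AbstractVectorDesc deliberately rejects it, so each concrete desc must opt in explicitly. A hypothetical subclass, not part of this patch, that does support cloning:

    // Illustration only; SomeVectorDesc does not exist in the patch.
    public class SomeVectorDesc extends AbstractVectorDesc {
      private static final long serialVersionUID = 1L;
      private boolean someFlag;

      @Override
      public Object clone() {
        SomeVectorDesc copy = new SomeVectorDesc();
        copy.someFlag = this.someFlag;
        return copy;
      }
    }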

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java?rev=1623929&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java Wed Sep 10 07:41:19 2014
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+/**
+ * VectorGroupByDesc.
+ *
+ * Extra parameters beyond GroupByDesc just for the VectorGroupByOperator.
+ *
+ * We don't extend GroupByDesc because the base OperatorDesc doesn't support
+ * clone, and adding it is a lot of work for little gain.
+ */
+public class VectorGroupByDesc extends AbstractVectorDesc {
+
+  private static final long serialVersionUID = 1L;
+
+  private boolean isReduce;
+  private boolean isVectorGroupBatches;
+  private boolean isVectorOutput;
+
+  public VectorGroupByDesc() {
+    this.isReduce = false;
+    this.isVectorGroupBatches = false;
+    this.isVectorOutput = false;
+  }
+
+  public boolean isReduce() {
+    return isReduce;
+  }
+
+  public void setIsReduce(boolean isReduce) {
+    this.isReduce = isReduce;
+  }
+
+  public boolean isVectorGroupBatches() {
+    return isVectorGroupBatches;
+  }
+
+  public void setVectorGroupBatches(boolean isVectorGroupBatches) {
+    this.isVectorGroupBatches = isVectorGroupBatches;
+  }
+
+  public boolean isVectorOutput() {
+    return isVectorOutput;
+  }
+
+  public void setVectorOutput(boolean isVectorOutput) {
+    this.isVectorOutput = isVectorOutput;
+  }
+}
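
Taken together, the three flags record planner decisions for VectorGroupByOperator to consult at run time: isReduce selects reduce-side behavior, isVectorGroupBatches selects key-group processing, and isVectorOutput tells downstream operators they will receive batches rather than rows. A hedged sketch of the planner side, mirroring the assignments in Vectorizer.validateGroupByOperator; the local variable names are illustrative:

    VectorGroupByDesc vectorDesc = groupByDesc.getVectorDesc();
    vectorDesc.setIsReduce(true);                     // this GROUP BY runs on the reduce side
    vectorDesc.setVectorGroupBatches(keyCount > 0);   // keyed aggregation walks key groups
    vectorDesc.setVectorOutput(outputsArePrimitive);  // children can stay vectorized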

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java?rev=1623929&r1=1623928&r2=1623929&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java Wed Sep 10 07:41:19 2014
@@ -107,7 +107,7 @@ public class TestVectorizer {
     gbyOp.setConf(desc);
 
     Vectorizer v = new Vectorizer();
-    Assert.assertTrue(v.validateMapWorkOperator(gbyOp));
+    Assert.assertTrue(v.validateMapWorkOperator(gbyOp, false));
     VectorGroupByOperator vectorOp = (VectorGroupByOperator) v.vectorizeOperator(gbyOp, vContext);
     Assert.assertEquals(VectorUDAFSumLong.class, vectorOp.getAggregators()[0].getClass());
     VectorUDAFSumLong udaf = (VectorUDAFSumLong) vectorOp.getAggregators()[0];
@@ -150,7 +150,7 @@ public class TestVectorizer {
   /**
   * prepareAbstractMapJoin prepares a join operator descriptor, used as helper by SMB and Map join tests. 
   */
-  private void prepareAbstractMapJoin(AbstractMapJoinOperator<? extends MapJoinDesc> mop, MapJoinDesc mjdesc) {
+  private void prepareAbstractMapJoin(AbstractMapJoinOperator<? extends MapJoinDesc> map, MapJoinDesc mjdesc) {
       mjdesc.setPosBigTable(0);
       List<ExprNodeDesc> expr = new ArrayList<ExprNodeDesc>();
       expr.add(new ExprNodeColumnDesc(Integer.class, "col1", "T", false));
@@ -180,14 +180,14 @@ public class TestVectorizer {
   */
   @Test
   public void testValidateMapJoinOperator() {
-    MapJoinOperator mop = new MapJoinOperator();
+    MapJoinOperator map = new MapJoinOperator();
     MapJoinDesc mjdesc = new MapJoinDesc();
     
-    prepareAbstractMapJoin(mop, mjdesc);
-    mop.setConf(mjdesc);
+    prepareAbstractMapJoin(map, mjdesc);
+    map.setConf(mjdesc);
  
     Vectorizer vectorizer = new Vectorizer();
-    Assert.assertTrue(vectorizer.validateMapWorkOperator(mop));
+    Assert.assertTrue(vectorizer.validateMapWorkOperator(map, false));
   }
 
   
@@ -196,13 +196,13 @@ public class TestVectorizer {
   */
   @Test
   public void testValidateSMBJoinOperator() {
-      SMBMapJoinOperator mop = new SMBMapJoinOperator();
+      SMBMapJoinOperator map = new SMBMapJoinOperator();
       SMBJoinDesc mjdesc = new SMBJoinDesc();
       
-      prepareAbstractMapJoin(mop, mjdesc);
-      mop.setConf(mjdesc);
+      prepareAbstractMapJoin(map, mjdesc);
+      map.setConf(mjdesc);
     
       Vectorizer vectorizer = new Vectorizer();
-      Assert.assertTrue(vectorizer.validateMapWorkOperator(mop)); 
+      Assert.assertTrue(vectorizer.validateMapWorkOperator(map, false)); 
   }
 }
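
A natural follow-up, not in this patch, would be to exercise the new isTez path as well; a hypothetical extra assertion reusing the gbyOp fixture shown earlier:

    // Hypothetical: with isTez=true the same GROUP BY should still validate,
    // additionally deciding whether its output can remain vectorized.
    Assert.assertTrue(v.validateMapWorkOperator(gbyOp, true));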

Modified: hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q?rev=1623929&r1=1623928&r2=1623929&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q Wed Sep 10 07:41:19 2014
@@ -150,7 +150,9 @@ insert overwrite table over1k_part_buck_
 desc formatted over1k_part_buck_sort2_orc partition(t=27);
 desc formatted over1k_part_buck_sort2_orc partition(t="__HIVE_DEFAULT_PARTITION__");
 
+explain select * from over1k_part_buck_sort2_orc;
 select * from over1k_part_buck_sort2_orc;
+explain select count(*) from over1k_part_buck_sort2_orc;
 select count(*) from over1k_part_buck_sort2_orc;
 
 set hive.optimize.sort.dynamic.partition=true;
@@ -159,5 +161,7 @@ insert overwrite table over1k_part_buck_
 desc formatted over1k_part_buck_sort2_orc partition(t=27);
 desc formatted over1k_part_buck_sort2_orc partition(t="__HIVE_DEFAULT_PARTITION__");
 
+explain select * from over1k_part_buck_sort2_orc;
 select * from over1k_part_buck_sort2_orc;
+explain select count(*) from over1k_part_buck_sort2_orc;
 select count(*) from over1k_part_buck_sort2_orc;


