hive-commits mailing list archives

From rem...@apache.org
Subject svn commit: r1580179 - in /hive/trunk/ql/src: gen/vectorization/UDAFTemplates/ java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/vector/ java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/
Date Sat, 22 Mar 2014 08:08:02 GMT
Author: remusr
Date: Sat Mar 22 08:08:01 2014
New Revision: 1580179

URL: http://svn.apache.org/r1580179
Log:
HIVE-6222 Make Vector Group By operator abandon grouping if too many distinct keys (reviewed by Jitendra)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorUtilBatchObjectPool.java
Modified:
    hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt
    hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt
    hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt
    hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt
    hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt
    hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt
    hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
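
The gist of the change, before the file-by-file diff: every vectorized AggregationBuffer gains a reset() method so buffer sets can be pooled and reused, and VectorGroupByOperator is restructured around pluggable processing modes (global, hash, streaming). Hash mode now monitors its own reduction ratio and abandons hashing when there are too many distinct keys. Condensed from the VectorGroupByOperator diff below (logging elided):

    // Every numRowsCompareHashAggr input rows, check whether hashing is
    // actually reducing the data. If the hash table holds more than
    // (rows seen so far) * minReductionHashAggr distinct keys, grouping
    // is not paying off: flush everything and fall back to streaming.
    private void checkHashModeEfficiency() throws HiveException {
      if (lastModeCheckRowCount > numRowsCompareHashAggr) {
        lastModeCheckRowCount = 0;
        if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
          flush(true);               // emit all accumulated aggregates
          changeToStreamingMode();   // later batches bypass the hash table
        }
      }
    }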

Modified: hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt (original)
+++ hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt Sat Mar 22 08:08:01 2014
@@ -75,6 +75,13 @@ public class <ClassName> extends VectorA
       public int getVariableSize() {
         throw new UnsupportedOperationException();
       }
+      
+      @Override
+      public void reset() {
+        isNull = true;
+        sum = 0;
+        count = 0L;
+      }
     }
     
     private VectorExpression inputExpression;
@@ -430,7 +437,7 @@ public class <ClassName> extends VectorA
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
       Aggregation myAgg = (Aggregation) agg;
-      myAgg.isNull = true;
+      myAgg.reset();
     }
 
     @Override
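
The same edit lands in every UDAF template and class in this commit: the old reset(AggregationBuffer) only flipped isNull and relied on the first incoming value to re-initialize the accumulators, which is no longer safe once buffers are pooled and handed out for new keys. A minimal sketch of the new contract (SumBuffer is hypothetical, not from the patch):

    // Hypothetical buffer illustrating the reset() contract: after
    // reset(), the buffer must look freshly allocated, because pooled
    // buffers are recycled across grouping keys.
    static final class SumBuffer implements VectorAggregateExpression.AggregationBuffer {
      boolean isNull;
      double sum;

      @Override
      public int getVariableSize() {
        throw new UnsupportedOperationException();   // fixed-size buffer
      }

      @Override
      public void reset() {
        isNull = true;   // "no value aggregated yet"
        sum = 0;         // clear the accumulator, not just the null flag
      }
    }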

Modified: hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt (original)
+++ hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt Sat Mar 22 08:08:01 2014
@@ -68,6 +68,12 @@ public class <ClassName> extends VectorA
       public int getVariableSize() {
         throw new UnsupportedOperationException();
       }
+
+      @Override
+      public void reset() {
+        isNull = true;
+        value = 0;
+      }
     }
     
     private VectorExpression inputExpression;
@@ -405,7 +411,7 @@ public class <ClassName> extends VectorA
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
       Aggregation myAgg = (Aggregation) agg;
-      myAgg.isNull = true;
+      myAgg.reset();
     }
 
     @Override

Modified: hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt (original)
+++ hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt Sat Mar 22 08:08:01 2014
@@ -73,6 +73,12 @@ public class <ClassName> extends VectorA
       public int getVariableSize() {
         throw new UnsupportedOperationException();
       }
+
+      @Override
+      public void reset() {
+        isNull = true;
+        value.zeroClear();
+      }
     }
 
     private VectorExpression inputExpression;
@@ -423,7 +429,7 @@ public class <ClassName> extends VectorA
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
       Aggregation myAgg = (Aggregation) agg;
-      myAgg.isNull = true;
+      myAgg.reset();
     }
 
     @Override

Modified: hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt (original)
+++ hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt Sat Mar 22 08:08:01 2014
@@ -83,6 +83,13 @@ public class <ClassName> extends VectorA
         JavaDataModel model = JavaDataModel.get();
         return model.lengthForByteArrayOfSize(bytes.length);
       }
+
+      @Override
+      public void reset() {
+        isNull = true;
+        length = 0;
+      }
+
     }
 
     private VectorExpression inputExpression;
@@ -352,7 +359,7 @@ public class <ClassName> extends VectorA
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
       Aggregation myAgg = (Aggregation) agg;
-      myAgg.isNull = true;
+      myAgg.reset();
     }
 
     @Override

Modified: hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt (original)
+++ hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt Sat Mar 22 08:08:01 2014
@@ -69,6 +69,12 @@ public class <ClassName> extends VectorA
       public int getVariableSize() {
         throw new UnsupportedOperationException();
       }
+
+      @Override
+      public void reset() {
+        isNull = true;
+        sum = 0;
+      }
     }
     
     private VectorExpression inputExpression;
@@ -396,7 +402,7 @@ public class <ClassName> extends VectorA
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
       Aggregation myAgg = (Aggregation) agg;
-      myAgg.isNull = true;
+      myAgg.reset();
     }
 
     @Override

Modified: hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt (original)
+++ hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt Sat Mar 22 08:08:01 2014
@@ -73,6 +73,14 @@ public class <ClassName> extends VectorA
       public int getVariableSize() {
         throw new UnsupportedOperationException();
       }
+
+      @Override
+      public void reset() {
+        isNull = true;
+        sum = 0;
+        count = 0;
+        variance = 0;
+      }
     }
 
     private VectorExpression inputExpression;
@@ -475,7 +483,7 @@ public class <ClassName> extends VectorA
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
       Aggregation myAgg = (Aggregation) agg;
-      myAgg.isNull = true;
+      myAgg.reset();
     }
 
     @Override

Modified: hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt (original)
+++ hive/trunk/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt Sat Mar 22 08:08:01 2014
@@ -78,6 +78,14 @@ public class <ClassName> extends VectorA
         throw new UnsupportedOperationException();
       }
 
+      @Override
+      public void reset() {
+        isNull = true;
+        sum = 0f;
+        count = 0;
+        variance = 0f;
+      }
+
       public void updateValueWithCheckAndInit(Decimal128 value, short scale) {
         if (this.isNull) {
           this.init();
@@ -429,7 +437,7 @@ public class <ClassName> extends VectorA
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
       Aggregation myAgg = (Aggregation) agg;
-      myAgg.isNull = true;
+      myAgg.reset();
     }
 
     @Override
@@ -474,4 +482,4 @@ public class <ClassName> extends VectorA
   public void setInputExpression(VectorExpression inputExpression) {
     this.inputExpression = inputExpression;
   }
-}
+}
\ No newline at end of file

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Sat Mar 22 08:08:01 2014
@@ -120,7 +120,7 @@ public class GroupByOperator extends Ope
 
   transient boolean firstRow;
   transient long totalMemory;
-  transient boolean hashAggr;
+  protected transient boolean hashAggr;
   // The reduction is happening on the reducer, and the grouping key and
   // reduction keys are different.
   // For example: select a, count(distinct b) from T group by a

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java Sat Mar 22 08:08:01 2014
@@ -71,5 +71,14 @@ public class VectorAggregationBufferRow 
     this.index  = index;
     this.version = version;
   }
+
+  /**
+   * Resets the aggregation buffers for reuse
+   */
+  public void reset() {
+    for(int i = 0; i < aggregationBuffers.length; ++i) {
+      aggregationBuffers[i].reset();
+    }
+  }
   
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Sat Mar 22 08:08:01 2014
@@ -83,49 +83,542 @@ public class VectorGroupByOperator exten
    */
   private transient VectorHashKeyWrapperBatch keyWrappersBatch;
 
+  private transient Object[] forwardCache;
+
   /**
-   * Total per hashtable entry fixed memory (does not depend on key/agg values).
+   * Interface for processing mode: global, hash or streaming
    */
-  private transient int fixedHashEntrySize;
+  private static interface IProcessingMode {
+    public void initialize(Configuration hconf) throws HiveException;
+    public void processBatch(VectorizedRowBatch batch) throws HiveException;
+    public void close(boolean aborted) throws HiveException;
+  }
 
   /**
-   * Average per hashtable entry variable size memory (depends on key/agg value).
-   */
-  private transient int avgVariableSize;
+   * Base class for all processing modes
+   */
+  private abstract class ProcessingModeBase implements IProcessingMode {
+    /**
+     * Evaluates the aggregators on the current batch.
+     * The aggregationBatchInfo must have been prepared
+     * by calling {@link #prepareBatchAggregationBufferSets} first.
+     */
+    protected void processAggregators(VectorizedRowBatch batch) throws HiveException {
+      // We now have a vector of aggregation buffer sets to use for each row
+      // We can start computing the aggregates.
+      // If the number of distinct keys in the batch is 1 we can
+      // use the optimized code path of aggregateInput
+      VectorAggregationBufferRow[] aggregationBufferSets =
+          aggregationBatchInfo.getAggregationBuffers();
+      if (aggregationBatchInfo.getDistinctBufferSetCount() == 1) {
+        VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+            aggregationBufferSets[0].getAggregationBuffers();
+        for (int i = 0; i < aggregators.length; ++i) {
+          aggregators[i].aggregateInput(aggregationBuffers[i], batch);
+        }
+      } else {
+        for (int i = 0; i < aggregators.length; ++i) {
+          aggregators[i].aggregateInputSelection(
+              aggregationBufferSets,
+              i,
+              batch);
+        }
+      }
+    }
+
+    /**
+     * allocates a new aggregation buffer set.
+     */
+    protected VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException {
+      VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+          new VectorAggregateExpression.AggregationBuffer[aggregators.length];
+      for (int i=0; i < aggregators.length; ++i) {
+        aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+        aggregators[i].reset(aggregationBuffers[i]);
+      }
+      VectorAggregationBufferRow bufferSet = new VectorAggregationBufferRow(aggregationBuffers);
+      return bufferSet;
+    }
+
+  }
 
   /**
-   * Number of entries added to the hashtable since the last check if it should flush.
+   * Global aggregates (no GROUP BY clause, no keys).
+   * This mode is very simple: there are no keys to consider, and only one row is flushed, at close.
+   * That one row must be flushed even if no input was seen (the aggregates then evaluate to their empty-set results, typically NULL).
    */
-  private transient int numEntriesSinceCheck;
+  private class ProcessingModeGlobalAggregate extends ProcessingModeBase {
+
+    /**
+     * In global processing mode there is only one set of aggregation buffers 
+     */
+    private VectorAggregationBufferRow aggregationBuffers;
+
+    @Override
+    public void initialize(Configuration hconf) throws HiveException {
+      aggregationBuffers =  allocateAggregationBuffer();
+      LOG.info("using global aggregation processing mode");
+    }
+
+    @Override
+    public void processBatch(VectorizedRowBatch batch) throws HiveException {
+      for (int i = 0; i < aggregators.length; ++i) {
+        aggregators[i].aggregateInput(aggregationBuffers.getAggregationBuffer(i), batch);
+      }
+    }
+
+    @Override
+    public void close(boolean aborted) throws HiveException {
+      if (!aborted) {
+        flushSingleRow(null, aggregationBuffers);
+      }
+    }
+  }
 
   /**
-   * Sum of batch size processed (ie. rows).
+   * Hash Aggregate mode processing
    */
-  private transient long sumBatchSize;
+  private class ProcessingModeHashAggregate extends ProcessingModeBase {
+
+    /**
+     * The global key-aggregation hash map.
+     */
+    private Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
+
+    /**
+     * Total per hashtable entry fixed memory (does not depend on key/agg values).
+     */
+    private int fixedHashEntrySize;
+
+    /**
+     * Average per hashtable entry variable size memory (depends on key/agg value).
+     */
+    private int avgVariableSize;
+
+    /**
+     * Number of entries added to the hashtable since the last check if it should flush.
+     */
+    private int numEntriesSinceCheck;
+
+    /**
+     * Sum of batch sizes processed (i.e. rows).
+     */
+    private long sumBatchSize;
+
+    /**
+     * Max number of entries in the vector group by aggregation hashtables. 
+     * Exceeding this will trigger a flush irrespective of the memory pressure condition.
+     */
+    private int maxHtEntries = 1000000;
+
+    /**
+     * The number of new entries that must be added to the hashtable before a memory size check.
+     */
+    private int checkInterval = 10000;
+
+    /**
+     * Percent of entries to flush when memory threshold exceeded.
+     */
+    private float percentEntriesToFlush = 0.1f;
   
-  /**
-   * Max number of entries in the vector group by aggregation hashtables. 
-   * Exceeding this will trigger a flush irrelevant of memory pressure condition.
-   */
-  private transient int maxHtEntries = 1000000;
+    /**
+     * A soft reference used to detect memory pressure
+     */
+    private SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());
+    
+    /**
+     * Counts the number of times the gcCanary died and was resurrected
+     */
+    private long gcCanaryFlushes = 0L;
+
+    /**
+     * Count of rows since the last check for changing from aggregate to streaming mode
+     */
+    private long lastModeCheckRowCount = 0;
+
+    /**
+     * Minimum factor by which the hash table must reduce the number of entries.
+     * If this is not met, the processing switches to streaming mode.
+     */
+    private float minReductionHashAggr;
+
+    /**
+     * Number of rows processed between checks for minReductionHashAggr factor
+     * TODO: there is overlap between numRowsCompareHashAggr and checkInterval
+     */
+    private long numRowsCompareHashAggr;
+
+    @Override
+    public void initialize(Configuration hconf) throws HiveException {
+      // hconf is null in unit testing
+      if (null != hconf) {
+        this.percentEntriesToFlush = HiveConf.getFloatVar(hconf,
+          HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT);
+        this.checkInterval = HiveConf.getIntVar(hconf,
+          HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL);
+        this.maxHtEntries = HiveConf.getIntVar(hconf,
+            HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES);
+        this.minReductionHashAggr = HiveConf.getFloatVar(hconf,
+            HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
+          this.numRowsCompareHashAggr = HiveConf.getLongVar(hconf,
+            HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
+      } 
+      else {
+        this.percentEntriesToFlush =
+            HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT.defaultFloatVal;
+        this.checkInterval =
+            HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL.defaultIntVal;
+        this.maxHtEntries =
+            HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES.defaultIntVal;
+        this.minReductionHashAggr =
+            HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION.defaultFloatVal;
+          this.numRowsCompareHashAggr =
+            HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL.defaultIntVal;
+      }
 
-  /**
-   * The number of new entries that must be added to the hashtable before a memory size check.
-   */
-  private transient int checkInterval = 10000;
+      mapKeysAggregationBuffers = new HashMap<KeyWrapper, VectorAggregationBufferRow>();
+      computeMemoryLimits();
+      LOG.info("using hash aggregation processing mode");
+    }
+
+    @Override
+    public void processBatch(VectorizedRowBatch batch) throws HiveException {
+
+      // First we traverse the batch to evaluate and prepare the KeyWrappers
+      // After this the KeyWrappers are properly set and hash code is computed
+      keyWrappersBatch.evaluateBatch(batch);
+
+      // Next we locate the aggregation buffer set for each key
+      prepareBatchAggregationBufferSets(batch);
+
+      // Finally, evaluate the aggregators
+      processAggregators(batch);
+
+      //Flush if memory limits were reached
+      // We keep flushing until the memory is under threshold 
+      int preFlushEntriesCount = numEntriesHashTable;
+      while (shouldFlush(batch)) {
+        flush(false);
+
+        if(gcCanary.get() == null) {
+          gcCanaryFlushes++;
+          gcCanary = new SoftReference<Object>(new Object()); 
+        }
+
+        //Validate that some progress is being made
+        if (!(numEntriesHashTable < preFlushEntriesCount)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Flush did not progress: %d entries before, %d entries after",
+                preFlushEntriesCount,
+                numEntriesHashTable));
+          }
+          break;
+        }
+        preFlushEntriesCount = numEntriesHashTable;
+      }
+
+      if (sumBatchSize == 0 && 0 != batch.size) {
+        // Sample the first batch processed for variable sizes.
+        updateAvgVariableSize(batch);
+      }
+
+      sumBatchSize += batch.size;
+      lastModeCheckRowCount += batch.size;
+
+      // Check if we should turn into streaming mode
+      checkHashModeEfficiency();
+    }
+
+    @Override
+    public void close(boolean aborted) throws HiveException {
+      if (!aborted) {
+        flush(true);
+      }
+    }
+
+    /**
+     * Locates the aggregation buffer sets to use for each key in the current batch.
+     * The keyWrappersBatch must have evaluated the current batch first.
+     */
+    private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws HiveException {
+      // The aggregation batch vector needs to know when we start a new batch
+      // to bump its internal version.
+      aggregationBatchInfo.startBatch();
+
+      // We now have to probe the global hash and find-or-allocate
+      // the aggregation buffers to use for each key present in the batch
+      VectorHashKeyWrapper[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers();
+      for (int i=0; i < batch.size; ++i) {
+        VectorHashKeyWrapper kw = keyWrappers[i];
+        VectorAggregationBufferRow aggregationBuffer = mapKeysAggregationBuffers.get(kw);
+        if (null == aggregationBuffer) {
+          // the probe failed, we must allocate a set of aggregation buffers
+          // and push the (keywrapper,buffers) pair into the hash.
+          // It is very important to clone the keywrapper: the one we have from our
+          // keyWrappersBatch is going to be reset/reused on the next batch.
+          aggregationBuffer = allocateAggregationBuffer();
+          mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer);
+          numEntriesHashTable++;
+          numEntriesSinceCheck++;
+        }
+        aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i);
+      }
+    }
+
+    /**
+     * Computes the memory limits for hash table flush (spill).
+     */
+    private void computeMemoryLimits() {
+      JavaDataModel model = JavaDataModel.get();
+
+      fixedHashEntrySize =
+          model.hashMapEntry() +
+          keyWrappersBatch.getKeysFixedSize() +
+          aggregationBatchInfo.getAggregatorsFixedSize();
+
+      MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+      maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+      memoryThreshold = conf.getMemoryThreshold();
+      // Tests may leave this uninitialized, so better set it to 1
+      if (memoryThreshold == 0.0f) {
+        memoryThreshold = 1.0f;
+      }
+
+      maxHashTblMemory = (int)(maxMemory * memoryThreshold);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)",
+            maxHashTblMemory/1024/1024,
+            maxMemory/1024/1024,
+            memoryThreshold,
+            fixedHashEntrySize,
+            keyWrappersBatch.getKeysFixedSize(),
+            aggregationBatchInfo.getAggregatorsFixedSize()));
+      }
+    }
+
+    /**
+     * Flushes the entries in the hash table by emitting output (forward).
+     * When parameter 'all' is true all the entries are flushed.
+     * @param all
+     * @throws HiveException
+     */
+    private void flush(boolean all) throws HiveException {
+
+      int entriesToFlush = all ? numEntriesHashTable :
+        (int)(numEntriesHashTable * this.percentEntriesToFlush);
+      int entriesFlushed = 0;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format(
+            "Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb) gcCanary:%s",
+            entriesToFlush, all ? "(all)" : "",
+            numEntriesHashTable, fixedHashEntrySize, avgVariableSize,
+            numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024,
+            maxHashTblMemory/1024/1024,
+            gcCanary.get() == null ? "dead" : "alive"));
+      }
+
+      /* Iterate the global (keywrapper,aggregationbuffers) map and emit
+       a row for each key */
+      Iterator<Map.Entry<KeyWrapper, VectorAggregationBufferRow>> iter =
+          mapKeysAggregationBuffers.entrySet().iterator();
+      while(iter.hasNext()) {
+        Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair = iter.next();
+
+        flushSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue());
+
+        if (!all) {
+          iter.remove();
+          --numEntriesHashTable;
+          if (++entriesFlushed >= entriesToFlush) {
+            break;
+          }
+        }
+      }
+
+      if (all) {
+        mapKeysAggregationBuffers.clear();
+        numEntriesHashTable = 0;
+      }
+      
+      if (all && LOG.isDebugEnabled()) {
+        LOG.debug(String.format("GC canary caused %d flushes", gcCanaryFlushes));
+      }
+    }
+
+    /**
+     * Returns true if the memory threshold for the hash table was reached.
+     */
+    private boolean shouldFlush(VectorizedRowBatch batch) {
+      if (batch.size == 0) {
+        return false;
+      }
+      //numEntriesSinceCheck is the number of entries added to the hash table
+      // since the last time we checked the average variable size
+      if (numEntriesSinceCheck >= this.checkInterval) {
+        // We're going to update the average variable row size by sampling the current batch
+        updateAvgVariableSize(batch);
+        numEntriesSinceCheck = 0;
+      }
+      if (numEntriesHashTable > this.maxHtEntries ||
+          numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory) {
+        return true;
+      }
+      if (gcCanary.get() == null) {
+        return true;
+      }
+      
+      return false;
+    }
+
+    /**
+     * Updates the average variable size of the hash table entries.
+     * The average is only updated by probing the batch that added the entry in the hash table
+     * that caused the check threshold to be reached.
+     */
+    private void updateAvgVariableSize(VectorizedRowBatch batch) {
+      int keyVariableSize = keyWrappersBatch.getVariableSize(batch.size);
+      int aggVariableSize = aggregationBatchInfo.getVariableSize(batch.size);
+
+      // This assumes the distribution of variable size keys/aggregates in the input
+      // is the same as the distribution of variable sizes in the hash entries
+      avgVariableSize = (int)((avgVariableSize * sumBatchSize + keyVariableSize + aggVariableSize) /
+          (sumBatchSize + batch.size));
+    }
+
+    /**
+     * Checks if the HT reduces the number of entries by at least the minReductionHashAggr factor.
+     * @throws HiveException
+     */
+    private void checkHashModeEfficiency() throws HiveException {
+      if (lastModeCheckRowCount > numRowsCompareHashAggr) {
+        lastModeCheckRowCount = 0;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("checkHashModeEfficiency: HT:%d RC:%d MIN:%d", 
+              numEntriesHashTable, sumBatchSize, (long)(sumBatchSize * minReductionHashAggr)));
+        }
+        if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
+          flush(true);
+
+          changeToStreamingMode();
+        }
+      }
+    }
+  }
 
   /**
-   * Percent of entries to flush when memory threshold exceeded.
+   * Streaming processing mode. Intermediate values are flushed each time the key changes.
+   * In this mode we rely on the MR shuffle to merge the intermediates on the reduce side.
    */
-  private transient float percentEntriesToFlush = 0.1f;
-  
-  private transient SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());
-  private transient long gcCanaryFlushes = 0L;
+  private class ProcessingModeStreaming extends ProcessingModeBase {
+
+    /** 
+     * The aggregation buffers used in streaming mode
+     */
+    private VectorAggregationBufferRow currentStreamingAggregators;
+
+    /**
+     * The current key, used in streaming mode
+     */
+    private VectorHashKeyWrapper streamingKey;
+
+    /**
+     * The keys that need to be flushed at the end of the current batch
+     */
+    private final VectorHashKeyWrapper[] keysToFlush = 
+        new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE];
+
+    /**
+     * The aggregates that need to be flushed at the end of the current batch
+     */
+    private final VectorAggregationBufferRow[] rowsToFlush = 
+        new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE];
+
+    /**
+     * A pool of VectorAggregationBufferRow to avoid repeated allocations
+     */
+    private VectorUtilBatchObjectPool<VectorAggregationBufferRow> 
+      streamAggregationBufferRowPool;
+
+    @Override
+    public void initialize(Configuration hconf) throws HiveException {
+      streamAggregationBufferRowPool = new VectorUtilBatchObjectPool<VectorAggregationBufferRow>(
+          VectorizedRowBatch.DEFAULT_SIZE,
+          new VectorUtilBatchObjectPool.IAllocator<VectorAggregationBufferRow>() {
+
+            @Override
+            public VectorAggregationBufferRow alloc() throws HiveException {
+              return allocateAggregationBuffer();
+            }
+
+            @Override
+            public void free(VectorAggregationBufferRow t) {
+              // Nothing to do
+            }
+          });
+      LOG.info("using streaming aggregation processing mode");
+    }
+
+    @Override
+    public void processBatch(VectorizedRowBatch batch) throws HiveException {
+      // First we traverse the batch to evaluate and prepare the KeyWrappers
+      // After this the KeyWrappers are properly set and hash code is computed
+      keyWrappersBatch.evaluateBatch(batch);
+
+      VectorHashKeyWrapper[] batchKeys = keyWrappersBatch.getVectorHashKeyWrappers();
+
+      if (streamingKey == null) {
+        // This is the first batch we process after switching from hash mode
+        currentStreamingAggregators = streamAggregationBufferRowPool.getFromPool();
+        streamingKey = (VectorHashKeyWrapper) batchKeys[0].copyKey();
+      }
+
+      aggregationBatchInfo.startBatch();
+      int flushMark = 0;
+
+      for(int i = 0; i < batch.size; ++i) {
+        if (!batchKeys[i].equals(streamingKey)) {
+          // We've encountered a new key, must save current one
+          // We can't forward yet, the aggregators have not been evaluated
+          rowsToFlush[flushMark] = currentStreamingAggregators;
+          if (keysToFlush[flushMark] == null) {
+            keysToFlush[flushMark] = (VectorHashKeyWrapper) streamingKey.copyKey();
+          }
+          else {
+            streamingKey.duplicateTo(keysToFlush[flushMark]);
+          }
+
+          currentStreamingAggregators = streamAggregationBufferRowPool.getFromPool();
+          batchKeys[i].duplicateTo(streamingKey);
+          ++flushMark;
+        }
+        aggregationBatchInfo.mapAggregationBufferSet(currentStreamingAggregators, i);
+      }
+
+      // evaluate the aggregators
+      processAggregators(batch);
+
+      // Now flush/forward all keys/rows, except the last (current) one
+      for (int i = 0; i < flushMark; ++i) {
+        flushSingleRow(keysToFlush[i], rowsToFlush[i]);
+        rowsToFlush[i].reset();
+        streamAggregationBufferRowPool.putInPool(rowsToFlush[i]);
+      }
+    }
+
+    @Override
+    public void close(boolean aborted) throws HiveException {
+      if (!aborted && null != streamingKey) {
+        flushSingleRow(streamingKey, currentStreamingAggregators);
+      }
+    }
+  }
 
   /**
-   * The global key-aggregation hash map.
+   * Current processing mode. Processing mode can change (e.g. hash -> streaming).
    */
-  private transient Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
+  private transient IProcessingMode processingMode;
 
   private static final long serialVersionUID = 1L;
 
@@ -150,16 +643,6 @@ public class VectorGroupByOperator exten
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
-    
-    // hconf is null in unit testing
-    if (null != hconf) {
-      this.percentEntriesToFlush = HiveConf.getFloatVar(hconf,
-        HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT);
-      this.checkInterval = HiveConf.getIntVar(hconf,
-        HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL);
-      this.maxHtEntries = HiveConf.getIntVar(hconf,
-          HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES);
-    }
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
 
@@ -182,7 +665,6 @@ public class VectorGroupByOperator exten
       keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
       aggregationBatchInfo = new VectorAggregationBufferBatch();
       aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
-      mapKeysAggregationBuffers = new HashMap<KeyWrapper, VectorAggregationBufferRow>();
 
       List<String> outputFieldNames = conf.getOutputColumnNames();
       outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
@@ -194,271 +676,69 @@ public class VectorGroupByOperator exten
       throw new HiveException(e);
     }
 
-    computeMemoryLimits();
-
     initializeChildren(hconf);
-  }
 
-  /**
-   * Computes the memory limits for hash table flush (spill).
-   */
-  private void computeMemoryLimits() {
-    JavaDataModel model = JavaDataModel.get();
+    forwardCache = new Object[keyExpressions.length + aggregators.length];
 
-    fixedHashEntrySize =
-        model.hashMapEntry() +
-        keyWrappersBatch.getKeysFixedSize() +
-        aggregationBatchInfo.getAggregatorsFixedSize();
-
-    MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
-    memoryThreshold = conf.getMemoryThreshold();
-    // Tests may leave this unitialized, so better set it to 1
-    if (memoryThreshold == 0.0f) {
-      memoryThreshold = 1.0f;
+    if (keyExpressions.length == 0) {
+      processingMode = this.new ProcessingModeGlobalAggregate();
     }
-
-    maxHashTblMemory = (int)(maxMemory * memoryThreshold);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)",
-          maxHashTblMemory/1024/1024,
-          maxMemory/1024/1024,
-          memoryThreshold,
-          fixedHashEntrySize,
-          keyWrappersBatch.getKeysFixedSize(),
-          aggregationBatchInfo.getAggregatorsFixedSize()));
+    else {
+      //TODO: consider if parent can offer order guarantees
+      // If the input is sorted, it is more efficient to use the streaming mode
+      processingMode = this.new ProcessingModeHashAggregate();
     }
+    processingMode.initialize(hconf);
+  }
 
+  /**
+   * Changes the processing mode to streaming.
+   * This is done at the request of the hash aggregation mode when the number of keys
+   * exceeds the minReductionHashAggr factor.
+   * @throws HiveException 
+   */
+  private void changeToStreamingMode() throws HiveException {
+    processingMode = this.new ProcessingModeStreaming();
+    processingMode.initialize(null);
+    LOG.trace("switched to streaming mode");
   }
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) row;
 
-    // First we traverse the batch to evaluate and prepare the KeyWrappers
-    // After this the KeyWrappers are properly set and hash code is computed
-    keyWrappersBatch.evaluateBatch(batch);
-
-    // Next we locate the aggregation buffer set for each key
-    prepareBatchAggregationBufferSets(batch);
-
-    // Finally, evaluate the aggregators
-    processAggregators(batch);
-
-    //Flush if memory limits were reached
-    // We keep flushing until the memory is under threshold 
-    int preFlushEntriesCount = numEntriesHashTable;
-    while (shouldFlush(batch)) {
-      flush(false);
-      
-      if(gcCanary.get() == null) {
-        gcCanaryFlushes++;
-        gcCanary = new SoftReference<Object>(new Object()); 
-      }
-      //Validate that some progress is being made
-      if (!(numEntriesHashTable < preFlushEntriesCount)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Flush did not progress: %d entries before, %d entries after",
-              preFlushEntriesCount,
-              numEntriesHashTable));
-        }
-        break;
-      }
-      preFlushEntriesCount = numEntriesHashTable;
+    if (batch.size > 0) {
+      processingMode.processBatch(batch);
     }
-
-    if (sumBatchSize == 0 && 0 != batch.size) {
-      // Sample the first batch processed for variable sizes.
-      updateAvgVariableSize(batch);
-    }
-
-    sumBatchSize += batch.size;
   }
 
   /**
-   * Flushes the entries in the hash table by emiting output (forward).
-   * When parameter 'all' is true all the entries are flushed.
-   * @param all
+   * Emits a single row, made from the key and the row aggregation buffer values.
+   * kw is null if keyExpressions.length is 0.
+   * @param kw
+   * @param agg
    * @throws HiveException
    */
-  private void flush(boolean all) throws HiveException {
-
-    int entriesToFlush = all ? numEntriesHashTable :
-      (int)(numEntriesHashTable * this.percentEntriesToFlush);
-    int entriesFlushed = 0;
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb gcCanary: %s)",
-          entriesToFlush, all ? "(all)" : "",
-          numEntriesHashTable, fixedHashEntrySize, avgVariableSize,
-          numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024,
-          maxHashTblMemory/1024/1024, gcCanary.get() == null ? "dead" : "alive"));
-    }
-
-    Object[] forwardCache = new Object[keyExpressions.length + aggregators.length];
-    if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) {
-      // if this is a global aggregation (no keys) and empty set, must still emit NULLs
-      VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer();
-      for (int i = 0; i < aggregators.length; ++i) {
-        forwardCache[i] = aggregators[i].evaluateOutput(emptyBuffers.getAggregationBuffer(i));
-      }
-      forward(forwardCache, outputObjInspector);
-    } else {
-      /* Iterate the global (keywrapper,aggregationbuffers) map and emit
-       a row for each key */
-      Iterator<Map.Entry<KeyWrapper, VectorAggregationBufferRow>> iter =
-          mapKeysAggregationBuffers.entrySet().iterator();
-      while(iter.hasNext()) {
-        Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair = iter.next();
-        int fi = 0;
-        for (int i = 0; i < keyExpressions.length; ++i) {
-          VectorHashKeyWrapper kw = (VectorHashKeyWrapper)pair.getKey();
-          forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
-              kw, i, keyOutputWriters[i]);
-        }
-        for (int i = 0; i < aggregators.length; ++i) {
-          forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue()
-              .getAggregationBuffer(i));
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("forwarding keys: %s: %s",
-              pair.getKey().toString(), Arrays.toString(forwardCache)));
-        }
-        forward(forwardCache, outputObjInspector);
-
-        if (!all) {
-          iter.remove();
-          --numEntriesHashTable;
-          if (++entriesFlushed >= entriesToFlush) {
-            break;
-          }
-        }
-      }
-    }
-
-    if (all) {
-      mapKeysAggregationBuffers.clear();
-      numEntriesHashTable = 0;
-    }
-  }
-
-  /**
-   * Returns true if the memory threshold for the hash table was reached.
-   */
-  private boolean shouldFlush(VectorizedRowBatch batch) {
-    if (batch.size == 0) {
-      return false;
-    }
-    //numEntriesSinceCheck is the number of entries added to the hash table
-    // since the last time we checked the average variable size
-    if (numEntriesSinceCheck >= this.checkInterval) {
-      // Were going to update the average variable row size by sampling the current batch
-      updateAvgVariableSize(batch);
-      numEntriesSinceCheck = 0;
-    }
-    if(numEntriesHashTable > this.maxHtEntries ||
-        numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory) {
-      return true;
-    } else {
-      return (gcCanary.get() == null);
-    }
-  }
-
-  /**
-   * Updates the average variable size of the hash table entries.
-   * The average is only updates by probing the batch that added the entry in the hash table
-   * that caused the check threshold to be reached.
-   */
-  private void updateAvgVariableSize(VectorizedRowBatch batch) {
-    int keyVariableSize = keyWrappersBatch.getVariableSize(batch.size);
-    int aggVariableSize = aggregationBatchInfo.getVariableSize(batch.size);
-
-    // This assumes the distribution of variable size keys/aggregates in the input
-    // is the same as the distribution of variable sizes in the hash entries
-    avgVariableSize = (int)((avgVariableSize * sumBatchSize + keyVariableSize +aggVariableSize) /
-        (sumBatchSize + batch.size));
-  }
-
-  /**
-   * Evaluates the aggregators on the current batch.
-   * The aggregationBatchInfo must have been prepared
-   * by calling {@link #prepareBatchAggregationBufferSets} first.
-   */
-  private void processAggregators(VectorizedRowBatch batch) throws HiveException {
-    // We now have a vector of aggregation buffer sets to use for each row
-    // We can start computing the aggregates.
-    // If the number of distinct keys in the batch is 1 we can
-    // use the optimized code path of aggregateInput
-    VectorAggregationBufferRow[] aggregationBufferSets =
-        aggregationBatchInfo.getAggregationBuffers();
-    if (aggregationBatchInfo.getDistinctBufferSetCount() == 1) {
-      VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
-          aggregationBufferSets[0].getAggregationBuffers();
-      for (int i = 0; i < aggregators.length; ++i) {
-        aggregators[i].aggregateInput(aggregationBuffers[i], batch);
-      }
-    } else {
-      for (int i = 0; i < aggregators.length; ++i) {
-        aggregators[i].aggregateInputSelection(
-            aggregationBufferSets,
-            i,
-            batch);
-      }
+  private void flushSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg)
+      throws HiveException {
+    int fi = 0;
+    for (int i = 0; i < keyExpressions.length; ++i) {
+      forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue(
+          kw, i, keyOutputWriters[i]);
     }
-  }
-
-  /**
-   * Locates the aggregation buffer sets to use for each key in the current batch.
-   * The keyWrappersBatch must have evaluated the current batch first.
-   */
-  private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws HiveException {
-    // The aggregation batch vector needs to know when we start a new batch
-    // to bump its internal version.
-    aggregationBatchInfo.startBatch();
-
-    // We now have to probe the global hash and find-or-allocate
-    // the aggregation buffers to use for each key present in the batch
-    VectorHashKeyWrapper[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers();
-    for (int i=0; i < batch.size; ++i) {
-      VectorHashKeyWrapper kw = keyWrappers[i];
-      VectorAggregationBufferRow aggregationBuffer = mapKeysAggregationBuffers.get(kw);
-      if (null == aggregationBuffer) {
-        // the probe failed, we must allocate a set of aggregation buffers
-        // and push the (keywrapper,buffers) pair into the hash.
-        // is very important to clone the keywrapper, the one we have from our
-        // keyWrappersBatch is going to be reset/reused on next batch.
-        aggregationBuffer = allocateAggregationBuffer();
-        mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer);
-        numEntriesHashTable++;
-        numEntriesSinceCheck++;
-      }
-      aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i);
+    for (int i = 0; i < aggregators.length; ++i) {
+      forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i));
     }
-  }
-
-  /**
-   * allocates a new aggregation buffer set.
-   */
-  private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException {
-    VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
-        new VectorAggregateExpression.AggregationBuffer[aggregators.length];
-    for (int i=0; i < aggregators.length; ++i) {
-      aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
-      aggregators[i].reset(aggregationBuffers[i]);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("forwarding keys: %s: %s",
+          kw, Arrays.toString(forwardCache)));
     }
-    VectorAggregationBufferRow bufferSet = new VectorAggregationBufferRow(aggregationBuffers);
-    return bufferSet;
+    forward(forwardCache, outputObjInspector);
   }
 
   @Override
   public void closeOp(boolean aborted) throws HiveException {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug(String.format("GC canary caused %d flushes", gcCanaryFlushes));
-    }
-    if (!aborted) {
-      flush(true);
-    }
+    processingMode.close(aborted);
   }
 
   static public String getOperatorName() {
@@ -482,4 +762,3 @@ public class VectorGroupByOperator exten
   }
 
 }
-
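
One detail of the rewrite above worth isolating: the gcCanary is a SoftReference used as a cheap memory-pressure probe, since the JVM is guaranteed to clear soft references before throwing OutOfMemoryError. A standalone sketch of the idiom, independent of the Hive classes:

    import java.lang.ref.SoftReference;

    // Sketch of the soft-reference canary used in
    // ProcessingModeHashAggregate: a cleared canary means the JVM came
    // under memory pressure, so the caller should flush and re-arm.
    final class GcCanary {
      private SoftReference<Object> canary = new SoftReference<Object>(new Object());
      private long deaths = 0L;   // mirrors gcCanaryFlushes in the patch

      boolean isDead() {
        if (canary.get() == null) {
          deaths++;
          canary = new SoftReference<Object>(new Object());   // re-arm
          return true;
        }
        return false;
      }

      long deathCount() {
        return deaths;
      }
    }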

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java Sat Mar 22 08:08:01 2014
@@ -143,6 +143,11 @@ public class VectorHashKeyWrapper extend
   @Override
   protected Object clone() {
     VectorHashKeyWrapper clone = new VectorHashKeyWrapper();
+    duplicateTo(clone);
+    return clone;
+  }
+    
+  public void duplicateTo(VectorHashKeyWrapper clone) {
     clone.longValues = longValues.clone();
     clone.doubleValues = doubleValues.clone();
     clone.isNull = isNull.clone();
@@ -167,7 +172,6 @@ public class VectorHashKeyWrapper extend
     }
     clone.hashcode = hashcode;
     assert clone.equals(this);
-    return clone;
   }
 
   @Override
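
Splitting duplicateTo() out of clone() is what lets the streaming mode above recycle its preallocated keysToFlush wrappers: copying into an existing wrapper avoids one allocation per key change, while clone()/copyKey() keep their old allocate-and-copy behavior. A hypothetical helper showing both call patterns:

    // copyKey() (backed by clone()) when no wrapper exists yet;
    // duplicateTo() to reuse one that does, so the hot loop stays
    // garbage-free.
    static VectorHashKeyWrapper saveKey(VectorHashKeyWrapper current,
        VectorHashKeyWrapper reusable) {
      if (reusable == null) {
        return (VectorHashKeyWrapper) current.copyKey();  // allocates a fresh copy
      }
      current.duplicateTo(reusable);                      // copies in place
      return reusable;
    }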

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java Sat Mar 22 08:08:01 2014
@@ -617,9 +617,8 @@ public class VectorHashKeyWrapperBatch {
     compiledKeyWrapperBatch.vectorHashKeyWrappers =
         new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE];
     for(int i=0;i<VectorizedRowBatch.DEFAULT_SIZE; ++i) {
-      compiledKeyWrapperBatch.vectorHashKeyWrappers[i] =
-          new VectorHashKeyWrapper(longIndicesIndex, doubleIndicesIndex,
-                  stringIndicesIndex, decimalIndicesIndex);
+      compiledKeyWrapperBatch.vectorHashKeyWrappers[i] = 
+          compiledKeyWrapperBatch.allocateKeyWrapper();
     }
 
     JavaDataModel model = JavaDataModel.get();
@@ -643,6 +642,11 @@ public class VectorHashKeyWrapperBatch {
 
     return compiledKeyWrapperBatch;
   }
+  
+  public VectorHashKeyWrapper allocateKeyWrapper() {
+    return new VectorHashKeyWrapper(longIndices.length, doubleIndices.length,
+        stringIndices.length, decimalIndices.length);
+  }
 
   /**
    * Get the row-mode writable object value of a key from a key wrapper

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorUtilBatchObjectPool.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorUtilBatchObjectPool.java?rev=1580179&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorUtilBatchObjectPool.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorUtilBatchObjectPool.java Sat Mar 22 08:08:01 2014
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * A simple fixed-capacity object pool, backed by a ring buffer, for recycling batch-sized objects.
+ */
+public class VectorUtilBatchObjectPool<T extends Object> {
+  private final T[] buffer;
+  
+  /**
+   * Head of the pool. This is where we should insert the next
+   * object returned to the pool.
+   */
+  private int head = 0;
+  
+  /**
+   * Count of available elements. They are behind the head, with wrap-around.
+   * The head slot itself is not among them; it holds null.
+   */
+  private int count = 0;
+  
+  private IAllocator<T> allocator; 
+  
+  public static interface IAllocator<T> {
+    public T alloc() throws HiveException;
+    public void free(T t);
+  }
+  
+  @SuppressWarnings("unchecked")
+  public VectorUtilBatchObjectPool(int size, IAllocator<T> allocator) {
+    buffer = (T[]) new Object[size];
+    this.allocator = allocator;
+  }
+  
+  public T getFromPool() throws HiveException {
+    T ret = null;
+    if (count == 0) {
+      // Pool is exhausted, return a new object
+      ret = allocator.alloc();
+    }
+    else {
+      int tail = (head + buffer.length - count) % buffer.length;
+      ret = buffer[tail];
+      buffer[tail] = null;
+      --count;
+    }
+    
+    return ret;
+  }
+  
+  public void putInPool(T object) {
+    if (count < buffer.length) {
+      buffer[head] = object;
+      ++count;
+      ++head;
+      if (head == buffer.length) {
+        head = 0;
+      }
+    }
+    else {
+      allocator.free(object);
+    }
+  }
+}
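
The pool is a fixed-capacity ring buffer: putInPool() writes at head, getFromPool() reads from the computed tail, and the supplied IAllocator covers the empty (alloc) and full (free) cases. A hedged usage sketch; StringBuilder stands in for the pooled type, while the patch itself pools VectorAggregationBufferRow and resets each row before returning it:

    // Illustrative use of the new pool with a placeholder payload type.
    static StringBuilder demo() throws HiveException {
      VectorUtilBatchObjectPool<StringBuilder> pool =
          new VectorUtilBatchObjectPool<StringBuilder>(
              VectorizedRowBatch.DEFAULT_SIZE,
              new VectorUtilBatchObjectPool.IAllocator<StringBuilder>() {
                @Override
                public StringBuilder alloc() throws HiveException {
                  return new StringBuilder();   // pool empty: make a new one
                }
                @Override
                public void free(StringBuilder sb) {
                  // pool full: nothing to release for a plain object
                }
              });

      StringBuilder sb = pool.getFromPool();   // recycled object, or alloc()
      sb.setLength(0);                         // caller resets state before reuse
      pool.putInPool(sb);                      // available again, up to capacity
      return sb;
    }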

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java Sat Mar 22 08:08:01 2014
@@ -38,6 +38,8 @@ public abstract class VectorAggregateExp
    */
   public static interface AggregationBuffer extends Serializable {
     int getVariableSize();
+
+    void reset();
   };
 
   public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
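
This two-line interface change is the hook everything above hangs on: each concrete buffer implements reset(), and VectorAggregationBufferRow.reset() (earlier in this commit) loops over a row's buffers so a whole buffer set can be recycled in one call.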

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java Sat Mar 22 08:08:01 2014
@@ -83,6 +83,13 @@ public class VectorUDAFAvgDecimal extend
       public int getVariableSize() {
         throw new UnsupportedOperationException();
       }
+
+      @Override
+      public void reset() {
+        isNull = true;
+        sum.zeroClear();
+        count = 0L;
+      }
     }
 
     private VectorExpression inputExpression;
@@ -463,7 +470,7 @@ public class VectorUDAFAvgDecimal extend
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
       Aggregation myAgg = (Aggregation) agg;
-      myAgg.isNull = true;
+      myAgg.reset();
     }
 
     @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java Sat Mar 22 08:08:01 2014
@@ -60,6 +60,12 @@ public class VectorUDAFCount extends Vec
       public int getVariableSize() {
         throw new UnsupportedOperationException();
       }
+
+      @Override
+      public void reset() {
+        isNull = true;
+        value = 0L;
+      }
     }
 
     private VectorExpression inputExpression = null;
@@ -240,7 +246,7 @@ public class VectorUDAFCount extends Vec
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
       Aggregation myAgg = (Aggregation) agg;
-      myAgg.isNull = true;
+      myAgg.reset();
     }
 
     @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java Sat Mar 22 08:08:01 2014
@@ -51,6 +51,12 @@ public class VectorUDAFCountStar extends
       public int getVariableSize() {
         throw new UnsupportedOperationException();
       }
+
+      @Override
+      public void reset() {
+        isNull = true;
+        value = 0L;
+      }
     }
 
     transient private final LongWritable result;
@@ -117,7 +123,7 @@ public class VectorUDAFCountStar extends
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
       Aggregation myAgg = (Aggregation) agg;
-      myAgg.isNull = true;
+      myAgg.reset();
     }
 
     @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java?rev=1580179&r1=1580178&r2=1580179&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java Sat Mar 22 08:08:01 2014
@@ -64,6 +64,12 @@ public class VectorUDAFSumDecimal extend
       public int getVariableSize() {
         throw new UnsupportedOperationException();
       }
+
+      @Override
+      public void reset() {
+        isNull = true;
+        sum.zeroClear();
+      }
     }
 
     private VectorExpression inputExpression;
@@ -411,7 +417,7 @@ public class VectorUDAFSumDecimal extend
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
       Aggregation myAgg = (Aggregation) agg;
-      myAgg.isNull = true;
+      myAgg.reset();
     }
 
     @Override


