drill-commits mailing list archives

From prog...@apache.org
Subject [1/2] drill git commit: DRILL-5457: Spill implementation for Hash Aggregate
Date Wed, 21 Jun 2017 00:19:12 GMT
Repository: drill
Updated Branches:
  refs/heads/master be43a9edd -> c16e5f807


http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 7cc43ad..21d5a4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -24,8 +24,8 @@ import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
@@ -40,13 +40,17 @@ public interface HashAggregator {
       new TemplateClassDefinition<HashAggregator>(HashAggregator.class, HashAggTemplate.class);
 
   public static enum AggOutcome {
-    RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR
+    RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR, CALL_WORK_AGAIN
   }
 
+  // For returning results from outputCurrentBatch
+  // OK - batch returned, NONE - end of data, RESTART - call again
+  public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
+
   public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
-      OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing,
-      LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds,
-      VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException;
+                             OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+                             LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds,
+                             VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException;
 
   public abstract IterOutcome getOutcome();
 
@@ -60,6 +64,9 @@ public interface HashAggregator {
 
   public abstract boolean buildComplete();
 
-  public abstract IterOutcome outputCurrentBatch();
+  public abstract AggIterOutcome outputCurrentBatch();
+
+  public abstract boolean earlyOutput();
 
+  public abstract RecordBatch getNewIncoming();
 }
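
For context, a hedged sketch (not code from this patch; the real dispatch lives in HashAggBatch, in part 2 of this commit) of how a caller might react to the new AggIterOutcome values returned by outputCurrentBatch(). The enclosing method and the "aggregator" field are illustrative only:

  public IterOutcome innerNext() {
    while (true) {
      HashAggregator.AggIterOutcome out = aggregator.outputCurrentBatch();
      switch (out) {
        case AGG_OK:      return IterOutcome.OK;    // one batch of aggregated rows is ready
        case AGG_NONE:    return IterOutcome.NONE;  // no more data, neither in memory nor spilled
        case AGG_RESTART: break;                    // e.g. just switched to a spilled partition; loop and call again
      }
    }
  }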

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
new file mode 100644
index 0000000..b05353e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+/**
+ * A record batch that replaces the regular "incoming" batch - instead it reads from a spilled partition file
+ */
+public class SpilledRecordbatch implements CloseableRecordBatch {
+  private VectorContainer container;
+  private InputStream spillStream;
+  private int spilledBatches;
+  private FragmentContext context;
+  private BatchSchema schema;
+  private OperatorContext oContext;
+  private SpillSet spillSet;
+  // Path spillStreamPath;
+  private String spillFile;
+  VectorAccessibleSerializable vas;
+
+  public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
+    this.context = context;
+    this.schema = schema;
+    this.spilledBatches = spilledBatches;
+    this.oContext = oContext;
+    this.spillSet = spillSet;
+    //this.spillStreamPath = spillStreamPath;
+    this.spillFile = spillFile;
+    vas = new VectorAccessibleSerializable(oContext.getAllocator());
+    container = vas.get();
+
+    try {
+      this.spillStream = this.spillSet.openForInput(spillFile);
+    } catch (IOException e) { throw new RuntimeException(e);}
+
+    next(); // initialize the container
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return container.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return container.iterator();
+  }
+
+  @Override
+  public FragmentContext getContext() { return context; }
+
+  @Override
+  public BatchSchema getSchema() { return schema; }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return WritableBatch.get(this);
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() { return container; }
+
+  @Override
+  public int getRecordCount() { return container.getRecordCount(); }
+
+  @Override
+  public void kill(boolean sendUpstream) {
+    this.close(); // delete the current spill file
+  }
+
+  /**
+   * Read the next batch from the spill file
+   *
+   * @return IterOutcome
+   */
+  @Override
+  public IterOutcome next() {
+
+    if ( spilledBatches <= 0 ) { // no more batches to read in this partition
+      this.close();
+      return IterOutcome.NONE;
+    }
+
+    if ( spillStream == null ) {
+      throw new IllegalStateException("Spill stream was null");
+    };
+
+    if ( spillSet.getPosition(spillStream)  < 0 ) {
+      HashAggTemplate.logger.warn("Position is {} for stream {}", spillSet.getPosition(spillStream), spillStream.toString());
+    }
+
+    try {
+      if ( container.getNumberOfColumns() > 0 ) { // container already initialized
+        // Pass our container to the reader because other classes (e.g. HashAggBatch, HashTable)
+        // may have a reference to this container (as an "incoming")
+        vas.readFromStreamWithContainer(container, spillStream);
+      }
+      else { // first time - create a container
+        vas.readFromStream(spillStream);
+        container = vas.get();
+      }
+    } catch (IOException e) {
+      throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(HashAggTemplate.logger);
+    }
+
+    spilledBatches-- ; // one less batch to read
+    return IterOutcome.OK;
+  }
+
+  @Override
+  public void close() {
+    container.clear();
+    try {
+      if (spillStream != null) {
+        spillStream.close();
+        spillStream = null;
+      }
+
+      spillSet.delete(spillFile);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      spillSet.close();
+    }
+  }
+}
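
A hedged usage sketch (not taken from this patch): once a partition has been spilled, the aggregator can wrap the spill file in a SpilledRecordbatch and consume it like any other upstream batch. The names spillFile, batchCount and processIncomingBatch() are placeholders:

  RecordBatch newIncoming = new SpilledRecordbatch(spillFile, batchCount,
      context, schema, oContext, spillSet);
  // The constructor already calls next() once, so the first batch is loaded.
  do {
    processIncomingBatch(newIncoming);   // hypothetical per-batch re-aggregation
  } while (newIncoming.next() == RecordBatch.IterOutcome.OK);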

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 77ebb0d..436480e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -114,7 +114,7 @@ public class ChainedHashTable {
   private HashTableConfig htConfig;
   private final FragmentContext context;
   private final BufferAllocator allocator;
-  private final RecordBatch incomingBuild;
+  private RecordBatch incomingBuild;
   private final RecordBatch incomingProbe;
   private final RecordBatch outgoing;
 
@@ -129,14 +129,18 @@ public class ChainedHashTable {
     this.outgoing = outgoing;
   }
 
-  public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws ClassTransformationException,
+  public void updateIncoming(RecordBatch incomingBuild) {
+    this.incomingBuild = incomingBuild;
+  }
+
+  public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds, int numPartitions) throws ClassTransformationException,
       IOException, SchemaChangeException {
     CodeGenerator<HashTable> top = CodeGenerator.get(HashTable.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
     top.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
     // This code is called from generated code, so to step into this code,
     // persist the code generated in HashAggBatch also.
-//  top.saveCodeForDebugging(true);
+    // top.saveCodeForDebugging(true);
     ClassGenerator<HashTable> cg = top.getRoot();
     ClassGenerator<HashTable> cgInner = cg.getInnerGenerator("BatchHolder");
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index ef7dadf..9c93c16 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.common;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
@@ -43,7 +44,7 @@ public interface HashTable {
    */
   static final public float DEFAULT_LOAD_FACTOR = 0.75f;
 
-  static public enum PutStatus {KEY_PRESENT, KEY_ADDED, PUT_FAILED;}
+  static public enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;}
 
   /**
    * The batch size used for internal batch holders
@@ -51,30 +52,35 @@ public interface HashTable {
   static final public int BATCH_SIZE = Character.MAX_VALUE + 1;
   static final public int BATCH_MASK = 0x0000FFFF;
 
-  /** Variable width vector size in bytes */
-  public static final int VARIABLE_WIDTH_VECTOR_SIZE = 50 * BATCH_SIZE;
+  public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig);
 
-  public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
-      RecordBatch incomingBuild, RecordBatch incomingProbe,
-      RecordBatch outgoing, VectorContainer htContainerOrig);
+  public void updateBatches() throws SchemaChangeException;
 
-  public void updateBatches();
+  public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
 
-  public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount);
+  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException;
 
-  public int containsKey(int incomingRowIdx, boolean isProbe);
+  public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
 
   public void getStats(HashTableStats stats);
 
+  public long extraMemoryNeededForResize();
+
   public int size();
 
   public boolean isEmpty();
 
   public void clear();
 
+  public void reinit(RecordBatch newIncoming);
+
+  public void reset();
+
+  public void setMaxVarcharSize(int size);
+
   public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords);
 
-  public void addNewKeyBatch();
+  // public void addNewKeyBatch();
 }
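
A hedged sketch of the reworked two-step insert API (the spilling-aware caller, HashAggTemplate, is in part 2 of this commit). The global index returned through htIdxHolder packs the batch number in the upper 16 bits and the in-batch offset in the lower 16 bits, so a caller can keep its own value batches aligned with the table's key batches. Here hashTable, htIdxHolder and incomingRowIdx are assumed to exist in the caller:

  int hashCode = hashTable.getHashCode(incomingRowIdx);   // both calls may throw SchemaChangeException
  switch (hashTable.put(incomingRowIdx, htIdxHolder, hashCode)) {
    case KEY_PRESENT:     /* existing group - just update its aggregate values */               break;
    case KEY_ADDED:       /* plain insert of a new key */                                       break;
    case KEY_ADDED_LAST:  /* current key batch just filled up - a good point to check memory */ break;
    case NEW_BATCH_ADDED: /* table grew by a BatchHolder - allocate a matching values batch */  break;
    default:              /* PUT_FAILED */                                                      break;
  }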
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
index c494c85..7baa9d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
@@ -26,6 +26,13 @@ public class HashTableStats {
 
   public HashTableStats() {
   }
+
+  public void addStats (HashTableStats newStats) {
+    this.numBuckets += newStats.numBuckets ;
+    this.numEntries += newStats.numEntries ;
+    this.numResizing += newStats.numResizing ;
+    this.resizingTime += newStats.resizingTime ;
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 96f9422..3209c27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -25,6 +25,7 @@ import javax.inject.Named;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -50,14 +51,19 @@ public abstract class HashTableTemplate implements HashTable {
   // A hash 'bucket' consists of the start index to indicate start of a hash chain
 
   // Array of start indexes. start index is a global index across all batch holders
+  //  This is the "classic hash table", where  Hash-Value % size-of-table  yields
+  // the offset/position (in the startIndices) of the beginning of the hash chain.
   private IntVector startIndices;
 
   // Array of batch holders..each batch holder can hold up to BATCH_SIZE entries
   private ArrayList<BatchHolder> batchHolders;
 
-  // Size of the hash table in terms of number of buckets
+  // Current size of the hash table in terms of number of buckets
   private int tableSize = 0;
 
+  // Original size of the hash table (needed when re-initializing)
+  private int originalTableSize;
+
   // Threshold after which we rehash; It must be the tableSize * loadFactor
   private int threshold;
 
@@ -95,6 +101,8 @@ public abstract class HashTableTemplate implements HashTable {
 
   private int resizingTime = 0;
 
+  private int maxVarcharSize = 8; // for varchar allocation
+
   // This class encapsulates the links, keys and values for up to BATCH_SIZE
   // *unique* records. Thus, suppose there are N incoming record batches, each
   // of size BATCH_SIZE..but they have M unique keys altogether, the number of
@@ -134,7 +142,9 @@ public abstract class HashTableTemplate implements HashTable {
           if (vv instanceof FixedWidthVector) {
             ((FixedWidthVector) vv).allocateNew(BATCH_SIZE);
           } else if (vv instanceof VariableWidthVector) {
-            ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE);
+            long beforeMem = allocator.getAllocatedMemory();
+            ((VariableWidthVector) vv).allocateNew(maxVarcharSize * BATCH_SIZE, BATCH_SIZE);
+            logger.trace("HT allocated {} for varchar of max width {}",allocator.getAllocatedMemory() - beforeMem, maxVarcharSize);
           } else {
             vv.allocateNew();
           }
@@ -166,7 +176,7 @@ public abstract class HashTableTemplate implements HashTable {
       hashValues.getMutator().setValueCount(size);
     }
 
-    protected void setup() {
+    protected void setup() throws SchemaChangeException {
       setupInterior(incomingBuild, incomingProbe, outgoing, htContainer);
     }
 
@@ -175,7 +185,7 @@ public abstract class HashTableTemplate implements HashTable {
     // currentIdxHolder with the index of the next link.
     private boolean isKeyMatch(int incomingRowIdx,
         IndexPointer currentIdxHolder,
-        boolean isProbe) {
+        boolean isProbe) throws SchemaChangeException {
 
       int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
       boolean match = false;
@@ -201,7 +211,7 @@ public abstract class HashTableTemplate implements HashTable {
 
     // Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table
     // container at the specified index
-    private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) {
+    private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) throws SchemaChangeException {
       int currentIdxWithinBatch = currentIdx & BATCH_MASK;
 
       setValue(incomingRowIdx, currentIdxWithinBatch);
@@ -405,36 +415,34 @@ public abstract class HashTableTemplate implements HashTable {
         @Named("incomingBuild") RecordBatch incomingBuild,
         @Named("incomingProbe") RecordBatch incomingProbe,
         @Named("outgoing") RecordBatch outgoing,
-        @Named("htContainer") VectorContainer htContainer) {
+        @Named("htContainer") VectorContainer htContainer) throws SchemaChangeException {
     }
 
     @RuntimeOverridden
     protected boolean isKeyMatchInternalBuild(
-        @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+        @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
       return false;
     }
 
     @RuntimeOverridden
     protected boolean isKeyMatchInternalProbe(
-        @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+        @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
       return false;
     }
 
     @RuntimeOverridden
-    protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+    protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
     }
 
     @RuntimeOverridden
-    protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+    protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException {
     }
 
   } // class BatchHolder
 
 
   @Override
-  public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
-      RecordBatch incomingBuild, RecordBatch incomingProbe,
-      RecordBatch outgoing, VectorContainer htContainerOrig) {
+  public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
     float loadf = htConfig.getLoadFactor();
     int initialCap = htConfig.getInitialCapacity();
 
@@ -465,6 +473,7 @@ public abstract class HashTableTemplate implements HashTable {
     if (tableSize > MAXIMUM_CAPACITY) {
       tableSize = MAXIMUM_CAPACITY;
     }
+    originalTableSize = tableSize ; // retain original size
 
     threshold = (int) Math.ceil(tableSize * loadf);
 
@@ -476,13 +485,17 @@ public abstract class HashTableTemplate implements HashTable {
     batchHolders = new ArrayList<BatchHolder>();
     // First BatchHolder is created when the first put request is received.
 
-    doSetup(incomingBuild, incomingProbe);
+    try {
+      doSetup(incomingBuild, incomingProbe);
+    } catch (SchemaChangeException e) {
+      throw new IllegalStateException("Unexpected schema change", e);
+    }
 
     currentIdxHolder = new IndexPointer();
   }
 
   @Override
-  public void updateBatches() {
+  public void updateBatches() throws SchemaChangeException {
     doSetup(incomingBuild, incomingProbe);
     for (BatchHolder batchHolder : batchHolders) {
       batchHolder.setup();
@@ -497,6 +510,21 @@ public abstract class HashTableTemplate implements HashTable {
     return numResizing;
   }
 
+  /**
+   *
+   * @return Size of extra memory needed if the HT (i.e. startIndices) is doubled
+   */
+  @Override
+  public long extraMemoryNeededForResize() {
+    if (tableSize == MAXIMUM_CAPACITY) { return 0; } // will not resize
+    int newSize = roundUpToPowerOf2(2 * tableSize);
+
+    if (newSize > MAXIMUM_CAPACITY) {
+      newSize  = MAXIMUM_CAPACITY;
+    }
+    return newSize * 4 /* sizeof(int) */;
+  }
+
   @Override
   public int size() {
     return numEntries;
@@ -526,7 +554,7 @@ public abstract class HashTableTemplate implements HashTable {
       batchHolders = null;
     }
     startIndices.clear();
-    currentIdxHolder = null;
+    // currentIdxHolder = null; // keep IndexPointer in case HT is reused
     numEntries = 0;
   }
 
@@ -544,86 +572,69 @@ public abstract class HashTableTemplate implements HashTable {
     return rounded;
   }
 
-  @Override
-  public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) {
-    put(incomingRowIdx, htIdxHolder);
+  public int getHashCode(int incomingRowIdx) throws SchemaChangeException {
+    return getHashBuild(incomingRowIdx);
   }
 
-  private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) {
+  /** put() uses the hash code (from getHashCode() above) to insert the key(s) from the incoming
+   * row into the hash table. The code selects the bucket in the startIndices, then the keys are
+   * placed into the chained list - by storing the key values into a batch, and updating its
+   * "links" member. Lastly it modifies the index holder to the batch offset so that the caller
+   * can store the remaining parts of the row into a matching batch (outside the hash table).
+   *
+   * @param incomingRowIdx - position of the incoming row
+   * @param htIdxHolder - to return batch + batch-offset (for caller to manage a matching batch)
+   * @param hashCode - computed over the key(s) by calling getHashCode()
+   * @return Status - the key(s) was ADDED or was already PRESENT
+   */
+  @Override
+  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException {
 
-    int hash = getHashBuild(incomingRowIdx);
-    int i = getBucketIndex(hash, numBuckets());
-    int startIdx = startIndices.getAccessor().get(i);
+    int bucketIndex = getBucketIndex(hashCode, numBuckets());
+    int startIdx = startIndices.getAccessor().get(bucketIndex);
     int currentIdx;
-    int currentIdxWithinBatch;
-    BatchHolder bh;
     BatchHolder lastEntryBatch = null;
     int lastEntryIdxWithinBatch = EMPTY_SLOT;
 
+    // if startIdx is non-empty, follow the hash chain links until we find a matching
+    // key or reach the end of the chain (and remember the last link there)
+    for ( currentIdxHolder.value = startIdx;
+          currentIdxHolder.value != EMPTY_SLOT;
+          /* isKeyMatch() below also advances the currentIdxHolder to the next link */) {
 
-    if (startIdx == EMPTY_SLOT) {
-      // this is the first entry in this bucket; find the first available slot in the
-      // container of keys and values
-      currentIdx = freeIndex++;
-      addBatchIfNeeded(currentIdx);
+      // remember the current link, which would be the last when the next link is empty
+      lastEntryBatch = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK);
+      lastEntryIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
 
-      if (EXTRA_DEBUG) {
-        logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i,
-            incomingRowIdx, currentIdx);
+      if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
+        htIdxHolder.value = currentIdxHolder.value;
+        return PutStatus.KEY_PRESENT;
       }
-
-      insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
-      // update the start index array
-      startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx);
-      htIdxHolder.value = currentIdx;
-      return PutStatus.KEY_ADDED;
     }
 
-    currentIdx = startIdx;
-    boolean found = false;
-
-    bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
-    currentIdxHolder.value = currentIdx;
-
-    // if startIdx is non-empty, follow the hash chain links until we find a matching
-    // key or reach the end of the chain
-    while (true) {
-      currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
+    // no match was found, so insert a new entry
+    currentIdx = freeIndex++;
+    boolean addedBatch = addBatchIfNeeded(currentIdx);
 
-      if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
-        htIdxHolder.value = currentIdxHolder.value;
-        found = true;
-        break;
-      } else if (currentIdxHolder.value == EMPTY_SLOT) {
-        lastEntryBatch = bh;
-        lastEntryIdxWithinBatch = currentIdxWithinBatch;
-        break;
-      } else {
-        bh = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK);
-        lastEntryBatch = bh;
-      }
+    if (EXTRA_DEBUG) {
+      logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
     }
 
-    if (!found) {
-      // no match was found, so insert a new entry
-      currentIdx = freeIndex++;
-      addBatchIfNeeded(currentIdx);
+    insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, lastEntryIdxWithinBatch);
 
-      if (EXTRA_DEBUG) {
-        logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
-      }
-
-      insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
-      htIdxHolder.value = currentIdx;
-      return PutStatus.KEY_ADDED;
+    // if there was no hash chain at this bucket, need to update the start index array
+    if (startIdx == EMPTY_SLOT) {
+      startIndices.getMutator().setSafe(getBucketIndex(hashCode, numBuckets()), currentIdx);
     }
-
-    return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED;
+    htIdxHolder.value = currentIdx;
+    return  addedBatch ? PutStatus.NEW_BATCH_ADDED :
+        ( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ?
+        PutStatus.KEY_ADDED_LAST : // the last key in the batch
+        PutStatus.KEY_ADDED;     // otherwise
   }
 
-  private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) {
-
-    addBatchIfNeeded(currentIdx);
+  private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) throws SchemaChangeException {
 
     BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
 
@@ -640,60 +651,39 @@ public abstract class HashTableTemplate implements HashTable {
 
   // Return -1 if key is not found in the hash table. Otherwise, return the global index of the key
   @Override
-  public int containsKey(int incomingRowIdx, boolean isProbe) {
+  public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException {
     int hash = isProbe ? getHashProbe(incomingRowIdx) : getHashBuild(incomingRowIdx);
-    int i = getBucketIndex(hash, numBuckets());
-
-    int currentIdx = startIndices.getAccessor().get(i);
-
-    if (currentIdx == EMPTY_SLOT) {
-      return -1;
-    }
-
-    BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
-    currentIdxHolder.value = currentIdx;
+    int bucketIndex = getBucketIndex(hash, numBuckets());
 
-    boolean found = false;
-
-    while (true) {
+    for ( currentIdxHolder.value = startIndices.getAccessor().get(bucketIndex);
+          currentIdxHolder.value != EMPTY_SLOT; ) {
+      BatchHolder bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK);
       if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, isProbe)) {
-        found = true;
-        break;
-      } else if (currentIdxHolder.value == EMPTY_SLOT) {
-        break;
-      } else {
-        bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK);
+        return currentIdxHolder.value;
       }
     }
-
-    return found ? currentIdxHolder.value : -1;
+    return -1;
   }
 
   // Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied
   // currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds
-  // the capacity, we will add a new BatchHolder.
-  private BatchHolder addBatchIfNeeded(int currentIdx) {
+  // the capacity, we will add a new BatchHolder. Return true if a new batch was added.
+  private boolean addBatchIfNeeded(int currentIdx) throws SchemaChangeException {
     int totalBatchSize = batchHolders.size() * BATCH_SIZE;
 
     if (currentIdx >= totalBatchSize) {
-      BatchHolder bh = addBatchHolder();
+      BatchHolder bh = newBatchHolder(batchHolders.size());
+      batchHolders.add(bh);
+      bh.setup();
       if (EXTRA_DEBUG) {
         logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
       }
-      return bh;
-    } else {
-      return batchHolders.get(batchHolders.size() - 1);
+      return true;
     }
+    return false;
   }
 
-  private BatchHolder addBatchHolder() {
-    BatchHolder bh = newBatchHolder(batchHolders.size());
-    batchHolders.add(bh);
-    bh.setup();
-    return bh;
-  }
-
-  protected BatchHolder newBatchHolder(int index) {
+  protected BatchHolder newBatchHolder(int index) { // special method to allow debugging of gen code
     return new BatchHolder(index);
   }
 
@@ -755,6 +745,34 @@ public abstract class HashTableTemplate implements HashTable {
     numResizing++;
   }
 
+  /**
+   * Reinit the hash table to its original size, and clear up all its prior batch holders
+   *
+   */
+  public void reset() {
+    // long before = allocator.getAllocatedMemory();
+    this.clear(); // Clear all current batch holders and hash table (i.e. free their memory)
+    // long after = allocator.getAllocatedMemory();
+
+    // logger.debug("Reinit Hash Table: Memory before {} After {}  Percent after: {}",before,after, (100 * after ) / before);
+
+    freeIndex = 0; // all batch holders are gone
+    // reallocate batch holders, and the hash table to the original size
+    batchHolders = new ArrayList<BatchHolder>();
+    startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT);
+  }
+  public void reinit(RecordBatch newIncoming) {
+    incomingBuild = newIncoming;
+    reset();
+    try {
+      updateBatches();  // Needed ? (to update the new incoming?)
+    } catch (SchemaChangeException e) {
+      throw new IllegalStateException("Unexpected schema change", e);
+    } catch(IndexOutOfBoundsException ioob) {
+      throw new IllegalStateException("reinit update batches", ioob);
+    }
+  }
+
   @Override
   public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords) {
     assert batchIdx < batchHolders.size();
@@ -775,17 +793,20 @@ public abstract class HashTableTemplate implements HashTable {
   }
 
   @Override
+  public void setMaxVarcharSize(int size) { maxVarcharSize = size; }
+
+/*  @Override
   public void addNewKeyBatch() {
     int numberOfBatches = batchHolders.size();
     this.addBatchHolder();
     freeIndex = numberOfBatches * BATCH_SIZE;
   }
-
+*/
   // These methods will be code-generated in the context of the outer class
-  protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe);
+  protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException;
 
-  protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx);
+  protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx) throws SchemaChangeException;
 
-  protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx);
+  protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) throws SchemaChangeException;
 
 }
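
A hedged sketch of how the new memory-estimation hook could be used (the actual decision logic belongs to the spilling hash aggregate in part 2 of this commit); memoryLimit and spillAPartition() are illustrative names only:

  // Before an insert that may double the hash table, check whether the resize
  // would exceed the operator's memory budget; if so, spill a partition instead.
  long resizeExtra = hashTable.extraMemoryNeededForResize();
  if (allocator.getAllocatedMemory() + resizeExtra > memoryLimit) {
    spillAPartition();   // hypothetical helper: free memory by spilling one partition
  }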

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index e2c016b..4af1664 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -315,7 +315,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     // Create the chained hash table
     final ChainedHashTable ht =
         new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
-    hashTable = ht.createAndSetupHashTable(null);
+    hashTable = ht.createAndSetupHashTable(null, 1);
   }
 
   public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
@@ -374,7 +374,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
         // For every record in the build batch , hash the key columns
         for (int i = 0; i < currentRecordCount; i++) {
-          hashTable.put(i, htIndex, 1 /* retry count */);
+          int hashCode = hashTable.getHashCode(i);
+          hashTable.put(i, htIndex, hashCode);
 
                         /* Use the global index returned by the hash table, to store
                          * the current record index and batch index. This will be used

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index 4cb2bae..a1b8169 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
 
 /**
  * Given a record batch or vector container, determines the actual memory
@@ -68,14 +69,14 @@ public class RecordBatchSizer {
     public int capacity;
     public int density;
     public int dataSize;
+    public boolean variableWidth;
 
-    public ColumnSize(ValueVector v) {
-      metadata = v.getField();
+    public ColumnSize(ValueVector vv) {
+      metadata = vv.getField();
       stdSize = TypeHelper.getSize(metadata.getType());
 
       // Can't get size estimates if this is an empty batch.
-
-      int rowCount = v.getAccessor().getValueCount();
+      int rowCount = vv.getAccessor().getValueCount();
       if (rowCount == 0) {
         estSize = stdSize;
         return;
@@ -84,17 +85,17 @@ public class RecordBatchSizer {
       // Total size taken by all vectors (and underlying buffers)
       // associated with this vector.
 
-      totalSize = v.getAllocatedByteCount();
+      totalSize = vv.getAllocatedByteCount();
 
       // Capacity is the number of values that the vector could
       // contain. This is useful only for fixed-length vectors.
 
-      capacity = v.getValueCapacity();
+      capacity = vv.getValueCapacity();
 
       // The amount of memory consumed by the payload: the actual
       // data stored in the vectors.
 
-      dataSize = v.getPayloadByteCount();
+      dataSize = vv.getPayloadByteCount();
 
       // Determine "density" the number of rows compared to potential
       // capacity. Low-density batches occur at block boundaries, ends
@@ -105,6 +106,7 @@ public class RecordBatchSizer {
 
       density = roundUp(dataSize * 100, totalSize);
       estSize = roundUp(dataSize, rowCount);
+      variableWidth = vv instanceof VariableWidthVector ;
     }
 
     @Override
@@ -155,6 +157,7 @@ public class RecordBatchSizer {
    * vectors are partially full; prevents overestimating row width.
    */
   private int netRowWidth;
+  private int netRowWidthCap50;
   private boolean hasSv2;
   private int sv2Size;
   private int avgDensity;
@@ -167,6 +170,18 @@ public class RecordBatchSizer {
          batch.getSelectionVector2() : null);
   }
 
+  /**
+   *  Maximum width of a column; used for memory estimation in case of Varchars
+   */
+  public int maxSize;
+  /**
+   *  Count of the nullable columns; used for memory estimation
+   */
+  public int numNullables;
+  /**
+   * @param va the record batch (or vector container) whose memory use is to be measured
+   */
   public RecordBatchSizer(VectorAccessible va) {
     this(va, null);
   }
@@ -174,7 +189,9 @@ public class RecordBatchSizer {
   public RecordBatchSizer(VectorAccessible va, SelectionVector2 sv2) {
     rowCount = va.getRecordCount();
     for (VectorWrapper<?> vw : va) {
-      measureColumn(vw);
+      int size = measureColumn(vw.getValueVector());
+      if ( size > maxSize ) { maxSize = size; }
+      if ( vw.getField().isNullable() ) { numNullables++; }
     }
 
     if (rowCount > 0) {
@@ -208,32 +225,45 @@ public class RecordBatchSizer {
     totalBatchSize += sv2Size;
   }
 
-  private void measureColumn(VectorWrapper<?> vw) {
-    measureColumn(vw.getValueVector());
+  /**
+   *  Round up (if needed) to the next power of 2 (capped at 64)
+   * @param arg Number to round up (values above 32 are capped at 64)
+   * @return power of 2 result
+   */
+  private int roundUpToPowerOf2(int arg) {
+    if ( arg <= 2 ) { return 2; }
+    if ( arg <= 4 ) { return 4; }
+    if ( arg <= 8 ) { return 8; }
+    if ( arg <= 16 ) { return 16; }
+    if ( arg <= 32 ) { return 32; }
+    return 64;
   }
-
-  private void measureColumn(ValueVector v) {
-
+  private int measureColumn(ValueVector vv) {
     // Maps consume no size themselves. However, their contained
     // vectors do consume space, so visit columns recursively.
-
-    if (v.getField().getType().getMinorType() == MinorType.MAP) {
-      expandMap((AbstractMapVector) v);
-      return;
+    if (vv.getField().getType().getMinorType() == MinorType.MAP) {
+      return expandMap((AbstractMapVector) vv);
     }
-    ColumnSize colSize = new ColumnSize(v);
+
+    ColumnSize colSize = new ColumnSize(vv);
     columnSizes.add(colSize);
 
     stdRowWidth += colSize.stdSize;
     totalBatchSize += colSize.totalSize;
     netBatchSize += colSize.dataSize;
     netRowWidth += colSize.estSize;
+    netRowWidthCap50 += ! colSize.variableWidth ? colSize.estSize :
+        8 /* offset vector */ + roundUpToPowerOf2( Math.min(colSize.estSize,50) );
+        // above change 8 to 4 after DRILL-5446 is fixed
+    return colSize.estSize;
   }
 
-  private void expandMap(AbstractMapVector mapVector) {
+  private int expandMap(AbstractMapVector mapVector) {
+    int accum = 0;
     for (ValueVector vector : mapVector) {
-      measureColumn(vector);
+      accum += measureColumn(vector);
     }
+    return accum;
   }
 
   public static int roundUp(int num, int denom) {
@@ -247,10 +277,18 @@ public class RecordBatchSizer {
   public int stdRowWidth() { return stdRowWidth; }
   public int grossRowWidth() { return grossRowWidth; }
   public int netRowWidth() { return netRowWidth; }
+  /**
+   * Compute the "real" width of the row, taking into account each varchar column size
+   * (historically capped at 50, and rounded up to power of 2 to match drill buf allocation)
+   * and null marking columns.
+   * @return "real" width of the row
+   */
+  public int netRowWidthCap50() { return netRowWidthCap50 + numNullables; }
   public int actualSize() { return totalBatchSize; }
   public boolean hasSv2() { return hasSv2; }
   public int avgDensity() { return avgDensity; }
   public int netSize() { return netBatchSize; }
+  public int maxSize() { return maxSize; }
 
   public static final int MAX_VECTOR_SIZE = 16 * 1024 * 1024; // 16 MiB
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index 74e1fb5..87eebc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -30,11 +30,13 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -84,7 +86,7 @@ public class SpillSet {
      * Given a manager-specific input stream, return the current read position.
      * Used to report total read bytes.
      *
-     * @param outputStream input stream created by the file manager
+     * @param inputStream input stream created by the file manager
      * @return
      */
     long getReadBytes(InputStream inputStream);
@@ -346,7 +348,6 @@ public class SpillSet {
    */
 
   private final String spillDirName;
-  private final String spillFileName;
 
   private int fileCount = 0;
 
@@ -356,16 +357,34 @@ public class SpillSet {
 
   private long writeBytes;
 
-  public SpillSet(FragmentContext context, PhysicalOperator popConfig) {
-    this(context, popConfig, null, "spill");
-  }
-
-  public SpillSet(FragmentContext context, PhysicalOperator popConfig,
-                  String opName, String fileName) {
+  public SpillSet(FragmentContext context, PhysicalOperator popConfig, UserBitShared.CoreOperatorType optype) {
     FragmentHandle handle = context.getHandle();
+    String operName = "Unknown";
+
+    // Set the spill options from the configuration
     DrillConfig config = context.getConfig();
-    spillFileName = fileName;
-    List<String> dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
+    String spillFs;
+    List<String> dirList;
+
+    // Set the operator name (used as part of the spill file name),
+    // and set oper. specific options (the config file defaults to using the
+    // common options; users may override those - per operator)
+    switch (optype) {
+      case EXTERNAL_SORT:
+        operName = "Sort";
+        spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM);
+        dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
+        break;
+      case HASH_AGGREGATE:
+        operName = "HashAgg";
+        spillFs = config.getString(ExecConstants.HASHAGG_SPILL_FILESYSTEM);
+        dirList = config.getStringList(ExecConstants.HASHAGG_SPILL_DIRS);
+        break;
+      default: // just use the common ones
+        spillFs = config.getString(ExecConstants.SPILL_FILESYSTEM);
+        dirList = config.getStringList(ExecConstants.SPILL_DIRS);
+    }
+
     dirs = Iterators.cycle(dirList);
 
     // If more than one directory, semi-randomly choose an offset into
@@ -386,23 +405,18 @@ public class SpillSet {
     // system is selected and impersonation is off. (We use that
     // as a proxy for a non-production Drill setup.)
 
-    String spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM);
     boolean impersonationEnabled = config.getBoolean(ExecConstants.IMPERSONATION_ENABLED);
     if (spillFs.startsWith("file:///") && ! impersonationEnabled) {
       fileManager = new LocalFileManager(spillFs);
     } else {
       fileManager = new HadoopFileManager(spillFs);
     }
-    spillDirName = String.format(
-        "%s_major%d_minor%d_op%d%s",
-        QueryIdHelper.getQueryId(handle.getQueryId()),
-        handle.getMajorFragmentId(),
-        handle.getMinorFragmentId(),
-        popConfig.getOperatorId(),
-        (opName == null) ? "" : "_" + opName);
+
+    spillDirName = String.format("%s_%s_%s-%s_minor%s", QueryIdHelper.getQueryId(handle.getQueryId()),
+        operName, handle.getMajorFragmentId(), popConfig.getOperatorId(), handle.getMinorFragmentId());
   }
 
-  public String getNextSpillFile() {
+  public String getNextSpillFile(String extraName) {
 
     // Identify the next directory from the round-robin list to
     // the file created from this round of spilling. The directory must
@@ -411,7 +425,12 @@ public class SpillSet {
     String spillDir = dirs.next();
     String currSpillPath = Joiner.on("/").join(spillDir, spillDirName);
     currSpillDirs.add(currSpillPath);
-    String outputFile = Joiner.on("/").join(currSpillPath, spillFileName + ++fileCount);
+
+    String outputFile = Joiner.on("/").join(currSpillPath, "spill" + ++fileCount);
+    if ( extraName != null ) {
+      outputFile += "_" + extraName;
+    }
+
     try {
         fileManager.deleteOnExit(currSpillPath);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 69e9b4c..4d5f290 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -39,6 +39,8 @@ import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
 import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
+
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -399,7 +401,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     allocator = oContext.getAllocator();
     opCodeGen = new OperatorCodeGenerator(context, popConfig);
 
-    spillSet = new SpillSet(context, popConfig, "sort", "run");
+    spillSet = new SpillSet(context, popConfig, UserBitShared.CoreOperatorType.EXTERNAL_SORT);
     copierHolder = new CopierHolder(context, allocator, opCodeGen);
     configure(context.getConfig());
   }
@@ -1390,7 +1392,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // spill file. After each write, we release the memory associated
     // with the just-written batch.
 
-    String outputFile = spillSet.getNextSpillFile();
+    String outputFile = spillSet.getNextSpillFile(null);
     stats.setLongStat(Metric.SPILL_COUNT, spillSet.getFileCount());
     BatchGroup.SpilledRun newGroup = null;
     try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index 732ff15..8c69930 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -51,7 +51,7 @@ import java.util.List;
 
 public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel {
 
-  protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
+  public static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
 
   protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1 ; // default phase
   protected List<NamedExpression> keys = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index b911f6b..460ee8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -61,6 +61,9 @@ public abstract class AggPruleBase extends Prule {
   // currently won't generate a 2 phase plan.
   protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
     PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+    if ( settings.isForce2phaseAggr() ) { // for testing - force 2 phase aggr
+      return true;
+    }
     RelNode child = call.rel(0).getInputs().get(0);
     boolean smallInput = child.getRows() < settings.getSliceTarget();
     if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index c382af6..09d33fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -73,7 +73,7 @@ public class HashAggPrel extends AggPrelBase implements Prel{
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
 
     Prel child = (Prel) this.getInput();
-    HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), keys, aggExprs, 1.0f);
+    HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), operPhase, keys, aggExprs, 1.0f);
 
     return creator.addMetadata(this, g);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 648adb7..15314ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -133,6 +133,9 @@ public class PlannerSettings implements Context{
      the need to turn off join optimization may go away.
    */
   public static final BooleanValidator JOIN_OPTIMIZATION = new BooleanValidator("planner.enable_join_optimization", true);
+  // for testing purpose
+  public static final String FORCE_2PHASE_AGGR_KEY = "planner.force_2phase_aggr";
+  public static final BooleanValidator FORCE_2PHASE_AGGR = new BooleanValidator(FORCE_2PHASE_AGGR_KEY, false);
 
   public OptionManager options = null;
   public FunctionImplementationRegistry functionImplementationRegistry = null;
@@ -274,6 +277,8 @@ public class PlannerSettings implements Context{
     return options.getOption(TYPE_INFERENCE);
   }
 
+  public boolean isForce2phaseAggr() { return options.getOption(FORCE_2PHASE_AGGR);} // for testing
+
   public long getInSubqueryThreshold() {
     return options.getOption(IN_SUBQUERY_THRESHOLD);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 04cf8fc..0daa6b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -34,7 +34,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
  * <p>
  *   A key thing to know is that the Iterator provided by a record batch must
  *   align with the rank positions of the field IDs provided using
- *   {@link getValueVectorId}.
+ *   {@link #getValueVectorId}.
  * </p>
  */
 public interface RecordBatch extends VectorAccessible {

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 8492f36..c2a4d65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -93,6 +93,10 @@ public class SystemOptionManager extends BaseOptionManager implements OptionMana
       PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD,
       PlannerSettings.QUOTING_IDENTIFIERS,
       PlannerSettings.JOIN_OPTIMIZATION,
+      PlannerSettings.FORCE_2PHASE_AGGR, // for testing
+      ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR,
+      ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR,
+      ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR, // for tuning
       ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,

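The three ExecConstants validators added above expose the hash-agg tuning knobs through the option system. A minimal sketch (not part of this patch) of reading them at operator setup time, assuming they are long-valued validators so the typed OptionManager.getOption overload applies:

    import org.apache.drill.exec.ExecConstants;
    import org.apache.drill.exec.server.options.OptionManager;

    // Sketch only: assumes the three validators registered above are long-valued.
    public class HashAggTuningOptions {
      final long numPartitions;
      final long memLimit;
      final long minBatchesPerPartition;

      HashAggTuningOptions(OptionManager options) {
        numPartitions = options.getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
        memLimit = options.getOption(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR);
        minBatchesPerPartition =
            options.getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
      }
    }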
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index d06424e..79b49e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -26,7 +26,6 @@ import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.server.options.OptionManager;
 
 public class MemoryAllocationUtilities {
@@ -40,7 +39,7 @@ public class MemoryAllocationUtilities {
    * @param plan
    * @param queryContext
    */
-  public static void setupSortMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
+  public static void setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
 
     // Test plans may already have a pre-defined memory plan.
     // Otherwise, determine memory allocation.
@@ -49,30 +48,30 @@ public class MemoryAllocationUtilities {
       return;
     }
     // look for external sorts
-    final List<ExternalSort> sortList = new LinkedList<>();
+    final List<PhysicalOperator> bufferedOpList = new LinkedList<>();
     for (final PhysicalOperator op : plan.getSortedOperators()) {
-      if (op instanceof ExternalSort) {
-        sortList.add((ExternalSort) op);
+      if ( op.isBufferedOperator() ) {
+        bufferedOpList.add(op);
       }
     }
 
     // if there are any sorts, compute the maximum allocation, and set it on them
-    if (sortList.size() > 0) {
+    if (bufferedOpList.size() > 0) {
       final OptionManager optionManager = queryContext.getOptions();
       final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
       long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
           queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
       maxAllocPerNode = Math.min(maxAllocPerNode,
           optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
-      final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
-      logger.debug("Max sort alloc: {}", maxSortAlloc);
+      final long maxOperatorAlloc = maxAllocPerNode / (bufferedOpList.size() * maxWidthPerNode);
+      logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc);
 
-      for(final ExternalSort externalSort : sortList) {
+      for(final PhysicalOperator op : bufferedOpList) {
         // Ensure that the sort receives the minimum memory needed to make progress.
         // Without this, the math might work out to allocate too little memory.
 
-        long alloc = Math.max(maxSortAlloc, externalSort.getInitialAllocation());
-        externalSort.setMaxAllocation(alloc);
+        long alloc = Math.max(maxOperatorAlloc, op.getInitialAllocation());
+        op.setMaxAllocation(alloc);
       }
     }
     plan.getProperties().hasResourcePlan = true;
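The generalized loop above relies on each PhysicalOperator advertising whether it buffers rows and accepting a memory cap. The following is a minimal sketch (not the actual Drill classes) of the contract a buffered operator config is assumed to satisfy; the real HashAggregate and ExternalSort configs inherit these accessors from Drill's physical-operator base classes.

    public class BufferedOpConfigSketch {
      private long initialAllocation = 1_000_000;   // illustrative start-up budget, in bytes
      private long maxAllocation = Long.MAX_VALUE;  // overwritten by the planner

      // Buffered operators hold rows in memory and may spill, so the per-query
      // memory budget is divided among them in setupBufferedOpsMemoryAllocations().
      public boolean isBufferedOperator() { return true; }

      public long getInitialAllocation() { return initialAllocation; }

      // Receives max(maxOperatorAlloc, getInitialAllocation()) from the loop above.
      public void setMaxAllocation(long alloc) { maxAllocation = alloc; }

      public long getMaxAllocation() { return maxAllocation; }
    }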

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 5e5fef0..62c2307 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -433,7 +433,7 @@ public class Foreman implements Runnable {
 
   private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
     validatePlan(plan);
-    MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext);
+    MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
     //Marking endTime of Planning
     queryManager.markPlanningEndTime();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
index 7ffb224..2f945d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
@@ -97,7 +97,7 @@ public class PlanSplitter {
       throw new IllegalStateException("Planning fragments supports only SQL or PHYSICAL QueryType");
     }
 
-    MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext);
+    MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
 
     final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index c2a2bf0..8aedaf6 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -207,6 +207,33 @@ drill.exec: {
     // java ... -ea -Ddrill.exec.debug.validate_vectors=true ...
     validate_vectors: false
   },
+  spill: {
+    // *** Options common to all the operators that may spill
+    // File system to use. Local file system by default.
+    fs: "file:///",
+    // List of directories to use. Directories are created
+    // if they do not exist.
+    directories: [ "/tmp/drill/spill" ]
+  },
+  hashagg: {
+    // An internal tuning; should not be changed
+    min_batches_per_partition: 3,
+    // An option for testing - force a memory limit
+    mem_limit: 0,
+    // The max number of partitions in each hashagg operator
+    // This number is tuned down when memory is limited
+    // Setting it to 1 means: No spilling
+    num_partitions: 32,
+    spill: {
+        // -- The 2 options below can be used to override the common ones
+        // -- (common to all spilling operators)
+        // File system to use. Local file system by default.
+        fs: ${drill.exec.spill.fs},
+        // List of directories to use. Directories are created
+        // if they do not exist.
+        directories:  ${drill.exec.spill.directories},
+    }
+  },
   sort: {
     purge.threshold : 1000,
     external: {
@@ -232,11 +259,15 @@ drill.exec: {
         group.size: 40000,
         // Deprecated for managed xsort; used only by legacy xsort
         threshold: 40000,
+        // -- The two options below can be used to override the options common
+        // -- for all spilling operators (see "spill" above).
+        // -- This is done for backward compatibility; in the future they
+        // -- would be deprecated (you should be using only the common ones)
         // File system to use. Local file system by default.
-        fs: "file:///"
+        fs: ${drill.exec.spill.fs},
         // List of directories to use. Directories are created
         // if they do not exist.
-        directories: [ "/tmp/drill/spill" ],
+        directories:  ${drill.exec.spill.directories},
         // Size of the batches written to, and read from, the spill files.
         // Determines the ratio of memory to input data size for a single-
         // generation sort. Smaller values give larger ratios, but at a

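The new keys above follow Drill's usual Typesafe Config layout: the hashagg.spill block defaults to the shared drill.exec.spill settings via substitution, and the sort overrides are kept only for backward compatibility. Below is a minimal sketch (not part of this patch) of resolving those keys directly with the Typesafe Config API; inside Drill they would be read through DrillConfig, and loading the module file by name here is only to show how the substitutions resolve.

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import java.util.List;

    public class SpillConfigExample {
      public static void main(String[] args) {
        Config config = ConfigFactory.load("drill-module.conf");
        String spillFs = config.getString("drill.exec.hashagg.spill.fs");
        List<String> spillDirs = config.getStringList("drill.exec.hashagg.spill.directories");
        int numPartitions = config.getInt("drill.exec.hashagg.num_partitions");
        // With the defaults above, the hashagg block inherits "file:///" and
        // [ "/tmp/drill/spill" ] from the common drill.exec.spill section.
        System.out.println(spillFs + " " + spillDirs + " partitions=" + numPartitions);
      }
    }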
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
index 27df710..1a4d63b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
@@ -211,12 +211,13 @@ public class TestBugFixes extends BaseTestQuery {
     int limit = 65536;
     ImmutableList.Builder<Map<String, Object>> baselineBuilder = ImmutableList.builder();
     for (int i = 0; i < limit; i++) {
-      baselineBuilder.add(Collections.<String, Object>singletonMap("`id`", String.valueOf(i + 1)));
+      baselineBuilder.add(Collections.<String, Object>singletonMap("`id`", /*String.valueOf */ (i + 1)));
     }
     List<Map<String, Object>> baseline = baselineBuilder.build();
 
     testBuilder()
-            .sqlQuery(String.format("select id from dfs_test.`%s/bugs/DRILL-4884/limit_test_parquet/test0_0_0.parquet` group by id limit %s", TEST_RES_PATH, limit))
+            .sqlQuery(String.format("select cast(id as int) as id from dfs_test.`%s/bugs/DRILL-4884/limit_test_parquet/test0_0_0.parquet` group by id order by 1 limit %s",
+                TEST_RES_PATH, limit))
             .unOrdered()
             .baselineRecords(baseline)
             .go();

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
index 66b7571..f15e757 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertNull;
  * any particular order of execution. We ignore the results.
  */
 public class TestTpchDistributedConcurrent extends BaseTestQuery {
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(140000); // Longer timeout than usual.
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // Longer timeout than usual.
 
   /*
    * Valid test names taken from TestTpchDistributed. Fuller path prefixes are

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
new file mode 100644
index 0000000..fe6fcbc
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.agg;
+
+import ch.qos.logback.classic.Level;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.FixtureBuilder;
+import org.apache.drill.test.LogFixture;
+import org.apache.drill.test.ProfileParser;
+import org.apache.drill.test.QueryBuilder;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *  Test spilling for the Hash Aggr operator (using the mock reader)
+ */
+public class TestHashAggrSpill extends BaseTestQuery {
+
+    private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle, long spilledPartitions) throws Exception {
+        String plan = client.queryBuilder().sql(sql).explainJson();
+
+        QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
+        if ( expectedRows > 0 ) {
+            assertEquals(expectedRows, summary.recordCount());
+        }
+        // System.out.println(String.format("======== \n Results: %,d records, %d batches, %,d ms\n ========", summary.recordCount(), summary.batchCount(), summary.runTimeMs() ) );
+
+        //System.out.println("Query ID: " + summary.queryIdString());
+        ProfileParser profile = client.parseProfile(summary.queryIdString());
+        //profile.print();
+        List<ProfileParser.OperatorProfile> ops = profile.getOpsOfType(UserBitShared.CoreOperatorType.HASH_AGGREGATE_VALUE);
+
+        assertTrue( ! ops.isEmpty() );
+        // check for the first op only
+        ProfileParser.OperatorProfile hag0 = ops.get(0);
+        long opCycle = hag0.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
+        assertEquals(spillCycle, opCycle);
+        long op_spilled_partitions = hag0.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
+        assertEquals(spilledPartitions, op_spilled_partitions);
+        /* assertEquals(3, ops.size());
+        for ( int i = 0; i < ops.size(); i++ ) {
+            ProfileParser.OperatorProfile hag = ops.get(i);
+            long cycle = hag.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
+            long num_partitions = hag.getMetric(HashAggTemplate.Metric.NUM_PARTITIONS.ordinal());
+            long spilled_partitions = hag.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
+            long mb_spilled = hag.getMetric(HashAggTemplate.Metric.SPILL_MB.ordinal());
+            System.out.println(String.format("(%d) Spill cycle: %d, num partitions: %d, spilled partitions: %d, MB spilled: %d", i,cycle, num_partitions, spilled_partitions,
+                mb_spilled));
+        } */
+    }
+
+    /**
+     * Test "normal" spilling: Only 2 partitions (out of 4) would require spilling
+     * ("normal spill" means spill-cycle = 1 )
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHashAggrSpill() throws Exception {
+        LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+            .toConsole()
+            .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
+            ;
+
+        FixtureBuilder builder = ClusterFixture.builder()
+            .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000_000)
+            .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16)
+            .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
+            .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+            // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
+            .maxParallelization(2)
+            .saveProfiles()
+            //.keepLocalFiles()
+            ;
+        try (LogFixture logs = logBuilder.build();
+             ClusterFixture cluster = builder.build();
+             ClientFixture client = cluster.clientFixture()) {
+            String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i";
+            runAndDump(client, sql, 1_200_000, 1, 1);
+        }
+    }
+
+    /**
+     *  Test "secondary" spilling -- Some of the spilled partitions cause more spilling as they are read back
+     *  (Hence spill-cycle = 2 )
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHashAggrSecondaryTertiarySpill() throws Exception {
+        LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+            .toConsole()
+            .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
+            .logger("org.apache.drill.exec.cache", Level.INFO)
+            ;
+
+        FixtureBuilder builder = ClusterFixture.builder()
+            .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,58_000_000)
+            .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16)
+            .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
+            .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+            .sessionOption(PlannerSettings.STREAMAGG.getOptionName(),false)
+            // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
+            .maxParallelization(1)
+            .saveProfiles()
+            //.keepLocalFiles()
+            ;
+        try (LogFixture logs = logBuilder.build();
+             ClusterFixture cluster = builder.build();
+             ClientFixture client = cluster.clientFixture()) {
+            String sql = "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K` GROUP BY empid_s44, dept_i, branch_i";
+            runAndDump(client, sql, 1_100_000, 3, 2);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index e39a644..66588b1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.TopN;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -125,7 +126,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
 
   @Test
   public void testSimpleHashAgg() {
-    HashAggregate aggConf = new HashAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+    HashAggregate aggConf = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
     List<String> inputJsonBatches = Lists.newArrayList(
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc/pom.xml b/exec/jdbc/pom.xml
index cb0c517..eecbdfa 100644
--- a/exec/jdbc/pom.xml
+++ b/exec/jdbc/pom.xml
@@ -119,6 +119,7 @@
             <exclude>**/.checkstyle</exclude>
             <exclude>**/.buildpath</exclude>
             <exclude>**/*.json</exclude>
+            <exclude>**/*.iml</exclude>
             <exclude>**/git.properties</exclude>
             <exclude>**/donuts-output-data.txt</exclude>
             <exclude>**/*.tbl</exclude>

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index aa713f8..deed7a7 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -192,8 +192,8 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
           r.pBody, r.dBodies);
       if (RpcConstants.EXTRA_DEBUGGING) {
         logger.debug("Adding message to outbound buffer. {}", outMessage);
+        logger.debug("Sending response with Sender {}", System.identityHashCode(this));
       }
-      logger.debug("Sending response with Sender {}", System.identityHashCode(this));
       connection.getChannel().writeAndFlush(outMessage);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index 105ea47..581a9f8 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -377,6 +377,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
     }
 
+    logger.trace("Reallocating VarChar, new size {}",newAllocationSize);
     final DrillBuf newBuf = allocator.buffer((int)newAllocationSize);
     newBuf.setBytes(0, data, 0, data.capacity());
     data.release();

