drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [19/27] drill git commit: DRILL-5284: Roll-up of final fixes for managed sort
Date Thu, 02 Mar 2017 20:59:46 GMT
DRILL-5284: Roll-up of final fixes for managed sort

See subtasks for details.

* Provide detailed, accurate estimate of size consumed by a record batch
* Managed external sort spills too often with Parquet data
* Managed External Sort fails with OOM
* External sort refers to the deprecated HDFS fs.default.name param
* Config param drill.exec.sort.external.batch.size is not used
* NPE in managed external sort while spilling to disk
* External Sort BatchGroup leaks memory if an OOM occurs during read
* DRILL-5294: Under certain low-memory conditions, need to force the sort to merge
two batches to make progress, even though this is a bit more than
comfortably fits into memory.

close #761


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/79811db5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/79811db5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/79811db5

Branch: refs/heads/master
Commit: 79811db5aa8c7f2cdbe6f74c0a40124bea9fb1fd
Parents: 69de3a1
Author: Paul Rogers <progers@maprtech.com>
Authored: Fri Feb 24 10:31:25 2017 -0800
Committer: Jinfeng Ni <jni@apache.org>
Committed: Wed Mar 1 23:15:34 2017 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   3 +-
 .../impl/sort/SortRecordBatchBuilder.java       |   7 +-
 .../physical/impl/spill/RecordBatchSizer.java   |  97 ++-----
 .../exec/physical/impl/spill/SpillSet.java      |  52 +++-
 .../exec/physical/impl/xsort/BatchGroup.java    |   2 +-
 .../physical/impl/xsort/managed/BatchGroup.java |  30 +-
 .../impl/xsort/managed/ExternalSortBatch.java   | 291 ++++++++++++-------
 .../drill/exec/record/SimpleVectorWrapper.java  |  10 +
 .../codegen/templates/FixedValueVectors.java    | 149 +++++-----
 .../codegen/templates/NullableValueVectors.java |  30 +-
 .../src/main/codegen/templates/UnionVector.java |  16 +
 .../templates/VariableLengthVectors.java        |  19 ++
 .../drill/exec/vector/BaseDataValueVector.java  |   5 +
 .../org/apache/drill/exec/vector/BitVector.java |   6 +
 .../apache/drill/exec/vector/ObjectVector.java  |  12 +
 .../apache/drill/exec/vector/ValueVector.java   |  12 +
 .../drill/exec/vector/VariableWidthVector.java  |   4 +-
 .../apache/drill/exec/vector/ZeroVector.java    |  10 +
 .../exec/vector/complex/AbstractMapVector.java  |  22 +-
 .../vector/complex/BaseRepeatedValueVector.java |  12 +-
 .../drill/exec/vector/complex/ListVector.java   |  10 +
 .../exec/vector/complex/RepeatedListVector.java |  10 +
 .../exec/vector/complex/RepeatedMapVector.java  |   5 +
 23 files changed, 525 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 460702a..60d6265 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -66,7 +66,6 @@ public interface ExecConstants {
 
   // External Sort Boot configuration
 
-  String EXTERNAL_SORT_TARGET_BATCH_SIZE = "drill.exec.sort.external.batch.size";
   String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
   String EXTERNAL_SORT_SPILL_GROUP_SIZE = "drill.exec.sort.external.spill.group.size";
   String EXTERNAL_SORT_SPILL_THRESHOLD = "drill.exec.sort.external.spill.threshold";
@@ -79,6 +78,8 @@ public interface ExecConstants {
   String EXTERNAL_SORT_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.spill_batch_size";
   String EXTERNAL_SORT_MERGE_BATCH_SIZE = "drill.exec.sort.external.spill.merge_batch_size";
   String EXTERNAL_SORT_MAX_MEMORY = "drill.exec.sort.external.mem_limit";
+
+  // Used only by the "unmanaged" sort.
   String EXTERNAL_SORT_BATCH_LIMIT = "drill.exec.sort.external.batch_limit";
 
   // External Sort Runtime options

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 33338dd..d46990f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -238,14 +238,15 @@ public class SortRecordBatchBuilder implements AutoCloseable {
   }
 
   /**
-   * For given recordcount how muchmemory does SortRecordBatchBuilder needs for its own purpose. This is used in
+   * For given record count how much memory does SortRecordBatchBuilder need for its own purpose. This is used in
    * ExternalSortBatch to make decisions about whether to spill or not.
    *
    * @param recordCount
    * @return
    */
   public static long memoryNeeded(int recordCount) {
-    // We need 4 bytes (SV4) for each record.
-    return recordCount * 4;
+    // We need 4 bytes (SV4) for each record. Due to power-of-two allocations, the
+    // backing buffer might be twice this size.
+    return recordCount * 2 * 4;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index 05354e5..22b1b0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -27,14 +27,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.FixedWidthVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarCharVector;
-
-import io.netty.buffer.DrillBuf;
 
 /**
  * Given a record batch or vector container, determines the actual memory
@@ -42,7 +35,7 @@ import io.netty.buffer.DrillBuf;
  */
 
 public class RecordBatchSizer {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizer.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizer.class);
 
   /**
    * Column size information.
@@ -53,23 +46,22 @@ public class RecordBatchSizer {
     /**
      * Assumed size from Drill metadata.
      */
+
     public int stdSize;
+
     /**
      * Actual memory consumed by all the vectors associated with this column.
      */
+
     public int totalSize;
+
     /**
      * Actual average column width as determined from actual memory use. This
      * size is larger than the actual data size since this size includes per-
      * column overhead such as any unused vector space, etc.
      */
-    public int estSize;
 
-    /**
-     * The size of the data vector backing the column. Useful for detecting
-     * cases of possible direct memory fragmentation.
-     */
-    public int dataVectorSize;
+    public int estSize;
     public int capacity;
     public int density;
     public int dataSize;
@@ -86,26 +78,21 @@ public class RecordBatchSizer {
       if (rowCount == 0) {
         return;
       }
-      DrillBuf[] bufs = v.getBuffers(false);
-      for (DrillBuf buf : bufs) {
-        totalSize += buf.capacity();
-      }
+
+      // Total size taken by all vectors (and underlying buffers)
+      // associated with this vector.
+
+      totalSize = v.getAllocatedByteCount();
 
       // Capacity is the number of values that the vector could
       // contain. This is useful only for fixed-length vectors.
 
       capacity = v.getValueCapacity();
 
-      // Crude way to get the size of the buffer underlying simple (scalar) values.
-      // Ignores maps, lists and other esoterica. Uses a crude way to subtract out
-      // the null "bit" (really byte) buffer size for nullable vectors.
+      // The amount of memory consumed by the payload: the actual
+      // data stored in the vectors.
 
-      if (v instanceof BaseDataValueVector) {
-        dataVectorSize = totalSize;
-        if (v instanceof NullableVector) {
-          dataVectorSize -= bufs[0].getActualMemoryConsumed();
-        }
-      }
+      dataSize = v.getPayloadByteCount();
 
       // Determine "density" the number of rows compared to potential
       // capacity. Low-density batches occur at block boundaries, ends
@@ -113,26 +100,9 @@ public class RecordBatchSizer {
       // for Varchar columns because we don't know the actual number of
       // bytes consumed (that information is hidden behind the Varchar
       // implementation where we can't get at it.)
-      //
-      // A better solution is to have each vector do this calc rather
-      // than trying to do it generically, but that increases the code
-      // change footprint and slows the commit process.
-
-      if (v instanceof FixedWidthVector) {
-        dataSize = stdSize * rowCount;
-      } else if ( v instanceof VarCharVector ) {
-        VarCharVector vv = (VarCharVector) v;
-        dataSize = vv.getOffsetVector().getAccessor().get(rowCount);
-      } else if ( v instanceof NullableVarCharVector ) {
-        NullableVarCharVector vv = (NullableVarCharVector) v;
-        dataSize = vv.getValuesVector().getOffsetVector().getAccessor().get(rowCount);
-      } else {
-        dataSize = 0;
-      }
-      if (dataSize > 0) {
-        density = roundUp(dataSize * 100, dataVectorSize);
-        estSize = roundUp(dataSize, rowCount);
-      }
+
+      density = roundUp(dataSize * 100, totalSize);
+      estSize = roundUp(dataSize, rowCount);
     }
 
     @Override
@@ -145,8 +115,6 @@ public class RecordBatchSizer {
       buf.append(estSize);
       buf.append(", total size: ");
       buf.append(totalSize);
-      buf.append(", vector size: ");
-      buf.append(dataVectorSize);
       buf.append(", data size: ");
       buf.append(dataSize);
       buf.append(", row capacity: ");
@@ -187,10 +155,12 @@ public class RecordBatchSizer {
   private int sv2Size;
   private int avgDensity;
 
+  private int netBatchSize;
+
   public RecordBatchSizer(VectorAccessible va) {
     rowCount = va.getRecordCount();
     for (VectorWrapper<?> vw : va) {
-      measureField(vw);
+      measureColumn(vw);
     }
 
     if (rowCount > 0) {
@@ -201,8 +171,8 @@ public class RecordBatchSizer {
     if (hasSv2) {
       @SuppressWarnings("resource")
       SelectionVector2 sv2 = va.getSelectionVector2();
-      sv2Size = sv2.getBuffer().capacity();
-      grossRowWidth += sv2Size;
+      sv2Size = sv2.getBuffer(false).capacity();
+      grossRowWidth += sv2Size / rowCount;
       netRowWidth += 2;
     }
 
@@ -227,12 +197,13 @@ public class RecordBatchSizer {
     totalBatchSize += sv2Size;
   }
 
-  private void measureField(VectorWrapper<?> vw) {
+  private void measureColumn(VectorWrapper<?> vw) {
     ColumnSize colSize = new ColumnSize(vw);
     columnSizes.add(colSize);
 
     stdRowWidth += colSize.stdSize;
     totalBatchSize += colSize.totalSize;
+    netBatchSize += colSize.dataSize;
     netRowWidth += colSize.estSize;
   }
 
@@ -249,27 +220,11 @@ public class RecordBatchSizer {
   public int netRowWidth() { return netRowWidth; }
   public int actualSize() { return totalBatchSize; }
   public boolean hasSv2() { return hasSv2; }
-  public int getAvgDensity() { return avgDensity; }
+  public int avgDensity() { return avgDensity; }
+  public int netSize() { return netBatchSize; }
 
   public static final int MAX_VECTOR_SIZE = 16 * 1024 * 1024; // 16 MiB
 
-  /**
-   * Look for columns backed by vectors larger than the 16 MiB size
-   * employed by the Netty allocator. Such large blocks can lead to
-   * memory fragmentation and unexpected OOM errors.
-   * @return true if any column is oversized
-   */
-  public boolean checkOversizeCols() {
-    boolean hasOversize = false;
-    for (ColumnSize colSize : columnSizes) {
-      if ( colSize.dataVectorSize > MAX_VECTOR_SIZE) {
-        logger.warn( "Column is wider than 256 characters: OOM due to memory fragmentation is possible - " + colSize.metadata.getPath() );
-        hasOversize = true;
-      }
-    }
-    return hasOversize;
-  }
-
   @Override
   public String toString() {
     StringBuilder buf = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index 4615500..74e1fb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.drill.common.config.DrillConfig;
@@ -105,7 +106,7 @@ public class SpillSet {
 
     protected HadoopFileManager(String fsName) {
       Configuration conf = new Configuration();
-      conf.set("fs.default.name", fsName);
+      conf.set(FileSystem.FS_DEFAULT_NAME_KEY, fsName);
       try {
         fs = FileSystem.get(conf);
       } catch (IOException e) {
@@ -169,6 +170,12 @@ public class SpillSet {
     }
   }
 
+  /**
+   * Wrapper around an input stream to collect the total bytes
+   * read through the stream for use in reporting performance
+   * metrics.
+   */
+
   public static class CountingInputStream extends InputStream
   {
     private InputStream in;
@@ -218,6 +225,12 @@ public class SpillSet {
     public long getCount() { return count; }
   }
 
+  /**
+   * Wrapper around an output stream to collect the total bytes
+   * written through the stream for use in reporting performance
+   * metrics.
+   */
+
   public static class CountingOutputStream extends OutputStream {
 
     private OutputStream out;
@@ -333,6 +346,7 @@ public class SpillSet {
    */
 
   private final String spillDirName;
+  private final String spillFileName;
 
   private int fileCount = 0;
 
@@ -343,8 +357,30 @@ public class SpillSet {
   private long writeBytes;
 
   public SpillSet(FragmentContext context, PhysicalOperator popConfig) {
+    this(context, popConfig, null, "spill");
+  }
+
+  public SpillSet(FragmentContext context, PhysicalOperator popConfig,
+                  String opName, String fileName) {
+    FragmentHandle handle = context.getHandle();
     DrillConfig config = context.getConfig();
-    dirs = Iterators.cycle(config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS));
+    spillFileName = fileName;
+    List<String> dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
+    dirs = Iterators.cycle(dirList);
+
+    // If more than one directory, semi-randomly choose an offset into
+    // the list to avoid overloading the first directory in the list.
+
+    if (dirList.size() > 1) {
+      int hash = handle.getQueryId().hashCode() +
+                 handle.getMajorFragmentId() +
+                 handle.getMinorFragmentId() +
+                 popConfig.getOperatorId();
+      int offset = hash % dirList.size();
+      for (int i = 0; i < offset; i++) {
+        dirs.next();
+      }
+    }
 
     // Use the high-performance local file system if the local file
     // system is selected and impersonation is off. (We use that
@@ -357,9 +393,13 @@ public class SpillSet {
     } else {
       fileManager = new HadoopFileManager(spillFs);
     }
-    FragmentHandle handle = context.getHandle();
-    spillDirName = String.format("%s_major%s_minor%s_op%s", QueryIdHelper.getQueryId(handle.getQueryId()),
-        handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
+    spillDirName = String.format(
+        "%s_major%d_minor%d_op%d%s",
+        QueryIdHelper.getQueryId(handle.getQueryId()),
+        handle.getMajorFragmentId(),
+        handle.getMinorFragmentId(),
+        popConfig.getOperatorId(),
+        (opName == null) ? "" : "_" + opName);
   }
 
   public String getNextSpillFile() {
@@ -371,7 +411,7 @@ public class SpillSet {
     String spillDir = dirs.next();
     String currSpillPath = Joiner.on("/").join(spillDir, spillDirName);
     currSpillDirs.add(currSpillPath);
-    String outputFile = Joiner.on("/").join(currSpillPath, "spill" + ++fileCount);
+    String outputFile = Joiner.on("/").join(currSpillPath, spillFileName + ++fileCount);
     try {
         fileManager.deleteOnExit(currSpillPath);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 0a818ee..13f0dbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -113,7 +113,7 @@ public class BatchGroup implements VectorAccessible, AutoCloseable {
     if (schema != null) {
       c = SchemaUtil.coerceContainer(c, schema, context);
     }
-//    logger.debug("Took {} us to read {} records", watch.elapsed(TimeUnit.MICROSECONDS), c.getRecordCount());
+    logger.trace("Took {} us to read {} records", watch.elapsed(TimeUnit.MICROSECONDS), c.getRecordCount());
     spilledBatches--;
     currentContainer.zeroVectors();
     Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
index cd5cd1f..7ea599c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
@@ -75,17 +75,21 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
    */
 
   public static class InputBatch extends BatchGroup {
-    private SelectionVector2 sv2;
+    private final SelectionVector2 sv2;
+    private final int dataSize;
 
-    public InputBatch(VectorContainer container, SelectionVector2 sv2, OperatorContext context, long batchSize) {
-      super(container, context, batchSize);
+    public InputBatch(VectorContainer container, SelectionVector2 sv2, OperatorContext context, int dataSize) {
+      super(container, context);
       this.sv2 = sv2;
+      this.dataSize = dataSize;
     }
 
     public SelectionVector2 getSv2() {
       return sv2;
     }
 
+    public int getDataSize() { return dataSize; }
+
     @Override
     public int getRecordCount() {
       if (sv2 != null) {
@@ -148,8 +152,8 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
     private BufferAllocator allocator;
     private int spilledBatches = 0;
 
-    public SpilledRun(SpillSet spillSet, String path, OperatorContext context, long batchSize) throws IOException {
-      super(null, context, batchSize);
+    public SpilledRun(SpillSet spillSet, String path, OperatorContext context) throws IOException {
+      super(null, context);
       this.spillSet = spillSet;
       this.path = path;
       this.allocator = context.getAllocator();
@@ -275,25 +279,23 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
       if (outputStream == null) {
         return 0;
       }
-      long posn = spillSet.getPosition(outputStream);
-      spillSet.tallyWriteBytes(posn);
+      long writeSize = spillSet.getPosition(outputStream);
+      spillSet.tallyWriteBytes(writeSize);
       outputStream.close();
       outputStream = null;
-      logger.trace("Summary: Wrote {} bytes to {}", posn, path);
-      return posn;
+      logger.trace("Summary: Wrote {} bytes to {}", writeSize, path);
+      return writeSize;
     }
   }
 
   protected VectorContainer currentContainer;
   protected int pointer = 0;
-  protected OperatorContext context;
+  protected final OperatorContext context;
   protected BatchSchema schema;
-  protected long dataSize;
 
-  public BatchGroup(VectorContainer container, OperatorContext context, long dataSize) {
+  public BatchGroup(VectorContainer container, OperatorContext context) {
     this.currentContainer = container;
     this.context = context;
-    this.dataSize = dataSize;
   }
 
   /**
@@ -348,8 +350,6 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
     return currentContainer.getRecordCount();
   }
 
-  public long getDataSize() { return dataSize; }
-
   @Override
   public Iterator<VectorWrapper<?>> iterator() {
     return currentContainer.iterator();

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 783865c..a1162a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -200,6 +200,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   public static final String INTERRUPTION_AFTER_SORT = "after-sort";
   public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
   public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
+  public static final String INTERRUPTION_WHILE_MERGING = "merging";
   public static final long DEFAULT_SPILL_BATCH_SIZE = 8L * 1024 * 1024;
   public static final long MIN_SPILL_BATCH_SIZE = 256 * 1024;
 
@@ -219,6 +220,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   private BatchSchema schema;
 
+  /**
+   * Incoming batches buffered in memory prior to spilling
+   * or an in-memory merge.
+   */
+
   private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList();
   private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
   private SelectionVector4 sv4;
@@ -231,6 +237,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private int mergeBatchRowCount;
   private int peakNumBatches = -1;
 
+  /**
+   * Maximum memory this operator may use. Usually comes from the
+   * operator definition, but may be overridden by a configuration
+   * parameter for unit testing.
+   */
+
   private long memoryLimit;
 
   /**
@@ -280,28 +292,65 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private long estimatedInputBatchSize;
 
   /**
-   * Maximum number of batches to hold in memory.
-   * (Primarily for testing.)
+   * Maximum number of spilled runs that can be merged in a single pass.
    */
 
-  private int bufferedBatchLimit;
   private int mergeLimit;
+
+  /**
+   * Target size of the first-generation spill files.
+   */
   private long spillFileSize;
+
+  /**
+   * Tracks the minimum amount of remaining memory for use
+   * in populating an operator metric.
+   */
+
   private long minimumBufferSpace;
 
   /**
-   * Minimum memory level before spilling occurs. That is, we can buffer input
-   * batches in memory until we are down to the level given by the spill point.
+   * Maximum memory level before spilling occurs. That is, we can buffer input
+   * batches in memory until we reach the level given by the buffer memory pool.
+   */
+
+  private long bufferMemoryPool;
+
+  /**
+   * Maximum memory that can hold batches during the merge
+   * phase.
    */
 
-  private long spillPoint;
   private long mergeMemoryPool;
+
+  /**
+   * The target size for merge batches sent downstream.
+   */
+
   private long preferredMergeBatchSize;
-  private long bufferMemoryPool;
-  private boolean hasOversizeCols;
+
+  /**
+   * Sum of the total number of bytes read from upstream.
+   * This is the raw memory bytes, not actual data bytes.
+   */
+
   private long totalInputBytes;
-  private Long spillBatchSize;
+
+  /**
+   * The configured size for each spill batch.
+   */
+  private Long preferredSpillBatchSize;
+
+  /**
+   * Tracks the maximum density of input batches. Density is
+   * the amount of actual data / amount of memory consumed.
+   * Low density batches indicate an EOF or something wrong in
+   * an upstream operator because a low-density batch wastes
+   * memory.
+   */
+
   private int maxDensity;
+  private int lastDensity = -1;
 
   /**
    * Estimated number of rows that fit into a single spill batch.
@@ -309,6 +358,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   private int spillBatchRowCount;
 
+  /**
+   * The estimated actual spill batch size which depends on the
+   * details of the data rows for any particular query.
+   */
+
+  private int targetSpillBatchSize;
+
   // WARNING: The enum here is used within this class. But, the members of
   // this enum MUST match those in the (unmanaged) ExternalSortBatch since
   // that is the enum used in the UI to display metrics for the query profile.
@@ -349,7 +405,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     allocator = oContext.getAllocator();
     opCodeGen = new OperatorCodeGenerator(context, popConfig);
 
-    spillSet = new SpillSet(context, popConfig);
+    spillSet = new SpillSet(context, popConfig, "sort", "run");
     copierHolder = new CopierHolder(context, allocator, opCodeGen);
     configure(context.getConfig());
   }
@@ -368,12 +424,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       memoryLimit = Math.min(memoryLimit, configLimit);
     }
 
-    // Optional limit on the number of buffered in-memory batches.
-    // 0 means no limit. Used primarily for testing. Must allow at least two
-    // batches or no merging can occur.
-
-    bufferedBatchLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, Integer.MAX_VALUE, 2);
-
     // Optional limit on the number of spilled runs to merge in a single
     // pass. Limits the number of open file handles. Must allow at least
     // two batches to merge to make progress.
@@ -392,8 +442,17 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // Set too large and the ratio between memory and input data sizes becomes
     // small. Set too small and disk seek times dominate performance.
 
-    spillBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE);
-    spillBatchSize = Math.max(spillBatchSize, MIN_SPILL_BATCH_SIZE);
+    preferredSpillBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE);
+
+    // In low memory, use no more than 1/4 of memory for each spill batch. Ensures we
+    // can merge.
+
+    preferredSpillBatchSize = Math.min(preferredSpillBatchSize, memoryLimit / 4);
+
+    // But, the spill batch should be above some minimum size to prevent complete
+    // thrashing.
+
+    preferredSpillBatchSize = Math.max(preferredSpillBatchSize, MIN_SPILL_BATCH_SIZE);
 
     // Set the target output batch size. Use the maximum size, but only if
     // this represents less than 10% of available memory. Otherwise, use 10%
@@ -401,13 +460,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // output batch can contain no fewer than a single record.
 
     preferredMergeBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE);
-    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
+    long maxAllowance = (long) (memoryLimit - 2 * preferredSpillBatchSize);
     preferredMergeBatchSize = Math.min(maxAllowance, preferredMergeBatchSize);
     preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE);
 
-    logger.debug("Config: memory limit = {}, batch limit = {}, " +
-                 "spill file size = {}, batch size = {}, merge limit = {}, merge batch size = {}",
-                  memoryLimit, bufferedBatchLimit, spillFileSize, spillBatchSize, mergeLimit,
+    logger.debug("Config: memory limit = {}, " +
+                 "spill file size = {}, spill batch size = {}, merge limit = {}, merge batch size = {}",
+                  memoryLimit, spillFileSize, preferredSpillBatchSize, mergeLimit,
                   preferredMergeBatchSize);
   }
 
@@ -513,11 +572,21 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   private IterOutcome nextOutputBatch() {
     if (resultsIterator.next()) {
+      injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_WHILE_MERGING);
       return IterOutcome.OK;
     } else {
       logger.trace("Deliver phase complete: Returned {} batches, {} records",
                     resultsIterator.getBatchCount(), resultsIterator.getRecordCount());
       sortState = SortState.DONE;
+
+      // Close the iterator here to release any remaining resources such
+      // as spill files. This is important when a query has a join: the
+      // first branch sort may complete before the second branch starts;
+      // it may be quite a while after returning the last row before the
+      // fragment executor calls this operator's close method.
+
+      resultsIterator.close();
+      resultsIterator = null;
       return IterOutcome.NONE;
     }
   }
@@ -561,11 +630,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       // out of memory and that no work as in-flight and thus abandoned.
       // Consider removing this case once resource management is in place.
 
-      logger.debug("received OUT_OF_MEMORY, trying to spill");
+      logger.error("received OUT_OF_MEMORY, trying to spill");
       if (bufferedBatches.size() > 2) {
         spillFromMemory();
       } else {
-        logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
+        logger.error("not enough batches to spill, sending OUT_OF_MEMORY downstream");
         return IterOutcome.OUT_OF_MEMORY;
       }
       break;
@@ -693,9 +762,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // Coerce all existing batches to the new schema.
 
     for (BatchGroup b : bufferedBatches) {
-//      System.out.println("Before: " + allocator.getAllocatedMemory()); // Debug only
       b.setSchema(schema);
-//      System.out.println("After: " + allocator.getAllocatedMemory()); // Debug only
     }
     for (BatchGroup b : spilledRuns) {
       b.setSchema(schema);
@@ -765,12 +832,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       spillFromMemory();
     }
 
-    // Sanity check. We should now be above the spill point.
+    // Sanity check. We should now be below the buffer memory maximum.
 
     long startMem = allocator.getAllocatedMemory();
-    if (memoryLimit - startMem < spillPoint) {
-      logger.error( "ERROR: Failed to spill below the spill point. Spill point = {}, free memory = {}",
-                    spillPoint, memoryLimit - startMem);
+    if (startMem > bufferMemoryPool) {
+      logger.error( "ERROR: Failed to spill above buffer limit. Buffer pool = {}, memory = {}",
+          bufferMemoryPool, startMem);
     }
 
     // Convert the incoming batch to the agreed-upon schema.
@@ -835,7 +902,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     RecordBatchData rbd = new RecordBatchData(convertedBatch, allocator);
     try {
       rbd.setSv2(sv2);
-      bufferedBatches.add(new BatchGroup.InputBatch(rbd.getContainer(), rbd.getSv2(), oContext, batchSize));
+      bufferedBatches.add(new BatchGroup.InputBatch(rbd.getContainer(), rbd.getSv2(), oContext, sizer.netSize()));
       if (peakNumBatches < bufferedBatches.size()) {
         peakNumBatches = bufferedBatches.size();
         stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches);
@@ -857,9 +924,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private RecordBatchSizer analyzeIncomingBatch() {
     RecordBatchSizer sizer = new RecordBatchSizer(incoming);
     sizer.applySv2();
-    if (! hasOversizeCols) {
-      hasOversizeCols = sizer.checkOversizeCols();
-    }
     if (inputBatchCount == 0) {
       logger.debug("{}", sizer.toString());
     }
@@ -887,7 +951,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     long actualBatchSize = sizer.actualSize();
     int actualRecordCount = sizer.rowCount();
 
-    if (actualBatchSize < memoryDelta) {
+    if (actualBatchSize != memoryDelta) {
       logger.debug("Memory delta: {}, actual batch size: {}, Diff: {}",
                    memoryDelta, actualBatchSize, memoryDelta - actualBatchSize);
     }
@@ -905,11 +969,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // We actually track the max density seen, and compare to 75% of that since
     // Parquet produces very low density record batches.
 
-    if (sizer.getAvgDensity() < maxDensity * 0.75) {
-      logger.debug("Saw low density batch. Density: {}", sizer.getAvgDensity());
+    if (sizer.avgDensity() < maxDensity * 3 / 4 && sizer.avgDensity() != lastDensity) {
+      logger.trace("Saw low density batch. Density: {}", sizer.avgDensity());
+      lastDensity = sizer.avgDensity();
       return;
     }
-    maxDensity = Math.max(maxDensity, sizer.getAvgDensity());
+    maxDensity = Math.max(maxDensity, sizer.avgDensity());
 
     // We know the batch size and number of records. Use that to estimate
     // the average record size. Since a typical batch has many records,
@@ -934,6 +999,14 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     long origInputBatchSize = estimatedInputBatchSize;
     estimatedInputBatchSize = Math.max(estimatedInputBatchSize, actualBatchSize);
 
+    // The row width may end up as zero if all fields are nulls or some
+    // other unusual situation. In this case, assume a width of 10 just
+    // to avoid lots of special case code.
+
+    if (estimatedRowWidth == 0) {
+      estimatedRowWidth = 10;
+    }
+
     // Go no further if nothing changed.
 
     if (estimatedRowWidth == origRowEstimate && estimatedInputBatchSize == origInputBatchSize) {
@@ -948,16 +1021,23 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // spill batches of either 64K records, or as many records as fit into the
     // amount of memory dedicated to each spill batch, whichever is less.
 
-    spillBatchRowCount = (int) Math.max(1, spillBatchSize / estimatedRowWidth);
+    spillBatchRowCount = (int) Math.max(1, preferredSpillBatchSize / estimatedRowWidth / 2);
     spillBatchRowCount = Math.min(spillBatchRowCount, Character.MAX_VALUE);
 
+    // Compute the actual spill batch size which may be larger or smaller
+    // than the preferred size depending on the row width. Double the estimated
+    // memory needs to allow for power-of-two rounding.
+
+    targetSpillBatchSize = spillBatchRowCount * estimatedRowWidth * 2;
+
     // Determine the number of records per batch per merge step. The goal is to
     // merge batches of either 64K records, or as many records as fit into the
     // amount of memory dedicated to each merge batch, whichever is less.
 
-    targetMergeBatchSize = preferredMergeBatchSize;
-    mergeBatchRowCount = (int) Math.max(1, targetMergeBatchSize / estimatedRowWidth);
+    mergeBatchRowCount = (int) Math.max(1, preferredMergeBatchSize / estimatedRowWidth / 2);
     mergeBatchRowCount = Math.min(mergeBatchRowCount, Character.MAX_VALUE);
+    mergeBatchRowCount = Math.max(1,  mergeBatchRowCount);
+    targetMergeBatchSize = mergeBatchRowCount * estimatedRowWidth * 2;
 
     // Determine the minimum memory needed for spilling. Spilling is done just
     // before accepting a batch, so we must spill if we don't have room for a
@@ -965,33 +1045,27 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // by merging the batches already in memory. Double this to allow for power-of-two
     // memory allocations.
 
-    spillPoint = estimatedInputBatchSize + 2 * spillBatchSize;
+    long spillPoint = estimatedInputBatchSize + 2 * targetSpillBatchSize;
 
     // The merge memory pool assumes we can spill all input batches. To make
     // progress, we must have at least two merge batches (same size as an output
     // batch) and one output batch. Again, double to allow for power-of-two
     // allocation and add one for a margin of error.
 
-    int minMergeBatches = 2 * 3 + 1;
-    long minMergeMemory = minMergeBatches * targetMergeBatchSize;
+    long minMergeMemory = 2 * targetSpillBatchSize + targetMergeBatchSize;
 
     // If we are in a low-memory condition, then we might not have room for the
     // default output batch size. In that case, pick a smaller size.
 
-    long minMemory = Math.max(spillPoint, minMergeMemory);
-    if (minMemory > memoryLimit) {
+    if (minMergeMemory > memoryLimit) {
 
-      // Figure out the minimum output batch size based on memory, but can't be
-      // any smaller than the defined minimum.
+      // Figure out the minimum output batch size based on memory,
+      // must hold at least one complete row.
 
-      targetMergeBatchSize = Math.max(MIN_MERGED_BATCH_SIZE, memoryLimit / minMergeBatches);
-
-      // Regardless of anything else, the batch must hold at least one
-      // complete row.
-
-      targetMergeBatchSize = Math.max(estimatedRowWidth, targetMergeBatchSize);
-      spillPoint = estimatedInputBatchSize + 2 * spillBatchSize;
-      minMergeMemory = minMergeBatches * targetMergeBatchSize;
+      long mergeAllowance = memoryLimit - 2 * targetSpillBatchSize;
+      targetMergeBatchSize = Math.max(estimatedRowWidth, mergeAllowance / 2);
+      mergeBatchRowCount = (int) (targetMergeBatchSize / estimatedRowWidth / 2);
+      minMergeMemory = 2 * targetSpillBatchSize + targetMergeBatchSize;
     }
 
     // Determine the minimum total memory we would need to receive two input
@@ -1004,7 +1078,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // runs when reading from disk.
 
     bufferMemoryPool = memoryLimit - spillPoint;
-    mergeMemoryPool = Math.max(minMergeMemory,
+    mergeMemoryPool = Math.max(memoryLimit - minMergeMemory,
                                (long) ((memoryLimit - 3 * targetMergeBatchSize) * 0.95));
 
     // Sanity check: if we've been given too little memory to make progress,
@@ -1021,14 +1095,14 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // Log the calculated values. Turn this on if things seem amiss.
     // Message will appear only when the values change.
 
-    logger.debug("Memory Estimates: record size = {} bytes; input batch = {} bytes, {} records; " +
-                  "merge batch size = {} bytes, {} records; " +
-                  "output batch size = {} bytes, {} records; " +
-                  "Available memory: {}, spill point = {}, min. merge memory = {}",
-                estimatedRowWidth, estimatedInputBatchSize, actualRecordCount,
-                spillBatchSize, spillBatchRowCount,
-                targetMergeBatchSize, mergeBatchRowCount,
-                memoryLimit, spillPoint, minMergeMemory);
+    logger.debug("Input Batch Estimates: record size = {} bytes; input batch = {} bytes, {} records",
+                 estimatedRowWidth, estimatedInputBatchSize, actualRecordCount);
+    logger.debug("Merge batch size = {} bytes, {} records; spill file size: {} bytes",
+                 targetSpillBatchSize, spillBatchRowCount, spillFileSize);
+    logger.debug("Output batch size = {} bytes, {} records",
+                 targetMergeBatchSize, mergeBatchRowCount);
+    logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}",
+                 memoryLimit, bufferMemoryPool, mergeMemoryPool);
   }
 
   /**
@@ -1050,14 +1124,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // Must spill if we are below the spill point (the amount of memory
     // needed to do the minimal spill.)
 
-    if (allocator.getAllocatedMemory() + incomingSize >= bufferMemoryPool) {
-      return true; }
-
-    // For test purposes, configuration may have set a limit on the number of
-    // batches in memory. Spill if we exceed this limit. (By default the number
-    // of in-memory batches is unlimited.)
-
-    return bufferedBatches.size() > bufferedBatchLimit;
+    return allocator.getAllocatedMemory() + incomingSize >= bufferMemoryPool;
   }
 
   /**
@@ -1068,8 +1135,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    */
 
   private IterOutcome sortInMemory() {
-    logger.info("Starting in-memory sort. Batches = {}, Records = {}, Memory = {}",
-                bufferedBatches.size(), inputRecordCount, allocator.getAllocatedMemory());
+    logger.debug("Starting in-memory sort. Batches = {}, Records = {}, Memory = {}",
+                 bufferedBatches.size(), inputRecordCount, allocator.getAllocatedMemory());
 
     // Note the difference between how we handle batches here and in the spill/merge
     // case. In the spill/merge case, this class decides on the batch size to send
@@ -1088,8 +1155,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         sortState = SortState.DONE;
         return IterOutcome.STOP;
       } else {
-        logger.info("Completed in-memory sort. Memory = {}",
-                allocator.getAllocatedMemory());
+        logger.debug("Completed in-memory sort. Memory = {}",
+                     allocator.getAllocatedMemory());
         resultsIterator = memoryMerge;
         memoryMerge = null;
         sortState = SortState.DELIVER;
@@ -1111,9 +1178,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    */
 
   private IterOutcome mergeSpilledRuns() {
-    logger.info("Starting consolidate phase. Batches = {}, Records = {}, Memory = {}, In-memory batches {}, spilled runs {}",
-                inputBatchCount, inputRecordCount, allocator.getAllocatedMemory(),
-                bufferedBatches.size(), spilledRuns.size());
+    logger.debug("Starting consolidate phase. Batches = {}, Records = {}, Memory = {}, In-memory batches {}, spilled runs {}",
+                 inputBatchCount, inputRecordCount, allocator.getAllocatedMemory(),
+                 bufferedBatches.size(), spilledRuns.size());
 
     // Consolidate batches to a number that can be merged in
     // a single last pass.
@@ -1132,7 +1199,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     allBatches.addAll(spilledRuns);
     spilledRuns.clear();
 
-    logger.info("Starting merge phase. Runs = {}, Alloc. memory = {}", allBatches.size(), allocator.getAllocatedMemory());
+    logger.debug("Starting merge phase. Runs = {}, Alloc. memory = {}",
+                 allBatches.size(), allocator.getAllocatedMemory());
 
     // Do the final merge as a results iterator.
 
@@ -1153,9 +1221,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
     // Can't merge more than will fit into memory at one time.
 
-    int maxMergeWidth = (int) (mergeMemoryPool / targetMergeBatchSize);
+    int maxMergeWidth = (int) (mergeMemoryPool / targetSpillBatchSize);
     maxMergeWidth = Math.min(mergeLimit, maxMergeWidth);
 
+    // But, must merge at least two batches.
+
+    maxMergeWidth = Math.max(maxMergeWidth, 2);
+
     // If we can't fit all batches in memory, must spill any in-memory
     // batches to make room for multiple spill-merge-spill cycles.
 
@@ -1177,7 +1249,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       // is available, spill some in-memory batches.
 
       long allocated = allocator.getAllocatedMemory();
-      long totalNeeds = spilledRunsCount * targetMergeBatchSize + allocated;
+      long totalNeeds = spilledRunsCount * targetSpillBatchSize + allocated;
       if (totalNeeds > mergeMemoryPool) {
         spillFromMemory();
         return true;
@@ -1231,7 +1303,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    * This method spills only half the accumulated batches
    * minimizing unnecessary disk writes. The exact count must lie between
    * the minimum and maximum spill counts.
-    */
+   */
 
   private void spillFromMemory() {
 
@@ -1239,30 +1311,29 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // of the desired size. The actual file size might be a bit larger
     // or smaller than the target, which is expected.
 
-    long estSize = 0;
     int spillCount = 0;
+    long spillSize = 0;
     for (InputBatch batch : bufferedBatches) {
-      estSize += batch.getDataSize();
-      if (estSize > spillFileSize) {
-        break; }
+      long batchSize = batch.getDataSize();
+      spillSize += batchSize;
       spillCount++;
+      if (spillSize + batchSize / 2 > spillFileSize) {
+        break; }
     }
 
-    // Should not happen, but just to be sure...
+    // Must always spill at least 2, even if this creates an over-size
+    // spill file. But, if this is a final consolidation, we may have only
+    // a single batch.
 
-    if (spillCount == 0) {
-      return; }
+    spillCount = Math.max(spillCount, 2);
+    spillCount = Math.min(spillCount, bufferedBatches.size());
 
     // Do the actual spill.
 
-    logger.trace("Starting spill from memory. Memory = {}, Buffered batch count = {}, Spill batch count = {}",
-                 allocator.getAllocatedMemory(), bufferedBatches.size(), spillCount);
     mergeAndSpill(bufferedBatches, spillCount);
   }
 
   private void mergeAndSpill(LinkedList<? extends BatchGroup> source, int count) {
-    if (count == 0) {
-      return; }
     spilledRuns.add(doMergeAndSpill(source, count));
   }
 
@@ -1270,13 +1341,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     List<BatchGroup> batchesToSpill = Lists.newArrayList();
     spillCount = Math.min(batchGroups.size(), spillCount);
     assert spillCount > 0 : "Spill count to mergeAndSpill must not be zero";
-    long spillSize = 0;
     for (int i = 0; i < spillCount; i++) {
-      @SuppressWarnings("resource")
-      BatchGroup batch = batchGroups.pollFirst();
-      assert batch != null : "Encountered a null batch during merge and spill operation";
-      batchesToSpill.add(batch);
-      spillSize += batch.getDataSize();
+      batchesToSpill.add(batchGroups.pollFirst());
     }
 
     // Merge the selected set of matches and write them to the
@@ -1288,8 +1354,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     BatchGroup.SpilledRun newGroup = null;
     try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
          CopierHolder.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, spillBatchRowCount)) {
-      logger.trace("Merging and spilling to {}", outputFile);
-      newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, oContext, spillSize);
+      logger.trace("Spilling {} of {} batches, {} rows, memory = {}, write to {}",
+                   batchesToSpill.size(), bufferedBatches.size() + batchesToSpill.size(),
+                   spillBatchRowCount,
+                   allocator.getAllocatedMemory(), outputFile);
+      newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, oContext);
 
       // The copier will merge records from the buffered batches into
       // the outputContainer up to targetRecordCount number of rows.
@@ -1298,8 +1367,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       while (merger.next()) {
 
         // Add a new batch of records (given by merger.getOutput()) to the spill
-        // file, opening the file if not yet open, and creating the target
-        // directory if it does not yet exist.
+        // file.
         //
         // note that addBatch also clears the merger's output container
 
@@ -1322,7 +1390,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       // It will release the memory in the close() call.
 
       try {
-        // Rethrow so we can organize how to handle the error.
+        // Rethrow so we can decide how to handle the error.
 
         throw e;
       }
@@ -1444,11 +1512,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     } catch (RuntimeException e) {
       ex = (ex == null) ? e : ex;
     }
-    try {
-      allocator.close();
-    } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
-    }
+    // Note: allocator is closed by the FragmentManager
+//    try {
+//      allocator.close();
+//    } catch (RuntimeException e) {
+//      ex = (ex == null) ? e : ex;
+//    }
     if (ex != null) {
       throw ex;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index 49562af..0a9f3d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -78,6 +78,7 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
   }
 
 
+  @SuppressWarnings("resource")
   @Override
   public VectorWrapper<?> getChildWrapper(int[] ids) {
     if (ids.length == 1) {
@@ -108,4 +109,13 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
     vector.makeTransferPair(((SimpleVectorWrapper<?>)destination).vector).transfer();
   }
 
+  @Override
+  public String toString() {
+    if (vector == null) {
+      return "null";
+    } else {
+      return vector.toString();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index eb0d616..b2a5dc3 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -69,7 +69,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
   @Override
   public int getValueCapacity(){
-    return (int) (data.capacity() *1.0 / ${type.width});
+    return data.capacity() / ${type.width};
   }
 
   @Override
@@ -196,7 +196,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     data = buffer.slice(0, actualLength);
     data.retain(1);
     data.writerIndex(actualLength);
-    }
+  }
 
   public TransferPair getTransferPair(BufferAllocator allocator){
     return new TransferImpl(getField(), allocator);
@@ -227,6 +227,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     target.data.writerIndex(sliceLength);
   }
 
+  @Override
+  public int getPayloadByteCount() {
+    return getAccessor().getValueCount() * ${type.width};
+  }
+
   private class TransferImpl implements TransferPair{
     private ${minor.class}Vector to;
 
@@ -390,7 +395,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
       return p.plusDays(days).plusMillis(millis);
     }
 
-
     public StringBuilder getAsStringBuilder(int index) {
       final int offsetIndex = index * ${type.width};
 
@@ -539,6 +543,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     public ${friendlyType} getObject(int index) {
       return get(index);
     }
+
     public ${minor.javaType!type.javaType} getPrimitiveObject(int index) {
       return get(index);
     }
@@ -557,9 +562,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
       holder.isSet = 1;
       holder.value = data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
     }
-
-
-   </#if> <#-- type.width -->
+    </#if> <#-- type.width -->
  }
 
  /**
@@ -728,84 +731,84 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
    }
 
    <#else> <#-- type.width <= 8 -->
-   public void set(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
-     data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, value);
-   }
+    public void set(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
+      data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, value);
+    }
 
    public void setSafe(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
      while(index >= getValueCapacity()) {
-       reAlloc();
-     }
-     set(index, value);
-   }
-
-   protected void set(int index, ${minor.class}Holder holder){
-     data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, holder.value);
-   }
+        reAlloc();
+      }
+      set(index, value);
+    }
 
-   public void setSafe(int index, ${minor.class}Holder holder){
-     while(index >= getValueCapacity()) {
-       reAlloc();
-     }
-     set(index, holder);
-   }
+    protected void set(int index, ${minor.class}Holder holder){
+      data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, holder.value);
+    }
 
-   protected void set(int index, Nullable${minor.class}Holder holder){
-     data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, holder.value);
-   }
+    public void setSafe(int index, ${minor.class}Holder holder){
+      while(index >= getValueCapacity()) {
+        reAlloc();
+      }
+      set(index, holder);
+    }
 
-   public void setSafe(int index, Nullable${minor.class}Holder holder){
-     while(index >= getValueCapacity()) {
-       reAlloc();
-     }
-     set(index, holder);
-   }
+    protected void set(int index, Nullable${minor.class}Holder holder){
+      data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, holder.value);
+    }
 
-   @Override
-   public void generateTestData(int size) {
-     setValueCount(size);
-     boolean even = true;
-     final int valueCount = getAccessor().getValueCount();
-     for(int i = 0; i < valueCount; i++, even = !even) {
-       if(even){
-         set(i, ${minor.boxedType!type.boxedType}.MIN_VALUE);
-       }else{
-         set(i, ${minor.boxedType!type.boxedType}.MAX_VALUE);
-       }
-     }
-   }
+    public void setSafe(int index, Nullable${minor.class}Holder holder){
+      while(index >= getValueCapacity()) {
+        reAlloc();
+      }
+      set(index, holder);
+    }
 
-   public void generateTestDataAlt(int size) {
-     setValueCount(size);
-     boolean even = true;
-     final int valueCount = getAccessor().getValueCount();
-     for(int i = 0; i < valueCount; i++, even = !even) {
-       if(even){
-         set(i, (${(minor.javaType!type.javaType)}) 1);
-       }else{
-         set(i, (${(minor.javaType!type.javaType)}) 0);
-       }
-     }
-   }
+    @Override
+    public void generateTestData(int size) {
+      setValueCount(size);
+      boolean even = true;
+      final int valueCount = getAccessor().getValueCount();
+      for(int i = 0; i < valueCount; i++, even = !even) {
+        if(even) {
+          set(i, ${minor.boxedType!type.boxedType}.MIN_VALUE);
+        } else {
+          set(i, ${minor.boxedType!type.boxedType}.MAX_VALUE);
+        }
+      }
+    }
+
+    public void generateTestDataAlt(int size) {
+      setValueCount(size);
+      boolean even = true;
+      final int valueCount = getAccessor().getValueCount();
+      for(int i = 0; i < valueCount; i++, even = !even) {
+        if(even) {
+          set(i, (${(minor.javaType!type.javaType)}) 1);
+        } else {
+          set(i, (${(minor.javaType!type.javaType)}) 0);
+        }
+      }
+    }
 
   </#if> <#-- type.width -->
 
-   @Override
-   public void setValueCount(int valueCount) {
-     final int currentValueCapacity = getValueCapacity();
-     final int idx = (${type.width} * valueCount);
-     while(valueCount > getValueCapacity()) {
-       reAlloc();
-     }
-     if (valueCount > 0 && currentValueCapacity > valueCount * 2) {
-       incrementAllocationMonitor();
-     } else if (allocationMonitor > 0) {
-       allocationMonitor = 0;
-     }
-     VectorTrimmer.trim(data, idx);
-     data.writerIndex(valueCount * ${type.width});
-   }
- }
+    @Override
+    public void setValueCount(int valueCount) {
+      final int currentValueCapacity = getValueCapacity();
+      final int idx = (${type.width} * valueCount);
+      while(valueCount > getValueCapacity()) {
+        reAlloc();
+      }
+      if (valueCount > 0 && currentValueCapacity > valueCount * 2) {
+        incrementAllocationMonitor();
+      } else if (allocationMonitor > 0) {
+        allocationMonitor = 0;
+      }
+      VectorTrimmer.trim(data, idx);
+      data.writerIndex(valueCount * ${type.width});
+    }
+  }
 }
 
 </#if> <#-- type.major -->

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 6c0a16b..b242728 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -45,12 +45,24 @@ package org.apache.drill.exec.vector;
  * NB: this class is automatically generated from ${.template_name} and ValueVectorTypes.tdd using FreeMarker.
  */
 @SuppressWarnings("unused")
-public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector{
+public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${className}.class);
 
   private final FieldReader reader = new Nullable${minor.class}ReaderImpl(Nullable${minor.class}Vector.this);
 
   private final MaterializedField bitsField = MaterializedField.create("$bits$", Types.required(MinorType.UINT1));
+
+  /**
+   * Set value flag. Meaning:
+   * <ul>
+   * <li>0: value is not set (value is null).</li>
+   * <li>1: value is set (value is not null).</li>
+   * </ul>
+   * That is, a 1 means that the values vector has a value. 0
+   * means that the vector is null. Thus, all values start as
+   * not set (null) and must be explicitly set (made not null).
+   */
+
   private final UInt1Vector bits = new UInt1Vector(bitsField, allocator);
   private final ${valuesName} values = new ${minor.class}Vector(field, allocator);
 
@@ -108,8 +120,8 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       return 0;
     }
 
-    return values.getBufferSizeFor(valueCount)
-        + bits.getBufferSizeFor(valueCount);
+    return values.getBufferSizeFor(valueCount) +
+           bits.getBufferSizeFor(valueCount);
   }
 
   @Override
@@ -163,6 +175,18 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     return success;
   }
 
+  @Override
+  public int getAllocatedByteCount() {
+    return bits.getAllocatedByteCount() + values.getAllocatedByteCount();
+  }
+
+  @Override
+  public int getPayloadByteCount() {
+    // For nullable, we include all values, null or not, in computing
+    // the value length.
+    return bits.getPayloadByteCount() + values.getPayloadByteCount();
+  }
+
   <#if type.major == "VarLen">
   @Override
   public void allocateNew(int totalBytes, int valueCount) {

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java
index f80bb25..93854e7 100644
--- a/exec/vector/src/main/codegen/templates/UnionVector.java
+++ b/exec/vector/src/main/codegen/templates/UnionVector.java
@@ -202,6 +202,22 @@ public class UnionVector implements ValueVector {
   }
 
   @Override
+  public int getAllocatedByteCount() {
+    // Most vectors are held inside the internal map.
+
+    int count = internalMap.getAllocatedByteCount();
+    if (bit != null) {
+      count += bit.getAllocatedByteCount();
+    }
+    return count;
+  }
+
+  @Override
+  public int getPayloadByteCount() {
+    return internalMap.getPayloadByteCount();
+  }
+
+  @Override
   public TransferPair getTransferPair(BufferAllocator allocator) {
     return new TransferImpl(field, allocator);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index 27432d2..ea3c9de 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -238,6 +238,25 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     return true;
   }
 
+  @Override
+  public int getAllocatedByteCount() {
+    return offsetVector.getAllocatedByteCount() + super.getAllocatedByteCount();
+  }
+
+  @Override
+  public int getPayloadByteCount() {
+    UInt${type.width}Vector.Accessor a = offsetVector.getAccessor();
+    int count = a.getValueCount();
+    if (count == 0) {
+      return 0;
+    } else {
+      // If 1 or more values, then the last value is set to
+      // the offset of the next value, which is the same as
+      // the length of existing values.
+      return a.get(count-1);
+    }
+  }
+
   private class TransferImpl implements TransferPair{
     ${minor.class}Vector to;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index f812209..4def5b8 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -87,4 +87,9 @@ public abstract class BaseDataValueVector extends BaseValueVector {
    * the value vector. The purpose is to move the value vector to a "mutate" state
    */
   public void reset() {}
+
+  @Override
+  public int getAllocatedByteCount() {
+    return data.capacity();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 2b22f52..a6c0cea 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -449,4 +449,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     this.valueCount = 0;
     super.clear();
   }
+
+  @Override
+  public int getPayloadByteCount() {
+    // One byte per value
+    return valueCount;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index 4479db0..f69dc98 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -218,4 +218,16 @@ public class ObjectVector extends BaseValueVector {
       holder.obj = getObject(index);
     }
   }
+
+  @Override
+  public int getAllocatedByteCount() {
+    // Values not stored in direct memory?
+    return 0;
+  }
+
+  @Override
+  public int getPayloadByteCount() {
+    // Values not stored in direct memory?
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 47cf143..f4c7935 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -176,6 +176,18 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
   void load(SerializedField metadata, DrillBuf buffer);
 
   /**
+   * Return the total memory consumed by all buffers within this vector.
+   */
+
+  int getAllocatedByteCount();
+
+  /**
+   * Return the number of value bytes consumed by actual data.
+   */
+
+  int getPayloadByteCount();
+
+  /**
    * An abstraction that is used to read from this vector instance.
    */
   interface Accessor {

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
index ee9c039..d04234c 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
@@ -17,9 +17,7 @@
  */
 package org.apache.drill.exec.vector;
 
-import io.netty.buffer.DrillBuf;
-
-public interface VariableWidthVector extends ValueVector{
+public interface VariableWidthVector extends ValueVector {
 
   /**
    * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
index 3f40d4c..9181f20 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
@@ -176,4 +176,14 @@ public class ZeroVector implements ValueVector {
 
   @Override
   public void load(UserBitShared.SerializedField metadata, DrillBuf buffer) { }
+
+  @Override
+  public int getAllocatedByteCount() {
+    return 0;
+  }
+
+  @Override
+  public int getPayloadByteCount() {
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 08952ab..baba086 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -266,7 +266,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
 
   @Override
   public int getBufferSize() {
-    int actualBufSize = 0 ;
+    int actualBufSize = 0;
 
     for (final ValueVector v : vectors.values()) {
       for (final DrillBuf buf : v.getBuffers(false)) {
@@ -275,4 +275,24 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
     }
     return actualBufSize;
   }
+
+  @Override
+  public int getAllocatedByteCount() {
+    int count = 0;
+
+    for (final ValueVector v : vectors.values()) {
+      count += v.getAllocatedByteCount();
+    }
+    return count;
+  }
+
+  @Override
+  public int getPayloadByteCount() {
+    int count = 0;
+
+    for (final ValueVector v : vectors.values()) {
+      count += v.getPayloadByteCount();
+    }
+    return count;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
index bc90eda..1664b0a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
@@ -209,6 +209,17 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
     vector = v;
   }
 
+
+  @Override
+  public int getAllocatedByteCount() {
+    return offsets.getAllocatedByteCount() + vector.getAllocatedByteCount();
+  }
+
+  @Override
+  public int getPayloadByteCount() {
+    return offsets.getPayloadByteCount() + vector.getPayloadByteCount();
+  }
+
   public abstract class BaseRepeatedAccessor extends BaseValueVector.BaseAccessor implements RepeatedAccessor {
 
     @Override
@@ -256,5 +267,4 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
       vector.getMutator().setValueCount(childValueCount);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
index 33d6ddc..f71baa7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
@@ -317,4 +317,14 @@ public class ListVector extends BaseRepeatedValueVector {
       bits.getMutator().setValueCount(valueCount);
     }
   }
+
+  @Override
+  public int getAllocatedByteCount() {
+    return offsets.getAllocatedByteCount() + bits.getAllocatedByteCount() + super.getAllocatedByteCount();
+  }
+
+  @Override
+  public int getPayloadByteCount() {
+    return offsets.getPayloadByteCount() + bits.getPayloadByteCount() + super.getPayloadByteCount();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index 0cc3628..b5c97bf 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -426,4 +426,14 @@ public class RepeatedListVector extends AbstractContainerVector
   public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
     delegate.copyFromSafe(fromIndex, thisIndex, from.delegate);
   }
+
+  @Override
+  public int getAllocatedByteCount() {
+    return delegate.getAllocatedByteCount();
+  }
+
+  @Override
+  public int getPayloadByteCount() {
+    return delegate.getPayloadByteCount();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/79811db5/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 94cf4a6..3707ff0 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -584,4 +584,9 @@ public class RepeatedMapVector extends AbstractMapVector
       vector.clear();
     }
   }
+
+  @Override
+  public int getAllocatedByteCount() {
+    return super.getAllocatedByteCount( ) + offsets.getAllocatedByteCount();
+  }
 }


Mime
View raw message