drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject [3/3] drill git commit: DRILL-5080: Memory-managed version of external sort
Date Mon, 13 Feb 2017 03:50:42 GMT
DRILL-5080: Memory-managed version of external sort

Please see JIRA entry for reasons for revision, design spec and list of
changes.

This PR covers the changes to the external sort itself. Tests for this
operator require the test framework in DRILL-5126 and the mock data
source in DRILL-5152. Tests for this operator will be issued as a
separate PR once those two dependencies are committed.

Until then, the new operator is disabled by default. It can be enabled
using drill.sort.external.disable_managed: false.

The operator now spills before receiving a new batch. Revised memory calcs and
merge calcs to make them a bit clearer and provide more margin of error
for the power-of-two allocations used when allocating vectors.

We have two external sort implementations, but only one operator code
for both. They can use only one Metrics enum between them. When adding
new metrics to the new version, didn’t add matching metrics to the old
one. This fixes that issue. (The issue will go away once the old one is
retired.)

Revised memory calculations to reflect limit of 16 MB per vector.
Current revision limits to 16 MB per output batch to be safe. Next
revision will enforce per-vector limits to allow the overall batch to
be larger when possible.

Also simplified the merge-time calculations.

Original code provided only crude methods to learn the size of a record
batch. Adds a "RecordBatchSizer" to provide detailed analysis so the
sort can know the amount of memory used to buffer a batch, the number
of rows, and the expected row width once the rows are copied to a
spill file or the output.

Moved generic spill classes to a separate package.

Created parameters for spill batch size and merge batch size. Separated
these values in code. Deprecated the min, max spill parameters as they
no longer add much value. Minor code rearranging.

Bug fix

Fixes a corner case of merging spilled files in a low-memory condition.

Fixes from code review

close apache/drill#717


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/300e9349
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/300e9349
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/300e9349

Branch: refs/heads/master
Commit: 300e9349a52df7f6ad866a78868602005eadb392
Parents: c9a6ac4
Author: Paul Rogers <progers@maprtech.com>
Authored: Thu Dec 15 19:54:05 2016 -0800
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Sun Feb 12 19:41:34 2017 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   16 +
 .../drill/exec/physical/base/AbstractBase.java  |   20 +-
 .../exec/physical/config/ExternalSort.java      |   34 +-
 .../physical/impl/spill/RecordBatchSizer.java   |  293 ++++
 .../exec/physical/impl/spill/SpillSet.java      |  431 ++++++
 .../exec/physical/impl/spill/package-info.java  |   22 +
 .../physical/impl/xsort/ExternalSortBatch.java  |   16 +-
 .../impl/xsort/ExternalSortBatchCreator.java    |   26 +-
 .../physical/impl/xsort/managed/BatchGroup.java |  367 +++++
 .../impl/xsort/managed/CopierHolder.java        |  307 ++++
 .../impl/xsort/managed/ExternalSortBatch.java   | 1456 ++++++++++++++++++
 .../impl/xsort/managed/MSortTemplate.java       |  237 +++
 .../physical/impl/xsort/managed/MSorter.java    |   40 +
 .../physical/impl/xsort/managed/MergeSort.java  |  167 ++
 .../xsort/managed/OperatorCodeGenerator.java    |  259 ++++
 .../impl/xsort/managed/PriorityQueueCopier.java |   40 +
 .../managed/PriorityQueueCopierTemplate.java    |  170 ++
 .../server/options/SystemOptionManager.java     |   16 +-
 .../exec/util/MemoryAllocationUtilities.java    |    8 +-
 .../src/main/resources/drill-module.conf        |   54 +-
 .../physical/impl/xsort/TestExternalSort.java   |  113 +-
 .../apache/drill/exec/memory/BaseAllocator.java |   13 +-
 22 files changed, 4011 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 07cc3a7..d739f88 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -63,13 +63,29 @@ public interface ExecConstants {
   String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete";
   String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
   String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";
+
+  // External Sort Boot configuration
+
   String EXTERNAL_SORT_TARGET_BATCH_SIZE = "drill.exec.sort.external.batch.size";
   String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
   String EXTERNAL_SORT_SPILL_GROUP_SIZE = "drill.exec.sort.external.spill.group.size";
   String EXTERNAL_SORT_SPILL_THRESHOLD = "drill.exec.sort.external.spill.threshold";
   String EXTERNAL_SORT_SPILL_DIRS = "drill.exec.sort.external.spill.directories";
   String EXTERNAL_SORT_SPILL_FILESYSTEM = "drill.exec.sort.external.spill.fs";
+  String EXTERNAL_SORT_SPILL_FILE_SIZE = "drill.exec.sort.external.spill.file_size";
   String EXTERNAL_SORT_MSORT_MAX_BATCHSIZE = "drill.exec.sort.external.msort.batch.maxsize";
+  String EXTERNAL_SORT_DISABLE_MANAGED = "drill.exec.sort.external.disable_managed";
+  String EXTERNAL_SORT_MERGE_LIMIT = "drill.exec.sort.external.merge_limit";
+  String EXTERNAL_SORT_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.spill_batch_size";
+  String EXTERNAL_SORT_MERGE_BATCH_SIZE = "drill.exec.sort.external.spill.merge_batch_size";
+  String EXTERNAL_SORT_MAX_MEMORY = "drill.exec.sort.external.mem_limit";
+  String EXTERNAL_SORT_BATCH_LIMIT = "drill.exec.sort.external.batch_limit";
+
+  // External Sort Runtime options
+
+  BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed", false);
+
+
   String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
   String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
   String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets";

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index c7b0e7e..526d728 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,8 +27,8 @@ public abstract class AbstractBase implements PhysicalOperator{
 
   private final String userName;
 
-  protected long initialAllocation = 1000000L;
-  protected long maxAllocation = 10000000000L;
+  protected long initialAllocation = 1_000_000L;
+  protected long maxAllocation = 10_000_000_000L;
   private int id;
   private double cost;
 
@@ -78,19 +78,33 @@ public abstract class AbstractBase implements PhysicalOperator{
     return SelectionVectorMode.NONE;
   }
 
+  // Not available. Presumably because Drill does not currently use
+  // this value, though it does appear in some test physical plans.
+//  public void setInitialAllocation(long alloc) {
+//    initialAllocation = alloc;
+//  }
+
   @Override
   public long getInitialAllocation() {
     return initialAllocation;
   }
 
+  @Override
   public double getCost() {
     return cost;
   }
 
+  @Override
   public void setCost(double cost) {
     this.cost = cost;
   }
 
+  // Not available. Presumably because Drill does not currently use
+  // this value, though it does appear in some test physical plans.
+//  public void setMaxAllocation(long alloc) {
+//    maxAllocation = alloc;
+//  }
+
   @Override
   public long getMaxAllocation() {
     return maxAllocation;

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
index 456b9c0..17848d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,7 +21,6 @@ import java.util.List;
 
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -32,26 +31,10 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class ExternalSort extends Sort {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSort.class);
 
-  private long initialAllocation = 20000000;
-
   @JsonCreator
   public ExternalSort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("reverse") boolean reverse) {
     super(child, orderings, reverse);
-  }
-
-  @Override
-  public List<Ordering> getOrderings() {
-    return orderings;
-  }
-
-  @Override
-  public boolean getReverse() {
-    return reverse;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
-    return physicalVisitor.visitSort(this, value);
+    initialAllocation = 20_000_000;
   }
 
   @Override
@@ -66,13 +49,12 @@ public class ExternalSort extends Sort {
     return CoreOperatorType.EXTERNAL_SORT_VALUE;
   }
 
-  public void setMaxAllocation(long maxAllocation) {
-    this.maxAllocation = Math.max(initialAllocation, maxAllocation);
-  }
+  // Set here, rather than the base class, because this is the only
+  // operator, at present, that makes use of the maximum allocation.
+  // Remove this, in favor of the base class version, when Drill
+  // sets the memory allocation for all operators.
 
-  @Override
-  public long getInitialAllocation() {
-    return initialAllocation;
+  public void setMaxAllocation(long maxAllocation) {
+    this.maxAllocation = maxAllocation;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
new file mode 100644
index 0000000..05354e5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.spill;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Given a record batch or vector container, determines the actual memory
+ * consumed by each column, the average row, and the entire record batch.
+ */
+
+public class RecordBatchSizer {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizer.class);
+
+  /**
+   * Column size information.
+   */
+  public static class ColumnSize {
+    public final MaterializedField metadata;
+
+    /**
+     * Assumed size from Drill metadata.
+     */
+    public int stdSize;
+    /**
+     * Actual memory consumed by all the vectors associated with this column.
+     */
+    public int totalSize;
+    /**
+     * Actual average column width as determined from actual memory use. This
+     * size is larger than the actual data size since this size includes per-
+     * column overhead such as any unused vector space, etc.
+     */
+    public int estSize;
+
+    /**
+     * The size of the data vector backing the column. Useful for detecting
+     * cases of possible direct memory fragmentation.
+     */
+    public int dataVectorSize;
+    public int capacity;
+    public int density;
+    public int dataSize;
+
+    @SuppressWarnings("resource")
+    public ColumnSize(VectorWrapper<?> vw) {
+      metadata = vw.getField();
+      stdSize = TypeHelper.getSize(metadata.getType());
+
+      // Can't get size estimates if this is an empty batch.
+
+      ValueVector v = vw.getValueVector();
+      int rowCount = v.getAccessor().getValueCount();
+      if (rowCount == 0) {
+        return;
+      }
+      DrillBuf[] bufs = v.getBuffers(false);
+      for (DrillBuf buf : bufs) {
+        totalSize += buf.capacity();
+      }
+
+      // Capacity is the number of values that the vector could
+      // contain. This is useful only for fixed-length vectors.
+
+      capacity = v.getValueCapacity();
+
+      // Crude way to get the size of the buffer underlying simple (scalar) values.
+      // Ignores maps, lists and other esoterica. Uses a crude way to subtract out
+      // the null "bit" (really byte) buffer size for nullable vectors.
+
+      if (v instanceof BaseDataValueVector) {
+        dataVectorSize = totalSize;
+        if (v instanceof NullableVector) {
+          dataVectorSize -= bufs[0].getActualMemoryConsumed();
+        }
+      }
+
+      // Determine "density" the number of rows compared to potential
+      // capacity. Low-density batches occur at block boundaries, ends
+      // of files and so on. Low-density batches throw off our estimates
+      // for Varchar columns because we don't know the actual number of
+      // bytes consumed (that information is hidden behind the Varchar
+      // implementation where we can't get at it.)
+      //
+      // A better solution is to have each vector do this calc rather
+      // than trying to do it generically, but that increases the code
+      // change footprint and slows the commit process.
+
+      if (v instanceof FixedWidthVector) {
+        dataSize = stdSize * rowCount;
+      } else if ( v instanceof VarCharVector ) {
+        VarCharVector vv = (VarCharVector) v;
+        dataSize = vv.getOffsetVector().getAccessor().get(rowCount);
+      } else if ( v instanceof NullableVarCharVector ) {
+        NullableVarCharVector vv = (NullableVarCharVector) v;
+        dataSize = vv.getValuesVector().getOffsetVector().getAccessor().get(rowCount);
+      } else {
+        dataSize = 0;
+      }
+      if (dataSize > 0) {
+        density = roundUp(dataSize * 100, dataVectorSize);
+        estSize = roundUp(dataSize, rowCount);
+      }
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder();
+      buf.append(metadata.getName());
+      buf.append("(std col. size: ");
+      buf.append(stdSize);
+      buf.append(", actual col. size: ");
+      buf.append(estSize);
+      buf.append(", total size: ");
+      buf.append(totalSize);
+      buf.append(", vector size: ");
+      buf.append(dataVectorSize);
+      buf.append(", data size: ");
+      buf.append(dataSize);
+      buf.append(", row capacity: ");
+      buf.append(capacity);
+      buf.append(", density: ");
+      buf.append(density);
+      buf.append(")");
+      return buf.toString();
+    }
+  }
+
+  List<ColumnSize> columnSizes = new ArrayList<>();
+
+  /**
+   * Number of records (rows) in the batch.
+   */
+  private int rowCount;
+  /**
+   * Standard row width using Drill meta-data.
+   */
+  private int stdRowWidth;
+  /**
+   * Actual batch size summing all buffers used to store data
+   * for the batch.
+   */
+  private int totalBatchSize;
+  /**
+   * Actual row width computed by dividing total batch memory by the
+   * record count.
+   */
+  private int grossRowWidth;
+  /**
+   * Actual row width computed by summing columns. Use this if the
+   * vectors are partially full; prevents overestimating row width.
+   */
+  private int netRowWidth;
+  private boolean hasSv2;
+  private int sv2Size;
+  private int avgDensity;
+
+  public RecordBatchSizer(VectorAccessible va) {
+    rowCount = va.getRecordCount();
+    for (VectorWrapper<?> vw : va) {
+      measureField(vw);
+    }
+
+    if (rowCount > 0) {
+      grossRowWidth = roundUp(totalBatchSize, rowCount);
+    }
+
+    hasSv2 = va.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE;
+    if (hasSv2) {
+      @SuppressWarnings("resource")
+      SelectionVector2 sv2 = va.getSelectionVector2();
+      sv2Size = sv2.getBuffer().capacity();
+      grossRowWidth += sv2Size;
+      netRowWidth += 2;
+    }
+
+    int totalDensity = 0;
+    int usableCount = 0;
+    for (ColumnSize colSize : columnSizes) {
+      if ( colSize.density > 0 ) {
+        usableCount++;
+      }
+      totalDensity += colSize.density;
+    }
+    avgDensity = roundUp(totalDensity, usableCount);
+  }
+
+  public void applySv2() {
+    if (hasSv2) {
+      return;
+    }
+
+    sv2Size = BaseAllocator.nextPowerOfTwo(2 * rowCount);
+    grossRowWidth += roundUp(sv2Size, rowCount);
+    totalBatchSize += sv2Size;
+  }
+
+  private void measureField(VectorWrapper<?> vw) {
+    ColumnSize colSize = new ColumnSize(vw);
+    columnSizes.add(colSize);
+
+    stdRowWidth += colSize.stdSize;
+    totalBatchSize += colSize.totalSize;
+    netRowWidth += colSize.estSize;
+  }
+
+  public static int roundUp(int num, int denom) {
+    if(denom == 0) {
+      return 0;
+    }
+    return (int) Math.ceil((double) num / denom);
+  }
+
+  public int rowCount() { return rowCount; }
+  public int stdRowWidth() { return stdRowWidth; }
+  public int grossRowWidth() { return grossRowWidth; }
+  public int netRowWidth() { return netRowWidth; }
+  public int actualSize() { return totalBatchSize; }
+  public boolean hasSv2() { return hasSv2; }
+  public int getAvgDensity() { return avgDensity; }
+
+  public static final int MAX_VECTOR_SIZE = 16 * 1024 * 1024; // 16 MiB
+
+  /**
+   * Look for columns backed by vectors larger than the 16 MiB size
+   * employed by the Netty allocator. Such large blocks can lead to
+   * memory fragmentation and unexpected OOM errors.
+   * @return true if any column is oversized
+   */
+  public boolean checkOversizeCols() {
+    boolean hasOversize = false;
+    for (ColumnSize colSize : columnSizes) {
+      if ( colSize.dataVectorSize > MAX_VECTOR_SIZE) {
+        logger.warn( "Column is wider than 256 characters: OOM due to memory fragmentation is possible - " + colSize.metadata.getPath() );
+        hasOversize = true;
+      }
+    }
+    return hasOversize;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("Actual batch schema & sizes {\n");
+    for (ColumnSize colSize : columnSizes) {
+      buf.append("  ");
+      buf.append(colSize.toString());
+      buf.append("\n");
+    }
+    buf.append( "  Records: " );
+    buf.append(rowCount);
+    buf.append(", Total size: ");
+    buf.append(totalBatchSize);
+    buf.append(", Row width:");
+    buf.append(grossRowWidth);
+    buf.append(", Density:");
+    buf.append(avgDensity);
+    buf.append("}");
+    return buf.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
new file mode 100644
index 0000000..4615500
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.spill;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+
+/**
+ * Generates the set of spill files for this sort session.
+ */
+
+public class SpillSet {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpillSet.class);
+
+  /**
+   * Spilling on the Mac using the HDFS file system is very inefficient,
+   * affects performance numbers. This interface allows using HDFS in
+   * production, but to bypass the HDFS file system when needed.
+   */
+
+  private interface FileManager {
+
+    void deleteOnExit(String fragmentSpillDir) throws IOException;
+
+    OutputStream createForWrite(String fileName) throws IOException;
+
+    InputStream openForInput(String fileName) throws IOException;
+
+    void deleteFile(String fileName) throws IOException;
+
+    void deleteDir(String fragmentSpillDir) throws IOException;
+
+    /**
+     * Given a manager-specific output stream, return the current write position.
+     * Used to report total write bytes.
+     *
+     * @param outputStream output stream created by the file manager
+     * @return
+     */
+    long getWriteBytes(OutputStream outputStream);
+
+    /**
+     * Given a manager-specific input stream, return the current read position.
+     * Used to report total read bytes.
+     *
+     * @param outputStream input stream created by the file manager
+     * @return
+     */
+    long getReadBytes(InputStream inputStream);
+  }
+
+  /**
+   * Normal implementation of spill files using the HDFS file system.
+   */
+
+  private static class HadoopFileManager implements FileManager{
+    /**
+     * The HDFS file system (for local directories, HDFS storage, etc.) used to
+     * create the temporary spill files. Allows spill files to be either on local
+     * disk, or in a DFS. (The admin can choose to put spill files in DFS when
+     * nodes provide insufficient local disk space)
+     */
+
+    private FileSystem fs;
+
+    protected HadoopFileManager(String fsName) {
+      Configuration conf = new Configuration();
+      conf.set("fs.default.name", fsName);
+      try {
+        fs = FileSystem.get(conf);
+      } catch (IOException e) {
+        throw UserException.resourceError(e)
+              .message("Failed to get the File System for external sort")
+              .build(logger);
+      }
+    }
+
+    @Override
+    public void deleteOnExit(String fragmentSpillDir) throws IOException {
+      fs.deleteOnExit(new Path(fragmentSpillDir));
+    }
+
+    @Override
+    public OutputStream createForWrite(String fileName) throws IOException {
+      return fs.create(new Path(fileName));
+    }
+
+    @Override
+    public InputStream openForInput(String fileName) throws IOException {
+      return fs.open(new Path(fileName));
+    }
+
+    @Override
+    public void deleteFile(String fileName) throws IOException {
+      Path path = new Path(fileName);
+      if (fs.exists(path)) {
+        fs.delete(path, false);
+      }
+    }
+
+    @Override
+    public void deleteDir(String fragmentSpillDir) throws IOException {
+      Path path = new Path(fragmentSpillDir);
+      if (path != null && fs.exists(path)) {
+        if (fs.delete(path, true)) {
+            fs.cancelDeleteOnExit(path);
+        }
+      }
+    }
+
+    @Override
+    public long getWriteBytes(OutputStream outputStream) {
+      try {
+        return ((FSDataOutputStream) outputStream).getPos();
+      } catch (IOException e) {
+        // Just used for logging, not worth dealing with the exception.
+        return 0;
+      }
+    }
+
+    @Override
+    public long getReadBytes(InputStream inputStream) {
+      try {
+        return ((FSDataInputStream) inputStream).getPos();
+      } catch (IOException e) {
+        // Just used for logging, not worth dealing with the exception.
+        return 0;
+      }
+    }
+  }
+
+  public static class CountingInputStream extends InputStream
+  {
+    private InputStream in;
+    private long count;
+
+    public CountingInputStream(InputStream in) {
+      this.in = in;
+    }
+
+    @Override
+    public int read() throws IOException {
+      int b = in.read();
+      if (b != -1) {
+        count++;
+      }
+      return b;
+    }
+
+    @Override
+    public int read(byte b[]) throws IOException {
+      int n = in.read(b);
+      if (n != -1) {
+        count += n;
+      }
+      return n;
+    }
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+      int n = in.read(b, off, len);
+      if (n != -1) {
+        count += n;
+      }
+      return n;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      return in.skip(n);
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    public long getCount() { return count; }
+  }
+
+  public static class CountingOutputStream extends OutputStream {
+
+    private OutputStream out;
+    private long count;
+
+    public CountingOutputStream(OutputStream out) {
+      this.out = out;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      count++;
+      out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+      count += b.length;
+      out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      count += len;
+      out.write(b, off, len);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+    public long getCount() { return count; }
+  }
+
+  /**
+   * Performance-oriented direct access to the local file system which
+   * bypasses HDFS.
+   */
+
+  private static class LocalFileManager implements FileManager {
+
+    private File baseDir;
+
+    public LocalFileManager(String fsName) {
+      baseDir = new File(fsName.replace("file://", ""));
+    }
+
+    @Override
+    public void deleteOnExit(String fragmentSpillDir) throws IOException {
+      File dir = new File(baseDir, fragmentSpillDir);
+      dir.mkdirs();
+      dir.deleteOnExit();
+    }
+
+    @SuppressWarnings("resource")
+    @Override
+    public OutputStream createForWrite(String fileName) throws IOException {
+      return new CountingOutputStream(
+                new BufferedOutputStream(
+                    new FileOutputStream(new File(baseDir, fileName))));
+    }
+
+    @SuppressWarnings("resource")
+    @Override
+    public InputStream openForInput(String fileName) throws IOException {
+      return new CountingInputStream(
+                new BufferedInputStream(
+                    new FileInputStream(new File(baseDir, fileName))));
+    }
+
+    @Override
+    public void deleteFile(String fileName) throws IOException {
+      new File(baseDir, fileName).delete();
+    }
+
+    @Override
+    public void deleteDir(String fragmentSpillDir) throws IOException {
+      new File(baseDir, fragmentSpillDir).delete();
+    }
+
+    @Override
+    public long getWriteBytes(OutputStream outputStream) {
+      return ((CountingOutputStream) outputStream).getCount();
+    }
+
+    @Override
+    public long getReadBytes(InputStream inputStream) {
+      return ((CountingInputStream) inputStream).getCount();
+    }
+  }
+
+  private final Iterator<String> dirs;
+
+  /**
+   * Set of directories to which this operator should write spill files in a round-robin
+   * fashion. The operator requires at least one spill directory, but can
+   * support any number. The admin must ensure that sufficient space exists
+   * on all directories as this operator does not check space availability
+   * before writing to the directories.
+   */
+
+  private Set<String> currSpillDirs = Sets.newTreeSet();
+
+  /**
+   * The base part of the file name for spill files. Each file has this
+   * name plus an appended spill serial number.
+   */
+
+  private final String spillDirName;
+
+  private int fileCount = 0;
+
+  private FileManager fileManager;
+
+  private long readBytes;
+
+  private long writeBytes;
+
+  public SpillSet(FragmentContext context, PhysicalOperator popConfig) {
+    DrillConfig config = context.getConfig();
+    dirs = Iterators.cycle(config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS));
+
+    // Use the high-performance local file system if the local file
+    // system is selected and impersonation is off. (We use that
+    // as a proxy for a non-production Drill setup.)
+
+    String spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM);
+    boolean impersonationEnabled = config.getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+    if (spillFs.startsWith("file:///") && ! impersonationEnabled) {
+      fileManager = new LocalFileManager(spillFs);
+    } else {
+      fileManager = new HadoopFileManager(spillFs);
+    }
+    FragmentHandle handle = context.getHandle();
+    spillDirName = String.format("%s_major%s_minor%s_op%s", QueryIdHelper.getQueryId(handle.getQueryId()),
+        handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
+  }
+
+  public String getNextSpillFile() {
+
+    // Identify the next directory from the round-robin list to
+    // the file created from this round of spilling. The directory must
+    // must have sufficient space for the output file.
+
+    String spillDir = dirs.next();
+    String currSpillPath = Joiner.on("/").join(spillDir, spillDirName);
+    currSpillDirs.add(currSpillPath);
+    String outputFile = Joiner.on("/").join(currSpillPath, "spill" + ++fileCount);
+    try {
+        fileManager.deleteOnExit(currSpillPath);
+    } catch (IOException e) {
+        // since this is meant to be used in a batches's spilling, we don't propagate the exception
+        logger.warn("Unable to mark spill directory " + currSpillPath + " for deleting on exit", e);
+    }
+    return outputFile;
+  }
+
+  public boolean hasSpilled() {
+    return fileCount > 0;
+  }
+
+  public int getFileCount() { return fileCount; }
+
+  public InputStream openForInput(String fileName) throws IOException {
+    return fileManager.openForInput(fileName);
+  }
+
+  public OutputStream openForOutput(String fileName) throws IOException {
+    return fileManager.createForWrite(fileName);
+  }
+
+  public void delete(String fileName) throws IOException {
+    fileManager.deleteFile(fileName);
+  }
+
+  public long getWriteBytes() { return writeBytes; }
+  public long getReadBytes() { return readBytes; }
+
+  public void close() {
+    for (String path : currSpillDirs) {
+      try {
+        fileManager.deleteDir(path);
+      } catch (IOException e) {
+          // since this is meant to be used in a batches's cleanup, we don't propagate the exception
+          logger.warn("Unable to delete spill directory " + path,  e);
+      }
+    }
+  }
+
+  public long getPosition(InputStream inputStream) {
+    return fileManager.getReadBytes(inputStream);
+  }
+
+  public long getPosition(OutputStream outputStream) {
+    return fileManager.getWriteBytes(outputStream);
+  }
+
+  public void tallyReadBytes(long readLength) {
+    readBytes += readLength;
+  }
+
+  public void tallyWriteBytes(long writeLength) {
+    writeBytes += writeLength;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/package-info.java
new file mode 100644
index 0000000..6cc74f5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Collection of classes shared by operators that implement spill-to-disk.
+ */
+
+package org.apache.drill.exec.physical.impl.spill;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 8fe05f0..c009cc0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -133,10 +133,22 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
   public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
 
+  // Be careful here! This enum is used in TWO places! First, it is used
+  // in this code to build up metrics. Easy enough. But, it is also used
+  // in OperatorMetricRegistry to define the metrics for the
+  // operator ID defined in CoreOperatorType. As a result, the values
+  // defined here are shared between this legacy version AND the new
+  // managed version. (Though the new, managed version has its own
+  // copy of this enum.) The two enums MUST be identical.
+
   public enum Metric implements MetricDef {
     SPILL_COUNT,            // number of times operator spilled to disk
-    PEAK_SIZE_IN_MEMORY,    // peak value for totalSizeInMemory
-    PEAK_BATCHES_IN_MEMORY; // maximum number of batches kept in memory
+    RETIRED1,               // Was: peak value for totalSizeInMemory
+                            // But operator already provides this value
+    PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
+    MERGE_COUNT,            // Used only by the managed version.
+    MIN_BUFFER,             // Used only by the managed version.
+    INPUT_BATCHES;          // Used only by the managed version.
 
     @Override
     public int metricId() {

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
index b9f6396..e579fc2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
@@ -19,23 +19,41 @@ package org.apache.drill.exec.physical.impl.xsort;
 
 import java.util.List;
 
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
 
 import com.google.common.base.Preconditions;
 
 public class ExternalSortBatchCreator implements BatchCreator<ExternalSort>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatchCreator.class);
 
   @Override
-  public ExternalSortBatch getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children)
+  public AbstractRecordBatch<ExternalSort> getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
-    return new ExternalSortBatch(config, context, children.iterator().next());
-  }
 
+    // Prefer the managed version, but provide runtime and boot-time options
+    // to disable it and revert to the "legacy" version. The legacy version
+    // is retained primarily to allow cross-check testing against the managed
+    // version, and as a fall back in the first release of the managed version.
 
+    OptionManager optionManager = context.getOptions();
+    boolean disableManaged = optionManager.getOption(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION);
+    if ( ! disableManaged ) {
+      DrillConfig drillConfig = context.getConfig();
+      disableManaged = drillConfig.hasPath(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED) &&
+                       drillConfig.getBoolean(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED);
+    }
+    if (disableManaged) {
+      return new ExternalSortBatch(config, context, children.iterator().next());
+    } else {
+      return new org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch(config, context, children.iterator().next());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
new file mode 100644
index 0000000..cd5cd1f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * Represents a group of batches spilled to disk.
+ * <p>
+ * The batches are defined by a schema which can change over time. When the schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * <p>
+ * The <code>BatchGroup</code> operates in two modes as given by the two
+ * subclasses:
+ * <ul>
+ * <li>Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.</li>
+ * <li>Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.</li>
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
+
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * <ul>
+   * <li>Contains a single batch received from the upstream (input)
+   * operator.</li>
+   * <li>Associated selection vector that provides a sorted
+   * indirection to the values in the batch.</li>
+   * </ul>
+   */
+
+  public static class InputBatch extends BatchGroup {
+    private SelectionVector2 sv2;
+
+    public InputBatch(VectorContainer container, SelectionVector2 sv2, OperatorContext context, long batchSize) {
+      super(container, context, batchSize);
+      this.sv2 = sv2;
+    }
+
+    public SelectionVector2 getSv2() {
+      return sv2;
+    }
+
+    @Override
+    public int getRecordCount() {
+      if (sv2 != null) {
+        return sv2.getCount();
+      } else {
+        return super.getRecordCount();
+      }
+    }
+
+    @Override
+    public int getNextIndex() {
+      int val = super.getNextIndex();
+      if (val == -1) {
+        return val;
+      }
+      return sv2.getIndex(val);
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        super.close();
+      }
+      finally {
+        if (sv2 != null) {
+          sv2.clear();
+        }
+      }
+    }
+  }
+
+  /**
+   * Holds a set of spilled batches, represented by a file on disk.
+   * Handles reads from, and writes to the spill file. The data structure
+   * is:
+   * <ul>
+   * <li>A pointer to a file that contains serialized batches.</li>
+   * <li>When writing, each batch is appended to the output file.</li>
+   * <li>When reading, iterates over each spilled batch, and for each
+   * of those, each spilled record.</li>
+   * </ul>
+   * <p>
+   * Starts out with no current batch. Defines the current batch to be the
+   * (shell: schema without data) of the last batch spilled to disk.
+   * <p>
+   * When reading, has destructive read-once behavior: closing the
+   * batch (after reading) deletes the underlying spill file.
+   * <p>
+   * This single class does three tasks: load data, hold data and
+   * read data. This should be split into three separate classes. But,
+   * the original (combined) structure is retained for expedience at
+   * present.
+   */
+
+  public static class SpilledRun extends BatchGroup {
+    private InputStream inputStream;
+    private OutputStream outputStream;
+    private String path;
+    private SpillSet spillSet;
+    private BufferAllocator allocator;
+    private int spilledBatches = 0;
+
+    public SpilledRun(SpillSet spillSet, String path, OperatorContext context, long batchSize) throws IOException {
+      super(null, context, batchSize);
+      this.spillSet = spillSet;
+      this.path = path;
+      this.allocator = context.getAllocator();
+      outputStream = spillSet.openForOutput(path);
+    }
+
+    public void addBatch(VectorContainer newContainer) throws IOException {
+      int recordCount = newContainer.getRecordCount();
+      @SuppressWarnings("resource")
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(recordCount, newContainer, false);
+      VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
+      Stopwatch watch = Stopwatch.createStarted();
+      outputBatch.writeToStream(outputStream);
+      newContainer.zeroVectors();
+      logger.trace("Wrote {} records in {} us", recordCount, watch.elapsed(TimeUnit.MICROSECONDS));
+      spilledBatches++;
+
+      // Hold onto the husk of the last added container so that we have a
+      // current container when starting to read rows back later.
+
+      currentContainer = newContainer;
+      currentContainer.setRecordCount(0);
+    }
+
+    @Override
+    public int getNextIndex() {
+      if (pointer == getRecordCount()) {
+        if (spilledBatches == 0) {
+          return -1;
+        }
+        try {
+          currentContainer.zeroVectors();
+          getBatch();
+        } catch (IOException e) {
+          // Release any partially-loaded data.
+          currentContainer.clear();
+          throw UserException.dataReadError(e)
+              .message("Failure while reading spilled data")
+              .build(logger);
+        }
+
+        // The pointer indicates the NEXT index, not the one we
+        // return here. At this point, we just started reading a
+        // new batch and have returned index 0. So, the next index
+        // is 1.
+
+        pointer = 1;
+        return 0;
+      }
+      return super.getNextIndex();
+    }
+
+    private VectorContainer getBatch() throws IOException {
+      if (inputStream == null) {
+        inputStream = spillSet.openForInput(path);
+      }
+      VectorAccessibleSerializable vas = new VectorAccessibleSerializable(allocator);
+      Stopwatch watch = Stopwatch.createStarted();
+      vas.readFromStream(inputStream);
+      VectorContainer c =  vas.get();
+      if (schema != null) {
+        c = SchemaUtil.coerceContainer(c, schema, context);
+      }
+      logger.trace("Read {} records in {} us", c.getRecordCount(), watch.elapsed(TimeUnit.MICROSECONDS));
+      spilledBatches--;
+      currentContainer.zeroVectors();
+      Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
+      for (@SuppressWarnings("rawtypes") VectorWrapper w : currentContainer) {
+        TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
+        pair.transfer();
+      }
+      currentContainer.setRecordCount(c.getRecordCount());
+      c.zeroVectors();
+      return c;
+    }
+
+    /**
+     * Close resources owned by this batch group. Each can fail; report
+     * only the first error. This is cluttered because this class tries
+     * to do multiple tasks. TODO: Split into multiple classes.
+     */
+
+    @Override
+    public void close() throws IOException {
+      IOException ex = null;
+      try {
+        super.close();
+      } catch (IOException e) {
+        ex = e;
+      }
+      try {
+        closeOutputStream();
+      } catch (IOException e) {
+        ex = ex == null ? e : ex;
+      }
+      try {
+        closeInputStream();
+      } catch (IOException e) {
+        ex = ex == null ? e : ex;
+      }
+      try {
+        spillSet.delete(path);
+      } catch (IOException e) {
+        ex = ex == null ? e : ex;
+      }
+      if (ex != null) {
+        throw ex;
+      }
+    }
+
+    private void closeInputStream() throws IOException {
+      if (inputStream == null) {
+        return;
+      }
+      long readLength = spillSet.getPosition(inputStream);
+      spillSet.tallyReadBytes(readLength);
+      inputStream.close();
+      inputStream = null;
+      logger.trace("Summary: Read {} bytes from {}", readLength, path);
+    }
+
+    public long closeOutputStream() throws IOException {
+      if (outputStream == null) {
+        return 0;
+      }
+      long posn = spillSet.getPosition(outputStream);
+      spillSet.tallyWriteBytes(posn);
+      outputStream.close();
+      outputStream = null;
+      logger.trace("Summary: Wrote {} bytes to {}", posn, path);
+      return posn;
+    }
+  }
+
+  protected VectorContainer currentContainer;
+  protected int pointer = 0;
+  protected OperatorContext context;
+  protected BatchSchema schema;
+  protected long dataSize;
+
+  public BatchGroup(VectorContainer container, OperatorContext context, long dataSize) {
+    this.currentContainer = container;
+    this.context = context;
+    this.dataSize = dataSize;
+  }
+
+  /**
+   * Updates the schema for this batch group. The current as well as any
+   * deserialized batches will be coerced to this schema.
+   * @param schema
+   */
+  public void setSchema(BatchSchema schema) {
+    currentContainer = SchemaUtil.coerceContainer(currentContainer, schema, context);
+    this.schema = schema;
+  }
+
+  public int getNextIndex() {
+    if (pointer == getRecordCount()) {
+      return -1;
+    }
+    int val = pointer++;
+    assert val < currentContainer.getRecordCount();
+    return val;
+  }
+
+  public VectorContainer getContainer() {
+    return currentContainer;
+  }
+
+  @Override
+  public void close() throws IOException {
+    currentContainer.zeroVectors();
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return currentContainer.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return currentContainer.getValueVectorId(path);
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return currentContainer.getSchema();
+  }
+
+  @Override
+  public int getRecordCount() {
+    return currentContainer.getRecordCount();
+  }
+
+  public int getUnfilteredRecordCount() {
+    return currentContainer.getRecordCount();
+  }
+
+  public long getDataSize() { return dataSize; }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return currentContainer.iterator();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
new file mode 100644
index 0000000..4fa520d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * Manages a {@link PriorityQueueCopier} instance produced from code generation.
+ * Provides a wrapper around a copier "session" to simplify reading batches
+ * from the copier.
+ */
+
+public class CopierHolder {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierHolder.class);
+
+  private PriorityQueueCopier copier;
+
+  private final FragmentContext context;
+  private final BufferAllocator allocator;
+  private OperatorCodeGenerator opCodeGen;
+
+  public CopierHolder(FragmentContext context, BufferAllocator allocator, OperatorCodeGenerator opCodeGen) {
+    this.context = context;
+    this.allocator = allocator;
+    this.opCodeGen = opCodeGen;
+  }
+
+  /**
+   * Start a merge operation using a temporary vector container. Used for
+   * intermediate merges.
+   *
+   * @param schema
+   * @param batchGroupList
+   * @param targetRecordCount
+   * @return
+   */
+
+  public CopierHolder.BatchMerger startMerge(BatchSchema schema, List<? extends BatchGroup> batchGroupList, int targetRecordCount) {
+    return new BatchMerger(this, schema, batchGroupList, targetRecordCount);
+  }
+
+  /**
+   * Start a merge operation using the specified vector container. Used for
+   * the final merge operation.
+   *
+   * @param schema
+   * @param batchGroupList
+   * @param outputContainer
+   * @param targetRecordCount
+   * @return
+   */
+  public CopierHolder.BatchMerger startFinalMerge(BatchSchema schema, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer, int targetRecordCount) {
+    return new BatchMerger(this, schema, batchGroupList, outputContainer, targetRecordCount);
+  }
+
+  /**
+   * Prepare a copier which will write a collection of vectors to disk. The copier
+   * uses generated code to do the actual writes. If the copier has not yet been
+   * created, generate code and create it. If it has been created, close it and
+   * prepare it for a new collection of batches.
+   *
+   * @param batch the (hyper) batch of vectors to be copied
+   * @param batchGroupList same batches as above, but represented as a list
+   * of individual batches
+   * @param outputContainer the container into which to copy the batches
+   */
+
+  @SuppressWarnings("unchecked")
+  private void createCopier(VectorAccessible batch, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer) {
+    if (copier != null) {
+      opCodeGen.closeCopier();
+    } else {
+      copier = opCodeGen.getCopier(batch);
+    }
+
+    // Initialize the value vectors for the output container
+
+    for (VectorWrapper<?> i : batch) {
+      @SuppressWarnings("resource")
+      ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
+      outputContainer.add(v);
+    }
+    try {
+      copier.setup(context, allocator, batch, (List<BatchGroup>) batchGroupList, outputContainer);
+    } catch (SchemaChangeException e) {
+      throw UserException.unsupportedError(e)
+            .message("Unexpected schema change - likely code error.")
+            .build(logger);
+    }
+  }
+
+  public BufferAllocator getAllocator() { return allocator; }
+
+  public void close() {
+    opCodeGen.closeCopier();
+    copier = null;
+  }
+
+  /**
+   * We've gathered a set of batches, each of which has been sorted. The batches
+   * may have passed through a filter and thus may have "holes" where rows have
+   * been filtered out. We will spill records in blocks of targetRecordCount.
+   * To prepare, copy that many records into an outputContainer as a set of
+   * contiguous values in new vectors. The result is a single batch with
+   * vectors that combine a collection of input batches up to the
+   * given threshold.
+   * <p>
+   * Input. Here the top line is a selection vector of indexes.
+   * The second line is a set of batch groups (separated by underscores)
+   * with letters indicating individual records:<pre>
+   * [3 7 4 8 0 6 1] [5 3 6 8 2 0]
+   * [eh_ad_ibf]     [r_qm_kn_p]</pre>
+   * <p>
+   * Output, assuming blocks of 5 records. The brackets represent
+   * batches, the line represents the set of batches copied to the
+   * spill file.<pre>
+   * [abcde] [fhikm] [npqr]</pre>
+   * <p>
+   * The copying operation does a merge as well: copying
+   * values from the sources in ordered fashion. Consider a different example,
+   * we want to merge two input batches to produce a single output batch:
+   * <pre>
+   * Input:  [aceg] [bdfh]
+   * Output: [abcdefgh]</pre>
+   * <p>
+   * In the above, the input consists of two sorted batches. (In reality,
+   * the input batches have an associated selection vector, but that is omitted
+   * here and just the sorted values shown.) The output is a single batch
+   * with the merged records (indicated by letters) from the two input batches.
+   * <p>
+   * Here we bind the copier to the batchGroupList of sorted, buffered batches
+   * to be merged. We bind the copier output to outputContainer: the copier will write its
+   * merged "batches" of records to that container.
+   * <p>
+   * Calls to the {@link #next()} method sequentially return merged batches
+   * of the desired row count.
+    */
+
+  public static class BatchMerger implements SortResults, AutoCloseable {
+
+    private CopierHolder holder;
+    private VectorContainer hyperBatch;
+    private VectorContainer outputContainer;
+    private int targetRecordCount;
+    private int copyCount;
+    private int batchCount;
+
+    /**
+     * Creates a merger with an temporary output container.
+     *
+     * @param holder the copier that does the work
+     * @param schema schema for the input and output batches
+     * @param batchGroupList the input batches
+     * @param targetRecordCount number of records for each output batch
+     */
+    private BatchMerger(CopierHolder holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
+                        int targetRecordCount) {
+      this(holder, schema, batchGroupList, new VectorContainer(), targetRecordCount);
+    }
+
+    /**
+     * Creates a merger with the specified output container
+     *
+     * @param holder the copier that does the work
+     * @param schema schema for the input and output batches
+     * @param batchGroupList the input batches
+     * @param outputContainer merges output batch into the given output container
+     * @param targetRecordCount number of records for each output batch
+     */
+    private BatchMerger(CopierHolder holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
+                        VectorContainer outputContainer, int targetRecordCount) {
+      this.holder = holder;
+      hyperBatch = constructHyperBatch(schema, batchGroupList);
+      copyCount = 0;
+      this.targetRecordCount = targetRecordCount;
+      this.outputContainer = outputContainer;
+      holder.createCopier(hyperBatch, batchGroupList, outputContainer);
+    }
+
+    /**
+     * Return the output container.
+     *
+     * @return the output container
+     */
+    public VectorContainer getOutput() {
+      return outputContainer;
+    }
+
+    /**
+     * Read the next merged batch. The batch holds the specified row count, but
+     * may be less if this is the last batch.
+     *
+     * @return the number of rows in the batch, or 0 if no more batches
+     * are available
+     */
+
+    @Override
+    public boolean next() {
+      Stopwatch w = Stopwatch.createStarted();
+      int count = holder.copier.next(targetRecordCount);
+      copyCount += count;
+      if (count > 0) {
+        long t = w.elapsed(TimeUnit.MICROSECONDS);
+        logger.trace("Took {} us to merge {} records", t, count);
+      } else {
+        logger.trace("copier returned 0 records");
+      }
+      batchCount++;
+
+      // Identify the schema to be used in the output container. (Since
+      // all merged batches have the same schema, the schema we identify
+      // here should be the same as that which we already had.
+
+      outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+      // The copier does not set the record count in the output
+      // container, so do that here.
+
+      outputContainer.setRecordCount(count);
+
+      return count > 0;
+    }
+
+    /**
+     * Construct a vector container that holds a list of batches, each represented as an
+     * array of vectors. The entire collection of vectors has a common schema.
+     * <p>
+     * To build the collection, we go through the current schema (which has been
+     * devised to be common for all batches.) For each field in the schema, we create
+     * an array of vectors. To create the elements, we iterate over all the incoming
+     * batches and search for the vector that matches the current column.
+     * <p>
+     * Finally, we build a new schema for the combined container. That new schema must,
+     * because of the way the container was created, match the current schema.
+     *
+     * @param schema schema for the hyper batch
+     * @param batchGroupList list of batches to combine
+     * @return a container where each column is represented as an array of vectors
+     * (hence the "hyper" in the method name)
+     */
+
+    private VectorContainer constructHyperBatch(BatchSchema schema, List<? extends BatchGroup> batchGroupList) {
+      VectorContainer cont = new VectorContainer();
+      for (MaterializedField field : schema) {
+        ValueVector[] vectors = new ValueVector[batchGroupList.size()];
+        int i = 0;
+        for (BatchGroup group : batchGroupList) {
+          vectors[i++] = group.getValueAccessorById(
+              field.getValueClass(),
+              group.getValueVectorId(SchemaPath.getSimplePath(field.getPath())).getFieldIds())
+              .getValueVector();
+        }
+        cont.add(vectors);
+      }
+      cont.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+      return cont;
+    }
+
+    @Override
+    public void close() {
+      hyperBatch.clear();
+      holder.close();
+    }
+
+    @Override
+    public int getRecordCount() {
+      return copyCount;
+    }
+
+    @Override
+    public int getBatchCount() {
+      return batchCount;
+    }
+  }
+}


Mime
View raw message