drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject [2/3] drill git commit: DRILL-5080: Memory-managed version of external sort
Date Mon, 13 Feb 2017 03:50:41 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
new file mode 100644
index 0000000..783865c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -0,0 +1,1456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
+import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
+import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
+
+import com.google.common.collect.Lists;
+
+/**
+ * External sort batch: a sort batch which can spill to disk in
+ * order to operate within a defined memory footprint.
+ * <p>
+ * <h4>Basic Operation</h4>
+ * The operator has three key phases:
+ * <p>
+ * <ul>
+ * <li>The load phase in which batches are read from upstream.</li>
+ * <li>The merge phase in which spilled batches are combined to
+ * reduce the number of files below the configured limit. (Best
+ * practice is to configure the system to avoid this phase.)
+ * <li>The delivery phase in which batches are combined to produce
+ * the final output.</li>
+ * </ul>
+ * During the load phase:
+ * <p>
+ * <ul>
+ * <li>The incoming (upstream) operator provides a series of batches.</li>
+ * <li>This operator sorts each batch, and accumulates them in an in-memory
+ * buffer.</li>
+ * <li>If the in-memory buffer becomes too large, this operator selects
+ * a subset of the buffered batches to spill.</li>
+ * <li>Each spill set is merged to create a new, sorted collection of
+ * batches, and each is spilled to disk.</li>
+ * <li>To allow the use of multiple disk storage, each spill group is written
+ * round-robin to a set of spill directories.</li>
+ * </ul>
+ * <p>
+ * Data is spilled to disk as a "run". A run consists of one or more (typically
+ * many) batches, each of which is itself a sorted run of records.
+ * <p>
+ * During the sort/merge phase:
+ * <p>
+ * <ul>
+ * <li>When the input operator is complete, this operator merges the accumulated
+ * batches (which may be all in memory or partially on disk), and returns
+ * them to the output (downstream) operator in chunks of no more than
+ * 64K records.</li>
+ * <li>The final merge must combine a collection of in-memory and spilled
+ * batches. Several limits apply to the maximum "width" of this merge. For
+ * example, each open spill run consumes a file handle, and we may wish
+ * to limit the number of file handles. Further, memory must hold one batch
+ * from each run, so we may need to reduce the number of runs so that the
+ * remaining runs can fit into memory. A consolidation phase combines
+ * in-memory and spilled batches prior to the final merge to control final
+ * merge width.</li>
+ * <li>A special case occurs if no batches were spilled. In this case, the input
+ * batches are sorted in memory without merging.</li>
+ * </ul>
+ * <p>
+ * Many complex details are involved in doing the above; the details are explained
+ * in the methods of this class.
+ * <p>
+ * <h4>Configuration Options</h4>
+ * <dl>
+ * <dt>drill.exec.sort.external.spill.fs</dt>
+ * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd>
+ * <dt>drill.exec.sort.external.spill.directories</dt>
+ * <dd>The comma delimited list of directories, on the above file
+ * system, to which to spill files in round-robin fashion. The query will
+ * fail if any one of the directories becomes full.</dd>
+ * <dt>drill.exec.sort.external.spill.file_size</dt>
+ * <dd>Target size for first-generation spill files. Set this to large
+ * enough to get nice long writes, but not so large that spill directories
+ * are overwhelmed.</dd>
+ * <dt>drill.exec.sort.external.mem_limit</dt>
+ * <dd>Maximum memory to use for the in-memory buffer. (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.batch_limit</dt>
+ * <dd>Maximum number of batches to hold in memory. (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.spill.max_count</dt>
+ * <dd>Maximum number of batches to add to “first generation” files.
+ * Defaults to 0 (no limit). (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.spill.min_count</dt>
+ * <dd>Minimum number of batches to add to “first generation” files.
+ * Defaults to 0 (no limit). (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.merge_limit</dt>
+ * <dd>Sets the maximum number of runs to be merged in a single pass (limits
+ * the number of open files.)</dd>
+ * </dl>
+ * <p>
+ * The memory limit observed by this operator is the lesser of:
+ * <ul>
+ * <li>The maximum allocation allowed the allocator assigned to this batch
+ * as set by the Foreman, or</li>
+ * <li>The maximum limit configured in the mem_limit parameter above. (Primarily for
+ * testing.)</li>
+ * </ul>
+ * <h4>Output</h4>
+ * It is helpful to note that the sort operator will produce one of two kinds of
+ * output batches.
+ * <ul>
+ * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses
+ * the entire in-memory sort set. A selection vector remover will copy results
+ * into new batches of a size determined by that operator.</li>
+ * <li>A series of batches, without a selection vector, if the sort spills to
+ * disk. In this case, the downstream operator will still be a selection vector
+ * remover, but there is nothing for that operator to remove. Each batch is
+ * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li>
+ * </ul>
+ * Note that, even in the in-memory sort case, this operator could do the copying
+ * to eliminate the extra selection vector remover. That is left as an exercise
+ * for another time.
+ * <h4>Logging</h4>
+ * Logging in this operator serves two purposes:
+ * <ul>
+ * <li>Normal diagnostic information.</li>
+ * <li>Capturing the essence of the operator functionality for analysis in unit
+ * tests.</li>
+ * </ul>
+ * Test logging is designed to capture key events and timings. Take care
+ * when changing or removing log messages as you may need to adjust unit tests
+ * accordingly.
+ */
+
+public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
+  protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
+
+  /**
+   * Smallest allowed output batch size. The smallest output batch
+   * created even under constrained memory conditions.
+   */
+  private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
+
+  /**
+   * The preferred amount of memory to set aside to output batches
+   * expressed as a ratio of available memory.
+   */
+
+  private static final float MERGE_BATCH_ALLOWANCE = 0.10F;
+
+  /**
+   * In the bizarre case where the user gave us an unrealistically low
+   * spill file size, set a floor at some bare minimum size. (Note that,
+   * at this size, big queries will create a huge number of files, which
+ * is why the configuration default is on the order of hundreds of MB.)
+   */
+
+  private static final long MIN_SPILL_FILE_SIZE = 1 * 1024 * 1024;
+
+  public static final String INTERRUPTION_AFTER_SORT = "after-sort";
+  public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
+  public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
+  public static final long DEFAULT_SPILL_BATCH_SIZE = 8L * 1024 * 1024;
+  public static final long MIN_SPILL_BATCH_SIZE = 256 * 1024;
+
+  private final RecordBatch incoming;
+
+  /**
+   * Memory allocator for this operator itself. Incoming batches are
+   * transferred into this allocator. Intermediate batches used during
+   * merge also reside here.
+   */
+
+  private final BufferAllocator allocator;
+
+  /**
+   * Schema of batches that this operator produces.
+   */
+
+  private BatchSchema schema;
+
+  private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList();
+  private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
+  private SelectionVector4 sv4;
+
+  /**
+   * The number of records to add to each output batch sent to the
+   * downstream operator or spilled to disk.
+   */
+
+  private int mergeBatchRowCount;
+  private int peakNumBatches = -1;
+
+  private long memoryLimit;
+
+  /**
+   * Iterates over the final, sorted results.
+   */
+
+  private SortResults resultsIterator;
+
+  /**
+   * Manages the set of spill directories and files.
+   */
+
+  private final SpillSet spillSet;
+
+  /**
+   * Manages the copier used to merge a collection of batches into
+   * a new set of batches.
+   */
+
+  private final CopierHolder copierHolder;
+
+  private enum SortState { START, LOAD, DELIVER, DONE }
+  private SortState sortState = SortState.START;
+  private int inputRecordCount = 0;
+  private int inputBatchCount = 0; // total number of batches received so far
+  private final OperatorCodeGenerator opCodeGen;
+
+  /**
+   * Estimated size of the records for this query, updated on each
+   * new batch received from upstream.
+   */
+
+  private int estimatedRowWidth;
+
+  /**
+   * Size of the merge batches that this operator produces. Generally
+   * the same as the merge batch size, unless low memory forces a smaller
+   * value.
+   */
+
+  private long targetMergeBatchSize;
+
+  /**
+   * Estimate of the input batch size based on the largest batch seen
+   * thus far.
+   */
+  private long estimatedInputBatchSize;
+
+  /**
+   * Maximum number of batches to hold in memory.
+   * (Primarily for testing.)
+   */
+
+  private int bufferedBatchLimit;
+  private int mergeLimit;
+  private long spillFileSize;
+  private long minimumBufferSpace;
+
+  /**
+   * Minimum memory level before spilling occurs. That is, we can buffer input
+   * batches in memory until we are down to the level given by the spill point.
+   */
+
+  private long spillPoint;
+  private long mergeMemoryPool;
+  private long preferredMergeBatchSize;
+  private long bufferMemoryPool;
+  private boolean hasOversizeCols;
+  private long totalInputBytes;
+  private Long spillBatchSize;
+  private int maxDensity;
+
+  /**
+   * Estimated number of rows that fit into a single spill batch.
+   */
+
+  private int spillBatchRowCount;
+
+  // WARNING: The enum here is used within this class. But, the members of
+  // this enum MUST match those in the (unmanaged) ExternalSortBatch since
+  // that is the enum used in the UI to display metrics for the query profile.
+
+  /**
+   * Operator metrics reported to the query profile. The ordinal of each
+   * member is the metric id, so the order here must not change.
+   */
+
+  public enum Metric implements MetricDef {
+    SPILL_COUNT,            // number of times operator spilled to disk
+    RETIRED1,               // Was: peak value for totalSizeInMemory
+                            // But operator already provides this value
+    PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
+    MERGE_COUNT,            // Number of second+ generation merges
+    MIN_BUFFER,             // Minimum memory level observed in operation.
+    SPILL_MB;               // Number of MB of data spilled to disk. This
+                            // amount is first written, then later re-read.
+                            // So, disk I/O is twice this amount.
+
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
+
+  /**
+   * Iterates over the final sorted results. Implemented differently
+   * depending on whether the results are in-memory or spilled to
+   * disk.
+   */
+
+  public interface SortResults {
+    /** Advance to the next output batch. Returns false when no more remain. */
+    boolean next();
+    /** Release any resources (memory, open spill files) held by the iterator. */
+    void close();
+    /** Number of batches returned so far; used for logging/diagnostics. */
+    int getBatchCount();
+    /** Number of records returned so far; used for logging/diagnostics. */
+    int getRecordCount();
+  }
+
+  /**
+   * Construct the managed external sort operator and apply memory and
+   * spill configuration from the Drill config.
+   *
+   * @param popConfig physical-plan definition of this sort
+   * @param context fragment context supplying config, allocator and injector
+   * @param incoming upstream operator that provides the batches to sort
+   */
+
+  public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
+    super(popConfig, context, true);
+    this.incoming = incoming;
+    allocator = oContext.getAllocator();
+    opCodeGen = new OperatorCodeGenerator(context, popConfig);
+
+    spillSet = new SpillSet(context, popConfig);
+    copierHolder = new CopierHolder(context, allocator, opCodeGen);
+    configure(context.getConfig());
+  }
+
+  /**
+   * Read the sort configuration options (memory limit, batch limit,
+   * spill file/batch sizes, merge limits) and derive the effective
+   * values used by this operator. Called once from the constructor.
+   *
+   * @param config the Drill configuration to read
+   */
+
+  private void configure(DrillConfig config) {
+
+    // The maximum memory this operator can use as set by the
+    // operator definition (propagated to the allocator.)
+
+    memoryLimit = allocator.getLimit();
+
+    // Optional configured memory limit, typically used only for testing.
+
+    long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
+    if (configLimit > 0) {
+      memoryLimit = Math.min(memoryLimit, configLimit);
+    }
+
+    // Optional limit on the number of buffered in-memory batches.
+    // 0 means no limit. Used primarily for testing. Must allow at least two
+    // batches or no merging can occur.
+
+    bufferedBatchLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, Integer.MAX_VALUE, 2);
+
+    // Optional limit on the number of spilled runs to merge in a single
+    // pass. Limits the number of open file handles. Must allow at least
+    // two batches to merge to make progress.
+
+    mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2);
+
+    // Limits the size of first-generation spill files.
+
+    spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE);
+
+    // Ensure the size is reasonable.
+
+    spillFileSize = Math.max(spillFileSize, MIN_SPILL_FILE_SIZE);
+
+    // The spill batch size. This is a critical setting for performance.
+    // Set too large and the ratio between memory and input data sizes becomes
+    // small. Set too small and disk seek times dominate performance.
+
+    spillBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE);
+    spillBatchSize = Math.max(spillBatchSize, MIN_SPILL_BATCH_SIZE);
+
+    // Set the target output batch size. Use the maximum size, but only if
+    // this represents less than 10% of available memory. Otherwise, use 10%
+    // of memory, but no smaller than the minimum size. In any event, an
+    // output batch can contain no fewer than a single record.
+
+    preferredMergeBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE);
+    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
+    preferredMergeBatchSize = Math.min(maxAllowance, preferredMergeBatchSize);
+    preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE);
+
+    logger.debug("Config: memory limit = {}, batch limit = {}, " +
+                 "spill file size = {}, batch size = {}, merge limit = {}, merge batch size = {}",
+                  memoryLimit, bufferedBatchLimit, spillFileSize, spillBatchSize, mergeLimit,
+                  preferredMergeBatchSize);
+  }
+
+  /**
+   * Read an integer limit from the config. A configured value of zero
+   * (or less) means "no limit" and maps to the supplied default; any
+   * positive value is clamped to at least the given minimum.
+   *
+   * @param config the Drill configuration
+   * @param paramName name of the config parameter to read
+   * @param valueIfZero value to use when the configured value is not positive
+   * @param minValue smallest allowed positive limit
+   * @return the effective limit
+   */
+
+  private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int minValue) {
+    int limit = config.getInt(paramName);
+    if (limit > 0) {
+      limit = Math.max(limit, minValue);
+    } else {
+      limit = valueIfZero;
+    }
+    return limit;
+  }
+
+  /**
+   * Record count of the current output batch. When an sv4 is present
+   * (in-memory sort case) the effective count is the sv4's count;
+   * otherwise it is the container's own record count.
+   */
+
+  @Override
+  public int getRecordCount() {
+    if (sv4 != null) {
+      return sv4.getCount();
+    }
+    return container.getRecordCount();
+  }
+
+  /**
+   * The sv4 over the in-memory sorted result set; null when the sort
+   * spilled and output batches carry no selection vector.
+   */
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    return sv4;
+  }
+
+  /**
+   * Close every batch group in the given collection, reporting (but not
+   * rethrowing) failures so that cleanup continues for the remaining groups.
+   *
+   * @param groups the in-memory or spilled batch groups to close
+   */
+
+  private void closeBatchGroups(Collection<? extends BatchGroup> groups) {
+    for (BatchGroup group: groups) {
+      try {
+        group.close();
+      } catch (Exception e) {
+        // collect all failure and make sure to cleanup all remaining batches
+        // Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources()
+        // where it would have been passed to context.fail()
+        // passing the exception directly to context.fail(e) will let the cleanup process continue instead of stopping
+        // right away, this will also make sure we collect any additional exception we may get while cleaning up
+        context.fail(e);
+      }
+    }
+  }
+
+  /**
+   * Called by {@link AbstractRecordBatch} as a fast-path to obtain
+   * the first record batch and setup the schema of this batch in order
+   * to quickly return the schema to the client. Note that this method
+   * fetches the first batch from upstream which will be waiting for
+   * us the first time that {@link #innerNext()} is called.
+   */
+
+  @Override
+  public void buildSchema() {
+    IterOutcome outcome = next(incoming);
+    switch (outcome) {
+      case OK:
+      case OK_NEW_SCHEMA:
+
+        // Build an empty output container that mirrors the incoming
+        // schema so the client can see the schema before any data flows.
+
+        for (VectorWrapper<?> w : incoming) {
+          @SuppressWarnings("resource")
+          ValueVector v = container.addOrGet(w.getField());
+          if (v instanceof AbstractContainerVector) {
+            w.getValueVector().makeTransferPair(v); // Can we remove this hack?
+            v.clear();
+          }
+          v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO)
+        }
+        container.buildSchema(SelectionVectorMode.NONE);
+        container.setRecordCount(0);
+        break;
+      case STOP:
+        // Upstream asked us to stop; record the terminal state.
+        state = BatchState.STOP;
+        break;
+      case OUT_OF_MEMORY:
+        state = BatchState.OUT_OF_MEMORY;
+        break;
+      case NONE:
+        // Upstream has no data at all; we are done before starting.
+        state = BatchState.DONE;
+        break;
+      default:
+        throw new IllegalStateException("Unexpected iter outcome: " + outcome);
+    }
+  }
+
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
+  @Override
+  public IterOutcome innerNext() {
+    switch (sortState) {
+    case DONE:
+      // All results already delivered.
+      return IterOutcome.NONE;
+    case START:
+    case LOAD:
+      // First call (or resumed load): consume and sort all input.
+      return load();
+    case DELIVER:
+      // Sorting finished; hand out the next chunk of sorted output.
+      return nextOutputBatch();
+    default:
+      throw new IllegalStateException("Unexpected sort state: " + sortState);
+    }
+  }
+
+  /**
+   * Deliver the next batch of sorted output from the results iterator,
+   * transitioning to the DONE state when the iterator is exhausted.
+   *
+   * @return OK while batches remain, NONE when delivery is complete
+   */
+
+  private IterOutcome nextOutputBatch() {
+    if (resultsIterator.next()) {
+      return IterOutcome.OK;
+    } else {
+      logger.trace("Deliver phase complete: Returned {} batches, {} records",
+                    resultsIterator.getBatchCount(), resultsIterator.getRecordCount());
+      sortState = SortState.DONE;
+      return IterOutcome.NONE;
+    }
+  }
+
+  /**
+   * Load and process a single batch, handling schema changes. In general, the
+   * external sort accepts only one schema.
+   *
+   * @return return code depending on the amount of data read from upstream
+   */
+
+  private IterOutcome loadBatch() {
+
+    // If this is the very first batch, then AbstractRecordBatch
+    // already loaded it for us in buildSchema().
+
+    IterOutcome upstream;
+    if (sortState == SortState.START) {
+      sortState = SortState.LOAD;
+      upstream = IterOutcome.OK_NEW_SCHEMA;
+    } else {
+      upstream = next(incoming);
+    }
+    switch (upstream) {
+    case NONE:
+    case STOP:
+      // End of input (or forced stop): let the caller decide what to do.
+      return upstream;
+    case OK_NEW_SCHEMA:
+    case OK:
+      // Validate (or establish) the schema before accepting the data.
+      setupSchema(upstream);
+
+      // Add the batch to the in-memory generation, spilling if
+      // needed.
+
+      processBatch();
+      break;
+    case OUT_OF_MEMORY:
+
+      // Note: it is highly doubtful that this code actually works. It
+      // requires that the upstream batches got to a safe place to run
+      // out of memory and that no work was in-flight and thus abandoned.
+      // Consider removing this case once resource management is in place.
+
+      logger.debug("received OUT_OF_MEMORY, trying to spill");
+      if (bufferedBatches.size() > 2) {
+        spillFromMemory();
+      } else {
+        logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
+        return IterOutcome.OUT_OF_MEMORY;
+      }
+      break;
+    default:
+      throw new IllegalStateException("Unexpected iter outcome: " + upstream);
+    }
+    return IterOutcome.OK;
+  }
+
+  /**
+   * Load the results and sort them. May bail out early if an exceptional
+   * condition is passed up from the input batch.
+   *
+   * @return return code: OK_NEW_SCHEMA if rows were sorted,
+   * NONE if no rows
+   */
+
+  private IterOutcome load() {
+    logger.trace("Start of load phase");
+
+    // Clear the temporary container created by
+    // buildSchema().
+
+    container.clear();
+
+    // Loop over all input batches
+
+    for (;;) {
+      IterOutcome result = loadBatch();
+
+      // None means all batches have been read.
+
+      if (result == IterOutcome.NONE) {
+        break; }
+
+      // Any outcome other than OK means something went wrong.
+
+      if (result != IterOutcome.OK) {
+        return result; }
+    }
+
+    // Anything to actually sort?
+
+    if (inputRecordCount == 0) {
+      sortState = SortState.DONE;
+      return IterOutcome.NONE;
+    }
+    logger.debug("Completed load phase: read {} batches, spilled {} times, total input bytes: {}",
+                 inputBatchCount, spilledRuns.size(), totalInputBytes);
+
+    // Do the merge of the loaded batches. The merge can be done entirely in memory if
+    // the results fit; else we have to do a disk-based merge of
+    // pre-sorted spilled batches.
+
+    if (canUseMemoryMerge()) {
+      return sortInMemory();
+    } else {
+      return mergeSpilledRuns();
+    }
+  }
+
+  /**
+   * All data has been read from the upstream batch. Determine if we
+   * can use a fast in-memory sort, or must use a merge (which typically,
+   * but not always, involves spilled batches.)
+   *
+   * @return whether sufficient resources exist to do an in-memory sort
+   * if all batches are still in memory
+   */
+
+  private boolean canUseMemoryMerge() {
+    // Once anything has spilled, a pure in-memory sort is no longer possible.
+    if (spillSet.hasSpilled()) { return false; }
+
+    // Do we have enough memory for MSorter (the in-memory sorter)?
+
+    long allocMem = allocator.getAllocatedMemory();
+    long availableMem = memoryLimit - allocMem;
+    long neededForInMemorySort = MSortTemplate.memoryNeeded(inputRecordCount);
+    if (availableMem < neededForInMemorySort) { return false; }
+
+    // Make sure we don't exceed the maximum number of batches SV4 can address.
+    // (An sv4 batch index is a 16-bit value.)
+
+    if (bufferedBatches.size() > Character.MAX_VALUE) { return false; }
+
+    // We can do an in-memory merge.
+
+    return true;
+  }
+
+  /**
+   * Handle a new schema from upstream. The ESB is quite limited in its ability
+   * to handle schema changes.
+   *
+   * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA
+   */
+
+  private void setupSchema(IterOutcome upstream)  {
+
+    // First batch: we won't have a schema.
+
+    if (schema == null) {
+      schema = incoming.getSchema();
+
+    // Subsequent batches, nothing to do if same schema.
+
+    } else if (upstream == IterOutcome.OK) {
+      return;
+
+    // Only change in the case that the schema truly changes. Artificial schema changes are ignored.
+
+    } else if (incoming.getSchema().equals(schema)) {
+      return;
+    } else if (unionTypeEnabled) {
+        schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
+
+        // New schema: must generate a new sorter and copier.
+
+        opCodeGen.setSchema(schema);
+    } else {
+      // Schema changed and union types are disabled: fail the query.
+      throw UserException.unsupportedError()
+            .message("Schema changes not supported in External Sort. Please enable Union type.")
+            .build(logger);
+    }
+
+    // Coerce all existing batches to the new schema.
+
+    for (BatchGroup b : bufferedBatches) {
+      b.setSchema(schema);
+    }
+    for (BatchGroup b : spilledRuns) {
+      b.setSchema(schema);
+    }
+  }
+
+  /**
+   * Convert an incoming batch into the agree-upon format. (Also seems to
+   * make a persistent shallow copy of the batch saved until we are ready
+   * to sort or spill.)
+   *
+   * @return the converted batch, or null if the incoming batch is empty
+   */
+
+  private VectorContainer convertBatch() {
+
+    // Must accept the batch even if no records. Then clear
+    // the vectors to release memory since we won't do any
+    // further processing with the empty batch.
+
+    VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext);
+    if (incoming.getRecordCount() == 0) {
+      // Empty batch: release the coerced vectors and signal "nothing to do".
+      for (VectorWrapper<?> w : convertedBatch) {
+        w.clear();
+      }
+      return null;
+    }
+    return convertedBatch;
+  }
+
+  /**
+   * Obtain the sv2 used to sort the incoming batch: clone the incoming
+   * one when present, otherwise allocate a fresh "straight-through" sv2.
+   *
+   * @return a selection vector covering the incoming batch's records
+   */
+
+  private SelectionVector2 makeSelectionVector() {
+    if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
+      return incoming.getSelectionVector2().clone();
+    } else {
+      return newSV2();
+    }
+  }
+
+  /**
+   * Process the converted incoming batch by adding it to the in-memory store
+   * of data, or spilling data to disk when necessary.
+   */
+
+  @SuppressWarnings("resource")
+  private void processBatch() {
+
+    // Skip empty batches (such as the first one.)
+
+    if (incoming.getRecordCount() == 0) {
+      return;
+    }
+
+    // Determine actual sizes of the incoming batch before taking
+    // ownership. Allows us to figure out if we need to spill first,
+    // to avoid overflowing memory simply due to ownership transfer.
+
+    RecordBatchSizer sizer = analyzeIncomingBatch();
+
+    // The heart of the external sort operator: spill to disk when
+    // the in-memory generation exceeds the allowed memory limit.
+    // Preemptively spill BEFORE accepting the new batch into our memory
+    // pool. The allocator will throw an OOM exception if we accept the
+    // batch when we are near the limit - despite the fact that the batch
+    // is already in memory and no new memory is allocated during the transfer.
+
+    if ( isSpillNeeded(sizer.actualSize())) {
+      spillFromMemory();
+    }
+
+    // Sanity check. We should now be above the spill point.
+    // (Logged, not fatal: the sort can continue, but the estimates
+    // used to plan spilling were evidently off.)
+
+    long startMem = allocator.getAllocatedMemory();
+    if (memoryLimit - startMem < spillPoint) {
+      logger.error( "ERROR: Failed to spill below the spill point. Spill point = {}, free memory = {}",
+                    spillPoint, memoryLimit - startMem);
+    }
+
+    // Convert the incoming batch to the agreed-upon schema.
+    // No converted batch means we got an empty input batch.
+    // Converting the batch transfers memory ownership to our
+    // allocator. This gives a round-about way to learn the batch
+    // size: check the before and after memory levels, then use
+    // the difference as the batch size, in bytes.
+
+    VectorContainer convertedBatch = convertBatch();
+    if (convertedBatch == null) {
+      return;
+    }
+
+    SelectionVector2 sv2 = makeSelectionVector();
+
+    // Compute batch size, including allocation of an sv2.
+
+    long endMem = allocator.getAllocatedMemory();
+    long batchSize = endMem - startMem;
+    int count = sv2.getCount();
+    inputRecordCount += count;
+    inputBatchCount++;
+    totalInputBytes += sizer.actualSize();
+
+    // Update the minimum buffer space metric.
+
+    if (minimumBufferSpace == 0) {
+      minimumBufferSpace = endMem;
+    } else {
+      minimumBufferSpace = Math.min(minimumBufferSpace, endMem);
+    }
+    stats.setLongStat(Metric.MIN_BUFFER, minimumBufferSpace);
+
+    // Update the size based on the actual record count, not
+    // the effective count as given by the selection vector
+    // (which may exclude some records due to filtering.)
+
+    updateMemoryEstimates(batchSize, sizer);
+
+    // Sort the incoming batch using either the original selection vector,
+    // or a new one created here.
+
+    SingleBatchSorter sorter;
+    sorter = opCodeGen.getSorter(convertedBatch);
+    try {
+      sorter.setup(context, sv2, convertedBatch);
+    } catch (SchemaChangeException e) {
+      convertedBatch.clear();
+      throw UserException.unsupportedError(e)
+            .message("Unexpected schema change.")
+            .build(logger);
+    }
+    try {
+      sorter.sort(sv2);
+    } catch (SchemaChangeException e) {
+      convertedBatch.clear();
+      throw UserException.unsupportedError(e)
+                .message("Unexpected schema change.")
+                .build(logger);
+    }
+
+    // Hand the sorted batch to the in-memory generation; clean up
+    // on any failure so the batch's memory is not leaked.
+
+    RecordBatchData rbd = new RecordBatchData(convertedBatch, allocator);
+    try {
+      rbd.setSv2(sv2);
+      bufferedBatches.add(new BatchGroup.InputBatch(rbd.getContainer(), rbd.getSv2(), oContext, batchSize));
+      if (peakNumBatches < bufferedBatches.size()) {
+        peakNumBatches = bufferedBatches.size();
+        stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches);
+      }
+
+    } catch (Throwable t) {
+      rbd.clear();
+      throw t;
+    }
+  }
+
+  /**
+   * Scan the vectors in the incoming batch to determine batch size and if
+   * any oversize columns exist. (Oversize columns cause memory fragmentation.)
+   *
+   * @return an analysis of the incoming batch
+   */
+
+  private RecordBatchSizer analyzeIncomingBatch() {
+    RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+    sizer.applySv2();
+    if (! hasOversizeCols) {
+      hasOversizeCols = sizer.checkOversizeCols();
+    }
+    if (inputBatchCount == 0) {
+      logger.debug("{}", sizer.toString());
+    }
+    return sizer;
+  }
+
+  /**
+   * Update the data-driven memory use numbers including:
+   * <ul>
+   * <li>The average size of incoming records.</li>
+   * <li>The estimated spill and output batch size.</li>
+   * <li>The estimated number of average-size records per
+   * spill and output batch.</li>
+   * <li>The amount of memory set aside to hold the incoming
+   * batches before spilling starts.</li>
+   * </ul>
+   *
+   * @param actualBatchSize the overall size of the current batch received from
+   * upstream
+   * @param actualRecordCount the number of actual (not filtered) records in
+   * that upstream batch
+   */
+
+  private void updateMemoryEstimates(long memoryDelta, RecordBatchSizer sizer) {
+    long actualBatchSize = sizer.actualSize();
+    int actualRecordCount = sizer.rowCount();
+
+    if (actualBatchSize < memoryDelta) {
+      logger.debug("Memory delta: {}, actual batch size: {}, Diff: {}",
+                   memoryDelta, actualBatchSize, memoryDelta - actualBatchSize);
+    }
+
+    // The record count should never be zero, but better safe than sorry...
+
+    if (actualRecordCount == 0) {
+      return; }
+
+    // If the vector is less than 75% full, just ignore it, except in the
+    // unfortunate case where it is the first batch. Low-density batches generally
+    // occur only at the end of a file or at the end of a DFS block. In such a
+    // case, we will continue to rely on estimates created on previous, high-
+    // density batches.
+    // We actually track the max density seen, and compare to 75% of that since
+    // Parquet produces very low density record batches.
+
+    if (sizer.getAvgDensity() < maxDensity * 0.75) {
+      logger.debug("Saw low density batch. Density: {}", sizer.getAvgDensity());
+      return;
+    }
+    maxDensity = Math.max(maxDensity, sizer.getAvgDensity());
+
+    // We know the batch size and number of records. Use that to estimate
+    // the average record size. Since a typical batch has many records,
+    // the average size is a fairly good estimator. Note that the batch
+    // size includes not just the actual vector data, but any unused space
+    // resulting from power-of-two allocation. This means that we don't
+    // have to do size adjustments for input batches as we will do below
+    // when estimating the size of other objects.
+
+    int batchRowWidth = sizer.netRowWidth();
+
+    // Record sizes may vary across batches. To be conservative, use
+    // the largest size observed from incoming batches.
+
+    int origRowEstimate = estimatedRowWidth;
+    estimatedRowWidth = Math.max(estimatedRowWidth, batchRowWidth);
+
+    // Maintain an estimate of the incoming batch size: the largest
+    // batch yet seen. Used to reserve memory for the next incoming
+    // batch.
+
+    long origInputBatchSize = estimatedInputBatchSize;
+    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, actualBatchSize);
+
+    // Go no further if nothing changed.
+
+    if (estimatedRowWidth == origRowEstimate && estimatedInputBatchSize == origInputBatchSize) {
+      return; }
+
+    // Estimate the total size of each incoming batch plus sv2. Note that, due
+    // to power-of-two rounding, the allocated size might be twice the data size.
+
+    long estimatedInputSize = estimatedInputBatchSize + 4 * actualRecordCount;
+
+    // Determine the number of records to spill per spill batch. The goal is to
+    // spill batches of either 64K records, or as many records as fit into the
+    // amount of memory dedicated to each spill batch, whichever is less.
+
+    spillBatchRowCount = (int) Math.max(1, spillBatchSize / estimatedRowWidth);
+    spillBatchRowCount = Math.min(spillBatchRowCount, Character.MAX_VALUE);
+
+    // Determine the number of records per batch per merge step. The goal is to
+    // merge batches of either 64K records, or as many records as fit into the
+    // amount of memory dedicated to each merge batch, whichever is less.
+
+    targetMergeBatchSize = preferredMergeBatchSize;
+    mergeBatchRowCount = (int) Math.max(1, targetMergeBatchSize / estimatedRowWidth);
+    mergeBatchRowCount = Math.min(mergeBatchRowCount, Character.MAX_VALUE);
+
+    // Determine the minimum memory needed for spilling. Spilling is done just
+    // before accepting a batch, so we must spill if we don't have room for a
+    // (worst case) input batch. To spill, we need room for the output batch created
+    // by merging the batches already in memory. Double this to allow for power-of-two
+    // memory allocations.
+
+    spillPoint = estimatedInputBatchSize + 2 * spillBatchSize;
+
+    // The merge memory pool assumes we can spill all input batches. To make
+    // progress, we must have at least two merge batches (same size as an output
+    // batch) and one output batch. Again, double to allow for power-of-two
+    // allocation and add one for a margin of error.
+
+    int minMergeBatches = 2 * 3 + 1;
+    long minMergeMemory = minMergeBatches * targetMergeBatchSize;
+
+    // If we are in a low-memory condition, then we might not have room for the
+    // default output batch size. In that case, pick a smaller size.
+
+    long minMemory = Math.max(spillPoint, minMergeMemory);
+    if (minMemory > memoryLimit) {
+
+      // Figure out the minimum output batch size based on memory, but can't be
+      // any smaller than the defined minimum.
+
+      targetMergeBatchSize = Math.max(MIN_MERGED_BATCH_SIZE, memoryLimit / minMergeBatches);
+
+      // Regardless of anything else, the batch must hold at least one
+      // complete row.
+
+      targetMergeBatchSize = Math.max(estimatedRowWidth, targetMergeBatchSize);
+      spillPoint = estimatedInputBatchSize + 2 * spillBatchSize;
+      minMergeMemory = minMergeBatches * targetMergeBatchSize;
+    }
+
+    // Determine the minimum total memory we would need to receive two input
+    // batches (the minimum needed to make progress) and the allowance for the
+    // output batch.
+
+    long minLoadMemory = spillPoint + estimatedInputSize;
+
+    // Determine how much memory can be used to hold in-memory batches of spilled
+    // runs when reading from disk.
+
+    bufferMemoryPool = memoryLimit - spillPoint;
+    mergeMemoryPool = Math.max(minMergeMemory,
+                               (long) ((memoryLimit - 3 * targetMergeBatchSize) * 0.95));
+
+    // Sanity check: if we've been given too little memory to make progress,
+    // issue a warning but proceed anyway. Should only occur if something is
+    // configured terribly wrong.
+
+    long minMemoryNeeds = Math.max(minLoadMemory, minMergeMemory);
+    if (minMemoryNeeds > memoryLimit) {
+      logger.warn("Potential memory overflow! " +
+                   "Minumum needed = {} bytes, actual available = {} bytes",
+                   minMemoryNeeds, memoryLimit);
+    }
+
+    // Log the calculated values. Turn this on if things seem amiss.
+    // Message will appear only when the values change.
+
+    logger.debug("Memory Estimates: record size = {} bytes; input batch = {} bytes, {} records; " +
+                  "merge batch size = {} bytes, {} records; " +
+                  "output batch size = {} bytes, {} records; " +
+                  "Available memory: {}, spill point = {}, min. merge memory = {}",
+                estimatedRowWidth, estimatedInputBatchSize, actualRecordCount,
+                spillBatchSize, spillBatchRowCount,
+                targetMergeBatchSize, mergeBatchRowCount,
+                memoryLimit, spillPoint, minMergeMemory);
+  }
+
  /**
   * Determine if spill is needed before receiving the new record batch.
   * Spilling is driven purely by memory availability (and an optional
   * batch limit for testing.)
   *
   * @param incomingSize estimated memory footprint of the incoming batch
   * about to be accepted
   * @return true if spilling is needed, false otherwise
   */

  private boolean isSpillNeeded(int incomingSize) {

    // Can't spill if less than two batches else the merge
    // can't make progress.

    if (bufferedBatches.size() < 2) {
      return false; }

    // Must spill if accepting the incoming batch would push allocated
    // memory past the pool reserved for buffering input batches
    // (bufferMemoryPool = memory limit minus the spill reserve).

    if (allocator.getAllocatedMemory() + incomingSize >= bufferMemoryPool) {
      return true; }

    // For test purposes, configuration may have set a limit on the number of
    // batches in memory. Spill if we exceed this limit. (By default the number
    // of in-memory batches is unlimited.)

    return bufferedBatches.size() > bufferedBatchLimit;
  }
+
  /**
   * Perform an in-memory sort of the buffered batches. Obviously can
   * be used only for the non-spilling case.
   *
   * @return DONE if no rows, OK_NEW_SCHEMA if at least one row
   */

  private IterOutcome sortInMemory() {
    logger.info("Starting in-memory sort. Batches = {}, Records = {}, Memory = {}",
                bufferedBatches.size(), inputRecordCount, allocator.getAllocatedMemory());

    // Note the difference between how we handle batches here and in the spill/merge
    // case. In the spill/merge case, this class decides on the batch size to send
    // downstream. However, in the in-memory case, we must pass along all batches
    // in a single SV4. Attempts to do paging will result in errors. In the memory
    // merge case, the downstream Selection Vector Remover will split the one
    // big SV4 into multiple smaller batches to send further downstream.

    // If the sort fails or is empty, clean up here. Otherwise, cleanup is done
    // by closing the resultsIterator after all results are returned downstream.

    MergeSort memoryMerge = new MergeSort(context, allocator, opCodeGen);
    try {
      sv4 = memoryMerge.merge(bufferedBatches, this, container);
      if (sv4 == null) {
        // Merge produced nothing (or failed); nothing to deliver.
        sortState = SortState.DONE;
        return IterOutcome.STOP;
      } else {
        logger.info("Completed in-memory sort. Memory = {}",
                allocator.getAllocatedMemory());
        // Ownership of the merge (and its memory) transfers to
        // resultsIterator; null the local so the finally block below
        // does not close it prematurely.
        resultsIterator = memoryMerge;
        memoryMerge = null;
        sortState = SortState.DELIVER;
        return IterOutcome.OK_NEW_SCHEMA;
      }
    } finally {
      // Close only if ownership was NOT transferred above (error or
      // empty-result path).
      if (memoryMerge != null) {
        memoryMerge.close();
      }
    }
  }
+
+  /**
+   * Perform merging of (typically spilled) batches. First consolidates batches
+   * as needed, then performs a final merge that is read one batch at a time
+   * to deliver batches to the downstream operator.
+   *
+   * @return always returns OK_NEW_SCHEMA
+   */
+
+  private IterOutcome mergeSpilledRuns() {
+    logger.info("Starting consolidate phase. Batches = {}, Records = {}, Memory = {}, In-memory batches {}, spilled runs {}",
+                inputBatchCount, inputRecordCount, allocator.getAllocatedMemory(),
+                bufferedBatches.size(), spilledRuns.size());
+
+    // Consolidate batches to a number that can be merged in
+    // a single last pass.
+
+    int mergeCount = 0;
+    while (consolidateBatches()) {
+      mergeCount++;
+    }
+    stats.addLongStat(Metric.MERGE_COUNT, mergeCount);
+
+    // Merge in-memory batches and spilled runs for the final merge.
+
+    List<BatchGroup> allBatches = new LinkedList<>();
+    allBatches.addAll(bufferedBatches);
+    bufferedBatches.clear();
+    allBatches.addAll(spilledRuns);
+    spilledRuns.clear();
+
+    logger.info("Starting merge phase. Runs = {}, Alloc. memory = {}", allBatches.size(), allocator.getAllocatedMemory());
+
+    // Do the final merge as a results iterator.
+
+    CopierHolder.BatchMerger merger = copierHolder.startFinalMerge(schema, allBatches, container, mergeBatchRowCount);
+    merger.next();
+    resultsIterator = merger;
+    sortState = SortState.DELIVER;
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
  /**
   * Perform one consolidation step: either spill in-memory batches or
   * merge a subset of the on-disk runs, whichever is needed to move
   * toward a state where a single final merge is possible.
   *
   * @return true if work was done (caller should call again), false if
   * the batches are already consolidated enough for the final merge
   */

  private boolean consolidateBatches() {

    // Determine additional memory needed to hold one batch from each
    // spilled run.

    int inMemCount = bufferedBatches.size();
    int spilledRunsCount = spilledRuns.size();

    // Can't merge more than will fit into memory at one time.

    int maxMergeWidth = (int) (mergeMemoryPool / targetMergeBatchSize);
    maxMergeWidth = Math.min(mergeLimit, maxMergeWidth);

    // If we can't fit all batches in memory, must spill any in-memory
    // batches to make room for multiple spill-merge-spill cycles.

    if (inMemCount > 0) {
      if (spilledRunsCount > maxMergeWidth) {
        spillFromMemory();
        return true;
      }

      // If we just plain have too many batches to merge, spill some
      // in-memory batches to reduce the burden.

      if (inMemCount + spilledRunsCount > mergeLimit) {
        spillFromMemory();
        return true;
      }

      // If the on-disk batches and in-memory batches need more memory than
      // is available, spill some in-memory batches.

      long allocated = allocator.getAllocatedMemory();
      long totalNeeds = spilledRunsCount * targetMergeBatchSize + allocated;
      if (totalNeeds > mergeMemoryPool) {
        spillFromMemory();
        return true;
      }
    }

    // Merge on-disk batches if we have too many.
    // mergeCount is the excess beyond what one merge pass can consume.

    int mergeCount = spilledRunsCount - maxMergeWidth;
    if (mergeCount <= 0) {
      return false;
    }

    // Must merge at least 2 batches to make progress.

    mergeCount = Math.max(2, mergeCount);

    // We will merge. This will create yet another spilled
    // run. Account for that.

    mergeCount += 1;

    // But never merge wider than memory allows.

    mergeCount = Math.min(mergeCount, maxMergeWidth);

    // If we are going to merge, and we have batches in memory,
    // spill them and try again. We need to do this to ensure we
    // have adequate memory to hold the merge batches. We are into
    // a second-generation sort/merge so there is no point in holding
    // onto batches in memory.

    if (inMemCount > 0) {
      spillFromMemory();
      return true;
    }

    // Do the merge, then loop to try again in case not
    // all the target batches spilled in one go.

    logger.trace("Merging {} on-disk runs, Alloc. memory = {}",
        mergeCount, allocator.getAllocatedMemory());
    mergeAndSpill(spilledRuns, mergeCount);
    return true;
  }
+
  /**
   * This operator has accumulated a set of sorted incoming record batches.
   * We wish to spill some of them to disk. To do this, a "copier"
   * merges the target batches to produce a stream of new (merged) batches
   * which are then written to disk.
   * <p>
   * The batches to spill are taken from the front of the buffered-batch
   * list (the oldest batches). Enough batches are chosen to reach roughly
   * the target spill-file size; the batch whose size would push the running
   * total past that size is left in memory.
    */

  private void spillFromMemory() {

    // Determine the number of batches to spill to create a spill file
    // of the desired size. The actual file size might be a bit larger
    // or smaller than the target, which is expected.

    long estSize = 0;
    int spillCount = 0;
    for (InputBatch batch : bufferedBatches) {
      estSize += batch.getDataSize();
      // Stop before counting the batch that crosses the file-size target.
      if (estSize > spillFileSize) {
        break; }
      spillCount++;
    }

    // Should not happen, but just to be sure...

    if (spillCount == 0) {
      return; }

    // Do the actual spill.

    logger.trace("Starting spill from memory. Memory = {}, Buffered batch count = {}, Spill batch count = {}",
                 allocator.getAllocatedMemory(), bufferedBatches.size(), spillCount);
    mergeAndSpill(bufferedBatches, spillCount);
  }
+
+  private void mergeAndSpill(LinkedList<? extends BatchGroup> source, int count) {
+    if (count == 0) {
+      return; }
+    spilledRuns.add(doMergeAndSpill(source, count));
+  }
+
  /**
   * Merge the first {@code spillCount} batches from the given list and
   * write the merged stream to a new spill file.
   *
   * @param batchGroups source list; the spilled batches are removed from it
   * @param spillCount number of batches to spill (clamped to the list size)
   * @return the new spilled run, already added to the spill set
   * @throws UserException (resource error) if the spill fails
   */

  private BatchGroup.SpilledRun doMergeAndSpill(LinkedList<? extends BatchGroup> batchGroups, int spillCount) {
    List<BatchGroup> batchesToSpill = Lists.newArrayList();
    spillCount = Math.min(batchGroups.size(), spillCount);
    assert spillCount > 0 : "Spill count to mergeAndSpill must not be zero";
    long spillSize = 0;
    // Remove the batches from the head of the source list; ownership
    // moves to batchesToSpill (closed via the try-with-resources below).
    for (int i = 0; i < spillCount; i++) {
      @SuppressWarnings("resource")
      BatchGroup batch = batchGroups.pollFirst();
      assert batch != null : "Encountered a null batch during merge and spill operation";
      batchesToSpill.add(batch);
      spillSize += batch.getDataSize();
    }

    // Merge the selected set of matches and write them to the
    // spill file. After each write, we release the memory associated
    // with the just-written batch.

    String outputFile = spillSet.getNextSpillFile();
    stats.setLongStat(Metric.SPILL_COUNT, spillSet.getFileCount());
    BatchGroup.SpilledRun newGroup = null;
    try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
         CopierHolder.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, spillBatchRowCount)) {
      logger.trace("Merging and spilling to {}", outputFile);
      newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, oContext, spillSize);

      // The copier will merge records from the buffered batches into
      // the outputContainer up to targetRecordCount number of rows.
      // The actual count may be less if fewer records are available.

      while (merger.next()) {

        // Add a new batch of records (given by merger.getOutput()) to the spill
        // file, opening the file if not yet open, and creating the target
        // directory if it does not yet exist.
        //
        // note that addBatch also clears the merger's output container

        newGroup.addBatch(merger.getOutput());
      }
      // Test hook: allows fault-injection of an IOException mid-spill.
      injector.injectChecked(context.getExecutionControls(), INTERRUPTION_WHILE_SPILLING, IOException.class);
      newGroup.closeOutputStream();
      logger.trace("mergeAndSpill: completed, memory = {}, spilled {} records to {}",
                   allocator.getAllocatedMemory(), merger.getRecordCount(), outputFile);
      return newGroup;
    } catch (Throwable e) {
      // we only need to clean up newGroup if spill failed
      try {
        if (newGroup != null) {
          AutoCloseables.close(e, newGroup);
        }
      } catch (Throwable t) { /* close() may hit the same IO issue; just ignore */ }

      // Here the merger is holding onto a partially-completed batch.
      // It will release the memory in the close() call.

      try {
        // Rethrow so we can organize how to handle the error.

        throw e;
      }

      // If error is a User Exception, just use as is.

      catch (UserException ue) { throw ue; }
      catch (Throwable ex) {
        throw UserException.resourceError(ex)
              .message("External Sort encountered an error while spilling to disk")
              .build(logger);
      }
    }
  }
+
+  /**
+   * Allocate and initialize the selection vector used as the sort index.
+   * Assumes that memory is available for the vector since memory management
+   * ensured space is available.
+   *
+   * @return a new, populated selection vector 2
+   */
+
+  private SelectionVector2 newSV2() {
+    SelectionVector2 sv2 = new SelectionVector2(allocator);
+    if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
+      throw UserException.resourceError(new OutOfMemoryException("Unable to allocate sv2 buffer"))
+            .build(logger);
+    }
+    for (int i = 0; i < incoming.getRecordCount(); i++) {
+      sv2.setIndex(i, (char) i);
+    }
+    sv2.setRecordCount(incoming.getRecordCount());
+    return sv2;
+  }
+
  /**
   * Not supported: the sort delivers results via a selection vector, so
   * it cannot expose a writable batch.
   *
   * @throws UnsupportedOperationException always
   */

  @Override
  public WritableBatch getWritableBatch() {
    throw new UnsupportedOperationException("A sort batch is not writable.");
  }
+
  /**
   * Pass a kill request through to the upstream (incoming) operator.
   *
   * @param sendUpstream forwarded as-is to {@code incoming.kill()}
   */

  @Override
  protected void killIncoming(boolean sendUpstream) {
    incoming.kill(sendUpstream);
  }
+
+  /**
+   * Extreme paranoia to avoid leaving resources unclosed in the case
+   * of an error. Since generally only the first error is of interest,
+   * we track only the first exception, not potential cascading downstream
+   * exceptions.
+   * <p>
+   * Some Drill code ends up calling close() two or more times. The code
+   * here protects itself from these undesirable semantics.
+   */
+
+  @Override
+  public void close() {
+    if (spillSet.getWriteBytes() > 0) {
+      logger.debug("End of sort. Total write bytes: {}, Total read bytes: {}",
+                   spillSet.getWriteBytes(), spillSet.getWriteBytes());
+    }
+    stats.setLongStat(Metric.SPILL_MB,
+        (int) Math.round( spillSet.getWriteBytes() / 1024.0D / 1024.0 ) );
+    RuntimeException ex = null;
+    try {
+      if (bufferedBatches != null) {
+        closeBatchGroups(bufferedBatches);
+        bufferedBatches = null;
+      }
+    } catch (RuntimeException e) {
+      ex = e;
+    }
+    try {
+      if (spilledRuns != null) {
+        closeBatchGroups(spilledRuns);
+        spilledRuns = null;
+      }
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+    try {
+      if (sv4 != null) {
+        sv4.clear();
+      }
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+    try {
+      if (resultsIterator != null) {
+        resultsIterator.close();
+        resultsIterator = null;
+      }
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+    try {
+      copierHolder.close();
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+    try {
+      spillSet.close();
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+    try {
+      opCodeGen.close();
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+
+    // The call to super.close() clears out the output container.
+    // Doing so requires the allocator here, so it must be closed
+    // after the super call.
+
+    try {
+      super.close();
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+    try {
+      allocator.close();
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
new file mode 100644
index 0000000..31475d2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.util.Queue;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.hadoop.util.IndexedSortable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+
+import io.netty.buffer.DrillBuf;
+
/**
 * Merge-sort template run over a hyper-batch via a selection vector 4.
 * The generated subclass supplies {@link #doSetup} and {@link #doEval};
 * this base class supplies the merge machinery. Record data never moves;
 * only SV4 entries (batch index in the upper 16 bits, record index in the
 * lower 16) are rearranged.
 */
public abstract class MSortTemplate implements MSorter, IndexedSortable {
//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);

  // Primary selection vector: the current sorted/partially-merged order.
  private SelectionVector4 vector4;
  // Auxiliary vector used as merge output; swapped with vector4 each pass.
  private SelectionVector4 aux;
  // Comparison counter; currently unused except for instrumentation.
  @SuppressWarnings("unused")
  private long compares;

  /**
   * Holds offsets into the SV4 of the start of each batch
   * (sorted run.)
   */

  private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
  private FragmentContext context;

  /**
   * Controls the maximum size of batches exposed to downstream
   */
  private int desiredRecordBatchCount;

  /**
   * Prepare the sorter: wrap the caller's SV4, locate the start offset of
   * each pre-sorted run, and allocate the auxiliary merge vector.
   *
   * @param context fragment context, used to check for cancellation
   * @param allocator allocator for the auxiliary SV4 buffer
   * @param vector4 selection vector describing the runs; this method takes
   * a wrapper and clears the caller's copy
   * @param hyperBatch the container holding the data being sorted
   * @param outputBatchSize maximum record count per downstream batch
   * @throws SchemaChangeException if the generated doSetup rejects the batch
   */
  @Override
  public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
                    final VectorContainer hyperBatch, int outputBatchSize) throws SchemaChangeException{
    // we pass in the local hyperBatch since that is where we'll be reading data.
    Preconditions.checkNotNull(vector4);
    this.vector4 = vector4.createNewWrapperCurrent();
    this.context = context;
    vector4.clear();
    doSetup(context, hyperBatch, null);

    // Populate the queue with the offset in the SV4 of each
    // batch. Note that this is expensive as it requires a scan
    // of all items to be sorted: potentially millions.

    runStarts.add(0);
    int batch = 0;
    final int totalCount = this.vector4.getTotalCount();
    for (int i = 0; i < totalCount; i++) {
      // Upper 16 bits of each SV4 entry identify the source batch.
      final int newBatch = this.vector4.get(i) >>> 16;
      if (newBatch == batch) {
        continue;
      } else if (newBatch == batch + 1) {
        runStarts.add(i);
        batch = newBatch;
      } else {
        // Batch numbers must be contiguous; a gap indicates corruption.
        throw new UnsupportedOperationException(String.format("Missing batch. batch: %d newBatch: %d", batch, newBatch));
      }
    }

    // Create a temporary SV4 to hold the merged results.
    // NOTE(review): 4 * totalCount is an int multiply; it would overflow
    // for totalCount above ~536M entries — confirm the expected range.

    @SuppressWarnings("resource")
    final DrillBuf drillBuf = allocator.buffer(4 * totalCount);
    desiredRecordBatchCount = Math.min(outputBatchSize, Character.MAX_VALUE);
    desiredRecordBatchCount = Math.min(desiredRecordBatchCount, totalCount);
    aux = new SelectionVector4(drillBuf, totalCount, desiredRecordBatchCount);
  }

  /**
   * For given recordCount how much memory does MSorter needs for its own purpose. This is used in
   * ExternalSortBatch to make decisions about whether to spill or not.
   *
   * @param recordCount number of records to be sorted
   * @return memory needed, in bytes, rounded up to the next power of two
   */
  public static long memoryNeeded(final int recordCount) {
    // We need 4 bytes (SV4) for each record.
    // The memory allocator will round this to the next
    // power of 2.
    // NOTE(review): recordCount * 4 is an int multiply and overflows for
    // counts above ~536M before widening to long — confirm expected range.

    return BaseAllocator.nextPowerOfTwo(recordCount * 4);
  }

  /**
   * Given two regions within the selection vector 4 (a left and a right), merge
   * the two regions to produce a combined output region in the auxiliary
   * selection vector.
   *
   * @param leftStart start index of the left run (inclusive)
   * @param rightStart start of the right run; also the exclusive end of the left run
   * @param rightEnd exclusive end of the right run
   * @param outStart index in the auxiliary vector at which to write output
   * @return the index just past the last entry written to the auxiliary vector
   */
  protected int merge(final int leftStart, final int rightStart, final int rightEnd, final int outStart) {
    int l = leftStart;
    int r = rightStart;
    int o = outStart;
    // Standard two-way merge: take the smaller head until one run drains.
    while (l < rightStart && r < rightEnd) {
      if (compare(l, r) <= 0) {
        aux.set(o++, vector4.get(l++));
      } else {
        aux.set(o++, vector4.get(r++));
      }
    }
    // Copy whichever run still has entries.
    while (l < rightStart) {
      aux.set(o++, vector4.get(l++));
    }
    while (r < rightEnd) {
      aux.set(o++, vector4.get(r++));
    }
    assert o == outStart + (rightEnd - leftStart);
    return o;
  }

  @Override
  public SelectionVector4 getSV4() {
    return vector4;
  }

  /**
   * Sort (really, merge) a set of pre-sorted runs to produce a combined
   * result set. Merging is done in the selection vector, record data does
   * not move.
   * <p>
   * Runs are merge pairwise in multiple passes, providing performance
   * of O(n * m * log(n)), where n = number of runs, m = number of records
   * per run.
   */

  @Override
  public void sort(final VectorContainer container) {
    while (runStarts.size() > 1) {
      final int totalCount = this.vector4.getTotalCount();

      // check if we're cancelled/failed recently
      if (!context.shouldContinue()) {
        return; }

      int outIndex = 0;
      final Queue<Integer> newRunStarts = Queues.newLinkedBlockingQueue();
      newRunStarts.add(outIndex);
      final int size = runStarts.size();
      // Merge runs pairwise; an odd trailing run is copied through below.
      for (int i = 0; i < size / 2; i++) {
        final int left = runStarts.poll();
        final int right = runStarts.poll();
        Integer end = runStarts.peek();
        if (end == null) {
          end = totalCount;
        }
        outIndex = merge(left, right, end, outIndex);
        if (outIndex < vector4.getTotalCount()) {
          newRunStarts.add(outIndex);
        }
      }
      if (outIndex < totalCount) {
        copyRun(outIndex, totalCount);
      }
      // Swap the primary and auxiliary vectors for the next pass. The
      // wrapper dance keeps buffer reference counts balanced.
      @SuppressWarnings("resource")
      final SelectionVector4 tmp = aux.createNewWrapperCurrent(desiredRecordBatchCount);
      aux.clear();
      aux = vector4.createNewWrapperCurrent(desiredRecordBatchCount);
      vector4.clear();
      vector4 = tmp.createNewWrapperCurrent(desiredRecordBatchCount);
      tmp.clear();
      runStarts = newRunStarts;
    }
    aux.clear();
  }

  // Copy entries [start, end) unchanged from the primary vector to the
  // auxiliary vector (used for an odd run with no merge partner).
  private void copyRun(final int start, final int end) {
    for (int i = start; i < end; i++) {
      aux.set(i, vector4.get(i));
    }
  }

  @Override
  public void swap(final int sv0, final int sv1) {
    final int tmp = vector4.get(sv0);
    vector4.set(sv0, vector4.get(sv1));
    vector4.set(sv1, tmp);
  }

  @Override
  public int compare(final int leftIndex, final int rightIndex) {
    final int sv1 = vector4.get(leftIndex);
    final int sv2 = vector4.get(rightIndex);
    compares++;
    try {
      return doEval(sv1, sv2);
    } catch (SchemaChangeException e) {
      // Schema is fixed by the time sorting starts; treat as fatal.
      throw new IllegalStateException(e);
    }
  }

  @Override
  public void clear() {
    if(vector4 != null) {
      vector4.clear();
    }

    if(aux != null) {
      aux.clear();
    }
  }

  // Implemented by the generated subclass: bind to the hyper-batch.
  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
  // Implemented by the generated subclass: compare two SV4-encoded rows.
  public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex) throws SchemaChangeException;
}

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
new file mode 100644
index 0000000..0d04b7e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+/**
+ * In-memory sorter. Takes a list of batches as input, produces a selection
+ * vector 4, with sorted results, as output.
+ */
+
+public interface MSorter {
+  public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch, int outputBatchSize) throws SchemaChangeException;
+  public void sort(VectorContainer container);
+  public SelectionVector4 getSV4();
+
+  public static TemplateClassDefinition<MSorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<MSorter>(MSorter.class, MSortTemplate.class);
+
+  public void clear();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java
new file mode 100644
index 0000000..c3e2dbe
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.util.LinkedList;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
+import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+/**
+ * Wrapper around the "MSorter" (in memory merge sorter). As batches have
+ * arrived to the sort, they have been individually sorted and buffered
+ * in memory. At the completion of the sort, we detect that no batches
+ * were spilled to disk. In this case, we can merge the in-memory batches
+ * using an efficient memory-based approach implemented here.
+ * <p>
+ * Since all batches are in memory, we don't want to use the usual merge
+ * algorithm as that makes a copy of the original batches (which were read
+ * from a spill file) to produce an output batch. Instead, we want to use
+ * the in-memory batches as-is. To do this, we use a selection vector 4
+ * (SV4) as a global index into the collection of batches. The SV4 uses
+ * the upper two bytes as the batch index, and the lower two as an offset
+ * of a record within the batch.
+ * <p>
+ * The merger ("M Sorter") populates the SV4 by scanning the set of
+ * in-memory batches, searching for the one with the lowest value of the
+ * sort key. The batch number and offset are placed into the SV4. The process
+ * continues until all records from all batches have an entry in the SV4.
+ * <p>
+ * The actual implementation uses an iterative merge to perform the above
+ * efficiently.
+ * <p>
+ * A sort can only do a single merge. So, we do not attempt to share the
+ * generated class; we just generate it internally and discard it at
+ * completion of the merge.
+ */
+
+public class MergeSort implements SortResults {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeSort.class);
+
+  // Builder that assembles the buffered batches and initial SV4; owns the
+  // batches once they are added.
+  private SortRecordBatchBuilder builder;
+  // Generated in-memory merge sorter; non-null only after a successful merge() setup.
+  private MSorter mSorter;
+  private final FragmentContext context;
+  private final BufferAllocator oAllocator;
+  private SelectionVector4 sv4;
+  private final OperatorCodeGenerator opCg;
+  // Number of output batches delivered via next().
+  private int batchCount;
+
+  public MergeSort(FragmentContext context, BufferAllocator allocator, OperatorCodeGenerator opCg) {
+    this.context = context;
+    this.oAllocator = allocator;
+    this.opCg = opCg;
+  }
+
+  /**
+   * Merge the set of in-memory batches to produce a single logical output in the given
+   * destination container, indexed by an SV4.
+   *
+   * @param batchGroups the complete set of in-memory batches
+   * @param batch the record batch (operator) for the sort operator
+   * @param destContainer the vector container for the sort operator
+   * @return the sv4 for this operator, or null if the fragment was asked to
+   *         stop while the sort was in progress
+   */
+
+  public SelectionVector4 merge(LinkedList<BatchGroup.InputBatch> batchGroups, VectorAccessible batch,
+                                VectorContainer destContainer) {
+
+    // Add the buffered batches to a collection that MSorter can use.
+    // The builder takes ownership of the batches and will release them if
+    // an error occurs.
+
+    builder = new SortRecordBatchBuilder(oAllocator);
+    for (BatchGroup.InputBatch group : batchGroups) {
+      RecordBatchData rbd = new RecordBatchData(group.getContainer(), oAllocator);
+      rbd.setSv2(group.getSv2());
+      builder.add(rbd);
+    }
+    batchGroups.clear();
+
+    // Generate the msorter.
+
+    try {
+      builder.build(context, destContainer);
+      sv4 = builder.getSv4();
+      mSorter = opCg.createNewMSorter(batch);
+      mSorter.setup(context, oAllocator, sv4, destContainer, sv4.getCount());
+    } catch (SchemaChangeException e) {
+      throw UserException.unsupportedError(e)
+            .message("Unexpected schema change - likely code error.")
+            .build(logger);
+    }
+
+    // For testing memory-leaks, inject exception after mSorter finishes setup
+    ExternalSortBatch.injector.injectUnchecked(context.getExecutionControls(), ExternalSortBatch.INTERRUPTION_AFTER_SETUP);
+    mSorter.sort(destContainer);
+
+    // sort may have prematurely exited due to should continue returning false.
+    if (!context.shouldContinue()) {
+      return null;
+    }
+
+    // For testing memory-leak purpose, inject exception after mSorter finishes sorting
+    ExternalSortBatch.injector.injectUnchecked(context.getExecutionControls(), ExternalSortBatch.INTERRUPTION_AFTER_SORT);
+    sv4 = mSorter.getSV4();
+
+    destContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
+    return sv4;
+  }
+
+  /**
+   * The SV4 provides a built-in iterator that returns a virtual set of record
+   * batches so that the downstream operator need not consume the entire set
+   * of accumulated batches in a single step.
+   */
+
+  @Override
+  public boolean next() {
+    boolean more = sv4.next();
+    if (more) { batchCount++; }
+    return more;
+  }
+
+  @Override
+  public void close() {
+    // Clear the mSorter even if releasing the builder fails; otherwise an
+    // exception from builder.clear()/close() would leak the memory the
+    // mSorter's selection vectors still hold.
+    try {
+      if (builder != null) {
+        builder.clear();
+        builder.close();
+      }
+    } finally {
+      if (mSorter != null) {
+        mSorter.clear();
+      }
+    }
+  }
+
+  @Override
+  public int getBatchCount() {
+    return batchCount;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return sv4.getTotalCount();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java
new file mode 100644
index 0000000..57846db
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.vector.CopyUtil;
+
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
+
+/**
+ * Generates and manages the data-specific classes for this operator.
+ * <p>
+ * Several of the code generation methods take a batch, but the methods
+ * are called for many batches, and generate code only for the first one.
+ * Better would be to generate code from a schema; but Drill is not set
+ * up for that at present.
+ */
+
+public class OperatorCodeGenerator {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorCodeGenerator.class);
+
+  protected static final MappingSet MAIN_MAPPING = new MappingSet((String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  protected static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  protected static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+
+  private static final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
+  private static final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
+
+  private final FragmentContext context;
+  @SuppressWarnings("unused")
+  private BatchSchema schema;
+
+  /**
+   * A single PriorityQueueCopier instance is used for 2 purposes:
+   * 1. Merge sorted batches before spilling
+   * 2. Merge sorted batches when all incoming data fits in memory
+   */
+
+  private PriorityQueueCopier copier;
+  private final Sort popConfig;
+
+  /**
+   * Generated sort operation used to sort each incoming batch according to
+   * the sort criteria specified in the {@link ExternalSort} definition of
+   * this operator.
+   */
+
+  private SingleBatchSorter sorter;
+
+  public OperatorCodeGenerator(FragmentContext context, Sort popConfig) {
+    this.context = context;
+    this.popConfig = popConfig;
+  }
+
+  public void setSchema(BatchSchema schema) {
+    close();
+    this.schema = schema;
+  }
+
+  public void close() {
+    closeCopier();
+    sorter = null;
+  }
+
+  public void closeCopier() {
+    if (copier == null) {
+      return; }
+    try {
+      copier.close();
+      copier = null;
+    } catch (IOException e) {
+      throw UserException.dataWriteError(e)
+            .message("Failure while flushing spilled data")
+            .build(logger);
+    }
+  }
+
+  public PriorityQueueCopier getCopier(VectorAccessible batch) {
+    if (copier == null) {
+      copier = generateCopier(batch);
+    }
+    return copier;
+  }
+
+  private PriorityQueueCopier generateCopier(VectorAccessible batch) {
+    // Generate the copier code and obtain the resulting class
+
+    CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
+    ClassGenerator<PriorityQueueCopier> g = cg.getRoot();
+    cg.plainJavaCapable(true);
+    // Uncomment out this line to debug the generated code.
+//  cg.saveCodeForDebugging(true);
+
+    generateComparisons(g, batch);
+
+    g.setMappingSet(COPIER_MAPPING_SET);
+    CopyUtil.generateCopies(g, batch, true);
+    g.setMappingSet(MAIN_MAPPING);
+    return getInstance(cg);
+  }
+
+  public MSorter createNewMSorter(VectorAccessible batch) {
+    return createNewMSorter(popConfig.getOrderings(), batch, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+  }
+
+  private MSorter createNewMSorter(List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) {
+    CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
+    cg.plainJavaCapable(true);
+
+    // Uncomment out this line to debug the generated code.
+//  cg.saveCodeForDebugging(true);
+    ClassGenerator<MSorter> g = cg.getRoot();
+    g.setMappingSet(mainMapping);
+
+    for (Ordering od : orderings) {
+      // first, we rewrite the evaluation stack for each side of the comparison.
+      ErrorCollector collector = new ErrorCollectorImpl();
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
+      if (collector.hasErrors()) {
+        throw UserException.unsupportedError()
+              .message("Failure while materializing expression. " + collector.toErrorString())
+              .build(logger);
+      }
+      g.setMappingSet(leftMapping);
+      HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      g.setMappingSet(rightMapping);
+      HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      g.setMappingSet(mainMapping);
+
+      // next we wrap the two comparison sides and add the expression block for the comparison.
+      LogicalExpression fh =
+          FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
+                                                         context.getFunctionRegistry());
+      HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
+      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+
+      if (od.getDirection() == Direction.ASCENDING) {
+        jc._then()._return(out.getValue());
+      }else{
+        jc._then()._return(out.getValue().minus());
+      }
+      g.rotateBlock();
+    }
+
+    g.rotateBlock();
+    g.getEvalBlock()._return(JExpr.lit(0));
+
+    return getInstance(cg);
+  }
+
+  public SingleBatchSorter getSorter(VectorAccessible batch) {
+    if (sorter == null) {
+      sorter = createNewSorter(batch);
+    }
+    return sorter;
+  }
+
+  private SingleBatchSorter createNewSorter(VectorAccessible batch) {
+    CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(
+        SingleBatchSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(),
+        context.getOptions());
+    ClassGenerator<SingleBatchSorter> g = cg.getRoot();
+    cg.plainJavaCapable(true);
+    // Uncomment out this line to debug the generated code.
+//  cg.saveCodeForDebugging(true);
+
+    generateComparisons(g, batch);
+    return getInstance(cg);
+  }
+
+  private <T> T getInstance(CodeGenerator<T> cg) {
+    try {
+      return context.getImplementationClass(cg);
+    } catch (ClassTransformationException e) {
+      throw UserException.unsupportedError(e)
+            .message("Code generation error - likely code error.")
+            .build(logger);
+    } catch (IOException e) {
+      throw UserException.resourceError(e)
+            .message("IO Error during code generation.")
+            .build(logger);
+    }
+  }
+
+  protected void generateComparisons(ClassGenerator<?> g, VectorAccessible batch)  {
+    g.setMappingSet(MAIN_MAPPING);
+
+    for (Ordering od : popConfig.getOrderings()) {
+      // first, we rewrite the evaluation stack for each side of the comparison.
+      ErrorCollector collector = new ErrorCollectorImpl();
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
+      if (collector.hasErrors()) {
+        throw UserException.unsupportedError()
+              .message("Failure while materializing expression. " + collector.toErrorString())
+              .build(logger);
+      }
+      g.setMappingSet(LEFT_MAPPING);
+      HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      g.setMappingSet(RIGHT_MAPPING);
+      HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      g.setMappingSet(MAIN_MAPPING);
+
+      // next we wrap the two comparison sides and add the expression block for the comparison.
+      LogicalExpression fh =
+          FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
+                                                         context.getFunctionRegistry());
+      HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
+      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+
+      if (od.getDirection() == Direction.ASCENDING) {
+        jc._then()._return(out.getValue());
+      }else{
+        jc._then()._return(out.getValue().minus());
+      }
+      g.rotateBlock();
+    }
+
+    g.rotateBlock();
+    g.getEvalBlock()._return(JExpr.lit(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java
new file mode 100644
index 0000000..2657bb8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.VectorAccessible;
+
+public interface PriorityQueueCopier extends AutoCloseable {
+
+  // Interface members are implicitly public (fields static final, methods
+  // abstract), so the redundant modifiers have been dropped.
+
+  /** Template definition used by the code generator to create concrete copiers. */
+  TemplateClassDefinition<PriorityQueueCopier> TEMPLATE_DEFINITION =
+      new TemplateClassDefinition<>(PriorityQueueCopier.class, PriorityQueueCopierTemplate.class);
+
+  /**
+   * Binds the copier to the hyper-batch of buffered/spilled batches and the
+   * outgoing container that receives merged records.
+   *
+   * @throws SchemaChangeException if the generated code cannot bind to the batch schema
+   */
+  void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch,
+      List<BatchGroup> batchGroups, VectorAccessible outgoing) throws SchemaChangeException;
+
+  // NOTE(review): presumably returns the number of records copied into the
+  // outgoing batch, up to targetRecordCount - confirm against the template.
+  int next(int targetRecordCount);
+
+  // Narrowed to IOException so callers need not handle AutoCloseable's
+  // generic Exception.
+  @Override
+  void close() throws IOException;
+}


Mime
View raw message