drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From paul-rogers <...@git.apache.org>
Subject [GitHub] drill pull request #717: DRILL-5080: Memory-managed version of external sort
Date Thu, 02 Feb 2017 22:45:13 GMT
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r99236749
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
---
    @@ -0,0 +1,1321 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.drill.common.AutoCloseables;
    +import org.apache.drill.common.config.DrillConfig;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.MetricDef;
    +import org.apache.drill.exec.physical.config.ExternalSort;
    +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
    +import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
    +import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
    +import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
    +import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.SchemaUtil;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.WritableBatch;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.drill.exec.testing.ControlsInjector;
    +import org.apache.drill.exec.testing.ControlsInjectorFactory;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.drill.exec.vector.complex.AbstractContainerVector;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * External sort batch: a sort batch which can spill to disk in
    + * order to operate within a defined memory footprint.
    + * <p>
    + * <h4>Basic Operation</h4>
    + * The operator has three key phases:
    + * <p>
    + * <ul>
    + * <li>The load phase in which batches are read from upstream.</li>
    + * <li>The merge phase in which spilled batches are combined to
    + * reduce the number of files below the configured limit. (Best
    + * practice is to configure the system to avoid this phase.)
    + * <li>The delivery phase in which batches are combined to produce
    + * the final output.</li>
    + * </ul>
    + * During the load phase:
    + * <p>
    + * <ul>
    + * <li>The incoming (upstream) operator provides a series of batches.</li>
    + * <li>This operator sorts each batch, and accumulates them in an in-memory
    + * buffer.</li>
    + * <li>If the in-memory buffer becomes too large, this operator selects
    + * a subset of the buffered batches to spill.</li>
    + * <li>Each spill set is merged to create a new, sorted collection of
    + * batches, and each is spilled to disk.</li>
    + * <li>To allow the use of multiple disk storage, each spill group is written
    + * round-robin to a set of spill directories.</li>
    + * </ul>
    + * <p>
    + * During the sort/merge phase:
    + * <p>
    + * <ul>
    + * <li>When the input operator is complete, this operator merges the accumulated
    + * batches (which may be all in memory or partially on disk), and returns
    + * them to the output (downstream) operator in chunks of no more than
    + * 32K records.</li>
    + * <li>The final merge must combine a collection of in-memory and spilled
    + * batches. Several limits apply to the maximum "width" of this merge. For
    + * example, we each open spill run consumes a file handle, and we may wish
    + * to limit the number of file handles. A consolidation phase combines
    + * in-memory and spilled batches prior to the final merge to control final
    + * merge width.</li>
    + * <li>A special case occurs if no batches were spilled. In this case, the input
    + * batches are sorted in memory without merging.</li>
    + * </ul>
    + * <p>
    + * Many complex details are involved in doing the above; the details are explained
    + * in the methods of this class.
    + * <p>
    + * <h4>Configuration Options</h4>
    + * <dl>
    + * <dt>drill.exec.sort.external.spill.fs</dt>
    + * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd>
    + * <dt>drill.exec.sort.external.spill.directories</dt>
    + * <dd>The (comma? space?) separated list of directories, on the above file
    + * system, to which to spill files in round-robin fashion. The query will
    + * fail if any one of the directories becomes full.</dt>
    + * <dt>drill.exec.sort.external.spill.file_size</dt>
    + * <dd>Target size for first-generation spill files Set this to large
    + * enough to get nice long writes, but not so large that spill directories
    + * are overwhelmed.</dd>
    + * <dt>drill.exec.sort.external.mem_limit</dt>
    + * <dd>Maximum memory to use for the in-memory buffer. (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.batch_limit</dt>
    + * <dd>Maximum number of batches to hold in memory. (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.max_count</dt>
    + * <dd>Maximum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.min_count</dt>
    + * <dd>Minimum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.merge_limit</dt>
    + * <dd>Sets the maximum number of runs to be merged in a single pass (limits
    + * the number of open files.)</dd>
    + * </dl>
    + * <p>
    + * The memory limit observed by this operator is the lesser of:
    + * <ul>
    + * <li>The maximum allocation allowed the the allocator assigned to this batch,
or</li>
    + * <li>The maximum limit set for this operator by the Foreman.</li>
    + * <li>The maximum limit configured in the mem_limit parameter above. (Primarily
for
    + * testing.</li>
    + * </ul>
    + * <h4>Output</h4>
    + * It is helpful to note that the sort operator will produce one of two kinds of
    + * output batches.
    + * <ul>
    + * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses
    + * the entire in-memory sort set. A selection vector remover will copy results
    + * into new batches of a size determined by that operator.</li>
    + * <li>A series of batches, without a selection vector, if the sort spills to
    + * disk. In this case, the downstream operator will still be a selection vector
    + * remover, but there is nothing for that operator to remove. Each batch is
    + * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li>
    + * </ul>
    + * Note that, even in the in-memory sort case, this operator could do the copying
    + * to eliminate the extra selection vector remover. That is left as an exercise
    + * for another time.
    + * <h4>Logging</h4>
    + * Logging in this operator serves two purposes:
    + * <li>
    + * <ul>
    + * <li>Normal diagnostic information.</li>
    + * <li>Capturing the essence of the operator functionality for analysis in unit
    + * tests.</li>
    + * </ul>
    + * Test logging is designed to capture key events and timings. Take care
    + * when changing or removing log messages as you may need to adjust unit tests
    + * accordingly.
    + */
    +
    +public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
    +  protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
    +
    +  private final RecordBatch incoming;
    +
    +  /**
    +   * Memory allocator for this operator itself. Incoming batches are
    +   * transferred into this allocator. Intermediate batches used during
    +   * merge also reside here.
    +   */
    +
    +  private final BufferAllocator allocator;
    +
    +  /**
    +   * Schema of batches that this operator produces.
    +   */
    +
    +  private BatchSchema schema;
    +
    +  private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList();
    +  private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
    +  private SelectionVector4 sv4;
    +
    +  /**
    +   * The number of records to add to each output batch sent to the
    +   * downstream operator or spilled to disk.
    +   */
    +
    +  private int outputBatchRecordCount;
    +  private int peakNumBatches = -1;
    +
    +  /**
    +   * The copier uses the COPIER_BATCH_MEM_LIMIT to estimate the target
    +   * number of records to return in each batch.
    +   * <p>
    +   * For reference, see {@link FlattenTemplate#OUTPUT_MEMORY_LIMIT}.
    +   * <p>
    +   * WARNING: Do not allow any one vector to grow beyond 16 MB. Drill contains a
    +   * design flaw that may give rise to fatal memory fragmentation if we allow it to
    +   * allocate larger vectors.
    +   */
    +
    +  private static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024;
    +
    +  /**
    +   * Smallest allowed output batch size. The smallest output batch
    +   * created even under constrained memory conditions.
    +   */
    +  private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
    +
    +  /**
    +   * The preferred amount of memory to set aside to output batches
    +   * expressed as a ratio of available memory.
    +   */
    +
    +  private static final float MERGE_BATCH_ALLOWANCE = 0.10F;
    +
    +  public static final String INTERRUPTION_AFTER_SORT = "after-sort";
    +  public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
    +  public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
    +
    +  private long memoryLimit;
    +
    +  /**
    +   * Iterates over the final, sorted results.
    +   */
    +
    +  private SortResults resultsIterator;
    +
    +  /**
    +   * Manages the set of spill directories and files.
    +   */
    +
    +  private final SpillSet spillSet;
    +
    +  /**
    +   * Manages the copier used to merge a collection of batches into
    +   * a new set of batches.
    +   */
    +
    +  private final CopierHolder copierHolder;
    +
    +  private enum SortState { START, LOAD, DELIVER, DONE }
    +  private SortState sortState = SortState.START;
    +  private int inputRecordCount = 0;
    +  private int inputBatchCount = 0; // total number of batches received so far
    +  private final OperatorCodeGenerator opCodeGen;
    +
    +  /**
    +   * Estimated size of the records for this query, updated on each
    +   * new batch received from upstream.
    +   */
    +
    +  private int estimatedRecordSize;
    +
    +  /**
    +   * Estimated size of the spill and output batches that this
    +   * operator produces, estimated from the estimated record
    +   * size.
    +   */
    +
    +  private long estimatedOutputBatchSize;
    +  private long estimatedInputBatchSize;
    +
    +  /**
    +   * Maximum number of batches to hold in memory.
    +   * (Primarily for testing.)
    +   */
    +
    +  private int bufferedBatchLimit;
    +  private int mergeLimit;
    +  private int minSpillLimit;
    +  private int maxSpillLimit;
    +  private long spillFileSize;
    +  private long minimumBufferSpace;
    +
    +  /**
    +   * Minimum memory level before spilling occurs. That is, we can buffer input
    +   * batches in memory until we are down to the level given by the spill point.
    +   */
    +
    +  private long spillPoint;
    +  private long mergeMemoryPool;
    +  private long preferredMergeBatchSize;
    +
    +  // WARNING: The enum here is used within this class. But, the members of
    +  // this enum MUST match those in the (unmanaged) ExternalSortBatch since
    +  // that is the enum used in the UI to display metrics for the query profile.
    +
    +  public enum Metric implements MetricDef {
    +    SPILL_COUNT,            // number of times operator spilled to disk
    +    RETIRED1,               // Was: peak value for totalSizeInMemory
    +                            // But operator already provides this value
    +    PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
    +    MERGE_COUNT,            // Number of second+ generation merges
    +    MIN_BUFFER,             // Minimum memory level observed in operation.
    +    INPUT_BATCHES;          // Number of batches read from upstream.
    +
    +    @Override
    +    public int metricId() {
    +      return ordinal();
    +    }
    +  }
    +
    +  /**
    +   * Iterates over the final sorted results. Implemented differently
    +   * depending on whether the results are in-memory or spilled to
    +   * disk.
    +   */
    +
    +  public interface SortResults {
    +    boolean next();
    +    void close();
    +    int getBatchCount();
    +    int getRecordCount();
    +  }
    +
    +  public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch
incoming) {
    +    super(popConfig, context, true);
    +    this.incoming = incoming;
    +    allocator = oContext.getAllocator();
    +    opCodeGen = new OperatorCodeGenerator(context, popConfig);
    +
    +    spillSet = new SpillSet(context, popConfig);
    +    copierHolder = new CopierHolder(context, allocator, opCodeGen);
    +    configure(context.getConfig());
    +  }
    +
    +  private void configure(DrillConfig config) {
    +
    +    // The maximum memory this operator can use. It is either the
    +    // limit set on the allocator or on the operator, whichever is
    +    // less.
    +
    +    memoryLimit = Math.min(popConfig.getMaxAllocation(), allocator.getLimit());
    +
    +    // Optional configured memory limit, typically used only for testing.
    +
    +    long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
    +    if (configLimit > 0) {
    +      memoryLimit = Math.min(memoryLimit, configLimit);
    +    }
    +
    +    // Optional limit on the number of buffered in-memory batches.
    +    // 0 means no limit. Used primarily for testing. Must allow at least two
    +    // batches or no merging can occur.
    +
    +    bufferedBatchLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_BATCH_LIMIT,
Integer.MAX_VALUE, 2);
    +
    +    // Optional limit on the number of spilled runs to merge in a single
    +    // pass. Limits the number of open file handles. Must allow at least
    +    // two batches to merge to make progress.
    +
    +    mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE,
2);
    +
    +    // Limits on the minimum and maximum buffered batches to spill per
    +    // spill event.
    +
    +    minSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MIN_SPILL, Integer.MAX_VALUE,
2);
    +    maxSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MAX_SPILL, Integer.MAX_VALUE,
2);
    +    if (minSpillLimit > maxSpillLimit) {
    +      minSpillLimit = Math.min(minSpillLimit, maxSpillLimit);
    +      maxSpillLimit = minSpillLimit;
    +    }
    +
    +    // Limits the size of first-generation spill files.
    +
    +    spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE);
    +
    +    // Set the target output batch size. Use the maximum size, but only if
    +    // this represents less than 10% of available memory. Otherwise, use 10%
    +    // of memory, but no smaller than the minimum size. In any event, an
    +    // output batch can contain no fewer than a single record.
    +
    +    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
    +    preferredMergeBatchSize = Math.min(maxAllowance, MAX_MERGED_BATCH_SIZE);
    +    preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE);
    +
    +    logger.debug("Config: memory limit = {}, batch limit = {}, " +
    +                 "min, max spill limit: {}, {}, merge limit = {}, merge batch size =
{}",
    +                  memoryLimit, bufferedBatchLimit, minSpillLimit, maxSpillLimit, mergeLimit,
    +                  preferredMergeBatchSize);
    +  }
    +
    +  private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int
minValue) {
    +    int limit = config.getInt(paramName);
    +    if (limit > 0) {
    +      limit = Math.max(limit, minValue);
    +    } else {
    +      limit = valueIfZero;
    +    }
    +    return limit;
    +  }
    +
    +  @Override
    +  public int getRecordCount() {
    +    if (sv4 != null) {
    +      return sv4.getCount();
    +    }
    +    return container.getRecordCount();
    +  }
    +
    +  @Override
    +  public SelectionVector4 getSelectionVector4() {
    +    return sv4;
    +  }
    +
    +  private void closeBatchGroups(Collection<? extends BatchGroup> groups) {
    +    for (BatchGroup group: groups) {
    +      try {
    +        group.close();
    +      } catch (Exception e) {
    +        // collect all failure and make sure to cleanup all remaining batches
    +        // Originally we would have thrown a RuntimeException that would propagate to
FragmentExecutor.closeOutResources()
    +        // where it would have been passed to context.fail()
    +        // passing the exception directly to context.fail(e) will let the cleanup process
continue instead of stopping
    +        // right away, this will also make sure we collect any additional exception we
may get while cleaning up
    +        context.fail(e);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void close() {
    +    try {
    +      if (bufferedBatches != null) {
    +        closeBatchGroups(bufferedBatches);
    +        bufferedBatches = null;
    +      }
    +      if (spilledRuns != null) {
    +        closeBatchGroups(spilledRuns);
    +        spilledRuns = null;
    +      }
    +    } finally {
    +      if (sv4 != null) {
    +        sv4.clear();
    +      }
    +      if (resultsIterator != null) {
    +        resultsIterator.close();
    +      }
    +      copierHolder.close();
    +      spillSet.close();
    +      opCodeGen.close();
    +
    +      // The call to super.close() clears out the output container.
    +      // Doing so requires the allocator here, so it must be closed
    +      // after the super call.
    +
    +      super.close();
    +      allocator.close();
    +    }
    +  }
    +
    +  /**
    +   * Called by {@link AbstractRecordBatch} as a fast-path to obtain
    +   * the first record batch and setup the schema of this batch in order
    +   * to quickly return the schema to the client. Note that this method
    +   * fetches the first batch from upstream which will be waiting for
    +   * us the first time that {@link #innerNext()} is called.
    +   */
    +
    +  @Override
    +  public void buildSchema() {
    +    IterOutcome outcome = next(incoming);
    +    switch (outcome) {
    +      case OK:
    +      case OK_NEW_SCHEMA:
    +        for (VectorWrapper<?> w : incoming) {
    +          @SuppressWarnings("resource")
    +          ValueVector v = container.addOrGet(w.getField());
    +          if (v instanceof AbstractContainerVector) {
    +            w.getValueVector().makeTransferPair(v); // Can we remove this hack?
    +            v.clear();
    +          }
    +          v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO)
    +        }
    +        container.buildSchema(SelectionVectorMode.NONE);
    +        container.setRecordCount(0);
    +        break;
    +      case STOP:
    +        state = BatchState.STOP;
    +        break;
    +      case OUT_OF_MEMORY:
    +        state = BatchState.OUT_OF_MEMORY;
    +        break;
    +      case NONE:
    +        state = BatchState.DONE;
    +        break;
    +      default:
    +        break;
    +    }
    +  }
    +
    +  /**
    +   * Process each request for a batch. The first request retrieves
    +   * the all incoming batches and sorts them, optionally spilling to
    +   * disk as needed. Subsequent calls retrieve the sorted results in
    +   * fixed-size batches.
    +   */
    +
    +  @Override
    +  public IterOutcome innerNext() {
    +    switch (sortState) {
    +    case DONE:
    +      return IterOutcome.NONE;
    +    case START:
    +    case LOAD:
    +      return load();
    +    case DELIVER:
    +      return nextOutputBatch();
    +    default:
    +      throw new IllegalStateException("Unexpected sort state: " + sortState);
    +    }
    +  }
    +
    +  private IterOutcome nextOutputBatch() {
    +    if (resultsIterator.next()) {
    +      return IterOutcome.OK;
    +    } else {
    +      logger.trace("Deliver phase complete: Returned {} batches, {} records",
    +                    resultsIterator.getBatchCount(), resultsIterator.getRecordCount());
    +      sortState = SortState.DONE;
    +      return IterOutcome.NONE;
    +    }
    +  }
    +
    +  /**
    +   * Load and process a single batch, handling schema changes. In general, the
    +   * external sort accepts only one schema. It can handle compatible schemas
    +   * (which seems to mean the same columns in possibly different orders.)
    +   *
    +   * @return return code depending on the amount of data read from upstream
    +   */
    +
    +  private IterOutcome loadBatch() {
    +
    +    // If this is the very first batch, then AbstractRecordBatch
    +    // already loaded it for us in buildSchema().
    +
    +    IterOutcome upstream;
    +    if (sortState == SortState.START) {
    +      sortState = SortState.LOAD;
    +      upstream = IterOutcome.OK_NEW_SCHEMA;
    +    } else {
    +      upstream = next(incoming);
    +    }
    +    switch (upstream) {
    +    case NONE:
    +      return upstream;
    +    case NOT_YET:
    +      throw new UnsupportedOperationException();
    +    case STOP:
    +      return upstream;
    +    case OK_NEW_SCHEMA:
    +    case OK:
    +      setupSchema(upstream);
    +
    +      // Add the batch to the in-memory generation, spilling if
    +      // needed.
    +
    +      processBatch();
    +      break;
    +    case OUT_OF_MEMORY:
    +
    +      // Note: it is highly doubtful that this code actually works. It
    +      // requires that the upstream batches got to a safe place to run
    +      // out of memory and that no work as in-flight and thus abandoned.
    +      // Consider removing this case once resource management is in place.
    +
    +      logger.debug("received OUT_OF_MEMORY, trying to spill");
    +      if (bufferedBatches.size() > 2) {
    +        spillFromMemory();
    +      } else {
    +        logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
    +        return IterOutcome.OUT_OF_MEMORY;
    +      }
    +      break;
    +    default:
    +      throw new UnsupportedOperationException();
    +    }
    +    return IterOutcome.OK;
    +  }
    +
    +  /**
    +   * Load the results and sort them. May bail out early if an exceptional
    +   * condition is passed up from the input batch.
    +   *
    +   * @return return code: OK_NEW_SCHEMA if rows were sorted,
    +   * NONE if no rows
    +   */
    +
    +  private IterOutcome load() {
    +    logger.trace("Start of load phase");
    +
    +    // Clear the temporary container created by
    +    // buildSchema().
    +
    +    container.clear();
    +
    +    // Loop over all input batches
    +
    +    for (;;) {
    +      IterOutcome result = loadBatch();
    +
    +      // None means all batches have been read.
    +
    +      if (result == IterOutcome.NONE) {
    +        break; }
    +
    +      // Any outcome other than OK means something went wrong.
    +
    +      if (result != IterOutcome.OK) {
    +        return result; }
    +    }
    +
    +    // Anything to actually sort?
    +
    +    if (inputRecordCount == 0) {
    +      sortState = SortState.DONE;
    +      return IterOutcome.NONE;
    +    }
    +    logger.trace("Completed load phase: read {} batches", inputBatchCount);
    +
    +    // Do the merge of the loaded batches. The merge can be done entirely in memory if
    +    // the results fit; else we have to do a disk-based merge of
    +    // pre-sorted spilled batches.
    +
    +    if (canUseMemoryMerge()) {
    +      return sortInMemory();
    +    } else {
    +      return mergeSpilledRuns();
    +    }
    +  }
    +
    +  /**
    +   * All data has been read from the upstream batch. Determine if we
    +   * can use a fast in-memory sort, or must use a merge (which typically,
    +   * but not always, involves spilled batches.)
    +   *
    +   * @return whether sufficient resources exist to do an in-memory sort
    +   * if all batches are still in memory
    +   */
    +
    +  private boolean canUseMemoryMerge() {
    +    if (spillSet.hasSpilled()) { return false; }
    +
    +    // Do we have enough memory for MSorter (the in-memory sorter)?
    +
    +    long allocMem = allocator.getAllocatedMemory();
    +    long availableMem = memoryLimit - allocMem;
    +    long neededForInMemorySort = MSortTemplate.memoryNeeded(inputRecordCount);
    +    if (availableMem < neededForInMemorySort) { return false; }
    +
    +    // Make sure we don't exceed the maximum number of batches SV4 can address.
    +
    +    if (bufferedBatches.size() > Character.MAX_VALUE) { return false; }
    +
    +    // We can do an in-memory merge.
    +
    +    return true;
    +  }
    +
    +  /**
    +   * Handle a new schema from upstream. The ESB is quite limited in its ability
    +   * to handle schema changes.
    +   *
    +   * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA
    +   */
    +
    +  private void setupSchema(IterOutcome upstream)  {
    +
    +    // First batch: we won't have a schema.
    +
    +    if (schema == null) {
    +      schema = incoming.getSchema();
    +
    +    // Subsequent batches, nothing to do if same schema.
    +
    +    } else if (upstream == IterOutcome.OK) {
    +      return;
    +
    +    // Only change in the case that the schema truly changes. Artificial schema changes
are ignored.
    +
    +    } else if (incoming.getSchema().equals(schema)) {
    +      return;
    +    } else if (unionTypeEnabled) {
    +        schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
    +
    +        // New schema: must generate a new sorter and copier.
    +
    +        opCodeGen.setSchema(schema);
    +    } else {
    +      throw UserException.unsupportedError()
    +            .message("Schema changes not supported in External Sort. Please enable Union
type.")
    +            .build(logger);
    +    }
    +
    +    // Coerce all existing batches to the new schema.
    +
    +    for (BatchGroup b : bufferedBatches) {
    +//      System.out.println("Before: " + allocator.getAllocatedMemory()); // Debug only
    +      b.setSchema(schema);
    +//      System.out.println("After: " + allocator.getAllocatedMemory()); // Debug only
    +    }
    +    for (BatchGroup b : spilledRuns) {
    +      b.setSchema(schema);
    +    }
    +  }
    +
    +  /**
    +   * Convert an incoming batch into the agree-upon format. (Also seems to
    +   * make a persistent shallow copy of the batch saved until we are ready
    +   * to sort or spill.)
    +   *
    +   * @return the converted batch, or null if the incoming batch is empty
    +   */
    +
    +  private VectorContainer convertBatch() {
    +
    +    // Must accept the batch even if no records. Then clear
    +    // the vectors to release memory since we won't do any
    +    // further processing with the empty batch.
    +
    +    VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext);
    +    if (incoming.getRecordCount() == 0) {
    +      for (VectorWrapper<?> w : convertedBatch) {
    +        w.clear();
    +      }
    +      return null;
    +    }
    +    return convertedBatch;
    +  }
    +
    +  private SelectionVector2 makeSelectionVector() {
    +    if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE)
{
    +      return incoming.getSelectionVector2().clone();
    +    } else {
    +      return newSV2();
    +    }
    +  }
    +
    +  /**
    +   * Process the converted incoming batch by adding it to the in-memory store
    +   * of data, or spilling data to disk when necessary.
    +   */
    +
    +  @SuppressWarnings("resource")
    +  private void processBatch() {
    +
    +    // The heart of the external sort operator: spill to disk when
    +    // the in-memory generation exceeds the allowed memory limit.
    +    // Preemptively spill BEFORE accepting the new batch into our memory
    +    // pool. The allocator will throw an OOM exception if we accept the
    +    // batch when we are near the limit - despite the fact that the batch
    +    // is already in memory and no new memory is allocated during the transfer.
    +
    +    if (isSpillNeeded()) {
    +      spillFromMemory();
    +    }
    +
    +    // Sanity check. We should now be above the spill point.
    +
    +    long startMem = allocator.getAllocatedMemory();
    +    if (memoryLimit - startMem < spillPoint) {
    +      logger.error( "ERROR: Failed to spill below the spill point. Spill point = {},
free memory = {}",
    +                    spillPoint, memoryLimit - startMem);
    +    }
    +
    +    // Convert the incoming batch to the agreed-upon schema.
    +    // No converted batch means we got an empty input batch.
    +    // Converting the batch transfers memory ownership to our
    +    // allocator. This gives a round-about way to learn the batch
    +    // size: check the before and after memory levels, then use
    +    // the difference as the batch size, in bytes.
    +
    +    VectorContainer convertedBatch = convertBatch();
    +    if (convertedBatch == null) {
    +      return;
    +    }
    +
    +    SelectionVector2 sv2 = makeSelectionVector();
    +
    +    // Compute batch size, including allocation of an sv2.
    +
    +    long endMem = allocator.getAllocatedMemory();
    +    long batchSize = endMem - startMem;
    +    int count = sv2.getCount();
    +    inputRecordCount += count;
    +    inputBatchCount++;
    +    stats.setLongStat(Metric.INPUT_BATCHES, inputBatchCount);
    +
    +    // Update the minimum buffer space metric.
    +
    +    if (minimumBufferSpace == 0) {
    +      minimumBufferSpace = endMem;
    +    } else {
    +      minimumBufferSpace = Math.min(minimumBufferSpace, endMem);
    +    }
    +    stats.setLongStat(Metric.MIN_BUFFER, minimumBufferSpace);
    +
    +    // Update the size based on the actual record count, not
    +    // the effective count as given by the selection vector
    +    // (which may exclude some records due to filtering.)
    +
    +    updateMemoryEstimates(batchSize, convertedBatch.getRecordCount());
    +
    +    // Sort the incoming batch using either the original selection vector,
    +    // or a new one created here.
    +
    +    SingleBatchSorter sorter;
    +    sorter = opCodeGen.getSorter(convertedBatch);
    +    try {
    +      sorter.setup(context, sv2, convertedBatch);
    +    } catch (SchemaChangeException e) {
    +      convertedBatch.clear();
    +      throw UserException.unsupportedError(e)
    +            .message("Unexpected schema change.")
    +            .build(logger);
    +    }
    +    try {
    +      sorter.sort(sv2);
    +    } catch (SchemaChangeException e) {
    +      throw UserException.unsupportedError(e)
    +                .message("Unexpected schema change.")
    +                .build(logger);
    +    }
    +    RecordBatchData rbd = new RecordBatchData(convertedBatch, allocator);
    +    try {
    +      rbd.setSv2(sv2);
    +      bufferedBatches.add(new BatchGroup.InputBatch(rbd.getContainer(), rbd.getSv2(),
oContext, batchSize));
    +      if (peakNumBatches < bufferedBatches.size()) {
    +        peakNumBatches = bufferedBatches.size();
    +        stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches);
    +      }
    +
    +    } catch (Throwable t) {
    +      rbd.clear();
    +      throw t;
    +    }
    +  }
    +
    +  /**
    +   * Update the data-driven memory use numbers including:
    +   * <ul>
    +   * <li>The average size of incoming records.</li>
    +   * <li>The estimated spill and output batch size.</li>
    +   * <li>The estimated number of average-size records per
    +   * spill and output batch.</li>
    +   * <li>The amount of memory set aside to hold the incoming
    +   * batches before spilling starts.</li>
    +   * </ul>
    +   *
    +   * @param actualBatchSize the overall size of the current batch received from
    +   * upstream
    +   * @param actualRecordCount the number of actual (not filtered) records in
    +   * that upstream batch
    +   */
    +
    +  private void updateMemoryEstimates(long actualBatchSize, int actualRecordCount) {
    +
    +    // The record count should never be zero, but better safe than sorry...
    +
    +    if (actualRecordCount == 0) {
    +      return; }
    +
    +    // We know the batch size and number of records. Use that to estimate
    +    // the average record size. Since a typical batch has many records,
    +    // the average size is a fairly good estimator. Note that the batch
    +    // size includes not just the actual vector data, but any unused space
    +    // resulting from power-of-two allocation. This means that we don't
    +    // have to do size adjustments for input batches as we will do below
    +    // when estimating the size of other objects.
    +
    +    int batchRecordSize = (int) (actualBatchSize / actualRecordCount);
    +
    +    // Record sizes may vary across batches. To be conservative, use
    +    // the largest size observed from incoming batches.
    +
    +    int origEstimate = estimatedRecordSize;
    +    estimatedRecordSize = Math.max(estimatedRecordSize, batchRecordSize);
    +
    +    // Go no further if nothing changed.
    +
    +    if (estimatedRecordSize == origEstimate) {
    +      return; }
    +
    +    // Maintain an estimate of the incoming batch size: the largest
    +    // batch yet seen. Used to reserve memory for the next incoming
    +    // batch.
    +
    +    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, actualBatchSize);
    +
    +    // Estimate the input record count. Add one due to rounding.
    +
    +    long estimatedInputRecordCount = estimatedInputBatchSize / estimatedRecordSize +
1;
    +
    +    // Estimate the total size of each incoming batch plus sv2. Note that, due
    +    // to power-of-two rounding, the allocated size might be twice the data size.
    +
    +    long estimatedInputSize = estimatedInputBatchSize + 4 * estimatedInputRecordCount;
    +
    +    // Determine the number of records to spill per merge step. The goal is to
    +    // spill batches of either 32K records, or as many records as fit into the
    +    // amount of memory dedicated to each batch, whichever is less.
    +
    +    outputBatchRecordCount = (int) Math.max(1, preferredMergeBatchSize / estimatedRecordSize);
    +    outputBatchRecordCount = Math.min(outputBatchRecordCount, Short.MAX_VALUE);
    +
    +    // Compute the estimated size of batches that this operator creates.
    +    // Note that this estimate DOES NOT apply to incoming batches as we have
    +    // no control over those. Note that the output batch size should be about
    +    // the same as the MAX_MERGED_BATCH_SIZE constant used to create the
    +    // estimated record count. But, it can be bigger if we have jumbo sized
    +    // records larger than the target output size.
    +
    +    estimatedOutputBatchSize = outputBatchRecordCount * estimatedRecordSize;
    +    estimatedOutputBatchSize = Math.max(estimatedOutputBatchSize, preferredMergeBatchSize);
    +
    +    // Determine the minimum memory needed for spilling. Spilling is done just
    +    // before accepting a batch, so we must spill if we don't have room for a
    +    // (worst case) input batch. To spill, we need room for the output batch created
    +    // by merging the batches already in memory. Double this to allow for power-of-two
    +    // memory allocations, then add one more as a margin of safety.
    +
    +    spillPoint = estimatedInputBatchSize + 3 * estimatedOutputBatchSize;
    +
    +    // Determine the minimum total memory we would need to receive two input
    +    // batches (the minimum needed to make progress) and the allowance for the
    +    // output batch.
    +
    +    long minLoadMemory = spillPoint + estimatedInputSize;
    +
    +    // The merge memory pool assumes we can spill all input batches. To make
    +    // progress, we must have at least two merge batches (same size as an output
    +    // batch) and one output batch. Again, double to allow for power-of-two
    +    // allocation and add one for a margin of error.
    +
    +    long minMergeMemory = (2*3 + 1) * estimatedOutputBatchSize;
    +
    +    // Determine how much memory can be used to hold in-memory batches of spilled
    +    // runs when reading from disk.
    +
    +    mergeMemoryPool = Math.max(minMergeMemory,
    +                               (long) ((memoryLimit - 3 * estimatedOutputBatchSize) *
0.95));
    +
    +    // Sanity check: if we've been given too little memory to make progress,
    +    // issue a warning but proceed anyway. Should only occur if something is
    +    // configured terribly wrong.
    +
    +    long minMemoryNeeds = Math.max(minLoadMemory, minMergeMemory);
    +    if (minMemoryNeeds > memoryLimit) {
    +      logger.warn("updateMemoryEstimates: potential memory overflow! " +
    +                   "Minumum needed = {} bytes, actual available = {} bytes",
    +                   minMemoryNeeds, memoryLimit);
    +    }
    +
    +    // Log the calculated values. Turn this on if things seem amiss.
    +    // Message will appear only when the values change.
    +
    +    logger.debug("Memory Estimates: record size = {} bytes; input batch = {} bytes, {}
records; " +
    +                  "output batch size = {} bytes, {} records; " +
    +                  "Available memory: {}, spill point = {}, min. merge memory = {}",
    +                estimatedRecordSize, estimatedInputBatchSize, estimatedInputRecordCount,
    +                estimatedOutputBatchSize, outputBatchRecordCount,
    +                memoryLimit, spillPoint, minMergeMemory);
    +  }
    +
    +  /**
    +   * Determine if spill is needed before receiving the new record batch.
    +   * Spilling is driven purely by memory availability (and an optional
    +   * batch limit for testing.)
    +   *
    +   * @return true if spilling is needed, false otherwise
    +   */
    +
    +  private boolean isSpillNeeded() {
    +
    +    // Can't spill if less than two batches else the merge
    +    // can't make progress.
    +
    +    if (bufferedBatches.size() < 2) {
    +      return false; }
    +
    +    // Must spill if we are below the spill point (the amount of memory
    +    // needed to do the minimal spill.)
    +
    +    long freeMemory = memoryLimit - allocator.getAllocatedMemory();
    +
    +    // Sanity check. If the memory goes negative, the calcs are off.
    +
    +    if (freeMemory < 0) {
    +      logger.error("ERROR: Free memory is negative: {}. Spill point = {}",
    +                   freeMemory, spillPoint);
    +    }
    +    if (freeMemory <= spillPoint) {
    +      return true; }
    +
    +    // Number of incoming batches (BatchGroups) exceed the limit and number of incoming
    +    // batches accumulated since the last spill exceed the defined limit
    --- End diff --
    
    Stale comment about my attempt to understand the original implementation. Replaced with
a better comment about the actual current implementation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message