drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5080) Create a memory-managed version of the External Sort operator
Date Wed, 01 Feb 2017 04:04:54 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847971#comment-15847971 ]

ASF GitHub Bot commented on DRILL-5080:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r98821658
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java ---
    @@ -0,0 +1,1321 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.drill.common.AutoCloseables;
    +import org.apache.drill.common.config.DrillConfig;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.MetricDef;
    +import org.apache.drill.exec.physical.config.ExternalSort;
    +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
    +import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
    +import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
    +import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
    +import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.SchemaUtil;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.WritableBatch;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.drill.exec.testing.ControlsInjector;
    +import org.apache.drill.exec.testing.ControlsInjectorFactory;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.drill.exec.vector.complex.AbstractContainerVector;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * External sort batch: a sort batch which can spill to disk in
    + * order to operate within a defined memory footprint.
    + * <p>
    + * <h4>Basic Operation</h4>
    + * The operator has three key phases:
    + * <p>
    + * <ul>
    + * <li>The load phase in which batches are read from upstream.</li>
    + * <li>The merge phase in which spilled batches are combined to
    + * reduce the number of files below the configured limit. (Best
    + * practice is to configure the system to avoid this phase.)</li>
    + * <li>The delivery phase in which batches are combined to produce
    + * the final output.</li>
    + * </ul>
    + * During the load phase:
    + * <p>
    + * <ul>
    + * <li>The incoming (upstream) operator provides a series of batches.</li>
    + * <li>This operator sorts each batch, and accumulates them in an in-memory
    + * buffer.</li>
    + * <li>If the in-memory buffer becomes too large, this operator selects
    + * a subset of the buffered batches to spill.</li>
    + * <li>Each spill set is merged to create a new, sorted collection of
    + * batches, and each is spilled to disk.</li>
    + * <li>To allow the use of multiple disk storage, each spill group is written
    + * round-robin to a set of spill directories.</li>
    + * </ul>
    + * <p>
    + * During the sort/merge phase:
    + * <p>
    + * <ul>
    + * <li>When the input operator is complete, this operator merges the accumulated
    + * batches (which may be all in memory or partially on disk), and returns
    + * them to the output (downstream) operator in chunks of no more than
    + * 32K records.</li>
    + * <li>The final merge must combine a collection of in-memory and spilled
    + * batches. Several limits apply to the maximum "width" of this merge. For
    + * example, each open spill run consumes a file handle, and we may wish
    + * to limit the number of file handles. A consolidation phase combines
    + * in-memory and spilled batches prior to the final merge to control final
    + * merge width.</li>
    + * <li>A special case occurs if no batches were spilled. In this case, the input
    + * batches are sorted in memory without merging.</li>
    + * </ul>
    + * <p>
    + * Many complex details are involved in doing the above; the details are explained
    + * in the methods of this class.
    + * <p>
    + * <h4>Configuration Options</h4>
    + * <dl>
    + * <dt>drill.exec.sort.external.spill.fs</dt>
    + * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd>
    + * <dt>drill.exec.sort.external.spill.directories</dt>
    + * <dd>The (comma? space?) separated list of directories, on the above file
    + * system, to which to spill files in round-robin fashion. The query will
    + * fail if any one of the directories becomes full.</dd>
    + * <dt>drill.exec.sort.external.spill.file_size</dt>
    + * <dd>Target size for first-generation spill files. Set this large
    + * enough to get nice long writes, but not so large that spill directories
    + * are overwhelmed.</dd>
    + * <dt>drill.exec.sort.external.mem_limit</dt>
    + * <dd>Maximum memory to use for the in-memory buffer. (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.batch_limit</dt>
    + * <dd>Maximum number of batches to hold in memory. (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.max_count</dt>
    + * <dd>Maximum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.min_count</dt>
    + * <dd>Minimum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.merge_limit</dt>
    + * <dd>Sets the maximum number of runs to be merged in a single pass (limits
    + * the number of open files.)</dd>
    + * </dl>
    + * <p>
    + * The memory limit observed by this operator is the lesser of:
    + * <ul>
    + * <li>The maximum allocation allowed by the allocator assigned to this batch,</li>
    + * <li>The maximum limit set for this operator by the Foreman, or</li>
    + * <li>The maximum limit configured in the mem_limit parameter above. (Primarily for
    + * testing.)</li>
    + * </ul>
    + * <h4>Output</h4>
    + * It is helpful to note that the sort operator will produce one of two kinds of
    + * output batches.
    + * <ul>
    + * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses
    + * the entire in-memory sort set. A selection vector remover will copy results
    + * into new batches of a size determined by that operator.</li>
    + * <li>A series of batches, without a selection vector, if the sort spills to
    + * disk. In this case, the downstream operator will still be a selection vector
    + * remover, but there is nothing for that operator to remove. Each batch is
    + * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li>
    + * </ul>
    + * Note that, even in the in-memory sort case, this operator could do the copying
    + * to eliminate the extra selection vector remover. That is left as an exercise
    + * for another time.
    + * <h4>Logging</h4>
    + * Logging in this operator serves two purposes:
    + * <p>
    + * <ul>
    + * <li>Normal diagnostic information.</li>
    + * <li>Capturing the essence of the operator functionality for analysis in unit
    + * tests.</li>
    + * </ul>
    + * Test logging is designed to capture key events and timings. Take care
    + * when changing or removing log messages as you may need to adjust unit tests
    + * accordingly.
    + */
    +
    +public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
    +  protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
    +
    +  private final RecordBatch incoming;
    +
    +  /**
    +   * Memory allocator for this operator itself. Incoming batches are
    +   * transferred into this allocator. Intermediate batches used during
    +   * merge also reside here.
    +   */
    +
    +  private final BufferAllocator allocator;
    +
    +  /**
    +   * Schema of batches that this operator produces.
    +   */
    +
    +  private BatchSchema schema;
    +
    +  private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList();
    +  private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
    +  private SelectionVector4 sv4;
    +
    +  /**
    +   * The number of records to add to each output batch sent to the
    +   * downstream operator or spilled to disk.
    +   */
    +
    +  private int outputBatchRecordCount;
    +  private int peakNumBatches = -1;
    +
    +  /**
    +   * The copier uses MAX_MERGED_BATCH_SIZE to estimate the target
    +   * number of records to return in each batch.
    +   * <p>
    +   * For reference, see {@link FlattenTemplate#OUTPUT_MEMORY_LIMIT}.
    +   * <p>
    +   * WARNING: Do not allow any one vector to grow beyond 16 MB. Drill contains a
    +   * design flaw that may give rise to fatal memory fragmentation if we allow it to
    +   * allocate larger vectors.
    +   */
    +
    +  private static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024;
    +
    +  /**
    +   * Smallest allowed output batch size. The smallest output batch
    +   * created even under constrained memory conditions.
    +   */
    +  private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
    +
    +  /**
    +   * The preferred amount of memory to set aside to output batches
    +   * expressed as a ratio of available memory.
    +   */
    +
    +  private static final float MERGE_BATCH_ALLOWANCE = 0.10F;
    +
    +  public static final String INTERRUPTION_AFTER_SORT = "after-sort";
    +  public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
    +  public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
    +
    +  private long memoryLimit;
    +
    +  /**
    +   * Iterates over the final, sorted results.
    +   */
    +
    +  private SortResults resultsIterator;
    +
    +  /**
    +   * Manages the set of spill directories and files.
    +   */
    +
    +  private final SpillSet spillSet;
    +
    +  /**
    +   * Manages the copier used to merge a collection of batches into
    +   * a new set of batches.
    +   */
    +
    +  private final CopierHolder copierHolder;
    +
    +  private enum SortState { START, LOAD, DELIVER, DONE }
    +  private SortState sortState = SortState.START;
    +  private int inputRecordCount = 0;
    +  private int inputBatchCount = 0; // total number of batches received so far
    +  private final OperatorCodeGenerator opCodeGen;
    +
    +  /**
    +   * Estimated size of the records for this query, updated on each
    +   * new batch received from upstream.
    +   */
    +
    +  private int estimatedRecordSize;
    +
    +  /**
    +   * Estimated size of the spill and output batches that this
    +   * operator produces, estimated from the estimated record
    +   * size.
    +   */
    +
    +  private long estimatedOutputBatchSize;
    +  private long estimatedInputBatchSize;
    +
    +  /**
    +   * Maximum number of batches to hold in memory.
    +   * (Primarily for testing.)
    +   */
    +
    +  private int bufferedBatchLimit;
    +  private int mergeLimit;
    +  private int minSpillLimit;
    +  private int maxSpillLimit;
    +  private long spillFileSize;
    +  private long minimumBufferSpace;
    +
    +  /**
    +   * Minimum memory level before spilling occurs. That is, we can buffer input
    +   * batches in memory until we are down to the level given by the spill point.
    +   */
    +
    +  private long spillPoint;
    +  private long mergeMemoryPool;
    +  private long preferredMergeBatchSize;
    +
    +  // WARNING: The enum here is used within this class. But, the members of
    +  // this enum MUST match those in the (unmanaged) ExternalSortBatch since
    +  // that is the enum used in the UI to display metrics for the query profile.
    +
    +  public enum Metric implements MetricDef {
    +    SPILL_COUNT,            // number of times operator spilled to disk
    +    RETIRED1,               // Was: peak value for totalSizeInMemory
    +                            // But operator already provides this value
    +    PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
    +    MERGE_COUNT,            // Number of second+ generation merges
    +    MIN_BUFFER,             // Minimum memory level observed in operation.
    +    INPUT_BATCHES;          // Number of batches read from upstream.
    +
    +    @Override
    +    public int metricId() {
    +      return ordinal();
    +    }
    +  }
    +
    +  /**
    +   * Iterates over the final sorted results. Implemented differently
    +   * depending on whether the results are in-memory or spilled to
    +   * disk.
    +   */
    +
    +  public interface SortResults {
    +    boolean next();
    +    void close();
    +    int getBatchCount();
    +    int getRecordCount();
    +  }
    +
    +  public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
    +    super(popConfig, context, true);
    +    this.incoming = incoming;
    +    allocator = oContext.getAllocator();
    +    opCodeGen = new OperatorCodeGenerator(context, popConfig);
    +
    +    spillSet = new SpillSet(context, popConfig);
    +    copierHolder = new CopierHolder(context, allocator, opCodeGen);
    +    configure(context.getConfig());
    +  }
    +
    +  private void configure(DrillConfig config) {
    +
    +    // The maximum memory this operator can use. It is either the
    +    // limit set on the allocator or on the operator, whichever is
    +    // less.
    +
    +    memoryLimit = Math.min(popConfig.getMaxAllocation(), allocator.getLimit());
    +
    +    // Optional configured memory limit, typically used only for testing.
    +
    +    long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
    +    if (configLimit > 0) {
    +      memoryLimit = Math.min(memoryLimit, configLimit);
    +    }
    +
    +    // Optional limit on the number of buffered in-memory batches.
    +    // 0 means no limit. Used primarily for testing. Must allow at least two
    +    // batches or no merging can occur.
    +
    +    bufferedBatchLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, Integer.MAX_VALUE, 2);
    +
    +    // Optional limit on the number of spilled runs to merge in a single
    +    // pass. Limits the number of open file handles. Must allow at least
    +    // two batches to merge to make progress.
    +
    +    mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2);
    +
    +    // Limits on the minimum and maximum buffered batches to spill per
    +    // spill event.
    +
    +    minSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MIN_SPILL, Integer.MAX_VALUE, 2);
    +    maxSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MAX_SPILL, Integer.MAX_VALUE, 2);
    +    if (minSpillLimit > maxSpillLimit) {
    +      minSpillLimit = Math.min(minSpillLimit, maxSpillLimit);
    +      maxSpillLimit = minSpillLimit;
    +    }
    +
    +    // Limits the size of first-generation spill files.
    +
    +    spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE);
    +
    +    // Set the target output batch size. Use the maximum size, but only if
    +    // this represents less than 10% of available memory. Otherwise, use 10%
    +    // of memory, but no smaller than the minimum size. In any event, an
    +    // output batch can contain no fewer than a single record.
    +
    +    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
    +    preferredMergeBatchSize = Math.min(maxAllowance, MAX_MERGED_BATCH_SIZE);
    +    preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE);
    +
    +    logger.debug("Config: memory limit = {}, batch limit = {}, " +
    +                 "min, max spill limit: {}, {}, merge limit = {}, merge batch size = {}",
    +                  memoryLimit, bufferedBatchLimit, minSpillLimit, maxSpillLimit, mergeLimit,
    +                  preferredMergeBatchSize);
    +  }
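
Reviewer's sketch (not part of the patch): the target output batch size computed at the end of configure() can be checked in isolation. The constants below are copied from the diff; the class name `MergeBatchSizing` and the method name are hypothetical and exist only for this illustration.

```java
public class MergeBatchSizing {
  // Constants copied from the diff above.
  static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024;
  static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
  static final float MERGE_BATCH_ALLOWANCE = 0.10F;

  // Hypothetical helper mirroring the three lines in configure():
  // take 10% of the memory limit, cap it at the maximum batch size,
  // then raise it to the minimum batch size if it fell below that.
  static long preferredMergeBatchSize(long memoryLimit) {
    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
    long size = Math.min(maxAllowance, MAX_MERGED_BATCH_SIZE);
    return Math.max(size, MIN_MERGED_BATCH_SIZE);
  }

  public static void main(String[] args) {
    // A 1 GiB limit: 10% exceeds 16 MiB, so the cap applies.
    System.out.println(preferredMergeBatchSize(1L << 30));
    // A 1 MiB limit: 10% is below 256 KiB, so the floor applies.
    System.out.println(preferredMergeBatchSize(1L << 20));
  }
}
```

Under these assumptions the floor means that, for very small memory limits, the output batch can take up more than 10% of the operator's memory.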
    +
    +  private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int minValue) {
    +    int limit = config.getInt(paramName);
    +    if (limit > 0) {
    +      limit = Math.max(limit, minValue);
    +    } else {
    +      limit = valueIfZero;
    +    }
    +    return limit;
    +  }
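
Reviewer's sketch (not part of the patch): the clamping behavior of getConfigLimit() is easier to see with the DrillConfig lookup replaced by a plain int. The class `ConfigLimitSketch` and the method `clampLimit` are hypothetical names; the logic is meant to match the method in the diff.

```java
public class ConfigLimitSketch {
  // Same branches as getConfigLimit() in the diff, but taking the
  // configured value directly instead of reading it from DrillConfig.
  static int clampLimit(int configured, int valueIfZero, int minValue) {
    if (configured > 0) {
      // A positive setting is honored, but never below the minimum.
      return Math.max(configured, minValue);
    }
    // Zero (or a negative value) means "not set": use the fallback.
    return valueIfZero;
  }

  public static void main(String[] args) {
    System.out.println(clampLimit(0, Integer.MAX_VALUE, 2));  // unset: fallback
    System.out.println(clampLimit(1, Integer.MAX_VALUE, 2));  // raised to the minimum
    System.out.println(clampLimit(10, Integer.MAX_VALUE, 2)); // used as configured
  }
}
```

With the arguments used in configure(), an unset spill.min_count also maps to Integer.MAX_VALUE; whether that is the intended meaning of "no limit" for a minimum may be worth confirming.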
    +
    +  @Override
    +  public int getRecordCount() {
    +    if (sv4 != null) {
    +      return sv4.getCount();
    +    }
    +    return container.getRecordCount();
    +  }
    +
    +  @Override
    +  public SelectionVector4 getSelectionVector4() {
    +    return sv4;
    +  }
    +
    +  private void closeBatchGroups(Collection<? extends BatchGroup> groups) {
    +    for (BatchGroup group: groups) {
    +      try {
    +        group.close();
    +      } catch (Exception e) {
    +        // Collect all failures and make sure to clean up all remaining batches.
    +        // Originally we would have thrown a RuntimeException that would propagate to
    +        // FragmentExecutor.closeOutResources(), where it would have been passed to
    +        // context.fail(). Passing the exception directly to context.fail(e) lets the
    +        // cleanup process continue instead of stopping right away, and also makes sure
    +        // we collect any additional exceptions raised while cleaning up.
    +        context.fail(e);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void close() {
    +    try {
    +      if (bufferedBatches != null) {
    +        closeBatchGroups(bufferedBatches);
    +        bufferedBatches = null;
    +      }
    +      if (spilledRuns != null) {
    +        closeBatchGroups(spilledRuns);
    +        spilledRuns = null;
    +      }
    +    } finally {
    +      if (sv4 != null) {
    +        sv4.clear();
    +      }
    +      if (resultsIterator != null) {
    +        resultsIterator.close();
    +      }
    +      copierHolder.close();
    +      spillSet.close();
    +      opCodeGen.close();
    +
    +      // The call to super.close() clears out the output container.
    +      // Doing so requires the allocator here, so it must be closed
    +      // after the super call.
    +
    +      super.close();
    +      allocator.close();
    +    }
    +  }
    +
    +  /**
    +   * Called by {@link AbstractRecordBatch} as a fast-path to obtain
    +   * the first record batch and setup the schema of this batch in order
    +   * to quickly return the schema to the client. Note that this method
    +   * fetches the first batch from upstream which will be waiting for
    +   * us the first time that {@link #innerNext()} is called.
    +   */
    +
    +  @Override
    +  public void buildSchema() {
    +    IterOutcome outcome = next(incoming);
    +    switch (outcome) {
    +      case OK:
    +      case OK_NEW_SCHEMA:
    +        for (VectorWrapper<?> w : incoming) {
    +          @SuppressWarnings("resource")
    +          ValueVector v = container.addOrGet(w.getField());
    +          if (v instanceof AbstractContainerVector) {
    +            w.getValueVector().makeTransferPair(v); // Can we remove this hack?
    +            v.clear();
    +          }
    +          v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO)
    +        }
    +        container.buildSchema(SelectionVectorMode.NONE);
    +        container.setRecordCount(0);
    +        break;
    +      case STOP:
    +        state = BatchState.STOP;
    +        break;
    +      case OUT_OF_MEMORY:
    +        state = BatchState.OUT_OF_MEMORY;
    +        break;
    +      case NONE:
    +        state = BatchState.DONE;
    +        break;
    +      default:
    +        break;
    +    }
    +  }
    +
    +  /**
    +   * Process each request for a batch. The first request retrieves
    +   * all incoming batches and sorts them, optionally spilling to
    +   * disk as needed. Subsequent calls retrieve the sorted results in
    +   * fixed-size batches.
    +   */
    +
    +  @Override
    +  public IterOutcome innerNext() {
    +    switch (sortState) {
    +    case DONE:
    +      return IterOutcome.NONE;
    +    case START:
    +    case LOAD:
    +      return load();
    +    case DELIVER:
    +      return nextOutputBatch();
    +    default:
    +      throw new IllegalStateException("Unexpected sort state: " + sortState);
    +    }
    +  }
    +
    +  private IterOutcome nextOutputBatch() {
    +    if (resultsIterator.next()) {
    +      return IterOutcome.OK;
    +    } else {
    +      logger.trace("Deliver phase complete: Returned {} batches, {} records",
    +                    resultsIterator.getBatchCount(), resultsIterator.getRecordCount());
    +      sortState = SortState.DONE;
    +      return IterOutcome.NONE;
    +    }
    +  }
    +
    +  /**
    +   * Load and process a single batch, handling schema changes. In general, the
    +   * external sort accepts only one schema. It can handle compatible schemas
    +   * (which seems to mean the same columns in possibly different orders.)
    +   *
    +   * @return return code depending on the amount of data read from upstream
    +   */
    +
    +  private IterOutcome loadBatch() {
    +
    +    // If this is the very first batch, then AbstractRecordBatch
    +    // already loaded it for us in buildSchema().
    +
    +    IterOutcome upstream;
    +    if (sortState == SortState.START) {
    +      sortState = SortState.LOAD;
    +      upstream = IterOutcome.OK_NEW_SCHEMA;
    +    } else {
    +      upstream = next(incoming);
    +    }
    +    switch (upstream) {
    +    case NONE:
    +      return upstream;
    +    case NOT_YET:
    +      throw new UnsupportedOperationException();
    +    case STOP:
    +      return upstream;
    +    case OK_NEW_SCHEMA:
    +    case OK:
    +      setupSchema(upstream);
    +
    +      // Add the batch to the in-memory generation, spilling if
    +      // needed.
    +
    +      processBatch();
    +      break;
    +    case OUT_OF_MEMORY:
    +
    +      // Note: it is highly doubtful that this code actually works. It
    +      // requires that the upstream batches got to a safe place to run
    +      // out of memory and that no work was in-flight and thus abandoned.
    +      // Consider removing this case once resource management is in place.
    +
    +      logger.debug("received OUT_OF_MEMORY, trying to spill");
    +      if (bufferedBatches.size() > 2) {
    +        spillFromMemory();
    +      } else {
    +        logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
    +        return IterOutcome.OUT_OF_MEMORY;
    +      }
    +      break;
    +    default:
    +      throw new UnsupportedOperationException();
    +    }
    +    return IterOutcome.OK;
    +  }
    +
    +  /**
    +   * Load the results and sort them. May bail out early if an exceptional
    +   * condition is passed up from the input batch.
    +   *
    +   * @return return code: OK_NEW_SCHEMA if rows were sorted,
    +   * NONE if no rows
    +   */
    +
    +  private IterOutcome load() {
    +    logger.trace("Start of load phase");
    +
    +    // Clear the temporary container created by
    +    // buildSchema().
    +
    +    container.clear();
    +
    +    // Loop over all input batches
    +
    +    for (;;) {
    +      IterOutcome result = loadBatch();
    +
    +      // None means all batches have been read.
    +
    +      if (result == IterOutcome.NONE) {
    +        break;
    +      }
    +
    +      // Any outcome other than OK means something went wrong.
    +
    +      if (result != IterOutcome.OK) {
    +        return result;
    +      }
    +    }
    +
    +    // Anything to actually sort?
    +
    +    if (inputRecordCount == 0) {
    +      sortState = SortState.DONE;
    +      return IterOutcome.NONE;
    +    }
    +    logger.trace("Completed load phase: read {} batches", inputBatchCount);
    +
    +    // Do the merge of the loaded batches. The merge can be done entirely in memory if
    +    // the results fit; else we have to do a disk-based merge of
    +    // pre-sorted spilled batches.
    +
    +    if (canUseMemoryMerge()) {
    +      return sortInMemory();
    +    } else {
    +      return mergeSpilledRuns();
    +    }
    +  }
    +
    +  /**
    +   * All data has been read from the upstream batch. Determine if we
    +   * can use a fast in-memory sort, or must use a merge (which typically,
    +   * but not always, involves spilled batches.)
    +   *
    +   * @return whether sufficient resources exist to do an in-memory sort
    +   * if all batches are still in memory
    +   */
    +
    +  private boolean canUseMemoryMerge() {
    +    if (spillSet.hasSpilled()) { return false; }
    +
    +    // Do we have enough memory for MSorter (the in-memory sorter)?
    +
    +    long allocMem = allocator.getAllocatedMemory();
    +    long availableMem = memoryLimit - allocMem;
    +    long neededForInMemorySort = MSortTemplate.memoryNeeded(inputRecordCount);
    +    if (availableMem < neededForInMemorySort) { return false; }
    +
    +    // Make sure we don't exceed the maximum number of batches SV4 can address.
    +
    +    if (bufferedBatches.size() > Character.MAX_VALUE) { return false; }
    +
    +    // We can do an in-memory merge.
    +
    +    return true;
    +  }
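
Reviewer's sketch (not part of the patch): the decision made by canUseMemoryMerge() reduces to three checks. In the version below the Drill objects are replaced by plain parameters so that it runs standalone; the class name `MemoryMergeCheck` and the parameter names are hypothetical, and the value that the patch obtains from MSortTemplate.memoryNeeded() is passed in rather than computed.

```java
public class MemoryMergeCheck {
  // Returns true only if nothing has spilled, the remaining memory covers
  // the in-memory sorter's needs, and the batch count fits what an SV4
  // can address (the diff compares against Character.MAX_VALUE).
  static boolean canUseMemoryMerge(boolean hasSpilled,
                                   long memoryLimit,
                                   long allocatedMemory,
                                   long neededForInMemorySort,
                                   int bufferedBatchCount) {
    if (hasSpilled) {
      return false;
    }
    long availableMem = memoryLimit - allocatedMemory;
    if (availableMem < neededForInMemorySort) {
      return false;
    }
    return bufferedBatchCount <= Character.MAX_VALUE;
  }

  public static void main(String[] args) {
    // Plenty of memory, few batches, nothing spilled.
    System.out.println(canUseMemoryMerge(false, 1000, 400, 500, 10));
    // Not enough free memory for the sorter.
    System.out.println(canUseMemoryMerge(false, 1000, 600, 500, 10));
  }
}
```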
    +
    +  /**
    +   * Handle a new schema from upstream. The ESB is quite limited in its ability
    +   * to handle schema changes.
    +   *
    +   * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA
    +   */
    +
    +  private void setupSchema(IterOutcome upstream)  {
    +
    +    // First batch: we won't have a schema.
    +
    +    if (schema == null) {
    +      schema = incoming.getSchema();
    +
    +    // Subsequent batches, nothing to do if same schema.
    +
    +    } else if (upstream == IterOutcome.OK) {
    +      return;
    +
    +    // Only change in the case that the schema truly changes. Artificial schema changes
    +    // are ignored.
    +
    +    } else if (incoming.getSchema().equals(schema)) {
    +      return;
    +    } else if (unionTypeEnabled) {
    +        schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
    +
    +        // New schema: must generate a new sorter and copier.
    +
    +        opCodeGen.setSchema(schema);
    +    } else {
    +      throw UserException.unsupportedError()
    +            .message("Schema changes not supported in External Sort. Please enable Union type.")
    +            .build(logger);
    +    }
    +
    +    // Coerce all existing batches to the new schema.
    +
    +    for (BatchGroup b : bufferedBatches) {
    +//      System.out.println("Before: " + allocator.getAllocatedMemory()); // Debug only
    +      b.setSchema(schema);
    +//      System.out.println("After: " + allocator.getAllocatedMemory()); // Debug only
    +    }
    +    for (BatchGroup b : spilledRuns) {
    +      b.setSchema(schema);
    +    }
    +  }
    +
    +  /**
    +   * Convert an incoming batch into the agreed-upon format. (Also seems to
    +   * make a persistent shallow copy of the batch saved until we are ready
    +   * to sort or spill.)
    +   *
    +   * @return the converted batch, or null if the incoming batch is empty
    +   */
    +
    +  private VectorContainer convertBatch() {
    +
    +    // Must accept the batch even if no records. Then clear
    +    // the vectors to release memory since we won't do any
    +    // further processing with the empty batch.
    +
    +    VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext);
    +    if (incoming.getRecordCount() == 0) {
    +      for (VectorWrapper<?> w : convertedBatch) {
    +        w.clear();
    +      }
    +      return null;
    +    }
    +    return convertedBatch;
    +  }
    +
    +  private SelectionVector2 makeSelectionVector() {
    +    if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
    +      return incoming.getSelectionVector2().clone();
    +    } else {
    +      return newSV2();
    +    }
    +  }
    +
    +  /**
    +   * Process the converted incoming batch by adding it to the in-memory store
    +   * of data, or spilling data to disk when necessary.
    +   */
    +
    +  @SuppressWarnings("resource")
    +  private void processBatch() {
    +
    +    // The heart of the external sort operator: spill to disk when
    +    // the in-memory generation exceeds the allowed memory limit.
    +    // Preemptively spill BEFORE accepting the new batch into our memory
    +    // pool. The allocator will throw an OOM exception if we accept the
    +    // batch when we are near the limit - despite the fact that the batch
    +    // is already in memory and no new memory is allocated during the transfer.
    +
    +    if (isSpillNeeded()) {
    +      spillFromMemory();
    +    }
    +
    +    // Sanity check. We should now be above the spill point.
    +
    +    long startMem = allocator.getAllocatedMemory();
    +    if (memoryLimit - startMem < spillPoint) {
    +      logger.error( "ERROR: Failed to spill below the spill point. Spill point = {}, free memory = {}",
    +                    spillPoint, memoryLimit - startMem);
    +    }
    +
    +    // Convert the incoming batch to the agreed-upon schema.
    +    // No converted batch means we got an empty input batch.
    +    // Converting the batch transfers memory ownership to our
    +    // allocator. This gives a round-about way to learn the batch
    +    // size: check the before and after memory levels, then use
    +    // the difference as the batch size, in bytes.
    +
    +    VectorContainer convertedBatch = convertBatch();
    +    if (convertedBatch == null) {
    +      return;
    +    }
    +
    +    SelectionVector2 sv2 = makeSelectionVector();
    +
    +    // Compute batch size, including allocation of an sv2.
    +
    +    long endMem = allocator.getAllocatedMemory();
    +    long batchSize = endMem - startMem;
    +    int count = sv2.getCount();
    +    inputRecordCount += count;
    +    inputBatchCount++;
    +    stats.setLongStat(Metric.INPUT_BATCHES, inputBatchCount);
    +
    +    // Update the minimum buffer space metric.
    +
    +    if (minimumBufferSpace == 0) {
    +      minimumBufferSpace = endMem;
    +    } else {
    +      minimumBufferSpace = Math.min(minimumBufferSpace, endMem);
    +    }
    +    stats.setLongStat(Metric.MIN_BUFFER, minimumBufferSpace);
    +
    +    // Update the size based on the actual record count, not
    +    // the effective count as given by the selection vector
    +    // (which may exclude some records due to filtering.)
    +
    +    updateMemoryEstimates(batchSize, convertedBatch.getRecordCount());
    +
    +    // Sort the incoming batch using either the original selection vector,
    +    // or a new one created here.
    +
    +    SingleBatchSorter sorter;
    +    sorter = opCodeGen.getSorter(convertedBatch);
    +    try {
    +      sorter.setup(context, sv2, convertedBatch);
    +    } catch (SchemaChangeException e) {
    +      convertedBatch.clear();
    +      throw UserException.unsupportedError(e)
    +            .message("Unexpected schema change.")
    +            .build(logger);
    +    }
    +    try {
    +      sorter.sort(sv2);
    +    } catch (SchemaChangeException e) {
    +      throw UserException.unsupportedError(e)
    +                .message("Unexpected schema change.")
    +                .build(logger);
    +    }
    +    RecordBatchData rbd = new RecordBatchData(convertedBatch, allocator);
    +    try {
    +      rbd.setSv2(sv2);
    +      bufferedBatches.add(new BatchGroup.InputBatch(rbd.getContainer(), rbd.getSv2(), oContext, batchSize));
    +      if (peakNumBatches < bufferedBatches.size()) {
    +        peakNumBatches = bufferedBatches.size();
    +        stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches);
    +      }
    +
    +    } catch (Throwable t) {
    +      rbd.clear();
    +      throw t;
    +    }
    +  }
    +
    +  /**
    +   * Update the data-driven memory use numbers including:
    +   * <ul>
    +   * <li>The average size of incoming records.</li>
    +   * <li>The estimated spill and output batch size.</li>
    +   * <li>The estimated number of average-size records per
    +   * spill and output batch.</li>
    +   * <li>The amount of memory set aside to hold the incoming
    +   * batches before spilling starts.</li>
    +   * </ul>
    +   *
    +   * @param actualBatchSize the overall size of the current batch received from
    +   * upstream
    +   * @param actualRecordCount the number of actual (not filtered) records in
    +   * that upstream batch
    +   */
    +
    +  private void updateMemoryEstimates(long actualBatchSize, int actualRecordCount) {
    +
    +    // The record count should never be zero, but better safe than sorry...
    +
    +    if (actualRecordCount == 0) {
    +      return; }
    +
    +    // We know the batch size and number of records. Use that to estimate
    +    // the average record size. Since a typical batch has many records,
    +    // the average size is a fairly good estimator. Note that the batch
    +    // size includes not just the actual vector data, but any unused space
    +    // resulting from power-of-two allocation. This means that we don't
    +    // have to do size adjustments for input batches as we will do below
    +    // when estimating the size of other objects.
    +
    +    int batchRecordSize = (int) (actualBatchSize / actualRecordCount);
    +
    +    // Record sizes may vary across batches. To be conservative, use
    +    // the largest size observed from incoming batches.
    +
    +    int origEstimate = estimatedRecordSize;
    +    estimatedRecordSize = Math.max(estimatedRecordSize, batchRecordSize);
    +
    +    // Go no further if nothing changed.
    +
    +    if (estimatedRecordSize == origEstimate) {
    +      return; }
    +
    +    // Maintain an estimate of the incoming batch size: the largest
    +    // batch yet seen. Used to reserve memory for the next incoming
    +    // batch.
    +
    +    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, actualBatchSize);
    --- End diff --
    
    This line needs to move above the "return" on line 892 above (the "Go no further if nothing changed" early exit). Otherwise, if a huge batch arrives with a record count so large that its average record size does not exceed the previous estimate, we exit on line 892 without capturing that huge batch size.
    The same applies to some of the code below that makes use of the estimated input batch size.
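
    A minimal sketch of the suggested reordering, not the actual patch. The class name and the two getters are hypothetical stand-ins for the operator's fields. The point is only that the largest-batch update runs before the early exit on an unchanged record size estimate:

```java
// Hypothetical, stripped-down stand-in for the estimate-tracking part of
// ExternalSortBatch. Only the ordering of the updates is the point here.
class MemoryEstimateSketch {
  private int estimatedRecordSize;
  private long estimatedInputBatchSize;

  void updateMemoryEstimates(long actualBatchSize, int actualRecordCount) {
    // The record count should never be zero, but guard against it anyway.
    if (actualRecordCount == 0) {
      return;
    }

    // Capture the largest batch seen so far BEFORE any early exit, so a
    // huge batch made of many small records is still recorded.
    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, actualBatchSize);

    int batchRecordSize = (int) (actualBatchSize / actualRecordCount);
    int origEstimate = estimatedRecordSize;
    estimatedRecordSize = Math.max(estimatedRecordSize, batchRecordSize);

    // Go no further if the record size estimate did not change.
    if (estimatedRecordSize == origEstimate) {
      return;
    }

    // Estimates derived from the record size would be recomputed here.
    // Anything derived from estimatedInputBatchSize would also need to be
    // recomputed when that value changes, not only when this point is reached.
  }

  int getEstimatedRecordSize() {
    return estimatedRecordSize;
  }

  long getEstimatedInputBatchSize() {
    return estimatedInputBatchSize;
  }
}
```

    With this ordering, a 1000-byte batch of 10 records followed by a 50000-byte batch of 1000 records leaves the record size estimate at 100 bytes but still raises the input batch size estimate to 50000 bytes.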



> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
>                 Key: DRILL-5080
>                 URL: https://issues.apache.org/jira/browse/DRILL-5080
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.8.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.10
>
>         Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that works to a clearly-defined memory limit. Attached is a design specification for the work.
> The project will include fixing a number of bugs related to the external sort, included as sub-tasks of this umbrella task.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

