drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5080) Create a memory-managed version of the External Sort operator
Date Wed, 01 Feb 2017 04:04:55 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847976#comment-15847976 ]

ASF GitHub Bot commented on DRILL-5080:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r98793059
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java ---
    @@ -0,0 +1,1321 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.drill.common.AutoCloseables;
    +import org.apache.drill.common.config.DrillConfig;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.MetricDef;
    +import org.apache.drill.exec.physical.config.ExternalSort;
    +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
    +import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
    +import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
    +import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
    +import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.SchemaUtil;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.WritableBatch;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.drill.exec.testing.ControlsInjector;
    +import org.apache.drill.exec.testing.ControlsInjectorFactory;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.drill.exec.vector.complex.AbstractContainerVector;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * External sort batch: a sort batch which can spill to disk in
    + * order to operate within a defined memory footprint.
    + * <p>
    + * <h4>Basic Operation</h4>
    + * The operator has three key phases:
    + * <p>
    + * <ul>
    + * <li>The load phase in which batches are read from upstream.</li>
    + * <li>The merge phase in which spilled batches are combined to
    + * reduce the number of files below the configured limit. (Best
    + * practice is to configure the system to avoid this phase.)</li>
    + * <li>The delivery phase in which batches are combined to produce
    + * the final output.</li>
    + * </ul>
    + * During the load phase:
    + * <p>
    + * <ul>
    + * <li>The incoming (upstream) operator provides a series of batches.</li>
    + * <li>This operator sorts each batch, and accumulates them in an in-memory
    + * buffer.</li>
    + * <li>If the in-memory buffer becomes too large, this operator selects
    + * a subset of the buffered batches to spill.</li>
    + * <li>Each spill set is merged to create a new, sorted collection of
    + * batches, and each is spilled to disk.</li>
    + * <li>To allow the use of multiple disk storage, each spill group is written
    + * round-robin to a set of spill directories.</li>
    + * </ul>
    + * <p>
    + * During the sort/merge phase:
    + * <p>
    + * <ul>
    + * <li>When the input operator is complete, this operator merges the accumulated
    + * batches (which may be all in memory or partially on disk), and returns
    + * them to the output (downstream) operator in chunks of no more than
    + * 32K records.</li>
    + * <li>The final merge must combine a collection of in-memory and spilled
    + * batches. Several limits apply to the maximum "width" of this merge. For
    + * example, each open spill run consumes a file handle, and we may wish
    + * to limit the number of file handles. A consolidation phase combines
    + * in-memory and spilled batches prior to the final merge to control final
    + * merge width.</li>
    + * <li>A special case occurs if no batches were spilled. In this case, the input
    + * batches are sorted in memory without merging.</li>
    + * </ul>
    + * <p>
    + * Many complex details are involved in doing the above; the details are explained
    + * in the methods of this class.
    + * <p>
    + * <h4>Configuration Options</h4>
    + * <dl>
    + * <dt>drill.exec.sort.external.spill.fs</dt>
    + * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd>
    + * <dt>drill.exec.sort.external.spill.directories</dt>
    + * <dd>The (comma? space?) separated list of directories, on the above file
    + * system, to which to spill files in round-robin fashion. The query will
    + * fail if any one of the directories becomes full.</dd>
    + * <dt>drill.exec.sort.external.spill.file_size</dt>
    + * <dd>Target size for first-generation spill files. Set this large
    + * enough to get nice long writes, but not so large that spill directories
    + * are overwhelmed.</dd>
    + * <dt>drill.exec.sort.external.mem_limit</dt>
    + * <dd>Maximum memory to use for the in-memory buffer. (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.batch_limit</dt>
    + * <dd>Maximum number of batches to hold in memory. (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.max_count</dt>
    + * <dd>Maximum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.min_count</dt>
    + * <dd>Minimum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.merge_limit</dt>
    + * <dd>Sets the maximum number of runs to be merged in a single pass (limits
    + * the number of open files.)</dd>
    + * </dl>
    + * <p>
    + * The memory limit observed by this operator is the lesser of:
    + * <ul>
    + * <li>The maximum allocation allowed by the allocator assigned to this batch,</li>
    + * <li>The maximum limit set for this operator by the Foreman, or</li>
    + * <li>The maximum limit configured in the mem_limit parameter above. (Primarily for
    + * testing.)</li>
    + * </ul>
    + * <h4>Output</h4>
    + * It is helpful to note that the sort operator will produce one of two kinds of
    + * output batches.
    + * <ul>
    + * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses
    + * the entire in-memory sort set. A selection vector remover will copy results
    + * into new batches of a size determined by that operator.</li>
    + * <li>A series of batches, without a selection vector, if the sort spills to
    + * disk. In this case, the downstream operator will still be a selection vector
    + * remover, but there is nothing for that operator to remove. Each batch is
    + * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li>
    + * </ul>
    + * Note that, even in the in-memory sort case, this operator could do the copying
    + * to eliminate the extra selection vector remover. That is left as an exercise
    + * for another time.
    + * <h4>Logging</h4>
    + * Logging in this operator serves two purposes:
    + * <p>
    + * <ul>
    + * <li>Normal diagnostic information.</li>
    + * <li>Capturing the essence of the operator functionality for analysis in unit
    + * tests.</li>
    + * </ul>
    + * Test logging is designed to capture key events and timings. Take care
    + * when changing or removing log messages as you may need to adjust unit tests
    + * accordingly.
    + */
    +
    +public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
    +  protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
    +
    +  private final RecordBatch incoming;
    +
    +  /**
    +   * Memory allocator for this operator itself. Incoming batches are
    +   * transferred into this allocator. Intermediate batches used during
    +   * merge also reside here.
    +   */
    +
    +  private final BufferAllocator allocator;
    +
    +  /**
    +   * Schema of batches that this operator produces.
    +   */
    +
    +  private BatchSchema schema;
    +
    +  private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList();
    +  private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
    +  private SelectionVector4 sv4;
    +
    +  /**
    +   * The number of records to add to each output batch sent to the
    +   * downstream operator or spilled to disk.
    +   */
    +
    +  private int outputBatchRecordCount;
    +  private int peakNumBatches = -1;
    +
    +  /**
    +   * The copier uses the COPIER_BATCH_MEM_LIMIT to estimate the target
    +   * number of records to return in each batch.
    +   * <p>
    +   * For reference, see {@link FlattenTemplate#OUTPUT_MEMORY_LIMIT}.
    +   * <p>
    +   * WARNING: Do not allow any one vector to grow beyond 16 MB. Drill contains a
    +   * design flaw that may give rise to fatal memory fragmentation if we allow it to
    +   * allocate larger vectors.
    +   */
    +
    +  private static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024;
    +
    +  /**
    +   * Smallest allowed output batch size. The smallest output batch
    +   * created even under constrained memory conditions.
    +   */
    +  private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
    +
    +  /**
    +   * The preferred amount of memory to set aside to output batches
    +   * expressed as a ratio of available memory.
    +   */
    +
    +  private static final float MERGE_BATCH_ALLOWANCE = 0.10F;
    +
    +  public static final String INTERRUPTION_AFTER_SORT = "after-sort";
    +  public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
    +  public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
    +
    +  private long memoryLimit;
    +
    +  /**
    +   * Iterates over the final, sorted results.
    +   */
    +
    +  private SortResults resultsIterator;
    +
    +  /**
    +   * Manages the set of spill directories and files.
    +   */
    +
    +  private final SpillSet spillSet;
    +
    +  /**
    +   * Manages the copier used to merge a collection of batches into
    +   * a new set of batches.
    +   */
    +
    +  private final CopierHolder copierHolder;
    +
    +  private enum SortState { START, LOAD, DELIVER, DONE }
    +  private SortState sortState = SortState.START;
    +  private int inputRecordCount = 0;
    +  private int inputBatchCount = 0; // total number of batches received so far
    +  private final OperatorCodeGenerator opCodeGen;
    +
    +  /**
    +   * Estimated size of the records for this query, updated on each
    +   * new batch received from upstream.
    +   */
    +
    +  private int estimatedRecordSize;
    +
    +  /**
    +   * Estimated size of the spill and output batches that this
    +   * operator produces, estimated from the estimated record
    +   * size.
    +   */
    +
    +  private long estimatedOutputBatchSize;
    +  private long estimatedInputBatchSize;
    +
    +  /**
    +   * Maximum number of batches to hold in memory.
    +   * (Primarily for testing.)
    +   */
    +
    +  private int bufferedBatchLimit;
    +  private int mergeLimit;
    +  private int minSpillLimit;
    +  private int maxSpillLimit;
    +  private long spillFileSize;
    +  private long minimumBufferSpace;
    +
    +  /**
    +   * Minimum memory level before spilling occurs. That is, we can buffer input
    +   * batches in memory until we are down to the level given by the spill point.
    +   */
    +
    +  private long spillPoint;
    +  private long mergeMemoryPool;
    +  private long preferredMergeBatchSize;
    +
    +  // WARNING: The enum here is used within this class. But, the members of
    +  // this enum MUST match those in the (unmanaged) ExternalSortBatch since
    +  // that is the enum used in the UI to display metrics for the query profile.
    +
    +  public enum Metric implements MetricDef {
    +    SPILL_COUNT,            // number of times operator spilled to disk
    +    RETIRED1,               // Was: peak value for totalSizeInMemory
    +                            // But operator already provides this value
    +    PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
    +    MERGE_COUNT,            // Number of second+ generation merges
    +    MIN_BUFFER,             // Minimum memory level observed in operation.
    +    INPUT_BATCHES;          // Number of batches read from upstream.
    +
    +    @Override
    +    public int metricId() {
    +      return ordinal();
    +    }
    +  }
    +
    +  /**
    +   * Iterates over the final sorted results. Implemented differently
    +   * depending on whether the results are in-memory or spilled to
    +   * disk.
    +   */
    +
    +  public interface SortResults {
    +    boolean next();
    +    void close();
    +    int getBatchCount();
    +    int getRecordCount();
    +  }
    +
    +  public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
    +    super(popConfig, context, true);
    +    this.incoming = incoming;
    +    allocator = oContext.getAllocator();
    +    opCodeGen = new OperatorCodeGenerator(context, popConfig);
    +
    +    spillSet = new SpillSet(context, popConfig);
    +    copierHolder = new CopierHolder(context, allocator, opCodeGen);
    +    configure(context.getConfig());
    +  }
    +
    +  private void configure(DrillConfig config) {
    +
    +    // The maximum memory this operator can use. It is either the
    +    // limit set on the allocator or on the operator, whichever is
    +    // less.
    +
    +    memoryLimit = Math.min(popConfig.getMaxAllocation(), allocator.getLimit());
    +
    +    // Optional configured memory limit, typically used only for testing.
    +
    +    long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
    +    if (configLimit > 0) {
    +      memoryLimit = Math.min(memoryLimit, configLimit);
    +    }
    +
    +    // Optional limit on the number of buffered in-memory batches.
    +    // 0 means no limit. Used primarily for testing. Must allow at least two
    +    // batches or no merging can occur.
    +
    +    bufferedBatchLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, Integer.MAX_VALUE, 2);
    +
    +    // Optional limit on the number of spilled runs to merge in a single
    +    // pass. Limits the number of open file handles. Must allow at least
    +    // two batches to merge to make progress.
    +
    +    mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2);
    +
    +    // Limits on the minimum and maximum buffered batches to spill per
    +    // spill event.
    +
    +    minSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MIN_SPILL, Integer.MAX_VALUE, 2);
    +    maxSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MAX_SPILL, Integer.MAX_VALUE, 2);
    +    if (minSpillLimit > maxSpillLimit) {
    +      minSpillLimit = Math.min(minSpillLimit, maxSpillLimit);
    +      maxSpillLimit = minSpillLimit;
    +    }
    +
    +    // Limits the size of first-generation spill files.
    +
    +    spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE);
    +
    +    // Set the target output batch size. Use the maximum size, but only if
    +    // this represents less than 10% of available memory. Otherwise, use 10%
    +    // of memory, but no smaller than the minimum size. In any event, an
    +    // output batch can contain no fewer than a single record.
    +
    +    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
    +    preferredMergeBatchSize = Math.min(maxAllowance, MAX_MERGED_BATCH_SIZE);
    +    preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE);
    +
    +    logger.debug("Config: memory limit = {}, batch limit = {}, " +
    +                 "min, max spill limit: {}, {}, merge limit = {}, merge batch size = {}",
    +                  memoryLimit, bufferedBatchLimit, minSpillLimit, maxSpillLimit, mergeLimit,
    +                  preferredMergeBatchSize);
    +  }
    +
    +  private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int minValue) {
    +    int limit = config.getInt(paramName);
    +    if (limit > 0) {
    +      limit = Math.max(limit, minValue);
    +    } else {
    +      limit = valueIfZero;
    +    }
    +    return limit;
    +  }
    +
    +  @Override
    +  public int getRecordCount() {
    +    if (sv4 != null) {
    +      return sv4.getCount();
    +    }
    +    return container.getRecordCount();
    +  }
    +
    +  @Override
    +  public SelectionVector4 getSelectionVector4() {
    +    return sv4;
    +  }
    +
    +  private void closeBatchGroups(Collection<? extends BatchGroup> groups) {
    +    for (BatchGroup group: groups) {
    +      try {
    +        group.close();
    +      } catch (Exception e) {
    +        // Collect all failures and make sure to clean up all remaining batches.
    +        // Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources()
    +        // where it would have been passed to context.fail().
    +        // Passing the exception directly to context.fail(e) lets the cleanup process continue instead of stopping
    +        // right away; this also makes sure we collect any additional exceptions we may get while cleaning up.
    +        context.fail(e);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void close() {
    +    try {
    +      if (bufferedBatches != null) {
    +        closeBatchGroups(bufferedBatches);
    +        bufferedBatches = null;
    +      }
    +      if (spilledRuns != null) {
    +        closeBatchGroups(spilledRuns);
    +        spilledRuns = null;
    +      }
    +    } finally {
    +      if (sv4 != null) {
    +        sv4.clear();
    +      }
    +      if (resultsIterator != null) {
    +        resultsIterator.close();
    +      }
    +      copierHolder.close();
    +      spillSet.close();
    +      opCodeGen.close();
    +
    +      // The call to super.close() clears out the output container.
    +      // Doing so requires the allocator here, so it must be closed
    +      // after the super call.
    +
    +      super.close();
    +      allocator.close();
    +    }
    +  }
    +
    +  /**
    +   * Called by {@link AbstractRecordBatch} as a fast-path to obtain
    +   * the first record batch and set up the schema of this batch in order
    +   * to quickly return the schema to the client. Note that this method
    +   * fetches the first batch from upstream which will be waiting for
    +   * us the first time that {@link #innerNext()} is called.
    +   */
    +
    +  @Override
    +  public void buildSchema() {
    +    IterOutcome outcome = next(incoming);
    +    switch (outcome) {
    +      case OK:
    +      case OK_NEW_SCHEMA:
    +        for (VectorWrapper<?> w : incoming) {
    +          @SuppressWarnings("resource")
    +          ValueVector v = container.addOrGet(w.getField());
    +          if (v instanceof AbstractContainerVector) {
    +            w.getValueVector().makeTransferPair(v); // Can we remove this hack?
    +            v.clear();
    +          }
    +          v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO)
    +        }
    +        container.buildSchema(SelectionVectorMode.NONE);
    +        container.setRecordCount(0);
    +        break;
    +      case STOP:
    +        state = BatchState.STOP;
    +        break;
    +      case OUT_OF_MEMORY:
    +        state = BatchState.OUT_OF_MEMORY;
    +        break;
    +      case NONE:
    +        state = BatchState.DONE;
    +        break;
    +      default:
    +        break;
    +    }
    +  }
    +
    +  /**
    +   * Process each request for a batch. The first request retrieves
    +   * all incoming batches and sorts them, optionally spilling to
    +   * disk as needed. Subsequent calls retrieve the sorted results in
    +   * fixed-size batches.
    +   */
    +
    +  @Override
    +  public IterOutcome innerNext() {
    +    switch (sortState) {
    +    case DONE:
    +      return IterOutcome.NONE;
    +    case START:
    +    case LOAD:
    +      return load();
    +    case DELIVER:
    +      return nextOutputBatch();
    +    default:
    +      throw new IllegalStateException("Unexpected sort state: " + sortState);
    +    }
    +  }
    +
    +  private IterOutcome nextOutputBatch() {
    +    if (resultsIterator.next()) {
    +      return IterOutcome.OK;
    +    } else {
    +      logger.trace("Deliver phase complete: Returned {} batches, {} records",
    +                    resultsIterator.getBatchCount(), resultsIterator.getRecordCount());
    +      sortState = SortState.DONE;
    +      return IterOutcome.NONE;
    +    }
    +  }
    +
    +  /**
    +   * Load and process a single batch, handling schema changes. In general, the
    +   * external sort accepts only one schema. It can handle compatible schemas
    +   * (which seems to mean the same columns in possibly different orders.)
    --- End diff --
    
    Not sure about the different column order ... maybe add a "maybe" to avoid misleading.
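
    To make the ambiguity concrete, here is a small sketch of two possible readings of "compatible schemas": a strict one (same columns, same types, same order) and a looser one (same columns and types, order ignored). Everything in it is hypothetical: the `SchemaCompat` class, the ordered-map model of a schema, and both helper methods are illustrations only, not Drill APIs, and nothing here confirms which reading the managed sort actually implements.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaCompat {
  // Hypothetical model (not Drill's BatchSchema): a schema is an ordered
  // map of column name -> type name, built from alternating name/type args.
  static Map<String, String> schema(String... nameTypePairs) {
    Map<String, String> s = new LinkedHashMap<>();
    for (int i = 0; i + 1 < nameTypePairs.length; i += 2) {
      s.put(nameTypePairs[i], nameTypePairs[i + 1]);
    }
    return s;
  }

  // Strict reading: same columns, same types, same order.
  static boolean identical(Map<String, String> a, Map<String, String> b) {
    return new ArrayList<>(a.entrySet()).equals(new ArrayList<>(b.entrySet()));
  }

  // Looser reading: same columns and types, order ignored.
  // Map.equals compares entries without regard to iteration order.
  static boolean sameColumnsAnyOrder(Map<String, String> a, Map<String, String> b) {
    return a.equals(b);
  }

  public static void main(String[] args) {
    Map<String, String> first = schema("id", "INT", "name", "VARCHAR");
    Map<String, String> second = schema("name", "VARCHAR", "id", "INT");
    System.out.println(identical(first, second));           // false
    System.out.println(sameColumnsAnyOrder(first, second)); // true
  }
}
```

    If the Javadoc is meant to promise only the strict reading, the parenthetical about column order could be dropped; if the looser reading is intended but unverified, a "maybe" would be the safer wording.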



> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
>                 Key: DRILL-5080
>                 URL: https://issues.apache.org/jira/browse/DRILL-5080
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.8.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.10
>
>         Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that works to a clearly-defined memory limit. Attached is a design specification for the work.
> The project will include fixing a number of bugs related to the external sort, included as sub-tasks of this umbrella task.
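
As an illustration of what "works to a clearly-defined memory limit" means in the quoted diff, the sketch below reproduces the arithmetic of `configure()` and `getConfigLimit()` in isolation. The three constants are copied from the diff; the `MergeBatchSizing` class and its method names are made up for this example, and the sketch deliberately omits the Drill configuration and allocator objects that the real code reads its inputs from.

```java
public class MergeBatchSizing {
  // Constants copied from the quoted diff.
  static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024;
  static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
  static final float MERGE_BATCH_ALLOWANCE = 0.10F;

  // Same steps as configure(): take 10% of the memory limit, cap it at the
  // maximum batch size, then raise it to the minimum batch size if needed.
  static long preferredMergeBatchSize(long memoryLimit) {
    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
    long size = Math.min(maxAllowance, MAX_MERGED_BATCH_SIZE);
    return Math.max(size, MIN_MERGED_BATCH_SIZE);
  }

  // Same rule as getConfigLimit(): a non-positive setting means "use the
  // default"; a positive setting is raised to at least minValue.
  static int configLimit(int configured, int valueIfZero, int minValue) {
    return configured > 0 ? Math.max(configured, minValue) : valueIfZero;
  }

  public static void main(String[] args) {
    // 1 GiB limit: 10% exceeds 16 MiB, so the maximum applies.
    System.out.println(preferredMergeBatchSize(1L << 30));
    // 1 MiB limit: 10% is below 256 KiB, so the minimum applies.
    System.out.println(preferredMergeBatchSize(1L << 20));
    // A configured merge limit of 1 is raised to the minimum of 2.
    System.out.println(configLimit(1, Integer.MAX_VALUE, 2));
  }
}
```

Note that with a very small memory limit the minimum batch size can exceed 10% of available memory; that follows directly from the clamping order in the diff.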



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
