drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5325) Implement sub-operator unit tests for managed external sort
Date Sat, 27 May 2017 01:23:05 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027099#comment-16027099 ]

ASF GitHub Bot commented on DRILL-5325:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/808#discussion_r118797001
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java ---
    @@ -0,0 +1,495 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.OperExecContext;
    +import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
    +import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
    +import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
    +import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.VectorAccessible;
    +import org.apache.drill.exec.record.VectorAccessibleUtilities;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +
    +/**
    + * Implementation of the external sort which is wrapped into the Drill
    + * "next" protocol by the {@link ExternalSortBatch} class.
    + * <p>
    + * Accepts incoming batches. Sorts each and will spill to disk as needed.
    + * When all input is delivered, can either do an in-memory merge or a
    + * merge from disk. If runs spilled, may have to do one or more "consolidation"
    + * passes to reduce the number of runs to the level that will fit in memory.
    + */
    +
    +public class SortImpl {
    +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
    +
    +  /**
    +   * Iterates over the final sorted results. Implemented differently
    +   * depending on whether the results are in-memory or spilled to
    +   * disk.
    +   */
    +
    +  public interface SortResults {
    +    /**
    +     * Container into which results are delivered. May be
    +     * the original operator container, or may be a different
    +     * one. This is the container that should be sent
    +     * downstream. This is a fixed value for all returned
    +     * results.
    +     * @return
    +     */
    +    VectorContainer getContainer();
    +    boolean next();
    +    void close();
    +    int getBatchCount();
    +    int getRecordCount();
    +    SelectionVector2 getSv2();
    +    SelectionVector4 getSv4();
    +  }
    +
    +  private final SortConfig config;
    +  private final SortMetrics metrics;
    +  private final SortMemoryManager memManager;
    +  private VectorContainer outputBatch;
    +  private OperExecContext context;
    +
    +  /**
    +   * Memory allocator for this operator itself. Incoming batches are
    +   * transferred into this allocator. Intermediate batches used during
    +   * merge also reside here.
    +   */
    +
    +  private final BufferAllocator allocator;
    +
    +  private final SpilledRuns spilledRuns;
    +
    +  private final BufferedBatches bufferedBatches;
    +
    +  public SortImpl(OperExecContext opContext, SortConfig sortConfig, SpilledRuns spilledRuns, VectorContainer batch) {
    +    this.context = opContext;
    +    outputBatch = batch;
    +    this.spilledRuns = spilledRuns;
    +    allocator = opContext.getAllocator();
    +    config = sortConfig;
    +    memManager = new SortMemoryManager(config, allocator.getLimit());
    +    metrics = new SortMetrics(opContext.getStats());
    +    bufferedBatches = new BufferedBatches(opContext);
    +
    +    // Reset the allocator to allow a 10% safety margin. This is done because
    +    // the memory manager will enforce the original limit. Changing the hard
    +    // limit will reduce the probability that random chance causes the allocator
    +    // to kill the query because of a small, spurious over-allocation.
    +
    +    allocator.setLimit((long)(allocator.getLimit() * 1.10));
    +  }
    +
    +  public void setSchema(BatchSchema schema) {
    +    bufferedBatches.setSchema(schema);
    +    spilledRuns.setSchema(schema);
    +  }
    +
    +  public boolean forceSpill() {
    +    if (bufferedBatches.size() < 2) {
    +      return false;
    +    }
    +    spillFromMemory();
    +    return true;
    +  }
    +
    +  /**
    +   * Process the converted incoming batch by adding it to the in-memory store
    +   * of data, or spilling data to disk when necessary.
    +   * @param incoming
    +   */
    +
    +  public void addBatch(VectorAccessible incoming) {
    +
    +    // Skip empty batches (such as the first one.)
    +
    +    if (incoming.getRecordCount() == 0) {
    +      VectorAccessibleUtilities.clear(incoming);
    +      return;
    +    }
    +
    +    // Determine actual sizes of the incoming batch before taking
    +    // ownership. Allows us to figure out if we need to spill first,
    +    // to avoid overflowing memory simply due to ownership transfer.
    +
    +    RecordBatchSizer sizer = analyzeIncomingBatch(incoming);
    +
    +    // The heart of the external sort operator: spill to disk when
    +    // the in-memory generation exceeds the allowed memory limit.
    +    // Preemptively spill BEFORE accepting the new batch into our memory
    +    // pool. The allocator will throw an OOM exception if we accept the
    +    // batch when we are near the limit - despite the fact that the batch
    +    // is already in memory and no new memory is allocated during the transfer.
    +
    +    if ( isSpillNeeded(sizer.actualSize())) {
    +      spillFromMemory();
    +    }
    +
    +    // Sanity check. We should now be below the buffer memory maximum.
    +
    +    long startMem = allocator.getAllocatedMemory();
    +    bufferedBatches.add(incoming, sizer.netSize());
    +
    +    // Compute batch size, including allocation of an sv2.
    +
    +    long endMem = allocator.getAllocatedMemory();
    +    long batchSize = endMem - startMem;
    +
    +    // Update the minimum buffer space metric.
    +
    +    metrics.updateInputMetrics(sizer.rowCount(), sizer.actualSize());
    +    metrics.updateMemory(memManager.freeMemory(endMem));
    +    metrics.updatePeakBatches(bufferedBatches.size());
    +
    +    // Update the size based on the actual record count, not
    +    // the effective count as given by the selection vector
    +    // (which may exclude some records due to filtering.)
    +
    +    validateBatchSize(sizer.actualSize(), batchSize);
    +    memManager.updateEstimates((int) batchSize, sizer.netRowWidth(), sizer.rowCount());
    +  }
    +
    +  /**
    +   * Scan the vectors in the incoming batch to determine batch size.
    +   *
    +   * @return an analysis of the incoming batch
    +   */
    +
    +  private RecordBatchSizer analyzeIncomingBatch(VectorAccessible incoming) {
    +    RecordBatchSizer sizer = new RecordBatchSizer(incoming);
    +    sizer.applySv2();
    +    if (metrics.getInputBatchCount() == 0) {
    +      logger.debug("{}", sizer.toString());
    +    }
    +    return sizer;
    +  }
    +
    +  /**
    +   * Determine if spill is needed before receiving the new record batch.
    +   * Spilling is driven purely by memory availability (and an optional
    +   * batch limit for testing.)
    +   *
    +   * @return true if spilling is needed, false otherwise
    +   */
    +
    +  private boolean isSpillNeeded(int incomingSize) {
    +
    +    // Can't spill if less than two batches else the merge
    +    // can't make progress.
    +
    +    if (bufferedBatches.size() < 2) {
    +      return false; }
    +
    +    if (bufferedBatches.size() >= config.getBufferedBatchLimit()) {
    +      return true; }
    +    return memManager.isSpillNeeded(allocator.getAllocatedMemory(), incomingSize);
    +  }
    +
    +  private void validateBatchSize(long actualBatchSize, long memoryDelta) {
    +    if (actualBatchSize != memoryDelta) {
    +      ExternalSortBatch.logger.debug("Memory delta: {}, actual batch size: {}, Diff: {}",
    +                   memoryDelta, actualBatchSize, memoryDelta - actualBatchSize);
    +    }
    +  }
    +
    +  /**
    +   * This operator has accumulated a set of sorted incoming record batches.
    +   * We wish to spill some of them to disk. To do this, a "copier"
    +   * merges the target batches to produce a stream of new (merged) batches
    +   * which are then written to disk.
    +   * <p>
    +   * This method spills only half the accumulated batches
    +   * minimizing unnecessary disk writes. The exact count must lie between
    +   * the minimum and maximum spill counts.
    +   */
    +
    +  private void spillFromMemory() {
    +    int startCount = bufferedBatches.size();
    +    List<BatchGroup> batchesToSpill = bufferedBatches.prepareSpill(config.spillFileSize());
    +
    +    // Do the actual spill.
    +
    +    logger.trace("Spilling {} of {} batches, memory = {}",
    +        batchesToSpill.size(), startCount,
    +        allocator.getAllocatedMemory());
    +    int spillBatchRowCount = memManager.getSpillBatchRowCount();
    +    spilledRuns.mergeAndSpill(batchesToSpill, spillBatchRowCount);
    +    metrics.incrSpillCount();
    +  }
    +
    +  public SortMetrics getMetrics() { return metrics; }
    +
    +  public static class EmptyResults implements SortResults {
    +
    +    private final VectorContainer dest;
    +
    +    public EmptyResults(VectorContainer dest) {
    +      this.dest = dest;
    +    }
    +
    +    @Override
    +    public boolean next() { return false; }
    +
    +    @Override
    +    public void close() { }
    +
    +    @Override
    +    public int getBatchCount() { return 0; }
    +
    +    @Override
    +    public int getRecordCount() { return 0; }
    +
    +    @Override
    +    public SelectionVector4 getSv4() { return null; }
    +
    +    @Override
    +    public SelectionVector2 getSv2() { return null; }
    +
    +    @Override
    +    public VectorContainer getContainer() { return dest; }
    +  }
    +
    +  public SortResults startMerge() {
    +    if (metrics.getInputRowCount() == 0) {
    +      return new EmptyResults(outputBatch);
    +    }
    +
    +    logger.debug("Completed load phase: read {} batches, spilled {} times, total input
bytes: {}",
    +        metrics.getInputBatchCount(), spilledRuns.size(),
    +        metrics.getInputBytes());
    +
    +    // Do the merge of the loaded batches. The merge can be done entirely in memory if
    +    // the results fit; else we have to do a disk-based merge of
    +    // pre-sorted spilled batches.
    +
    +    boolean optimizeOn = true; // Debug only
    +    if (optimizeOn && metrics.getInputBatchCount() == 1) {
    +      return singleBatchResult();
    +    } else if (canUseMemoryMerge()) {
    +      return mergeInMemory();
    +    } else {
    +      return mergeSpilledRuns();
    +    }
    +  }
    +
    +  /**
    +   * Return results for a single input batch. No merge is needed;
    +   * the original (sorted) input batch is simply passed as the result.
    +   * Note that this version requires replacing the operator output
    +   * container with the batch container. (Vector ownership transfer
    +   * was already done when accepting the input batch.)
    +   */
    +
    +  public static class SingleBatchResults implements SortResults {
    +
    +    private boolean done;
    +    private final BatchGroup.InputBatch batch;
    +
    +    public SingleBatchResults(BatchGroup.InputBatch batch) {
    +      this.batch = batch;
    +    }
    +
    +    @Override
    +    public boolean next() {
    +      if (done) {
    +        return false;
    +      }
    +      done = true;
    +      return true;
    +    }
    +
    +    @Override
    +    public void close() {
    +      try {
    +        batch.close();
    +      } catch (IOException e) {
    +        // Should never occur for an input batch
    +        throw new IllegalStateException(e);
    +      }
    +    }
    +
    +    @Override
    +    public int getBatchCount() { return 1; }
    +
    +    @Override
    +    public int getRecordCount() { return batch.getRecordCount(); }
    +
    +    @Override
    +    public SelectionVector4 getSv4() { return null; }
    +
    +    @Override
    +    public SelectionVector2 getSv2() { return batch.getSv2(); }
    +
    +    @Override
    +    public VectorContainer getContainer() {return batch.getContainer(); }
    +  }
    +
    +  /**
    +   * Input consists of a single batch. Just return that batch as
    +   * the output.
    +   * @return results iterator over the single input batch
    +   */
    +
    +  private SortResults singleBatchResult() {
    +    List<InputBatch> batches = bufferedBatches.removeAll();
    +    return new SingleBatchResults(batches.get(0));
    +  }
    +
    +  /**
    +   * All data has been read from the upstream batch. Determine if we
    +   * can use a fast in-memory sort, or must use a merge (which typically,
    +   * but not always, involves spilled batches.)
    +   *
    +   * @return whether sufficient resources exist to do an in-memory sort
    +   * if all batches are still in memory
    +   */
    +
    +  private boolean canUseMemoryMerge() {
    +    if (spilledRuns.hasSpilled()) {
    +      return false; }
    +
    +    // Do we have enough memory for MSorter (the in-memory sorter)?
    +
    +    if (! memManager.hasMemoryMergeCapacity(allocator.getAllocatedMemory(), MSortTemplate.memoryNeeded(metrics.getInputRowCount()))) {
    +      return false; }
    +
    +    // Make sure we don't exceed the maximum number of batches SV4 can address.
    +
    +    if (bufferedBatches.size() > Character.MAX_VALUE) {
    +      return false; }
    +
    +    // We can do an in-memory merge.
    +
    +    return true;
    +  }
    +
    +  /**
    +   * Perform an in-memory sort of the buffered batches. Obviously can
    +   * be used only for the non-spilling case.
    +   *
    +   * @return DONE if no rows, OK_NEW_SCHEMA if at least one row
    +   */
    +
    +  private SortResults mergeInMemory() {
    +    logger.debug("Starting in-memory sort. Batches = {}, Records = {}, Memory = {}",
    +                 bufferedBatches.size(), metrics.getInputRowCount(),
    +                 allocator.getAllocatedMemory());
    +
    +    // Note the difference between how we handle batches here and in the spill/merge
    +    // case. In the spill/merge case, this class decides on the batch size to send
    +    // downstream. However, in the in-memory case, we must pass along all batches
    +    // in a single SV4. Attempts to do paging will result in errors. In the memory
    +    // merge case, the downstream Selection Vector Remover will split the one
    +    // big SV4 into multiple smaller batches to send further downstream.
    +
    +    // If the sort fails or is empty, clean up here. Otherwise, cleanup is done
    +    // by closing the resultsIterator after all results are returned downstream.
    +
    +    MergeSortWrapper memoryMerge = new MergeSortWrapper(context, outputBatch);
    +    try {
    +      memoryMerge.merge(bufferedBatches.removeAll());
    +    } catch (Throwable t) {
    +      memoryMerge.close();
    +      throw t;
    +    }
    +    logger.debug("Completed in-memory sort. Memory = {}",
    +                 allocator.getAllocatedMemory());
    +    return memoryMerge;
    +  }
    +
    +  /**
    +   * Perform merging of (typically spilled) batches. First consolidates batches
    +   * as needed, then performs a final merge that is read one batch at a time
    +   * to deliver batches to the downstream operator.
    +   *
    +   * @return an iterator over the merged batches
    +   */
    +
    +  private SortResults mergeSpilledRuns() {
    +    logger.debug("Starting consolidate phase. Batches = {}, Records = {}, Memory = {},
In-memory batches {}, spilled runs {}",
    +                 metrics.getInputBatchCount(), metrics.getInputRowCount(),
    +                 allocator.getAllocatedMemory(),
    +                 bufferedBatches.size(), spilledRuns.size());
    +
    +    // Consolidate batches to a number that can be merged in
    +    // a single last pass.
    +
    +    while (consolidateBatches()) {
    --- End diff --
    
    This empty-body loop looks odd; how about inlining consolidateBatches() here instead?
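
For illustration only, here is a minimal, self-contained sketch of the before/after shape this suggestion implies. None of the names below come from SortImpl.java; the class, field, and merge-width constant are invented solely to show how an empty-body while loop can be rewritten with the helper's work inlined into the loop body:

    // Generic illustration -- not Drill code; all names here are made up.
    public class ConsolidationLoopSketch {

      // Stand-ins for the real sort state; the values are arbitrary.
      private int runsRemaining = 10;
      private static final int MAX_MERGE_WIDTH = 4;

      // "Before" shape: all the work hides in the loop condition,
      // leaving an empty body that is easy to misread.
      void consolidateViaHelper() {
        while (consolidateOnePass()) {
          // empty body
        }
      }

      private boolean consolidateOnePass() {
        if (runsRemaining <= MAX_MERGE_WIDTH) {
          return false;                         // few enough runs for one final merge
        }
        runsRemaining -= MAX_MERGE_WIDTH - 1;   // one intermediate merge pass
        return true;
      }

      // "After" shape, along the lines the comment suggests: the per-pass work
      // is inlined so the loop body states what each iteration actually does.
      void consolidateInline() {
        while (runsRemaining > MAX_MERGE_WIDTH) {
          runsRemaining -= MAX_MERGE_WIDTH - 1; // one intermediate merge pass
        }
      }
    }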



> Implement sub-operator unit tests for managed external sort
> -----------------------------------------------------------
>
>                 Key: DRILL-5325
>                 URL: https://issues.apache.org/jira/browse/DRILL-5325
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Tools, Build & Test
>    Affects Versions: 1.11.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.11.0
>
>
> Validate the proposed sub-operator test framework by creating low-level unit tests for the managed version of the external sort.
> The external sort has a small number of existing tests, but those tests are quite superficial; the "managed sort" project found many bugs. The managed sort itself was tested with ad-hoc system-level tests created using the new "cluster fixture" framework. But, again, such tests could not reach deep inside the sort code to exercise very specific conditions.
> As a result, we spent far too much time using QA functional tests to identify specific code issues.
> Using the sub-operator unit test framework, we can instead test each bit of functionality at the unit-test level.
> If doing so works, and is practical, it can serve as a model for other operator testing projects.
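
As an editorial illustration of the kind of sub-operator test the issue describes, the sketch below drives SortImpl purely through the API visible in the diff quoted above (setSchema, addBatch, startMerge, and the SortResults iterator). How the SortImpl instance and its unsorted input batches get built is left to whatever operator and row-set fixtures the proposed framework supplies, so the helper takes them as parameters instead of guessing at fixture APIs; the parameter types are the ones imported at the top of SortImpl.java.

    // Sketch only: assumes a SortImpl and its unsorted input batches were built
    // elsewhere by the (not shown here) sub-operator test fixtures. Every method
    // called below appears in the SortImpl.java diff quoted earlier in this message.
    static void runSortCase(SortImpl sort, BatchSchema schema,
                            List<VectorAccessible> unsortedBatches) {
      sort.setSchema(schema);                    // declare the schema once, up front
      for (VectorAccessible batch : unsortedBatches) {
        sort.addBatch(batch);                    // may spill internally when memory is tight
      }

      SortImpl.SortResults results = sort.startMerge();
      try {
        while (results.next()) {
          // A real test would verify sort order here using results.getContainer()
          // together with results.getSv2() or results.getSv4(), and tally rows
          // via results.getRecordCount().
        }
      } finally {
        results.close();                         // releases in-memory or spilled resources
      }
    }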



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
