drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ben-Zvi <...@git.apache.org>
Subject [GitHub] drill pull request #717: DRILL-5080: Memory-managed version of external sort
Date Thu, 02 Feb 2017 03:14:28 GMT
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r99036541
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java ---
    @@ -0,0 +1,237 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.util.Queue;
    +
    +import javax.inject.Named;
    +
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BaseAllocator;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.hadoop.util.IndexedSortable;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Queues;
    +
    +import io.netty.buffer.DrillBuf;
    +
    +public abstract class MSortTemplate implements MSorter, IndexedSortable {
    +//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
    +
    +  private SelectionVector4 vector4;
    +  private SelectionVector4 aux;
    +  @SuppressWarnings("unused")
    +  private long compares;
    +
    +  /**
    +   * Holds offsets into the SV4 of the start of each batch
    +   * (sorted run).
    +   */
    +
    +  private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
    +  private FragmentContext context;
    +
    +  /**
    +   * Controls the maximum size of batches exposed to downstream
    +   */
    +  private int desiredRecordBatchCount;
    +
    +  @Override
    +  public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
    +                    final VectorContainer hyperBatch, int outputBatchSize) throws SchemaChangeException{
    +    // we pass in the local hyperBatch since that is where we'll be reading data.
    +    Preconditions.checkNotNull(vector4);
    +    this.vector4 = vector4.createNewWrapperCurrent();
    +    this.context = context;
    +    vector4.clear();
    +    doSetup(context, hyperBatch, null);
    +
    +    // Populate the queue with the offset in the SV4 of each
    +    // batch. Note that this is expensive as it requires a scan
    +    // of all items to be sorted: potentially millions.
    +
    +    runStarts.add(0);
    +    int batch = 0;
    +    final int totalCount = this.vector4.getTotalCount();
    +    for (int i = 0; i < totalCount; i++) {
    +      final int newBatch = this.vector4.get(i) >>> 16;
    +      if (newBatch == batch) {
    +        continue;
    +      } else if (newBatch == batch + 1) {
    +        runStarts.add(i);
    +        batch = newBatch;
    +      } else {
    +        throw new UnsupportedOperationException(String.format("Missing batch. batch: %d newBatch: %d", batch, newBatch));
    +      }
    +    }
    +
    +    // Create a temporary SV4 to hold the merged results.
    +
    +    @SuppressWarnings("resource")
    +    final DrillBuf drillBuf = allocator.buffer(4 * totalCount);
    +    desiredRecordBatchCount = Math.min(outputBatchSize, Character.MAX_VALUE);
    +    desiredRecordBatchCount = Math.min(desiredRecordBatchCount, totalCount);
    +    aux = new SelectionVector4(drillBuf, totalCount, desiredRecordBatchCount);
    +  }
    +
    +  /**
    +   * For a given recordCount, returns how much memory MSorter needs for its own
    +   * purposes. This is used in ExternalSortBatch to make decisions about whether
    +   * to spill or not.
    +   *
    +   * @param recordCount
    +   * @return
    +   */
    +  public static long memoryNeeded(final int recordCount) {
    +    // We need 4 bytes (SV4) for each record.
    +    // The memory allocator will round this to the next
    +    // power of 2.
    +
    +    return BaseAllocator.nextPowerOfTwo(recordCount * 4);
    +  }
    +
    +  /**
    +   * Given two regions within the selection vector 4 (a left and a right), merge
    +   * the two regions to produce a combined output region in the auxiliary
    +   * selection vector.
    +   *
    +   * @param leftStart
    +   * @param rightStart
    +   * @param rightEnd
    +   * @param outStart
    +   * @return
    +   */
    +  protected int merge(final int leftStart, final int rightStart, final int rightEnd, final int outStart) {
    +    int l = leftStart;
    +    int r = rightStart;
    +    int o = outStart;
    +    while (l < rightStart && r < rightEnd) {
    +      if (compare(l, r) <= 0) {
    +        aux.set(o++, vector4.get(l++));
    +      } else {
    +        aux.set(o++, vector4.get(r++));
    +      }
    +    }
    +    while (l < rightStart) {
    +      aux.set(o++, vector4.get(l++));
    +    }
    +    while (r < rightEnd) {
    +      aux.set(o++, vector4.get(r++));
    +    }
    +    assert o == outStart + (rightEnd - leftStart);
    +    return o;
    +  }
    +
    +  @Override
    +  public SelectionVector4 getSV4() {
    +    return vector4;
    +  }
    +
    +  /**
    +   * Sort (really, merge) a set of pre-sorted runs to produce a combined
    +   * result set. Merging is done in the selection vector, record data does
    +   * not move.
    +   * <p>
    +   * Runs are merged pairwise in multiple passes, providing performance
    +   * of O(n * m * log(n)), where n = number of runs, m = number of records
    +   * per run.
    +   */
    +
    +  @Override
    +  public void sort(final VectorContainer container) {
    +    while (runStarts.size() > 1) {
    +      final int totalCount = this.vector4.getTotalCount();
    +
    +      // check if we're cancelled/failed recently
    +      if (!context.shouldContinue()) {
    +        return; }
    +
    +      int outIndex = 0;
    +      final Queue<Integer> newRunStarts = Queues.newLinkedBlockingQueue();
    +      newRunStarts.add(outIndex);
    +      final int size = runStarts.size();
    +      for (int i = 0; i < size / 2; i++) {
    --- End diff --
    
    What happens when "size" is odd? How is the last run handled? Maybe add a comment to explain.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message