drill-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-4134) Incorporate remaining patches from DRILL-1942 Allocator refactor
Date Wed, 02 Dec 2015 21:24:11 GMT

    [ https://issues.apache.org/jira/browse/DRILL-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036659#comment-15036659 ]

ASF GitHub Bot commented on DRILL-4134:
---------------------------------------

Github user adeneche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/283#discussion_r46477393
  
    --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java ---
    @@ -0,0 +1,689 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.memory;
    +
    +import io.netty.buffer.ByteBufAllocator;
    +import io.netty.buffer.DrillBuf;
    +import io.netty.buffer.UnsafeDirectLittleEndian;
    +
    +import java.util.Arrays;
    +import java.util.IdentityHashMap;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.drill.common.HistoricalLog;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.memory.AllocatorManager.BufferLedger;
    +import org.apache.drill.exec.ops.BufferManager;
    +import org.apache.drill.exec.util.AssertionUtil;
    +
    +import com.google.common.base.Preconditions;
    +
    +public abstract class BaseAllocator extends Accountant implements BufferAllocator {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
    +
    +  public static final String DEBUG_ALLOCATOR = "drill.memory.debug.allocator";
    +
    +  private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
    +  private static final int CHUNK_SIZE = AllocatorManager.INNER_ALLOCATOR.getChunkSize();
    +
    +  public static final int DEBUG_LOG_LENGTH = 6;
    +  public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
    +      || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false"));
    +  private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
    +
    +  private final BaseAllocator parentAllocator;
    +  private final ByteBufAllocator thisAsByteBufAllocator;
    +  private final IdentityHashMap<BaseAllocator, Object> childAllocators;
    +  private final DrillBuf empty;
    +
    +  private volatile boolean isClosed = false; // the allocator has been closed
    +
    +  // Package exposed for sharing between AllocatorManger and BaseAllocator objects
    +  final long id = ID_GENERATOR.incrementAndGet(); // unique ID assigned to each allocator
    +  final String name;
    +  final RootAllocator root;
    +
    +  // members used purely for debugging
    +  private final IdentityHashMap<BufferLedger, Object> childLedgers;
    +  private final IdentityHashMap<Reservation, Object> reservations;
    +  private final HistoricalLog historicalLog;
    +
    +  protected BaseAllocator(
    +      final BaseAllocator parentAllocator,
    +      final String name,
    +      final long initReservation,
    +      final long maxAllocation) throws OutOfMemoryException {
    +    super(parentAllocator, initReservation, maxAllocation);
    +
    +    if (parentAllocator != null) {
    +      this.root = parentAllocator.root;
    +      empty = parentAllocator.empty;
    +    } else if (this instanceof RootAllocator) {
    +      this.root = (RootAllocator) this;
    +      empty = createEmpty();
    +    } else {
    +      throw new IllegalStateException("An parent allocator must either carry a root or
be the root.");
    +    }
    +
    +    this.parentAllocator = parentAllocator;
    +    this.name = name;
    +
    +    // TODO: DRILL-4131
    +    // this.thisAsByteBufAllocator = new DrillByteBufAllocator(this);
    +    this.thisAsByteBufAllocator = AllocatorManager.INNER_ALLOCATOR.allocator;
    +
    +    if (DEBUG) {
    +      childAllocators = new IdentityHashMap<>();
    +      reservations = new IdentityHashMap<>();
    +      childLedgers = new IdentityHashMap<>();
    +      historicalLog = new HistoricalLog(DEBUG_LOG_LENGTH, "allocator[%d]", id);
    +      hist("created by \"%s\", owned = %d", name.toString(), this.getAllocatedMemory());
    +    } else {
    +      childAllocators = null;
    +      reservations = null;
    +      historicalLog = null;
    +      childLedgers = null;
    +    }
    +
    +  }
    +
    +  @Override
    +  public String getName() {
    +    return name;
    +  }
    +
    +  @Override
    +  public DrillBuf getEmpty() {
    +    return empty;
    +  }
    +
    +  /**
    +   * For debug/verification purposes only. Allows an AllocatorManager to tell the allocator
that we have a new ledger
    +   * associated with this allocator.
    +   */
    +  void associateLedger(BufferLedger ledger) {
    +    if (DEBUG) {
    +      synchronized (DEBUG_LOCK) {
    +        childLedgers.put(ledger, null);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * For debug/verification purposes only. Allows an AllocatorManager to tell the allocator
that we are removing a
    +   * ledger associated with this allocator
    +   */
    +  void dissociateLedger(BufferLedger ledger) {
    +    if (DEBUG) {
    +      synchronized (DEBUG_LOCK) {
    +        if (!childLedgers.containsKey(ledger)) {
    +          throw new IllegalStateException("Trying to remove a child ledger that doesn't
exist.");
    +        }
    +        childLedgers.remove(ledger);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Track when a ChildAllocator of this BaseAllocator is closed. Used for debugging
purposes.
    +   *
    +   * @param childAllocator
    +   *          The child allocator that has been closed.
    +   */
    +  private void childClosed(final BaseAllocator childAllocator) {
    +    if (DEBUG) {
    +      Preconditions.checkArgument(childAllocator != null, "child allocator can't be null");
    +
    +      synchronized (DEBUG_LOCK) {
    +        final Object object = childAllocators.remove(childAllocator);
    +        if (object == null) {
    +          childAllocator.historicalLog.logHistory(logger);
    +          throw new IllegalStateException("Child allocator[" + childAllocator.id
    +              + "] not found in parent allocator[" + id + "]'s childAllocators");
    +        }
    +      }
    +    }
    +  }
    +
    +  private static String createErrorMsg(final BufferAllocator allocator, final int rounded,
final int requested) {
    +    if (rounded != requested) {
    +      return String.format(
    +          "Unable to allocate buffer of size %d (rounded from %d) due to memory limit.
Current allocation: %d",
    +          rounded, requested, allocator.getAllocatedMemory());
    +    } else {
    +      return String.format("Unable to allocate buffer of size %d due to memory limit.
Current allocation: %d",
    +          rounded, allocator.getAllocatedMemory());
    +    }
    +  }
    +
    +  @Override
    +  public DrillBuf buffer(final int initialRequestSize) {
    +    return buffer(initialRequestSize, null);
    +  }
    +
    +  private DrillBuf createEmpty(){
    +    return new DrillBuf(new AtomicInteger(), null, AllocatorManager.INNER_ALLOCATOR.empty,
null, null, 0, 0, true);
    +  }
    +
    +  @Override
    +  public DrillBuf buffer(final int initialRequestSize, BufferManager manager) {
    +
    +    Preconditions.checkArgument(initialRequestSize >= 0, "the minimimum requested
size must be non-negative");
    +    Preconditions.checkArgument(initialRequestSize >= 0, "the maximum requested size
must be non-negative");
    +
    +    if (initialRequestSize == 0) {
    +      return empty;
    +    }
    +
    +    // round to next largest power of two if we're within a chunk since that is how our
allocator operates
    +    final int actualRequestSize = initialRequestSize < CHUNK_SIZE ?
    +        nextPowerOfTwo(initialRequestSize)
    +        : initialRequestSize;
    +    AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
    +    if (!outcome.isOk()) {
    +      throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize));
    +    }
    +
    +    boolean success = true;
    +    try {
    +      DrillBuf buffer = bufferWithoutReservation(actualRequestSize, manager);
    +      success = true;
    +      return buffer;
    +    } finally {
    +      if (!success) {
    +        releaseBytes(actualRequestSize);
    +      }
    +    }
    +
    +  }
    +
    +  /**
    +   * Used by usual allocation as well as for allocating a pre-reserved buffer. Skips
the typical accounting associated
    +   * with creating a new buffer.
    +   */
    +  private DrillBuf bufferWithoutReservation(final int size, BufferManager bufferManager)
throws OutOfMemoryException {
    +    AllocatorManager manager = new AllocatorManager(this, size);
    +    BufferLedger ledger = manager.associate(this);
    +    DrillBuf buffer = ledger.newDrillBuf(0, size, bufferManager, true);
    +
    +    // make sure that our allocation is equal to what we expected.
    +    Preconditions.checkArgument(buffer.capacity() == size,
    +        "Allocated capacity %d was not equal to requested capacity %d.", buffer.capacity(),
size);
    +
    +    return buffer;
    +  }
    +
    +  @Override
    +  public ByteBufAllocator getAsByteBufAllocator() {
    +    return thisAsByteBufAllocator;
    +  }
    +
    +  /**
    +   * Return a unique Id for an allocator. Id's may be recycled after a long period of
time.
    +   *
    +   * <p>
    +   * Primary use for this is for debugging output.
    +   * </p>
    +   *
    +   * @return the allocator's id
    +   */
    +  long getId() {
    +    return id;
    +  }
    +
    +  @Override
    +  public BufferAllocator newChildAllocator(
    +      final String name,
    +      final long initReservation,
    +      final long maxAllocation) {
    +    final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation,
maxAllocation);
    +
    +    if (DEBUG) {
    +      childAllocators.put(childAllocator, childAllocator);
    +      historicalLog.recordEvent("allocator[%d] created new child allocator[%d]",
    +          id, childAllocator.id);
    +    }
    +
    +    return childAllocator;
    +  }
    +
    +  private class Reservation extends AllocationReservation {
    +    private final HistoricalLog historicalLog;
    +
    +    public Reservation() {
    +      if (DEBUG) {
    +        historicalLog = new HistoricalLog("Reservation[allocator[%d], %d]", id, System.identityHashCode(this));
    +        historicalLog.recordEvent("created");
    +        synchronized (DEBUG_LOCK) {
    +          reservations.put(this, this);
    +        }
    +      } else {
    +        historicalLog = null;
    +      }
    +    }
    +
    +    @Override
    +    public void close() {
    +      if (DEBUG) {
    +        if (!isClosed()) {
    +          final Object object;
    +          synchronized (DEBUG_LOCK) {
    +            object = reservations.remove(this);
    +          }
    +          if (object == null) {
    +            final StringBuilder sb = new StringBuilder();
    +            print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
    +            logger.debug(sb.toString());
    +            throw new IllegalStateException(
    +                String.format("Didn't find closing reservation[%d]", System.identityHashCode(this)));
    +          }
    +
    +          historicalLog.recordEvent("closed");
    +        }
    +      }
    +
    +      super.close();
    +    }
    +
    +    @Override
    +    protected boolean reserve(int nBytes) {
    +      final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes);
    +
    +      if (DEBUG) {
    +        historicalLog.recordEvent("reserve(%d) => %s", nBytes, Boolean.toString(outcome.isOk()));
    +      }
    +
    +      return outcome.isOk();
    +    }
    +
    +    @Override
    +    protected DrillBuf allocate(int nBytes) {
    +      boolean success = false;
    +
    +      /*
    +       * The reservation already added the requested bytes to the allocators owned and
allocated bytes via reserve().
    +       * This ensures that they can't go away. But when we ask for the buffer here, that
will add to the allocated bytes
    +       * as well, so we need to return the same number back to avoid double-counting
them.
    +       */
    +      try {
    +        final DrillBuf drillBuf = BaseAllocator.this.bufferWithoutReservation(nBytes,
null);
    +
    +        if (DEBUG) {
    +          historicalLog.recordEvent("allocate() => %s",
    +              drillBuf == null ? "null" : String.format("DrillBuf[%d]", drillBuf.getId()));
    --- End diff --
    
    The `drillBuf == null` check is unnecessary: `bufferWithoutReservation()` cannot return `null`.


> Incorporate remaining patches from DRILL-1942 Allocator refactor
> ----------------------------------------------------------------
>
>                 Key: DRILL-4134
>                 URL: https://issues.apache.org/jira/browse/DRILL-4134
>             Project: Apache Drill
>          Issue Type: Sub-task
>          Components: Execution - Flow
>            Reporter: Jacques Nadeau
>            Assignee: Jacques Nadeau
>             Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message