drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adeneche <...@git.apache.org>
Subject [GitHub] drill pull request: DRILL-4134: Allocator updates
Date Tue, 01 Dec 2015 23:26:31 GMT
Github user adeneche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/283#discussion_r46355906
  
    --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
---
    @@ -0,0 +1,689 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.memory;
    +
    +import io.netty.buffer.ByteBufAllocator;
    +import io.netty.buffer.DrillBuf;
    +import io.netty.buffer.UnsafeDirectLittleEndian;
    +
    +import java.util.Arrays;
    +import java.util.IdentityHashMap;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.drill.common.HistoricalLog;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.memory.AllocatorManager.BufferLedger;
    +import org.apache.drill.exec.ops.BufferManager;
    +import org.apache.drill.exec.util.AssertionUtil;
    +
    +import com.google.common.base.Preconditions;
    +
    +public abstract class BaseAllocator extends Accountant implements BufferAllocator {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
    +
    +  // Name of the system property that switches on allocator debugging (see DEBUG below).
    +  public static final String DEBUG_ALLOCATOR = "drill.memory.debug.allocator";
    +
    +  // Source of the per-allocator 'id' field; shared by every allocator instance.
    +  private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
    +  // Requests smaller than this are rounded up to a power of two in buffer(int, BufferManager).
    +  private static final int CHUNK_SIZE = AllocatorManager.INNER_ALLOCATOR.getChunkSize();
    +
    +  // Passed to the HistoricalLog created for each allocator when DEBUG is on.
    +  public static final int DEBUG_LOG_LENGTH = 6;
    +  // Debug bookkeeping is active when assertions are enabled or the DEBUG_ALLOCATOR property is "true".
    +  public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
    +      || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false"));
    +  // Per-instance monitor guarding the debug-only maps below; null when DEBUG is off.
    +  private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
    +
    +  private final BaseAllocator parentAllocator;
    +  private final ByteBufAllocator thisAsByteBufAllocator;
    +  // Only populated when DEBUG is on (null otherwise); the constructor decides.
    +  private final IdentityHashMap<BaseAllocator, Object> childAllocators;
    +  // Buffer returned for zero-length requests; children reuse their parent's instance.
    +  private final DrillBuf empty;
    +
    +  private volatile boolean isClosed = false; // the allocator has been closed
    +
    +  // Package exposed for sharing between AllocatorManager and BaseAllocator objects
    +  final long id = ID_GENERATOR.incrementAndGet(); // unique ID assigned to each allocator
    +  final String name;
    +  final RootAllocator root;
    +
    +  // members used purely for debugging (all null when DEBUG is off)
    +  private final IdentityHashMap<BufferLedger, Object> childLedgers;
    +  private final IdentityHashMap<Reservation, Object> reservations;
    +  private final HistoricalLog historicalLog;
    +
    +  protected BaseAllocator(
    +      final BaseAllocator parentAllocator,
    +      final String name,
    +      final long initReservation,
    +      final long maxAllocation) throws OutOfMemoryException {
    +    super(parentAllocator, initReservation, maxAllocation);
    +
    +    if (parentAllocator != null) {
    +      this.root = parentAllocator.root;
    +      empty = parentAllocator.empty;
    +    } else if (this instanceof RootAllocator) {
    +      this.root = (RootAllocator) this;
    +      empty = createEmpty();
    +    } else {
    +      throw new IllegalStateException("An parent allocator must either carry a root or
be the root.");
    +    }
    +
    +    this.parentAllocator = parentAllocator;
    +    this.name = name;
    +
    +    // TODO: DRILL-4131
    +    // this.thisAsByteBufAllocator = new DrillByteBufAllocator(this);
    +    this.thisAsByteBufAllocator = AllocatorManager.INNER_ALLOCATOR.allocator;
    +
    +    if (DEBUG) {
    +      childAllocators = new IdentityHashMap<>();
    +      reservations = new IdentityHashMap<>();
    +      childLedgers = new IdentityHashMap<>();
    +      historicalLog = new HistoricalLog(DEBUG_LOG_LENGTH, "allocator[%d]", id);
    +      hist("created by \"%s\", owned = %d", name.toString(), this.getAllocatedMemory());
    +    } else {
    +      childAllocators = null;
    +      reservations = null;
    +      historicalLog = null;
    +      childLedgers = null;
    +    }
    +
    +  }
    +
    +  /** Returns the name this allocator was constructed with. */
    +  @Override
    +  public String getName() {
    +    return this.name;
    +  }
    +
    +  /** Returns the empty buffer held by this allocator (the same one returned for zero-length requests). */
    +  @Override
    +  public DrillBuf getEmpty() {
    +    return this.empty;
    +  }
    +
    +  /**
    +   * Debug/verification hook: lets an AllocatorManager tell this allocator that a new
    +   * ledger is now associated with it. Does nothing when {@link #DEBUG} is off.
    +   */
    +  void associateLedger(BufferLedger ledger) {
    +    if (!DEBUG) {
    +      return;
    +    }
    +
    +    synchronized (DEBUG_LOCK) {
    +      childLedgers.put(ledger, null);
    +    }
    +  }
    +
    +  /**
    +   * For debug/verification purposes only. Allows an AllocatorManager to tell the allocator
that we are removing a
    +   * ledger associated with this allocator
    +   */
    +  void dissociateLedger(BufferLedger ledger) {
    +    if (DEBUG) {
    +      synchronized (DEBUG_LOCK) {
    +        if (!childLedgers.containsKey(ledger)) {
    +          throw new IllegalStateException("Trying to remove a child ledger that doesn't
exist.");
    +        }
    +        childLedgers.remove(ledger);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Records that a child of this allocator has been closed. Only does work in debug mode.
    +   *
    +   * @param childAllocator
    +   *          The child allocator that has been closed.
    +   * @throws IllegalStateException if the child is not in this allocator's child map
    +   */
    +  private void childClosed(final BaseAllocator childAllocator) {
    +    if (!DEBUG) {
    +      return;
    +    }
    +
    +    Preconditions.checkArgument(childAllocator != null, "child allocator can't be null");
    +
    +    synchronized (DEBUG_LOCK) {
    +      final Object previous = childAllocators.remove(childAllocator);
    +      if (previous != null) {
    +        return;
    +      }
    +
    +      // unknown child: dump its history before failing
    +      childAllocator.historicalLog.logHistory(logger);
    +      final String message = "Child allocator[" + childAllocator.id
    +          + "] not found in parent allocator[" + id + "]'s childAllocators";
    +      throw new IllegalStateException(message);
    +    }
    +  }
    +
    +  /**
    +   * Builds the out-of-memory message for a failed allocation.
    +   *
    +   * @param allocator allocator whose current allocation is reported in the message
    +   * @param rounded the size that was actually requested from the allocator
    +   * @param requested the size the caller originally asked for
    +   * @return the message; it mentions the rounding only when the two sizes differ
    +   */
    +  private static String createErrorMsg(final BufferAllocator allocator, final int rounded,
    +      final int requested) {
    +    if (rounded == requested) {
    +      return String.format("Unable to allocate buffer of size %d due to memory limit. Current allocation: %d",
    +          rounded, allocator.getAllocatedMemory());
    +    }
    +
    +    return String.format(
    +        "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current allocation: %d",
    +        rounded, requested, allocator.getAllocatedMemory());
    +  }
    +
    +  /** Allocates a buffer by delegating to {@code buffer(int, BufferManager)} with a null manager. */
    +  @Override
    +  public DrillBuf buffer(final int initialRequestSize) {
    +    return this.buffer(initialRequestSize, null);
    +  }
    +
    +  private DrillBuf createEmpty(){
    +    return new DrillBuf(new AtomicInteger(), null, AllocatorManager.INNER_ALLOCATOR.empty,
null, null, 0, 0, true);
    +  }
    +
    +  /**
    +   * Allocates a buffer of the requested size, charging the bytes to this allocator.
    +   *
    +   * <p>
    +   * Requests smaller than the chunk size are rounded up to the next power of two. A
    +   * zero-length request returns the shared empty buffer without any accounting.
    +   * </p>
    +   *
    +   * @param initialRequestSize the number of bytes wanted; must be non-negative
    +   * @param manager passed through to the created buffer; may be null
    +   * @return the new buffer, or the empty buffer for a zero-length request
    +   * @throws IllegalArgumentException if {@code initialRequestSize} is negative
    +   * @throws OutOfMemoryException if the bytes cannot be accounted for within the limits
    +   */
    +  @Override
    +  public DrillBuf buffer(final int initialRequestSize, BufferManager manager) {
    +
    +    Preconditions.checkArgument(initialRequestSize >= 0, "the requested size must be non-negative");
    +
    +    if (initialRequestSize == 0) {
    +      return empty;
    +    }
    +
    +    // round to next largest power of two if we're within a chunk since that is how our allocator operates
    +    final int actualRequestSize = initialRequestSize < CHUNK_SIZE
    +        ? nextPowerOfTwo(initialRequestSize)
    +        : initialRequestSize;
    +    final AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
    +    if (!outcome.isOk()) {
    +      throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize));
    +    }
    +
    +    // Must start out false: the flag only flips once the buffer exists, so that the
    +    // finally block hands the accounted bytes back if buffer creation throws.
    +    boolean success = false;
    +    try {
    +      final DrillBuf buffer = bufferWithoutReservation(actualRequestSize, manager);
    +      success = true;
    +      return buffer;
    +    } finally {
    +      if (!success) {
    +        releaseBytes(actualRequestSize);
    +      }
    +    }
    +
    +  }
    +
    +  /**
    +   * Used by usual allocation as well as for allocating a pre-reserved buffer. Skips the
    +   * typical accounting associated with creating a new buffer.
    +   *
    +   * @param size the capacity of the buffer to create
    +   * @param bufferManager passed through to the created buffer; may be null
    +   * @return a buffer whose capacity equals {@code size}
    +   * @throws IllegalArgumentException if the created buffer's capacity differs from {@code size}
    +   */
    +  private DrillBuf bufferWithoutReservation(final int size, BufferManager bufferManager)
    +      throws OutOfMemoryException {
    +    AllocatorManager manager = new AllocatorManager(this, size);
    +    BufferLedger ledger = manager.associate(this);
    +    DrillBuf buffer = ledger.newDrillBuf(0, size, bufferManager, true);
    +
    +    // make sure that our allocation is equal to what we expected.
    +    // Guava's Preconditions templates only substitute %s, so %d would be left unfilled.
    +    Preconditions.checkArgument(buffer.capacity() == size,
    +        "Allocated capacity %s was not equal to requested capacity %s.", buffer.capacity(), size);
    +
    +    return buffer;
    +  }
    +
    +  /** Returns the ByteBufAllocator assigned to this allocator in its constructor. */
    +  @Override
    +  public ByteBufAllocator getAsByteBufAllocator() {
    +    return this.thisAsByteBufAllocator;
    +  }
    +
    +  /**
    +   * Returns the id assigned to this allocator from the class-wide id counter.
    +   *
    +   * <p>
    +   * Mainly intended for debugging output.
    +   * </p>
    +   *
    +   * @return the allocator's id
    +   */
    +  long getId() {
    +    return this.id;
    +  }
    +
    +  /**
    +   * Creates a child allocator of this allocator.
    +   *
    +   * @param name the child's name
    +   * @param initReservation passed to the ChildAllocator constructor
    +   * @param maxAllocation passed to the ChildAllocator constructor
    +   * @return the new child allocator
    +   */
    +  @Override
    +  public BufferAllocator newChildAllocator(
    +      final String name,
    +      final long initReservation,
    +      final long maxAllocation) {
    +    final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation, maxAllocation);
    +
    +    if (DEBUG) {
    +      // childAllocators is a plain IdentityHashMap that childClosed() and close() access
    +      // under DEBUG_LOCK, so registration has to take the same lock.
    +      synchronized (DEBUG_LOCK) {
    +        childAllocators.put(childAllocator, childAllocator);
    +      }
    +      historicalLog.recordEvent("allocator[%d] created new child allocator[%d]",
    +          id, childAllocator.id);
    +    }
    +
    +    return childAllocator;
    +  }
    +
    +  /**
    +   * AllocationReservation bound to the enclosing allocator. In debug mode each reservation
    +   * is tracked in the allocator's reservations map and keeps its own event history.
    +   */
    +  private class Reservation extends AllocationReservation {
    +    private final HistoricalLog historicalLog;
    +
    +    /** Creates the reservation and, in debug mode, registers it with the allocator. */
    +    public Reservation() {
    +      if (!DEBUG) {
    +        historicalLog = null;
    +      } else {
    +        historicalLog = new HistoricalLog("Reservation[allocator[%d], %d]", id, System.identityHashCode(this));
    +        historicalLog.recordEvent("created");
    +        synchronized (DEBUG_LOCK) {
    +          reservations.put(this, this);
    +        }
    +      }
    +    }
    +
    +    /**
    +     * Closes the reservation. In debug mode a reservation that is not yet closed is first
    +     * removed from the allocator's tracking map.
    +     *
    +     * @throws IllegalStateException in debug mode, if the reservation is not being tracked
    +     */
    +    @Override
    +    public void close() {
    +      if (DEBUG && !isClosed()) {
    +        final Object tracked;
    +        synchronized (DEBUG_LOCK) {
    +          tracked = reservations.remove(this);
    +        }
    +
    +        if (tracked == null) {
    +          final StringBuilder dump = new StringBuilder();
    +          print(dump, 0, Verbosity.LOG_WITH_STACKTRACE);
    +          logger.debug(dump.toString());
    +          throw new IllegalStateException(
    +              String.format("Didn't find closing reservation[%d]", System.identityHashCode(this)));
    +        }
    +
    +        historicalLog.recordEvent("closed");
    +      }
    +
    +      super.close();
    +    }
    +
    +    /** Tries to account {@code nBytes} against the enclosing allocator; returns whether that succeeded. */
    +    @Override
    +    protected boolean reserve(int nBytes) {
    +      final AllocationOutcome result = BaseAllocator.this.allocateBytes(nBytes);
    +      final boolean reserved = result.isOk();
    +
    +      if (DEBUG) {
    +        historicalLog.recordEvent("reserve(%d) => %s", nBytes, Boolean.toString(reserved));
    +      }
    +
    +      return reserved;
    +    }
    +
    +    /** Creates the buffer for bytes that reserve() has already accounted for. */
    +    @Override
    +    protected DrillBuf allocate(int nBytes) {
    +      /*
    +       * reserve() has already charged these bytes to the allocator, so the buffer is created
    +       * through bufferWithoutReservation() rather than buffer(). If creation fails, the
    +       * bytes are handed back in the finally block.
    +       */
    +      boolean created = false;
    +      try {
    +        final DrillBuf result = BaseAllocator.this.bufferWithoutReservation(nBytes, null);
    +
    +        if (DEBUG) {
    +          final String description;
    +          if (result == null) {
    +            description = "null";
    +          } else {
    +            description = String.format("DrillBuf[%d]", result.getId());
    +          }
    +          historicalLog.recordEvent("allocate() => %s", description);
    +        }
    +
    +        created = true;
    +        return result;
    +      } finally {
    +        if (!created) {
    +          releaseBytes(nBytes);
    +        }
    +      }
    +    }
    +
    +    /** Hands {@code nBytes} of reserved space back via releaseBytes(). */
    +    @Override
    +    protected void releaseReservation(int nBytes) {
    +      releaseBytes(nBytes);
    +
    +      if (DEBUG) {
    +        historicalLog.recordEvent("releaseReservation(%d)", nBytes);
    +      }
    +    }
    +
    +  }
    +
    +  /** Creates a new reservation bound to this allocator. */
    +  @Override
    +  public AllocationReservation newReservation() {
    +    final Reservation reservation = new Reservation();
    +    return reservation;
    +  }
    +
    +
    +  /**
    +   * Closes this allocator. Calling it again after a successful close is a no-op.
    +   *
    +   * <p>
    +   * In debug mode the allocator is verified first and closing fails if child allocators,
    +   * buffer ledgers or reservations are still registered. In every mode closing fails if
    +   * allocated memory is still outstanding. The closed flag is only set at the very end,
    +   * so a close that throws leaves the allocator marked as open.
    +   * </p>
    +   *
    +   * @throws IllegalStateException if any of the checks described above fails
    +   */
    +  @Override
    +  public synchronized void close() {
    +    /*
    +     * Some owners may close more than once because of complex cleanup and shutdown
    +     * procedures.
    +     */
    +    if (isClosed) {
    +      return;
    +    }
    +
    +    if (DEBUG) {
    +      // the debug maps are only ever touched while holding DEBUG_LOCK
    +      synchronized(DEBUG_LOCK) {
    +        verifyAllocator();
    +
    +        // are there outstanding child allocators?
    +        if (!childAllocators.isEmpty()) {
    +          // warn about children that are closed yet still listed, then fail regardless
    +          for (final BaseAllocator childAllocator : childAllocators.keySet()) {
    +            if (childAllocator.isClosed) {
    +              logger.warn(String.format(
    +                  "Closed child allocator[%s] on parent allocator[%s]'s child list.\n%s",
    +                  childAllocator.name, name, toString()));
    +            }
    +          }
    +
    +          throw new IllegalStateException(
    +              String.format("Allocator[%s] closed with outstanding child allocators.\n%s",
name, toString()));
    +        }
    +
    +        // are there outstanding buffers?
    +        final int allocatedCount = childLedgers.size();
    +        if (allocatedCount > 0) {
    +          throw new IllegalStateException(
    +              String.format("Allocator[%s] closed with outstanding buffers allocated
(%d).\n%s",
    +                  name, allocatedCount, toString()));
    +        }
    +
    +        // are there outstanding reservations?
    +        if (reservations.size() != 0) {
    +          throw new IllegalStateException(
    +              String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s",
name, reservations.size(),
    +                  toString()));
    +        }
    +
    +      }
    +    }
    +
    +      // Is there unaccounted-for outstanding allocation?
    +      // (this check runs whether or not DEBUG is enabled)
    +      final long allocated = getAllocatedMemory();
    +      if (allocated > 0) {
    +        throw new IllegalStateException(
    +          String.format("Unaccounted for outstanding allocation (%d)\n%s", allocated,
toString()));
    +      }
    +
    +    // we need to release our memory to our parent before we tell it we've closed.
    +    super.close();
    +
    +    // Inform our parent allocator that we've closed
    +    if (parentAllocator != null) {
    +      parentAllocator.childClosed(this);
    +    }
    +
    +    if (DEBUG) {
    +      historicalLog.recordEvent("closed");
    +      logger.debug(String.format(
    +          "closed allocator[%s].",
    +          name));
    +    }
    +
    +    // set last: reached only if none of the checks or calls above threw
    +    isClosed = true;
    +
    +
    +  }
    +
    +  public String toString() {
    +    final Verbosity verbosity = logger.isTraceEnabled() || true ? Verbosity.LOG_WITH_STACKTRACE
: Verbosity.BASIC;
    --- End diff --
    
    this one too :P


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message