drill-commits mailing list archives

From jacq...@apache.org
Subject [4/6] drill git commit: DRILL-4246: Fix Allocator concurrency bug and improve error detection
Date Wed, 20 Jan 2016 15:31:12 GMT
DRILL-4246: Fix Allocator concurrency bug and improve error detection

- Rename the internal DrillBuf field to udle to better express its purpose.
- Rename AllocatorManager to AllocationManager to better express its purpose.
- Address situation where a dangling ledger could be transferred into while it was being released, by protecting association and release inside the AllocationManager.
- Add allocator assertions to ensure allocator operations are done while the allocator is open.
- Simplify AllocationManager locking model.
- Exclude HDFS reference to netty-all
- Improve debugging messages for allocators (and fix debug message bugs)

This closes #323.
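
The commit message describes guarding both ledger association and ledger release with the same AllocationManager lock, so that a ledger cannot gain new references while it is being torn down. The following is a minimal sketch of that protocol under a stripped-down model. `SketchManager`, `SketchLedger`, and the plain `Object` owners are hypothetical stand-ins for AllocationManager, BufferLedger, and allocators, and byte accounting (forceAllocate/releaseBytes) is left out. Lookups take the read lock. Creating or removing a ledger takes the write lock and re-checks the map, which is the pattern the new AllocationManager.associate follows.

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of the association/release protocol; not Drill's API.
// Owners stand in for allocators; byte accounting is deliberately left out.
class SketchManager {
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  final Map<Object, SketchLedger> ledgers = new IdentityHashMap<>();
  SketchLedger owning; // ledger charged for the memory; guarded by the write lock
  boolean freed;       // true once the last ledger is gone

  final class SketchLedger {
    final Object owner;
    final AtomicInteger refs = new AtomicInteger(0);

    SketchLedger(Object owner) {
      this.owner = owner;
    }

    // Drops references under the write lock so no associate() can interleave.
    int release(int n) {
      lock.writeLock().lock();
      try {
        int left = refs.addAndGet(-n);
        if (left < 0) {
          throw new IllegalStateException("negative reference count");
        }
        if (left == 0) {
          ledgers.remove(owner);
          if (owning == this) {
            if (ledgers.isEmpty()) {
              owning = null;
              freed = true; // last reference anywhere: memory could be returned
            } else {
              owning = ledgers.values().iterator().next(); // hand off ownership
            }
          }
        }
        return left;
      } finally {
        lock.writeLock().unlock();
      }
    }
  }

  // Returns the owner's ledger (creating it if needed) with one more reference.
  SketchLedger associate(Object owner) {
    lock.readLock().lock();
    try {
      SketchLedger existing = ledgers.get(owner);
      if (existing != null) {
        existing.refs.incrementAndGet(); // safe: release() needs the write lock
        return existing;
      }
    } finally {
      lock.readLock().unlock();
    }
    lock.writeLock().lock();
    try {
      if (freed) {
        throw new IllegalStateException("memory already released");
      }
      // Re-check: another thread may have created the ledger between the locks.
      SketchLedger ledger = ledgers.get(owner);
      if (ledger == null) {
        ledger = new SketchLedger(owner);
        ledgers.put(owner, ledger);
        if (owning == null) {
          owning = ledger;
        }
      }
      ledger.refs.incrementAndGet();
      return ledger;
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```

Because release runs under the write lock, it cannot interleave with a reader that has just found a ledger in the map. Either the reader's increment lands first, so that release does not reach zero, or the ledger is already gone and the caller falls through to the write path.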


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a0ae83c2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a0ae83c2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a0ae83c2

Branch: refs/heads/master
Commit: a0ae83c2f6c35f536bf16819fc2f2c6b0469cc6c
Parents: 664d34e
Author: Jacques Nadeau <jacques@apache.org>
Authored: Sun Jan 10 20:08:03 2016 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Wed Jan 20 07:30:12 2016 -0800

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |   6 +
 .../src/main/java/io/netty/buffer/DrillBuf.java | 135 +++---
 .../netty/buffer/UnsafeDirectLittleEndian.java  |   8 +
 .../drill/exec/memory/AllocationManager.java    | 434 +++++++++++++++++++
 .../drill/exec/memory/AllocatorManager.java     | 356 ---------------
 .../apache/drill/exec/memory/BaseAllocator.java |  78 +++-
 .../drill/exec/memory/BufferAllocator.java      |   5 +
 .../java/org/apache/drill/exec/memory/README.md |  22 +-
 8 files changed, 586 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a0ae83c2/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 7d5e4b3..516c0ed 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -405,6 +405,12 @@
       <artifactId>hadoop-hdfs</artifactId>
       <scope>test</scope>
       <classifier>tests</classifier>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a0ae83c2/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index 138495c..3793f25 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -31,14 +31,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.drill.common.HistoricalLog;
-import org.apache.drill.exec.memory.AllocatorManager.BufferLedger;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
 import org.apache.drill.exec.memory.BoundsChecking;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.BufferManager;
 
-import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 
 public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@@ -48,7 +47,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   private final long id = idGenerator.incrementAndGet();
   private final AtomicInteger refCnt;
-  private final UnsafeDirectLittleEndian byteBuf;
+  private final UnsafeDirectLittleEndian udle;
   private final long addr;
   private final int offset;
   private final BufferLedger ledger;
@@ -56,7 +55,6 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   private final ByteBufAllocator alloc;
   private final boolean isEmpty;
   private volatile int length;
-
   private final HistoricalLog historicalLog = BaseAllocator.DEBUG ?
       new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "DrillBuf[%d]", id) : null;
 
@@ -71,7 +69,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
       boolean isEmpty) {
     super(byteBuf.maxCapacity());
     this.refCnt = refCnt;
-    this.byteBuf = byteBuf;
+    this.udle = byteBuf;
     this.isEmpty = isEmpty;
     this.bufManager = manager;
     this.alloc = alloc;
@@ -123,18 +121,20 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
         historicalLog.logHistory(logger);
       }
       throw new IndexOutOfBoundsException(String.format(
-              "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
+          "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
     }
   }
 
   /**
    * Allows a function to determine whether not reading a particular string of bytes is valid.
    *
-   * Will throw an exception if the memory is not readable for some reason.  Only doesn't something in the
-   * case that AssertionUtil.BOUNDS_CHECKING_ENABLED is true.
+   * Will throw an exception if the memory is not readable for some reason. Only doesn't something in the case that
+   * AssertionUtil.BOUNDS_CHECKING_ENABLED is true.
    *
-   * @param start The starting position of the bytes to be read.
-   * @param end The exclusive endpoint of the bytes to be read.
+   * @param start
+   *          The starting position of the bytes to be read.
+   * @param end
+   *          The exclusive endpoint of the bytes to be read.
    */
   public void checkBytes(int start, int end) {
     if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
@@ -161,24 +161,24 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
    *
    * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
    * reference count of 1 (in the case that this is the first time this memory is being associated with the new
-   * allocator) or the current value of the reference count + 1 for the other AllocatorManager/BufferLedger combination
+   * allocator) or the current value of the reference count + 1 for the other AllocationManager/BufferLedger combination
    * in the case that the provided allocator already had an association to this underlying memory.
    *
-   * @param allocator
+   * @param target
    *          The target allocator to create an association with.
    * @return A new DrillBuf which shares the same underlying memory as this DrillBuf.
    */
-  public DrillBuf retain(BufferAllocator allocator) {
+  public DrillBuf retain(BufferAllocator target) {
 
     if (isEmpty) {
       return this;
     }
 
     if (BaseAllocator.DEBUG) {
-      historicalLog.recordEvent("retain(%s)", allocator.getName());
+      historicalLog.recordEvent("retain(%s)", target.getName());
     }
-    BufferLedger otherLedger = this.ledger.getLedgerForAllocator(allocator);
-    return otherLedger.newDrillBuf(offset, length, null, true);
+    final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
+    return otherLedger.newDrillBuf(offset, length, null);
   }
 
   /**
@@ -190,7 +190,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
    *
    * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
    * reference count of 1 (in the case that this is the first time this memory is being associated with the new
-   * allocator) or the current value of the reference count for the other AllocatorManager/BufferLedger combination in
+   * allocator) or the current value of the reference count for the other AllocationManager/BufferLedger combination in
    * the case that the provided allocator already had an association to this underlying memory.
    *
    * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible
@@ -212,7 +212,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     }
 
     final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
-    final DrillBuf newBuf = otherLedger.newDrillBuf(offset, length, null, true);
+    final DrillBuf newBuf = otherLedger.newDrillBuf(offset, length, null);
     final boolean allocationFit = this.ledger.transferBalance(otherLedger);
     return new TransferResult(allocationFit, newBuf);
   }
@@ -245,10 +245,11 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   }
 
   /**
-   * Release the provided number of reference counts.  If this is a root buffer, will decrease accounting if the local reference count returns to zero.
+   * Release the provided number of reference counts.
    */
   @Override
-  public synchronized boolean release(int decrement) {
+  public boolean release(int decrement) {
+
     if (isEmpty) {
       return false;
     }
@@ -258,7 +259,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
           decrement, toVerboseString()));
     }
 
-    final int refCnt = this.refCnt.addAndGet(-decrement);
+    final int refCnt = ledger.decrement(decrement);
 
     if (BaseAllocator.DEBUG) {
       historicalLog.recordEvent("release(%d). original value: %d", decrement, refCnt + decrement);
@@ -267,14 +268,10 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     if (refCnt < 0) {
       throw new IllegalStateException(
           String.format("DrillBuf[%d] refCnt has gone negative. Buffer Info: %s", id, toVerboseString()));
-
-    }
-    if (refCnt == 0) {
-      ledger.release();
-      return true;
     }
 
-    return false;
+    return refCnt == 0;
+
   }
 
   @Override
@@ -301,7 +298,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBufAllocator alloc() {
-    return byteBuf.alloc();
+    return udle.alloc();
   }
 
   @Override
@@ -316,7 +313,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBuf unwrap() {
-    return byteBuf;
+    return udle;
   }
 
   @Override
@@ -370,9 +367,8 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     }
 
     /*
-     * Re the behavior of reference counting,
-     * see http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, which explains
-     * that derived buffers share their reference count with their parent
+     * Re the behavior of reference counting, see http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, which
+     * explains that derived buffers share their reference count with their parent
      */
     final DrillBuf newBuf = ledger.newDrillBuf(offset + index, length);
     newBuf.writerIndex(length);
@@ -396,37 +392,37 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBuffer nioBuffer(int index, int length) {
-    return byteBuf.nioBuffer(offset + index, length);
+    return udle.nioBuffer(offset + index, length);
   }
 
   @Override
   public ByteBuffer internalNioBuffer(int index, int length) {
-    return byteBuf.internalNioBuffer(offset + index, length);
+    return udle.internalNioBuffer(offset + index, length);
   }
 
   @Override
   public ByteBuffer[] nioBuffers() {
-    return new ByteBuffer[]{nioBuffer()};
+    return new ByteBuffer[] { nioBuffer() };
   }
 
   @Override
   public ByteBuffer[] nioBuffers(int index, int length) {
-    return new ByteBuffer[]{nioBuffer(index, length)};
+    return new ByteBuffer[] { nioBuffer(index, length) };
   }
 
   @Override
   public boolean hasArray() {
-    return byteBuf.hasArray();
+    return udle.hasArray();
   }
 
   @Override
   public byte[] array() {
-    return byteBuf.array();
+    return udle.array();
   }
 
   @Override
   public int arrayOffset() {
-    return byteBuf.arrayOffset();
+    return udle.arrayOffset();
   }
 
   @Override
@@ -441,7 +437,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public String toString() {
-    return toString(0, 0, Charsets.UTF_8);
+    return String.format("DrillBuf[%d], udle: [%d %d..%d]", id, udle.id, offset, offset + capacity());
   }
 
   @Override
@@ -451,24 +447,12 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public String toString(int index, int length, Charset charset) {
-    final String basics =
-        String.format("{DrillBuf[%d], udle identityHashCode == %d, identityHashCode == %d}",
-            id, System.identityHashCode(byteBuf), System.identityHashCode(refCnt));
 
     if (length == 0) {
-      return basics;
-    }
-
-    final ByteBuffer nioBuffer;
-    if (nioBufferCount() == 1) {
-      nioBuffer = nioBuffer(index, length);
-    } else {
-      nioBuffer = ByteBuffer.allocate(length);
-      getBytes(index, nioBuffer);
-      nioBuffer.flip();
+      return "";
     }
 
-    return basics + '\n' + ByteBufUtil.decodeString(nioBuffer, charset);
+    return ByteBufUtil.decodeString(nioBuffer(index, length), charset);
   }
 
   @Override
@@ -494,7 +478,8 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
       historicalLog.recordEvent("retain(%d)", increment);
     }
 
-    refCnt.addAndGet(increment);
+    final int originalReferenceCount = refCnt.getAndAdd(increment);
+    Preconditions.checkArgument(originalReferenceCount > 0);
     return this;
   }
 
@@ -641,13 +626,13 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
-    byteBuf.getBytes(index + offset, dst, dstIndex, length);
+    udle.getBytes(index + offset, dst, dstIndex, length);
     return this;
   }
 
   @Override
   public ByteBuf getBytes(int index, ByteBuffer dst) {
-    byteBuf.getBytes(index + offset, dst);
+    udle.getBytes(index + offset, dst);
     return this;
   }
 
@@ -658,12 +643,12 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     return this;
   }
 
-  public void setByte(int index, byte b){
+  public void setByte(int index, byte b) {
     chk(index, 1);
     PlatformDependent.putByte(addr(index), b);
   }
 
-  public void writeByteUnsafe(byte b){
+  public void writeByteUnsafe(byte b) {
     PlatformDependent.putByte(addr(readerIndex), b);
     readerIndex++;
   }
@@ -715,13 +700,13 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
-    byteBuf.getBytes(index + offset, dst, dstIndex, length);
+    udle.getBytes(index + offset, dst, dstIndex, length);
     return this;
   }
 
   @Override
   public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
-    byteBuf.getBytes(index + offset, out, length);
+    udle.getBytes(index + offset, out, length);
     return this;
   }
 
@@ -729,18 +714,18 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   protected int _getUnsignedMedium(int index) {
     final long addr = addr(index);
     return (PlatformDependent.getByte(addr) & 0xff) << 16 |
-            (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
-            PlatformDependent.getByte(addr + 2) & 0xff;
+        (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
+        PlatformDependent.getByte(addr + 2) & 0xff;
   }
 
   @Override
   public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
-    return byteBuf.getBytes(index + offset, out, length);
+    return udle.getBytes(index + offset, out, length);
   }
 
   @Override
   public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
-    byteBuf.setBytes(index + offset, src, srcIndex, length);
+    udle.setBytes(index + offset, src, srcIndex, length);
     return this;
   }
 
@@ -751,12 +736,12 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
           length);
     } else {
       if (srcIndex == 0 && src.capacity() == length) {
-        byteBuf.setBytes(index + offset, src);
+        udle.setBytes(index + offset, src);
       } else {
         ByteBuffer newBuf = src.duplicate();
         newBuf.position(srcIndex);
         newBuf.limit(srcIndex + length);
-        byteBuf.setBytes(index + offset, src);
+        udle.setBytes(index + offset, src);
       }
     }
 
@@ -765,24 +750,24 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
-    byteBuf.setBytes(index + offset, src, srcIndex, length);
+    udle.setBytes(index + offset, src, srcIndex, length);
     return this;
   }
 
   @Override
   public ByteBuf setBytes(int index, ByteBuffer src) {
-    byteBuf.setBytes(index + offset, src);
+    udle.setBytes(index + offset, src);
     return this;
   }
 
   @Override
   public int setBytes(int index, InputStream in, int length) throws IOException {
-    return byteBuf.setBytes(index + offset, in, length);
+    return udle.setBytes(index + offset, in, length);
   }
 
   @Override
   public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
-    return byteBuf.setBytes(index + offset, in, length);
+    return udle.setBytes(index + offset, in, length);
   }
 
   @Override
@@ -820,6 +805,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   /**
    * Return the buffer's byte contents in the form of a hex dump.
+   *
    * @param start
    *          the starting byte index
    * @param length
@@ -831,13 +817,13 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
     final StringBuilder sb = new StringBuilder("buffer byte dump\n");
     int index = roundedStart;
-    for(int nLogged = 0; nLogged < length; nLogged += LOG_BYTES_PER_ROW) {
+    for (int nLogged = 0; nLogged < length; nLogged += LOG_BYTES_PER_ROW) {
       sb.append(String.format(" [%05d-%05d]", index, index + LOG_BYTES_PER_ROW - 1));
-      for(int i = 0; i < LOG_BYTES_PER_ROW; ++i) {
+      for (int i = 0; i < LOG_BYTES_PER_ROW; ++i) {
         try {
           final byte b = getByte(index++);
           sb.append(String.format(" 0x%02x", b));
-        } catch(IndexOutOfBoundsException ioob) {
+        } catch (IndexOutOfBoundsException ioob) {
           sb.append(" <ioob>");
         }
       }
@@ -855,7 +841,6 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     return id;
   }
 
-
   public String toVerboseString() {
     if (isEmpty) {
       return toString();
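
In the DrillBuf changes above, `release(int)` is no longer `synchronized`. It no longer decrements its own `refCnt` and calls `ledger.release()` at zero; instead it delegates to `ledger.decrement(decrement)`. `retain(int)` now captures the count before incrementing and rejects it if it was not positive. The sketch below models only that counting contract for views that share one counter. `SharedRefCount` is a hypothetical name, and the lock-protected ledger teardown is omitted. One design choice differs on purpose: the diff checks after `getAndAdd`, so a failed check has already bumped the count, whereas this compare-and-set loop leaves the count untouched when it refuses.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: buffer views sharing a single reference count.
final class SharedRefCount {
  private final AtomicInteger count = new AtomicInteger(1); // first view holds one reference

  // Adds references; refuses to revive a count that has already reached zero.
  void retain(int increment) {
    while (true) {
      int current = count.get();
      if (current <= 0) {
        throw new IllegalStateException("cannot retain a released buffer");
      }
      if (count.compareAndSet(current, current + increment)) {
        return;
      }
    }
  }

  // Drops references; returns true only for the call that brings the count to zero.
  boolean release(int decrement) {
    int remaining = count.addAndGet(-decrement);
    if (remaining < 0) {
      throw new IllegalStateException("reference count went negative: " + remaining);
    }
    return remaining == 0;
  }

  int get() {
    return count.get();
  }
}
```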

http://git-wip-us.apache.org/repos/asf/drill/blob/a0ae83c2/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index 12e9907..6495d5d 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -29,6 +29,9 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
   private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+  private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+
+  public final long id = ID_GENERATOR.incrementAndGet();
   private final AbstractByteBuf wrapped;
   private final long memoryAddress;
 
@@ -251,6 +254,11 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
     return released;
   }
 
+  @Override
+  public int hashCode() {
+    return System.identityHashCode(this);
+  }
+
   public static final boolean ASSERT_ENABLED;
 
   static {
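
The UnsafeDirectLittleEndian change gives each instance an increasing `id`, which the new `DrillBuf.toString()` prints, and makes `hashCode()` identity-based. In Netty 4, `ByteBuf.hashCode()` is specified as a hash of the buffer's readable content, and `WrappedByteBuf` delegates to the wrapped buffer. The override therefore keeps hashing cheap and stable while the contents change. The following is a minimal sketch of the same pattern on a hypothetical `IdentifiedBuffer` class:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a wrapped buffer that carries a debug id.
final class IdentifiedBuffer {
  private static final AtomicLong ID_GENERATOR = new AtomicLong(0);

  // Assigned once at construction; strictly increasing across instances.
  final long id = ID_GENERATOR.incrementAndGet();

  @Override
  public int hashCode() {
    // Identity semantics: hashing never reads buffer contents.
    return System.identityHashCode(this);
  }
}
```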

http://git-wip-us.apache.org/repos/asf/drill/blob/a0ae83c2/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
new file mode 100644
index 0000000..f63aade
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import static org.apache.drill.exec.memory.BaseAllocator.indent;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.common.HistoricalLog;
+import org.apache.drill.common.concurrent.AutoCloseableLock;
+import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ops.BufferManager;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Manages the relationship between one or more allocators and a particular UDLE. Ensures that one allocator owns the
+ * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its associated allocators.
+ * This class is also responsible for managing when memory is allocated and returned to the Netty-based
+ * PooledByteBufAllocatorL.
+ *
+ * The only reason this isn't package private is that DrillBuf must live in Netty's package and needs access
+ * to these objects and methods.
+ *
+ * Threading: AllocationManager manages thread-safety internally. Operations within the context of a single BufferLedger
+ * are lockless in nature and can be leveraged by multiple threads. Operations that cross the context of two ledgers
+ * will acquire a lock on the AllocationManager instance. Important note: there is one AllocationManager per
+ * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a typical query. The
+ * contention of acquiring a lock on AllocationManager should be very low.
+ *
+ */
+public class AllocationManager {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationManager.class);
+
+  private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
+  private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
+  static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(DrillMetrics.getInstance());
+
+  private final RootAllocator root;
+  private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
+  private final int size;
+  private final UnsafeDirectLittleEndian underlying;
+  private final IdentityHashMap<BufferAllocator, BufferLedger> map = new IdentityHashMap<>();
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
+  private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
+  private final long amCreationTime = System.nanoTime();
+
+  private volatile BufferLedger owningLedger;
+  private volatile long amDestructionTime = 0;
+
+  AllocationManager(BaseAllocator accountingAllocator, int size) {
+    Preconditions.checkNotNull(accountingAllocator);
+    accountingAllocator.assertOpen();
+
+    this.root = accountingAllocator.root;
+    this.underlying = INNER_ALLOCATOR.allocate(size);
+
+    // we do a no retain association since our creator will want to retrieve the newly created ledger and will create a
+    // reference count at that point
+    this.owningLedger = associate(accountingAllocator, false);
+    this.size = underlying.capacity();
+  }
+
+  /**
+   * Associate the existing underlying buffer with a new allocator. This will increase the reference count to the
+   * provided ledger by 1.
+   * @param allocator
+   *          The target allocator to associate this buffer with.
+   * @return The Ledger (new or existing) that associates the underlying buffer to this new ledger.
+   */
+  BufferLedger associate(final BaseAllocator allocator) {
+    return associate(allocator, true);
+  }
+
+  private BufferLedger associate(final BaseAllocator allocator, final boolean retain) {
+    allocator.assertOpen();
+
+    if (root != allocator.root) {
+      throw new IllegalStateException(
+          "A buffer can only be associated between two allocators that share the same root.");
+    }
+
+    try (AutoCloseableLock read = readLock.open()) {
+
+      final BufferLedger ledger = map.get(allocator);
+      if (ledger != null) {
+        if (retain) {
+          ledger.inc();
+        }
+        return ledger;
+      }
+
+    }
+    try (AutoCloseableLock write = writeLock.open()) {
+      // we have to recheck existing ledger since a second reader => writer could be competing with us.
+
+      final BufferLedger existingLedger = map.get(allocator);
+      if (existingLedger != null) {
+        if (retain) {
+          existingLedger.inc();
+        }
+        return existingLedger;
+      }
+
+      final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
+      if (retain) {
+        ledger.inc();
+      }
+      BufferLedger oldLedger = map.put(allocator, ledger);
+      Preconditions.checkArgument(oldLedger == null);
+      allocator.associateLedger(ledger);
+      return ledger;
+    }
+  }
+
+
+  /**
+   * The way that a particular BufferLedger communicates back to the AllocationManager that it no longer needs to hold
+   * a reference to a particular piece of memory.
+   */
+  private class ReleaseListener {
+
+    private final BufferAllocator allocator;
+
+    public ReleaseListener(BufferAllocator allocator) {
+      this.allocator = allocator;
+    }
+
+    /**
+     * Can only be called when you already hold the writeLock.
+     */
+    public void release() {
+      allocator.assertOpen();
+
+      final BufferLedger oldLedger = map.remove(allocator);
+      oldLedger.allocator.dissociateLedger(oldLedger);
+
+      if (oldLedger == owningLedger) {
+        if (map.isEmpty()) {
+          // no one else owns it, let's release.
+          oldLedger.allocator.releaseBytes(size);
+          underlying.release();
+          amDestructionTime = System.nanoTime();
+          owningLedger = null;
+        } else {
+          // we need to change the owning allocator. we've been removed so we'll get whatever is top of list
+          BufferLedger newLedger = map.values().iterator().next();
+
+          // we'll forcefully transfer the ownership and not worry about whether we exceeded the limit
+          // since this consumer can't do anything with this.
+          oldLedger.transferBalance(newLedger);
+        }
+      } else {
+        if (map.isEmpty()) {
+          throw new IllegalStateException("The final removal of a ledger should be connected to the owning ledger.");
+        }
+      }
+
+
+    }
+  }
+
+  /**
+   * The reference manager that binds an allocator manager to a particular BaseAllocator. Also responsible for creating
+   * a set of DrillBufs that share a common fate and set of reference counts.
+   * As with AllocationManager, the only reason this is public is due to DrillBuf being in io.netty.buffer package.
+   */
+  public class BufferLedger {
+
+    private final IdentityHashMap<DrillBuf, Object> buffers =
+        BaseAllocator.DEBUG ? new IdentityHashMap<DrillBuf, Object>() : null;
+
+    private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet(); // unique ID assigned to each ledger
+    private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can manage request for retain
+                                                                  // correctly
+    private final long lCreationTime = System.nanoTime();
+    private volatile long lDestructionTime = 0;
+    private final BaseAllocator allocator;
+    private final ReleaseListener listener;
+    private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
+        "BufferLedger[%d]", 1)
+        : null;
+
+    private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
+      this.allocator = allocator;
+      this.listener = listener;
+    }
+
+    /**
+     * Transfer any balance the current ledger has to the target ledger. In the case that the current ledger holds no
+     * memory, no transfer is made to the new ledger.
+     * @param target
+     *          The ledger to transfer ownership account to.
+     * @return Whether the transfer fit within the target ledger's limits.
+     */
+    public boolean transferBalance(final BufferLedger target) {
+      Preconditions.checkNotNull(target);
+      Preconditions.checkArgument(allocator.root == target.allocator.root,
+          "You can only transfer between two allocators that share the same root.");
+      allocator.assertOpen();
+
+      target.allocator.assertOpen();
+      // if we're transferring to ourself, just return.
+      if (target == this) {
+        return true;
+      }
+
+      // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure
+      // that this won't happen by synchronizing on the allocator manager instance.
+      try (AutoCloseableLock write = writeLock.open()) {
+        if (owningLedger != this) {
+          return true;
+        }
+
+        if (BaseAllocator.DEBUG) {
+          this.historicalLog.recordEvent("transferBalance(%s)", target.allocator.name);
+          target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
+        }
+
+        boolean overlimit = target.allocator.forceAllocate(size);
+        allocator.releaseBytes(size);
+        owningLedger = target;
+        return overlimit;
+      }
+
+    }
+
+    /**
+     * Print the current ledger state to the provided StringBuilder.
+     * @param sb
+     *          The StringBuilder to populate.
+     * @param indent
+     *          The level of indentation to position the data.
+     * @param verbosity
+     *          The level of verbosity to print.
+     */
+    public void print(StringBuilder sb, int indent, Verbosity verbosity) {
+      indent(sb, indent)
+          .append("ledger[")
+          .append(ledgerId)
+          .append("] allocator: ")
+          .append(allocator.name)
+          .append(", isOwning: ")
+          .append(owningLedger == this)
+          .append(", size: ")
+          .append(size)
+          .append(", references: ")
+          .append(bufRefCnt.get())
+          .append(", life: ")
+          .append(lCreationTime)
+          .append("..")
+          .append(lDestructionTime)
+          .append(", allocatorManager: [")
+          .append(AllocationManager.this.allocatorManagerId)
+          .append(", life: ")
+          .append(amCreationTime)
+          .append("..")
+          .append(amDestructionTime);
+
+      if (!BaseAllocator.DEBUG) {
+        sb.append("]\n");
+      } else {
+        synchronized (buffers) {
+          sb.append("] holds ")
+              .append(buffers.size())
+              .append(" buffers. \n");
+          for (DrillBuf buf : buffers.keySet()) {
+            buf.print(sb, indent + 2, verbosity);
+            sb.append('\n');
+          }
+        }
+      }
+
+    }
+
+    private void inc() {
+      bufRefCnt.incrementAndGet();
+    }
+
+    /**
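+     * Decrement the ledger's reference count. If the count reaches zero, the ledger notifies its release listener so
+     * that the AllocationManager can release or transfer any memory this ledger owns.
+     *
+     * @param decrement
+     *          The amount to subtract from the reference count.
+     * @return The reference count after the decrement.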
+     */
+    public int decrement(int decrement) {
+      allocator.assertOpen();
+
+      final int outcome;
+      try (AutoCloseableLock write = writeLock.open()) {
+        outcome = bufRefCnt.addAndGet(-decrement);
+        if (outcome == 0) {
+          lDestructionTime = System.nanoTime();
+          listener.release();
+        }
+      }
+
+      return outcome;
+    }
+
+    /**
+     * Returns the ledger associated with a particular BufferAllocator. If the BufferAllocator doesn't currently have a
+     * ledger associated with this AllocationManager, a new one is created. This is placed on BufferLedger rather than
+     * AllocationManager directly because DrillBufs don't have access to AllocationManager and they are the ones
+     * responsible for exposing the ability to associate multiple allocators with a particular piece of underlying
+     * memory. Note that this will increment the reference count of this ledger by one to ensure the ledger isn't
+     * destroyed before use.
+     *
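+     * @param allocator
+     *          The allocator to associate with this memory.
+     * @return The (new or existing) ledger that binds this memory to the given allocator.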
+     */
+    public BufferLedger getLedgerForAllocator(BufferAllocator allocator) {
+      return associate((BaseAllocator) allocator);
+    }
+
+    /**
+     * Create a new DrillBuf associated with this AllocationManager and memory. Does not impact reference count.
+     * Typically used for slicing.
+     * @param offset
+     *          The offset in bytes to start this new DrillBuf.
+     * @param length
+     *          The length in bytes that this DrillBuf will provide access to.
+     * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger
+     */
+    public DrillBuf newDrillBuf(int offset, int length) {
+      allocator.assertOpen();
+      return newDrillBuf(offset, length, null);
+    }
+
+    /**
+     * Create a new DrillBuf associated with this AllocationManager and memory.
+     * @param offset
+     *          The offset in bytes to start this new DrillBuf.
+     * @param length
+     *          The length in bytes that this DrillBuf will provide access to.
+     * @param manager
+     *          An optional BufferManager argument that can be used to manage expansion of this DrillBuf
+     * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger
+     */
+    public DrillBuf newDrillBuf(int offset, int length, BufferManager manager) {
+      allocator.assertOpen();
+
+      final DrillBuf buf = new DrillBuf(
+          bufRefCnt,
+          this,
+          underlying,
+          manager,
+          allocator.getAsByteBufAllocator(),
+          offset,
+          length,
+          false);
+
+      if (BaseAllocator.DEBUG) {
+        historicalLog.recordEvent(
+            "DrillBuf(BufferLedger, BufferAllocator[%s], UnsafeDirectLittleEndian[identityHashCode == "
+                + "%d](%s)) => ledger hc == %d",
+            allocator.name, System.identityHashCode(buf), buf.toString(),
+            System.identityHashCode(this));
+
+        synchronized (buffers) {
+          buffers.put(buf, null);
+        }
+      }
+
+      return buf;
+
+    }
+
+    /**
+     * Returns the total size (in bytes) of the memory underlying this ledger.
+     *
+     * @return Size in bytes
+     */
+    public int getSize() {
+      return size;
+    }
+
+    /**
+     * Returns how much memory is accounted for by this ledger: getSize() if this is the owning ledger for the memory,
+     * or zero otherwise.
+     *
+     * @return Amount of accounted (owned) memory associated with this ledger.
+     */
+    public int getAccountedSize() {
+      try (AutoCloseableLock read = readLock.open()) {
+        if (owningLedger == this) {
+          return size;
+        } else {
+          return 0;
+        }
+      }
+    }
+
+    /**
+     * Package visible for debugging/verification only.
+     */
+    UnsafeDirectLittleEndian getUnderlying() {
+      return underlying;
+    }
+
+    /**
+     * Package visible for debugging/verification only.
+     */
+    boolean isOwningLedger() {
+      return this == owningLedger;
+    }
+
+  }
+
+}
\ No newline at end of file
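The revised `transferBalance` above checks `owningLedger != this` and moves the balance while holding the write lock, so the check and the move cannot interleave with another transfer. The sketch below is a simplified, single-chunk model of that check-and-move pattern. `SharedChunk`, `Ledger`, `newOwningLedger`, `newLedger` and the `accounted` field are hypothetical names, and the model tracks bytes in a plain field instead of calling an allocator's `forceAllocate`/`releaseBytes`.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified model: one chunk of memory, several ledgers, and exactly
// one owning ledger whose allocator is charged for the chunk's bytes.
final class SharedChunk {
  private final int size;
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private Ledger owner; // guarded by lock

  SharedChunk(int size) {
    this.size = size;
  }

  /** Creates a ledger that starts out owning (and being charged for) the chunk. */
  Ledger newOwningLedger() {
    lock.writeLock().lock();
    try {
      Ledger ledger = new Ledger();
      ledger.accounted = size;
      owner = ledger;
      return ledger;
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Creates a ledger that references the chunk but is not charged for it. */
  Ledger newLedger() {
    return new Ledger();
  }

  final class Ledger {
    long accounted; // bytes charged to this ledger's allocator; updated under the write lock

    /** Moves the charge to target, but only if this ledger currently owns the chunk. */
    void transferBalance(Ledger target) {
      if (target == this) {
        return;
      }
      lock.writeLock().lock();
      try {
        // The ownership check and the move happen under one write lock, so two
        // concurrent transfers out of the same owner cannot both move the bytes.
        if (owner != this) {
          return; // not the owner, so there is no balance to move
        }
        target.accounted += size;
        accounted -= size;
        owner = target;
      } finally {
        lock.writeLock().unlock();
      }
    }

    boolean isOwner() {
      lock.readLock().lock();
      try {
        return owner == this;
      } finally {
        lock.readLock().unlock();
      }
    }
  }
}
```

The second transfer out of the same ledger becomes a no-op once ownership has moved, which keeps the chunk charged exactly once.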

http://git-wip-us.apache.org/repos/asf/drill/blob/a0ae83c2/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java
deleted file mode 100644
index 5142806..0000000
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import static org.apache.drill.exec.memory.BaseAllocator.indent;
-import io.netty.buffer.DrillBuf;
-import io.netty.buffer.PooledByteBufAllocatorL;
-import io.netty.buffer.UnsafeDirectLittleEndian;
-
-import java.util.IdentityHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.drill.common.HistoricalLog;
-import org.apache.drill.common.concurrent.AutoCloseableLock;
-import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
-import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.ops.BufferManager;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Manages the relationship between one or more allocators and a particular UDLE. Ensures that one allocator owns the
- * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its associated allocators.
- * This class is also responsible for managing when memory is allocated and returned to the Netty-based
- * PooledByteBufAllocatorL.
- *
- * The only reason that this isn't package private is we're forced to put DrillBuf in Netty's package which need access
- * to these objects or methods.
- *
- * Threading: AllocatorManager manages thread-safety internally. Operations within the context of a single BufferLedger
- * are lockless in nature and can be leveraged by multiple threads. Operations that cross the context of two ledgers
- * will acquire a lock on the AllocatorManager instance. Important note, there is one AllocatorManager per
- * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a typical query. The
- * contention of acquiring a lock on AllocatorManager should be very low.
- *
- */
-public class AllocatorManager {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocatorManager.class);
-
-  private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
-  static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(DrillMetrics.getInstance());
-
-  private final RootAllocator root;
-  private volatile BufferLedger owningLedger;
-  private final int size;
-  private final UnsafeDirectLittleEndian underlying;
-  private final IdentityHashMap<BufferAllocator, BufferLedger> map = new IdentityHashMap<>();
-  private final ReadWriteLock lock = new ReentrantReadWriteLock();
-  private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
-  private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
-  private final IdentityHashMap<DrillBuf, Object> buffers =
-      BaseAllocator.DEBUG ? new IdentityHashMap<DrillBuf, Object>() : null;
-
-  AllocatorManager(BaseAllocator accountingAllocator, int size) {
-    Preconditions.checkNotNull(accountingAllocator);
-    this.root = accountingAllocator.root;
-    this.underlying = INNER_ALLOCATOR.allocate(size);
-    this.owningLedger = associate(accountingAllocator);
-    this.size = underlying.capacity();
-  }
-
-  /**
-   * Associate the existing underlying buffer with a new allocator.
-   *
-   * @param allocator
-   *          The target allocator to associate this buffer with.
-   * @return The Ledger (new or existing) that associates the underlying buffer to this new ledger.
-   */
-  public BufferLedger associate(final BaseAllocator allocator) {
-    if (root != allocator.root) {
-      throw new IllegalStateException(
-          "A buffer can only be associated between two allocators that share the same root.");
-    }
-
-    try (AutoCloseableLock read = readLock.open()) {
-
-      final BufferLedger ledger = map.get(allocator);
-      if (ledger != null) {
-        return ledger;
-      }
-
-    }
-    try (AutoCloseableLock write = writeLock.open()) {
-      final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
-      map.put(allocator, ledger);
-      allocator.associateLedger(ledger);
-      return ledger;
-    }
-  }
-
-
-  /**
-   * The way that a particular BufferLedger communicates back to the AllocatorManager that it now longer needs to hold a
-   * reference to particular piece of memory.
-   */
-  private class ReleaseListener {
-
-    private final BufferAllocator allocator;
-
-    public ReleaseListener(BufferAllocator allocator) {
-      this.allocator = allocator;
-    }
-
-    public void release() {
-      try (AutoCloseableLock write = writeLock.open()) {
-        final BufferLedger oldLedger = map.remove(allocator);
-        oldLedger.allocator.dissociateLedger(oldLedger);
-
-        if (oldLedger == owningLedger) {
-          if (map.isEmpty()) {
-            // no one else owns, lets release.
-            oldLedger.allocator.releaseBytes(size);
-            underlying.release();
-          } else {
-            // we need to change the owning allocator. we've been removed so we'll get whatever is top of list
-            BufferLedger newLedger = map.values().iterator().next();
-
-            // we'll forcefully transfer the ownership and not worry about whether we exceeded the limit
-            // since this consumer can do anything with this.
-            oldLedger.transferBalance(newLedger);
-          }
-        }
-
-
-      }
-    }
-  }
-
-  /**
-   * The reference manager that binds an allocator manager to a particular BaseAllocator. Also responsible for creating
-   * a set of DrillBufs that share a common fate and set of reference counts.
-   *
-   * As with AllocatorManager, the only reason this is public is due to DrillBuf being in io.netty.buffer package.
-   */
-  public class BufferLedger {
-    private final long id = LEDGER_ID_GENERATOR.incrementAndGet(); // unique ID assigned to each ledger
-    private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can manage request for retain
-                                                                  // correctly
-    private final BaseAllocator allocator;
-    private final ReleaseListener listener;
-    private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
-        "BufferLedger[%d]", 1)
-        : null;
-
-    private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
-      this.allocator = allocator;
-      this.listener = listener;
-    }
-
-    /**
-     * Transfer any balance the current ledger has to the target ledger. In the case that the current ledger holds no
-     * memory, no transfer is made to the new ledger.
-     *
-     * @param target
-     *          The ledger to transfer ownership account to.
-     * @return Whether transfer fit within target ledgers limits.
-     */
-    public boolean transferBalance(BufferLedger target) {
-      Preconditions.checkNotNull(target);
-      Preconditions.checkArgument(allocator.root == target.allocator.root,
-          "You can only transfer between two allocators that share the same root.");
-
-      // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure
-      // that this won't happen by synchronizing on the allocator manager instance.
-      synchronized (AllocatorManager.this) {
-        if (this != owningLedger || target == this) {
-          return true;
-        }
-
-        if (BaseAllocator.DEBUG) {
-          this.historicalLog.recordEvent("transferBalance(%s)", target.allocator.name);
-          target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
-        }
-
-        boolean overlimit = target.allocator.forceAllocate(size);
-        allocator.releaseBytes(size);
-        owningLedger = target;
-        return overlimit;
-      }
-
-    }
-
-    /**
-     * Print the current ledger state to a the provided StringBuilder.
-     *
-     * @param sb
-     *          The StringBuilder to populate.
-     * @param indent
-     *          The level of indentation to position the data.
-     * @param verbosity
-     *          The level of verbosity to print.
-     */
-    public void print(StringBuilder sb, int indent, Verbosity verbosity) {
-      indent(sb, indent)
-          .append("ledger (allocator: ")
-          .append(allocator.name)
-          .append("), isOwning: ")
-          .append(owningLedger == this)
-          .append(", size: ")
-          .append(size)
-          .append(", references: ")
-          .append(bufRefCnt.get())
-          .append('\n');
-
-      if (BaseAllocator.DEBUG) {
-        synchronized (buffers) {
-          indent(sb, indent + 1).append("BufferLedger[" + id + "] holds ").append(buffers.size())
-              .append(" buffers. \n");
-          for (DrillBuf buf : buffers.keySet()) {
-            buf.print(sb, indent + 2, verbosity);
-            sb.append('\n');
-          }
-        }
-      }
-
-    }
-
-    /**
-     * Release this ledger. This means that all reference counts associated with this ledger are no longer used. This
-     * will inform the AllocatorManager to make a decision about how to manage any memory owned by this particular
-     * BufferLedger
-     */
-    public void release() {
-      listener.release();
-    }
-
-    /**
-     * Returns the ledger associated with a particular BufferAllocator. If the BufferAllocator doesn't currently have a
-     * ledger associated with this AllocatorManager, a new one is created. This is placed on BufferLedger rather than
-     * AllocatorManager direclty because DrillBufs don't have access to AllocatorManager and they are the ones
-     * responsible for exposing the ability to associate mutliple allocators with a particular piece of underlying
-     * memory.
-     *
-     * @param allocator
-     * @return
-     */
-    public BufferLedger getLedgerForAllocator(BufferAllocator allocator) {
-      return associate((BaseAllocator) allocator);
-    }
-
-    /**
-     * Create a new DrillBuf associated with this AllocatorManager and memory. Does not impact reference count.
-     * Typically used for slicing.
-     * @param offset
-     *          The offset in bytes to start this new DrillBuf.
-     * @param length
-     *          The length in bytes that this DrillBuf will provide access to.
-     * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger
-     */
-    public DrillBuf newDrillBuf(int offset, int length) {
-      return newDrillBuf(offset, length, null, false);
-    }
-
-    /**
-     * Create a new DrillBuf associated with this AllocatorManager and memory.
-     * @param offset
-     *          The offset in bytes to start this new DrillBuf.
-     * @param length
-     *          The length in bytes that this DrillBuf will provide access to.
-     * @param manager
-     *          An optional BufferManager argument that can be used to manage expansion of this DrillBuf
-     * @param retain
-     *          Whether or not the newly created buffer should get an additional reference count added to it.
-     * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger
-     */
-    public DrillBuf newDrillBuf(int offset, int length, BufferManager manager, boolean retain) {
-      final DrillBuf buf = new DrillBuf(
-          bufRefCnt,
-          this,
-          underlying,
-          manager,
-          allocator.getAsByteBufAllocator(),
-          offset,
-          length,
-          false);
-
-      if (retain) {
-        buf.retain();
-      }
-
-      if (BaseAllocator.DEBUG) {
-        historicalLog.recordEvent(
-            "DrillBuf(BufferLedger, BufferAllocator[%s], UnsafeDirectLittleEndian[identityHashCode == "
-                + "%d](%s)) => ledger hc == %d",
-            allocator.name, System.identityHashCode(buf), buf.toString(),
-            System.identityHashCode(this));
-
-        synchronized (buffers) {
-          buffers.put(buf, null);
-        }
-      }
-
-      return buf;
-
-    }
-
-    /**
-     * What is the total size (in bytes) of memory underlying this ledger.
-     *
-     * @return Size in bytes
-     */
-    public int getSize() {
-      return size;
-    }
-
-    /**
-     * How much memory is accounted for by this ledger. This is either getSize() if this is the owning ledger for the
-     * memory or zero in the case that this is not the owning ledger associated with this memory.
-     *
-     * @return Amount of accounted(owned) memory associated with this ledger.
-     */
-    public int getAccountedSize() {
-      try (AutoCloseableLock read = readLock.open()) {
-        if (owningLedger == this) {
-          return size;
-        } else {
-          return 0;
-        }
-      }
-    }
-
-    /**
-     * Package visible for debugging/verification only.
-     */
-    UnsafeDirectLittleEndian getUnderlying() {
-      return underlying;
-    }
-
-    /**
-     * Package visible for debugging/verification only.
-     */
-    boolean isOwningLedger() {
-      return this == owningLedger;
-    }
-
-  }
-
-}
\ No newline at end of file
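In the deleted `AllocatorManager.associate`, a miss under the read lock is followed by creating a ledger under the write lock without looking in the map again. Two threads associating the same allocator at the same time could therefore each create a ledger, with the second `map.put` replacing the first. The sketch below shows the common get-or-create pattern that re-checks under the write lock. `LockedRegistry` and `getOrCreate` are hypothetical names, and this part of the diff does not show whether `AllocationManager` uses exactly this shape. The read lock is released before the write lock is taken because `ReentrantReadWriteLock` does not support upgrading a read lock to a write lock.

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

// Hypothetical helper: get-or-create one value per key (compared by identity) under a read/write lock.
final class LockedRegistry<K, V> {
  private final Map<K, V> map = new IdentityHashMap<>();
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  V getOrCreate(K key, Function<K, V> factory) {
    // Fast path: most calls find an existing entry under the shared read lock.
    lock.readLock().lock();
    try {
      V existing = map.get(key);
      if (existing != null) {
        return existing;
      }
    } finally {
      lock.readLock().unlock();
    }

    // Slow path: another thread may have inserted an entry between releasing the
    // read lock and acquiring the write lock, so check again before creating.
    lock.writeLock().lock();
    try {
      V existing = map.get(key);
      if (existing != null) {
        return existing;
      }
      V created = factory.apply(key);
      map.put(key, created);
      return created;
    } finally {
      lock.writeLock().unlock();
    }
  }

  int size() {
    lock.readLock().lock();
    try {
      return map.size();
    } finally {
      lock.readLock().unlock();
    }
  }
}
```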

http://git-wip-us.apache.org/repos/asf/drill/blob/a0ae83c2/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index bf4dc8a..8c7e7ca 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.drill.common.HistoricalLog;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.memory.AllocatorManager.BufferLedger;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.ops.BufferManager;
 import org.apache.drill.exec.util.AssertionUtil;
 
@@ -41,7 +41,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   public static final String DEBUG_ALLOCATOR = "drill.memory.debug.allocator";
 
   private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
-  private static final int CHUNK_SIZE = AllocatorManager.INNER_ALLOCATOR.getChunkSize();
+  private static final int CHUNK_SIZE = AllocationManager.INNER_ALLOCATOR.getChunkSize();
 
   public static final int DEBUG_LOG_LENGTH = 6;
   public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
@@ -101,6 +101,15 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
   }
 
+  public void assertOpen() {
+    if (AssertionUtil.ASSERT_ENABLED) {
+      if (isClosed) {
+        throw new IllegalStateException("Attempting operation on allocator when allocator is closed.\n"
+            + toVerboseString());
+      }
+    }
+  }
+
   @Override
   public String getName() {
     return name;
@@ -108,14 +117,16 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
   @Override
   public DrillBuf getEmpty() {
+    assertOpen();
     return empty;
   }
 
   /**
-   * For debug/verification purposes only. Allows an AllocatorManager to tell the allocator that we have a new ledger
+   * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that we have a new ledger
    * associated with this allocator.
    */
   void associateLedger(BufferLedger ledger) {
+    assertOpen();
     if (DEBUG) {
       synchronized (DEBUG_LOCK) {
         childLedgers.put(ledger, null);
@@ -124,10 +135,11 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   }
 
   /**
-   * For debug/verification purposes only. Allows an AllocatorManager to tell the allocator that we are removing a
+   * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that we are removing a
    * ledger associated with this allocator
    */
   void dissociateLedger(BufferLedger ledger) {
+    assertOpen();
     if (DEBUG) {
       synchronized (DEBUG_LOCK) {
         if (!childLedgers.containsKey(ledger)) {
@@ -145,6 +157,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
    *          The child allocator that has been closed.
    */
   private void childClosed(final BaseAllocator childAllocator) {
+    assertOpen();
+
     if (DEBUG) {
       Preconditions.checkArgument(childAllocator != null, "child allocator can't be null");
 
@@ -172,15 +186,20 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
   @Override
   public DrillBuf buffer(final int initialRequestSize) {
+    assertOpen();
+
     return buffer(initialRequestSize, null);
   }
 
   private DrillBuf createEmpty(){
-    return new DrillBuf(new AtomicInteger(), null, AllocatorManager.INNER_ALLOCATOR.empty, null, null, 0, 0, true);
+    assertOpen();
+
+    return new DrillBuf(new AtomicInteger(), null, AllocationManager.INNER_ALLOCATOR.empty, null, null, 0, 0, true);
   }
 
   @Override
   public DrillBuf buffer(final int initialRequestSize, BufferManager manager) {
+    assertOpen();
 
     Preconditions.checkArgument(initialRequestSize >= 0, "the requested size must be non-negative");
 
@@ -215,9 +234,11 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
    * with creating a new buffer.
    */
   private DrillBuf bufferWithoutReservation(final int size, BufferManager bufferManager) throws OutOfMemoryException {
-    AllocatorManager manager = new AllocatorManager(this, size);
-    BufferLedger ledger = manager.associate(this);
-    DrillBuf buffer = ledger.newDrillBuf(0, size, bufferManager, true);
+    assertOpen();
+
+    final AllocationManager manager = new AllocationManager(this, size);
+    final BufferLedger ledger = manager.associate(this); // +1 ref cnt (required)
+    final DrillBuf buffer = ledger.newDrillBuf(0, size, bufferManager);
 
     // make sure that our allocation is equal to what we expected.
     Preconditions.checkArgument(buffer.capacity() == size,
@@ -236,6 +257,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       final String name,
       final long initReservation,
       final long maxAllocation) {
+    assertOpen();
+
     final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation, maxAllocation);
 
     if (DEBUG) {
@@ -267,6 +290,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     }
 
     public boolean add(final int nBytes) {
+      assertOpen();
+
       Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes);
       Preconditions.checkState(!closed, "Attempt to increase reservation after reservation has been closed");
       Preconditions.checkState(!used, "Attempt to increase reservation after reservation has been used");
@@ -286,6 +311,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     }
 
     public DrillBuf allocateBuffer() {
+      assertOpen();
+
       Preconditions.checkState(!closed, "Attempt to allocate after closed");
       Preconditions.checkState(!used, "Attempt to allocate more than once");
 
@@ -308,6 +335,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
     @Override
     public void close() {
+      assertOpen();
+
       if (closed) {
         return;
       }
@@ -338,6 +367,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     }
 
     public boolean reserve(int nBytes) {
+      assertOpen();
+
       final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes);
 
       if (DEBUG) {
@@ -358,6 +389,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
      * @return the buffer, or null, if the request cannot be satisfied
      */
     private DrillBuf allocate(int nBytes) {
+      assertOpen();
+
       boolean success = false;
 
       /*
@@ -387,6 +420,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
      *          the size of the reservation
      */
     private void releaseReservation(int nBytes) {
+      assertOpen();
+
       releaseBytes(nBytes);
 
       if (DEBUG) {
@@ -398,6 +433,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
   @Override
   public AllocationReservation newReservation() {
+    assertOpen();
+
     return new Reservation();
   }
 
@@ -412,6 +449,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       return;
     }
 
+    isClosed = true;
+
     if (DEBUG) {
       synchronized(DEBUG_LOCK) {
         verifyAllocator();
@@ -447,12 +486,12 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       }
     }
 
-      // Is there unaccounted-for outstanding allocation?
-      final long allocated = getAllocatedMemory();
-      if (allocated > 0) {
-        throw new IllegalStateException(
-          String.format("Unaccounted for outstanding allocation (%d)\n%s", allocated, toString()));
-      }
+    // Is there unaccounted-for outstanding allocation?
+    final long allocated = getAllocatedMemory();
+    if (allocated > 0) {
+      throw new IllegalStateException(
+          String.format("Memory was leaked by query. Memory leaked: (%d)\n%s", allocated, toString()));
+    }
 
     // we need to release our memory to our parent before we tell it we've closed.
     super.close();
@@ -469,8 +508,6 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
           name));
     }
 
-    isClosed = true;
-
 
   }
 
@@ -648,6 +685,15 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
         }
 
         logger.debug(sb.toString());
+
+        final long allocated2 = getAllocatedMemory();
+
+        if (allocated2 != allocated) {
+          throw new IllegalStateException(String.format(
+              "allocator[%s]: allocated t1 (%d) != allocated t2 (%d). Someone released memory while in verification.",
+              name, allocated, allocated2));
+
+        }
         throw new IllegalStateException(String.format(
             "allocator[%s]: buffer space (%d) + prealloc space (%d) + child space (%d) != allocated (%d)",
             name, bufferTotal, reservedTotal, childTotal, allocated));
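The new verification branch reads the allocated total a second time when the parts do not add up. If the value changed, the mismatch is reported as a concurrent release rather than as an accounting error. The sketch below reproduces that decision in isolation. `AccountingCheck`, `verify` and the string results are hypothetical, and the `LongSupplier` stands in for walking buffers, reservations and child allocators.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical verification helper illustrating the "read twice" check; not Drill's API.
final class AccountingCheck {
  private AccountingCheck() {}

  static String verify(AtomicLong allocatedCounter, LongSupplier sumOfParts) {
    final long allocated = allocatedCounter.get();   // t1
    final long parts = sumOfParts.getAsLong();       // e.g. walk buffers, reservations, children
    if (parts == allocated) {
      return "ok";
    }
    final long allocated2 = allocatedCounter.get();  // t2
    if (allocated2 != allocated) {
      // The counter moved while the parts were being summed: report the race, not a bookkeeping bug.
      return String.format("race: allocated t1 (%d) != allocated t2 (%d)", allocated, allocated2);
    }
    return String.format("mismatch: parts (%d) != allocated (%d)", parts, allocated);
  }
}
```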

http://git-wip-us.apache.org/repos/asf/drill/blob/a0ae83c2/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 0226254..64f7d86 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -146,4 +146,9 @@ public interface BufferAllocator extends AutoCloseable {
    */
   public String toVerboseString();
 
+  /**
+   * Asserts that this allocator is currently open, throwing an IllegalStateException if it has been closed. The check
+   * only runs when Java assertions are enabled; otherwise this is a no-op.
+   */
+  public void assertOpen();
 }
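`assertOpen()` is a fail-fast guard. `BaseAllocator` calls it at the start of its operations, and the diff moves `isClosed = true` ahead of the leak check in `close()`, so the allocator counts as closed even when that check throws. The stand-in below models both behaviors. `ToyAllocator` is hypothetical, and its explicit `checksEnabled` flag replaces Drill's `AssertionUtil.ASSERT_ENABLED` gate so the behavior does not depend on the JVM's `-ea` setting.

```java
// Hypothetical stand-in for an allocator; names and behavior are illustrative only.
final class ToyAllocator implements AutoCloseable {
  private final boolean checksEnabled; // Drill gates the check on assertions; here it is an explicit flag
  private volatile boolean isClosed;
  private long allocated; // guarded by this

  ToyAllocator(boolean checksEnabled) {
    this.checksEnabled = checksEnabled;
  }

  /** Fails fast if the allocator is used after close(); a no-op when checks are disabled. */
  void assertOpen() {
    if (checksEnabled && isClosed) {
      throw new IllegalStateException("Attempting operation on allocator when allocator is closed.");
    }
  }

  synchronized void buffer(int size) {
    assertOpen();
    allocated += size;
  }

  synchronized void release(int size) {
    assertOpen();
    allocated -= size;
  }

  @Override
  public synchronized void close() {
    if (isClosed) {
      return;
    }
    // Mark the allocator closed before checking for leaks, so it counts as closed
    // even if the leak check below throws.
    isClosed = true;
    if (allocated > 0) {
      throw new IllegalStateException(String.format("Memory was leaked. Memory leaked: (%d)", allocated));
    }
  }
}
```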

http://git-wip-us.apache.org/repos/asf/drill/blob/a0ae83c2/exec/memory/base/src/main/java/org/apache/drill/exec/memory/README.md
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/README.md b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/README.md
index cbb8d96..9832e57 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/README.md
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/README.md
@@ -35,8 +35,8 @@ Memory management can be broken into the following main components:
   - `RootAllocator` - The root allocator. Typically only one created for a JVM
   - `ChildAllocator` - A child allocator that derives from the root allocator
 - Buffer ownership and transfer capabilities
-  - `AllocatorManager` - Responsible for managing the relationship between multiple allocators and a single chunk of memory
-  - `BufferLedger` - Responsible for allowing maintaining the relationship between an `AllocatorManager`, a `BufferAllocator` and one or more individual `DrillBuf`s 
+  - `AllocationManager` - Responsible for managing the relationship between multiple allocators and a single chunk of memory
+  - `BufferLedger` - Responsible for maintaining the relationship between an `AllocationManager`, a `BufferAllocator` and one or more individual `DrillBuf`s
 - Memory access
   - `DrillBuf` - The facade for interacting directly with a chunk of memory.
  
@@ -64,9 +64,9 @@ Drill provides two different ways to reserve memory:
   - `AllocationReservation` via BufferAllocator.newReservation(): Allows a short-term preallocation strategy so that a particular subsystem can ensure future memory is available to support a particular request.
   
 ## Memory Ownership, Reference Counts and Sharing
-Many BufferAllocators can reference the same piece of memory at the same time. The most common situation for this is in the case of a Broadcast Join: in this situation many downstream operators in the same Drillbit will receive the same physical memory. Each of these operators will be operating within its own Allocator context. We therefore have multiple allocators all pointing at the same physical memory. It is the AllocatorManager's responsibility to ensure that in this situation, that all memory is accurately accounted for from the Root's perspective and also to ensure that the memory is correctly released once all BufferAllocators have stopped using that memory.
+Many BufferAllocators can reference the same piece of memory at the same time. The most common case is a Broadcast Join: many downstream operators in the same Drillbit will receive the same physical memory. Each of these operators will be operating within its own Allocator context. We therefore have multiple allocators all pointing at the same physical memory. It is the AllocationManager's responsibility to ensure, in this situation, that all memory is accurately accounted for from the Root's perspective and that the memory is correctly released once all BufferAllocators have stopped using it.
 
-For simplicity of accounting, we treat that memory as being used by one of the BufferAllocators associated with the memory. When that allocator releases its claim on that memory, the memory ownership is then moved to another BufferLedger belonging to the same AllocatorManager. Note that because a DrillBuf.release() is what actually causes memory ownership transfer to occur, we always precede with ownership transfer (even if that violates an allocator limit). It is the responsibility of the application owning a particular allocator to frequently confirm whether the allocator is over its memory limit (BufferAllocator.isOverLimit()) and if so, attempt to aggresively release memory to ameliorate the situation.
+For simplicity of accounting, we treat that memory as being used by one of the BufferAllocators associated with the memory. When that allocator releases its claim on that memory, the memory ownership is then moved to another BufferLedger belonging to the same AllocationManager. Note that because a DrillBuf.release() is what actually causes memory ownership transfer to occur, we always proceed with ownership transfer (even if that violates an allocator limit). It is the responsibility of the application owning a particular allocator to frequently confirm whether the allocator is over its memory limit (BufferAllocator.isOverLimit()) and, if so, attempt to aggressively release memory to ameliorate the situation.
 
 All DrillBufs (direct or sliced) related to a single BufferLedger/BufferAllocator combination share the same reference count and either all will be valid or all will be invalid.
 
@@ -76,9 +76,9 @@ There are two main ways that someone can look at the object hierarchy for Drill'
 
 ### Memory Perspective
 <pre>
-+ AllocatorManager
++ AllocationManager
 |
-|-- UnsignedDirectLittleEndian (One per AllocatorManager)
+|-- UnsafeDirectLittleEndian (One per AllocationManager)
 |
 |-+ BufferLedger 1 ==> Allocator A (owning)
 | ` - DrillBuf 1
@@ -102,20 +102,20 @@ In this picture, a piece of memory is owned by an allocator manager. An allocato
 |-+ ChildAllocator 2
 |-+ ChildAllocator 3
 | |
-| |-+ BufferLedger 1 ==> AllocatorManager 1 (owning) ==> UDLE
+| |-+ BufferLedger 1 ==> AllocationManager 1 (owning) ==> UDLE
 | | `- DrillBuf 1
-| `-+ BufferLedger 2 ==> AllocatorManager 2 (non-owning)==> UDLE
+| `-+ BufferLedger 2 ==> AllocationManager 2 (non-owning)==> UDLE
 | 	`- DrillBuf 2
 |
-|-+ BufferLedger 3 ==> AllocatorManager 1 (non-owning)==> UDLE
+|-+ BufferLedger 3 ==> AllocationManager 1 (non-owning)==> UDLE
 | ` - DrillBuf 3
-|-+ BufferLedger 4 ==> AllocatorManager 2 (owning) ==> UDLE
+|-+ BufferLedger 4 ==> AllocationManager 2 (owning) ==> UDLE
   | - DrillBuf 4
   | - DrillBuf 5
   ` - DrillBuf 6
 </pre>
 
-In this picture, a RootAllocator owns three ChildAllocators. The first ChildAllocator (ChildAllocator 1) owns a subsequent ChildAllocator. ChildAllocator has two BufferLedgers/AllocatorManager references. Coincidentally, each of these AllocatorManager's is also associated with the RootAllocator. In this case, one of the these AllocatorManagers is owned by ChildAllocator 3 (AllocatorManager 1) while the other AllocatorManager (AllocatorManager 2) is owned/accounted for by the RootAllocator. Note that in this scenario, DrillBuf 1 is sharing the underlying memory as DrillBuf 3. However the subset of that memory (e.g. through slicing) might be different. Also note that DrillBuf 2 and DrillBuf 4, 5 and 6 are also sharing the same underlying memory. Also note that DrillBuf 4, 5 and 6 all share the same reference count and fate.
+In this picture, a RootAllocator owns three ChildAllocators. The first ChildAllocator (ChildAllocator 1) owns a subsequent ChildAllocator. ChildAllocator 3 has two BufferLedgers/AllocationManager references. Coincidentally, each of these AllocationManagers is also associated with the RootAllocator. In this case, one of these AllocationManagers (AllocationManager 1) is owned by ChildAllocator 3, while the other (AllocationManager 2) is owned/accounted for by the RootAllocator. Note that in this scenario, DrillBuf 1 shares the same underlying memory as DrillBuf 3, although the subset of that memory each one exposes (e.g. through slicing) might differ. Likewise, DrillBuf 2 and DrillBufs 4, 5 and 6 share the same underlying memory, and DrillBufs 4, 5 and 6 all share the same reference count and fate.
 
 ## Debugging Issues
 The Allocator object provides a useful set of tools to better understand the status of the allocator. If in `DEBUG` mode, the allocator and supporting classes will record additional debug tracking information to better track down memory leaks and issues. To enable DEBUG mode, either enable Java assertions with `-ea` or pass the following system property to the VM when starting `-Ddrill.memory.debug.allocator=true`. The BufferAllocator also provides a `BufferAllocator.toVerboseString()` which can be used in DEBUG mode to get extensive stacktrace information and events associated with various Allocator behaviors.
\ No newline at end of file
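The ownership rules in the README above (one owning ledger per chunk of memory, a reference count shared by everything on a ledger, and unconditional transfer of the accounting charge to another ledger on release) can be modeled in a few lines. This is a hypothetical sketch, not Drill's AllocationManager: `OwnershipSketch`, `Manager`, `Ledger` and the `accounted` map are illustrative names, allocators are reduced to strings, and limits are not enforced. Both `associate` and `release` are `synchronized` on the manager, echoing the commit message's note that association and release are protected inside the AllocationManager.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the README's ownership rules; not Drill code. */
class OwnershipSketch {

  /** One ledger per (memory chunk, allocator) pair; every buffer on it shares refCount. */
  static final class Ledger {
    final String allocator;
    int refCount = 1;

    Ledger(String allocator) {
      this.allocator = allocator;
    }
  }

  static final class Manager {
    final long size;
    final Map<String, Long> accounted; // bytes currently charged to each allocator
    private final List<Ledger> ledgers = new ArrayList<>();
    private Ledger owner;
    private boolean freed = false;

    Manager(long size, Map<String, Long> accounted) {
      this.size = size;
      this.accounted = accounted;
    }

    /** Associate an allocator with this memory; the first allocator becomes the owner. */
    synchronized Ledger associate(String allocator) {
      if (freed) throw new IllegalStateException("memory already released");
      for (Ledger l : ledgers) {
        if (l.allocator.equals(allocator)) {
          l.refCount++; // same allocator: share the existing ledger's reference count
          return l;
        }
      }
      Ledger l = new Ledger(allocator);
      ledgers.add(l);
      if (owner == null) {
        owner = l;
        charge(allocator, size);
      }
      return l;
    }

    /** Drop one reference; a ledger that reaches zero is removed. */
    synchronized void release(Ledger l) {
      if (l.refCount <= 0) throw new IllegalStateException("ledger already released");
      if (--l.refCount > 0) return;
      ledgers.remove(l);
      if (l != owner) return; // non-owning ledgers carry no charge
      charge(l.allocator, -size);
      if (ledgers.isEmpty()) {
        owner = null;
        freed = true; // last reference gone: the memory itself would be freed here
        return;
      }
      // Transfer ownership unconditionally, even if the new owner goes over its limit.
      owner = ledgers.get(0);
      charge(owner.allocator, size);
    }

    synchronized String ownerName() {
      return owner == null ? null : owner.allocator;
    }

    synchronized boolean isFreed() {
      return freed;
    }

    private void charge(String allocator, long delta) {
      accounted.merge(allocator, delta, Long::sum);
    }
  }

  public static void main(String[] args) {
    Map<String, Long> accounted = new HashMap<>();
    Manager m = new Manager(1024, accounted);
    Ledger a = m.associate("A"); // A owns the memory and is charged 1024 bytes
    Ledger b = m.associate("B"); // B shares the memory but is not charged
    m.release(a);                // ownership and the charge move to B
    System.out.println("owner=" + m.ownerName() + " accounted=" + accounted);
    m.release(b);                // last reference released
    System.out.println("freed=" + m.isFreed() + " accounted=" + accounted);
  }
}
```

Charging the whole chunk to a single owner keeps the Root's total exact without splitting bytes across allocators; the cost is that a transfer can push the new owner over its limit, which is why the README asks applications to poll `isOverLimit()`.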

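The README's two switches for allocator DEBUG mode (`-ea` or `-Ddrill.memory.debug.allocator=true`) amount to a simple boolean check. The sketch below is an assumption about how such a check can be written, not a copy of Drill's implementation; `DebugModeSketch` is a hypothetical name. It uses the standard `Boolean.getBoolean`, which returns true only when the named system property equals `"true"` (ignoring case).

```java
/** Hypothetical helper mirroring the README's two ways of enabling allocator DEBUG mode. */
final class DebugModeSketch {
  private static final String DEBUG_PROPERTY = "drill.memory.debug.allocator";

  /** True only when this class runs with assertions enabled (e.g. java -ea). */
  static boolean assertionsEnabled() {
    boolean enabled = false;
    assert enabled = true; // the assignment executes only when assertions are on
    return enabled;
  }

  /** DEBUG if either -ea is set or -Ddrill.memory.debug.allocator=true is passed. */
  static boolean isDebug() {
    return assertionsEnabled() || Boolean.getBoolean(DEBUG_PROPERTY);
  }

  public static void main(String[] args) {
    System.out.println("allocator debug mode: " + isDebug());
  }
}
```

For example, `java -Ddrill.memory.debug.allocator=true DebugModeSketch.java` and `java -ea DebugModeSketch.java` should both report debug mode as enabled.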
