arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [11/17] arrow git commit: ARROW-1: Initial Arrow Code Commit
Date Wed, 17 Feb 2016 12:39:46 GMT
ARROW-1: Initial Arrow Code Commit


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/fa5f0299
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/fa5f0299
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/fa5f0299

Branch: refs/heads/master
Commit: fa5f0299f046c46e1b2f671e5e3b4f1956522711
Parents: cbc56bf
Author: Steven Phillips <steven@dremio.com>
Authored: Wed Feb 17 04:37:53 2016 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Wed Feb 17 04:38:39 2016 -0800

----------------------------------------------------------------------
 java/.gitignore                                 |  22 +
 java/memory/pom.xml                             |  50 ++
 .../src/main/java/io/netty/buffer/ArrowBuf.java | 863 +++++++++++++++++++
 .../java/io/netty/buffer/ExpandableByteBuf.java |  55 ++
 .../main/java/io/netty/buffer/LargeBuffer.java  |  59 ++
 .../io/netty/buffer/MutableWrappedByteBuf.java  | 336 ++++++++
 .../netty/buffer/PooledByteBufAllocatorL.java   | 272 ++++++
 .../netty/buffer/UnsafeDirectLittleEndian.java  | 270 ++++++
 .../org/apache/arrow/memory/Accountant.java     | 272 ++++++
 .../apache/arrow/memory/AllocationManager.java  | 433 ++++++++++
 .../arrow/memory/AllocationReservation.java     |  86 ++
 .../arrow/memory/AllocatorClosedException.java  |  31 +
 .../org/apache/arrow/memory/BaseAllocator.java  | 781 +++++++++++++++++
 .../org/apache/arrow/memory/BoundsChecking.java |  35 +
 .../apache/arrow/memory/BufferAllocator.java    | 151 ++++
 .../org/apache/arrow/memory/BufferManager.java  |  66 ++
 .../org/apache/arrow/memory/ChildAllocator.java |  53 ++
 .../arrow/memory/DrillByteBufAllocator.java     | 141 +++
 .../arrow/memory/OutOfMemoryException.java      |  50 ++
 .../main/java/org/apache/arrow/memory/README.md | 121 +++
 .../org/apache/arrow/memory/RootAllocator.java  |  39 +
 .../org/apache/arrow/memory/package-info.java   |  24 +
 .../apache/arrow/memory/util/AssertionUtil.java |  37 +
 .../arrow/memory/util/AutoCloseableLock.java    |  43 +
 .../apache/arrow/memory/util/HistoricalLog.java | 185 ++++
 .../org/apache/arrow/memory/util/Metrics.java   |  40 +
 .../org/apache/arrow/memory/util/Pointer.java   |  28 +
 .../apache/arrow/memory/util/StackTrace.java    |  70 ++
 .../memory/src/main/resources/drill-module.conf |  25 +
 .../org/apache/arrow/memory/TestAccountant.java | 164 ++++
 .../apache/arrow/memory/TestBaseAllocator.java  | 648 ++++++++++++++
 .../org/apache/arrow/memory/TestEndianess.java  |  43 +
 java/pom.xml                                    | 470 ++++++++++
 java/vector/pom.xml                             | 165 ++++
 java/vector/src/main/codegen/config.fmpp        |  24 +
 .../src/main/codegen/data/ValueVectorTypes.tdd  | 168 ++++
 .../src/main/codegen/includes/license.ftl       |  18 +
 .../src/main/codegen/includes/vv_imports.ftl    |  62 ++
 .../codegen/templates/AbstractFieldReader.java  | 124 +++
 .../codegen/templates/AbstractFieldWriter.java  | 147 ++++
 .../AbstractPromotableFieldWriter.java          | 142 +++
 .../src/main/codegen/templates/BaseReader.java  |  73 ++
 .../src/main/codegen/templates/BaseWriter.java  | 117 +++
 .../main/codegen/templates/BasicTypeHelper.java | 538 ++++++++++++
 .../main/codegen/templates/ComplexCopier.java   | 133 +++
 .../main/codegen/templates/ComplexReaders.java  | 183 ++++
 .../main/codegen/templates/ComplexWriters.java  | 151 ++++
 .../codegen/templates/FixedValueVectors.java    | 813 +++++++++++++++++
 .../codegen/templates/HolderReaderImpl.java     | 290 +++++++
 .../src/main/codegen/templates/ListWriters.java | 234 +++++
 .../src/main/codegen/templates/MapWriters.java  | 240 ++++++
 .../src/main/codegen/templates/NullReader.java  | 138 +++
 .../codegen/templates/NullableValueVectors.java | 630 ++++++++++++++
 .../codegen/templates/RepeatedValueVectors.java | 421 +++++++++
 .../main/codegen/templates/UnionListWriter.java | 185 ++++
 .../src/main/codegen/templates/UnionReader.java | 194 +++++
 .../src/main/codegen/templates/UnionVector.java | 467 ++++++++++
 .../src/main/codegen/templates/UnionWriter.java | 228 +++++
 .../main/codegen/templates/ValueHolders.java    | 116 +++
 .../templates/VariableLengthVectors.java        | 644 ++++++++++++++
 .../org/apache/arrow/vector/AddOrGetResult.java |  38 +
 .../apache/arrow/vector/AllocationHelper.java   |  61 ++
 .../arrow/vector/BaseDataValueVector.java       |  91 ++
 .../apache/arrow/vector/BaseValueVector.java    | 125 +++
 .../java/org/apache/arrow/vector/BitVector.java | 450 ++++++++++
 .../apache/arrow/vector/FixedWidthVector.java   |  35 +
 .../org/apache/arrow/vector/NullableVector.java |  23 +
 .../vector/NullableVectorDefinitionSetter.java  |  23 +
 .../org/apache/arrow/vector/ObjectVector.java   | 220 +++++
 .../arrow/vector/SchemaChangeCallBack.java      |  52 ++
 .../apache/arrow/vector/ValueHolderHelper.java  | 203 +++++
 .../org/apache/arrow/vector/ValueVector.java    | 222 +++++
 .../arrow/vector/VariableWidthVector.java       |  51 ++
 .../apache/arrow/vector/VectorDescriptor.java   |  83 ++
 .../org/apache/arrow/vector/VectorTrimmer.java  |  33 +
 .../org/apache/arrow/vector/ZeroVector.java     | 181 ++++
 .../vector/complex/AbstractContainerVector.java | 143 +++
 .../arrow/vector/complex/AbstractMapVector.java | 278 ++++++
 .../vector/complex/BaseRepeatedValueVector.java | 260 ++++++
 .../vector/complex/ContainerVectorLike.java     |  43 +
 .../vector/complex/EmptyValuePopulator.java     |  54 ++
 .../apache/arrow/vector/complex/ListVector.java | 321 +++++++
 .../apache/arrow/vector/complex/MapVector.java  | 374 ++++++++
 .../arrow/vector/complex/Positionable.java      |  22 +
 .../complex/RepeatedFixedWidthVectorLike.java   |  40 +
 .../vector/complex/RepeatedListVector.java      | 428 +++++++++
 .../arrow/vector/complex/RepeatedMapVector.java | 584 +++++++++++++
 .../vector/complex/RepeatedValueVector.java     |  85 ++
 .../RepeatedVariableWidthVectorLike.java        |  35 +
 .../apache/arrow/vector/complex/StateTool.java  |  34 +
 .../arrow/vector/complex/VectorWithOrdinal.java |  30 +
 .../vector/complex/impl/AbstractBaseReader.java | 100 +++
 .../vector/complex/impl/AbstractBaseWriter.java |  59 ++
 .../vector/complex/impl/ComplexWriterImpl.java  | 193 +++++
 .../complex/impl/MapOrListWriterImpl.java       | 112 +++
 .../vector/complex/impl/PromotableWriter.java   | 196 +++++
 .../complex/impl/RepeatedListReaderImpl.java    | 145 ++++
 .../complex/impl/RepeatedMapReaderImpl.java     | 192 +++++
 .../impl/SingleLikeRepeatedMapReaderImpl.java   |  89 ++
 .../complex/impl/SingleListReaderImpl.java      |  88 ++
 .../complex/impl/SingleMapReaderImpl.java       | 108 +++
 .../vector/complex/impl/UnionListReader.java    |  98 +++
 .../vector/complex/reader/FieldReader.java      |  29 +
 .../vector/complex/writer/FieldWriter.java      |  27 +
 .../arrow/vector/holders/ComplexHolder.java     |  25 +
 .../arrow/vector/holders/ObjectHolder.java      |  38 +
 .../vector/holders/RepeatedListHolder.java      |  23 +
 .../arrow/vector/holders/RepeatedMapHolder.java |  23 +
 .../arrow/vector/holders/UnionHolder.java       |  37 +
 .../arrow/vector/holders/ValueHolder.java       |  31 +
 .../arrow/vector/types/MaterializedField.java   | 217 +++++
 .../org/apache/arrow/vector/types/Types.java    | 132 +++
 .../arrow/vector/util/ByteFunctionHelpers.java  | 233 +++++
 .../org/apache/arrow/vector/util/CallBack.java  |  23 +
 .../arrow/vector/util/CoreDecimalUtility.java   |  91 ++
 .../apache/arrow/vector/util/DateUtility.java   | 682 +++++++++++++++
 .../arrow/vector/util/DecimalUtility.java       | 737 ++++++++++++++++
 .../arrow/vector/util/JsonStringArrayList.java  |  57 ++
 .../arrow/vector/util/JsonStringHashMap.java    |  76 ++
 .../arrow/vector/util/MapWithOrdinal.java       | 248 ++++++
 .../util/OversizedAllocationException.java      |  49 ++
 .../util/SchemaChangeRuntimeException.java      |  41 +
 .../java/org/apache/arrow/vector/util/Text.java | 621 +++++++++++++
 .../apache/arrow/vector/util/TransferPair.java  |  27 +
 124 files changed, 22077 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/.gitignore
----------------------------------------------------------------------
diff --git a/java/.gitignore b/java/.gitignore
new file mode 100644
index 0000000..73c1be4
--- /dev/null
+++ b/java/.gitignore
@@ -0,0 +1,22 @@
+.project
+.buildpath
+.classpath
+.checkstyle
+.settings/
+.idea/
+TAGS
+*.log
+*.lck
+*.iml
+target/
+*.DS_Store
+*.patch
+*~
+git.properties
+contrib/native/client/build/
+contrib/native/client/build/*
+CMakeCache.txt
+CMakeFiles
+Makefile
+cmake_install.cmake
+install_manifest.txt

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/pom.xml
----------------------------------------------------------------------
diff --git a/java/memory/pom.xml b/java/memory/pom.xml
new file mode 100644
index 0000000..44332f5
--- /dev/null
+++ b/java/memory/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+  license agreements. See the NOTICE file distributed with this work for additional 
+  information regarding copyright ownership. The ASF licenses this file to 
+  You under the Apache License, Version 2.0 (the "License"); you may not use 
+  this file except in compliance with the License. You may obtain a copy of 
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+  by applicable law or agreed to in writing, software distributed under the 
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+  OF ANY KIND, either express or implied. See the License for the specific 
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.arrow</groupId>
+    <artifactId>arrow-java-root</artifactId>
+    <version>0.1-SNAPSHOT</version>
+  </parent>
+  <artifactId>arrow-memory</artifactId>
+  <name>arrow-memory</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>3.0.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <version>3.0.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.carrotsearch</groupId>
+      <artifactId>hppc</artifactId>
+      <version>0.7.1</version>
+    </dependency>
+  </dependencies>
+
+
+  <build>
+  </build>
+
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
new file mode 100644
index 0000000..f033ba6
--- /dev/null
+++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
@@ -0,0 +1,863 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.arrow.memory.BaseAllocator;
+import org.apache.arrow.memory.BoundsChecking;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.BufferManager;
+import org.apache.arrow.memory.AllocationManager.BufferLedger;
+import org.apache.arrow.memory.BaseAllocator.Verbosity;
+import org.apache.arrow.memory.util.HistoricalLog;
+
+import com.google.common.base.Preconditions;
+
+public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ArrowBuf.class);
+
+  private static final AtomicLong idGenerator = new AtomicLong(0);
+
+  private final long id = idGenerator.incrementAndGet();
+  private final AtomicInteger refCnt;
+  private final UnsafeDirectLittleEndian udle;
+  private final long addr;
+  private final int offset;
+  private final BufferLedger ledger;
+  private final BufferManager bufManager;
+  private final ByteBufAllocator alloc;
+  private final boolean isEmpty;
+  private volatile int length;
+  private final HistoricalLog historicalLog = BaseAllocator.DEBUG ?
+      new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "DrillBuf[%d]", id) : null;
+
+  public ArrowBuf(
+      final AtomicInteger refCnt,
+      final BufferLedger ledger,
+      final UnsafeDirectLittleEndian byteBuf,
+      final BufferManager manager,
+      final ByteBufAllocator alloc,
+      final int offset,
+      final int length,
+      boolean isEmpty) {
+    super(byteBuf.maxCapacity());
+    this.refCnt = refCnt;
+    this.udle = byteBuf;
+    this.isEmpty = isEmpty;
+    this.bufManager = manager;
+    this.alloc = alloc;
+    this.addr = byteBuf.memoryAddress() + offset;
+    this.ledger = ledger;
+    this.length = length;
+    this.offset = offset;
+
+    if (BaseAllocator.DEBUG) {
+      historicalLog.recordEvent("create()");
+    }
+
+  }
+
+  public ArrowBuf reallocIfNeeded(final int size) {
+    Preconditions.checkArgument(size >= 0, "reallocation size must be non-negative");
+
+    if (this.capacity() >= size) {
+      return this;
+    }
+
+    if (bufManager != null) {
+      return bufManager.replace(this, size);
+    } else {
+      throw new UnsupportedOperationException("Realloc is only available in the context of an operator's UDFs");
+    }
+  }
+
+  @Override
+  public int refCnt() {
+    if (isEmpty) {
+      return 1;
+    } else {
+      return refCnt.get();
+    }
+  }
+
+  private long addr(int index) {
+    return addr + index;
+  }
+
+  private final void checkIndexD(int index, int fieldLength) {
+    ensureAccessible();
+    if (fieldLength < 0) {
+      throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
+    }
+    if (index < 0 || index > capacity() - fieldLength) {
+      if (BaseAllocator.DEBUG) {
+        historicalLog.logHistory(logger);
+      }
+      throw new IndexOutOfBoundsException(String.format(
+          "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
+    }
+  }
+
+  /**
+   * Allows a function to determine whether not reading a particular string of bytes is valid.
+   *
+   * Will throw an exception if the memory is not readable for some reason. Only doesn't something in the case that
+   * AssertionUtil.BOUNDS_CHECKING_ENABLED is true.
+   *
+   * @param start
+   *          The starting position of the bytes to be read.
+   * @param end
+   *          The exclusive endpoint of the bytes to be read.
+   */
+  public void checkBytes(int start, int end) {
+    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
+      checkIndexD(start, end - start);
+    }
+  }
+
+  private void chk(int index, int width) {
+    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
+      checkIndexD(index, width);
+    }
+  }
+
+  private void ensure(int width) {
+    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
+      ensureWritable(width);
+    }
+  }
+
+  /**
+   * Create a new DrillBuf that is associated with an alternative allocator for the purposes of memory ownership and
+   * accounting. This has no impact on the reference counting for the current DrillBuf except in the situation where the
+   * passed in Allocator is the same as the current buffer.
+   *
+   * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
+   * reference count of 1 (in the case that this is the first time this memory is being associated with the new
+   * allocator) or the current value of the reference count + 1 for the other AllocationManager/BufferLedger combination
+   * in the case that the provided allocator already had an association to this underlying memory.
+   *
+   * @param target
+   *          The target allocator to create an association with.
+   * @return A new DrillBuf which shares the same underlying memory as this DrillBuf.
+   */
+  public ArrowBuf retain(BufferAllocator target) {
+
+    if (isEmpty) {
+      return this;
+    }
+
+    if (BaseAllocator.DEBUG) {
+      historicalLog.recordEvent("retain(%s)", target.getName());
+    }
+    final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
+    return otherLedger.newDrillBuf(offset, length, null);
+  }
+
+  /**
+   * Transfer the memory accounting ownership of this DrillBuf to another allocator. This will generate a new DrillBuf
+   * that carries an association with the underlying memory of this DrillBuf. If this DrillBuf is connected to the
+   * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the taret allocator. If
+   * this DrillBuf does not currently own the memory underlying it (and is only associated with it), this does not
+   * transfer any ownership to the newly created DrillBuf.
+   *
+   * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
+   * reference count of 1 (in the case that this is the first time this memory is being associated with the new
+   * allocator) or the current value of the reference count for the other AllocationManager/BufferLedger combination in
+   * the case that the provided allocator already had an association to this underlying memory.
+   *
+   * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible
+   * due to the fact that the original owning allocator may have allocated this memory out of a local reservation
+   * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. This operation is done
+   * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could occur slightly prematurely
+   * to an actual overlimit==true condition. This is simply conservative behavior which means we may return overlimit
+   * slightly sooner than is necessary.
+   *
+   * @param target
+   *          The allocator to transfer ownership to.
+   * @return A new transfer result with the impact of the transfer (whether it was overlimit) as well as the newly
+   *         created DrillBuf.
+   */
+  public TransferResult transferOwnership(BufferAllocator target) {
+
+    if (isEmpty) {
+      return new TransferResult(true, this);
+    }
+
+    final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
+    final ArrowBuf newBuf = otherLedger.newDrillBuf(offset, length, null);
+    final boolean allocationFit = this.ledger.transferBalance(otherLedger);
+    return new TransferResult(allocationFit, newBuf);
+  }
+
+  /**
+   * The outcome of a Transfer.
+   */
+  public class TransferResult {
+
+    /**
+     * Whether this transfer fit within the target allocator's capacity.
+     */
+    public final boolean allocationFit;
+
+    /**
+     * The newly created buffer associated with the target allocator.
+     */
+    public final ArrowBuf buffer;
+
+    private TransferResult(boolean allocationFit, ArrowBuf buffer) {
+      this.allocationFit = allocationFit;
+      this.buffer = buffer;
+    }
+
+  }
+
+  @Override
+  public boolean release() {
+    return release(1);
+  }
+
+  /**
+   * Release the provided number of reference counts.
+   */
+  @Override
+  public boolean release(int decrement) {
+
+    if (isEmpty) {
+      return false;
+    }
+
+    if (decrement < 1) {
+      throw new IllegalStateException(String.format("release(%d) argument is not positive. Buffer Info: %s",
+          decrement, toVerboseString()));
+    }
+
+    final int refCnt = ledger.decrement(decrement);
+
+    if (BaseAllocator.DEBUG) {
+      historicalLog.recordEvent("release(%d). original value: %d", decrement, refCnt + decrement);
+    }
+
+    if (refCnt < 0) {
+      throw new IllegalStateException(
+          String.format("DrillBuf[%d] refCnt has gone negative. Buffer Info: %s", id, toVerboseString()));
+    }
+
+    return refCnt == 0;
+
+  }
+
+  @Override
+  public int capacity() {
+    return length;
+  }
+
+  @Override
+  public synchronized ArrowBuf capacity(int newCapacity) {
+
+    if (newCapacity == length) {
+      return this;
+    }
+
+    Preconditions.checkArgument(newCapacity >= 0);
+
+    if (newCapacity < length) {
+      length = newCapacity;
+      return this;
+    }
+
+    throw new UnsupportedOperationException("Buffers don't support resizing that increases the size.");
+  }
+
+  @Override
+  public ByteBufAllocator alloc() {
+    return udle.alloc();
+  }
+
+  @Override
+  public ByteOrder order() {
+    return ByteOrder.LITTLE_ENDIAN;
+  }
+
+  @Override
+  public ByteBuf order(ByteOrder endianness) {
+    return this;
+  }
+
+  @Override
+  public ByteBuf unwrap() {
+    return udle;
+  }
+
+  @Override
+  public boolean isDirect() {
+    return true;
+  }
+
+  @Override
+  public ByteBuf readBytes(int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ByteBuf readSlice(int length) {
+    final ByteBuf slice = slice(readerIndex(), length);
+    readerIndex(readerIndex() + length);
+    return slice;
+  }
+
+  @Override
+  public ByteBuf copy() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ByteBuf copy(int index, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ByteBuf slice() {
+    return slice(readerIndex(), readableBytes());
+  }
+
+  public static String bufferState(final ByteBuf buf) {
+    final int cap = buf.capacity();
+    final int mcap = buf.maxCapacity();
+    final int ri = buf.readerIndex();
+    final int rb = buf.readableBytes();
+    final int wi = buf.writerIndex();
+    final int wb = buf.writableBytes();
+    return String.format("cap/max: %d/%d, ri: %d, rb: %d, wi: %d, wb: %d",
+        cap, mcap, ri, rb, wi, wb);
+  }
+
+  @Override
+  public ArrowBuf slice(int index, int length) {
+
+    if (isEmpty) {
+      return this;
+    }
+
+    /*
+     * Re the behavior of reference counting, see http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, which
+     * explains that derived buffers share their reference count with their parent
+     */
+    final ArrowBuf newBuf = ledger.newDrillBuf(offset + index, length);
+    newBuf.writerIndex(length);
+    return newBuf;
+  }
+
+  @Override
+  public ArrowBuf duplicate() {
+    return slice(0, length);
+  }
+
+  @Override
+  public int nioBufferCount() {
+    return 1;
+  }
+
+  @Override
+  public ByteBuffer nioBuffer() {
+    return nioBuffer(readerIndex(), readableBytes());
+  }
+
+  @Override
+  public ByteBuffer nioBuffer(int index, int length) {
+    return udle.nioBuffer(offset + index, length);
+  }
+
+  @Override
+  public ByteBuffer internalNioBuffer(int index, int length) {
+    return udle.internalNioBuffer(offset + index, length);
+  }
+
+  @Override
+  public ByteBuffer[] nioBuffers() {
+    return new ByteBuffer[] { nioBuffer() };
+  }
+
+  @Override
+  public ByteBuffer[] nioBuffers(int index, int length) {
+    return new ByteBuffer[] { nioBuffer(index, length) };
+  }
+
+  @Override
+  public boolean hasArray() {
+    return udle.hasArray();
+  }
+
+  @Override
+  public byte[] array() {
+    return udle.array();
+  }
+
+  @Override
+  public int arrayOffset() {
+    return udle.arrayOffset();
+  }
+
+  @Override
+  public boolean hasMemoryAddress() {
+    return true;
+  }
+
+  @Override
+  public long memoryAddress() {
+    return this.addr;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("DrillBuf[%d], udle: [%d %d..%d]", id, udle.id, offset, offset + capacity());
+  }
+
+  @Override
+  public String toString(Charset charset) {
+    return toString(readerIndex, readableBytes(), charset);
+  }
+
+  @Override
+  public String toString(int index, int length, Charset charset) {
+
+    if (length == 0) {
+      return "";
+    }
+
+    return ByteBufUtil.decodeString(nioBuffer(index, length), charset);
+  }
+
+  @Override
+  public int hashCode() {
+    return System.identityHashCode(this);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // identity equals only.
+    return this == obj;
+  }
+
+  @Override
+  public ByteBuf retain(int increment) {
+    Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment);
+
+    if (isEmpty) {
+      return this;
+    }
+
+    if (BaseAllocator.DEBUG) {
+      historicalLog.recordEvent("retain(%d)", increment);
+    }
+
+    final int originalReferenceCount = refCnt.getAndAdd(increment);
+    Preconditions.checkArgument(originalReferenceCount > 0);
+    return this;
+  }
+
+  @Override
+  public ByteBuf retain() {
+    return retain(1);
+  }
+
+  @Override
+  public long getLong(int index) {
+    chk(index, 8);
+    final long v = PlatformDependent.getLong(addr(index));
+    return v;
+  }
+
+  @Override
+  public float getFloat(int index) {
+    return Float.intBitsToFloat(getInt(index));
+  }
+
+  @Override
+  public double getDouble(int index) {
+    return Double.longBitsToDouble(getLong(index));
+  }
+
+  @Override
+  public char getChar(int index) {
+    return (char) getShort(index);
+  }
+
+  @Override
+  public long getUnsignedInt(int index) {
+    return getInt(index) & 0xFFFFFFFFL;
+  }
+
+  @Override
+  public int getInt(int index) {
+    chk(index, 4);
+    final int v = PlatformDependent.getInt(addr(index));
+    return v;
+  }
+
+  @Override
+  public int getUnsignedShort(int index) {
+    return getShort(index) & 0xFFFF;
+  }
+
+  @Override
+  public short getShort(int index) {
+    chk(index, 2);
+    short v = PlatformDependent.getShort(addr(index));
+    return v;
+  }
+
+  @Override
+  public ByteBuf setShort(int index, int value) {
+    chk(index, 2);
+    PlatformDependent.putShort(addr(index), (short) value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setInt(int index, int value) {
+    chk(index, 4);
+    PlatformDependent.putInt(addr(index), value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setLong(int index, long value) {
+    chk(index, 8);
+    PlatformDependent.putLong(addr(index), value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setChar(int index, int value) {
+    chk(index, 2);
+    PlatformDependent.putShort(addr(index), (short) value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setFloat(int index, float value) {
+    chk(index, 4);
+    PlatformDependent.putInt(addr(index), Float.floatToRawIntBits(value));
+    return this;
+  }
+
+  @Override
+  public ByteBuf setDouble(int index, double value) {
+    chk(index, 8);
+    PlatformDependent.putLong(addr(index), Double.doubleToRawLongBits(value));
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeShort(int value) {
+    ensure(2);
+    PlatformDependent.putShort(addr(writerIndex), (short) value);
+    writerIndex += 2;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeInt(int value) {
+    ensure(4);
+    PlatformDependent.putInt(addr(writerIndex), value);
+    writerIndex += 4;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeLong(long value) {
+    ensure(8);
+    PlatformDependent.putLong(addr(writerIndex), value);
+    writerIndex += 8;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeChar(int value) {
+    ensure(2);
+    PlatformDependent.putShort(addr(writerIndex), (short) value);
+    writerIndex += 2;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeFloat(float value) {
+    ensure(4);
+    PlatformDependent.putInt(addr(writerIndex), Float.floatToRawIntBits(value));
+    writerIndex += 4;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeDouble(double value) {
+    ensure(8);
+    PlatformDependent.putLong(addr(writerIndex), Double.doubleToRawLongBits(value));
+    writerIndex += 8;
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+    udle.getBytes(index + offset, dst, dstIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, ByteBuffer dst) {
+    udle.getBytes(index + offset, dst);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setByte(int index, int value) {
+    chk(index, 1);
+    PlatformDependent.putByte(addr(index), (byte) value);
+    return this;
+  }
+
+  public void setByte(int index, byte b) {
+    chk(index, 1);
+    PlatformDependent.putByte(addr(index), b);
+  }
+
+  public void writeByteUnsafe(byte b) {
+    PlatformDependent.putByte(addr(readerIndex), b);
+    readerIndex++;
+  }
+
+  @Override
+  protected byte _getByte(int index) {
+    return getByte(index);
+  }
+
+  @Override
+  protected short _getShort(int index) {
+    return getShort(index);
+  }
+
+  @Override
+  protected int _getInt(int index) {
+    return getInt(index);
+  }
+
+  @Override
+  protected long _getLong(int index) {
+    return getLong(index);
+  }
+
+  @Override
+  protected void _setByte(int index, int value) {
+    setByte(index, value);
+  }
+
+  @Override
+  protected void _setShort(int index, int value) {
+    setShort(index, value);
+  }
+
+  @Override
+  protected void _setMedium(int index, int value) {
+    setMedium(index, value);
+  }
+
+  @Override
+  protected void _setInt(int index, int value) {
+    setInt(index, value);
+  }
+
+  @Override
+  protected void _setLong(int index, long value) {
+    setLong(index, value);
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+    udle.getBytes(index + offset, dst, dstIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+    udle.getBytes(index + offset, out, length);
+    return this;
+  }
+
+  @Override
+  protected int _getUnsignedMedium(int index) {
+    final long addr = addr(index);
+    return (PlatformDependent.getByte(addr) & 0xff) << 16 |
+        (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
+        PlatformDependent.getByte(addr + 2) & 0xff;
+  }
+
+  @Override
+  public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+    return udle.getBytes(index + offset, out, length);
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+    udle.setBytes(index + offset, src, srcIndex, length);
+    return this;
+  }
+
+  public ByteBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) {
+    if (src.isDirect()) {
+      checkIndex(index, length);
+      PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this.memoryAddress() + index,
+          length);
+    } else {
+      if (srcIndex == 0 && src.capacity() == length) {
+        udle.setBytes(index + offset, src);
+      } else {
+        ByteBuffer newBuf = src.duplicate();
+        newBuf.position(srcIndex);
+        newBuf.limit(srcIndex + length);
+        udle.setBytes(index + offset, src);
+      }
+    }
+
+    return this;
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+    udle.setBytes(index + offset, src, srcIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, ByteBuffer src) {
+    udle.setBytes(index + offset, src);
+    return this;
+  }
+
+  @Override
+  public int setBytes(int index, InputStream in, int length) throws IOException {
+    return udle.setBytes(index + offset, in, length);
+  }
+
+  @Override
+  public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+    return udle.setBytes(index + offset, in, length);
+  }
+
+  @Override
+  public byte getByte(int index) {
+    chk(index, 1);
+    return PlatformDependent.getByte(addr(index));
+  }
+
+  @Override
+  public void close() {
+    release();
+  }
+
+  /**
+   * Returns the possible memory consumed by this DrillBuf in the worse case scenario. (not shared, connected to larger
+   * underlying buffer of allocated memory)
+   *
+   * @return Size in bytes.
+   */
+  public int getPossibleMemoryConsumed() {
+    return ledger.getSize();
+  }
+
+  /**
+   * Return that is Accounted for by this buffer (and its potentially shared siblings within the context of the
+   * associated allocator).
+   *
+   * @return Size in bytes.
+   */
+  public int getActualMemoryConsumed() {
+    return ledger.getAccountedSize();
+  }
+
+  private final static int LOG_BYTES_PER_ROW = 10;
+
+  /**
+   * Return the buffer's byte contents in the form of a hex dump.
+   *
+   * @param start
+   *          the starting byte index
+   * @param length
+   *          how many bytes to log
+   * @return A hex dump in a String.
+   */
+  public String toHexString(final int start, final int length) {
+    final int roundedStart = (start / LOG_BYTES_PER_ROW) * LOG_BYTES_PER_ROW;
+
+    final StringBuilder sb = new StringBuilder("buffer byte dump\n");
+    int index = roundedStart;
+    for (int nLogged = 0; nLogged < length; nLogged += LOG_BYTES_PER_ROW) {
+      sb.append(String.format(" [%05d-%05d]", index, index + LOG_BYTES_PER_ROW - 1));
+      for (int i = 0; i < LOG_BYTES_PER_ROW; ++i) {
+        try {
+          final byte b = getByte(index++);
+          sb.append(String.format(" 0x%02x", b));
+        } catch (IndexOutOfBoundsException ioob) {
+          sb.append(" <ioob>");
+        }
+      }
+      sb.append('\n');
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Get the integer id assigned to this DrillBuf for debugging purposes.
+   *
+   * @return integer id
+   */
+  public long getId() {
+    return id;
+  }
+
+  public String toVerboseString() {
+    if (isEmpty) {
+      return toString();
+    }
+
+    StringBuilder sb = new StringBuilder();
+    ledger.print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
+    return sb.toString();
+  }
+
+  public void print(StringBuilder sb, int indent, Verbosity verbosity) {
+    BaseAllocator.indent(sb, indent).append(toString());
+
+    if (BaseAllocator.DEBUG && !isEmpty && verbosity.includeHistoricalLog) {
+      sb.append("\n");
+      historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java b/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java
new file mode 100644
index 0000000..5988647
--- /dev/null
+++ b/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import org.apache.arrow.memory.BufferAllocator;
+
+/**
+ * Allows us to decorate DrillBuf to make it expandable so that we can use them in the context of the Netty framework
+ * (thus supporting RPC level memory accounting).
+ */
+public class ExpandableByteBuf extends MutableWrappedByteBuf {
+
+  private final BufferAllocator allocator;
+
+  public ExpandableByteBuf(ByteBuf buffer, BufferAllocator allocator) {
+    super(buffer);
+    this.allocator = allocator;
+  }
+
+  @Override
+  public ByteBuf copy(int index, int length) {
+    return new ExpandableByteBuf(buffer.copy(index, length), allocator);
+  }
+
+  @Override
+  public ByteBuf capacity(int newCapacity) {
+    if (newCapacity > capacity()) {
+      ByteBuf newBuf = allocator.buffer(newCapacity);
+      newBuf.writeBytes(buffer, 0, buffer.capacity());
+      newBuf.readerIndex(buffer.readerIndex());
+      newBuf.writerIndex(buffer.writerIndex());
+      buffer.release();
+      buffer = newBuf;
+      return newBuf;
+    } else {
+      return super.capacity(newCapacity);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
new file mode 100644
index 0000000..5f5e904
--- /dev/null
+++ b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts.
+ */
+public class LargeBuffer extends MutableWrappedByteBuf {
+
+  private final AtomicLong hugeBufferSize;
+  private final AtomicLong hugeBufferCount;
+
+  private final int initCap;
+
+  public LargeBuffer(ByteBuf buffer, AtomicLong hugeBufferSize, AtomicLong hugeBufferCount) {
+    super(buffer);
+    initCap = buffer.capacity();
+    this.hugeBufferCount = hugeBufferCount;
+    this.hugeBufferSize = hugeBufferSize;
+  }
+
+  @Override
+  public ByteBuf copy(int index, int length) {
+    return new LargeBuffer(buffer.copy(index, length), hugeBufferSize, hugeBufferCount);
+  }
+
+  @Override
+  public boolean release() {
+    return release(1);
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    boolean released = unwrap().release(decrement);
+    if (released) {
+      hugeBufferSize.addAndGet(-initCap);
+      hugeBufferCount.decrementAndGet();
+    }
+    return released;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
new file mode 100644
index 0000000..5709473
--- /dev/null
+++ b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+/**
+ * This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override some behaviors and make
+ * buffer mutable.
+ */
+abstract class MutableWrappedByteBuf extends AbstractByteBuf {
+
+  @Override
+  public ByteBuffer nioBuffer(int index, int length) {
+    return unwrap().nioBuffer(index, length);
+  }
+
+  ByteBuf buffer;
+
+  public MutableWrappedByteBuf(ByteBuf buffer) {
+    super(buffer.maxCapacity());
+
+    if (buffer instanceof MutableWrappedByteBuf) {
+      this.buffer = ((MutableWrappedByteBuf) buffer).buffer;
+    } else {
+      this.buffer = buffer;
+    }
+
+    setIndex(buffer.readerIndex(), buffer.writerIndex());
+  }
+
+  @Override
+  public ByteBuf unwrap() {
+    return buffer;
+  }
+
+  @Override
+  public ByteBufAllocator alloc() {
+    return buffer.alloc();
+  }
+
+  @Override
+  public ByteOrder order() {
+    return buffer.order();
+  }
+
+  @Override
+  public boolean isDirect() {
+    return buffer.isDirect();
+  }
+
+  @Override
+  public int capacity() {
+    return buffer.capacity();
+  }
+
+  @Override
+  public ByteBuf capacity(int newCapacity) {
+    buffer.capacity(newCapacity);
+    return this;
+  }
+
+  @Override
+  public boolean hasArray() {
+    return buffer.hasArray();
+  }
+
+  @Override
+  public byte[] array() {
+    return buffer.array();
+  }
+
+  @Override
+  public int arrayOffset() {
+    return buffer.arrayOffset();
+  }
+
+  @Override
+  public boolean hasMemoryAddress() {
+    return buffer.hasMemoryAddress();
+  }
+
+  @Override
+  public long memoryAddress() {
+    return buffer.memoryAddress();
+  }
+
+  @Override
+  public byte getByte(int index) {
+    return _getByte(index);
+  }
+
+  @Override
+  protected byte _getByte(int index) {
+    return buffer.getByte(index);
+  }
+
+  @Override
+  public short getShort(int index) {
+    return _getShort(index);
+  }
+
+  @Override
+  protected short _getShort(int index) {
+    return buffer.getShort(index);
+  }
+
+  @Override
+  public int getUnsignedMedium(int index) {
+    return _getUnsignedMedium(index);
+  }
+
+  @Override
+  protected int _getUnsignedMedium(int index) {
+    return buffer.getUnsignedMedium(index);
+  }
+
+  @Override
+  public int getInt(int index) {
+    return _getInt(index);
+  }
+
+  @Override
+  protected int _getInt(int index) {
+    return buffer.getInt(index);
+  }
+
+  @Override
+  public long getLong(int index) {
+    return _getLong(index);
+  }
+
+  @Override
+  protected long _getLong(int index) {
+    return buffer.getLong(index);
+  }
+
+  @Override
+  public abstract ByteBuf copy(int index, int length);
+
+  @Override
+  public ByteBuf slice(int index, int length) {
+    return new SlicedByteBuf(this, index, length);
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+    buffer.getBytes(index, dst, dstIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+    buffer.getBytes(index, dst, dstIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, ByteBuffer dst) {
+    buffer.getBytes(index, dst);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setByte(int index, int value) {
+    _setByte(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setByte(int index, int value) {
+    buffer.setByte(index, value);
+  }
+
+  @Override
+  public ByteBuf setShort(int index, int value) {
+    _setShort(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setShort(int index, int value) {
+    buffer.setShort(index, value);
+  }
+
+  @Override
+  public ByteBuf setMedium(int index, int value) {
+    _setMedium(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setMedium(int index, int value) {
+    buffer.setMedium(index, value);
+  }
+
+  @Override
+  public ByteBuf setInt(int index, int value) {
+    _setInt(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setInt(int index, int value) {
+    buffer.setInt(index, value);
+  }
+
+  @Override
+  public ByteBuf setLong(int index, long value) {
+    _setLong(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setLong(int index, long value) {
+    buffer.setLong(index, value);
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+    buffer.setBytes(index, src, srcIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+    buffer.setBytes(index, src, srcIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, ByteBuffer src) {
+    buffer.setBytes(index, src);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, OutputStream out, int length)
+      throws IOException {
+    buffer.getBytes(index, out, length);
+    return this;
+  }
+
+  @Override
+  public int getBytes(int index, GatheringByteChannel out, int length)
+      throws IOException {
+    return buffer.getBytes(index, out, length);
+  }
+
+  @Override
+  public int setBytes(int index, InputStream in, int length)
+      throws IOException {
+    return buffer.setBytes(index, in, length);
+  }
+
+  @Override
+  public int setBytes(int index, ScatteringByteChannel in, int length)
+      throws IOException {
+    return buffer.setBytes(index, in, length);
+  }
+
+  @Override
+  public int nioBufferCount() {
+    return buffer.nioBufferCount();
+  }
+
+  @Override
+  public ByteBuffer[] nioBuffers(int index, int length) {
+    return buffer.nioBuffers(index, length);
+  }
+
+  @Override
+  public ByteBuffer internalNioBuffer(int index, int length) {
+    return nioBuffer(index, length);
+  }
+
+  @Override
+  public int forEachByte(int index, int length, ByteBufProcessor processor) {
+    return buffer.forEachByte(index, length, processor);
+  }
+
+  @Override
+  public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
+    return buffer.forEachByteDesc(index, length, processor);
+  }
+
+  @Override
+  public final int refCnt() {
+    return unwrap().refCnt();
+  }
+
+  @Override
+  public final ByteBuf retain() {
+    unwrap().retain();
+    return this;
+  }
+
+  @Override
+  public final ByteBuf retain(int increment) {
+    unwrap().retain(increment);
+    return this;
+  }
+
+  @Override
+  public boolean release() {
+    return release(1);
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    boolean released = unwrap().release(decrement);
+    return released;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
new file mode 100644
index 0000000..1610028
--- /dev/null
+++ b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import io.netty.util.internal.StringUtil;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.arrow.memory.OutOfMemoryException;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * The base allocator that we use for all of Drill's memory management. Returns UnsafeDirectLittleEndian buffers.
+ */
+public class PooledByteBufAllocatorL {
+  private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator");
+
+  private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+
+
+  public static final String METRIC_PREFIX = "drill.allocator.";
+
+  private final MetricRegistry registry;
+  private final AtomicLong hugeBufferSize = new AtomicLong(0);
+  private final AtomicLong hugeBufferCount = new AtomicLong(0);
+  private final AtomicLong normalBufferSize = new AtomicLong(0);
+  private final AtomicLong normalBufferCount = new AtomicLong(0);
+
+  private final InnerAllocator allocator;
+  public final UnsafeDirectLittleEndian empty;
+
+  public PooledByteBufAllocatorL(MetricRegistry registry) {
+    this.registry = registry;
+    allocator = new InnerAllocator();
+    empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
+  }
+
+  public UnsafeDirectLittleEndian allocate(int size) {
+    try {
+      return allocator.directBuffer(size, Integer.MAX_VALUE);
+    } catch (OutOfMemoryError e) {
+      throw new OutOfMemoryException("Failure allocating buffer.", e);
+    }
+
+  }
+
+  public int getChunkSize() {
+    return allocator.chunkSize;
+  }
+
+  private class InnerAllocator extends PooledByteBufAllocator {
+
+
+    private final PoolArena<ByteBuffer>[] directArenas;
+    private final MemoryStatusThread statusThread;
+    private final Histogram largeBuffersHist;
+    private final Histogram normalBuffersHist;
+    private final int chunkSize;
+
+    public InnerAllocator() {
+      super(true);
+
+      try {
+        Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
+        f.setAccessible(true);
+        this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
+      } catch (Exception e) {
+        throw new RuntimeException("Failure while initializing allocator.  Unable to retrieve direct arenas field.", e);
+      }
+
+      this.chunkSize = directArenas[0].chunkSize;
+
+      if (memoryLogger.isTraceEnabled()) {
+        statusThread = new MemoryStatusThread();
+        statusThread.start();
+      } else {
+        statusThread = null;
+      }
+      removeOldMetrics();
+
+      registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          return normalBufferSize.get();
+        }
+      });
+
+      registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          return normalBufferCount.get();
+        }
+      });
+
+      registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          return hugeBufferSize.get();
+        }
+      });
+
+      registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          return hugeBufferCount.get();
+        }
+      });
+
+      largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
+      normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
+
+    }
+
+
+    private synchronized void removeOldMetrics() {
+      registry.removeMatching(new MetricFilter() {
+        @Override
+        public boolean matches(String name, Metric metric) {
+          return name.startsWith("drill.allocator.");
+        }
+
+      });
+    }
+
+    private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
+      PoolThreadCache cache = threadCache.get();
+      PoolArena<ByteBuffer> directArena = cache.directArena;
+
+      if (directArena != null) {
+
+        if (initialCapacity > directArena.chunkSize) {
+          // This is beyond chunk size so we'll allocate separately.
+          ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+
+          hugeBufferCount.incrementAndGet();
+          hugeBufferSize.addAndGet(buf.capacity());
+          largeBuffersHist.update(buf.capacity());
+          // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
+          return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
+
+        } else {
+          // within chunk, use arena.
+          ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
+          if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
+            fail();
+          }
+
+          normalBuffersHist.update(buf.capacity());
+          if (ASSERT_ENABLED) {
+            normalBufferSize.addAndGet(buf.capacity());
+            normalBufferCount.incrementAndGet();
+          }
+
+          return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount,
+              normalBufferSize);
+        }
+
+      } else {
+        throw fail();
+      }
+    }
+
+    private UnsupportedOperationException fail() {
+      return new UnsupportedOperationException(
+          "Drill requries that the JVM used supports access sun.misc.Unsafe.  This platform didn't provide that functionality.");
+    }
+
+    public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
+      if (initialCapacity == 0 && maxCapacity == 0) {
+        newDirectBuffer(initialCapacity, maxCapacity);
+      }
+      validate(initialCapacity, maxCapacity);
+      return newDirectBufferL(initialCapacity, maxCapacity);
+    }
+
+    @Override
+    public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+      throw new UnsupportedOperationException("Drill doesn't support using heap buffers.");
+    }
+
+
+    private void validate(int initialCapacity, int maxCapacity) {
+      if (initialCapacity < 0) {
+        throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: 0+)");
+      }
+      if (initialCapacity > maxCapacity) {
+        throw new IllegalArgumentException(String.format(
+            "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
+            initialCapacity, maxCapacity));
+      }
+    }
+
+    private class MemoryStatusThread extends Thread {
+
+      public MemoryStatusThread() {
+        super("memory-status-logger");
+        this.setDaemon(true);
+        this.setName("allocation.logger");
+      }
+
+      @Override
+      public void run() {
+        while (true) {
+          memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
+          try {
+            Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
+          } catch (InterruptedException e) {
+            return;
+          }
+
+        }
+      }
+
+    }
+
+    public String toString() {
+      StringBuilder buf = new StringBuilder();
+      buf.append(directArenas.length);
+      buf.append(" direct arena(s):");
+      buf.append(StringUtil.NEWLINE);
+      for (PoolArena<ByteBuffer> a : directArenas) {
+        buf.append(a);
+      }
+
+      buf.append("Large buffers outstanding: ");
+      buf.append(hugeBufferCount.get());
+      buf.append(" totaling ");
+      buf.append(hugeBufferSize.get());
+      buf.append(" bytes.");
+      buf.append('\n');
+      buf.append("Normal buffers outstanding: ");
+      buf.append(normalBufferCount.get());
+      buf.append(" totaling ");
+      buf.append(normalBufferSize.get());
+      buf.append(" bytes.");
+      return buf.toString();
+    }
+
+
+  }
+
+  public static final boolean ASSERT_ENABLED;
+
+  static {
+    boolean isAssertEnabled = false;
+    assert isAssertEnabled = true;
+    ASSERT_ENABLED = isAssertEnabled;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
new file mode 100644
index 0000000..6495d5d
--- /dev/null
+++ b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import io.netty.util.internal.PlatformDependent;
+
+import java.nio.ByteOrder;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The underlying class we use for little-endian access to memory. Is used underneath DrillBufs to abstract away the
+ * Netty classes and underlying Netty memory management.
+ */
+public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
+  private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+  private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+
+  public final long id = ID_GENERATOR.incrementAndGet();
+  private final AbstractByteBuf wrapped;
+  private final long memoryAddress;
+
+  private final AtomicLong bufferCount;
+  private final AtomicLong bufferSize;
+  private final long initCap;
+
+  UnsafeDirectLittleEndian(DuplicatedByteBuf buf) {
+    this(buf, true, null, null);
+  }
+
+  UnsafeDirectLittleEndian(LargeBuffer buf) {
+    this(buf, true, null, null);
+  }
+
+  UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
+    this(buf, true, bufferCount, bufferSize);
+
+  }
+
+  private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake, AtomicLong bufferCount, AtomicLong bufferSize) {
+    super(buf);
+    if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
+      throw new IllegalStateException("Drill only runs on LittleEndian systems.");
+    }
+
+    this.bufferCount = bufferCount;
+    this.bufferSize = bufferSize;
+
+    // initCap is used if we're tracking memory release. If we're in non-debug mode, we'll skip this.
+    this.initCap = ASSERT_ENABLED ? buf.capacity() : -1;
+
+    this.wrapped = buf;
+    this.memoryAddress = buf.memoryAddress();
+  }
+    private long addr(int index) {
+        return memoryAddress + index;
+    }
+
+    @Override
+    public long getLong(int index) {
+//        wrapped.checkIndex(index, 8);
+        long v = PlatformDependent.getLong(addr(index));
+        return v;
+    }
+
+    @Override
+    public float getFloat(int index) {
+        return Float.intBitsToFloat(getInt(index));
+    }
+
+  @Override
+  public ByteBuf slice() {
+    return slice(this.readerIndex(), readableBytes());
+  }
+
+  @Override
+  public ByteBuf slice(int index, int length) {
+    return new SlicedByteBuf(this, index, length);
+  }
+
+  @Override
+  public ByteOrder order() {
+    return ByteOrder.LITTLE_ENDIAN;
+  }
+
+  @Override
+  public ByteBuf order(ByteOrder endianness) {
+    return this;
+  }
+
+  @Override
+  public double getDouble(int index) {
+    return Double.longBitsToDouble(getLong(index));
+  }
+
+  @Override
+  public char getChar(int index) {
+    return (char) getShort(index);
+  }
+
+  @Override
+  public long getUnsignedInt(int index) {
+    return getInt(index) & 0xFFFFFFFFL;
+  }
+
+  @Override
+  public int getInt(int index) {
+    int v = PlatformDependent.getInt(addr(index));
+    return v;
+  }
+
+  @Override
+  public int getUnsignedShort(int index) {
+    return getShort(index) & 0xFFFF;
+  }
+
+  @Override
+  public short getShort(int index) {
+    short v = PlatformDependent.getShort(addr(index));
+    return v;
+  }
+
+  @Override
+  public ByteBuf setShort(int index, int value) {
+    wrapped.checkIndex(index, 2);
+    _setShort(index, value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setInt(int index, int value) {
+    wrapped.checkIndex(index, 4);
+    _setInt(index, value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setLong(int index, long value) {
+    wrapped.checkIndex(index, 8);
+    _setLong(index, value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setChar(int index, int value) {
+    setShort(index, value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setFloat(int index, float value) {
+    setInt(index, Float.floatToRawIntBits(value));
+    return this;
+  }
+
+  @Override
+  public ByteBuf setDouble(int index, double value) {
+    setLong(index, Double.doubleToRawLongBits(value));
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeShort(int value) {
+    wrapped.ensureWritable(2);
+    _setShort(wrapped.writerIndex, value);
+    wrapped.writerIndex += 2;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeInt(int value) {
+    wrapped.ensureWritable(4);
+    _setInt(wrapped.writerIndex, value);
+    wrapped.writerIndex += 4;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeLong(long value) {
+    wrapped.ensureWritable(8);
+    _setLong(wrapped.writerIndex, value);
+    wrapped.writerIndex += 8;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeChar(int value) {
+    writeShort(value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeFloat(float value) {
+    writeInt(Float.floatToRawIntBits(value));
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeDouble(double value) {
+    writeLong(Double.doubleToRawLongBits(value));
+    return this;
+  }
+
+  private void _setShort(int index, int value) {
+    PlatformDependent.putShort(addr(index), (short) value);
+  }
+
+  private void _setInt(int index, int value) {
+    PlatformDependent.putInt(addr(index), value);
+  }
+
+  private void _setLong(int index, long value) {
+    PlatformDependent.putLong(addr(index), value);
+  }
+
+  @Override
+  public byte getByte(int index) {
+    return PlatformDependent.getByte(addr(index));
+  }
+
+  @Override
+  public ByteBuf setByte(int index, int value) {
+    PlatformDependent.putByte(addr(index), (byte) value);
+    return this;
+  }
+
+  @Override
+  public boolean release() {
+    return release(1);
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    final boolean released = super.release(decrement);
+    if (ASSERT_ENABLED && released && bufferCount != null && bufferSize != null) {
+      bufferCount.decrementAndGet();
+      bufferSize.addAndGet(-initCap);
+    }
+    return released;
+  }
+
+  @Override
+  public int hashCode() {
+    return System.identityHashCode(this);
+  }
+
+  public static final boolean ASSERT_ENABLED;
+
+  static {
+    boolean isAssertEnabled = false;
+    assert isAssertEnabled = true;
+    ASSERT_ENABLED = isAssertEnabled;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
new file mode 100644
index 0000000..dc75e5d
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Provides a concurrent way to manage account for memory usage without locking. Used as basis for Allocators. All
+ * operations are threadsafe (except for close).
+ */
+@ThreadSafe
+class Accountant implements AutoCloseable {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant.class);
+
+  /**
+   * The parent allocator
+   */
+  protected final Accountant parent;
+
+  /**
+   * The amount of memory reserved for this allocator. Releases below this amount of memory will not be returned to the
+   * parent Accountant until this Accountant is closed.
+   */
+  protected final long reservation;
+
+  private final AtomicLong peakAllocation = new AtomicLong();
+
+  /**
+   * Maximum local memory that can be held. This can be externally updated. Changing it won't cause past memory to
+   * change but will change responses to future allocation efforts
+   */
+  private final AtomicLong allocationLimit = new AtomicLong();
+
+  /**
+   * Currently allocated amount of memory;
+   */
+  private final AtomicLong locallyHeldMemory = new AtomicLong();
+
+  public Accountant(Accountant parent, long reservation, long maxAllocation) {
+    Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be non-negative.");
+    Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be non-negative.");
+    Preconditions.checkArgument(reservation <= maxAllocation,
+        "The initial reservation size must be <= the maximum allocation.");
+    Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't reserve memory.");
+
+    this.parent = parent;
+    this.reservation = reservation;
+    this.allocationLimit.set(maxAllocation);
+
+    if (reservation != 0) {
+      // we will allocate a reservation from our parent.
+      final AllocationOutcome outcome = parent.allocateBytes(reservation);
+      if (!outcome.isOk()) {
+        throw new OutOfMemoryException(String.format(
+            "Failure trying to allocate initial reservation for Allocator. "
+                + "Attempted to allocate %d bytes and received an outcome of %s.", reservation, outcome.name()));
+      }
+    }
+  }
+
+  /**
+   * Attempt to allocate the requested amount of memory. Either completely succeeds or completely fails. Constructs a a
+   * log of delta
+   *
+   * If it fails, no changes are made to accounting.
+   *
+   * @param size
+   *          The amount of memory to reserve in bytes.
+   * @return True if the allocation was successful, false if the allocation failed.
+   */
+  AllocationOutcome allocateBytes(long size) {
+    final AllocationOutcome outcome = allocate(size, true, false);
+    if (!outcome.isOk()) {
+      releaseBytes(size);
+    }
+    return outcome;
+  }
+
+  private void updatePeak() {
+    final long currentMemory = locallyHeldMemory.get();
+    while (true) {
+
+      final long previousPeak = peakAllocation.get();
+      if (currentMemory > previousPeak) {
+        if (!peakAllocation.compareAndSet(previousPeak, currentMemory)) {
+          // peak allocation changed underneath us. try again.
+          continue;
+        }
+      }
+
+      // we either succeeded to set peak allocation or we weren't above the previous peak, exit.
+      return;
+    }
+  }
+
+
+  /**
+   * Increase the accounting. Returns whether the allocation fit within limits.
+   *
+   * @param size
+   *          to increase
+   * @return Whether the allocation fit within limits.
+   */
+  boolean forceAllocate(long size) {
+    final AllocationOutcome outcome = allocate(size, true, true);
+    return outcome.isOk();
+  }
+
+  /**
+   * Internal method for allocation. This takes a forced approach to allocation to ensure that we manage reservation
+   * boundary issues consistently. Allocation is always done through the entire tree. The two options that we influence
+   * are whether the allocation should be forced and whether or not the peak memory allocation should be updated. If at
+   * some point during allocation escalation we determine that the allocation is no longer possible, we will continue to
+   * do a complete and consistent allocation but we will stop updating the peak allocation. We do this because we know
+   * that we will be directly unwinding this allocation (and thus never actually making the allocation). If force
+   * allocation is passed, then we continue to update the peak limits since we now know that this allocation will occur
+   * despite our moving past one or more limits.
+   *
+   * @param size
+   *          The size of the allocation.
+   * @param incomingUpdatePeak
+   *          Whether we should update the local peak for this allocation.
+   * @param forceAllocation
+   *          Whether we should force the allocation.
+   * @return The outcome of the allocation.
+   */
+  private AllocationOutcome allocate(final long size, final boolean incomingUpdatePeak, final boolean forceAllocation) {
+    final long newLocal = locallyHeldMemory.addAndGet(size);
+    final long beyondReservation = newLocal - reservation;
+    final boolean beyondLimit = newLocal > allocationLimit.get();
+    final boolean updatePeak = forceAllocation || (incomingUpdatePeak && !beyondLimit);
+
+    AllocationOutcome parentOutcome = AllocationOutcome.SUCCESS;
+    if (beyondReservation > 0 && parent != null) {
+      // we need to get memory from our parent.
+      final long parentRequest = Math.min(beyondReservation, size);
+      parentOutcome = parent.allocate(parentRequest, updatePeak, forceAllocation);
+    }
+
+    final AllocationOutcome finalOutcome = beyondLimit ? AllocationOutcome.FAILED_LOCAL :
+        parentOutcome.ok ? AllocationOutcome.SUCCESS : AllocationOutcome.FAILED_PARENT;
+
+    if (updatePeak) {
+      updatePeak();
+    }
+
+    return finalOutcome;
+  }
+
+  public void releaseBytes(long size) {
+    // reduce local memory. all memory released above reservation should be released up the tree.
+    final long newSize = locallyHeldMemory.addAndGet(-size);
+
+    Preconditions.checkArgument(newSize >= 0, "Accounted size went negative.");
+
+    final long originalSize = newSize + size;
+    if(originalSize > reservation && parent != null){
+      // we deallocated memory that we should release to our parent.
+      final long possibleAmountToReleaseToParent = originalSize - reservation;
+      final long actualToReleaseToParent = Math.min(size, possibleAmountToReleaseToParent);
+      parent.releaseBytes(actualToReleaseToParent);
+    }
+
+  }
+
+  /**
+   * Set the maximum amount of memory that can be allocated in the this Accountant before failing an allocation.
+   *
+   * @param newLimit
+   *          The limit in bytes.
+   */
+  public void setLimit(long newLimit) {
+    allocationLimit.set(newLimit);
+  }
+
+  public boolean isOverLimit() {
+    return getAllocatedMemory() > getLimit() || (parent != null && parent.isOverLimit());
+  }
+
+  /**
+   * Close this Accountant. This will release any reservation bytes back to a parent Accountant.
+   */
+  public void close() {
+    // return memory reservation to parent allocator.
+    if (parent != null) {
+      parent.releaseBytes(reservation);
+    }
+  }
+
+  /**
+   * Return the current limit of this Accountant.
+   *
+   * @return Limit in bytes.
+   */
+  public long getLimit() {
+    return allocationLimit.get();
+  }
+
+  /**
+   * Return the current amount of allocated memory that this Accountant is managing accounting for. Note this does not
+   * include reservation memory that hasn't been allocated.
+   *
+   * @return Currently allocate memory in bytes.
+   */
+  public long getAllocatedMemory() {
+    return locallyHeldMemory.get();
+  }
+
+  /**
+   * The peak memory allocated by this Accountant.
+   *
+   * @return The peak allocated memory in bytes.
+   */
+  public long getPeakMemoryAllocation() {
+    return peakAllocation.get();
+  }
+
+  /**
+   * Describes the type of outcome that occurred when trying to account for allocation of memory.
+   */
+  public static enum AllocationOutcome {
+
+    /**
+     * Allocation succeeded.
+     */
+    SUCCESS(true),
+
+    /**
+     * Allocation succeeded but only because the allocator was forced to move beyond a limit.
+     */
+    FORCED_SUCESS(true),
+
+    /**
+     * Allocation failed because the local allocator's limits were exceeded.
+     */
+    FAILED_LOCAL(false),
+
+    /**
+     * Allocation failed because a parent allocator's limits were exceeded.
+     */
+    FAILED_PARENT(false);
+
+    private final boolean ok;
+
+    AllocationOutcome(boolean ok) {
+      this.ok = ok;
+    }
+
+    public boolean isOk() {
+      return ok;
+    }
+  }
+}


Mime
View raw message