drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [2/4] drill git commit: DRILL-2940, DRILL-2847: Improve Memory Characteristics
Date Sun, 03 May 2015 03:03:04 GMT
DRILL-2940, DRILL-2847: Improve Memory Characteristics

- Update Large Buffer allocation so Drill releases immediately rather than waiting for Garbage Collection
- Remove DrillBuf.wrap() and all references to it.
- Update Parquet Reader to reduce object churn and indirection.
- Add additional metric to memory iterator
- Add Large and small buffer metric histogram tracking
- Add memory tracking reporter
- Update Netty to 4.0.27


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0dd0e833
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0dd0e833
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0dd0e833

Branch: refs/heads/master
Commit: 0dd0e833714120c77e3e7ef34de654f5246953b9
Parents: 88bb051
Author: Jacques Nadeau <jacques@apache.org>
Authored: Tue Apr 28 08:53:12 2015 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sat May 2 19:33:54 2015 -0700

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |   2 +-
 .../src/main/codegen/includes/vv_imports.ftl    |   1 +
 .../codegen/templates/NullableValueVectors.java |  13 +
 .../templates/VariableLengthVectors.java        |  12 +
 .../src/main/java/io/netty/buffer/DrillBuf.java |  47 +--
 .../main/java/io/netty/buffer/LargeBuffer.java  | 350 +++++++++++++++++++
 .../netty/buffer/PooledByteBufAllocatorL.java   | 199 ++++++++++-
 .../netty/buffer/UnsafeDirectLittleEndian.java  |  80 ++++-
 .../apache/drill/exec/TestMemoryRetention.java  | 144 ++++++++
 .../drill/exec/memory/TopLevelAllocator.java    |   2 +-
 .../apache/drill/exec/metrics/DrillMetrics.java |   6 +-
 .../drill/exec/physical/impl/ScanBatch.java     |   6 +
 .../apache/drill/exec/rpc/data/DataServer.java  |  74 ++--
 .../exec/store/parquet/ColumnDataReader.java    |  19 +-
 .../store/parquet/columnreaders/BitReader.java  |   5 +-
 .../parquet/columnreaders/ColumnReader.java     |  25 +-
 .../columnreaders/FixedByteAlignedReader.java   |  28 +-
 .../columnreaders/FixedWidthRepeatedReader.java |   2 +-
 .../columnreaders/NullableBitReader.java        |   2 +-
 .../columnreaders/NullableColumnReader.java     |   6 +-
 .../NullableFixedByteAlignedReaders.java        |  26 +-
 .../NullableVarLengthValuesColumn.java          |   6 +-
 .../store/parquet/columnreaders/PageReader.java | 249 +++++++------
 .../ParquetFixedWidthDictionaryReaders.java     |  18 +-
 .../columnreaders/VarLengthColumnReaders.java   |  94 ++---
 .../columnreaders/VarLengthValuesColumn.java    |   7 +-
 .../drill/exec/store/sys/MemoryIterator.java    |  25 +-
 .../apache/drill/exec/util/DecimalUtility.java  |   8 +
 .../exec/work/fragment/FragmentExecutor.java    |   4 +-
 .../parquet/hadoop/CodecFactoryExposer.java     |  19 +-
 .../exec/vector/complex/TestEmptyPopulator.java |  14 +-
 .../src/test/resources/drill-module.conf        |   3 +-
 pom.xml                                         |   2 +-
 33 files changed, 1134 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 17f5e6e..35df625 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -115,7 +115,7 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-transport-native-epoll</artifactId>
       <classifier>linux-x86_64</classifier>
-      <version>4.0.24.Final</version>
+      <version>4.0.27.Final</version>
     </dependency>
     <dependency>
       <groupId>org.glassfish.jersey.ext</groupId>

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
index d0f6291..92c8007 100644
--- a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
+++ b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
@@ -57,6 +57,7 @@ import java.util.List;
 import java.io.Closeable;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
 
 import java.sql.Date;
 import java.sql.Time;

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index 9373fc3..9d03efb 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -460,6 +460,19 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       <#if type.major == "VarLen">lastSet = index;</#if>
       </#if>
     }
+    
+    public void setSafe(int index, ByteBuffer value, int start, int length) {
+      <#if type.major != "VarLen">
+      throw new UnsupportedOperationException();
+      <#else>
+      fillEmpties(index);
+
+      bits.getMutator().setSafe(index, 1);
+      values.getMutator().setSafe(index, value, start, length);
+      setCount++;
+      <#if type.major == "VarLen">lastSet = index;</#if>
+      </#if>
+    }
 
     public void setNull(int index){
       bits.getMutator().setSafe(index, 0);

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 0273304..8a4b663 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -444,6 +444,18 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       offsetVector.getMutator().set(index + 1, currentOffset + length);
       data.setBytes(currentOffset, bytes, start, length);
     }
+    
+    public void setSafe(int index, ByteBuffer bytes, int start, int length) {
+      assert index >= 0;
+
+      int currentOffset = offsetVector.getAccessor().get(index);
+
+      while (data.capacity() < currentOffset + length) {
+        reAlloc();
+      }
+      offsetVector.getMutator().setSafe(index + 1, currentOffset + length);
+      data.setBytes(currentOffset, bytes, start, length);
+    }
 
     public void setSafe(int index, byte[] bytes, int start, int length) {
       assert index >= 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
index 43b8b48..2016e1e 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
@@ -36,6 +36,8 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.util.AssertionUtil;
 
+import com.google.common.base.Preconditions;
+
 public final class DrillBuf extends AbstractByteBuf {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
 
@@ -71,19 +73,6 @@ public final class DrillBuf extends AbstractByteBuf {
     this.allocator = allocator;
   }
 
-  private DrillBuf(ByteBuffer bb) {
-    super(bb.remaining());
-    UnpooledUnsafeDirectByteBuf bytebuf = new UnpooledUnsafeDirectByteBuf(UnpooledByteBufAllocator.DEFAULT, bb, bb.remaining());
-    this.acct = FakeAllocator.FAKE_ACCOUNTOR;
-    this.addr = bytebuf.memoryAddress();
-    this.allocator = FakeAllocator.FAKE_ALLOCATOR;
-    this.b = bytebuf;
-    this.length = bytebuf.capacity();
-    this.offset = 0;
-    this.rootBuffer = true;
-    this.writerIndex(bb.remaining());
-  }
-
   private DrillBuf(BufferAllocator allocator, Accountor a) {
     super(0);
     this.b = new EmptyByteBuf(allocator.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
@@ -257,7 +246,9 @@ public final class DrillBuf extends AbstractByteBuf {
   public synchronized boolean release(int decrement) {
 
     if(rootBuffer){
-      if(0 == this.rootRefCnt.addAndGet(-decrement)){
+      final long newRefCnt = this.rootRefCnt.addAndGet(-decrement);
+      Preconditions.checkArgument(newRefCnt > -1, "Buffer has negative reference count.");
+      if (newRefCnt == 0) {
         b.release(decrement);
         acct.release(this, length);
         return true;
@@ -699,6 +690,25 @@ public final class DrillBuf extends AbstractByteBuf {
     return this;
   }
 
+  public ByteBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) {
+    if (src.isDirect()) {
+      checkIndex(index, length);
+      PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this.memoryAddress() + index,
+          length);
+    } else {
+      if (srcIndex == 0 && src.capacity() == length) {
+        b.setBytes(index + offset, src);
+      } else {
+        ByteBuffer newBuf = src.duplicate();
+        newBuf.position(srcIndex);
+        newBuf.limit(srcIndex + length);
+        b.setBytes(index + offset, src);
+      }
+    }
+
+    return this;
+  }
+
   @Override
   public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
     b.setBytes(index + offset, src, srcIndex, length);
@@ -735,13 +745,4 @@ public final class DrillBuf extends AbstractByteBuf {
     return rootBuffer;
   }
 
-  public static DrillBuf wrapByteBuffer(ByteBuffer b) {
-    if (!b.isDirect()) {
-      throw new IllegalStateException("DrillBufs can only refer to direct memory.");
-    } else {
-      return new DrillBuf(b);
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/io/netty/buffer/LargeBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/LargeBuffer.java b/exec/java-exec/src/main/java/io/netty/buffer/LargeBuffer.java
new file mode 100644
index 0000000..f1d4842
--- /dev/null
+++ b/exec/java-exec/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This is basically a complete copy of DuplicatedByteBuf. We copy because we can't override the release methods to keep
+ * global track of created Large Buffers.
+ */
+public class LargeBuffer extends AbstractByteBuf {
+
+  private final AtomicLong hugeBufferSize;
+  private final AtomicLong hugeBufferCount;
+
+  @Override
+  public ByteBuffer nioBuffer(int index, int length) {
+    return unwrap().nioBuffer(index, length);
+  }
+
+  private final ByteBuf buffer;
+  private final int initCap;
+
+  public LargeBuffer(ByteBuf buffer, AtomicLong hugeBufferSize, AtomicLong hugeBufferCount) {
+    super(buffer.maxCapacity());
+    initCap = buffer.capacity();
+    this.hugeBufferCount = hugeBufferCount;
+    this.hugeBufferSize = hugeBufferSize;
+
+    if (buffer instanceof LargeBuffer) {
+      this.buffer = ((LargeBuffer) buffer).buffer;
+    } else {
+      this.buffer = buffer;
+    }
+
+    setIndex(buffer.readerIndex(), buffer.writerIndex());
+  }
+
+  @Override
+  public ByteBuf unwrap() {
+    return buffer;
+  }
+
+  @Override
+  public ByteBufAllocator alloc() {
+    return buffer.alloc();
+  }
+
+  @Override
+  public ByteOrder order() {
+    return buffer.order();
+  }
+
+  @Override
+  public boolean isDirect() {
+    return buffer.isDirect();
+  }
+
+  @Override
+  public int capacity() {
+    return buffer.capacity();
+  }
+
+  @Override
+  public ByteBuf capacity(int newCapacity) {
+    buffer.capacity(newCapacity);
+    return this;
+  }
+
+  @Override
+  public boolean hasArray() {
+    return buffer.hasArray();
+  }
+
+  @Override
+  public byte[] array() {
+    return buffer.array();
+  }
+
+  @Override
+  public int arrayOffset() {
+    return buffer.arrayOffset();
+  }
+
+  @Override
+  public boolean hasMemoryAddress() {
+    return buffer.hasMemoryAddress();
+  }
+
+  @Override
+  public long memoryAddress() {
+    return buffer.memoryAddress();
+  }
+
+  @Override
+  public byte getByte(int index) {
+    return _getByte(index);
+  }
+
+  @Override
+  protected byte _getByte(int index) {
+    return buffer.getByte(index);
+  }
+
+  @Override
+  public short getShort(int index) {
+    return _getShort(index);
+  }
+
+  @Override
+  protected short _getShort(int index) {
+    return buffer.getShort(index);
+  }
+
+  @Override
+  public int getUnsignedMedium(int index) {
+    return _getUnsignedMedium(index);
+  }
+
+  @Override
+  protected int _getUnsignedMedium(int index) {
+    return buffer.getUnsignedMedium(index);
+  }
+
+  @Override
+  public int getInt(int index) {
+    return _getInt(index);
+  }
+
+  @Override
+  protected int _getInt(int index) {
+    return buffer.getInt(index);
+  }
+
+  @Override
+  public long getLong(int index) {
+    return _getLong(index);
+  }
+
+  @Override
+  protected long _getLong(int index) {
+    return buffer.getLong(index);
+  }
+
+  @Override
+  public ByteBuf copy(int index, int length) {
+    return new LargeBuffer(buffer.copy(index, length), hugeBufferSize, hugeBufferCount);
+  }
+
+  @Override
+  public ByteBuf slice(int index, int length) {
+    return new SlicedByteBuf(this, index, length);
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+    buffer.getBytes(index, dst, dstIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+    buffer.getBytes(index, dst, dstIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, ByteBuffer dst) {
+    buffer.getBytes(index, dst);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setByte(int index, int value) {
+    _setByte(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setByte(int index, int value) {
+    buffer.setByte(index, value);
+  }
+
+  @Override
+  public ByteBuf setShort(int index, int value) {
+    _setShort(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setShort(int index, int value) {
+    buffer.setShort(index, value);
+  }
+
+  @Override
+  public ByteBuf setMedium(int index, int value) {
+    _setMedium(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setMedium(int index, int value) {
+    buffer.setMedium(index, value);
+  }
+
+  @Override
+  public ByteBuf setInt(int index, int value) {
+    _setInt(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setInt(int index, int value) {
+    buffer.setInt(index, value);
+  }
+
+  @Override
+  public ByteBuf setLong(int index, long value) {
+    _setLong(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setLong(int index, long value) {
+    buffer.setLong(index, value);
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+    buffer.setBytes(index, src, srcIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+    buffer.setBytes(index, src, srcIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, ByteBuffer src) {
+    buffer.setBytes(index, src);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, OutputStream out, int length)
+      throws IOException {
+    buffer.getBytes(index, out, length);
+    return this;
+  }
+
+  @Override
+  public int getBytes(int index, GatheringByteChannel out, int length)
+      throws IOException {
+    return buffer.getBytes(index, out, length);
+  }
+
+  @Override
+  public int setBytes(int index, InputStream in, int length)
+      throws IOException {
+    return buffer.setBytes(index, in, length);
+  }
+
+  @Override
+  public int setBytes(int index, ScatteringByteChannel in, int length)
+      throws IOException {
+    return buffer.setBytes(index, in, length);
+  }
+
+  @Override
+  public int nioBufferCount() {
+    return buffer.nioBufferCount();
+  }
+
+  @Override
+  public ByteBuffer[] nioBuffers(int index, int length) {
+    return buffer.nioBuffers(index, length);
+  }
+
+  @Override
+  public ByteBuffer internalNioBuffer(int index, int length) {
+    return nioBuffer(index, length);
+  }
+
+  @Override
+  public int forEachByte(int index, int length, ByteBufProcessor processor) {
+    return buffer.forEachByte(index, length, processor);
+  }
+
+  @Override
+  public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
+    return buffer.forEachByteDesc(index, length, processor);
+  }
+
+  @Override
+  public final int refCnt() {
+    return unwrap().refCnt();
+  }
+
+  @Override
+  public final ByteBuf retain() {
+    unwrap().retain();
+    return this;
+  }
+
+  @Override
+  public final ByteBuf retain(int increment) {
+    unwrap().retain(increment);
+    return this;
+  }
+
+  @Override
+  public boolean release() {
+    return release(1);
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    boolean released = unwrap().release(decrement);
+    if (released) {
+      hugeBufferSize.addAndGet(-initCap);
+      hugeBufferCount.decrementAndGet();
+    }
+    return released;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index c0de544..2ca79f0 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -17,18 +17,100 @@
  */
 package io.netty.buffer;
 
+import io.netty.util.internal.StringUtil;
+
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.util.AssertionUtil;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
 
 public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PooledByteBufAllocatorL.class);
 
+  private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator");
+  private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+
+  private static final String METRIC_PREFIX = "drill.allocator.";
   public static final PooledByteBufAllocatorL DEFAULT = new PooledByteBufAllocatorL();
 
-//  public final UnsafeDirectLittleEndian emptyBuf;
+  private final MetricRegistry registry = DrillMetrics.getInstance();
+  private final AtomicLong hugeBufferSize = new AtomicLong(0);
+  private final AtomicLong hugeBufferCount = new AtomicLong(0);
+  private final AtomicLong normalBufferSize = new AtomicLong(0);
+  private final AtomicLong normalBufferCount = new AtomicLong(0);
 
-  public PooledByteBufAllocatorL() {
+  private final PoolArena<ByteBuffer>[] directArenas;
+  private final MemoryStatusThread statusThread;
+  private final Histogram largeBuffersHist;
+  private final Histogram normalBuffersHist;
+
+  private PooledByteBufAllocatorL() {
     super(true);
-//    emptyBuf = newDirectBuffer(0,0);
+    try {
+      Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
+      f.setAccessible(true);
+      this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while initializing allocator.  Unable to retrieve direct arenas field.", e);
+    }
+
+    if (memoryLogger.isTraceEnabled()) {
+      statusThread = new MemoryStatusThread();
+      statusThread.start();
+    } else {
+      statusThread = null;
+    }
+    removeOldMetrics();
+
+    registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return normalBufferSize.get();
+      }
+    });
+
+    registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return normalBufferCount.get();
+      }
+    });
+
+    registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return hugeBufferSize.get();
+      }
+    });
+
+    registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return hugeBufferCount.get();
+      }
+    });
+
+    largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
+    normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
+
+  }
+
+  private synchronized void removeOldMetrics() {
+    registry.removeMatching(new MetricFilter() {
+      @Override
+      public boolean matches(String name, Metric metric) {
+        return name.startsWith("drill.allocator.");
+      }
+    });
   }
 
   @Override
@@ -41,19 +123,42 @@ public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
     PoolThreadCache cache = threadCache.get();
     PoolArena<ByteBuffer> directArena = cache.directArena;
 
-    ByteBuf buf;
     if (directArena != null) {
-        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
-    } else {
-      throw new UnsupportedOperationException("Drill requries that the allocator operates in DirectBuffer mode.");
-    }
 
-    if(buf instanceof PooledUnsafeDirectByteBuf){
-      return new UnsafeDirectLittleEndian( (PooledUnsafeDirectByteBuf) buf);
-    }else{
-      throw new UnsupportedOperationException("Drill requries that the JVM used supports access sun.misc.Unsafe.  This platform didn't provide that functionality.");
+      if (initialCapacity > directArena.chunkSize) {
+        // This is beyond chunk size so we'll allocate separately.
+        ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+
+        hugeBufferCount.incrementAndGet();
+        hugeBufferSize.addAndGet(buf.capacity());
+        largeBuffersHist.update(buf.capacity());
+        // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
+        return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
+
+      } else {
+        // within chunk, use arena.
+        ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
+        if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
+          fail();
+        }
+
+        normalBuffersHist.update(buf.capacity());
+        if (AssertionUtil.ASSERT_ENABLED) {
+          normalBufferSize.addAndGet(buf.capacity());
+          normalBufferCount.incrementAndGet();
+        }
+
+        return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize);
+      }
+
+    } else {
+      throw fail();
     }
+  }
 
+  private UnsupportedOperationException fail() {
+    return new UnsupportedOperationException(
+        "Drill requries that the JVM used supports access sun.misc.Unsafe.  This platform didn't provide that functionality.");
   }
 
 
@@ -81,5 +186,73 @@ public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
                 "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
                 initialCapacity, maxCapacity));
     }
-}
+  }
+
+  private class MemoryStatusThread extends Thread {
+
+    public MemoryStatusThread() {
+      super("memory-status-logger");
+      this.setDaemon(true);
+      this.setName("allocation.logger");
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
+        try {
+          Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
+        } catch (InterruptedException e) {
+          return;
+        }
+
+      }
+    }
+
+  }
+
+  public void checkAndReset() {
+    if (hugeBufferCount.get() != 0 || normalBufferCount.get() != 0) {
+      StringBuilder buf = new StringBuilder();
+      buf.append("Large buffers outstanding: ");
+      buf.append(hugeBufferCount.get());
+      buf.append(" totaling ");
+      buf.append(hugeBufferSize.get());
+      buf.append(" bytes.");
+      buf.append('\n');
+      buf.append("Normal buffers outstanding: ");
+      buf.append(normalBufferCount.get());
+      buf.append(" totaling ");
+      buf.append(normalBufferSize.get());
+      buf.append(" bytes.");
+      hugeBufferCount.set(0);
+      normalBufferCount.set(0);
+      hugeBufferSize.set(0);
+      normalBufferSize.set(0);
+      throw new DrillRuntimeException(buf.toString());
+    }
+  }
+
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append(directArenas.length);
+    buf.append(" direct arena(s):");
+    buf.append(StringUtil.NEWLINE);
+    for (PoolArena<ByteBuffer> a : directArenas) {
+      buf.append(a);
+    }
+
+    buf.append("Large buffers outstanding: ");
+    buf.append(this.hugeBufferCount.get());
+    buf.append(" totaling ");
+    buf.append(this.hugeBufferSize.get());
+    buf.append(" bytes.");
+    buf.append('\n');
+    buf.append("Normal buffers outstanding: ");
+    buf.append(this.normalBufferCount.get());
+    buf.append(" totaling ");
+    buf.append(this.normalBufferSize.get());
+    buf.append(" bytes.");
+    return buf.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index dfdc114..e332b13 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -21,21 +21,39 @@ package io.netty.buffer;
 import io.netty.util.internal.PlatformDependent;
 
 import java.nio.ByteOrder;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.drill.exec.util.AssertionUtil;
 
 public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
     private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
-    private final PooledUnsafeDirectByteBuf wrapped;
-    private final long memoryAddress;
-
-    UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) {
-        super(buf);
-        if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
-          throw new IllegalStateException("Drill only runs on LittleEndian systems.");
-        }
-        wrapped = buf;
-        this.memoryAddress = buf.memoryAddress();
-    }
-
+  private final AbstractByteBuf wrapped;
+  private final long memoryAddress;
+  private AtomicLong bufferCount;
+  private AtomicLong bufferSize;
+  private long initCap = -1;
+
+  UnsafeDirectLittleEndian(LargeBuffer buf) {
+    this(buf, true);
+  }
+
+  UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
+    this(buf, true);
+    this.bufferCount = bufferCount;
+    this.bufferSize = bufferSize;
+
+    // initCap is used if we're tracking memory release. If we're in non-debug mode, we'll skip this.
+    this.initCap = AssertionUtil.ASSERT_ENABLED ? capacity() : -1;
+  }
+
+  private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
+    super(buf);
+    if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
+      throw new IllegalStateException("Drill only runs on LittleEndian systems.");
+    }
+    wrapped = buf;
+    this.memoryAddress = buf.memoryAddress();
+  }
     private long addr(int index) {
         return memoryAddress + index;
     }
@@ -52,7 +70,27 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
         return Float.intBitsToFloat(getInt(index));
     }
 
-    @Override
+  @Override
+  public ByteBuf slice() {
+    return slice(this.readerIndex(), readableBytes());
+  }
+
+  @Override
+  public ByteBuf slice(int index, int length) {
+    return new SlicedByteBuf(this, index, length);
+  }
+
+  @Override
+  public ByteOrder order() {
+    return ByteOrder.LITTLE_ENDIAN;
+  }
+
+  @Override
+  public ByteBuf order(ByteOrder endianness) {
+    return this;
+  }
+
+  @Override
     public double getDouble(int index) {
         return Double.longBitsToDouble(getLong(index));
     }
@@ -190,4 +228,20 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
       return this;
     }
 
+  @Override
+  public boolean release() {
+    return release(1);
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    boolean released = super.release(decrement);
+    if (released && initCap != -1) {
+      bufferCount.decrementAndGet();
+      bufferSize.addAndGet(-initCap);
+    }
+    return released;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/TestMemoryRetention.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/TestMemoryRetention.java b/exec/java-exec/src/main/java/org/apache/drill/exec/TestMemoryRetention.java
new file mode 100644
index 0000000..37e5389
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/TestMemoryRetention.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+import io.netty.buffer.DrillBuf;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+
+import com.google.common.collect.Lists;
+
+public class TestMemoryRetention {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestMemoryRetention.class);
+
+  static final int SMALL_AVERAGE_BYTES = 1024 * 32;
+  static final int LARGE_BYTES = 32 * 1024 * 1024;
+  static final int PARALLEL_THREADS = 32;
+  static final double SMALL_ALLOCATION_MEM = 0.20;
+  static final double OVERHEAD_ALLOWANCE = 0.20;
+  static final List<Integer> ALLOCATIONS;
+  static final int MAX_ALLOCS = 100;
+  static final AtomicInteger ALLOCS = new AtomicInteger(0);
+
+  static {
+    Random r = new Random();
+    long maxMemory = DrillConfig.getMaxDirectMemory();
+    long maxPerThread = maxMemory / PARALLEL_THREADS;
+    double smallCount = (maxPerThread * SMALL_ALLOCATION_MEM) / SMALL_AVERAGE_BYTES;
+    double largeCount = (maxPerThread * (1 - SMALL_ALLOCATION_MEM - OVERHEAD_ALLOWANCE)) / LARGE_BYTES;
+    List<Integer> allocations = Lists.newArrayList();
+
+    for (int i = 0; i < smallCount; i++) {
+      allocations.add(SMALL_AVERAGE_BYTES / 2 + r.nextInt(SMALL_AVERAGE_BYTES));
+    }
+
+    for (int i = 0; i < largeCount; i++) {
+      allocations.add(LARGE_BYTES);
+    }
+    Collections.shuffle(allocations);
+    ALLOCATIONS = allocations;
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    final DrillConfig config = DrillConfig.create();
+    final TopLevelAllocator a = new TopLevelAllocator(config);
+    for (int i = 0; i < PARALLEL_THREADS; i++) {
+      Alloc alloc = new Alloc(a);
+      alloc.start();
+    }
+  }
+
+  private static class Alloc extends Thread {
+    final TopLevelAllocator allocator;
+
+    Alloc(TopLevelAllocator allocator) {
+      this.allocator = allocator;
+    }
+
+    @Override
+    public void run() {
+      Random r = new Random();
+      try {
+
+        if (ALLOCS.incrementAndGet() > MAX_ALLOCS) {
+          Thread.sleep(50000000000L);
+        }
+
+        Thread.sleep(r.nextInt(8000));
+      } catch (InterruptedException e) {
+        return;
+      }
+
+      logger.info("Starting alloc.");
+      final List<DrillBuf> bufs = Lists.newLinkedList();
+      for (Integer i : ALLOCATIONS) {
+        bufs.add(allocator.buffer(i));
+      }
+      Collections.shuffle(bufs);
+      logger.info("Finished alloc.");
+
+      final Dealloc d = new Dealloc(bufs, allocator);
+
+      // sometimes we'll deallocate locally, sometimes in a different thread.
+      if (r.nextBoolean()) {
+        d.start();
+      } else {
+        d.run();
+      }
+
+    }
+
+  }
+
+  private static class Dealloc extends Thread {
+    final List<DrillBuf> bufs;
+    final TopLevelAllocator a;
+
+    public Dealloc(List<DrillBuf> bufs, TopLevelAllocator a) {
+      this.bufs = bufs;
+      this.a = a;
+    }
+
+    public void run() {
+      try {
+        Thread.sleep(8000);
+        logger.info("Starting release.");
+        for (DrillBuf buf : bufs) {
+          buf.release();
+        }
+        logger.info("Finished release.");
+
+      } catch (InterruptedException e) {
+        return;
+      }
+
+      // start another.
+      Alloc alloc = new Alloc(a);
+      alloc.start();
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 22fcb8e..a78deb6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -22,8 +22,8 @@ import io.netty.buffer.DrillBuf;
 import io.netty.buffer.PooledByteBufAllocatorL;
 import io.netty.buffer.UnsafeDirectLittleEndian;
 
-import java.util.IdentityHashMap;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
index a9799b2..7ef121e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
@@ -44,12 +44,14 @@ public class DrillMetrics {
 
   private static class RegistryHolder {
     public static final MetricRegistry REGISTRY;
-//    private static JmxReporter jmxReporter = getJmxReporter();
-//    private static Slf4jReporter logReporter = getLogReporter();
+    // private static final JmxReporter JMX_REPORTER;
+    private static final Slf4jReporter LOG_REPORTER;
 
     static {
       REGISTRY = new MetricRegistry();
       registerSysStats();
+      // JMX_REPORTER = getJmxReporter();
+      LOG_REPORTER = getLogReporter();
     }
 
     private static void registerSysStats(){

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 6ea43cd..4700dbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -94,10 +94,16 @@ public class ScanBatch implements CloseableRecordBatch {
     this.oContext = oContext;
     this.currentReader.setOperatorContext(this.oContext);
 
+    boolean setup = false;
     try {
       oContext.getStats().startProcessing();
       this.currentReader.setup(mutator);
+      setup = true;
     } finally {
+      // if we had an exception during setup, make sure to release existing data.
+      if (!setup) {
+        currentReader.cleanup();
+      }
       oContext.getStats().stopProcessing();
     }
     this.partitionColumns = partitionColumns.iterator();

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 62f1429..6f8e20b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -17,15 +17,14 @@
  */
 package org.apache.drill.exec.rpc.data;
 
-import java.io.IOException;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
-import io.netty.buffer.UnsafeDirectLittleEndian;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import java.io.IOException;
+
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitData.BitClientHandshake;
@@ -105,7 +104,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
   private final static FragmentRecordBatch OOM_FRAGMENT = FragmentRecordBatch.newBuilder().setIsOutOfMemory(true).build();
 
 
-  private FragmentHandle getHandle(FragmentRecordBatch batch, int index){
+  private static FragmentHandle getHandle(FragmentRecordBatch batch, int index) {
     return FragmentHandle.newBuilder()
         .setQueryId(batch.getQueryId())
         .setMajorFragmentId(batch.getReceivingMajorFragmentId())
@@ -138,32 +137,14 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
         }
 
       }else{
-
-        for(int minor = 0; minor < targetCount; minor++){
-          FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
-          if(manager == null){
-            continue;
+        if (targetCount > 1) {
+          for (int minor = 0; minor < targetCount; minor++) {
+            send(fragmentBatch, (DrillBuf) body, minor, ack, true);
           }
-
-          BufferAllocator allocator = manager.getFragmentContext().getAllocator();
-
-          boolean withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body, out);
-
-          if(!withinMemoryEnvelope){
-            // if we over reserved, we need to add poison pill before batch.
-            dataHandler.handle(manager, OOM_FRAGMENT, null, null);
-          }
-
-          ack.increment();
-          dataHandler.handle(manager, fragmentBatch, out.value, ack);
-
-          // make sure to release the reference count we have to the new buffer.
-          // dataHandler.handle should have taken any ownership it needed.
-          out.value.release();
+        } else {
+          send(fragmentBatch, (DrillBuf) body, 0, ack, false);
         }
-        out = null;
       }
-
     } catch (IOException | FragmentSetupException e) {
       logger.error("Failure while getting fragment manager. {}",
           QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(),
@@ -181,6 +162,45 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
     }
   }
 
+  private void send(final FragmentRecordBatch fragmentBatch, final DrillBuf body, final int minor, final AckSender ack,
+      final boolean shared)
+      throws FragmentSetupException, IOException {
+
+    FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
+    if (manager == null) {
+      return;
+    }
+
+    final BufferAllocator allocator = manager.getFragmentContext().getAllocator();
+    final Pointer<DrillBuf> out = new Pointer<DrillBuf>();
+
+    final boolean withinMemoryEnvelope;
+    final DrillBuf submitBody;
+
+    if (shared) {
+      withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body, out);
+      submitBody = out.value;
+    }else{
+      withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body.unwrap());
+      submitBody = body;
+    }
+
+    if (!withinMemoryEnvelope) {
+      // if we over-reserved, we need to add a poison pill before the batch.
+      dataHandler.handle(manager, OOM_FRAGMENT, null, null);
+    }
+
+    ack.increment();
+    dataHandler.handle(manager, fragmentBatch, submitBody, ack);
+
+    if (shared) {
+      // make sure to release the reference count we have to the new buffer.
+      // dataHandler.handle should have taken any ownership it needed.
+      out.value.release();
+    }
+
+  }
+
   private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
 
     private volatile GenericFutureListener<ChannelFuture> handler;

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index 1b10b1d..1663cd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -17,13 +17,12 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.hadoop.fs.FSDataInputStream;
 
 import parquet.bytes.BytesInput;
@@ -53,19 +52,11 @@ public class ColumnDataReader {
     return new HadoopBytesInput(b);
   }
 
-  public ByteBuf getPageAsBytesBuf(ByteBuf byteBuf, int pageLength) throws IOException{
-    ByteBuffer directBuffer=byteBuf.nioBuffer(0, pageLength);
-    int l=directBuffer.remaining();
-    int bl=byteBuf.capacity();
-    try{
-      while (directBuffer.remaining() > 0) {
-        CompatibilityUtil.getBuf(input, directBuffer, directBuffer.remaining());
-      }
-    }catch(Exception e) {
-      logger.error("Failed to read data into Direct ByteBuffer with exception: "+e.getMessage());
-      throw new DrillRuntimeException(e.getMessage());
+  public void loadPage(DrillBuf target, int pageLength) throws IOException {
+    ByteBuffer directBuffer = target.nioBuffer(0, pageLength);
+    while (directBuffer.remaining() > 0) {
+      CompatibilityUtil.getBuf(input, directBuffer, directBuffer.remaining());
     }
-    return byteBuf;
   }
 
   public void clear(){

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
index 7416463..81b8002 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
@@ -17,10 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import io.netty.buffer.ByteBuf;
-
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -38,7 +35,7 @@ final class BitReader extends ColumnReader {
   @Override
   protected void readField(long recordsToReadInThisPass) {
 
-    recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+    recordsReadInThisIteration = Math.min(pageReader.currentPageCount
         - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
 
     // A more optimized reader for bit columns was removed to fix the bug

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index d3f1a30..5650ae3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
 
@@ -67,7 +67,7 @@ public abstract class ColumnReader<V extends ValueVector> {
   int dataTypeLengthInBits;
   int bytesReadInCurrentPass;
 
-  protected ByteBuf vectorData;
+  protected DrillBuf vectorData;
   // when reading definition levels for nullable columns, it is a one-way stream of integers
   // when reading var length data, where we don't know if all of the records will fit until we've read all of them
   // we must store the last definition level an use it in at the start of the next batch
@@ -106,14 +106,14 @@ public abstract class ColumnReader<V extends ValueVector> {
       do {
         determineSize(recordsToReadInThisPass, 0);
 
-      } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null);
+      } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.hasPage());
     }
     valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
   }
 
   public void clear() {
     valueVec.clear();
-    this.pageReader.clear();
+    pageReader.clear();
   }
 
   public void readValues(long recordsToRead) {
@@ -189,11 +189,11 @@ public abstract class ColumnReader<V extends ValueVector> {
 
   // Read a page if we need more data, returns true if we need to exit the read loop
   public boolean readPage() throws IOException {
-    if (pageReader.currentPage == null
-        || totalValuesReadAndReadyToReadInPage() == pageReader.currentPage.getValueCount()) {
+    if (!pageReader.hasPage()
+        || totalValuesReadAndReadyToReadInPage() == pageReader.currentPageCount) {
       readRecords(pageReader.valuesReadyToRead);
-      if (pageReader.currentPage != null) {
-        totalValuesRead += pageReader.currentPage.getValueCount();
+      if (pageReader.hasPage()) {
+        totalValuesRead += pageReader.currentPageCount;
       }
       if (!pageReader.next()) {
         hitRowGroupEnd();
@@ -225,4 +225,13 @@ public abstract class ColumnReader<V extends ValueVector> {
     return false;
   }
 
+  // copied out of the parquet library; didn't want to deal with the unneeded throws clause they had declared
+  public static int readIntLittleEndian(DrillBuf in, int offset) {
+    int ch4 = in.getByte(offset) & 0xff;
+    int ch3 = in.getByte(offset + 1) & 0xff;
+    int ch2 = in.getByte(offset + 2) & 0xff;
+    int ch1 = in.getByte(offset + 3) & 0xff;
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index a425bc1..fe0234b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
 import java.math.BigDecimal;
@@ -52,21 +51,20 @@ class FixedByteAlignedReader extends ColumnReader {
   @Override
   protected void readField(long recordsToReadInThisPass) {
 
-    recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+    recordsReadInThisIteration = Math.min(pageReader.currentPageCount
         - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
 
     readStartInBytes = pageReader.readPosInBytes;
     readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
     readLength = (int) Math.ceil(readLengthInBits / 8.0);
 
-    bytebuf = pageReader.pageDataByteArray;
+    bytebuf = pageReader.pageData;
     // vectorData is assigned by the superclass read loop method
     writeData();
   }
 
   protected void writeData() {
-    vectorData.writeBytes(bytebuf,
-        (int) readStartInBytes, (int) readLength);
+    vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength);
   }
 
   public static class FixedBinaryReader extends FixedByteAlignedReader {
@@ -120,12 +118,12 @@ class FixedByteAlignedReader extends ColumnReader {
 
   public static class DateReader extends ConvertedReader {
 
-    DateVector dateVector;
+    private final DateVector.Mutator mutator;
 
     DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                     boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
       super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      dateVector = (DateVector) v;
+      mutator = ((DateVector) v).getMutator();
     }
 
     @Override
@@ -137,17 +135,9 @@ class FixedByteAlignedReader extends ColumnReader {
         intValue = readIntLittleEndian(bytebuf, start);
       }
 
-      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+      mutator.set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
     }
 
-    // copied out of parquet library, didn't want to deal with the uneeded throws statement they had declared
-    public static int readIntLittleEndian(ByteBuf in, int offset) {
-      int ch4 = in.getByte(offset) & 0xff;
-      int ch3 = in.getByte(offset + 1) & 0xff;
-      int ch2 = in.getByte(offset + 2) & 0xff;
-      int ch1 = in.getByte(offset + 3) & 0xff;
-      return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
-    }
   }
 
   public static class Decimal28Reader extends ConvertedReader {
@@ -163,7 +153,8 @@ class FixedByteAlignedReader extends ColumnReader {
     @Override
     void addNext(int start, int index) {
       int width = Decimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes,
+          schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getBuffer(), index * width, schemaElement.getScale(),
               schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
     }
@@ -182,7 +173,8 @@ class FixedByteAlignedReader extends ColumnReader {
     @Override
     void addNext(int start, int index) {
       int width = Decimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes,
+          schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getBuffer(), index * width, schemaElement.getScale(),
               schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
index 05f6417..7f8b611 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -178,7 +178,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
             definitionLevelsRead++;
 
             // we hit the end of this page, without confirmation that we reached the end of the current record
-            if (definitionLevelsRead == pageReader.currentPage.getValueCount()) {
+            if (definitionLevelsRead == pageReader.currentPageCount) {
               // check that we have not hit the end of the row group (in which case we will not find the repetition level indicating
               // the end of this record as there is no next page to check, we have read all the values in this repetition so it is okay
               // to add it to the read )

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
index a4143d5..8a8ac29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
@@ -43,7 +43,7 @@ final class NullableBitReader extends ColumnReader {
   @Override
   public void readField(long recordsToReadInThisPass) {
 
-    recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+    recordsReadInThisIteration = Math.min(pageReader.currentPageCount
         - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
     int defLevel;
     for (int i = 0; i < recordsReadInThisIteration; i++){

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
index 9e62520..d721601 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
@@ -70,9 +70,9 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
       boolean lastRunBrokenByNull = false;
       while (indexInOutputVector < recordsToReadInThisPass && indexInOutputVector < valueVec.getValueCapacity()){
         // read a page if needed
-        if ( pageReader.currentPage == null
+      if (!pageReader.hasPage()
             || ((readStartInBytes + readLength >= pageReader.byteLength && bitsUsed == 0) &&
-            definitionLevelsRead >= pageReader.currentPage.getValueCount())) {
+          definitionLevelsRead >= pageReader.currentPageCount)) {
           if (!pageReader.next()) {
             break;
           }
@@ -89,7 +89,7 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
         // loop to find the longest run of defined values available, can be preceded by several nulls
         while(indexInOutputVector < recordsToReadInThisPass
             && indexInOutputVector < valueVec.getValueCapacity()
-            && definitionLevelsRead < pageReader.currentPage.getValueCount()){
+          && definitionLevelsRead < pageReader.currentPageCount) {
           currentDefinitionLevel = pageReader.definitionLevels.readInteger();
           definitionLevelsRead++;
           indexInOutputVector++;

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index ff1d7f9..c2221d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
 import java.math.BigDecimal;
@@ -29,15 +28,15 @@ import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableDecimal18Vector;
 import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
 import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
+import org.apache.drill.exec.vector.NullableDecimal9Vector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
 import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableDecimal9Vector;
-import org.apache.drill.exec.vector.NullableDecimal18Vector;
-import org.apache.drill.exec.vector.NullableTimeVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableTimeVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.joda.time.DateTimeUtils;
 
@@ -58,7 +57,7 @@ public class NullableFixedByteAlignedReaders {
     // this method is called by its superclass during a read loop
     @Override
     protected void readField(long recordsToReadInThisPass) {
-      this.bytebuf = pageReader.pageDataByteArray;
+      this.bytebuf = pageReader.pageData;
 
       // fill in data.
       vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength);
@@ -259,7 +258,7 @@ public class NullableFixedByteAlignedReaders {
     @Override
     protected void readField(long recordsToReadInThisPass) {
 
-      this.bytebuf = pageReader.pageDataByteArray;
+      this.bytebuf = pageReader.pageData;
 
       dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
       for (int i = 0; i < recordsToReadInThisPass; i++) {
@@ -292,15 +291,6 @@ public class NullableFixedByteAlignedReaders {
       dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
     }
 
-    // copied out of parquet library, didn't want to deal with the uneeded throws statement they had declared
-    public static int readIntLittleEndian(ByteBuf in, int offset) {
-      int ch4 = in.getByte(offset) & 0xff;
-      int ch3 = in.getByte(offset + 1) & 0xff;
-      int ch2 = in.getByte(offset + 2) & 0xff;
-      int ch1 = in.getByte(offset + 3) & 0xff;
-      return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
-    }
-
   }
 
   public static class NullableDecimal28Reader extends NullableConvertedReader {
@@ -316,7 +306,8 @@ public class NullableFixedByteAlignedReaders {
     @Override
     void addNext(int start, int index) {
       int width = NullableDecimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes,
+          schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getBuffer(), index * width, schemaElement.getScale(),
           schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits);
     }
@@ -335,7 +326,8 @@ public class NullableFixedByteAlignedReaders {
     @Override
     void addNext(int start, int index) {
       int width = NullableDecimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes,
+          schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getBuffer(), index * width, schemaElement.getScale(),
           schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
index aa3d9c5..528b6db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -88,10 +88,10 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten
     }
     else {
       // re-purposing  this field here for length in BYTES to prevent repetitive multiplication/division
-      dataTypeLengthInBits = pageReader.pageDataByteArray.getInt((int) pageReader.readyToReadPosInBytes);
+      dataTypeLengthInBits = pageReader.pageData.getInt((int) pageReader.readyToReadPosInBytes);
     }
     // I think this also needs to happen if it is null for the random access
-    boolean success = setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageDataByteArray,
+    boolean success = setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageData,
         (int) pageReader.readyToReadPosInBytes + 4, dataTypeLengthInBits);
     if ( ! success ) {
       return true;
@@ -130,7 +130,7 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten
       }
       // re-purposing  this field here for length in BYTES to prevent repetitive multiplication/division
       dataTypeLengthInBits = variableWidthVector.getAccessor().getValueLength(valuesReadInCurrentPass);
-      boolean success = setSafe(valuesReadInCurrentPass, pageReader.pageDataByteArray,
+      boolean success = setSafe(valuesReadInCurrentPass, pageReader.pageData,
           (int) pageReader.readPosInBytes + 4, dataTypeLengthInBits);
       assert success;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index d260029..6a41a04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,14 +17,15 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.store.parquet.ColumnDataReader;
 import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
@@ -34,30 +35,35 @@ import org.apache.hadoop.fs.Path;
 
 import parquet.bytes.BytesInput;
 import parquet.column.Dictionary;
+import parquet.column.Encoding;
 import parquet.column.ValuesType;
-import parquet.column.page.DataPageV1;
 import parquet.column.page.DictionaryPage;
+import parquet.column.statistics.Statistics;
 import parquet.column.values.ValuesReader;
 import parquet.column.values.dictionary.DictionaryValuesReader;
 import parquet.format.PageHeader;
 import parquet.format.PageType;
 import parquet.format.Util;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.CodecFactoryExposer.HadoopByteBufBytesInput;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.schema.PrimitiveType;
 
-import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
+import com.google.common.base.Preconditions;
 
 // class to keep track of the read position of variable length columns
 final class PageReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReader.class);
 
+  public static final ParquetMetadataConverter METADATA_CONVERTER = ParquetFormatPlugin.parquetMetadataConverter;
+
   private final ColumnReader parentColumnReader;
   private final ColumnDataReader dataReader;
-  // store references to the pages that have been uncompressed, but not copied to ValueVectors yet
-  DataPageV1 currentPage;
+
   // buffer to store bytes of current page
-  DrillBuf pageDataByteArray;
+  DrillBuf pageData;
 
   // for variable length data we need to keep track of our current position in the page data
   // as the values and lengths are intermixed, making random access to the length data impossible
@@ -90,60 +96,72 @@ final class PageReader {
   Dictionary dictionary;
   PageHeader pageHeader = null;
 
-  List<ByteBuf> allocatedBuffers;
+  int currentPageCount = -1;
 
   // These need to be held throughout reading of the entire column chunk
   List<ByteBuf> allocatedDictionaryBuffers;
 
-  PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
+  private final CodecFactoryExposer codecFactory;
+
+  PageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
     throws ExecutionSetupException{
     this.parentColumnReader = parentStatus;
-    allocatedBuffers = new ArrayList<ByteBuf>();
     allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
+    codecFactory = parentColumnReader.parentReader.getCodecFactoryExposer();
 
-    long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
     long start = columnChunkMetaData.getFirstDataPageOffset();
     try {
       FSDataInputStream f = fs.open(path);
       this.dataReader = new ColumnDataReader(f, start, columnChunkMetaData.getTotalSize());
-      if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-        f.seek(columnChunkMetaData.getDictionaryPageOffset());
-        PageHeader pageHeader = Util.readPageHeader(f);
-        assert pageHeader.type == PageType.DICTIONARY_PAGE;
-
-        BytesInput bytesIn;
-        ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
-        allocatedDictionaryBuffers.add(uncompressedData);
-        if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) {
-          dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size);
-          bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
-            pageHeader.getUncompressed_page_size());
-        }else{
-          ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size);
-          dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size);
-          bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
-            .decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+      loadDictionaryIfExists(parentStatus, columnChunkMetaData, f);
+
+    } catch (IOException e) {
+      throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
+          + path.getName(), e);
+    }
+
+  }
+
+  private void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
+      final ColumnChunkMetaData columnChunkMetaData, final FSDataInputStream f) throws IOException {
+    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+      f.seek(columnChunkMetaData.getDictionaryPageOffset());
+      final PageHeader pageHeader = Util.readPageHeader(f);
+      assert pageHeader.type == PageType.DICTIONARY_PAGE;
+
+      final DrillBuf dictionaryData = allocateDictionaryBuffer(pageHeader.getUncompressed_page_size());
+
+      if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) {
+        dataReader.loadPage(dictionaryData, pageHeader.compressed_page_size);
+      } else {
+        final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
+        try {
+          dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
+          codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
               compressedData,
-              uncompressedData,
+              dictionaryData,
               pageHeader.compressed_page_size,
               pageHeader.getUncompressed_page_size());
+
+        } finally {
           compressedData.release();
         }
-        DictionaryPage page = new DictionaryPage(
-            bytesIn,
-            pageHeader.uncompressed_page_size,
-            pageHeader.dictionary_page_header.num_values,
-            parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
-        );
-        this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
       }
-    } catch (IOException e) {
-      throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
-        + path.getName(), e);
-    }
 
+      DictionaryPage page = new DictionaryPage(
+          getBytesInput(dictionaryData),
+          pageHeader.uncompressed_page_size,
+          pageHeader.dictionary_page_header.num_values,
+          parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
+          );
+      this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
+    }
   }
 
+  public static BytesInput getBytesInput(DrillBuf uncompressedByteBuf) throws IOException {
+    final ByteBuffer outBuffer = uncompressedByteBuf.nioBuffer(0, uncompressedByteBuf.capacity());
+    return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit());
+  }
 
   /**
    * Grab the next page.
@@ -153,7 +171,7 @@ final class PageReader {
    */
   public boolean next() throws IOException {
 
-    currentPage = null;
+    currentPageCount = -1;
     valuesRead = 0;
     valuesReadyToRead = 0;
 
@@ -172,26 +190,24 @@ final class PageReader {
       if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
 
         //TODO: Handle buffer allocation exception
-        BytesInput bytesIn;
-        ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
-        allocatedDictionaryBuffers.add(uncompressedData);
+        DrillBuf uncompressedData = allocateDictionaryBuffer(pageHeader.getUncompressed_page_size());
         if( parentColumnReader.columnChunkMetaData.getCodec()== CompressionCodecName.UNCOMPRESSED) {
-          dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size);
-          bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
-            pageHeader.getUncompressed_page_size());
+          dataReader.loadPage(uncompressedData, pageHeader.compressed_page_size);
         }else{
-          ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size);
-          dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size);
-          bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
-            .decompress(parentColumnReader.columnChunkMetaData.getCodec(),
-              compressedData,
-              uncompressedData,
-              pageHeader.compressed_page_size,
-              pageHeader.getUncompressed_page_size());
-          compressedData.release();
+          final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
+          try{
+            dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
+            codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+                compressedData,
+                uncompressedData,
+                pageHeader.compressed_page_size,
+                pageHeader.getUncompressed_page_size());
+          } finally {
+            compressedData.release();
+          }
         }
         DictionaryPage page = new DictionaryPage(
-            bytesIn,
+            getBytesInput(uncompressedData),
             pageHeader.uncompressed_page_size,
             pageHeader.dictionary_page_header.num_values,
             parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
@@ -201,47 +217,40 @@ final class PageReader {
     } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
 
     //TODO: Handle buffer allocation exception
-    BytesInput bytesIn;
-    ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
-    allocatedBuffers.add(uncompressedData);
+
+    allocatePageData(pageHeader.getUncompressed_page_size());
+
     if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) {
-      dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size);
-      bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
-        pageHeader.getUncompressed_page_size());
+      dataReader.loadPage(pageData, pageHeader.compressed_page_size);
     }else{
-      ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size);
-      dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size);
-      bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
-        .decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+      final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
+      dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
+      codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
           compressedData,
-          uncompressedData,
+          pageData,
           pageHeader.compressed_page_size,
           pageHeader.getUncompressed_page_size());
       compressedData.release();
     }
-    currentPage = new DataPageV1(
-        bytesIn,
-        pageHeader.data_page_header.num_values,
-        pageHeader.uncompressed_page_size,
-        fromParquetStatistics(pageHeader.data_page_header.getStatistics(), parentColumnReader.getColumnDescriptor().getType()), // ?
-        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
-        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
-        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
-    );
 
-    byteLength = pageHeader.uncompressed_page_size;
+    currentPageCount = pageHeader.data_page_header.num_values;
 
-    if (currentPage == null) {
-      return false;
-    }
+    final int uncompressedPageSize = pageHeader.uncompressed_page_size;
+    final Statistics<?> stats = fromParquetStatistics(pageHeader.data_page_header.getStatistics(), parentColumnReader
+        .getColumnDescriptor().getType());
+    final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
+
+    final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
+    final Encoding valueEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.encoding);
+
+    byteLength = pageHeader.uncompressed_page_size;
 
-    pageDataByteArray = DrillBuf.wrapByteBuffer(currentPage.getBytes().toByteBuffer());
-    allocatedBuffers.add(pageDataByteArray);
+    final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity());
 
     readPosInBytes = 0;
     if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
-      repetitionLevels = currentPage.getRlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
-      repetitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+      repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
+      repetitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
       // we know that the first value will be a 0, at the end of each list of repeated values we will hit another 0 indicating
       // a new record, although we don't know the length until we hit it (and this is a one way stream of integers) so we
       // read the first zero here to simplify the reading processes, and start reading the first value the same as all
@@ -252,25 +261,25 @@ final class PageReader {
     }
     if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
       parentColumnReader.currDefLevel = -1;
-      definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
-      definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+      definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
+      definitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
       readPosInBytes = definitionLevels.getNextOffset();
-      if ( ! currentPage.getValueEncoding().usesDictionary()) {
-        valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
-        valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+      if (!valueEncoding.usesDictionary()) {
+        valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
+        valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
       }
     }
     if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
-      valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
-      valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+      valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
+      valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
     }
-    if (currentPage.getValueEncoding().usesDictionary()) {
+    if (valueEncoding.usesDictionary()) {
       // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for
       // actually copying the values out into the vectors
       dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
-      dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+      dictionaryLengthDeterminingReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
       dictionaryValueReader = new DictionaryValuesReader(dictionary);
-      dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+      dictionaryValueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
       parentColumnReader.usingDictionary = true;
     } else {
       parentColumnReader.usingDictionary = false;
@@ -283,11 +292,41 @@ final class PageReader {
     return true;
   }
 
+  /**
+   * Allocate the page data buffer. Only one page data buffer may be active at a time: this method fails its
+   * precondition if a previous page buffer is still set. The buffer is released, and the field reset, by clearBuffers().
+   */
+  private void allocatePageData(int size) {
+    Preconditions.checkArgument(pageData == null);
+    pageData = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
+  }
+
+  /**
+   * Allocate a buffer which the user should release immediately. The reader does not manage release of these buffers.
+   */
+  private DrillBuf allocateTemporaryBuffer(int size) {
+    return parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
+  }
+
+  /**
+   * Allocate and return a dictionary buffer. These are maintained for the life of the reader and then released when the
+   * reader is cleared.
+   */
+  private DrillBuf allocateDictionaryBuffer(int size) {
+    DrillBuf buf = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
+    allocatedDictionaryBuffers.add(buf);
+    return buf;
+  }
+
+  protected boolean hasPage() {
+    return currentPageCount != -1;
+  }
+
   public void clearBuffers() {
-    for (ByteBuf b : allocatedBuffers) {
-      b.release();
+    if (pageData != null) {
+      pageData.release();
+      pageData = null;
     }
-    allocatedBuffers.clear();
   }
 
   public void clearDictionaryBuffers() {
@@ -306,20 +345,6 @@ final class PageReader {
     //}
   }
 
-  /*
-    Allocate direct memory to read data into
-   */
-  private ByteBuf allocateBuffer(int size) {
-    ByteBuf b;
-    try {
-      b = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
-      //b = UnpooledByteBufAllocator.DEFAULT.heapBuffer(size);
-    }catch(Exception e){
-      throw new DrillRuntimeException("Unable to allocate "+size+" bytes of memory in the Parquet Reader."+
-        "[Exception: "+e.getMessage()+"]"
-      );
-    }
-    return b;
-  }
+
 
 }


Mime
View raw message