drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [1/3] drill git commit: DRILL-3033: Add memory leak fixes found so far in DRILL-1942 to 1.0.0. Fixes some missing buffer retain() calls and missing vector clear() calls.
Date Tue, 12 May 2015 22:44:03 GMT
Repository: drill
Updated Branches:
  refs/heads/master d10769f47 -> 6d7cda8ea


DRILL-3033: Add memory leak fixes found so far in DRILL-1942 to 1.0.0. Fixes some missing buffer retain() calls and missing vector clear() calls.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/da70b63f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/da70b63f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/da70b63f

Branch: refs/heads/master
Commit: da70b63fb63419414f54b3159f7e92694039d1da
Parents: d10769f
Author: Chris Westin <cwestin@yahoo.com>
Authored: Mon May 11 22:06:30 2015 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Tue May 12 14:06:55 2015 -0700

----------------------------------------------------------------------
 .../codegen/templates/FixedValueVectors.java    |   9 +-
 .../codegen/templates/NullableValueVectors.java |  75 +++---
 .../templates/VariableLengthVectors.java        | 111 ++++-----
 .../cache/VectorAccessibleSerializable.java     |  12 +-
 .../drill/exec/memory/BufferAllocator.java      |   3 +
 .../impl/TopN/PriorityQueueTemplate.java        |  17 +-
 .../exec/physical/impl/TopN/TopNBatch.java      |  44 ++--
 .../exec/physical/impl/join/MergeJoinBatch.java |   1 +
 .../impl/join/MergeJoinBatchBuilder.java        |  18 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |   2 +
 .../OrderedPartitionRecordBatch.java            | 226 ++++++++++---------
 .../exec/physical/impl/sort/SortBatch.java      |   1 +
 .../impl/sort/SortRecordBatchBuilder.java       |  15 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |   5 +
 .../exec/physical/impl/xsort/MSortTemplate.java |  10 +-
 .../impl/xsort/PriorityQueueCopierTemplate.java |   7 +-
 .../apache/drill/exec/record/WritableBatch.java |  55 ++---
 .../org/apache/drill/exec/vector/BitVector.java |   2 +
 .../exec/vector/complex/AbstractMapVector.java  |   9 +
 .../drill/exec/vector/complex/MapVector.java    |  10 +
 .../exec/record/vector/TestValueVector.java     |  37 ++-
 21 files changed, 376 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index 7d85810..0dffa0b 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -50,7 +50,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
   private int allocationValueCount = INITIAL_VALUE_ALLOCATION;
   private int allocationMonitor = 0;
-  
+
   public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
   }
@@ -187,6 +187,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     int currentWriterIndex = data.writerIndex();
     int startPoint = startIndex * ${type.width};
     int sliceLength = length * ${type.width};
+    target.clear();
     target.data = this.data.slice(startPoint, sliceLength);
     target.data.writerIndex(sliceLength);
     target.data.retain();
@@ -749,9 +750,9 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
    }
 
   </#if> <#-- type.width -->
-  
 
-  
+
+
    public void setValueCount(int valueCount) {
      int currentValueCapacity = getValueCapacity();
      int idx = (${type.width} * valueCount);
@@ -770,7 +771,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
 
 
-  
+
  }
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index 9d03efb..ce6a3a7 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -79,13 +79,14 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     }
     return buffers;
   }
-  
+
   @Override
   public void clear() {
     bits.clear();
     values.clear();
+    super.clear();
   }
-  
+
   public int getBufferSize(){
     return values.getBufferSize() + bits.getBufferSize();
   }
@@ -120,7 +121,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       throw new OutOfMemoryRuntimeException("Failure while allocating buffer.");
     }
   }
-  
+
   @Override
   public boolean allocateNewSafe() {
     if(!values.allocateNewSafe()) return false;
@@ -144,7 +145,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   public int load(int dataBytes, int valueCount, DrillBuf buf){
     clear();
     int loaded = bits.load(valueCount, buf);
-    
+
     // remove bits part of buffer.
     buf = buf.slice(loaded, buf.capacity() - loaded);
     dataBytes -= loaded;
@@ -152,14 +153,14 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     this.mutator.lastSet = valueCount;
     return loaded;
   }
-  
+
   @Override
   public void load(SerializedField metadata, DrillBuf buffer) {
     assert this.field.matches(metadata) : String.format("The field %s doesn't match the provided metadata %s.", this.field, metadata);
     int loaded = load(metadata.getBufferLength(), metadata.getValueCount(), buffer);
     assert metadata.getBufferLength() == loaded : String.format("Expected to load %d bytes but actually loaded %d bytes", metadata.getBufferLength(), loaded);
   }
-  
+
   @Override
   public int getByteCapacity(){
     return values.getByteCapacity();
@@ -180,7 +181,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     mutator.reset();
     accessor.reset();
   }
-  
+
 
   @Override
   public boolean allocateNewSafe() {
@@ -213,22 +214,22 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   public int load(int valueCount, DrillBuf buf){
     clear();
     int loaded = bits.load(valueCount, buf);
-    
+
     // remove bits part of buffer.
     buf = buf.slice(loaded, buf.capacity() - loaded);
     loaded += values.load(valueCount, buf);
     return loaded;
   }
-  
+
   @Override
   public void load(SerializedField metadata, DrillBuf buffer) {
     assert this.field.matches(metadata);
     int loaded = load(metadata.getValueCount(), buffer);
     assert metadata.getBufferLength() == loaded;
   }
-  
+
   </#if>
-  
+
   public TransferPair getTransferPair(){
     return new TransferImpl(getField());
   }
@@ -240,7 +241,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     return new TransferImpl((Nullable${minor.class}Vector) to);
   }
 
-  
+
   public void transferTo(Nullable${minor.class}Vector target){
     bits.transferTo(target.bits);
     values.transferTo(target.values);
@@ -257,10 +258,10 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     target.mutator.lastSet = length - 1;
     </#if>
   }
-  
+
   private class TransferImpl implements TransferPair{
     Nullable${minor.class}Vector to;
-    
+
     public TransferImpl(MaterializedField field){
       this.to = new Nullable${minor.class}Vector(field, allocator);
     }
@@ -272,7 +273,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     public Nullable${minor.class}Vector getTo(){
       return to;
     }
-    
+
     public void transfer(){
       transferTo(to);
     }
@@ -286,15 +287,15 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       to.copyFromSafe(fromIndex, toIndex, Nullable${minor.class}Vector.this);
     }
   }
-  
+
   public Accessor getAccessor(){
     return accessor;
   }
-  
+
   public Mutator getMutator(){
     return mutator;
   }
-  
+
   public ${minor.class}Vector convertToRequiredVector(){
     ${minor.class}Vector v = new ${minor.class}Vector(getField().getOtherNullableVersion(), allocator);
     v.data = values.data;
@@ -303,14 +304,14 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     return v;
   }
 
-  
+
   public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
     if (!from.getAccessor().isNull(fromIndex)) {
       mutator.set(thisIndex, from.getAccessor().get(fromIndex));
     }
   }
 
-  
+
   public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
     <#if type.major == "VarLen">
     mutator.fillEmpties(thisIndex);
@@ -318,7 +319,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     values.copyFromSafe(fromIndex, thisIndex, from);
     bits.getMutator().setSafe(thisIndex, 1);
   }
-  
+
   public void copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
     <#if type.major == "VarLen">
     mutator.fillEmpties(thisIndex);
@@ -340,7 +341,9 @@ public final class ${className} extends BaseDataValueVector implements <#if type
      * @throws  NullValueException if the value is null
      */
     public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
-      assert !isNull(index) : "Tried to get null value";
+      if (isNull(index)) {
+          throw new IllegalStateException("Can't get a null value");
+      }
       return vAccessor.get(index);
     }
 
@@ -351,7 +354,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     public int isSet(int index){
       return bAccessor.get(index);
     }
-    
+
     <#if type.major == "VarLen">
     public long getStartEnd(int index){
       return vAccessor.getStartEnd(index);
@@ -394,12 +397,12 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     public int getValueCount(){
       return bits.getAccessor().getValueCount();
     }
-    
+
     public void reset(){}
   }
-  
+
   public final class Mutator extends BaseDataValueVector.BaseMutator implements NullableVectorDefinitionSetter<#if type.major = "VarLen">, VariableWidthVector.VariableWidthMutator</#if> {
-    
+
     private int setCount;
     <#if type.major = "VarLen"> private int lastSet = -1;</#if>
 
@@ -447,7 +450,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       values.getMutator().setValueLengthSafe(index, length);
     }
     </#if>
-    
+
     public void setSafe(int index, byte[] value, int start, int length) {
       <#if type.major != "VarLen">
       throw new UnsupportedOperationException();
@@ -460,7 +463,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       <#if type.major == "VarLen">lastSet = index;</#if>
       </#if>
     }
-    
+
     public void setSafe(int index, ByteBuffer value, int start, int length) {
       <#if type.major != "VarLen">
       throw new UnsupportedOperationException();
@@ -477,7 +480,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     public void setNull(int index){
       bits.getMutator().setSafe(index, 0);
     }
-    
+
     public void setSkipNull(int index, ${minor.class}Holder holder){
       values.getMutator().set(index, holder);
     }
@@ -485,8 +488,8 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     public void setSkipNull(int index, Nullable${minor.class}Holder holder){
       values.getMutator().set(index, holder);
     }
-    
-    
+
+
     public void set(int index, Nullable${minor.class}Holder holder){
       <#if type.major == "VarLen">
       for (int i = lastSet + 1; i < index; i++) {
@@ -508,7 +511,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       values.getMutator().set(index, holder);
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
-    
+
     public boolean isSafe(int outIndex) {
       return outIndex < Nullable${minor.class}Vector.this.getValueCapacity();
     }
@@ -524,12 +527,12 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       values.getMutator().set(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
-    
+
     public void setSafe(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ) {
       <#if type.major == "VarLen">
       fillEmpties(index);
       </#if>
-      
+
       bits.getMutator().setSafe(index, isSet);
       values.getMutator().setSafe(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
       setCount++;
@@ -558,7 +561,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       setCount++;
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
-    
+
     <#if !(type.major == "VarLen" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense" || minor.class == "Interval" || minor.class == "IntervalDay")>
       public void setSafe(int index, ${minor.javaType!type.javaType} value) {
         <#if type.major == "VarLen">
@@ -591,7 +594,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       setCount = 0;
       <#if type.major = "VarLen">lastSet = -1;</#if>
     }
-    
+
   }
 }
 </#list>

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 659d99b..529f21b 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -55,15 +55,15 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
   private static final int INITIAL_BYTE_COUNT = 4096 * DEFAULT_RECORD_BYTE_COUNT;
   private static final int MIN_BYTE_COUNT = 4096;
-  
+
   private final UInt${type.width}Vector offsetVector;
   private final FieldReader reader = new ${minor.class}ReaderImpl(${minor.class}Vector.this);
 
   private final Accessor accessor;
   private final Mutator mutator;
-  
+
   private final UInt${type.width}Vector.Accessor oAccessor;
-  
+
 
   private int allocationTotalByteCount = INITIAL_BYTE_COUNT;
   private int allocationMonitor = 0;
@@ -85,23 +85,23 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     if (getAccessor().getValueCount() == 0) return 0;
     return offsetVector.getBufferSize() + data.writerIndex();
   }
-  
+
   int getSizeFromCount(int valueCount) {
     return valueCount * ${type.width};
   }
-  
+
   public int getValueCapacity(){
     return offsetVector.getValueCapacity() - 1;
   }
-  
+
   public int getByteCapacity(){
-    return data.capacity(); 
+    return data.capacity();
   }
 
   public int getCurrentSizeInBytes() {
     return offsetVector.getAccessor().get(getAccessor().getValueCount());
   }
-  
+
   /**
    * Return the number of bytes contained in the current var len byte vector.
    * @return
@@ -111,7 +111,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     if(valueCount == 0) return 0;
     return offsetVector.getAccessor().get(valueCount);
   }
-  
+
   @Override
   public SerializedField getMetadata() {
     return getMetadataBuilder() //
@@ -126,26 +126,27 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       allocateNew(0,0);
       return 0;
     }
+    clear();
     int loaded = offsetVector.load(valueCount+1, buf);
     data = buf.slice(loaded, dataBytes - loaded);
     data.retain();
     return  dataBytes;
   }
-  
+
   @Override
   public void load(SerializedField metadata, DrillBuf buffer) {
     assert this.field.matches(metadata) : String.format("The field %s doesn't match the provided metadata %s.", this.field, metadata);
     int loaded = load(metadata.getBufferLength(), metadata.getValueCount(), buffer);
     assert metadata.getBufferLength() == loaded : String.format("Expected to load %d bytes but actually loaded %d bytes", metadata.getBufferLength(), loaded);
   }
-  
+
   @Override
   public void clear() {
     super.clear();
     offsetVector.clear();
   }
 
-  
+
   @Override
   public DrillBuf[] getBuffers(boolean clear) {
     DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(false), super.getBuffers(false), DrillBuf.class);
@@ -158,15 +159,15 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     }
     return buffers;
   }
-  
+
   public long getOffsetAddr(){
     return offsetVector.getBuffer().memoryAddress();
   }
-  
+
   public UInt${type.width}Vector getOffsetVector(){
     return offsetVector;
   }
-  
+
   public TransferPair getTransferPair(){
     return new TransferImpl(getField());
   }
@@ -177,7 +178,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   public TransferPair makeTransferPair(ValueVector to) {
     return new TransferImpl((${minor.class}Vector) to);
   }
-  
+
   public void transferTo(${minor.class}Vector target){
     target.clear();
     this.offsetVector.transferTo(target.offsetVector);
@@ -198,25 +199,25 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     target.data.retain();
     target.getMutator().setValueCount(length);
 }
-  
+
   protected void copyFrom(int fromIndex, int thisIndex, ${minor.class}Vector from){
     int start = from.offsetVector.getAccessor().get(fromIndex);
     int end =   from.offsetVector.getAccessor().get(fromIndex+1);
     int len = end - start;
-    
+
     int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(thisIndex * ${type.width});
     from.data.getBytes(start, data, outputStart, len);
     offsetVector.data.set${(minor.javaType!type.javaType)?cap_first}( (thisIndex+1) * ${type.width}, outputStart + len);
   }
-  
+
   public boolean copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
 
     int start = from.offsetVector.getAccessor().get(fromIndex);
     int end =   from.offsetVector.getAccessor().get(fromIndex+1);
     int len = end - start;
-    
+
     int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(thisIndex * ${type.width});
-    
+
     while(data.capacity() < outputStart + len) {
         reAlloc();
     }
@@ -228,10 +229,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     return true;
   }
 
-  
+
   private class TransferImpl implements TransferPair{
     ${minor.class}Vector to;
-    
+
     public TransferImpl(MaterializedField field){
       this.to = new ${minor.class}Vector(field, allocator);
     }
@@ -243,7 +244,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public ${minor.class}Vector getTo(){
       return to;
     }
-    
+
     public void transfer(){
       transferTo(to);
     }
@@ -251,7 +252,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public void splitAndTransfer(int startIndex, int length) {
       splitAndTransferTo(startIndex, length, to);
     }
-    
+
     @Override
     public void copyValueSafe(int fromIndex, int toIndex) {
       to.copyFromSafe(fromIndex, toIndex, ${minor.class}Vector.this);
@@ -269,7 +270,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       throw new OutOfMemoryRuntimeException("Failure while allocating buffer.");
     }
   }
-  
+
   @Override
   public boolean allocateNewSafe() {
     clear();
@@ -295,7 +296,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     offsetVector.zeroVector();
     return true;
   }
-  
+
   public void allocateNew(int totalBytes, int valueCount) {
     clear();
     assert totalBytes >= 0;
@@ -338,18 +339,18 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   public Accessor getAccessor(){
     return accessor;
   }
-  
+
   public Mutator getMutator() {
     return mutator;
   }
-  
+
   public final class Accessor extends BaseValueVector.BaseAccessor implements VariableWidthAccessor {
     final UInt${type.width}Vector.Accessor oAccessor = offsetVector.getAccessor();
 
     public long getStartEnd(int index){
       return oAccessor.getTwoAsLong(index);
     }
-    
+
     public byte[] get(int index) {
       assert index >= 0;
       int startIdx = oAccessor.get(index);
@@ -363,20 +364,20 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public int getValueLength(int index) {
       return offsetVector.getAccessor().get(index + 1) - offsetVector.getAccessor().get(index);
     }
-    
+
     public void get(int index, ${minor.class}Holder holder){
       holder.start = oAccessor.get(index);
       holder.end = oAccessor.get(index + 1);
       holder.buffer = data;
     }
-    
+
     public void get(int index, Nullable${minor.class}Holder holder){
       holder.isSet = 1;
       holder.start = oAccessor.get(index);
       holder.end = oAccessor.get(index + 1);
       holder.buffer = data;
     }
-    
+
 
     <#switch minor.class>
     <#case "VarChar">
@@ -397,9 +398,9 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     }
 
     </#switch>
-    
-    
-    
+
+
+
     public int getValueCount() {
       return Math.max(offsetVector.getAccessor().getValueCount()-1, 0);
     }
@@ -407,12 +408,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public boolean isNull(int index){
       return false;
     }
-    
+
     public UInt${type.width}Vector getOffsetVector(){
       return offsetVector;
     }
   }
-  
+
   /**
    * Mutable${minor.class} implements a vector of variable width values.  Elements in the vector
    * are accessed by position from the logical start of the vector.  A fixed width offsetVector
@@ -464,7 +465,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       offsetVector.getMutator().set(index + 1, currentOffset + length);
       data.setBytes(currentOffset, bytes, start, length);
     }
-    
+
     public void setSafe(int index, ByteBuffer bytes, int start, int length) {
       assert index >= 0;
 
@@ -500,51 +501,51 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
     public void setSafe(int index, int start, int end, DrillBuf buffer){
       int len = end - start;
-      
+
       int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
-      
+
       while(data.capacity() < outputStart + len) {
         reAlloc();
       }
-      
+
       offsetVector.getMutator().setSafe( index+1,  outputStart + len);
       buffer.getBytes(start, data, outputStart, len);
     }
-    
-    
+
+
     public void setSafe(int index, Nullable${minor.class}Holder holder){
       assert holder.isSet == 1;
 
       int start = holder.start;
       int end =   holder.end;
       int len = end - start;
-      
+
       int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
-      
+
       while(data.capacity() < outputStart + len) {
         reAlloc();
       }
-      
+
       holder.buffer.getBytes(start, data, outputStart, len);
       offsetVector.getMutator().setSafe( index+1,  outputStart + len);
     }
-    
+
     public void setSafe(int index, ${minor.class}Holder holder){
 
       int start = holder.start;
       int end =   holder.end;
       int len = end - start;
-      
+
       int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
-      
+
       while(data.capacity() < outputStart + len) {
         reAlloc();
       }
-      
+
       holder.buffer.getBytes(start, data, outputStart, len);
       offsetVector.getMutator().setSafe( index+1,  outputStart + len);
     }
-    
+
     protected void set(int index, int start, int length, DrillBuf buffer){
       assert index >= 0;
       int currentOffset = offsetVector.getAccessor().get(index);
@@ -559,14 +560,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       offsetVector.getMutator().set(index + 1, currentOffset + length);
       data.setBytes(currentOffset, holder.buffer, holder.start, length);
     }
-    
+
     protected void set(int index, ${minor.class}Holder holder){
       int length = holder.end - holder.start;
       int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + length);
       data.setBytes(currentOffset, holder.buffer, holder.start, length);
     }
-    
+
     public void setValueCount(int valueCount) {
       int currentByteCapacity = getByteCapacity();
       int idx = offsetVector.getAccessor().get(valueCount);
@@ -601,7 +602,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       setValueCount(size);
     }
   }
-  
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 8e2ce96..016cd92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -117,10 +117,14 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
       if (buf == null) {
         throw new IOException(new OutOfMemoryException());
       }
-      buf.writeBytes(input, dataLength);
-      ValueVector vector = TypeHelper.getNewVector(field, allocator);
-      vector.load(metaData, buf);
-      buf.release();
+      final ValueVector vector;
+      try {
+        buf.writeBytes(input, dataLength);
+        vector = TypeHelper.getNewVector(field, allocator);
+        vector.load(metaData, buf);
+      } finally {
+        buf.release();
+      }
       vectorList.add(vector);
     }
     container.addCollection(vectorList);

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index c233ac5..811cceb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -101,6 +101,9 @@ public interface BufferAllocator extends Closeable {
 
   /**
    * Not thread safe.
+   *
+   * WARNING: unclaimed pre-allocations leak memory. If you call preAllocate(), you must
+   * make sure to ultimately try to get the buffer and release it.
    */
   public interface PreAllocator {
     public boolean preAllocate(int bytes);

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 369c0ec..7e22e65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl.TopN;
 
+import io.netty.buffer.DrillBuf;
+
 import java.util.concurrent.TimeUnit;
 
 import javax.inject.Named;
@@ -53,9 +55,8 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
     this.limit = limit;
     this.context = context;
     this.allocator = allocator;
-    BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
-    preAlloc.preAllocate(4 * (limit + 1));
-    heapSv4 = new SelectionVector4(preAlloc.getAllocation(), limit, Character.MAX_VALUE);
+    final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1));
+    heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE);
     this.hasSv2 = hasSv2;
   }
 
@@ -70,9 +71,8 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
     newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
     this.hyperBatch = new ExpandableHyperContainer(newContainer);
     this.batchCount = hyperBatch.iterator().next().getValueVectors().length;
-    BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
-    preAlloc.preAllocate(4 * (limit + 1));
-    this.heapSv4 = new SelectionVector4(preAlloc.getAllocation(), limit, Character.MAX_VALUE);
+    final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1));
+    this.heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE);
     for (int i = 0; i < v4.getTotalCount(); i++) {
       heapSv4.set(i, v4.get(i));
     }
@@ -120,9 +120,8 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
   public void generate() throws SchemaChangeException {
     Stopwatch watch = new Stopwatch();
     watch.start();
-    BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
-    preAlloc.preAllocate(4 * queueSize);
-    finalSv4 = new SelectionVector4(preAlloc.getAllocation(), queueSize, 4000);
+    final DrillBuf drillBuf = allocator.buffer(4 * queueSize);
+    finalSv4 = new SelectionVector4(drillBuf, queueSize, 4000);
     for (int i = queueSize - 1; i >= 0; i--) {
       finalSv4.set(i, pop());
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 1cf6213..349f1b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -278,26 +278,30 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       copier.setupRemover(context, batch, newBatch);
     }
     SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
-    do {
-      int count = selectionVector4.getCount();
-      int copiedRecords = copier.copyRecords(0, count);
-      assert copiedRecords == count;
-      for (VectorWrapper<?> v : newContainer) {
-        ValueVector.Mutator m = v.getValueVector().getMutator();
-        m.setValueCount(count);
-      }
-      newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-      newContainer.setRecordCount(count);
-      builder.add(newBatch);
-    } while (selectionVector4.next());
-    selectionVector4.clear();
-    c.clear();
-    VectorContainer newQueue = new VectorContainer();
-    builder.canonicalize();
-    builder.build(context, newQueue);
-    priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
-    builder.getSv4().clear();
-    selectionVector4.clear();
+    try {
+      do {
+        int count = selectionVector4.getCount();
+        int copiedRecords = copier.copyRecords(0, count);
+        assert copiedRecords == count;
+        for (VectorWrapper<?> v : newContainer) {
+          ValueVector.Mutator m = v.getValueVector().getMutator();
+          m.setValueCount(count);
+        }
+        newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+        newContainer.setRecordCount(count);
+        builder.add(newBatch);
+      } while (selectionVector4.next());
+      selectionVector4.clear();
+      c.clear();
+      VectorContainer newQueue = new VectorContainer();
+      builder.canonicalize();
+      builder.build(context, newQueue);
+      priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
+      builder.getSv4().clear();
+      selectionVector4.clear();
+    } finally {
+      builder.close();
+    }
     logger.debug("Took {} us to purge", watch.elapsed(TimeUnit.MICROSECONDS));
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 026d79e..ee2ce7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -256,6 +256,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   }
 
   public void resetBatchBuilder() {
+    batchBuilder.close();
     batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
index 1187bd6..2798010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import io.netty.buffer.DrillBuf;
+
 import java.util.List;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -33,7 +35,7 @@ import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.ArrayListMultimap;
 
-public class MergeJoinBatchBuilder {
+public class MergeJoinBatchBuilder implements AutoCloseable {
 
   private final ArrayListMultimap<BatchSchema, RecordBatchData> queuedRightBatches = ArrayListMultimap.create();
   private VectorContainer container;
@@ -41,6 +43,7 @@ public class MergeJoinBatchBuilder {
   private int runningBatches;
   private int recordCount;
   private PreAllocator svAllocator;
+  private boolean svAllocatorUsed = false;
   private JoinStatus status;
 
   public MergeJoinBatchBuilder(BufferAllocator allocator, JoinStatus status) {
@@ -90,7 +93,9 @@ public class MergeJoinBatchBuilder {
     if (queuedRightBatches.size() > Character.MAX_VALUE) {
       throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
     }
-    status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
+    final DrillBuf drillBuf = svAllocator.getAllocation();
+    svAllocatorUsed = true;
+    status.sv4 = new SelectionVector4(drillBuf, recordCount, Character.MAX_VALUE);
     BatchSchema schema = queuedRightBatches.keySet().iterator().next();
     List<RecordBatchData> data = queuedRightBatches.get(schema);
 
@@ -140,4 +145,13 @@ public class MergeJoinBatchBuilder {
     container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
   }
 
+  @Override
+  public void close() {
+    if (!svAllocatorUsed) {
+      final DrillBuf drillBuf = svAllocator.getAllocation();
+      if (drillBuf != null) {
+        drillBuf.release();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index b28b7b0..611052b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -733,6 +733,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         f.cleanup();
       }
     }
+
+    super.close();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index ca6d83c..1286fe1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -172,74 +172,84 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     // Start collecting batches until recordsToSample records have been collected
 
     SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
-    builder.add(incoming);
-
-    recordsSampled += incoming.getRecordCount();
-
-    outer: while (recordsSampled < recordsToSample) {
-      upstream = next(incoming);
-      switch (upstream) {
-      case NONE:
-      case NOT_YET:
-      case STOP:
-        upstreamNone = true;
-        break outer;
-      default:
-        // fall through
-      }
+    WritableBatch batch = null;
+    CachedVectorContainer sampleToSave = null;
+    VectorContainer containerToCache = new VectorContainer();
+    try {
       builder.add(incoming);
+
       recordsSampled += incoming.getRecordCount();
-      if (upstream == IterOutcome.NONE) {
-        break;
-      }
-    }
-  VectorContainer sortedSamples = new VectorContainer();
-    builder.build(context, sortedSamples);
 
-    // Sort the records according the orderings given in the configuration
+      outer: while (recordsSampled < recordsToSample) {
+        upstream = next(incoming);
+        switch (upstream) {
+        case NONE:
+        case NOT_YET:
+        case STOP:
+          upstreamNone = true;
+          break outer;
+        default:
+          // fall through
+        }
+        builder.add(incoming);
+        recordsSampled += incoming.getRecordCount();
+        if (upstream == IterOutcome.NONE) {
+          break;
+        }
+      }
+      VectorContainer sortedSamples = new VectorContainer();
+      builder.build(context, sortedSamples);
+
+      // Sort the records according to the orderings given in the configuration
+
+      Sorter sorter = SortBatch.createNewSorter(context, popConfig.getOrderings(), sortedSamples);
+      SelectionVector4 sv4 = builder.getSv4();
+      sorter.setup(context, sv4, sortedSamples);
+      sorter.sort(sv4, sortedSamples);
+
+      // Project every Nth record to a new vector container, where N = recordsSampled/(samplingFactor * partitions).
+      // Uses the
+      // expressions from the Orderings to populate each column. There is one column for each Ordering in
+      // popConfig.orderings.
+
+      List<ValueVector> localAllocationVectors = Lists.newArrayList();
+      SampleCopier copier = getCopier(sv4, sortedSamples, containerToCache, popConfig.getOrderings(), localAllocationVectors);
+      int allocationSize = 50;
+      while (true) {
+        for (ValueVector vv : localAllocationVectors) {
+          AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
+        }
+        if (copier.copyRecords(recordsSampled / (samplingFactor * partitions), 0, samplingFactor * partitions)) {
+          break;
+        } else {
+          containerToCache.zeroVectors();
+          allocationSize *= 2;
+        }
+      }
+      for (VectorWrapper<?> vw : containerToCache) {
+        vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
+      }
+      containerToCache.setRecordCount(copier.getOutputRecords());
 
-    Sorter sorter = SortBatch.createNewSorter(context, popConfig.getOrderings(), sortedSamples);
-    SelectionVector4 sv4 = builder.getSv4();
-    sorter.setup(context, sv4, sortedSamples);
-    sorter.sort(sv4, sortedSamples);
+      // Get a distributed multimap handle from the distributed cache, and put the vectors from the new vector container
+      // into a serializable wrapper object, and then add to distributed map
 
-    // Project every Nth record to a new vector container, where N = recordsSampled/(samplingFactor * partitions).
-    // Uses the
-    // the expressions from the Orderings to populate each column. There is one column for each Ordering in
-    // popConfig.orderings.
+      batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false);
+      sampleToSave = new CachedVectorContainer(batch, context.getAllocator());
 
-    VectorContainer containerToCache = new VectorContainer();
-    List<ValueVector> localAllocationVectors = Lists.newArrayList();
-    SampleCopier copier = getCopier(sv4, sortedSamples, containerToCache, popConfig.getOrderings(), localAllocationVectors);
-    int allocationSize = 50;
-    while (true) {
-      for (ValueVector vv : localAllocationVectors) {
-        AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
+      mmap.put(mapKey, sampleToSave);
+      this.sampledIncomingBatches = builder.getHeldRecordBatches();
+    } finally {
+      builder.clear();
+      builder.close();
+      if (batch != null) {
+        batch.clear();
       }
-      if (copier.copyRecords(recordsSampled / (samplingFactor * partitions), 0, samplingFactor * partitions)) {
-        break;
-      } else {
-        containerToCache.zeroVectors();
-        allocationSize *= 2;
+      containerToCache.clear();
+      if (sampleToSave != null) {
+        sampleToSave.clear();
       }
     }
-    for (VectorWrapper<?> vw : containerToCache) {
-      vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
-    }
-    containerToCache.setRecordCount(copier.getOutputRecords());
-
-    // Get a distributed multimap handle from the distributed cache, and put the vectors from the new vector container
-    // into a serializable wrapper object, and then add to distributed map
-
-    WritableBatch batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false);
-    CachedVectorContainer sampleToSave = new CachedVectorContainer(batch, context.getAllocator());
-
-    mmap.put(mapKey, sampleToSave);
-    this.sampledIncomingBatches = builder.getHeldRecordBatches();
-    builder.clear();
-    batch.clear();
-    containerToCache.clear();
-    sampleToSave.clear();
     return true;
 
 
@@ -335,57 +345,63 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     // Get all samples from distributed map
 
     SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
-    for (CachedVectorContainer w : mmap.get(mapKey)) {
-      containerBuilder.add(w.get());
-    }
-    VectorContainer allSamplesContainer = new VectorContainer();
-    containerBuilder.build(context, allSamplesContainer);
-
-    List<Ordering> orderDefs = Lists.newArrayList();
-    int i = 0;
-    for (Ordering od : popConfig.getOrderings()) {
-      SchemaPath sp = SchemaPath.getSimplePath("f" + i++);
-      orderDefs.add(new Ordering(od.getDirection(), new FieldReference(sp)));
-    }
+    final VectorContainer allSamplesContainer = new VectorContainer();
+    final VectorContainer candidatePartitionTable = new VectorContainer();
+    CachedVectorContainer wrap = null;
+    try {
+      for (CachedVectorContainer w : mmap.get(mapKey)) {
+        containerBuilder.add(w.get());
+      }
+      containerBuilder.build(context, allSamplesContainer);
 
-    // sort the data incoming samples.
-    SelectionVector4 newSv4 = containerBuilder.getSv4();
-    Sorter sorter = SortBatch.createNewSorter(context, orderDefs, allSamplesContainer);
-    sorter.setup(context, newSv4, allSamplesContainer);
-    sorter.sort(newSv4, allSamplesContainer);
-
-    // Copy every Nth record from the samples into a candidate partition table, where N = totalSampledRecords/partitions
-    // Attempt to push this to the distributed map. Only the first candidate to get pushed will be used.
-    VectorContainer candidatePartitionTable = new VectorContainer();
-    SampleCopier copier = null;
-    List<ValueVector> localAllocationVectors = Lists.newArrayList();
-    copier = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs, localAllocationVectors);
-    int allocationSize = 50;
-    while (true) {
-      for (ValueVector vv : localAllocationVectors) {
-        AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
+      List<Ordering> orderDefs = Lists.newArrayList();
+      int i = 0;
+      for (Ordering od : popConfig.getOrderings()) {
+        SchemaPath sp = SchemaPath.getSimplePath("f" + i++);
+        orderDefs.add(new Ordering(od.getDirection(), new FieldReference(sp)));
       }
-      int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
-      if (copier.copyRecords(skipRecords, skipRecords, partitions - 1)) {
-        assert copier.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier.getOutputRecords(), partitions);
-        for (VectorWrapper<?> vw : candidatePartitionTable) {
-          vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
+
+      // Sort the incoming sample data.
+      SelectionVector4 newSv4 = containerBuilder.getSv4();
+      Sorter sorter = SortBatch.createNewSorter(context, orderDefs, allSamplesContainer);
+      sorter.setup(context, newSv4, allSamplesContainer);
+      sorter.sort(newSv4, allSamplesContainer);
+
+      // Copy every Nth record from the samples into a candidate partition table, where N = totalSampledRecords/partitions
+      // Attempt to push this to the distributed map. Only the first candidate to get pushed will be used.
+      SampleCopier copier = null;
+      List<ValueVector> localAllocationVectors = Lists.newArrayList();
+      copier = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs, localAllocationVectors);
+      int allocationSize = 50;
+      while (true) {
+        for (ValueVector vv : localAllocationVectors) {
+          AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
         }
-        break;
-      } else {
-        candidatePartitionTable.zeroVectors();
-        allocationSize *= 2;
+        int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
+        if (copier.copyRecords(skipRecords, skipRecords, partitions - 1)) {
+          assert copier.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier.getOutputRecords(), partitions);
+          for (VectorWrapper<?> vw : candidatePartitionTable) {
+            vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
+          }
+          break;
+        } else {
+          candidatePartitionTable.zeroVectors();
+          allocationSize *= 2;
+        }
+      }
+      candidatePartitionTable.setRecordCount(copier.getOutputRecords());
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
+      wrap = new CachedVectorContainer(batch, context.getDrillbitContext().getAllocator());
+      tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
+    } finally {
+      candidatePartitionTable.clear();
+      allSamplesContainer.clear();
+      containerBuilder.clear();
+      containerBuilder.close();
+      if (wrap != null) {
+        wrap.clear();
       }
     }
-    candidatePartitionTable.setRecordCount(copier.getOutputRecords());
-    WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
-    CachedVectorContainer wrap = new CachedVectorContainer(batch, context.getDrillbitContext().getAllocator());
-    tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
-
-    candidatePartitionTable.clear();
-    allSamplesContainer.clear();
-    containerBuilder.clear();
-    wrap.clear();
 
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 8748aaf..dea6ba8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -85,6 +85,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
   @Override
   public void close() {
     builder.clear();
+    builder.close();
     super.close();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index e559ece..00f1992 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -39,7 +39,7 @@ import org.apache.drill.exec.vector.ValueVector;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 
-public class SortRecordBatchBuilder {
+public class SortRecordBatchBuilder implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
 
   private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
@@ -50,6 +50,7 @@ public class SortRecordBatchBuilder {
   private final long maxBytes;
   private SelectionVector4 sv4;
   final PreAllocator svAllocator;
+  private boolean svAllocatorUsed = false;
 
   public SortRecordBatchBuilder(BufferAllocator a, long maxBytes) {
     this.maxBytes = maxBytes;
@@ -165,6 +166,7 @@ public class SortRecordBatchBuilder {
     if (svBuffer == null) {
       throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder.");
     }
+    svAllocatorUsed = true;
     sv4 = new SelectionVector4(svBuffer, recordCount, Character.MAX_VALUE);
     BatchSchema schema = batches.keySet().iterator().next();
     List<RecordBatchData> data = batches.get(schema);
@@ -228,6 +230,17 @@ public class SortRecordBatchBuilder {
     }
   }
 
+  @Override
+  public void close() {
+    // Don't leak unused pre-allocated memory.
+    if (!svAllocatorUsed) {
+      final DrillBuf drillBuf = svAllocator.getAllocation();
+      if (drillBuf != null) {
+        drillBuf.release();
+      }
+    }
+  }
+
   public List<VectorContainer> getHeldRecordBatches() {
     ArrayList<VectorContainer> containerList = Lists.newArrayList();
     for (BatchSchema bs : batches.keySet()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 5cdd2bb..612777e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -160,6 +160,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     }
     if (builder != null) {
       builder.clear();
+      builder.close();
     }
     if (sv4 != null) {
       sv4.clear();
@@ -346,6 +347,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         Stopwatch watch = new Stopwatch();
         watch.start();
 
+        if (builder != null) {
+          builder.clear();
+          builder.close();
+        }
         builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
 
         for (BatchGroup group : batchGroups) {

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 9b97e1c..9acae9e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
+import io.netty.buffer.DrillBuf;
+
 import java.util.Queue;
 
 import javax.inject.Named;
@@ -53,7 +55,8 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
     doSetup(context, hyperBatch, null);
     runStarts.add(0);
     int batch = 0;
-    for (int i = 0; i < this.vector4.getTotalCount(); i++) {
+    final int totalCount = this.vector4.getTotalCount();
+    for (int i = 0; i < totalCount; i++) {
       final int newBatch = this.vector4.get(i) >>> 16;
       if (newBatch == batch) {
         continue;
@@ -64,9 +67,8 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
         throw new UnsupportedOperationException("Missing batch");
       }
     }
-    final BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
-    preAlloc.preAllocate(4 * this.vector4.getTotalCount());
-    aux = new SelectionVector4(preAlloc.getAllocation(), this.vector4.getTotalCount(), Character.MAX_VALUE);
+    final DrillBuf drillBuf = allocator.buffer(4 * totalCount);
+    aux = new SelectionVector4(drillBuf, totalCount, Character.MAX_VALUE);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index f7786b7..facf192 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
+import io.netty.buffer.DrillBuf;
+
 import java.util.List;
 
 import javax.inject.Named;
@@ -47,9 +49,8 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
     this.outgoing = outgoing;
     this.size = batchGroups.size();
 
-    BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
-    preAlloc.preAllocate(4 * size);
-    vector4 = new SelectionVector4(preAlloc.getAllocation(), size, Character.MAX_VALUE);
+    final DrillBuf drillBuf = allocator.buffer(4 * size);
+    vector4 = new SelectionVector4(drillBuf, size, Character.MAX_VALUE);
     doSetup(context, hyperBatch, outgoing);
 
     queueSize = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 308a8bc..324829a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -68,33 +68,36 @@ public class WritableBatch {
       }
 
       DrillBuf newBuf = buffers[0].getAllocator().buffer(len);
-
-      /* Copy data from each buffer into the compound buffer */
-      int offset = 0;
-      for (DrillBuf buf : buffers) {
-        newBuf.setBytes(offset, buf);
-        offset += buf.capacity();
-        buf.release();
-      }
-
-      List<SerializedField> fields = def.getFieldList();
-
-      int bufferOffset = 0;
-
-      /*
-       * For each value vector slice up the appropriate size from the compound buffer and load it into the value vector
-       */
-      int vectorIndex = 0;
-
-      for (VectorWrapper<?> vv : container) {
-        SerializedField fmd = fields.get(vectorIndex);
-        ValueVector v = vv.getValueVector();
-        DrillBuf bb = newBuf.slice(bufferOffset, fmd.getBufferLength());
+      try {
+        /* Copy data from each buffer into the compound buffer */
+        int offset = 0;
+        for (DrillBuf buf : buffers) {
+          newBuf.setBytes(offset, buf);
+          offset += buf.capacity();
+          buf.release();
+        }
+
+        List<SerializedField> fields = def.getFieldList();
+
+        int bufferOffset = 0;
+
+        /*
+         * For each value vector slice up the appropriate size from the compound buffer and load it into the value vector
+         */
+        int vectorIndex = 0;
+
+        for (VectorWrapper<?> vv : container) {
+          SerializedField fmd = fields.get(vectorIndex);
+          ValueVector v = vv.getValueVector();
+          DrillBuf bb = newBuf.slice(bufferOffset, fmd.getBufferLength());
 //        v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
-        v.load(fmd, bb);
-        bb.release();
-        vectorIndex++;
-        bufferOffset += fmd.getBufferLength();
+          v.load(fmd, bb);
+          vectorIndex++;
+          bufferOffset += fmd.getBufferLength();
+        }
+      } finally {
+        // Any vectors that loaded material from newBuf slices above will retain those.
+        newBuf.release(1);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index ae5fad5..f88a7bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -200,6 +200,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
 
 
   public void transferTo(BitVector target) {
+    target.clear();
     target.data = data;
     target.data.retain();
     target.valueCount = valueCount;
@@ -212,6 +213,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     int byteSize = getSizeFromCount(length);
     int offset = startIndex % 8;
     if (offset == 0) {
+      target.clear();
       // slice
       target.data = (DrillBuf) this.data.slice(firstByte, byteSize);
       target.data.retain();

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 78846dc..3c01939 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -242,4 +242,13 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
     }
     return actualBufSize;
   }
+
+  @Override
+  public void close() {
+   for(final ValueVector valueVector : vectors.values()) {
+     valueVector.close();
+   }
+
+   super.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index b615b66..d0f38c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -342,4 +342,14 @@ public class MapVector extends AbstractMapVector {
       v.clear();
     }
   }
+
+  @Override
+  public void close() {
+    for (final ValueVector v : getChildren()) {
+      v.close();
+    }
+    valueCount = 0;
+
+    super.close();
+ }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index 2b1dff0..037c8c6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.record.vector;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.nio.charset.Charset;
 
@@ -102,12 +103,10 @@ public class TestValueVector extends ExecTest {
     boolean b = false;
     try {
       v.getAccessor().get(3);
-    } catch(AssertionError e) {
+    } catch(IllegalStateException e) {
       b = true;
     }finally{
-      if(!b){
-        assert false;
-      }
+      assertTrue(b);
     }
 
   }
@@ -139,12 +138,10 @@ public class TestValueVector extends ExecTest {
       boolean b = false;
       try {
         v.getAccessor().get(3);
-      } catch(AssertionError e) {
+      } catch(IllegalStateException e) {
         b = true;
       }finally{
-        if(!b){
-          assert false;
-        }
+        assertTrue(b);
       }
     }
 
@@ -154,12 +151,10 @@ public class TestValueVector extends ExecTest {
       boolean b = false;
       try {
         v.getAccessor().get(0);
-      } catch(AssertionError e) {
+      } catch(IllegalStateException e) {
         b = true;
       }finally{
-        if(!b){
-          assert false;
-        }
+        assertTrue(b);
       }
     }
 
@@ -180,12 +175,10 @@ public class TestValueVector extends ExecTest {
       boolean b = false;
       try {
         v.getAccessor().get(3);
-      } catch(AssertionError e) {
+      } catch(IllegalStateException e) {
         b = true;
       }finally{
-        if(!b){
-          assert false;
-        }
+        assertTrue(b);
       }
     }
 
@@ -217,12 +210,10 @@ public class TestValueVector extends ExecTest {
       boolean b = false;
       try {
         v.getAccessor().get(3);
-      } catch(AssertionError e) {
+      } catch(IllegalStateException e) {
         b = true;
       }finally{
-        if(!b){
-          assert false;
-        }
+        assertTrue(b);
       }
     }
 
@@ -231,12 +222,10 @@ public class TestValueVector extends ExecTest {
       boolean b = false;
       try {
         v.getAccessor().get(0);
-      } catch(AssertionError e) {
+      } catch(IllegalStateException e) {
         b = true;
       }finally{
-        if(!b){
-          assert false;
-        }
+        assertTrue(b);
       }
     }
   }


Mime
View raw message