drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject drill git commit: DRILL-4215: Transfer buffer ownership in TransferPair
Date Tue, 29 Dec 2015 01:09:46 GMT
Repository: drill
Updated Branches:
  refs/heads/master de008810c -> 6dea42994


DRILL-4215: Transfer buffer ownership in TransferPair


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6dea4299
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6dea4299
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6dea4299

Branch: refs/heads/master
Commit: 6dea429949a3d6a68aefbdb3d78de41e0955239b
Parents: de00881
Author: Steven Phillips <smp@apache.org>
Authored: Tue Dec 1 00:34:41 2015 -0800
Committer: Steven Phillips <smp@apache.org>
Committed: Mon Dec 28 15:43:07 2015 -0800

----------------------------------------------------------------------
 .../drill/exec/physical/impl/ScreenCreator.java |  2 +-
 .../exec/physical/impl/SingleSenderCreator.java |  2 +-
 .../exec/physical/impl/TopN/TopNBatch.java      |  4 ++--
 .../physical/impl/aggregate/InternalBatch.java  |  9 +++++----
 .../impl/aggregate/StreamingAggBatch.java       |  2 +-
 .../impl/aggregate/StreamingAggTemplate.java    |  7 ++++---
 .../impl/aggregate/StreamingAggregator.java     |  3 ++-
 .../BroadcastSenderRootExec.java                |  2 +-
 .../physical/impl/filter/FilterRecordBatch.java |  2 +-
 .../impl/flatten/FlattenRecordBatch.java        |  4 ++--
 .../exec/physical/impl/join/HashJoinBatch.java  |  2 +-
 .../physical/impl/join/NestedLoopJoinBatch.java |  2 +-
 .../materialize/VectorRecordMaterializer.java   |  8 ++++++--
 .../OrderedPartitionRecordBatch.java            |  2 +-
 .../impl/producer/ProducerConsumerBatch.java    |  2 +-
 .../physical/impl/sort/RecordBatchData.java     |  5 +++--
 .../impl/sort/SortRecordBatchBuilder.java       |  2 +-
 .../physical/impl/trace/TraceRecordBatch.java   |  2 +-
 .../physical/impl/window/WindowDataBatch.java   |  2 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |  6 +++---
 .../drill/exec/record/HyperVectorWrapper.java   |  3 ++-
 .../drill/exec/record/RecordIterator.java       |  4 +++-
 .../apache/drill/exec/record/SchemaUtil.java    |  2 +-
 .../drill/exec/record/SimpleVectorWrapper.java  |  5 +++--
 .../drill/exec/record/VectorContainer.java      | 11 ++++++-----
 .../apache/drill/exec/record/VectorWrapper.java |  3 ++-
 .../apache/drill/exec/record/WritableBatch.java | 12 ++++++++++++
 .../drill/TestTpchDistributedConcurrent.java    |  2 +-
 .../drill/exec/vector/TestSplitAndTransfer.java |  2 +-
 .../codegen/templates/FixedValueVectors.java    | 16 +++++++---------
 .../codegen/templates/NullableValueVectors.java | 10 +++++-----
 .../codegen/templates/RepeatedValueVectors.java | 10 +++++-----
 .../src/main/codegen/templates/UnionVector.java | 14 +++++++++-----
 .../templates/VariableLengthVectors.java        | 17 ++++++++---------
 .../drill/exec/vector/BaseValueVector.java      |  9 +++++++--
 .../org/apache/drill/exec/vector/BitVector.java | 10 +++++-----
 .../apache/drill/exec/vector/ObjectVector.java  |  4 ++--
 .../apache/drill/exec/vector/ValueVector.java   |  7 +++++--
 .../apache/drill/exec/vector/ZeroVector.java    |  9 +++++++--
 .../vector/complex/AbstractContainerVector.java |  4 ++++
 .../drill/exec/vector/complex/ListVector.java   |  6 +++---
 .../drill/exec/vector/complex/MapVector.java    | 12 ++++++------
 .../exec/vector/complex/RepeatedListVector.java | 10 +++++-----
 .../exec/vector/complex/RepeatedMapVector.java  | 20 ++++++++++----------
 .../vector/complex/impl/PromotableWriter.java   |  2 +-
 45 files changed, 159 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 3b90979..60355fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -107,7 +107,7 @@ public class ScreenCreator implements RootCreator<Screen> {
 
         return false;
       case OK_NEW_SCHEMA:
-        materializer = new VectorRecordMaterializer(context, incoming);
+        materializer = new VectorRecordMaterializer(context, oContext, incoming);
         //$FALL-THROUGH$
       case OK:
         injector.injectPause(context.getExecutionControls(), "sending-data", logger);

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 23e97d0..2f33193 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -120,7 +120,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
         final FragmentWritableBatch batch = new FragmentWritableBatch(
             false, handle.getQueryId(), handle.getMajorFragmentId(),
             handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(),
-            incoming.getWritableBatch());
+            incoming.getWritableBatch().transfer(oContext.getAllocator()));
         updateStats(batch);
         stats.startWait();
         try {

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index c287bc3..a6c3269 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -229,9 +229,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           batchCount++;
           RecordBatchData batch;
           if (schemaChanged) {
-            batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext));
+            batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext), oContext.getAllocator());
           } else {
-            batch = new RecordBatchData(incoming);
+            batch = new RecordBatchData(incoming, oContext.getAllocator());
           }
           boolean success = false;
           try {

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index dae9eae..9e96727 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.util.Iterator;
 
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
@@ -34,11 +35,11 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
   private final SelectionVector2 sv2;
   private final SelectionVector4 sv4;
 
-  public InternalBatch(RecordBatch incoming) {
-    this(incoming, null);
+  public InternalBatch(RecordBatch incoming, OperatorContext oContext) {
+    this(incoming, null, oContext);
   }
 
-  public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers){
+  public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers, OperatorContext oContext){
     switch(incoming.getSchema().getSelectionVectorMode()){
     case FOUR_BYTE:
       this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();
@@ -53,7 +54,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
       this.sv2 = null;
     }
     this.schema = incoming.getSchema();
-    this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers);
+    this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers, oContext);
   }
 
   public BatchSchema getSchema() {

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ee9a0ab..c084e39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -309,7 +309,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
     container.buildSchema(SelectionVectorMode.NONE);
     StreamingAggregator agg = context.getImplementationClass(cg);
-    agg.setup(context, incoming, this);
+    agg.setup(oContext, incoming, this);
     return agg;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 4932b0f..82e8777 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -21,6 +21,7 @@ import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -41,12 +42,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   private int outputCount = 0;
   private RecordBatch incoming;
   private StreamingAggBatch outgoing;
-  private FragmentContext context;
   private boolean done = false;
+  private OperatorContext context;
 
 
   @Override
-  public void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException {
+  public void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException {
     this.context = context;
     this.incoming = incoming;
     this.outgoing = outgoing;
@@ -164,7 +165,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
           previousIndex = currentIndex;
         }
 
-        InternalBatch previous = new InternalBatch(incoming);
+        InternalBatch previous = new InternalBatch(incoming, context);
 
         try {
           while (true) {

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 96da00b..61c82d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
@@ -31,7 +32,7 @@ public interface StreamingAggregator {
     RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
   }
 
-  public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException;
+  public abstract void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException;
 
   public abstract IterOutcome getOutcome();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index c88c72d..80d7744 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -117,7 +117,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
 
       case OK_NEW_SCHEMA:
       case OK:
-        WritableBatch writableBatch = incoming.getWritableBatch();
+        WritableBatch writableBatch = incoming.getWritableBatch().transfer(oContext.getAllocator());
         if (tunnels.length > 1) {
           writableBatch.retainBuffers(tunnels.length - 1);
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index e435e79..c0e8944 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -153,7 +153,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
 
     for (final VectorWrapper<?> vw : incoming) {
       for (final ValueVector vv : vw.getValueVectors()) {
-        final TransferPair pair = vv.getTransferPair();
+        final TransferPair pair = vv.getTransferPair(oContext.getAllocator());
         container.add(pair.getTo());
         transfers.add(pair);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index c3b3f45..dcaa244 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -265,12 +265,12 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
 
     TransferPair tp = null;
     if (flattenField instanceof RepeatedMapVector) {
-      tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference);
+      tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference, oContext.getAllocator());
     } else {
       final ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector();
       // vvIn may be null because of fast schema return for repeated list vectors
       if (vvIn != null) {
-        tp = vvIn.getTransferPair(reference);
+        tp = vvIn.getTransferPair(reference, oContext.getAllocator());
       }
     }
     return tp;

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 2d0bd43..3ea97c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -380,7 +380,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                      * to the hyper vector container. Will be used when we want to retrieve
                      * records that have matching keys on the probe side.
                      */
-        final RecordBatchData nextBatch = new RecordBatchData(right);
+        final RecordBatchData nextBatch = new RecordBatchData(right, oContext.getAllocator());
         boolean success = false;
         try {
           if (hyperContainer == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index b390b8f..f0e53e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -340,7 +340,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
   }
 
   private void addBatchToHyperContainer(RecordBatch inputBatch) {
-    final RecordBatchData batchCopy = new RecordBatchData(inputBatch);
+    final RecordBatchData batchCopy = new RecordBatchData(inputBatch, oContext.getAllocator());
     boolean success = false;
     try {
       rightCounts.addLast(inputBatch.getRecordCount());

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index 3933ddd..ba8df92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.exec.physical.impl.materialize;
 
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.record.BatchSchema;
@@ -29,10 +31,12 @@ public class VectorRecordMaterializer implements RecordMaterializer{
 
   private QueryId queryId;
   private RecordBatch batch;
+  private BufferAllocator allocator;
 
-  public VectorRecordMaterializer(FragmentContext context, RecordBatch batch) {
+  public VectorRecordMaterializer(FragmentContext context, OperatorContext oContext, RecordBatch batch) {
     this.queryId = context.getHandle().getQueryId();
     this.batch = batch;
+    this.allocator = oContext.getAllocator();
     BatchSchema schema = batch.getSchema();
     assert schema != null : "Schema must be defined.";
 
@@ -43,7 +47,7 @@ public class VectorRecordMaterializer implements RecordMaterializer{
 
   public QueryWritableBatch convertNext() {
     //batch.getWritableBatch().getDef().getRecordCount()
-    WritableBatch w = batch.getWritableBatch();
+    WritableBatch w = batch.getWritableBatch().transfer(allocator);
 
     QueryData header = QueryData.newBuilder() //
         .setQueryId(queryId) //

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index c9483ae..64cfad0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -590,7 +590,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         OrderedPartitionProjector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
 
     for (VectorWrapper<?> vw : batch) {
-      TransferPair tp = vw.getValueVector().getTransferPair();
+      TransferPair tp = vw.getValueVector().getTransferPair(oContext.getAllocator());
       transfers.add(tp);
       container.add(tp.getTo());
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index e3033b4..38d08b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -141,7 +141,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
               return;
             case OK_NEW_SCHEMA:
             case OK:
-              wrapper = RecordBatchDataWrapper.batch(new RecordBatchData(incoming));
+              wrapper = RecordBatchDataWrapper.batch(new RecordBatchData(incoming, oContext.getAllocator()));
               queue.put(wrapper);
               wrapper = null;
               break;

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index af774db..0cd55eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.sort;
 
 import java.util.List;
 
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
@@ -40,7 +41,7 @@ public class RecordBatchData {
   private int recordCount;
   VectorContainer container = new VectorContainer();
 
-  public RecordBatchData(VectorAccessible batch) {
+  public RecordBatchData(VectorAccessible batch, BufferAllocator allocator) {
     List<ValueVector> vectors = Lists.newArrayList();
     recordCount = batch.getRecordCount();
 
@@ -54,7 +55,7 @@ public class RecordBatchData {
       if (v.isHyper()) {
         throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
       }
-      TransferPair tp = v.getValueVector().getTransferPair();
+      TransferPair tp = v.getValueVector().getTransferPair(allocator);
       tp.transfer();
       vectors.add(tp.getTo());
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index f2302ce..33338dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -91,7 +91,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
     }
 
 
-    RecordBatchData bd = new RecordBatchData(batch);
+    RecordBatchData bd = new RecordBatchData(batch, allocator);
     runningBatches++;
     batches.put(batch.getSchema(), bd);
     recordCount += bd.getRecordCount();

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index ff83cc9..209624b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -140,7 +140,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
 
     /* Add all the value vectors in the container */
     for (VectorWrapper<?> vv : incoming) {
-      TransferPair tp = vv.getValueVector().getTransferPair();
+      TransferPair tp = vv.getValueVector().getTransferPair(oContext.getAllocator());
       container.add(tp.getTo());
     }
     container.buildSchema(incoming.getSchema().getSelectionVectorMode());

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
index b2befa3..7abc03c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
@@ -47,7 +47,7 @@ public class WindowDataBatch implements VectorAccessible {
       if (v.isHyper()) {
         throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
       }
-      TransferPair tp = v.getValueVector().getTransferPair();
+      TransferPair tp = v.getValueVector().getTransferPair(oContext.getAllocator());
       tp.transfer();
       vectors.add(tp.getTo());
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 4dbd92d..6e79f01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -366,7 +366,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           totalCount += count;
           sorter.setup(context, sv2, convertedBatch);
           sorter.sort(sv2);
-          RecordBatchData rbd = new RecordBatchData(convertedBatch);
+          RecordBatchData rbd = new RecordBatchData(convertedBatch, oContext.getAllocator());
           boolean success = false;
           try {
             rbd.setSv2(sv2);
@@ -446,7 +446,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         builder = new SortRecordBatchBuilder(oContext.getAllocator());
 
         for (BatchGroup group : batchGroups) {
-          RecordBatchData rbd = new RecordBatchData(group.getContainer());
+          RecordBatchData rbd = new RecordBatchData(group.getContainer(), oContext.getAllocator());
           rbd.setSv2(group.getSv2());
           builder.add(rbd);
         }
@@ -562,7 +562,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
     // 1 output container is kept in memory, so we want to hold on to it and transferClone
     // allows keeping ownership
-    VectorContainer c1 = VectorContainer.getTransferClone(outputContainer);
+    VectorContainer c1 = VectorContainer.getTransferClone(outputContainer, oContext);
     c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     c1.setRecordCount(count);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index 7fc7960..322339e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -21,6 +21,7 @@ import java.util.AbstractMap;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
@@ -116,7 +117,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
 
   @Override
   @SuppressWarnings("unchecked")
-  public VectorWrapper<T> cloneAndTransfer() {
+  public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator) {
     return new HyperVectorWrapper<T>(f, vectors, false);
 //    T[] newVectors = (T[]) Array.newInstance(vectors.getClass().getComponentType(), vectors.length);
 //    for(int i =0; i < newVectors.length; i++) {

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 77cb9a1..af0a753 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -53,6 +53,7 @@ public class RecordIterator implements VectorAccessible {
   private int inputIndex;           // For two way merge join 0:left, 1:right
   private boolean lastBatchRead;    // True if all batches are consumed.
   private boolean initialized;
+  private OperatorContext oContext;
 
   private final VectorContainer container; // Holds VectorContainer of current record batch
   private final TreeRangeMap<Long, RecordBatchData> batches = TreeRangeMap.create();
@@ -66,6 +67,7 @@ public class RecordIterator implements VectorAccessible {
     this.inputIndex = inputIndex;
     this.lastBatchRead = false;
     this.container = new VectorContainer(oContext);
+    this.oContext = oContext;
     resetIndices();
     this.initialized = false;
   }
@@ -181,7 +183,7 @@ public class RecordIterator implements VectorAccessible {
             nextOuterPosition = 0;
           }
           // Transfer vectors from incoming record batch.
-          final RecordBatchData rbd = new RecordBatchData(incoming);
+          final RecordBatchData rbd = new RecordBatchData(incoming, oContext.getAllocator());
           innerRecordCount = incoming.getRecordCount();
           if (!initialized) {
             for (VectorWrapper<?> w : rbd.getContainer()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 8cf90ab..48f0a36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -100,7 +100,7 @@ public class SchemaUtil {
                                            int recordCount, OperatorContext context) {
     if (v != null) {
       int valueCount = v.getAccessor().getValueCount();
-      TransferPair tp = v.getTransferPair();
+      TransferPair tp = v.getTransferPair(context.getAllocator());
       tp.transfer();
       if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) {
         if (field.getType().getMinorType() == MinorType.UNION) {

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index f1b60d4..1e8a52f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MajorTypeOrBuilder;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
@@ -74,8 +75,8 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
 
   @SuppressWarnings("unchecked")
   @Override
-  public VectorWrapper<T> cloneAndTransfer() {
-    TransferPair tp = vector.getTransferPair();
+  public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator) {
+    TransferPair tp = vector.getTransferPair(allocator);
     tp.transfer();
     return new SimpleVectorWrapper<T>((T) tp.getTo());
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index c483650..33351ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
@@ -148,15 +149,15 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
    *          The RecordBatch iterator the contains the batch we should take over.
    * @return A cloned vector container.
    */
-  public static VectorContainer getTransferClone(VectorAccessible incoming) {
-    VectorContainer vc = new VectorContainer();
+  public static VectorContainer getTransferClone(VectorAccessible incoming, OperatorContext oContext) {
+    VectorContainer vc = new VectorContainer(oContext);
     for (VectorWrapper<?> w : incoming) {
       vc.cloneAndTransfer(w);
     }
     return vc;
   }
 
-  public static VectorContainer getTransferClone(VectorAccessible incoming, VectorWrapper[] ignoreWrappers) {
+  public static VectorContainer getTransferClone(VectorAccessible incoming, VectorWrapper[] ignoreWrappers, OperatorContext oContext) {
     Iterable<VectorWrapper<?>> wrappers = incoming;
     if (ignoreWrappers != null) {
       final List<VectorWrapper> ignored = Lists.newArrayList(ignoreWrappers);
@@ -165,7 +166,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
       wrappers = resultant;
     }
 
-    final VectorContainer vc = new VectorContainer();
+    final VectorContainer vc = new VectorContainer(oContext);
     for (VectorWrapper<?> w : wrappers) {
       vc.cloneAndTransfer(w);
     }
@@ -198,7 +199,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
   }
 
   private void cloneAndTransfer(VectorWrapper<?> wrapper) {
-    wrappers.add(wrapper.cloneAndTransfer());
+    wrappers.add(wrapper.cloneAndTransfer(oContext.getAllocator()));
   }
 
   public void addCollection(Iterable<ValueVector> vectors) {

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
index 5250f98..65ea457 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.record;
 
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.vector.ValueVector;
 
 
@@ -31,7 +32,7 @@ public interface VectorWrapper<T extends ValueVector> {
   public T[] getValueVectors();
   public boolean isHyper();
   public void clear();
-  public VectorWrapper<T> cloneAndTransfer();
+  public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator);
   public VectorWrapper<?> getChildWrapper(int[] ids);
   public void transfer(VectorWrapper<?> destination);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index d39ce5e..bcec920 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -51,6 +51,18 @@ public class WritableBatch implements AutoCloseable {
     this.buffers = buffers;
   }
 
+  public WritableBatch transfer(BufferAllocator allocator) {
+    List<DrillBuf> newBuffers = Lists.newArrayList();
+    for (DrillBuf buf : buffers) {
+      int writerIndex = buf.writerIndex();
+      DrillBuf newBuf = buf.transferOwnership(allocator).buffer;
+      newBuf.writerIndex(writerIndex);
+      newBuffers.add(newBuf);
+    }
+    clear();
+    return new WritableBatch(def, newBuffers);
+  }
+
   public RecordBatchDef getDef() {
     return def;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
index f7843f5..66b7571 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertNull;
  * any particular order of execution. We ignore the results.
  */
 public class TestTpchDistributedConcurrent extends BaseTestQuery {
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(120000); // Longer timeout than usual.
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(140000); // Longer timeout than usual.
 
   /*
    * Valid test names taken from TestTpchDistributed. Fuller path prefixes are

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java
index b2054e6..6ac8e97 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java
@@ -51,7 +51,7 @@ public class TestSplitAndTransfer {
     }
     mutator.setValueCount(valueCount);
 
-    final TransferPair tp = varCharVector.getTransferPair();
+    final TransferPair tp = varCharVector.getTransferPair(allocator);
     final NullableVarCharVector newVarCharVector = (NullableVarCharVector) tp.getTo();
     final Accessor accessor = newVarCharVector.getAccessor();
     final int[][] startLengths = {{0, 201}, {201, 200}, {401, 99}};

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index f67614a..8e77dcc 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -198,13 +198,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     data.writerIndex(actualLength);
     }
 
-  public TransferPair getTransferPair(){
-    return new TransferImpl(getField());
+  public TransferPair getTransferPair(BufferAllocator allocator){
+    return new TransferImpl(getField(), allocator);
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref){
-    return new TransferImpl(getField().withPath(ref));
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){
+    return new TransferImpl(getField().withPath(ref), allocator);
   }
 
   @Override
@@ -214,8 +214,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
   public void transferTo(${minor.class}Vector target){
     target.clear();
-    target.data = data;
-    target.data.retain(1);
+    target.data = data.transferOwnership(target.allocator).buffer;
     target.data.writerIndex(data.writerIndex());
     clear();
   }
@@ -224,15 +223,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     final int startPoint = startIndex * ${type.width};
     final int sliceLength = length * ${type.width};
     target.clear();
-    target.data = data.slice(startPoint, sliceLength);
-    target.data.retain(1);
+    target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer;
     target.data.writerIndex(sliceLength);
   }
 
   private class TransferImpl implements TransferPair{
     private ${minor.class}Vector to;
 
-    public TransferImpl(MaterializedField field){
+    public TransferImpl(MaterializedField field, BufferAllocator allocator){
       to = new ${minor.class}Vector(field, allocator);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 13bdd4f..d2c17ff 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -243,13 +243,13 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   }
 
   @Override
-  public TransferPair getTransferPair(){
-    return new TransferImpl(getField());
+  public TransferPair getTransferPair(BufferAllocator allocator){
+    return new TransferImpl(getField(), allocator);
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref){
-    return new TransferImpl(getField().withPath(ref));
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){
+    return new TransferImpl(getField().withPath(ref), allocator);
   }
 
   @Override
@@ -277,7 +277,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   private class TransferImpl implements TransferPair {
     Nullable${minor.class}Vector to;
 
-    public TransferImpl(MaterializedField field){
+    public TransferImpl(MaterializedField field, BufferAllocator allocator){
       to = new Nullable${minor.class}Vector(field, allocator);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
index 21f5616..ca39d71 100644
--- a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
@@ -79,13 +79,13 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
   }
 
   @Override
-  public TransferPair getTransferPair() {
-    return new TransferImpl(getField());
+  public TransferPair getTransferPair(BufferAllocator allocator) {
+    return new TransferImpl(getField(), allocator);
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref){
-    return new TransferImpl(getField().withPath(ref));
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){
+    return new TransferImpl(getField().withPath(ref), allocator);
   }
 
   @Override
@@ -131,7 +131,7 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
   private class TransferImpl implements TransferPair {
     final Repeated${minor.class}Vector to;
 
-    public TransferImpl(MaterializedField field) {
+    public TransferImpl(MaterializedField field, BufferAllocator allocator) {
       this.to = new Repeated${minor.class}Vector(field, allocator);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java
index cc541e5..2e278b1 100644
--- a/exec/vector/src/main/codegen/templates/UnionVector.java
+++ b/exec/vector/src/main/codegen/templates/UnionVector.java
@@ -80,6 +80,10 @@ public class UnionVector implements ValueVector {
     this.callBack = callBack;
   }
 
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
   public List<MinorType> getSubTypes() {
     return majorType.getSubTypeList();
   }
@@ -198,13 +202,13 @@ public class UnionVector implements ValueVector {
   }
 
   @Override
-  public TransferPair getTransferPair() {
-    return new TransferImpl(field);
+  public TransferPair getTransferPair(BufferAllocator allocator) {
+    return new TransferImpl(field, allocator);
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref) {
-    return new TransferImpl(field.withPath(ref));
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+    return new TransferImpl(field.withPath(ref), allocator);
   }
 
   @Override
@@ -242,7 +246,7 @@ public class UnionVector implements ValueVector {
 
     UnionVector to;
 
-    public TransferImpl(MaterializedField field) {
+    public TransferImpl(MaterializedField field, BufferAllocator allocator) {
       to = new UnionVector(field, allocator, null);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index f734734..56d2d52 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -174,13 +174,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   }
 
   @Override
-  public TransferPair getTransferPair(){
-    return new TransferImpl(getField());
+  public TransferPair getTransferPair(BufferAllocator allocator){
+    return new TransferImpl(getField(), allocator);
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref){
-    return new TransferImpl(getField().withPath(ref));
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){
+    return new TransferImpl(getField().withPath(ref), allocator);
   }
 
   @Override
@@ -191,8 +191,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   public void transferTo(${minor.class}Vector target){
     target.clear();
     this.offsetVector.transferTo(target.offsetVector);
-    target.data = data;
-    target.data.retain(1);
+    target.data = data.transferOwnership(target.allocator).buffer;
+    target.data.writerIndex(data.writerIndex());
     clear();
   }
 
@@ -207,8 +207,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     for (int i = 0; i < length + 1; i++) {
       targetOffsetVectorMutator.set(i, offsetVectorAccessor.get(startIndex + i) - startPoint);
     }
-    target.data = data.slice(startPoint, sliceLength);
-    target.data.retain(1);
+    target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer;
     target.getMutator().setValueCount(length);
 }
 
@@ -242,7 +241,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   private class TransferImpl implements TransferPair{
     ${minor.class}Vector to;
 
-    public TransferImpl(MaterializedField field){
+    public TransferImpl(MaterializedField field, BufferAllocator allocator){
       to = new ${minor.class}Vector(field, allocator);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index eb5dbcd..23ad778 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -71,8 +71,8 @@ public abstract class BaseValueVector implements ValueVector {
   }
 
   @Override
-  public TransferPair getTransferPair() {
-    return getTransferPair(new FieldReference(getField().getPath()));
+  public TransferPair getTransferPair(BufferAllocator allocator) {
+    return getTransferPair(new FieldReference(getField().getPath()), allocator);
   }
 
   @Override
@@ -119,5 +119,10 @@ public abstract class BaseValueVector implements ValueVector {
 
     return true;
   }
+
+  @Override
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
index c1504c6..3ba11e2 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -208,13 +208,13 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   @Override
-  public TransferPair getTransferPair() {
-    return new TransferImpl(getField());
+  public TransferPair getTransferPair(BufferAllocator allocator) {
+    return new TransferImpl(getField(), allocator);
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref) {
-    return new TransferImpl(getField().withPath(ref));
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+    return new TransferImpl(getField().withPath(ref), allocator);
   }
 
   @Override
@@ -273,7 +273,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   private class TransferImpl implements TransferPair {
     BitVector to;
 
-    public TransferImpl(MaterializedField field) {
+    public TransferImpl(MaterializedField field, BufferAllocator allocator) {
       this.to = new BitVector(field, allocator);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index 9ca4410..494f234 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -147,7 +147,7 @@ public class ObjectVector extends BaseValueVector {
   }
 
   @Override
-  public TransferPair getTransferPair() {
+  public TransferPair getTransferPair(BufferAllocator allocator) {
     throw new UnsupportedOperationException("ObjectVector does not support this");
   }
 
@@ -157,7 +157,7 @@ public class ObjectVector extends BaseValueVector {
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref) {
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
     throw new UnsupportedOperationException("ObjectVector does not support this");
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index b39fcfe..a4a071e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -23,6 +23,7 @@ import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
@@ -68,6 +69,8 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
    */
   boolean allocateNewSafe();
 
+  BufferAllocator getAllocator();
+
   /**
    * Set the initial record capacity
    * @param numRecords
@@ -99,9 +102,9 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
    * Returns a {@link org.apache.drill.exec.record.TransferPair transfer pair}, creating a new target vector of
    * the same type.
    */
-  TransferPair getTransferPair();
+  TransferPair getTransferPair(BufferAllocator allocator);
 
-  TransferPair getTransferPair(FieldReference ref);
+  TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator);
 
   /**
    * Returns a new {@link org.apache.drill.exec.record.TransferPair transfer pair} that is used to transfer underlying

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
index c5326f6..165fc14 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
@@ -94,7 +94,7 @@ public class ZeroVector implements ValueVector {
   }
 
   @Override
-  public TransferPair getTransferPair() {
+  public TransferPair getTransferPair(BufferAllocator allocator) {
     return defaultPair;
   }
 
@@ -138,6 +138,11 @@ public class ZeroVector implements ValueVector {
   }
 
   @Override
+  public BufferAllocator getAllocator() {
+    throw new UnsupportedOperationException("Tried to get allocator from ZeroVector");
+  }
+
+  @Override
   public void setInitialCapacity(int numRecords) { }
 
   @Override
@@ -146,7 +151,7 @@ public class ZeroVector implements ValueVector {
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref) {
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
     return defaultPair;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
index caedb96..0ac2417 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
@@ -62,6 +62,10 @@ public abstract class AbstractContainerVector implements ValueVector {
     }
   }
 
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
   /**
    * Returns the field definition of this instance.
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
index b780d1a..10975f5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
@@ -103,8 +103,8 @@ public class ListVector extends BaseRepeatedValueVector {
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref) {
-    return new TransferImpl(field.withPath(ref));
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+    return new TransferImpl(field.withPath(ref), allocator);
   }
 
   @Override
@@ -116,7 +116,7 @@ public class ListVector extends BaseRepeatedValueVector {
 
     ListVector to;
 
-    public TransferImpl(MaterializedField field) {
+    public TransferImpl(MaterializedField field, BufferAllocator allocator) {
       to = new ListVector(field, allocator, null);
       to.addOrGetVector(new VectorDescriptor(vector.getField().getType()));
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index 60d74c1..6784ed4 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -143,8 +143,8 @@ public class MapVector extends AbstractMapVector {
   }
 
   @Override
-  public TransferPair getTransferPair() {
-    return new MapTransferPair(this, getField().getPath());
+  public TransferPair getTransferPair(BufferAllocator allocator) {
+    return new MapTransferPair(this, getField().getPath(), allocator);
   }
 
   @Override
@@ -153,8 +153,8 @@ public class MapVector extends AbstractMapVector {
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref) {
-    return new MapTransferPair(this, ref);
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+    return new MapTransferPair(this, ref, allocator);
   }
 
   protected static class MapTransferPair implements TransferPair{
@@ -162,8 +162,8 @@ public class MapVector extends AbstractMapVector {
     private final MapVector from;
     private final MapVector to;
 
-    public MapTransferPair(MapVector from, SchemaPath path) {
-      this(from, new MapVector(MaterializedField.create(path, TYPE), from.allocator, from.callBack), false);
+    public MapTransferPair(MapVector from, SchemaPath path, BufferAllocator allocator) {
+      this(from, new MapVector(MaterializedField.create(path, TYPE), allocator, from.callBack), false);
     }
 
     public MapTransferPair(MapVector from, MapVector to) {

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index cbc61f8..4706999 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -189,7 +189,7 @@ public class RepeatedListVector extends AbstractContainerVector
     }
 
     @Override
-    public TransferPair getTransferPair(FieldReference ref) {
+    public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
       return makeTransferPair(new DelegateRepeatedVector(ref, allocator));
     }
 
@@ -344,13 +344,13 @@ public class RepeatedListVector extends AbstractContainerVector
   }
 
   @Override
-  public TransferPair getTransferPair() {
-    return new RepeatedListTransferPair(delegate.getTransferPair());
+  public TransferPair getTransferPair(BufferAllocator allocator) {
+    return new RepeatedListTransferPair(delegate.getTransferPair(allocator));
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref) {
-    return new RepeatedListTransferPair(delegate.getTransferPair(ref));
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+    return new RepeatedListTransferPair(delegate.getTransferPair(ref, allocator));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index cb597be..b13de9d 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -160,8 +160,8 @@ public class RepeatedMapVector extends AbstractMapVector
   }
 
   @Override
-  public TransferPair getTransferPair() {
-    return new RepeatedMapTransferPair(this, getField().getPath());
+  public TransferPair getTransferPair(BufferAllocator allocator) {
+    return new RepeatedMapTransferPair(this, getField().getPath(), allocator);
   }
 
   @Override
@@ -224,13 +224,13 @@ public class RepeatedMapVector extends AbstractMapVector
     return super.getFieldIdIfMatches(builder, addToBreadCrumb, seg);
   }
 
-  public TransferPair getTransferPairToSingleMap(FieldReference reference) {
-    return new SingleMapTransferPair(this, reference);
+  public TransferPair getTransferPairToSingleMap(FieldReference reference, BufferAllocator allocator) {
+    return new SingleMapTransferPair(this, reference, allocator);
   }
 
   @Override
-  public TransferPair getTransferPair(FieldReference ref) {
-    return new RepeatedMapTransferPair(this, ref);
+  public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+    return new RepeatedMapTransferPair(this, ref, allocator);
   }
 
   @Override
@@ -261,8 +261,8 @@ public class RepeatedMapVector extends AbstractMapVector
     private final MapVector to;
     private static final MajorType MAP_TYPE = Types.required(MinorType.MAP);
 
-    public SingleMapTransferPair(RepeatedMapVector from, SchemaPath path) {
-      this(from, new MapVector(MaterializedField.create(path, MAP_TYPE), from.allocator, from.callBack), false);
+    public SingleMapTransferPair(RepeatedMapVector from, SchemaPath path, BufferAllocator allocator) {
+      this(from, new MapVector(MaterializedField.create(path, MAP_TYPE), allocator, from.callBack), false);
     }
 
     public SingleMapTransferPair(RepeatedMapVector from, MapVector to) {
@@ -326,8 +326,8 @@ public class RepeatedMapVector extends AbstractMapVector
     private final RepeatedMapVector to;
     private final RepeatedMapVector from;
 
-    public RepeatedMapTransferPair(RepeatedMapVector from, SchemaPath path) {
-      this(from, new RepeatedMapVector(MaterializedField.create(path, TYPE), from.allocator, from.callBack), false);
+    public RepeatedMapTransferPair(RepeatedMapVector from, SchemaPath path, BufferAllocator allocator) {
+      this(from, new RepeatedMapVector(MaterializedField.create(path, TYPE), allocator, from.callBack), false);
     }
 
     public RepeatedMapTransferPair(RepeatedMapVector from, RepeatedMapVector to) {

http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java
index 894e60e..ee16e97 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java
@@ -151,7 +151,7 @@ public class PromotableWriter extends AbstractPromotableFieldWriter {
 
   private FieldWriter promoteToUnion() {
     String name = vector.getField().getLastName();
-    TransferPair tp = vector.getTransferPair(new FieldReference(vector.getField().getType().getMinorType().name().toLowerCase()));
+    TransferPair tp = vector.getTransferPair(new FieldReference(vector.getField().getType().getMinorType().name().toLowerCase()), vector.getAllocator());
     tp.transfer();
     if (parentContainer != null) {
       unionVector = parentContainer.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class);


Mime
View raw message