Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9AF8918900 for ; Tue, 29 Dec 2015 01:09:46 +0000 (UTC) Received: (qmail 85236 invoked by uid 500); 29 Dec 2015 01:09:46 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 85199 invoked by uid 500); 29 Dec 2015 01:09:46 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 85185 invoked by uid 99); 29 Dec 2015 01:09:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Dec 2015 01:09:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 45F38E05D9; Tue, 29 Dec 2015 01:09:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: smp@apache.org To: commits@drill.apache.org Message-Id: <45af3c188b55426db714e1f7ccfcd70b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: drill git commit: DRILL-4215: Transfer buffer ownership in TransferPair Date: Tue, 29 Dec 2015 01:09:46 +0000 (UTC) Repository: drill Updated Branches: refs/heads/master de008810c -> 6dea42994 DRILL-4215: Transfer buffer ownership in TransferPair Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6dea4299 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6dea4299 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6dea4299 Branch: refs/heads/master Commit: 6dea429949a3d6a68aefbdb3d78de41e0955239b Parents: de00881 Author: Steven Phillips Authored: Tue Dec 1 00:34:41 2015 -0800 Committer: Steven Phillips Committed: Mon Dec 28 15:43:07 2015 -0800 ---------------------------------------------------------------------- .../drill/exec/physical/impl/ScreenCreator.java | 2 +- .../exec/physical/impl/SingleSenderCreator.java | 2 +- .../exec/physical/impl/TopN/TopNBatch.java | 4 ++-- .../physical/impl/aggregate/InternalBatch.java | 9 +++++---- .../impl/aggregate/StreamingAggBatch.java | 2 +- .../impl/aggregate/StreamingAggTemplate.java | 7 ++++--- .../impl/aggregate/StreamingAggregator.java | 3 ++- .../BroadcastSenderRootExec.java | 2 +- .../physical/impl/filter/FilterRecordBatch.java | 2 +- .../impl/flatten/FlattenRecordBatch.java | 4 ++-- .../exec/physical/impl/join/HashJoinBatch.java | 2 +- .../physical/impl/join/NestedLoopJoinBatch.java | 2 +- .../materialize/VectorRecordMaterializer.java | 8 ++++++-- .../OrderedPartitionRecordBatch.java | 2 +- .../impl/producer/ProducerConsumerBatch.java | 2 +- .../physical/impl/sort/RecordBatchData.java | 5 +++-- .../impl/sort/SortRecordBatchBuilder.java | 2 +- .../physical/impl/trace/TraceRecordBatch.java | 2 +- .../physical/impl/window/WindowDataBatch.java | 2 +- .../physical/impl/xsort/ExternalSortBatch.java | 6 +++--- .../drill/exec/record/HyperVectorWrapper.java | 3 ++- .../drill/exec/record/RecordIterator.java | 4 +++- .../apache/drill/exec/record/SchemaUtil.java | 2 +- .../drill/exec/record/SimpleVectorWrapper.java | 5 +++-- .../drill/exec/record/VectorContainer.java | 11 ++++++----- .../apache/drill/exec/record/VectorWrapper.java | 3 ++- .../apache/drill/exec/record/WritableBatch.java | 12 ++++++++++++ .../drill/TestTpchDistributedConcurrent.java | 2 +- .../drill/exec/vector/TestSplitAndTransfer.java | 2 +- .../codegen/templates/FixedValueVectors.java | 16 +++++++--------- .../codegen/templates/NullableValueVectors.java | 10 +++++----- .../codegen/templates/RepeatedValueVectors.java | 10 +++++----- .../src/main/codegen/templates/UnionVector.java | 14 +++++++++----- .../templates/VariableLengthVectors.java | 17 ++++++++--------- .../drill/exec/vector/BaseValueVector.java | 9 +++++++-- .../org/apache/drill/exec/vector/BitVector.java | 10 +++++----- .../apache/drill/exec/vector/ObjectVector.java | 4 ++-- .../apache/drill/exec/vector/ValueVector.java | 7 +++++-- .../apache/drill/exec/vector/ZeroVector.java | 9 +++++++-- .../vector/complex/AbstractContainerVector.java | 4 ++++ .../drill/exec/vector/complex/ListVector.java | 6 +++--- .../drill/exec/vector/complex/MapVector.java | 12 ++++++------ .../exec/vector/complex/RepeatedListVector.java | 10 +++++----- .../exec/vector/complex/RepeatedMapVector.java | 20 ++++++++++---------- .../vector/complex/impl/PromotableWriter.java | 2 +- 45 files changed, 159 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 3b90979..60355fb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -107,7 +107,7 @@ public class ScreenCreator implements RootCreator { return false; case OK_NEW_SCHEMA: - materializer = new VectorRecordMaterializer(context, incoming); + materializer = new VectorRecordMaterializer(context, oContext, incoming); //$FALL-THROUGH$ case OK: injector.injectPause(context.getExecutionControls(), "sending-data", logger); http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 23e97d0..2f33193 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -120,7 +120,7 @@ public class SingleSenderCreator implements RootCreator{ final FragmentWritableBatch batch = new FragmentWritableBatch( false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(), - incoming.getWritableBatch()); + incoming.getWritableBatch().transfer(oContext.getAllocator())); updateStats(batch); stats.startWait(); try { http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index c287bc3..a6c3269 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -229,9 +229,9 @@ public class TopNBatch extends AbstractRecordBatch { batchCount++; RecordBatchData batch; if (schemaChanged) { - batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext)); + batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext), oContext.getAllocator()); } else { - batch = new RecordBatchData(incoming); + batch = new RecordBatchData(incoming, oContext.getAllocator()); } boolean success = false; try { http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java index dae9eae..9e96727 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.util.Iterator; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; @@ -34,11 +35,11 @@ public class InternalBatch implements Iterable>{ private final SelectionVector2 sv2; private final SelectionVector4 sv4; - public InternalBatch(RecordBatch incoming) { - this(incoming, null); + public InternalBatch(RecordBatch incoming, OperatorContext oContext) { + this(incoming, null, oContext); } - public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers){ + public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers, OperatorContext oContext){ switch(incoming.getSchema().getSelectionVectorMode()){ case FOUR_BYTE: this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent(); @@ -53,7 +54,7 @@ public class InternalBatch implements Iterable>{ this.sv2 = null; } this.schema = incoming.getSchema(); - this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers); + this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers, oContext); } public BatchSchema getSchema() { http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index ee9a0ab..c084e39 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -309,7 +309,7 @@ public class StreamingAggBatch extends AbstractRecordBatch { container.buildSchema(SelectionVectorMode.NONE); StreamingAggregator agg = context.getImplementationClass(cg); - agg.setup(context, incoming, this); + agg.setup(oContext, incoming, this); return agg; } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index 4932b0f..82e8777 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -21,6 +21,7 @@ import javax.inject.Named; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; @@ -41,12 +42,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { private int outputCount = 0; private RecordBatch incoming; private StreamingAggBatch outgoing; - private FragmentContext context; private boolean done = false; + private OperatorContext context; @Override - public void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException { + public void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException { this.context = context; this.incoming = incoming; this.outgoing = outgoing; @@ -164,7 +165,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { previousIndex = currentIndex; } - InternalBatch previous = new InternalBatch(incoming); + InternalBatch previous = new InternalBatch(incoming, context); try { while (true) { http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java index 96da00b..61c82d8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; @@ -31,7 +32,7 @@ public interface StreamingAggregator { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR; } - public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException; + public abstract void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException; public abstract IterOutcome getOutcome(); http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index c88c72d..80d7744 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -117,7 +117,7 @@ public class BroadcastSenderRootExec extends BaseRootExec { case OK_NEW_SCHEMA: case OK: - WritableBatch writableBatch = incoming.getWritableBatch(); + WritableBatch writableBatch = incoming.getWritableBatch().transfer(oContext.getAllocator()); if (tunnels.length > 1) { writableBatch.retainBuffers(tunnels.length - 1); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index e435e79..c0e8944 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -153,7 +153,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch{ for (final VectorWrapper vw : incoming) { for (final ValueVector vv : vw.getValueVectors()) { - final TransferPair pair = vv.getTransferPair(); + final TransferPair pair = vv.getTransferPair(oContext.getAllocator()); container.add(pair.getTo()); transfers.add(pair); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index c3b3f45..dcaa244 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -265,12 +265,12 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch { TransferPair tp = null; if (flattenField instanceof RepeatedMapVector) { - tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference); + tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference, oContext.getAllocator()); } else { final ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector(); // vvIn may be null because of fast schema return for repeated list vectors if (vvIn != null) { - tp = vvIn.getTransferPair(reference); + tp = vvIn.getTransferPair(reference, oContext.getAllocator()); } } return tp; http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 2d0bd43..3ea97c6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -380,7 +380,7 @@ public class HashJoinBatch extends AbstractRecordBatch { * to the hyper vector container. Will be used when we want to retrieve * records that have matching keys on the probe side. */ - final RecordBatchData nextBatch = new RecordBatchData(right); + final RecordBatchData nextBatch = new RecordBatchData(right, oContext.getAllocator()); boolean success = false; try { if (hyperContainer == null) { http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index b390b8f..f0e53e1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -340,7 +340,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch } private void addBatchToHyperContainer(RecordBatch inputBatch) { - final RecordBatchData batchCopy = new RecordBatchData(inputBatch); + final RecordBatchData batchCopy = new RecordBatchData(inputBatch, oContext.getAllocator()); boolean success = false; try { rightCounts.addLast(inputBatch.getRecordCount()); http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java index 3933ddd..ba8df92 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java @@ -17,7 +17,9 @@ */ package org.apache.drill.exec.physical.impl.materialize; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryData; import org.apache.drill.exec.record.BatchSchema; @@ -29,10 +31,12 @@ public class VectorRecordMaterializer implements RecordMaterializer{ private QueryId queryId; private RecordBatch batch; + private BufferAllocator allocator; - public VectorRecordMaterializer(FragmentContext context, RecordBatch batch) { + public VectorRecordMaterializer(FragmentContext context, OperatorContext oContext, RecordBatch batch) { this.queryId = context.getHandle().getQueryId(); this.batch = batch; + this.allocator = oContext.getAllocator(); BatchSchema schema = batch.getSchema(); assert schema != null : "Schema must be defined."; @@ -43,7 +47,7 @@ public class VectorRecordMaterializer implements RecordMaterializer{ public QueryWritableBatch convertNext() { //batch.getWritableBatch().getDef().getRecordCount() - WritableBatch w = batch.getWritableBatch(); + WritableBatch w = batch.getWritableBatch().transfer(allocator); QueryData header = QueryData.newBuilder() // .setQueryId(queryId) // http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index c9483ae..64cfad0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -590,7 +590,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch vw : batch) { - TransferPair tp = vw.getValueVector().getTransferPair(); + TransferPair tp = vw.getValueVector().getTransferPair(oContext.getAllocator()); transfers.add(tp); container.add(tp.getTo()); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index e3033b4..38d08b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -141,7 +141,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { return; case OK_NEW_SCHEMA: case OK: - wrapper = RecordBatchDataWrapper.batch(new RecordBatchData(incoming)); + wrapper = RecordBatchDataWrapper.batch(new RecordBatchData(incoming, oContext.getAllocator())); queue.put(wrapper); wrapper = null; break; http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java index af774db..0cd55eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.sort; import java.util.List; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; @@ -40,7 +41,7 @@ public class RecordBatchData { private int recordCount; VectorContainer container = new VectorContainer(); - public RecordBatchData(VectorAccessible batch) { + public RecordBatchData(VectorAccessible batch, BufferAllocator allocator) { List vectors = Lists.newArrayList(); recordCount = batch.getRecordCount(); @@ -54,7 +55,7 @@ public class RecordBatchData { if (v.isHyper()) { throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); } - TransferPair tp = v.getValueVector().getTransferPair(); + TransferPair tp = v.getValueVector().getTransferPair(allocator); tp.transfer(); vectors.add(tp.getTo()); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index f2302ce..33338dd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ -91,7 +91,7 @@ public class SortRecordBatchBuilder implements AutoCloseable { } - RecordBatchData bd = new RecordBatchData(batch); + RecordBatchData bd = new RecordBatchData(batch, allocator); runningBatches++; batches.put(batch.getSchema(), bd); recordCount += bd.getRecordCount(); http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java index ff83cc9..209624b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java @@ -140,7 +140,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch { /* Add all the value vectors in the container */ for (VectorWrapper vv : incoming) { - TransferPair tp = vv.getValueVector().getTransferPair(); + TransferPair tp = vv.getValueVector().getTransferPair(oContext.getAllocator()); container.add(tp.getTo()); } container.buildSchema(incoming.getSchema().getSelectionVectorMode()); http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java index b2befa3..7abc03c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java @@ -47,7 +47,7 @@ public class WindowDataBatch implements VectorAccessible { if (v.isHyper()) { throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); } - TransferPair tp = v.getValueVector().getTransferPair(); + TransferPair tp = v.getValueVector().getTransferPair(oContext.getAllocator()); tp.transfer(); vectors.add(tp.getTo()); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 4dbd92d..6e79f01 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -366,7 +366,7 @@ public class ExternalSortBatch extends AbstractRecordBatch { totalCount += count; sorter.setup(context, sv2, convertedBatch); sorter.sort(sv2); - RecordBatchData rbd = new RecordBatchData(convertedBatch); + RecordBatchData rbd = new RecordBatchData(convertedBatch, oContext.getAllocator()); boolean success = false; try { rbd.setSv2(sv2); @@ -446,7 +446,7 @@ public class ExternalSortBatch extends AbstractRecordBatch { builder = new SortRecordBatchBuilder(oContext.getAllocator()); for (BatchGroup group : batchGroups) { - RecordBatchData rbd = new RecordBatchData(group.getContainer()); + RecordBatchData rbd = new RecordBatchData(group.getContainer(), oContext.getAllocator()); rbd.setSv2(group.getSv2()); builder.add(rbd); } @@ -562,7 +562,7 @@ public class ExternalSortBatch extends AbstractRecordBatch { // 1 output container is kept in memory, so we want to hold on to it and transferClone // allows keeping ownership - VectorContainer c1 = VectorContainer.getTransferClone(outputContainer); + VectorContainer c1 = VectorContainer.getTransferClone(outputContainer, oContext); c1.buildSchema(BatchSchema.SelectionVectorMode.NONE); c1.setRecordCount(count); http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java index 7fc7960..322339e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java @@ -21,6 +21,7 @@ import java.util.AbstractMap; import org.apache.commons.lang3.ArrayUtils; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.drill.exec.vector.complex.AbstractMapVector; @@ -116,7 +117,7 @@ public class HyperVectorWrapper implements VectorWrapper< @Override @SuppressWarnings("unchecked") - public VectorWrapper cloneAndTransfer() { + public VectorWrapper cloneAndTransfer(BufferAllocator allocator) { return new HyperVectorWrapper(f, vectors, false); // T[] newVectors = (T[]) Array.newInstance(vectors.getClass().getComponentType(), vectors.length); // for(int i =0; i < newVectors.length; i++) { http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java index 77cb9a1..af0a753 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java @@ -53,6 +53,7 @@ public class RecordIterator implements VectorAccessible { private int inputIndex; // For two way merge join 0:left, 1:right private boolean lastBatchRead; // True if all batches are consumed. private boolean initialized; + private OperatorContext oContext; private final VectorContainer container; // Holds VectorContainer of current record batch private final TreeRangeMap batches = TreeRangeMap.create(); @@ -66,6 +67,7 @@ public class RecordIterator implements VectorAccessible { this.inputIndex = inputIndex; this.lastBatchRead = false; this.container = new VectorContainer(oContext); + this.oContext = oContext; resetIndices(); this.initialized = false; } @@ -181,7 +183,7 @@ public class RecordIterator implements VectorAccessible { nextOuterPosition = 0; } // Transfer vectors from incoming record batch. - final RecordBatchData rbd = new RecordBatchData(incoming); + final RecordBatchData rbd = new RecordBatchData(incoming, oContext.getAllocator()); innerRecordCount = incoming.getRecordCount(); if (!initialized) { for (VectorWrapper w : rbd.getContainer()) { http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java index 8cf90ab..48f0a36 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java @@ -100,7 +100,7 @@ public class SchemaUtil { int recordCount, OperatorContext context) { if (v != null) { int valueCount = v.getAccessor().getValueCount(); - TransferPair tp = v.getTransferPair(); + TransferPair tp = v.getTransferPair(context.getAllocator()); tp.transfer(); if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) { if (field.getType().getMinorType() == MinorType.UNION) { http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java index f1b60d4..1e8a52f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java @@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MajorTypeOrBuilder; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.drill.exec.vector.complex.AbstractMapVector; @@ -74,8 +75,8 @@ public class SimpleVectorWrapper implements VectorWrapper @SuppressWarnings("unchecked") @Override - public VectorWrapper cloneAndTransfer() { - TransferPair tp = vector.getTransferPair(); + public VectorWrapper cloneAndTransfer(BufferAllocator allocator) { + TransferPair tp = vector.getTransferPair(allocator); tp.transfer(); return new SimpleVectorWrapper((T) tp.getTo()); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index c483650..33351ba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.vector.SchemaChangeCallBack; @@ -148,15 +149,15 @@ public class VectorContainer implements Iterable>, VectorAccess * The RecordBatch iterator the contains the batch we should take over. * @return A cloned vector container. */ - public static VectorContainer getTransferClone(VectorAccessible incoming) { - VectorContainer vc = new VectorContainer(); + public static VectorContainer getTransferClone(VectorAccessible incoming, OperatorContext oContext) { + VectorContainer vc = new VectorContainer(oContext); for (VectorWrapper w : incoming) { vc.cloneAndTransfer(w); } return vc; } - public static VectorContainer getTransferClone(VectorAccessible incoming, VectorWrapper[] ignoreWrappers) { + public static VectorContainer getTransferClone(VectorAccessible incoming, VectorWrapper[] ignoreWrappers, OperatorContext oContext) { Iterable> wrappers = incoming; if (ignoreWrappers != null) { final List ignored = Lists.newArrayList(ignoreWrappers); @@ -165,7 +166,7 @@ public class VectorContainer implements Iterable>, VectorAccess wrappers = resultant; } - final VectorContainer vc = new VectorContainer(); + final VectorContainer vc = new VectorContainer(oContext); for (VectorWrapper w : wrappers) { vc.cloneAndTransfer(w); } @@ -198,7 +199,7 @@ public class VectorContainer implements Iterable>, VectorAccess } private void cloneAndTransfer(VectorWrapper wrapper) { - wrappers.add(wrapper.cloneAndTransfer()); + wrappers.add(wrapper.cloneAndTransfer(oContext.getAllocator())); } public void addCollection(Iterable vectors) { http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java index 5250f98..65ea457 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.record; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.vector.ValueVector; @@ -31,7 +32,7 @@ public interface VectorWrapper { public T[] getValueVectors(); public boolean isHyper(); public void clear(); - public VectorWrapper cloneAndTransfer(); + public VectorWrapper cloneAndTransfer(BufferAllocator allocator); public VectorWrapper getChildWrapper(int[] ids); public void transfer(VectorWrapper destination); http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java index d39ce5e..bcec920 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java @@ -51,6 +51,18 @@ public class WritableBatch implements AutoCloseable { this.buffers = buffers; } + public WritableBatch transfer(BufferAllocator allocator) { + List newBuffers = Lists.newArrayList(); + for (DrillBuf buf : buffers) { + int writerIndex = buf.writerIndex(); + DrillBuf newBuf = buf.transferOwnership(allocator).buffer; + newBuf.writerIndex(writerIndex); + newBuffers.add(newBuf); + } + clear(); + return new WritableBatch(def, newBuffers); + } + public RecordBatchDef getDef() { return def; } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java index f7843f5..66b7571 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java @@ -46,7 +46,7 @@ import static org.junit.Assert.assertNull; * any particular order of execution. We ignore the results. */ public class TestTpchDistributedConcurrent extends BaseTestQuery { - @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(120000); // Longer timeout than usual. + @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(140000); // Longer timeout than usual. /* * Valid test names taken from TestTpchDistributed. Fuller path prefixes are http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java index b2054e6..6ac8e97 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java @@ -51,7 +51,7 @@ public class TestSplitAndTransfer { } mutator.setValueCount(valueCount); - final TransferPair tp = varCharVector.getTransferPair(); + final TransferPair tp = varCharVector.getTransferPair(allocator); final NullableVarCharVector newVarCharVector = (NullableVarCharVector) tp.getTo(); final Accessor accessor = newVarCharVector.getAccessor(); final int[][] startLengths = {{0, 201}, {201, 200}, {401, 99}}; http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/codegen/templates/FixedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java index f67614a..8e77dcc 100644 --- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java @@ -198,13 +198,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F data.writerIndex(actualLength); } - public TransferPair getTransferPair(){ - return new TransferImpl(getField()); + public TransferPair getTransferPair(BufferAllocator allocator){ + return new TransferImpl(getField(), allocator); } @Override - public TransferPair getTransferPair(FieldReference ref){ - return new TransferImpl(getField().withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){ + return new TransferImpl(getField().withPath(ref), allocator); } @Override @@ -214,8 +214,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public void transferTo(${minor.class}Vector target){ target.clear(); - target.data = data; - target.data.retain(1); + target.data = data.transferOwnership(target.allocator).buffer; target.data.writerIndex(data.writerIndex()); clear(); } @@ -224,15 +223,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F final int startPoint = startIndex * ${type.width}; final int sliceLength = length * ${type.width}; target.clear(); - target.data = data.slice(startPoint, sliceLength); - target.data.retain(1); + target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer; target.data.writerIndex(sliceLength); } private class TransferImpl implements TransferPair{ private ${minor.class}Vector to; - public TransferImpl(MaterializedField field){ + public TransferImpl(MaterializedField field, BufferAllocator allocator){ to = new ${minor.class}Vector(field, allocator); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/codegen/templates/NullableValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java index 13bdd4f..d2c17ff 100644 --- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java +++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java @@ -243,13 +243,13 @@ public final class ${className} extends BaseDataValueVector implements <#if type } @Override - public TransferPair getTransferPair(){ - return new TransferImpl(getField()); + public TransferPair getTransferPair(BufferAllocator allocator){ + return new TransferImpl(getField(), allocator); } @Override - public TransferPair getTransferPair(FieldReference ref){ - return new TransferImpl(getField().withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){ + return new TransferImpl(getField().withPath(ref), allocator); } @Override @@ -277,7 +277,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type private class TransferImpl implements TransferPair { Nullable${minor.class}Vector to; - public TransferImpl(MaterializedField field){ + public TransferImpl(MaterializedField field, BufferAllocator allocator){ to = new Nullable${minor.class}Vector(field, allocator); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java index 21f5616..ca39d71 100644 --- a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java +++ b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java @@ -79,13 +79,13 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector } @Override - public TransferPair getTransferPair() { - return new TransferImpl(getField()); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new TransferImpl(getField(), allocator); } @Override - public TransferPair getTransferPair(FieldReference ref){ - return new TransferImpl(getField().withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){ + return new TransferImpl(getField().withPath(ref), allocator); } @Override @@ -131,7 +131,7 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector private class TransferImpl implements TransferPair { final Repeated${minor.class}Vector to; - public TransferImpl(MaterializedField field) { + public TransferImpl(MaterializedField field, BufferAllocator allocator) { this.to = new Repeated${minor.class}Vector(field, allocator); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/codegen/templates/UnionVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java index cc541e5..2e278b1 100644 --- a/exec/vector/src/main/codegen/templates/UnionVector.java +++ b/exec/vector/src/main/codegen/templates/UnionVector.java @@ -80,6 +80,10 @@ public class UnionVector implements ValueVector { this.callBack = callBack; } + public BufferAllocator getAllocator() { + return allocator; + } + public List getSubTypes() { return majorType.getSubTypeList(); } @@ -198,13 +202,13 @@ public class UnionVector implements ValueVector { } @Override - public TransferPair getTransferPair() { - return new TransferImpl(field); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new TransferImpl(field, allocator); } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new TransferImpl(field.withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new TransferImpl(field.withPath(ref), allocator); } @Override @@ -242,7 +246,7 @@ public class UnionVector implements ValueVector { UnionVector to; - public TransferImpl(MaterializedField field) { + public TransferImpl(MaterializedField field, BufferAllocator allocator) { to = new UnionVector(field, allocator, null); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/codegen/templates/VariableLengthVectors.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java index f734734..56d2d52 100644 --- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java @@ -174,13 +174,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V } @Override - public TransferPair getTransferPair(){ - return new TransferImpl(getField()); + public TransferPair getTransferPair(BufferAllocator allocator){ + return new TransferImpl(getField(), allocator); } @Override - public TransferPair getTransferPair(FieldReference ref){ - return new TransferImpl(getField().withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){ + return new TransferImpl(getField().withPath(ref), allocator); } @Override @@ -191,8 +191,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V public void transferTo(${minor.class}Vector target){ target.clear(); this.offsetVector.transferTo(target.offsetVector); - target.data = data; - target.data.retain(1); + target.data = data.transferOwnership(target.allocator).buffer; + target.data.writerIndex(data.writerIndex()); clear(); } @@ -207,8 +207,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V for (int i = 0; i < length + 1; i++) { targetOffsetVectorMutator.set(i, offsetVectorAccessor.get(startIndex + i) - startPoint); } - target.data = data.slice(startPoint, sliceLength); - target.data.retain(1); + target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer; target.getMutator().setValueCount(length); } @@ -242,7 +241,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V private class TransferImpl implements TransferPair{ ${minor.class}Vector to; - public TransferImpl(MaterializedField field){ + public TransferImpl(MaterializedField field, BufferAllocator allocator){ to = new ${minor.class}Vector(field, allocator); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java index eb5dbcd..23ad778 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java @@ -71,8 +71,8 @@ public abstract class BaseValueVector implements ValueVector { } @Override - public TransferPair getTransferPair() { - return getTransferPair(new FieldReference(getField().getPath())); + public TransferPair getTransferPair(BufferAllocator allocator) { + return getTransferPair(new FieldReference(getField().getPath()), allocator); } @Override @@ -119,5 +119,10 @@ public abstract class BaseValueVector implements ValueVector { return true; } + + @Override + public BufferAllocator getAllocator() { + return allocator; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java index c1504c6..3ba11e2 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java @@ -208,13 +208,13 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } @Override - public TransferPair getTransferPair() { - return new TransferImpl(getField()); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new TransferImpl(getField(), allocator); } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new TransferImpl(getField().withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new TransferImpl(getField().withPath(ref), allocator); } @Override @@ -273,7 +273,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe private class TransferImpl implements TransferPair { BitVector to; - public TransferImpl(MaterializedField field) { + public TransferImpl(MaterializedField field, BufferAllocator allocator) { this.to = new BitVector(field, allocator); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java index 9ca4410..494f234 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java @@ -147,7 +147,7 @@ public class ObjectVector extends BaseValueVector { } @Override - public TransferPair getTransferPair() { + public TransferPair getTransferPair(BufferAllocator allocator) { throw new UnsupportedOperationException("ObjectVector does not support this"); } @@ -157,7 +157,7 @@ public class ObjectVector extends BaseValueVector { } @Override - public TransferPair getTransferPair(FieldReference ref) { + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { throw new UnsupportedOperationException("ObjectVector does not support this"); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java index b39fcfe..a4a071e 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java @@ -23,6 +23,7 @@ import io.netty.buffer.DrillBuf; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.TransferPair; @@ -68,6 +69,8 @@ public interface ValueVector extends Closeable, Iterable { */ boolean allocateNewSafe(); + BufferAllocator getAllocator(); + /** * Set the initial record capacity * @param numRecords @@ -99,9 +102,9 @@ public interface ValueVector extends Closeable, Iterable { * Returns a {@link org.apache.drill.exec.record.TransferPair transfer pair}, creating a new target vector of * the same type. */ - TransferPair getTransferPair(); + TransferPair getTransferPair(BufferAllocator allocator); - TransferPair getTransferPair(FieldReference ref); + TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator); /** * Returns a new {@link org.apache.drill.exec.record.TransferPair transfer pair} that is used to transfer underlying http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java index c5326f6..165fc14 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java @@ -94,7 +94,7 @@ public class ZeroVector implements ValueVector { } @Override - public TransferPair getTransferPair() { + public TransferPair getTransferPair(BufferAllocator allocator) { return defaultPair; } @@ -138,6 +138,11 @@ public class ZeroVector implements ValueVector { } @Override + public BufferAllocator getAllocator() { + throw new UnsupportedOperationException("Tried to get allocator from ZeroVector"); + } + + @Override public void setInitialCapacity(int numRecords) { } @Override @@ -146,7 +151,7 @@ public class ZeroVector implements ValueVector { } @Override - public TransferPair getTransferPair(FieldReference ref) { + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { return defaultPair; } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java index caedb96..0ac2417 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java @@ -62,6 +62,10 @@ public abstract class AbstractContainerVector implements ValueVector { } } + public BufferAllocator getAllocator() { + return allocator; + } + /** * Returns the field definition of this instance. */ http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java index b780d1a..10975f5 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java @@ -103,8 +103,8 @@ public class ListVector extends BaseRepeatedValueVector { } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new TransferImpl(field.withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new TransferImpl(field.withPath(ref), allocator); } @Override @@ -116,7 +116,7 @@ public class ListVector extends BaseRepeatedValueVector { ListVector to; - public TransferImpl(MaterializedField field) { + public TransferImpl(MaterializedField field, BufferAllocator allocator) { to = new ListVector(field, allocator, null); to.addOrGetVector(new VectorDescriptor(vector.getField().getType())); } http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java index 60d74c1..6784ed4 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java @@ -143,8 +143,8 @@ public class MapVector extends AbstractMapVector { } @Override - public TransferPair getTransferPair() { - return new MapTransferPair(this, getField().getPath()); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new MapTransferPair(this, getField().getPath(), allocator); } @Override @@ -153,8 +153,8 @@ public class MapVector extends AbstractMapVector { } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new MapTransferPair(this, ref); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new MapTransferPair(this, ref, allocator); } protected static class MapTransferPair implements TransferPair{ @@ -162,8 +162,8 @@ public class MapVector extends AbstractMapVector { private final MapVector from; private final MapVector to; - public MapTransferPair(MapVector from, SchemaPath path) { - this(from, new MapVector(MaterializedField.create(path, TYPE), from.allocator, from.callBack), false); + public MapTransferPair(MapVector from, SchemaPath path, BufferAllocator allocator) { + this(from, new MapVector(MaterializedField.create(path, TYPE), allocator, from.callBack), false); } public MapTransferPair(MapVector from, MapVector to) { http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java index cbc61f8..4706999 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java @@ -189,7 +189,7 @@ public class RepeatedListVector extends AbstractContainerVector } @Override - public TransferPair getTransferPair(FieldReference ref) { + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { return makeTransferPair(new DelegateRepeatedVector(ref, allocator)); } @@ -344,13 +344,13 @@ public class RepeatedListVector extends AbstractContainerVector } @Override - public TransferPair getTransferPair() { - return new RepeatedListTransferPair(delegate.getTransferPair()); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new RepeatedListTransferPair(delegate.getTransferPair(allocator)); } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new RepeatedListTransferPair(delegate.getTransferPair(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new RepeatedListTransferPair(delegate.getTransferPair(ref, allocator)); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index cb597be..b13de9d 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -160,8 +160,8 @@ public class RepeatedMapVector extends AbstractMapVector } @Override - public TransferPair getTransferPair() { - return new RepeatedMapTransferPair(this, getField().getPath()); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new RepeatedMapTransferPair(this, getField().getPath(), allocator); } @Override @@ -224,13 +224,13 @@ public class RepeatedMapVector extends AbstractMapVector return super.getFieldIdIfMatches(builder, addToBreadCrumb, seg); } - public TransferPair getTransferPairToSingleMap(FieldReference reference) { - return new SingleMapTransferPair(this, reference); + public TransferPair getTransferPairToSingleMap(FieldReference reference, BufferAllocator allocator) { + return new SingleMapTransferPair(this, reference, allocator); } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new RepeatedMapTransferPair(this, ref); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new RepeatedMapTransferPair(this, ref, allocator); } @Override @@ -261,8 +261,8 @@ public class RepeatedMapVector extends AbstractMapVector private final MapVector to; private static final MajorType MAP_TYPE = Types.required(MinorType.MAP); - public SingleMapTransferPair(RepeatedMapVector from, SchemaPath path) { - this(from, new MapVector(MaterializedField.create(path, MAP_TYPE), from.allocator, from.callBack), false); + public SingleMapTransferPair(RepeatedMapVector from, SchemaPath path, BufferAllocator allocator) { + this(from, new MapVector(MaterializedField.create(path, MAP_TYPE), allocator, from.callBack), false); } public SingleMapTransferPair(RepeatedMapVector from, MapVector to) { @@ -326,8 +326,8 @@ public class RepeatedMapVector extends AbstractMapVector private final RepeatedMapVector to; private final RepeatedMapVector from; - public RepeatedMapTransferPair(RepeatedMapVector from, SchemaPath path) { - this(from, new RepeatedMapVector(MaterializedField.create(path, TYPE), from.allocator, from.callBack), false); + public RepeatedMapTransferPair(RepeatedMapVector from, SchemaPath path, BufferAllocator allocator) { + this(from, new RepeatedMapVector(MaterializedField.create(path, TYPE), allocator, from.callBack), false); } public RepeatedMapTransferPair(RepeatedMapVector from, RepeatedMapVector to) { http://git-wip-us.apache.org/repos/asf/drill/blob/6dea4299/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java index 894e60e..ee16e97 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java @@ -151,7 +151,7 @@ public class PromotableWriter extends AbstractPromotableFieldWriter { private FieldWriter promoteToUnion() { String name = vector.getField().getLastName(); - TransferPair tp = vector.getTransferPair(new FieldReference(vector.getField().getType().getMinorType().name().toLowerCase())); + TransferPair tp = vector.getTransferPair(new FieldReference(vector.getField().getType().getMinorType().name().toLowerCase()), vector.getAllocator()); tp.transfer(); if (parentContainer != null) { unionVector = parentContainer.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class);