Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1CF49200C7B for ; Fri, 5 May 2017 15:27:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1BB45160B97; Fri, 5 May 2017 13:27:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 70E83160BBF for ; Fri, 5 May 2017 15:27:27 +0200 (CEST) Received: (qmail 80466 invoked by uid 500); 5 May 2017 13:27:26 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 80384 invoked by uid 99); 5 May 2017 13:27:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 May 2017 13:27:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2D7EBE04E3; Fri, 5 May 2017 13:27:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: arina@apache.org To: commits@drill.apache.org Date: Fri, 05 May 2017 13:27:29 -0000 Message-Id: <339b3c00312e4555b1ff98b0aa0dabe8@git.apache.org> In-Reply-To: <32d3a2c55be841c998b31b579e2eb808@git.apache.org> References: <32d3a2c55be841c998b31b579e2eb808@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/5] drill git commit: DRILL-5423: Refactor ScanBatch to allow unit testing record readers archived-at: Fri, 05 May 2017 13:27:29 -0000 DRILL-5423: Refactor ScanBatch to allow unit testing record readers Refactors ScanBatch to allow unit testing of record reader implementations, especially the “writer” classes. See JIRA for details. closes #811 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/41ffed50 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/41ffed50 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/41ffed50 Branch: refs/heads/master Commit: 41ffed50fbb9319b4a796555396b88de010cb10b Parents: 0939485 Author: Paul Rogers Authored: Sat Apr 8 20:52:04 2017 -0700 Committer: Arina Ielchiieva Committed: Fri May 5 15:46:01 2017 +0300 ---------------------------------------------------------------------- .../exec/ops/AbstractOperatorExecContext.java | 90 ++++++++++++++++ .../apache/drill/exec/ops/OperatorContext.java | 51 ++-------- .../drill/exec/ops/OperatorContextImpl.java | 102 ++++++------------- .../drill/exec/ops/OperatorExecContext.java | 46 +++++++++ .../drill/exec/ops/OperatorUtilities.java | 48 +++++++++ .../drill/exec/physical/impl/BaseRootExec.java | 5 +- .../drill/exec/physical/impl/ScanBatch.java | 62 ++++++++--- .../drill/exec/memory/TestAllocators.java | 7 +- .../drill/exec/record/TestRecordIterator.java | 5 +- 9 files changed, 280 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java new file mode 100644 index 0000000..a517fdf --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.testing.ExecutionControls; + +import io.netty.buffer.DrillBuf; + +/** + * Implementation of {@link OperatorExecContext} that provides services + * needed by most run-time operators. Excludes services that need the + * entire Drillbit. Allows easy testing of operator code that uses this + * interface. + */ + +public class AbstractOperatorExecContext implements OperatorExecContext { + + protected final BufferAllocator allocator; + protected final ExecutionControls executionControls; + protected final PhysicalOperator popConfig; + protected final BufferManager manager; + protected final OperatorStatReceiver statsWriter; + + public AbstractOperatorExecContext(BufferAllocator allocator, PhysicalOperator popConfig, + ExecutionControls executionControls, + OperatorStatReceiver stats) { + this.allocator = allocator; + this.popConfig = popConfig; + manager = new BufferManagerImpl(allocator); + statsWriter = stats; + this.executionControls = executionControls; + } + + @Override + public DrillBuf replace(DrillBuf old, int newSize) { + return manager.replace(old, newSize); + } + + @Override + public DrillBuf getManagedBuffer() { + return manager.getManagedBuffer(); + } + + @Override + public DrillBuf getManagedBuffer(int size) { + return manager.getManagedBuffer(size); + } + + @Override + public ExecutionControls getExecutionControls() { return executionControls; } + + @Override + public OperatorStatReceiver getStatsWriter() { return statsWriter; } + + @Override + public BufferAllocator getAllocator() { + if (allocator == null) { + throw new UnsupportedOperationException("Operator context does not have an allocator"); + } + return allocator; + } + + @Override + public void close() { + try { + manager.close(); + } finally { + if (allocator != null) { + allocator.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java index d6045fc..b248d5f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,44 +18,28 @@ package org.apache.drill.exec.ops; import java.io.IOException; -import java.util.Iterator; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.testing.ExecutionControls; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import com.google.common.util.concurrent.ListenableFuture; -import io.netty.buffer.DrillBuf; +public interface OperatorContext extends OperatorExecContext { -public abstract class OperatorContext { + OperatorStats getStats(); - public abstract DrillBuf replace(DrillBuf old, int newSize); + ExecutorService getExecutor(); - public abstract DrillBuf getManagedBuffer(); + ExecutorService getScanExecutor(); - public abstract DrillBuf getManagedBuffer(int size); + ExecutorService getScanDecodeExecutor(); - public abstract BufferAllocator getAllocator(); + DrillFileSystem newFileSystem(Configuration conf) throws IOException; - public abstract OperatorStats getStats(); - - public abstract ExecutorService getExecutor(); - - public abstract ExecutorService getScanExecutor(); - - public abstract ExecutorService getScanDecodeExecutor(); - - public abstract ExecutionControls getExecutionControls(); - - public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException; - - public abstract DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException; + DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException; /** * Run the callable as the given proxy user. @@ -65,21 +49,6 @@ public abstract class OperatorContext { * @param result type * @return Future future with the result of calling the callable */ - public abstract ListenableFuture runCallableAs(UserGroupInformation proxyUgi, + ListenableFuture runCallableAs(UserGroupInformation proxyUgi, Callable callable); - - public static int getChildCount(PhysicalOperator popConfig) { - Iterator iter = popConfig.iterator(); - int i = 0; - while (iter.hasNext()) { - iter.next(); - i++; - } - - if (i == 0) { - i = 1; - } - return i; - } - -} + } http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java index c19cc1f..37c609e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.ops; -import io.netty.buffer.DrillBuf; - import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.concurrent.Callable; @@ -26,10 +24,8 @@ import java.util.concurrent.ExecutorService; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.exception.OutOfMemoryException; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.testing.ExecutionControls; import org.apache.drill.exec.work.WorkManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -39,15 +35,11 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -class OperatorContextImpl extends OperatorContext implements AutoCloseable { +class OperatorContextImpl extends AbstractOperatorExecContext implements OperatorContext, AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class); - private final BufferAllocator allocator; - private final ExecutionControls executionControls; private boolean closed = false; - private final PhysicalOperator popConfig; private final OperatorStats stats; - private final BufferManager manager; private DrillFileSystem fs; private final ExecutorService executor; private final ExecutorService scanExecutor; @@ -62,75 +54,43 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable { private ListeningExecutorService delegatePool; public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException { - this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(), - popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()); - this.popConfig = popConfig; - this.manager = new BufferManagerImpl(allocator); - - OpProfileDef def = - new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig)); - stats = context.getStats().newOperatorStats(def, allocator); - executionControls = context.getExecutionControls(); - executor = context.getDrillbitContext().getExecutor(); - scanExecutor = context.getDrillbitContext().getScanExecutor(); - scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor(); + this(popConfig, context, null); } public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats) throws OutOfMemoryException { - this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(), - popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()); - this.popConfig = popConfig; - this.manager = new BufferManagerImpl(allocator); - this.stats = stats; - executionControls = context.getExecutionControls(); + super(context.getNewChildAllocator(popConfig.getClass().getSimpleName(), + popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()), + popConfig, context.getExecutionControls(), stats); + if (stats != null) { + this.stats = stats; + } else { + OpProfileDef def = + new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), + OperatorUtilities.getChildCount(popConfig)); + this.stats = context.getStats().newOperatorStats(def, allocator); + } executor = context.getDrillbitContext().getExecutor(); scanExecutor = context.getDrillbitContext().getScanExecutor(); scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor(); } - @Override - public DrillBuf replace(DrillBuf old, int newSize) { - return manager.replace(old, newSize); - } - - @Override - public DrillBuf getManagedBuffer() { - return manager.getManagedBuffer(); - } - - @Override - public DrillBuf getManagedBuffer(int size) { - return manager.getManagedBuffer(size); - } - // Allow an operator to use the thread pool @Override public ExecutorService getExecutor() { return executor; } + @Override public ExecutorService getScanExecutor() { return scanExecutor; } + @Override public ExecutorService getScanDecodeExecutor() { return scanDecodeExecutor; } - @Override - public ExecutionControls getExecutionControls() { - return executionControls; - } - - @Override - public BufferAllocator getAllocator() { - if (allocator == null) { - throw new UnsupportedOperationException("Operator context does not have an allocator"); - } - return allocator; - } - public boolean isClosed() { return closed; } @@ -143,20 +103,19 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable { } logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null); - manager.close(); - - if (allocator != null) { - allocator.close(); - } - - if (fs != null) { - try { - fs.close(); - } catch (IOException e) { - throw new DrillRuntimeException(e); + closed = true; + try { + super.close(); + } finally { + if (fs != null) { + try { + fs.close(); + fs = null; + } catch (IOException e) { + throw new DrillRuntimeException(e); + } } } - closed = true; } @Override @@ -201,14 +160,13 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable { return fs; } - @Override - /* - Creates a DrillFileSystem that does not automatically track operator stats. + /** + * Creates a DrillFileSystem that does not automatically track operator stats. */ + @Override public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException { Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext"); fs = new DrillFileSystem(conf, null); return fs; } - } http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java new file mode 100644 index 0000000..4d64aba --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.testing.ExecutionControls; + +import io.netty.buffer.DrillBuf; + +/** + * Narrowed version of the {@link OperatorContext} used to create an + * easy-to-test version of the operator context that excludes services + * that require a full Drillbit server. + */ + +public interface OperatorExecContext { + + DrillBuf replace(DrillBuf old, int newSize); + + DrillBuf getManagedBuffer(); + + DrillBuf getManagedBuffer(int size); + + BufferAllocator getAllocator(); + + ExecutionControls getExecutionControls(); + + OperatorStatReceiver getStatsWriter(); + + void close(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java new file mode 100644 index 0000000..2e6e759 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import java.util.Iterator; + +import org.apache.drill.exec.physical.base.PhysicalOperator; + +/** + * Utility methods, formerly on the OperatorContext class, that work with + * operators. The utilities here are available to operators at unit test + * time, while methods in OperatorContext are available only in production + * code. + */ + +public class OperatorUtilities { + + private OperatorUtilities() { } + + public static int getChildCount(PhysicalOperator popConfig) { + Iterator iter = popConfig.iterator(); + int count = 0; + while (iter.hasNext()) { + iter.next(); + count++; + } + + if (count == 0) { + count = 1; + } + return count; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index f720f8e..d01e294 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OpProfileDef; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.ops.OperatorUtilities; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.CloseableRecordBatch; @@ -44,7 +45,7 @@ public abstract class BaseRootExec implements RootExec { public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException { this.oContext = fragmentContext.newOperatorContext(config, stats); stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), - config.getOperatorType(), OperatorContext.getChildCount(config)), + config.getOperatorType(), OperatorUtilities.getChildCount(config)), oContext.getAllocator()); fragmentContext.getStats().addOperatorStats(this.stats); this.fragmentContext = fragmentContext; @@ -54,7 +55,7 @@ public abstract class BaseRootExec implements RootExec { final PhysicalOperator config) throws OutOfMemoryException { this.oContext = oContext; stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), - config.getOperatorType(), OperatorContext.getChildCount(config)), + config.getOperatorType(), OperatorUtilities.getChildCount(config)), oContext.getAllocator()); fragmentContext.getStats().addOperatorStats(this.stats); this.fragmentContext = fragmentContext; http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 011e751..5a9af39 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -35,6 +35,7 @@ import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.ops.OperatorExecContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -56,6 +57,7 @@ import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.common.map.CaseInsensitiveMap; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; /** @@ -68,19 +70,14 @@ public class ScanBatch implements CloseableRecordBatch { /** Main collection of fields' value vectors. */ private final VectorContainer container = new VectorContainer(); - /** Fields' value vectors indexed by fields' keys. */ - private final CaseInsensitiveMap fieldVectorMap = - CaseInsensitiveMap.newHashMap(); - private int recordCount; private final FragmentContext context; private final OperatorContext oContext; private Iterator readers; private RecordReader currentReader; private BatchSchema schema; - private final Mutator mutator = new Mutator(); + private final Mutator mutator; private boolean done = false; - private SchemaChangeCallBack callBack = new SchemaChangeCallBack(); private boolean hasReadNonEmptyFile = false; private Map implicitVectors; private Iterator> implicitColumns; @@ -98,6 +95,7 @@ public class ScanBatch implements CloseableRecordBatch { currentReader = readers.next(); this.oContext = oContext; allocator = oContext.getAllocator(); + mutator = new Mutator(oContext, allocator, container); boolean setup = false; try { @@ -158,7 +156,7 @@ public class ScanBatch implements CloseableRecordBatch { } private void clearFieldVectorMap() { - for (final ValueVector v : fieldVectorMap.values()) { + for (final ValueVector v : mutator.fieldVectorMap().values()) { v.clear(); } } @@ -173,7 +171,7 @@ public class ScanBatch implements CloseableRecordBatch { try { injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class); - currentReader.allocate(fieldVectorMap); + currentReader.allocate(mutator.fieldVectorMap()); } catch (OutOfMemoryException e) { logger.debug("Caught Out of Memory Exception", e); clearFieldVectorMap(); @@ -204,10 +202,8 @@ public class ScanBatch implements CloseableRecordBatch { // If all the files we have read so far are just empty, the schema is not useful if (! hasReadNonEmptyFile) { container.clear(); - for (ValueVector v : fieldVectorMap.values()) { - v.clear(); - } - fieldVectorMap.clear(); + clearFieldVectorMap(); + mutator.clear(); } currentReader.close(); @@ -215,7 +211,7 @@ public class ScanBatch implements CloseableRecordBatch { implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null; currentReader.setup(oContext, mutator); try { - currentReader.allocate(fieldVectorMap); + currentReader.allocate(mutator.fieldVectorMap()); } catch (OutOfMemoryException e) { logger.debug("Caught OutOfMemoryException"); clearFieldVectorMap(); @@ -323,11 +319,41 @@ public class ScanBatch implements CloseableRecordBatch { return container.getValueAccessorById(clazz, ids); } - private class Mutator implements OutputMutator { + /** + * Row set mutator implementation provided to record readers created by + * this scan batch. Made visible so that tests can create this mutator + * without also needing a ScanBatch instance. (This class is really independent + * of the ScanBatch, but resides here for historical reasons. This is, + * in turn, the only use of the genereated vector readers in the vector + * package.) + */ + + @VisibleForTesting + public static class Mutator implements OutputMutator { /** Whether schema has changed since last inquiry (via #isNewSchema}). Is * true before first inquiry. */ private boolean schemaChanged = true; + /** Fields' value vectors indexed by fields' keys. */ + private final CaseInsensitiveMap fieldVectorMap = + CaseInsensitiveMap.newHashMap(); + + private final SchemaChangeCallBack callBack = new SchemaChangeCallBack(); + private final BufferAllocator allocator; + + private final VectorContainer container; + + private final OperatorExecContext oContext; + + public Mutator(OperatorExecContext oContext, BufferAllocator allocator, VectorContainer container) { + this.oContext = oContext; + this.allocator = allocator; + this.container = container; + } + + public Map fieldVectorMap() { + return fieldVectorMap; + } @SuppressWarnings("resource") @Override @@ -396,6 +422,10 @@ public class ScanBatch implements CloseableRecordBatch { public CallBack getCallBack() { return callBack; } + + public void clear() { + fieldVectorMap.clear(); + } } @Override @@ -414,7 +444,7 @@ public class ScanBatch implements CloseableRecordBatch { for (final ValueVector v : implicitVectors.values()) { v.clear(); } - fieldVectorMap.clear(); + mutator.clear(); currentReader.close(); } http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java index 288e78d..0dc2925 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java @@ -36,6 +36,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OpProfileDef; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.ops.OperatorUtilities; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.PhysicalPlanReader; @@ -218,7 +219,7 @@ public class TestAllocators extends DrillTest { // Use some bogus operator type to create a new operator context. def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, - OperatorContext.getChildCount(physicalOperator1)); + OperatorUtilities.getChildCount(physicalOperator1)); stats = fragmentContext1.getStats().newOperatorStats(def, fragmentContext1.getAllocator()); // Add a couple of Operator Contexts @@ -232,7 +233,7 @@ public class TestAllocators extends DrillTest { OperatorContext oContext21 = fragmentContext1.newOperatorContext(physicalOperator3); def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE, - OperatorContext.getChildCount(physicalOperator4)); + OperatorUtilities.getChildCount(physicalOperator4)); stats = fragmentContext2.getStats().newOperatorStats(def, fragmentContext2.getAllocator()); OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats); DrillBuf b22 = oContext22.getAllocator().buffer(2000000); @@ -246,7 +247,7 @@ public class TestAllocators extends DrillTest { // New fragment starts an operator that allocates an amount within the limit def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE, - OperatorContext.getChildCount(physicalOperator5)); + OperatorUtilities.getChildCount(physicalOperator5)); stats = fragmentContext3.getStats().newOperatorStats(def, fragmentContext3.getAllocator()); OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats); http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java index c2429b7..847caa5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java @@ -31,6 +31,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OpProfileDef; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.ops.OperatorUtilities; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; @@ -73,7 +74,7 @@ public class TestRecordIterator extends PopUnitTestBase { RecordBatch singleBatch = exec.getIncoming(); PhysicalOperator dummyPop = operatorList.iterator().next(); OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, - OperatorContext.getChildCount(dummyPop)); + OperatorUtilities.getChildCount(dummyPop)); OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator()); RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, false); int totalRecords = 0; @@ -130,7 +131,7 @@ public class TestRecordIterator extends PopUnitTestBase { RecordBatch singleBatch = exec.getIncoming(); PhysicalOperator dummyPop = operatorList.iterator().next(); OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, - OperatorContext.getChildCount(dummyPop)); + OperatorUtilities.getChildCount(dummyPop)); OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator()); RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0); List vectors = null;