drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prog...@apache.org
Subject [2/2] drill git commit: DRILL-5842: Refactor fragment, operator contexts
Date Tue, 14 Nov 2017 00:14:56 GMT
DRILL-5842: Refactor fragment, operator contexts

This closes #978


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c56de2f1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c56de2f1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c56de2f1

Branch: refs/heads/master
Commit: c56de2f13a36d673f0c5836d44817137e64b91e4
Parents: 42fc11e
Author: Paul Rogers <progers@maprtech.com>
Authored: Wed Oct 4 22:43:44 2017 -0700
Committer: Paul Rogers <progers@maprtech.com>
Committed: Mon Nov 13 14:55:39 2017 -0800

----------------------------------------------------------------------
 .../exec/ops/AbstractOperatorExecContext.java   |  95 ---------
 .../drill/exec/ops/AccountingDataTunnel.java    |   3 +-
 .../drill/exec/ops/BaseFragmentContext.java     |  90 +++++++++
 .../drill/exec/ops/BaseOperatorContext.java     | 196 +++++++++++++++++++
 .../apache/drill/exec/ops/FragmentContext.java  |  81 +++-----
 .../exec/ops/FragmentContextInterface.java      | 149 ++++++++++++++
 .../drill/exec/ops/FragmentExecContext.java     | 131 -------------
 .../apache/drill/exec/ops/OperExecContext.java  |  90 ---------
 .../drill/exec/ops/OperExecContextImpl.java     | 146 --------------
 .../apache/drill/exec/ops/OperatorContext.java  |  97 ++++++++-
 .../drill/exec/ops/OperatorContextImpl.java     |  76 ++-----
 .../drill/exec/ops/OperatorExecContext.java     |  46 -----
 .../drill/exec/ops/OperatorStatReceiver.java    |   4 +
 .../apache/drill/exec/ops/OperatorStats.java    |   2 +
 .../drill/exec/physical/impl/ScanBatch.java     |   5 +-
 .../physical/impl/xsort/SingleBatchSorter.java  |   4 +-
 .../impl/xsort/SingleBatchSorterTemplate.java   |   6 +-
 .../impl/xsort/managed/BaseSortWrapper.java     |   9 +-
 .../impl/xsort/managed/BaseWrapper.java         |   8 +-
 .../impl/xsort/managed/BufferedBatches.java     |   6 +-
 .../impl/xsort/managed/ExternalSortBatch.java   |  10 +-
 .../impl/xsort/managed/MSortTemplate.java       |   8 +-
 .../physical/impl/xsort/managed/MSorter.java    |   4 +-
 .../impl/xsort/managed/MergeSortWrapper.java    |  15 +-
 .../managed/PriorityQueueCopierWrapper.java     |  10 +-
 .../physical/impl/xsort/managed/SortImpl.java   |  10 +-
 .../impl/xsort/managed/SortMetrics.java         |   1 +
 .../impl/xsort/managed/SorterWrapper.java       |  10 +-
 .../impl/xsort/managed/SpilledRuns.java         |   8 +-
 .../exec/store/dfs/DrillFSDataInputStream.java  |  11 +-
 .../drill/exec/store/dfs/DrillFileSystem.java   |   6 +-
 .../impl/xsort/managed/SortTestUtilities.java   |   8 +-
 .../impl/xsort/managed/TestSortImpl.java        |   8 +-
 .../physical/impl/xsort/managed/TestSorter.java |   6 +-
 .../physical/unit/BasicPhysicalOpUnitTest.java  |   3 +
 .../physical/unit/MiniPlanUnitTestBase.java     |  36 ++--
 .../physical/unit/PhysicalOpUnitTestBase.java   |  14 +-
 .../org/apache/drill/test/OperatorFixture.java  | 124 +++++++-----
 38 files changed, 768 insertions(+), 768 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
deleted file mode 100644
index ebef55c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.testing.ExecutionControls;
-
-import io.netty.buffer.DrillBuf;
-
-/**
- * Implementation of {@link OperatorExecContext} that provides services
- * needed by most run-time operators. Excludes services that need the
- * entire Drillbit. Allows easy testing of operator code that uses this
- * interface.
- */
-
-public class AbstractOperatorExecContext implements OperatorExecContext {
-
-  protected final BufferAllocator allocator;
-  protected final ExecutionControls executionControls;
-  protected final PhysicalOperator popConfig;
-  protected final BufferManager manager;
-  protected OperatorStatReceiver statsWriter;
-
-  public AbstractOperatorExecContext(BufferAllocator allocator, PhysicalOperator popConfig,
-                                     ExecutionControls executionControls,
-                                     OperatorStatReceiver stats) {
-    this.allocator = allocator;
-    this.popConfig = popConfig;
-    this.manager = new BufferManagerImpl(allocator);
-    statsWriter = stats;
-
-    this.executionControls = executionControls;
-  }
-
-  @Override
-  public DrillBuf replace(DrillBuf old, int newSize) {
-    return manager.replace(old, newSize);
-  }
-
-  @Override
-  public DrillBuf getManagedBuffer() {
-    return manager.getManagedBuffer();
-  }
-
-  @Override
-  public DrillBuf getManagedBuffer(int size) {
-    return manager.getManagedBuffer(size);
-  }
-
-  @Override
-  public ExecutionControls getExecutionControls() {
-    return executionControls;
-  }
-
-  @Override
-  public BufferAllocator getAllocator() {
-    if (allocator == null) {
-      throw new UnsupportedOperationException("Operator context does not have an allocator");
-    }
-    return allocator;
-  }
-
-  @Override
-  public void close() {
-    try {
-      manager.close();
-    } finally {
-      if (allocator != null) {
-        allocator.close();
-      }
-    }
-  }
-
-  @Override
-  public OperatorStatReceiver getStatsWriter() {
-    return statsWriter;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
index 22923bb..44aa280 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,7 +23,6 @@ import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ExecutionControls;
-import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.slf4j.Logger;
 
 /**

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
new file mode 100644
index 0000000..a39213c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.compile.CodeCompiler;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Common implementation for both the test and production versions
+ * of the fragment context.
+ */
+
+public abstract class BaseFragmentContext implements FragmentContextInterface {
+
+  private final FunctionImplementationRegistry funcRegistry;
+
+  public BaseFragmentContext(final FunctionImplementationRegistry funcRegistry) {
+    this.funcRegistry = funcRegistry;
+  }
+
+  @Override
+  public FunctionImplementationRegistry getFunctionRegistry() {
+    return funcRegistry;
+  }
+
+  protected abstract CodeCompiler getCompiler();
+
+  @Override
+  public <T> T getImplementationClass(final ClassGenerator<T> cg)
+      throws ClassTransformationException, IOException {
+    return getImplementationClass(cg.getCodeGenerator());
+  }
+
+  @Override
+  public <T> T getImplementationClass(final CodeGenerator<T> cg)
+      throws ClassTransformationException, IOException {
+    return getCompiler().createInstance(cg);
+  }
+
+  @Override
+  public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
+    return getImplementationClass(cg.getCodeGenerator(), instanceCount);
+  }
+
+  @Override
+  public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
+    return getCompiler().createInstances(cg, instanceCount);
+  }
+
+  protected abstract BufferManager getBufferManager();
+
+  @Override
+  public DrillBuf replace(final DrillBuf old, final int newSize) {
+    return getBufferManager().replace(old, newSize);
+  }
+
+  @Override
+  public DrillBuf getManagedBuffer() {
+    return getBufferManager().getManagedBuffer();
+  }
+
+  @Override
+  public DrillBuf getManagedBuffer(final int size) {
+    return getBufferManager().getManagedBuffer(size);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
new file mode 100644
index 0000000..123f8fa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Implementation of {@link OperatorContext} that provides services
+ * needed by most run-time operators. Excludes services that need the
+ * entire Drillbit. This class provides services common to the test-time
+ * version of the operator context and the full production-time context
+ * that includes network services.
+ */
+
+public abstract class BaseOperatorContext implements OperatorContext {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class);
+
+  protected final FragmentContextInterface context;
+  protected final BufferAllocator allocator;
+  protected final PhysicalOperator popConfig;
+  protected final BufferManager manager;
+  private DrillFileSystem fs;
+  private ControlsInjector injector;
+
+  public BaseOperatorContext(FragmentContextInterface context, BufferAllocator allocator,
+               PhysicalOperator popConfig) {
+    this.context = context;
+    this.allocator = allocator;
+    this.popConfig = popConfig;
+    this.manager = new BufferManagerImpl(allocator);
+  }
+
+  @Override
+  public FragmentContextInterface getFragmentContext() {
+    return context;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends PhysicalOperator> T getOperatorDefn() {
+    return (T) popConfig;
+  }
+
+  public String getName() {
+    return popConfig.getClass().getName();
+  }
+
+  @Override
+  public DrillBuf replace(DrillBuf old, int newSize) {
+    return manager.replace(old, newSize);
+  }
+
+  @Override
+  public DrillBuf getManagedBuffer() {
+    return manager.getManagedBuffer();
+  }
+
+  @Override
+  public DrillBuf getManagedBuffer(int size) {
+    return manager.getManagedBuffer(size);
+  }
+
+  @Override
+  public ExecutionControls getExecutionControls() {
+    return context.getExecutionControls();
+  }
+
+  @Override
+  public BufferAllocator getAllocator() {
+    if (allocator == null) {
+      throw new UnsupportedOperationException("Operator context does not have an allocator");
+    }
+    return allocator;
+  }
+
+  // Allow an operator to use the thread pool
+  @Override
+  public ExecutorService getExecutor() {
+    return context.getDrillbitContext().getExecutor();
+  }
+
+  @Override
+  public ExecutorService getScanExecutor() {
+    return context.getDrillbitContext().getScanExecutor();
+  }
+
+  @Override
+  public ExecutorService getScanDecodeExecutor() {
+    return context.getDrillbitContext().getScanDecodeExecutor();
+  }
+
+  @Override
+  public void setInjector(ControlsInjector injector) {
+    this.injector = injector;
+  }
+
+  @Override
+  public ControlsInjector getInjector() {
+    return injector;
+  }
+
+  @Override
+  public void injectUnchecked(String desc) {
+    ExecutionControls executionControls = context.getExecutionControls();
+    if (injector != null  &&  executionControls != null) {
+      injector.injectUnchecked(executionControls, desc);
+    }
+  }
+
+  @Override
+  public <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass)
+      throws T {
+    ExecutionControls executionControls = context.getExecutionControls();
+    if (injector != null  &&  executionControls != null) {
+      injector.injectChecked(executionControls, desc, exceptionClass);
+    }
+  }
+
+  @Override
+  public void close() {
+    RuntimeException ex = null;
+    try {
+      manager.close();
+    } catch (RuntimeException e) {
+      ex = e;
+    }
+    try {
+      if (allocator != null) {
+        allocator.close();
+      }
+    } catch (RuntimeException e) {
+      ex = ex == null ? e : ex;
+    }
+    try {
+      if (fs != null) {
+        fs.close();
+        fs = null;
+      }
+    } catch (IOException e) {
+      if (ex == null) {
+        ex = UserException
+            .resourceError(e)
+            .addContext("Failed to close the Drill file system for " + getName())
+            .build(logger);
+      }
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  @Override
+  public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
+    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
+    fs = new DrillFileSystem(conf, getStatsWriter());
+    return fs;
+  }
+
+  /**
+   * Creates a DrillFileSystem that does not automatically track operator stats.
+   */
+  @Override
+  public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
+    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
+    fs = new DrillFileSystem(conf, null);
+    return fs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 19ffca2..1cde97a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,22 +17,18 @@
  */
 package org.apache.drill.exec.ops;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import io.netty.buffer.DrillBuf;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.compile.CodeCompiler;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.expr.holders.ValueHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -59,15 +55,21 @@ import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import io.netty.buffer.DrillBuf;
 
 /**
  * Contextual objects required for execution of a particular fragment.
+ * This is the implementation; use <tt>FragmentContextInterface</tt>
+ * in code to allow tests to use test-time implementations.
  */
-public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExecContext {
+
+public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
 
   private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
@@ -77,7 +79,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
   private final UserClientConnection connection; // is null if this context is for non-root fragment
   private final QueryContext queryContext; // is null if this context is for non-root fragment
   private final FragmentStats stats;
-  private final FunctionImplementationRegistry funcRegistry;
   private final BufferAllocator allocator;
   private final PlanFragment fragment;
   private final ContextInformation contextInformation;
@@ -87,7 +88,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
   private ExecutorState executorState;
   private final ExecutionControls executionControls;
 
-
   private final SendingAccountor sendingAccountor = new SendingAccountor();
   private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
     @Override
@@ -135,12 +135,12 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
   public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
       final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
     throws ExecutionSetupException {
+    super(funcRegistry);
     this.context = dbContext;
     this.queryContext = queryContext;
     this.connection = connection;
     this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
     this.fragment = fragment;
-    this.funcRegistry = funcRegistry;
     contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
 
     logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
@@ -225,6 +225,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
     return executorState.shouldContinue();
   }
 
+  @Override
   public DrillbitContext getDrillbitContext() {
     return context;
   }
@@ -313,25 +314,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
   }
 
   @Override
-  public <T> T getImplementationClass(final ClassGenerator<T> cg)
-      throws ClassTransformationException, IOException {
-    return getImplementationClass(cg.getCodeGenerator());
-  }
-
-  @Override
-  public <T> T getImplementationClass(final CodeGenerator<T> cg)
-      throws ClassTransformationException, IOException {
-    return context.getCompiler().createInstance(cg);
-  }
-
-  @Override
-  public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
-    return getImplementationClass(cg.getCodeGenerator(), instanceCount);
-  }
-
-  @Override
-  public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
-    return context.getCompiler().createInstances(cg, instanceCount);
+  protected CodeCompiler getCompiler() {
+    return context.getCompiler();
   }
 
   public AccountingUserConnection getUserDataTunnel() {
@@ -383,11 +367,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
   }
 
   @Override
-  public FunctionImplementationRegistry getFunctionRegistry() {
-    return funcRegistry;
-  }
-
-  @Override
   public DrillConfig getConfig() {
     return context.getConfig();
   }
@@ -439,19 +418,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
     }
   }
 
-  public DrillBuf replace(final DrillBuf old, final int newSize) {
-    return bufferManager.replace(old, newSize);
-  }
-
-  @Override
-  public DrillBuf getManagedBuffer() {
-    return bufferManager.getManagedBuffer();
-  }
-
-  public DrillBuf getManagedBuffer(final int size) {
-    return bufferManager.getManagedBuffer(size);
-  }
-
   @Override
   public PartitionExplorer getPartitionExplorer() {
     throw new UnsupportedOperationException(String.format("The partition explorer interface can only be used " +
@@ -494,6 +460,11 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
     return buffers.isDone();
   }
 
+  @Override
+  protected BufferManager getBufferManager() {
+    return bufferManager;
+  }
+
   public interface ExecutorState {
     /**
      * Whether execution should continue.

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java
new file mode 100644
index 0000000..7d4ba18
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionSet;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Fragment context interface: separates implementation from definition.
+ * Allows unit testing by mocking or reimplementing services with
+ * test-time versions. The name is awkward, chosen to avoid renaming
+ * the implementation class which is used in many places in legacy code.
+ * New code should use this interface, and the names should eventually
+ * be swapped with <tt>FragmentContext</tt> becoming
+ * <tt>FragmentContextImpl</tt> and this interface becoming
+ * <tt>FragmentContext</tt>.
+ */
+
+public interface FragmentContextInterface {
+
+  /**
+   * Drillbit context. Valid only in production; returns null in
+   * operator test environments.
+   */
+
+  DrillbitContext getDrillbitContext();
+
+  /**
+   * Returns the UDF registry.
+   * @return the UDF registry
+   */
+  FunctionImplementationRegistry getFunctionRegistry();
+  /**
+   * Returns a read-only version of the session options.
+   * @return the session options
+   */
+  OptionSet getOptionSet();
+
+  /**
+   * Generates code for a class given a {@link ClassGenerator},
+   * and returns a single instance of the generated class. (Note
+   * that the name is a misnomer, it would be better called
+   * <tt>getImplementationInstance</tt>.)
+   *
+   * @param cg the class generator
+   * @return an instance of the generated class
+   */
+
+  <T> T getImplementationClass(final ClassGenerator<T> cg)
+      throws ClassTransformationException, IOException;
+
+  /**
+   * Generates code for a class given a {@link CodeGenerator},
+   * and returns a single instance of the generated class. (Note
+   * that the name is a misnomer, it would be better called
+   * <tt>getImplementationInstance</tt>.)
+   *
+   * @param cg the code generator
+   * @return an instance of the generated class
+   */
+
+  <T> T getImplementationClass(final CodeGenerator<T> cg)
+      throws ClassTransformationException, IOException;
+
+  /**
+   * Generates code for a class given a {@link ClassGenerator}, and returns the
+   * specified number of instances of the generated class. (Note that the name
+   * is a misnomer, it would be better called
+   * <tt>getImplementationInstances</tt>.)
+   *
+   * @param cg the class generator
+   * @param instanceCount the number of instances of the generated class to create
+   * @return list of instances of the generated class
+   */
+
+  <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount)
+      throws ClassTransformationException, IOException;
+
+  /**
+   * Generates code for a class given a {@link CodeGenerator}, and returns the
+   * specified number of instances of the generated class. (Note that the name
+   * is a misnomer, it would be better called
+   * <tt>getImplementationInstances</tt>.)
+   *
+   * @param cg the code generator
+   * @param instanceCount the number of instances of the generated class to create
+   * @return list of instances of the generated class
+   */
+
+  <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount)
+      throws ClassTransformationException, IOException;
+
+  /**
+   * Determine whether fragment execution should continue or has been interrupted.
+   * @return true if execution should continue, false if an interruption has
+   * occurred and fragment execution should halt
+   */
+
+  boolean shouldContinue();
+
+  /**
+   * Return the set of execution controls used to inject faults into running
+   * code for testing.
+   *
+   * @return the execution controls
+   */
+  ExecutionControls getExecutionControls();
+
+  /**
+   * Returns the Drill configuration for this run. Note that the config is
+   * global and immutable.
+   *
+   * @return the Drill configuration
+   */
+
+  DrillConfig getConfig();
+
+  DrillBuf replace(DrillBuf old, int newSize);
+
+  DrillBuf getManagedBuffer();
+
+  DrillBuf getManagedBuffer(int size);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java
deleted file mode 100644
index 526c030..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ops;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.server.options.OptionSet;
-import org.apache.drill.exec.testing.ExecutionControls;
-
-/**
- * Services passed to fragments that deal only with execution details
- * such as the function registry, options, code generation and the like.
- * Does not include top-level services such as network endpoints. Code
- * written to use this interface can be unit tested quite easily using
- * the {@link OperatorContext} class. Code that uses the wider,
- * more global {@link FragmentContext} must be tested in the context
- * of the entire Drill server, or using mocks for the global services.
- */
-
-public interface FragmentExecContext {
-  /**
-   * Returns the UDF registry.
-   * @return the UDF registry
-   */
-  FunctionImplementationRegistry getFunctionRegistry();
-  /**
-   * Returns a read-only version of the session options.
-   * @return the session options
-   */
-  OptionSet getOptionSet();
-
-  /**
-   * Generates code for a class given a {@link ClassGenerator},
-   * and returns a single instance of the generated class. (Note
-   * that the name is a misnomer, it would be better called
-   * <tt>getImplementationInstance</tt>.)
-   *
-   * @param cg the class generator
-   * @return an instance of the generated class
-   */
-
-  <T> T getImplementationClass(final ClassGenerator<T> cg)
-      throws ClassTransformationException, IOException;
-
-  /**
-   * Generates code for a class given a {@link CodeGenerator},
-   * and returns a single instance of the generated class. (Note
-   * that the name is a misnomer, it would be better called
-   * <tt>getImplementationInstance</tt>.)
-   *
-   * @param cg the code generator
-   * @return an instance of the generated class
-   */
-
-  <T> T getImplementationClass(final CodeGenerator<T> cg)
-      throws ClassTransformationException, IOException;
-
-  /**
-   * Generates code for a class given a {@link ClassGenerator}, and returns the
-   * specified number of instances of the generated class. (Note that the name
-   * is a misnomer, it would be better called
-   * <tt>getImplementationInstances</tt>.)
-   *
-   * @param cg
-   *          the class generator
-   * @return list of instances of the generated class
-   */
-
-  <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount)
-      throws ClassTransformationException, IOException;
-
-  /**
-   * Generates code for a class given a {@link CodeGenerator}, and returns the
-   * specified number of instances of the generated class. (Note that the name
-   * is a misnomer, it would be better called
-   * <tt>getImplementationInstances</tt>.)
-   *
-   * @param cg
-   *          the code generator
-   * @return list of instances of the generated class
-   */
-
-  <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount)
-      throws ClassTransformationException, IOException;
-
-  /**
-   * Determine if fragment execution has been interrupted.
-   * @return true if execution should continue, false if an interruption has
-   * occurred and fragment execution should halt
-   */
-
-  boolean shouldContinue();
-
-  /**
-   * Return the set of execution controls used to inject faults into running
-   * code for testing.
-   *
-   * @return the execution controls
-   */
-  ExecutionControls getExecutionControls();
-
-  /**
-   * Returns the Drill configuration for this run. Note that the config is
-   * global and immutable.
-   *
-   * @return the Drill configuration
-   */
-
-  DrillConfig getConfig();
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java
deleted file mode 100644
index 89f3b63..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.testing.ControlsInjector;
-
-/**
- * Defines the set of services used by operator implementations. This
- * is a subset of the full {@link OperatorContext} which removes global
- * services such as network endpoints. Code written to this interface
- * can be easily unit tested using the {@link OperatorFixture} class.
- * Code that needs global services must be tested in the Drill server
- * as a whole, or using mocks for global services.
- */
-
-public interface OperExecContext extends FragmentExecContext {
-
-  /**
-   * Return the physical operator definition created by the planner and passed
-   * into the Drillbit executing the query.
-   * @return the physical operator definition
-   */
-
-  <T extends PhysicalOperator> T getOperatorDefn();
-
-  /**
-   * Return the memory allocator for this operator.
-   *
-   * @return the per-operator memory allocator
-   */
-
-  BufferAllocator getAllocator();
-
-  /**
-   * A write-only interface to the Drill statistics mechanism. Allows
-   * operators to update statistics.
-   * @return operator statistics
-   */
-
-  OperatorStatReceiver getStats();
-
-  /**
-   * Returns the fault injection mechanism used to introduce faults at runtime
-   * for testing.
-   * @return the fault injector
-   */
-
-  ControlsInjector getInjector();
-
-  /**
-   * Insert an unchecked fault (exception). Handles the details of checking if
-   * fault injection is enabled and this particular fault is selected.
-   * @param desc the description of the fault used to match a fault
-   * injection parameter to determine if the fault should be injected
-   * @throws RuntimeException an unchecked exception if the fault is enabled
-   */
-
-  void injectUnchecked(String desc);
-
-  /**
-   * Insert a checked fault (exception) of the given class. Handles the details
-   * of checking if fault injection is enabled and this particular fault is
-   * selected.
-   *
-   * @param desc the description of the fault used to match a fault
-   * injection parameter to determine if the fault should be injected
-   * @param exceptionClass the class of exeception to be thrown
-   * @throws T if the fault is enabled
-   */
-
-  <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass)
-      throws T;
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java
deleted file mode 100644
index b625e76..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ops;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.server.options.OptionSet;
-import org.apache.drill.exec.testing.ControlsInjector;
-import org.apache.drill.exec.testing.ExecutionControls;
-
-/**
- * Implementation of the context used by low-level operator
- * tasks.
- */
-
-public class OperExecContextImpl implements OperExecContext {
-
-  private FragmentExecContext fragmentContext;
-  private PhysicalOperator operDefn;
-  private ControlsInjector injector;
-  private BufferAllocator allocator;
-  private OperatorStatReceiver stats;
-
-  public OperExecContextImpl(FragmentExecContext fragContext, OperatorContext opContext, PhysicalOperator opDefn, ControlsInjector injector) {
-    this(fragContext, opContext.getAllocator(), opContext.getStats(), opDefn, injector);
-  }
-
-  public OperExecContextImpl(FragmentExecContext fragContext, BufferAllocator allocator, OperatorStatReceiver stats, PhysicalOperator opDefn, ControlsInjector injector) {
-    this.fragmentContext = fragContext;
-    this.operDefn = opDefn;
-    this.injector = injector;
-    this.allocator = allocator;
-    this.stats = stats;
-  }
-
-  @Override
-  public FunctionImplementationRegistry getFunctionRegistry() {
-    return fragmentContext.getFunctionRegistry();
-  }
-
-  @Override
-  public OptionSet getOptionSet() {
-    return fragmentContext.getOptionSet();
-  }
-
-  @Override
-  public <T> T getImplementationClass(ClassGenerator<T> cg)
-      throws ClassTransformationException, IOException {
-    return fragmentContext.getImplementationClass(cg);
-  }
-
-  @Override
-  public <T> T getImplementationClass(CodeGenerator<T> cg)
-      throws ClassTransformationException, IOException {
-    return fragmentContext.getImplementationClass(cg);
-  }
-
-  @Override
-  public <T> List<T> getImplementationClass(ClassGenerator<T> cg,
-      int instanceCount) throws ClassTransformationException, IOException {
-    return fragmentContext.getImplementationClass(cg, instanceCount);
-  }
-
-  @Override
-  public <T> List<T> getImplementationClass(CodeGenerator<T> cg,
-      int instanceCount) throws ClassTransformationException, IOException {
-    return fragmentContext.getImplementationClass(cg, instanceCount);
-  }
-
-  @Override
-  public boolean shouldContinue() {
-    return fragmentContext.shouldContinue();
-  }
-
-  @Override
-  public ExecutionControls getExecutionControls() {
-    return fragmentContext.getExecutionControls();
-  }
-
-  @Override
-  public BufferAllocator getAllocator() {
-    return allocator;
-  }
-
-  @Override
-  public OperatorStatReceiver getStats() {
-    return stats;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public <T extends PhysicalOperator> T getOperatorDefn() {
-    return (T) operDefn;
-  }
-
-  @Override
-  public DrillConfig getConfig() {
-    return fragmentContext.getConfig();
-  }
-
-  @Override
-  public ControlsInjector getInjector() {
-    return injector;
-  }
-
-  @Override
-  public void injectUnchecked(String desc) {
-    ExecutionControls executionControls = fragmentContext.getExecutionControls();
-    if (injector != null  &&  executionControls != null) {
-      injector.injectUnchecked(executionControls, desc);
-    }
-  }
-
-  @Override
-  public <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass)
-      throws T {
-    ExecutionControls executionControls = fragmentContext.getExecutionControls();
-    if (injector != null  &&  executionControls != null) {
-      injector.injectChecked(executionControls, desc, exceptionClass);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index b248d5f..37653e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -21,13 +21,70 @@ import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-public interface OperatorContext extends OperatorExecContext {
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Per-operator services available for operator implementations.
+ * The services allow access to the operator definition, to the
+ * fragment context, and to per-operator services.
+ * <p>
+ * Use this interface in code to allow unit tests to provide
+ * test-time implementations of this context.
+ */
+
+public interface OperatorContext {
+
+  /**
+   * Return the physical operator definition created by the planner and passed
+   * into the Drillbit executing the query.
+   * @return the physical operator definition
+   */
+
+  <T extends PhysicalOperator> T getOperatorDefn();
+
+  /**
+   * Return the memory allocator for this operator.
+   *
+   * @return the per-operator memory allocator
+   */
+
+  BufferAllocator getAllocator();
+
+  FragmentContextInterface getFragmentContext();
+
+  DrillBuf replace(DrillBuf old, int newSize);
+
+  DrillBuf getManagedBuffer();
+
+  DrillBuf getManagedBuffer(int size);
+
+  ExecutionControls getExecutionControls();
+
+  /**
+   * A write-only interface to the Drill statistics mechanism. Allows
+   * operators to update statistics.
+   * @return operator statistics
+   */
+
+  OperatorStatReceiver getStatsWriter();
+
+  /**
+   * Full operator stats (for legacy code). Prefer
+   * <tt>getStatsWriter()</tt> to allow code to easily run in a
+   * test environment.
+   *
+   * @return operator statistics
+   */
 
   OperatorStats getStats();
 
@@ -51,4 +108,40 @@ public interface OperatorContext extends OperatorExecContext {
    */
   <RESULT> ListenableFuture<RESULT> runCallableAs(UserGroupInformation proxyUgi,
                                                                   Callable<RESULT> callable);
- }
+
+  void setInjector(ControlsInjector injector);
+
+  /**
+   * Returns the fault injection mechanism used to introduce faults at runtime
+   * for testing.
+   * @return the fault injector
+   */
+
+  ControlsInjector getInjector();
+
+  /**
+   * Insert an unchecked fault (exception). Handles the details of checking if
+   * fault injection is enabled and this particular fault is selected.
+   * @param desc the description of the fault used to match a fault
+   * injection parameter to determine if the fault should be injected
+   * @throws RuntimeException an unchecked exception if the fault is enabled
+   */
+
+  void injectUnchecked(String desc);
+
+  /**
+   * Insert a checked fault (exception) of the given class. Handles the details
+   * of checking if fault injection is enabled and this particular fault is
+   * selected.
+   *
+   * @param desc the description of the fault used to match a fault
+   * injection parameter to determine if the fault should be injected
+   * @param exceptionClass the class of exception to be thrown
+   * @throws T if the fault is enabled
+   */
+
+  <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass)
+      throws T;
+
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 37c609e..bc85c39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -17,33 +17,23 @@
  */
 package org.apache.drill.exec.ops;
 
-import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.work.WorkManager;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
-class OperatorContextImpl extends AbstractOperatorExecContext implements OperatorContext, AutoCloseable {
+class OperatorContextImpl extends BaseOperatorContext implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
 
   private boolean closed = false;
   private final OperatorStats stats;
-  private DrillFileSystem fs;
-  private final ExecutorService executor;
-  private final ExecutorService scanExecutor;
-  private final ExecutorService scanDecodeExecutor;
 
   /**
    * This lazily initialized executor service is used to submit a {@link Callable task} that needs a proxy user. There
@@ -59,9 +49,10 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato
 
   public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
       throws OutOfMemoryException {
-    super(context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
-          popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()),
-          popConfig, context.getExecutionControls(), stats);
+    super(context,
+          context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
+              popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()),
+          popConfig);
     if (stats != null) {
       this.stats = stats;
     } else {
@@ -70,25 +61,6 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato
                            OperatorUtilities.getChildCount(popConfig));
       this.stats = context.getStats().newOperatorStats(def, allocator);
     }
-    executor = context.getDrillbitContext().getExecutor();
-    scanExecutor = context.getDrillbitContext().getScanExecutor();
-    scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
-  }
-
-  // Allow an operator to use the thread pool
-  @Override
-  public ExecutorService getExecutor() {
-    return executor;
-  }
-
-  @Override
-  public ExecutorService getScanExecutor() {
-    return scanExecutor;
-  }
-
-  @Override
-  public ExecutorService getScanDecodeExecutor() {
-    return scanDecodeExecutor;
   }
 
   public boolean isClosed() {
@@ -98,23 +70,15 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato
   @Override
   public void close() {
     if (closed) {
-      logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null);
+      logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? getName() : null);
       return;
     }
-    logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null);
+    logger.debug("Closing context for {}", popConfig != null ? getName() : null);
 
-    closed = true;
     try {
       super.close();
     } finally {
-      if (fs != null) {
-        try {
-          fs.close();
-          fs = null;
-        } catch (IOException e) {
-          throw new DrillRuntimeException(e);
-        }
-      }
+      closed = true;
     }
   }
 
@@ -124,11 +88,16 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato
   }
 
   @Override
+  public OperatorStatReceiver getStatsWriter() {
+    return stats;
+  }
+
+  @Override
   public <RESULT> ListenableFuture<RESULT> runCallableAs(final UserGroupInformation proxyUgi,
                                                          final Callable<RESULT> callable) {
     synchronized (this) {
       if (delegatePool == null) {
-        delegatePool = MoreExecutors.listeningDecorator(executor);
+        delegatePool = MoreExecutors.listeningDecorator(getExecutor());
       }
     }
     return delegatePool.submit(new Callable<RESULT>() {
@@ -152,21 +121,4 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato
       }
     });
   }
-
-  @Override
-  public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
-    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
-    fs = new DrillFileSystem(conf, getStats());
-    return fs;
-  }
-
-  /**
-   * Creates a DrillFileSystem that does not automatically track operator stats.
-   */
-  @Override
-  public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
-    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
-    fs = new DrillFileSystem(conf, null);
-    return fs;
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
deleted file mode 100644
index 4d64aba..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.testing.ExecutionControls;
-
-import io.netty.buffer.DrillBuf;
-
-/**
- * Narrowed version of the {@link OperatorContext} used to create an
- * easy-to-test version of the operator context that excludes services
- * that require a full Drillbit server.
- */
-
-public interface OperatorExecContext {
-
-  DrillBuf replace(DrillBuf old, int newSize);
-
-  DrillBuf getManagedBuffer();
-
-  DrillBuf getManagedBuffer(int size);
-
-  BufferAllocator getAllocator();
-
-  ExecutionControls getExecutionControls();
-
-  OperatorStatReceiver getStatsWriter();
-
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java
index 6aa8d76..4dba2c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java
@@ -67,4 +67,8 @@ public interface OperatorStatReceiver {
    */
 
   void setDoubleStat(MetricDef metric, double value);
+
+  void startWait();
+
+  void stopWait();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index b3c9ff9..1b96f28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -169,6 +169,7 @@ public class OperatorStats implements OperatorStatReceiver {
     inProcessing = false;
   }
 
+  @Override
   public synchronized void startWait() {
     assert !inWait : assertionError("starting waiting");
     stopProcessing();
@@ -176,6 +177,7 @@ public class OperatorStats implements OperatorStatReceiver {
     waitMark = System.nanoTime();
   }
 
+  @Override
   public synchronized void stopWait() {
     assert inWait : assertionError("stopping waiting");
     startProcessing();

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 6c2c171..77e9ea4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -32,7 +32,6 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.ops.OperatorExecContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -319,9 +318,9 @@ public class ScanBatch implements CloseableRecordBatch {
 
     private final VectorContainer container;
 
-    private final OperatorExecContext oContext;
+    private final OperatorContext oContext;
 
-    public Mutator(OperatorExecContext oContext, BufferAllocator allocator, VectorContainer container) {
+    public Mutator(OperatorContext oContext, BufferAllocator allocator, VectorContainer container) {
       this.oContext = oContext;
       this.allocator = allocator;
       this.container = container;

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
index ccaca98..733ea5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
@@ -19,12 +19,12 @@ package org.apache.drill.exec.physical.impl.xsort;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentExecContext;
+import org.apache.drill.exec.ops.FragmentContextInterface;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
 public interface SingleBatchSorter {
-  public void setup(FragmentExecContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException;
+  public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException;
   public void sort(SelectionVector2 vector2) throws SchemaChangeException;
 
   public static TemplateClassDefinition<SingleBatchSorter> TEMPLATE_DEFINITION =

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index 672dd2b..0f4680d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
 import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentExecContext;
+import org.apache.drill.exec.ops.FragmentContextInterface;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -38,7 +38,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
   private SelectionVector2 vector2;
 
   @Override
-  public void setup(FragmentExecContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{
+  public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{
     Preconditions.checkNotNull(vector2);
     this.vector2 = vector2;
     try {
@@ -76,7 +76,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
     }
   }
 
-  public abstract void doSetup(@Named("context") FragmentExecContext context,
+  public abstract void doSetup(@Named("context") FragmentContextInterface context,
                                @Named("incoming") VectorAccessible incoming,
                                @Named("outgoing") RecordBatch outgoing)
                        throws SchemaChangeException;

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java
index 1f381b9..338462e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.record.VectorAccessible;
 
@@ -45,7 +45,7 @@ public abstract class BaseSortWrapper extends BaseWrapper {
   protected static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
   protected static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
 
-  public BaseSortWrapper(OperExecContext opContext) {
+  public BaseSortWrapper(OperatorContext opContext) {
     super(opContext);
   }
 
@@ -56,7 +56,8 @@ public abstract class BaseSortWrapper extends BaseWrapper {
     for (Ordering od : popConfig.getOrderings()) {
       // first, we rewrite the evaluation stack for each side of the comparison.
       ErrorCollector collector = new ErrorCollectorImpl();
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,
+          context.getFragmentContext().getFunctionRegistry());
       if (collector.hasErrors()) {
         throw UserException.unsupportedError()
               .message("Failure while materializing expression. " + collector.toErrorString())
@@ -71,7 +72,7 @@ public abstract class BaseSortWrapper extends BaseWrapper {
       // next we wrap the two comparison sides and add the expression block for the comparison.
       LogicalExpression fh =
           FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
-                                                         context.getFunctionRegistry());
+                                                         context.getFragmentContext().getFunctionRegistry());
       HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
       JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java
index e607f40..0287059 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 
 /**
  * Base class for code-generation-based tasks.
@@ -30,15 +30,15 @@ import org.apache.drill.exec.ops.OperExecContext;
 
 public abstract class BaseWrapper {
 
-  protected OperExecContext context;
+  protected OperatorContext context;
 
-  public BaseWrapper(OperExecContext context) {
+  public BaseWrapper(OperatorContext context) {
     this.context = context;
   }
 
   protected <T> T getInstance(CodeGenerator<T> cg, org.slf4j.Logger logger) {
     try {
-      return context.getImplementationClass(cg);
+      return context.getFragmentContext().getImplementationClass(cg);
     } catch (ClassTransformationException e) {
       throw UserException.unsupportedError(e)
             .message("Code generation error - likely code error.")

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
index c930877..f26f6b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
 import org.apache.drill.exec.record.BatchSchema;
@@ -55,9 +55,9 @@ public class BufferedBatches {
 
   private BatchSchema schema;
 
-  private final OperExecContext context;
+  private final OperatorContext context;
 
-  public BufferedBatches(OperExecContext opContext) {
+  public BufferedBatches(OperatorContext opContext) {
     context = opContext;
     sorterWrapper = new SorterWrapper(opContext);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 6a97c29..2054c9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -20,8 +20,6 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperExecContext;
-import org.apache.drill.exec.ops.OperExecContextImpl;
 import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
@@ -218,10 +216,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
     SortConfig sortConfig = new SortConfig(context.getConfig());
     SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
-    OperExecContext opContext = new OperExecContextImpl(context, oContext, popConfig, injector);
-    PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
-    SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
-    sortImpl = new SortImpl(opContext, sortConfig, spilledRuns, container);
+    oContext.setInjector(injector);
+    PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext);
+    SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
+    sortImpl = new SortImpl(oContext, sortConfig, spilledRuns, container);
 
     // The upstream operator checks on record count before we have
     // results. Create an empty result set temporarily to handle

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
index 5b07c4a..698e32f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
@@ -24,7 +24,7 @@ import javax.inject.Named;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentExecContext;
+import org.apache.drill.exec.ops.FragmentContextInterface;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -49,7 +49,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
    */
 
   private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
-  private FragmentExecContext context;
+  private FragmentContextInterface context;
 
   /**
    * Controls the maximum size of batches exposed to downstream
@@ -57,7 +57,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
   private int desiredRecordBatchCount;
 
   @Override
-  public void setup(final FragmentExecContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
+  public void setup(final FragmentContextInterface context, final BufferAllocator allocator, final SelectionVector4 vector4,
                     final VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException{
     // we pass in the local hyperBatch since that is where we'll be reading data.
     Preconditions.checkNotNull(vector4);
@@ -233,7 +233,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
     }
   }
 
-  public abstract void doSetup(@Named("context") FragmentExecContext context,
+  public abstract void doSetup(@Named("context") FragmentContextInterface context,
                                @Named("incoming") VectorContainer incoming,
                                @Named("outgoing") RecordBatch outgoing)
                        throws SchemaChangeException;

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
index 71ae29e..428f6f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentExecContext;
+import org.apache.drill.exec.ops.FragmentContextInterface;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
  */
 
 public interface MSorter {
-  public void setup(FragmentExecContext context, BufferAllocator allocator, SelectionVector4 vector4,
+  public void setup(FragmentContextInterface context, BufferAllocator allocator, SelectionVector4 vector4,
                     VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException;
   public void sort();
   public SelectionVector4 getSV4();

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
index 01135f0..f592e44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
@@ -32,7 +32,7 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
@@ -87,7 +87,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
   private State state = State.FIRST;
   private final VectorContainer destContainer;
 
-  public MergeSortWrapper(OperExecContext opContext, VectorContainer destContainer) {
+  public MergeSortWrapper(OperatorContext opContext, VectorContainer destContainer) {
     super(opContext);
     this.destContainer = destContainer;
   }
@@ -123,7 +123,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
       sv4 = builder.getSv4();
       Sort popConfig = context.getOperatorDefn();
       mSorter = createNewMSorter(popConfig.getOrderings(), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
-      mSorter.setup(context, context.getAllocator(), sv4, destContainer, sv4.getCount(), outputBatchSize);
+      mSorter.setup(context.getFragmentContext(), context.getAllocator(), sv4, destContainer, sv4.getCount(), outputBatchSize);
     } catch (SchemaChangeException e) {
       throw UserException.unsupportedError(e)
             .message("Unexpected schema change - likely code error.")
@@ -142,7 +142,9 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
   }
 
   private MSorter createNewMSorter(List<Ordering> orderings, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) {
-    CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptionSet());
+    CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION,
+        context.getFragmentContext().getFunctionRegistry(),
+        context.getFragmentContext().getOptionSet());
     cg.plainJavaCapable(true);
 
     // Uncomment out this line to debug the generated code.
@@ -153,7 +155,8 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
     for (Ordering od : orderings) {
       // first, we rewrite the evaluation stack for each side of the comparison.
       ErrorCollector collector = new ErrorCollectorImpl();
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), destContainer, collector, context.getFunctionRegistry());
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), destContainer, collector,
+          context.getFragmentContext().getFunctionRegistry());
       if (collector.hasErrors()) {
         throw UserException.unsupportedError()
               .message("Failure while materializing expression. " + collector.toErrorString())
@@ -168,7 +171,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
       // next we wrap the two comparison sides and add the expression block for the comparison.
       LogicalExpression fh =
           FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
-                                                         context.getFunctionRegistry());
+                                                         context.getFragmentContext().getFunctionRegistry());
       HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
       JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
index 88686e5..ab8cc9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
@@ -30,14 +30,14 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorInitializer;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorInitializer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -66,7 +66,7 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
 
   private PriorityQueueCopier copier;
 
-  public PriorityQueueCopierWrapper(OperExecContext opContext) {
+  public PriorityQueueCopierWrapper(OperatorContext opContext) {
     super(opContext);
   }
 
@@ -80,7 +80,9 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
   private PriorityQueueCopier newCopier(VectorAccessible batch) {
     // Generate the copier code and obtain the resulting class
 
-    CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptionSet());
+    CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION,
+        context.getFragmentContext().getFunctionRegistry(),
+        context.getFragmentContext().getOptionSet());
     ClassGenerator<PriorityQueueCopier> g = cg.getRoot();
     cg.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index d2b589c..2d53c3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -21,17 +21,17 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
 import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.VectorInitializer;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorInitializer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -174,7 +174,7 @@ public class SortImpl {
   private final SortMetrics metrics;
   private final SortMemoryManager memManager;
   private VectorContainer outputBatch;
-  private OperExecContext context;
+  private OperatorContext context;
 
   /**
    * Memory allocator for this operator itself. Incoming batches are
@@ -192,7 +192,7 @@ public class SortImpl {
 
   private VectorInitializer allocHelper;
 
-  public SortImpl(OperExecContext opContext, SortConfig sortConfig,
+  public SortImpl(OperatorContext opContext, SortConfig sortConfig,
                   SpilledRuns spilledRuns, VectorContainer batch) {
     this.context = opContext;
     outputBatch = batch;
@@ -200,7 +200,7 @@ public class SortImpl {
     allocator = opContext.getAllocator();
     config = sortConfig;
     memManager = new SortMemoryManager(config, allocator.getLimit());
-    metrics = new SortMetrics(opContext.getStats());
+    metrics = new SortMetrics(opContext.getStatsWriter());
     bufferedBatches = new BufferedBatches(opContext);
 
     // Request leniency from the allocator. Leniency

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
index 1233de8..8d20cca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
@@ -44,6 +44,7 @@ public class SortMetrics {
   private long writeBytes;
 
   public SortMetrics(OperatorStatReceiver stats) {
+    assert stats != null;
     this.stats = stats;
   }
 


Mime
View raw message