drill-commits mailing list archives

From jacq...@apache.org
Subject [9/9] drill git commit: DRILL-2245: Clean up query setup and execution kickoff in Foreman/WorkManager in order to ensure consistent handling, and avoid hangs and races, with the goal of improving Drillbit robustness.
Date Thu, 19 Mar 2015 21:08:19 GMT
DRILL-2245: Clean up query setup and execution kickoff in Foreman/WorkManager in order to ensure consistent handling, and avoid hangs and races, with the goal of improving Drillbit robustness.

I did my best to keep these clean when I split them up, but this core commit
may depend on some minor changes in the hygiene commit that is also
associated with this bug, so either both should be applied, or neither.
The core commit should be applied first.

protocol/pom.xml
- updated protocol buffer compiler version to 2.6
- the new compiler slightly changed the generated code in a few committed
  protobuf files

AutoCloseables
- created org.apache.drill.common.AutoCloseables to close AutoCloseable
  instances quietly
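The close-quietly pattern the new helper provides (its full source appears in the diff below) can be exercised as follows. This is a usage sketch, not Drill code; `java.util.logging` stands in for the slf4j `Logger` the real class takes:

```java
import java.util.logging.Logger;

public class AutoCloseablesDemo {
  // Same shape as the new AutoCloseables.close(): swallow and log, never throw.
  static void closeQuietly(final AutoCloseable ac, final Logger logger) {
    try {
      ac.close();
    } catch (Exception e) {
      logger.info("Failure on close(): " + e);
    }
  }

  public static void main(String[] args) {
    final Logger logger = Logger.getLogger(AutoCloseablesDemo.class.getName());
    // A resource whose close() fails must not abort the shutdown sequence.
    closeQuietly(() -> { throw new RuntimeException("boom"); }, logger);
    System.out.println("shutdown continued");
  }
}
```

As the javadoc in the diff warns, this is only appropriate where no recovery from a close() failure is possible, e.g. during Drillbit shutdown.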

BaseTestQuery, and derivatives
- factored out pieces into QueryTestUtil so they can be reused

DeferredException
- created this so we can collect exceptions during the shutdown process
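A condensed usage sketch (the full class is in the diff below): the first exception added is the one thrown on close(), and later ones become its suppressed exceptions.

```java
public class DeferredExceptionDemo {
  // Condensed from the new DeferredException in this commit: the first
  // exception is remembered, later ones are attached as suppressed exceptions.
  static class DeferredException implements AutoCloseable {
    private Exception exception = null;

    public synchronized void addException(final Exception e) {
      if (exception == null) {
        exception = e;
      } else {
        exception.addSuppressed(e);
      }
    }

    // Close a resource, deferring rather than propagating its failure.
    public synchronized void suppressingClose(final AutoCloseable ac) {
      if (ac == null) {
        return;
      }
      try {
        ac.close();
      } catch (Exception e) {
        addException(e);
      }
    }

    @Override
    public synchronized void close() throws Exception {
      if (exception != null) {
        throw exception;
      }
    }
  }

  public static void main(String[] args) {
    final DeferredException deferred = new DeferredException();
    deferred.suppressingClose(() -> { throw new RuntimeException("first failure"); });
    deferred.suppressingClose(() -> { throw new RuntimeException("second failure"); });
    try {
      deferred.close();
    } catch (Exception e) {
      System.out.println(e.getMessage());            // first failure
      System.out.println(e.getSuppressed().length);  // 1
    }
  }
}
```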

Drillbit
- uses AutoCloseables for the WorkManager and for the storeProvider
- allow start() to take a RemoteServiceSet
- private, final, formatting

Foreman
- added new state CANCELLATION_REQUESTED (via UserBitShared.proto) to represent
  the time between request of a cancellation, and acknowledgement from all
  remote endpoints running fragments on a query's behalf
- created ForemanResult to manage the interleaving of cleanup
  effects/failures with query result state
- does not need to implement Comparable
- does not need to implement Closeable
- thread blocking fixes
- add resultSent flag
- add code to log plan fragments with endpoint assignments
- added finals, cleaned up formatting
- do queue management in acquireQuerySemaphore; local tests pass
- rename getContext() to getQueryContext()
- retain DrillbitContext
- a couple of exception injections for testing
- minor formatting
- TODOs
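The role of the new state can be sketched as a small state machine. The state names below mirror UserBitShared.proto, but the tracker class and its transition logic are hypothetical illustrations, not Foreman's actual code:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class CancellationSketch {
  enum QueryState { RUNNING, CANCELLATION_REQUESTED, CANCELED }

  // Hypothetical tracker: the query only reaches CANCELED once every remote
  // fragment has acknowledged the cancellation request.
  static class QueryTracker {
    private QueryState state = QueryState.RUNNING;
    private final Set<String> pendingFragments = new HashSet<>();

    QueryTracker(final Set<String> fragments) {
      pendingFragments.addAll(fragments);
    }

    void requestCancel() {
      state = QueryState.CANCELLATION_REQUESTED;
    }

    void acknowledge(final String fragment) {
      pendingFragments.remove(fragment);
      if (state == QueryState.CANCELLATION_REQUESTED && pendingFragments.isEmpty()) {
        state = QueryState.CANCELED;
      }
    }

    QueryState getState() { return state; }
  }

  public static void main(String[] args) {
    final QueryTracker tracker = new QueryTracker(new HashSet<>(Arrays.asList("f1", "f2")));
    tracker.requestCancel();
    System.out.println(tracker.getState());  // CANCELLATION_REQUESTED
    tracker.acknowledge("f1");
    System.out.println(tracker.getState());  // still CANCELLATION_REQUESTED
    tracker.acknowledge("f2");
    System.out.println(tracker.getState());  // CANCELED
  }
}
```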

FragmentContext
- added a DeferredException to collect errors during startup/shutdown sequences

FragmentExecutor
- eliminated CancelableQuery
- use the FragmentContext's DeferredException for errors
- common subexpression elimination
- cleaned up

QueryContext
- removed unnecessary functions (with some outside classes tweaked for this)
- finals, formatting

QueryManager
- merge in QueryStatus
  - affects Foreman, ../batch/ControlHandlerImpl,
    and ../../server/rest/ProfileResources
- made some methods private
- removed unused imports
- add finals and formatting
- variable renaming to improve readability
- formatting
- comments
- TODOs

QueryStatus
- getAsInfo() private
- member renaming
- member access changes
- formatting
- TODOs

QueryTestUtil, BaseTestQuery, TestDrillbitResilience
- make maxWidth a parameter to server startup

SelfCleaningRunnable
- created org.apache.drill.common.SelfCleaningRunnable
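A usage sketch of the wrapper (its full source is in the diff below). The registry map here is a hypothetical stand-in for WorkManager's running-work maps; the point is that cleanup() runs even if the wrapped task throws:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SelfCleaningRunnableDemo {
  // The wrapper from this commit: cleanup() runs even if the task throws.
  abstract static class SelfCleaningRunnable implements Runnable {
    private final Runnable runnable;

    SelfCleaningRunnable(final Runnable runnable) {
      this.runnable = runnable;
    }

    @Override
    public void run() {
      try {
        runnable.run();
      } finally {
        cleanup();
      }
    }

    protected abstract void cleanup();
  }

  public static void main(String[] args) {
    // Hypothetical registry standing in for WorkManager's running-queries map.
    final ConcurrentMap<String, Runnable> running = new ConcurrentHashMap<>();
    final Runnable wrapped = new SelfCleaningRunnable(() -> System.out.println("fragment work")) {
      @Override
      protected void cleanup() {
        running.remove("query-1");  // deregister when the work finishes
      }
    };
    running.put("query-1", wrapped);
    wrapped.run();
    System.out.println(running.isEmpty());  // true
  }
}
```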

SingleRowListener
- created org.apache.drill.SingleRowListener results listener
- use in TestDrillbitResilience

TestComparisonFunctions
- fix not to close the FragmentContext multiple times

TestDrillbitResilience
- created org.apache.drill.exec.server.TestDrillbitResilience to test drillbit
  resilience in the face of exceptions and failures during queries

TestWithZookeeper
- factor out work into ZookeeperHelper so that it can be reused by
  TestDrillbitResilience

UserBitShared
- get rid of unused UNKNOWN_QUERY

WorkEventBus
- rename methods, affects Foreman and ControlHandlerImpl
- remove unused WorkerBee reference
- most members final
- formatting

WorkManager
- Closeable to AutoCloseable
- removed unused incomingFragments Set
- eliminated unnecessary eventThread and pendingTasks by posting Runnables
  directly to executor
- use SelfCleaningRunnable for Foreman management
- FragmentExecutor management uses SelfCleaningRunnable
- changed runningFragments to a ConcurrentHashMap; TestTpchDistributed passes
- other simplifications in various places now that the WorkerBee is no
  longer needed there
- most members final
- minor formatting
- comments
- TODOs

(*) Created exception injection classes to simulate exceptions for testing
- ExceptionInjection
- ExceptionInjector
- ExceptionInjectionUtil
- TestExceptionInjection
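The injection mechanism can be approximated as follows. The registry and method names here are hypothetical, not the actual ExceptionInjection/ExceptionInjector API; only the option name comes from this commit (see the ExecConstants hunk below):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class InjectionSketch {
  // Hypothetical registry keyed by an injection-site string; the real classes
  // parse injection specs from the drill.exec.testing.exception-injections option.
  static final ConcurrentMap<String, RuntimeException> injections = new ConcurrentHashMap<>();

  // Called at an injection site in production code; a no-op unless a test armed it.
  static void checkInjection(final String site) {
    final RuntimeException e = injections.get(site);
    if (e != null) {
      throw e;
    }
  }

  public static void main(String[] args) {
    // Normal run: nothing armed, nothing thrown.
    checkInjection("Foreman.submit");

    // A test arms the site, then exercises the code path.
    injections.put("Foreman.submit", new RuntimeException("injected"));
    try {
      checkInjection("Foreman.submit");
      System.out.println("not reached");
    } catch (RuntimeException e) {
      System.out.println(e.getMessage());  // injected
    }
  }
}
```

This lets tests like TestDrillbitResilience force failures at chosen points without touching the production code paths under test.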

DRILL-2245-hygiene: General code cleanup made while working on the rest of
this change. This includes:
- making members final whenever possible
- making members private whenever possible
- making loggers private
- removing unused imports
- removing unused private functions
- removing unused public functions
- removing unused local variables
- removing unused private members
- deleting unused files
- cleaning up formatting
  - adding spaces before braces in conditionals and loop bodies
  - breaking up overly long lines
  - removing extra blank lines

While I tried to keep this clean, this commit may have minor dependencies on
DRILL-2245-core that I missed. The intention is just to break this up for
review purposes. Either both commits should be applied, or neither.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2da618cd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2da618cd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2da618cd

Branch: refs/heads/master
Commit: 2da618cd4da33ad55021935933bc28a18a658b74
Parents: ff9882b
Author: Chris Westin <cwestin@yahoo.com>
Authored: Wed Feb 25 10:59:36 2015 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu Mar 19 14:06:55 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/AutoCloseables.java |   45 +
 .../apache/drill/common/DeferredException.java  |  121 ++
 .../drill/common/SelfCleaningRunnable.java      |   52 +
 .../apache/drill/common/config/DrillConfig.java |    7 +-
 .../org/apache/drill/exec/ExecConstants.java    |    4 +
 .../apache/drill/exec/client/DrillClient.java   |   19 +-
 .../drill/exec/compile/ClassTransformer.java    |    7 +-
 .../coord/local/LocalClusterCoordinator.java    |   63 +-
 .../apache/drill/exec/ops/FragmentContext.java  |  144 +-
 .../org/apache/drill/exec/ops/QueryContext.java |  108 +-
 .../apache/drill/exec/opt/BasicOptimizer.java   |  172 +--
 .../drill/exec/opt/IdentityOptimizer.java       |   10 +-
 .../org/apache/drill/exec/opt/Optimizer.java    |   17 +-
 .../drill/exec/physical/impl/BaseRootExec.java  |    1 +
 .../drill/exec/physical/impl/RootExec.java      |    5 -
 .../drill/exec/physical/impl/ScreenCreator.java |    7 +-
 .../exec/physical/impl/SendingAccountor.java    |    1 +
 .../impl/materialize/QueryWritableBatch.java    |   14 +-
 .../OrderedPartitionRecordBatch.java            |    1 +
 .../impl/producer/ProducerConsumerBatch.java    |    5 +
 .../drill/exec/planner/fragment/Fragment.java   |    1 +
 .../planner/fragment/SimpleParallelizer.java    |    3 +-
 .../planner/sql/handlers/SetOptionHandler.java  |   21 +-
 .../org/apache/drill/exec/rpc/BasicClient.java  |    4 +-
 .../org/apache/drill/exec/rpc/BasicServer.java  |    3 +-
 .../drill/exec/rpc/NamedThreadFactory.java      |   56 +-
 .../drill/exec/rpc/ReconnectingConnection.java  |    1 +
 .../apache/drill/exec/rpc/RemoteConnection.java |    2 +
 .../java/org/apache/drill/exec/rpc/RpcBus.java  |    1 -
 .../drill/exec/rpc/control/Controller.java      |    2 +-
 .../drill/exec/rpc/control/ControllerImpl.java  |    2 +-
 .../drill/exec/rpc/control/WorkEventBus.java    |   49 +-
 .../exec/rpc/data/DataConnectionCreator.java    |    2 +-
 .../exec/rpc/data/DataResponseHandlerImpl.java  |   33 +-
 .../apache/drill/exec/rpc/data/DataTunnel.java  |    2 +
 .../apache/drill/exec/rpc/user/UserClient.java  |    2 +-
 .../apache/drill/exec/rpc/user/UserServer.java  |    1 -
 .../org/apache/drill/exec/server/Drillbit.java  |  158 +--
 .../drill/exec/server/DrillbitContext.java      |   37 +-
 .../server/options/DrillConfigIterator.java     |    4 -
 .../server/options/SessionOptionManager.java    |    5 +-
 .../server/options/SystemOptionManager.java     |    1 +
 .../server/rest/profile/ProfileResources.java   |   18 +-
 .../drill/exec/service/ServiceEngine.java       |    2 +-
 .../drill/exec/store/sys/local/FilePStore.java  |   23 +-
 .../drill/exec/testing/ExceptionInjection.java  |  133 ++
 .../drill/exec/testing/ExceptionInjector.java   |  112 ++
 .../drill/exec/testing/InjectionSite.java       |   72 +
 .../drill/exec/testing/SimulatedExceptions.java |  164 +++
 .../apache/drill/exec/work/CancelableQuery.java |   22 -
 .../apache/drill/exec/work/QueryWorkUnit.java   |   14 +-
 .../apache/drill/exec/work/StatusProvider.java  |   29 -
 .../org/apache/drill/exec/work/WorkManager.java |  255 ++--
 .../exec/work/batch/ControlHandlerImpl.java     |  101 +-
 .../exec/work/batch/SpoolingRawBatchBuffer.java |    1 +
 .../work/batch/UnlimitedRawBatchBuffer.java     |    1 +
 .../work/foreman/DrillbitStatusListener.java    |    2 +-
 .../apache/drill/exec/work/foreman/Foreman.java | 1004 +++++++++-----
 .../drill/exec/work/foreman/FragmentData.java   |   29 +-
 .../work/foreman/FragmentStatusListener.java    |    2 -
 .../drill/exec/work/foreman/QueryManager.java   |  323 ++++-
 .../drill/exec/work/foreman/QueryStatus.java    |  255 ----
 .../exec/work/fragment/FragmentExecutor.java    |  199 +--
 .../work/fragment/NonRootFragmentManager.java   |   12 +-
 .../exec/work/fragment/RootFragmentManager.java |    1 +
 .../java/org/apache/drill/BaseTestQuery.java    |   69 +-
 .../java/org/apache/drill/PlanTestBase.java     |   60 +-
 .../java/org/apache/drill/QueryTestUtil.java    |  154 +++
 .../org/apache/drill/SingleRowListener.java     |  138 ++
 .../test/java/org/apache/drill/TestBuilder.java |    4 +-
 .../org/apache/drill/TestTpchDistributed.java   |    4 +-
 .../apache/drill/exec/DrillSystemTestBase.java  |    6 +-
 .../apache/drill/exec/TestWithZookeeper.java    |   52 +-
 .../org/apache/drill/exec/ZookeeperHelper.java  |  102 ++
 .../exec/physical/impl/SimpleRootExec.java      |   10 +-
 .../physical/impl/TestComparisonFunctions.java  |   20 +-
 .../exec/physical/impl/TestOptiqPlans.java      |    9 +-
 .../exec/server/TestDrillbitResilience.java     |  398 ++++++
 .../exec/testing/ExceptionInjectionUtil.java    |   82 ++
 .../exec/testing/TestExceptionInjection.java    |  192 +++
 .../apache/drill/jdbc/DrillConnectionImpl.java  |    1 +
 .../org/apache/drill/jdbc/test/JdbcAssert.java  |   10 +-
 protocol/pom.xml                                |   20 +-
 .../apache/drill/common/types/TypeProtos.java   |   64 +-
 .../org/apache/drill/exec/proto/BitControl.java |  372 +++---
 .../org/apache/drill/exec/proto/BitData.java    |  153 +--
 .../drill/exec/proto/CoordinationProtos.java    |  177 ++-
 .../org/apache/drill/exec/proto/ExecProtos.java |   59 +-
 .../drill/exec/proto/GeneralRPCProtos.java      |  134 +-
 .../drill/exec/proto/SchemaDefProtos.java       |   15 +-
 .../apache/drill/exec/proto/UserBitShared.java  | 1232 ++++++++----------
 .../org/apache/drill/exec/proto/UserProtos.java |  298 +++--
 .../drill/exec/proto/beans/QueryResult.java     |    4 +-
 protocol/src/main/protobuf/UserBitShared.proto  |    6 +-
 94 files changed, 4667 insertions(+), 3145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/common/src/main/java/org/apache/drill/common/AutoCloseables.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
new file mode 100644
index 0000000..39c5d78
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common;
+
+import org.slf4j.Logger;
+
+/**
+ * Utilities for AutoCloseable classes.
+ */
+public class AutoCloseables {
+  /**
+   * Close an {@link AutoCloseable}, catching and logging any exceptions at
+   * INFO level.
+   *
+   * <p>This can be dangerous if there is any possibility of recovery. See
+   * the <a href="https://code.google.com/p/guava-libraries/issues/detail?id=1118">
+   * notes regarding the deprecation of Guava's
+   * {@link com.google.common.io.Closeables#closeQuietly}</a>.
+   *
+   * @param ac the AutoCloseable to close
+   * @param logger the logger to use to record the exception if there was one
+   */
+  public static void close(final AutoCloseable ac, final Logger logger) {
+    try {
+      ac.close();
+    } catch(Exception e) {
+      logger.info("Failure on close(): " + e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/common/src/main/java/org/apache/drill/common/DeferredException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/DeferredException.java b/common/src/main/java/org/apache/drill/common/DeferredException.java
new file mode 100644
index 0000000..99f18f1
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/DeferredException.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Collects one or more exceptions that may occur, using
+ * <a href="http://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html#suppressed-exceptions">
+ * suppressed exceptions</a>.
+ * When this AutoCloseable is closed, if there was an exception added, it will be thrown. If more than one
+ * exception was added, then all but the first will be added to the first as suppressed
+ * exceptions.
+ *
+ * <p>This class is thread safe.
+ */
+public class DeferredException implements AutoCloseable {
+  private Exception exception = null;
+  private boolean isClosed = false;
+
+  /**
+   * Add an exception. If this is the first exception added, it will be the one
+   * that is thrown when this is closed. If not the first exception, then it will
+   * be added to the suppressed exceptions on the first exception.
+   *
+   * @param exception the exception to add
+   */
+  public void addException(final Exception exception) {
+    Preconditions.checkNotNull(exception);
+
+    synchronized(this) {
+      Preconditions.checkState(!isClosed);
+
+      if (this.exception == null) {
+        this.exception = exception;
+      } else {
+        this.exception.addSuppressed(exception);
+      }
+    }
+  }
+
+  public void addThrowable(final Throwable throwable) {
+    Preconditions.checkNotNull(throwable);
+
+    if (throwable instanceof Exception) {
+      addException((Exception) throwable);
+      return;
+    }
+
+    addException(new RuntimeException(throwable));
+  }
+
+  /**
+   * Get the deferred exception, if there is one. Note that if this returns null,
+   * the result could change at any time.
+   *
+   * @return the deferred exception, or null
+   */
+  public Exception getException() {
+    synchronized(this) {
+      return exception;
+    }
+  }
+
+  /**
+   * Close the given AutoCloseable, suppressing any exceptions that are thrown.
+   * If an exception is thrown, the rules for {@link #addException(Exception)}
+   * are followed.
+   *
+   * @param autoCloseable the AutoCloseable to close; may be null
+   */
+  public void suppressingClose(final AutoCloseable autoCloseable) {
+    synchronized(this) {
+      /*
+       * For the sake of detecting code that doesn't follow the conventions,
+       * we want this to complain whether the closeable exists or not.
+       */
+      Preconditions.checkState(!isClosed);
+
+      if (autoCloseable == null) {
+        return;
+      }
+
+      try {
+        autoCloseable.close();
+      } catch(Exception e) {
+        addException(e);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    synchronized(this) {
+      Preconditions.checkState(!isClosed);
+
+      try {
+        if (exception != null) {
+          throw exception;
+        }
+      } finally {
+        isClosed = true;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/common/src/main/java/org/apache/drill/common/SelfCleaningRunnable.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/SelfCleaningRunnable.java b/common/src/main/java/org/apache/drill/common/SelfCleaningRunnable.java
new file mode 100644
index 0000000..6a69721
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/SelfCleaningRunnable.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common;
+
+/**
+ * A wrapper for Runnables that provides a hook to do cleanup.
+ */
+public abstract class SelfCleaningRunnable implements Runnable {
+  private final Runnable runnable;
+
+  /**
+   * Constructor.
+   *
+   * @param runnable the Runnable to wrap
+   */
+  public SelfCleaningRunnable(final Runnable runnable) {
+    this.runnable = runnable;
+  }
+
+  @Override
+  public void run() {
+    try {
+      runnable.run();
+    } finally {
+      cleanup();
+    }
+  }
+
+  /**
+   * Cleanup.
+   *
+   * <p>Derived classes should put any necessary cleanup in here. This
+   * is guaranteed to be called, even if the wrapped Runnable throws an
+   * exception.
+   */
+  protected abstract void cleanup();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index e8b2478..2b9b740 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -46,8 +46,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 public final class DrillConfig extends NestedConfig{
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
   private final ObjectMapper mapper;
   private final ImmutableList<String> startupArguments;
   @SuppressWarnings("restriction")  private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory();
@@ -219,14 +218,12 @@ public final class DrillConfig extends NestedConfig{
     return this.root().render();
   }
 
-  public static void main(String[] args)  throws Exception{
+  public static void main(String[] args)  throws Exception {
     //"-XX:MaxDirectMemorySize"
     DrillConfig config = DrillConfig.create();
-
   }
 
   public static long getMaxDirectMemory() {
     return MAX_DIRECT_MEMORY;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 93d06f0..cd0a0a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -202,4 +202,8 @@ public interface ExecConstants {
 
   public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
   public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false);
+
+  public static final String DRILLBIT_EXCEPTION_INJECTIONS = "drill.exec.testing.exception-injections";
+  public static final OptionValidator DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR =
+      new StringValidator(DRILLBIT_EXCEPTION_INJECTIONS, "");
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 04b955b..c3a873c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -98,7 +98,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
   public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) {
     this.ownsZkConnection = coordinator == null;
     this.ownsAllocator = allocator == null;
-    this.allocator = allocator == null ? new TopLevelAllocator(config) : allocator;
+    this.allocator = ownsAllocator ? new TopLevelAllocator(config) : allocator;
     this.config = config;
     this.clusterCoordinator = coordinator;
     this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES);
@@ -131,7 +131,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
   /**
    * Connects the client to a Drillbit server
    *
-   * @throws IOException
+   * @throws RpcException
    */
   public void connect() throws RpcException {
     connect(null, new Properties());
@@ -176,7 +176,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
     connected = true;
   }
 
-  protected EventLoopGroup createEventLoop(int size, String prefix) {
+  protected static EventLoopGroup createEventLoop(int size, String prefix) {
     return TransportCheck.createEventLoopGroup(size, prefix);
   }
 
@@ -204,12 +204,8 @@ public class DrillClient implements Closeable, ConnectionThrottle{
 
   private void connect(DrillbitEndpoint endpoint) throws RpcException {
     FutureHandler f = new FutureHandler();
-    try {
-      client.connect(f, endpoint, props, getUserCredentials());
-      f.checkedGet();
-    } catch (InterruptedException e) {
-      throw new RpcException(e);
-    }
+    client.connect(f, endpoint, props, getUserCredentials());
+    f.checkedGet();
   }
 
   public BufferAllocator getAllocator() {
@@ -219,6 +215,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
   /**
    * Closes this client's connection to the server
    */
+  @Override
   public void close() {
     if (this.client != null) {
       this.client.close();
@@ -286,15 +283,13 @@ public class DrillClient implements Closeable, ConnectionThrottle{
    * Submits a Logical plan for direct execution (bypasses parsing)
    *
    * @param plan the plan to execute
-   * @return a handle for the query result
-   * @throws RpcException
    */
   public void runQuery(QueryType type, String plan, UserResultsListener resultsListener) {
     client.submitQuery(resultsListener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
   }
 
   private class ListHoldingResultsListener implements UserResultsListener {
-    private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>();
+    private Vector<QueryResultBatch> results = new Vector<>();
     private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create();
     private UserProtos.RunQuery query ;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
index 5c65724..493f6ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
@@ -294,14 +294,11 @@ public class ClassTransformer {
       if (templateDefinition.getExternalInterface().isAssignableFrom(c)) {
         logger.debug("Done compiling (bytecode size={}, time:{} millis).", DrillStringUtils.readable(totalBytecodeSize), (System.nanoTime() - t1) / 1000000);
         return c;
-      } else {
-        throw new ClassTransformationException("The requested class did not implement the expected interface.");
       }
+
+      throw new ClassTransformationException("The requested class did not implement the expected interface.");
     } catch (CompileException | IOException | ClassNotFoundException e) {
       throw new ClassTransformationException(String.format("Failure generating transformation classes for value: \n %s", entireClass), e);
     }
-
   }
-
 }
-

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
index 035c1aa..27868f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -32,10 +33,15 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import com.google.common.collect.Maps;
 
 public class LocalClusterCoordinator extends ClusterCoordinator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);
 
-  private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints = Maps.newConcurrentMap();
-  private volatile ConcurrentMap<String, DistributedSemaphore> semaphores = Maps.newConcurrentMap();
+  /*
+   * Since we hand out the endpoints list in {@see #getAvailableEndpoints()}, we use a
+   * {@see java.util.concurrent.ConcurrentHashMap} because those guarantee not to throw
+   * ConcurrentModificationException.
+   */
+  private final Map<RegistrationHandle, DrillbitEndpoint> endpoints = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, DistributedSemaphore> semaphores = Maps.newConcurrentMap();
 
   @Override
   public void close() throws IOException {
@@ -43,21 +49,21 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
   }
 
   @Override
-  public void start(long millis) throws Exception {
+  public void start(final long millis) throws Exception {
     logger.debug("Local Cluster Coordinator started.");
   }
 
   @Override
-  public RegistrationHandle register(DrillbitEndpoint data) {
+  public RegistrationHandle register(final DrillbitEndpoint data) {
     logger.debug("Endpoint registered {}.", data);
-    Handle h = new Handle();
+    final Handle h = new Handle();
     endpoints.put(h, data);
     return h;
   }
 
   @Override
-  public void unregister(RegistrationHandle handle) {
-    if(handle == null) {
+  public void unregister(final RegistrationHandle handle) {
+    if (handle == null) {
       return;
     }
 
@@ -69,8 +75,8 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
     return endpoints.values();
   }
 
-  private class Handle implements RegistrationHandle{
-    UUID id = UUID.randomUUID();
+  private class Handle implements RegistrationHandle {
+    private final UUID id = UUID.randomUUID();
 
     @Override
     public int hashCode() {
@@ -82,7 +88,7 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
     }
 
     @Override
-    public boolean equals(Object obj) {
+    public boolean equals(final Object obj) {
       if (this == obj) {
         return true;
       }
@@ -92,7 +98,7 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
       if (getClass() != obj.getClass()) {
         return false;
       }
-      Handle other = (Handle) obj;
+      final Handle other = (Handle) obj;
       if (!getOuterType().equals(other.getOuterType())) {
         return false;
       }
@@ -109,41 +115,38 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
     private LocalClusterCoordinator getOuterType() {
       return LocalClusterCoordinator.this;
     }
-
   }
 
   @Override
-  public DistributedSemaphore getSemaphore(String name, int maximumLeases) {
-    semaphores.putIfAbsent(name, new LocalSemaphore(maximumLeases));
+  public DistributedSemaphore getSemaphore(final String name, final int maximumLeases) {
+    if (!semaphores.containsKey(name)) {
+      semaphores.putIfAbsent(name, new LocalSemaphore(maximumLeases));
+    }
     return semaphores.get(name);
   }
 
-  public class LocalSemaphore implements DistributedSemaphore{
-
-    private final Semaphore inner;
-    private final LocalLease lease = new LocalLease();
+  public class LocalSemaphore implements DistributedSemaphore {
+    private final Semaphore semaphore;
+    private final LocalLease localLease = new LocalLease();
 
-    public LocalSemaphore(int size) {
-      inner = new Semaphore(size);
+    public LocalSemaphore(final int size) {
+      semaphore = new Semaphore(size);
     }
 
     @Override
-    public DistributedLease acquire(long timeout, TimeUnit unit) throws Exception {
-      if(!inner.tryAcquire(timeout, unit)) {
+    public DistributedLease acquire(final long timeout, final TimeUnit timeUnit) throws Exception {
+      if (!semaphore.tryAcquire(timeout, timeUnit)) {
         return null;
-      }else{
-        return lease;
+      } else {
+        return localLease;
       }
     }
 
-    private class LocalLease implements DistributedLease{
-
+    private class LocalLease implements DistributedLease {
       @Override
       public void close() throws Exception {
-        inner.release();
+        semaphore.release();
       }
-
     }
   }
-
 }

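The getSemaphore() change above guards putIfAbsent with a containsKey check so that a new LocalSemaphore is not allocated on every lookup once the entry exists. A minimal standalone sketch of the same registry pattern (class and method names here are illustrative, not Drill's API):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;

// Illustrative named-semaphore registry mirroring the patch's
// containsKey-then-putIfAbsent idiom: after the first call for a given
// name, no throwaway Semaphore is constructed on lookup.
public class SemaphoreRegistry {
  private final ConcurrentMap<String, Semaphore> semaphores = new ConcurrentHashMap<>();

  public Semaphore getSemaphore(final String name, final int maximumLeases) {
    if (!semaphores.containsKey(name)) {
      // putIfAbsent ensures exactly one instance wins under a race.
      semaphores.putIfAbsent(name, new Semaphore(maximumLeases));
    }
    return semaphores.get(name);
  }

  public static void main(String[] args) {
    final SemaphoreRegistry registry = new SemaphoreRegistry();
    final Semaphore s1 = registry.getSemaphore("queries", 2);
    final Semaphore s2 = registry.getSemaphore("queries", 2);
    // The same instance is returned for the same name.
    System.out.println(s1 == s2); // true
  }
}
```

On Java 8 and later, `semaphores.computeIfAbsent(name, n -> new Semaphore(maximumLeases))` expresses this in one call and also avoids the speculative allocation under a race; the patch's two-step form fits the Java 7 era of this codebase.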
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 2a6660e..5e31e5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.ops;
 
 import io.netty.buffer.DrillBuf;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -27,9 +26,9 @@ import java.util.Map;
 import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
+import org.apache.drill.common.DeferredException;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.compile.QueryClassLoader;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
@@ -39,7 +38,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
@@ -49,53 +47,52 @@ import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
-import com.carrotsearch.hppc.LongObjectOpenHashMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
  * Contextual objects required for execution of a particular fragment.
  */
 public class FragmentContext implements AutoCloseable, UdfUtilities {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
-
-
-  private Map<DrillbitEndpoint, DataTunnel> tunnels = Maps.newHashMap();
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
 
+  private final Map<DrillbitEndpoint, DataTunnel> tunnels = Maps.newHashMap();
   private final DrillbitContext context;
   private final UserClientConnection connection;
   private final FragmentStats stats;
   private final FunctionImplementationRegistry funcRegistry;
-  private final QueryClassLoader loader;
   private final BufferAllocator allocator;
   private final PlanFragment fragment;
-  private List<Thread> daemonThreads = Lists.newLinkedList();
-  private QueryDateTimeInfo queryDateTimeInfo;
+  private final QueryDateTimeInfo queryDateTimeInfo;
   private IncomingBuffers buffers;
   private final OptionManager fragmentOptions;
-  private final UserCredentials credentials;
   private final BufferManager bufferManager;
 
-  private volatile Throwable failureCause;
+  private final DeferredException deferredException = new DeferredException();
   private volatile FragmentContextState state = FragmentContextState.OK;
 
+  /*
+   * TODO we need a state that indicates that cancellation has been requested and
+   * is in progress. Early termination (such as from limit queries) could also use
+   * this, as the cleanup steps should be exactly the same.
+   */
   private static enum FragmentContextState {
     OK,
     FAILED,
     CANCELED
   }
 
-  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
-      FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException {
-
+  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
+      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
+    throws ExecutionSetupException {
     this.context = dbContext;
     this.connection = connection;
     this.fragment = fragment;
     this.funcRegistry = funcRegistry;
-    this.credentials = fragment.getCredentials();
-    this.queryDateTimeInfo = new QueryDateTimeInfo(fragment.getQueryStartTime(), fragment.getTimeZone());
+    queryDateTimeInfo = new QueryDateTimeInfo(fragment.getQueryStartTime(), fragment.getTimeZone());
+
     logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
     logger.debug("Fragment max allocation: {}", fragment.getMemMax());
+
     try {
       OptionList list;
       if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
@@ -103,7 +100,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
       } else {
         list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
       }
-      this.fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
+      fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
     } catch (Exception e) {
       throw new ExecutionSetupException("Failure while reading plan options.", e);
     }
@@ -111,15 +108,15 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     // Add the fragment context to the root allocator.
     // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
     try {
-      this.allocator = dbContext.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true);
+      allocator = context.getAllocator().getChildAllocator(
+          this, fragment.getMemInitial(), fragment.getMemMax(), true);
       assert (allocator != null);
-    }catch(Throwable e){
+    } catch(Throwable e) {
       throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
     }
-    this.stats = new FragmentStats(allocator, dbContext.getMetrics(), fragment.getAssignment());
-    this.bufferManager = new BufferManager(this.allocator, this);
 
-    this.loader = new QueryClassLoader(dbContext.getConfig(), fragmentOptions);
+    stats = new FragmentStats(allocator, dbContext.getMetrics(), fragment.getAssignment());
+    bufferManager = new BufferManager(this.allocator, this);
   }
 
   public OptionManager getOptions() {
@@ -133,7 +130,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   public void fail(Throwable cause) {
     logger.error("Fragment Context received failure.", cause);
     setState(FragmentContextState.FAILED);
-    failureCause = cause;
+    deferredException.addThrowable(cause);
   }
 
   public void cancel() {
@@ -161,11 +158,11 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
       fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
           "This is a non-root fragment."));
       return null;
-    } else {
-      SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
-      context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root);
-      return root;
     }
+
+    final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
+    context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root);
+    return root;
   }
 
   /**
@@ -177,14 +174,17 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   }
 
   public FragmentStats getStats() {
-    return this.stats;
+    return stats;
   }
 
+  @Override
   public QueryDateTimeInfo getQueryDateTimeInfo(){
     return this.queryDateTimeInfo;
   }
 
-  public DrillbitEndpoint getForemanEndpoint() {return fragment.getForeman();}
+  public DrillbitEndpoint getForemanEndpoint() {
+    return fragment.getForeman();
+  }
 
   /**
    * The FragmentHandle for this Fragment
@@ -194,31 +194,37 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return fragment.getHandle();
   }
 
+  private String getFragIdString() {
+    final FragmentHandle handle = getHandle();
+    final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0";
+    return frag;
+  }
+
   /**
    * Get this fragment's allocator.
-   * @return
+   * @return the allocator
    */
   @Deprecated
   public BufferAllocator getAllocator() {
-    if(allocator == null){
-      FragmentHandle handle=getHandle();
-      String frag=handle!=null?handle.getMajorFragmentId()+":"+handle.getMinorFragmentId():"0:0";
-      logger.debug("Fragment:"+frag+" Allocator is NULL");
+    if (allocator == null) {
+      logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL");
     }
     return allocator;
   }
 
-  public BufferAllocator getNewChildAllocator(long initialReservation,
-                                              long maximumReservation,
-                                              boolean applyFragmentLimit) throws OutOfMemoryException {
+  public BufferAllocator getNewChildAllocator(final long initialReservation,
+                                              final long maximumReservation,
+                                              final boolean applyFragmentLimit) throws OutOfMemoryException {
     return allocator.getChildAllocator(this, initialReservation, maximumReservation, applyFragmentLimit);
   }
 
-  public <T> T getImplementationClass(ClassGenerator<T> cg) throws ClassTransformationException, IOException {
+  public <T> T getImplementationClass(final ClassGenerator<T> cg)
+      throws ClassTransformationException, IOException {
     return getImplementationClass(cg.getCodeGenerator());
   }
 
-  public <T> T getImplementationClass(CodeGenerator<T> cg) throws ClassTransformationException, IOException {
+  public <T> T getImplementationClass(final CodeGenerator<T> cg)
+      throws ClassTransformationException, IOException {
     return context.getCompiler().getImplementationClass(cg);
   }
 
@@ -238,7 +244,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return connection;
   }
 
-  public ControlTunnel getControlTunnel(DrillbitEndpoint endpoint) {
+  public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) {
     return context.getController().getTunnel(endpoint);
   }
 
@@ -251,24 +257,12 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return tunnel;
   }
 
-  /**
-   * Add a new thread to this fragment's context. This thread will likely run for the life of the fragment but should be
-   * terminated when the fragment completes. When the fragment completes, the threads will be interrupted.
-   *
-   * @param thread
-   */
-  public void addDaemonThread(Thread thread) {
-    daemonThreads.add(thread);
-    thread.start();
-
-  }
-
   public IncomingBuffers getBuffers() {
     return buffers;
   }
 
   public Throwable getFailureCause() {
-    return failureCause;
+    return deferredException.getException();
   }
 
   public boolean isFailed() {
@@ -283,43 +277,36 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return funcRegistry;
   }
 
-  public UserCredentials getCredentials() {
-    return credentials;
-  }
-
-  public QueryClassLoader getClassLoader() {
-    return loader;
-  }
-
   public DrillConfig getConfig() {
     return context.getConfig();
   }
 
-  public void setFragmentLimit(long limit) {
-    this.allocator.setFragmentLimit(limit);
+  public void setFragmentLimit(final long limit) {
+    allocator.setFragmentLimit(limit);
+  }
+
+  public DeferredException getDeferredException() {
+    return deferredException;
   }
 
   @Override
   public void close() throws Exception {
-    for (Thread thread: daemonThreads) {
-      thread.interrupt();
-    }
-    bufferManager.close();
-
-    if (buffers != null) {
-      buffers.close();
-    }
+    /*
+     * TODO wait for threads working on this Fragment to terminate (or at least stop working
+     * on this Fragment's query)
+     */
+    deferredException.suppressingClose(bufferManager);
+    deferredException.suppressingClose(buffers);
+    deferredException.suppressingClose(allocator);
 
-    FragmentHandle handle=getHandle();
-    String frag=handle!=null?handle.getMajorFragmentId()+":"+handle.getMinorFragmentId():"0:0";
-    allocator.close();
-    logger.debug("Fragment:"+frag+" After close allocator is: "+allocator!=null?"OK":"NULL");
+    deferredException.close(); // must be last, as this may throw
   }
 
   public DrillBuf replace(DrillBuf old, int newSize) {
     return bufferManager.replace(old, newSize);
   }
 
+  @Override
   public DrillBuf getManagedBuffer() {
     return bufferManager.getManagedBuffer();
   }
@@ -327,5 +314,4 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   public DrillBuf getManagedBuffer(int size) {
     return bufferManager.getManagedBuffer(size);
   }
-
 }

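The FragmentContext.close() rewrite above routes shutdown through DeferredException.suppressingClose(...) so that every resource gets a chance to close, failures are collected rather than aborting the sequence, and the accumulated exception is thrown only at the end. A minimal sketch of such an accumulator, simplified relative to Drill's actual org.apache.drill.common.DeferredException:

```java
// Minimal exception accumulator for shutdown paths: close each resource,
// keep the first failure, record later ones as suppressed, and rethrow
// when the accumulator itself is closed. Simplified sketch, not Drill's class.
public class DeferredExceptionSketch implements AutoCloseable {
  private Exception exception; // first failure seen, if any

  public synchronized void addThrowable(final Throwable t) {
    if (exception == null) {
      exception = (t instanceof Exception) ? (Exception) t : new RuntimeException(t);
    } else {
      exception.addSuppressed(t);
    }
  }

  public synchronized Exception getException() {
    return exception;
  }

  // Close a resource, capturing rather than propagating any failure.
  public void suppressingClose(final AutoCloseable closeable) {
    if (closeable == null) {
      return; // mirrors closing a buffers field that was never assigned
    }
    try {
      closeable.close();
    } catch (Exception e) {
      addThrowable(e);
    }
  }

  @Override
  public synchronized void close() throws Exception {
    if (exception != null) {
      throw exception; // must be last, as in FragmentContext.close()
    }
  }

  public static void main(String[] args) {
    final DeferredExceptionSketch de = new DeferredExceptionSketch();
    de.suppressingClose(() -> { throw new IllegalStateException("buffers"); });
    de.suppressingClose(() -> { /* closes cleanly */ });
    System.out.println(de.getException().getMessage()); // buffers
  }
}
```

The same object doubles as the fragment's failure sink: fail(cause) feeds addThrowable(), which is why getFailureCause() above now delegates to deferredException.getException().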
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index fb6b4aa..3b51a69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -24,35 +24,30 @@ import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
-import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.rpc.control.WorkEventBus;
-import org.apache.drill.exec.rpc.data.DataConnectionCreator;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.QueryOptionManager;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
+// TODO except for a couple of tests, this is only created by Foreman
+// TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext()
 // TODO - consider re-name to PlanningContext, as the query execution context actually appears
 // in fragment contexts
-public class QueryContext implements AutoCloseable, UdfUtilities{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
+public class QueryContext implements AutoCloseable, UdfUtilities {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
 
-  private final QueryId queryId;
   private final DrillbitContext drillbitContext;
-  private final WorkEventBus workBus;
-  private UserSession session;
-  private OptionManager queryOptions;
+  private final UserSession session;
+  private final OptionManager queryOptions;
   private final PlannerSettings plannerSettings;
   private final DrillOperatorTable table;
 
@@ -62,32 +57,32 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
   private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
   private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024;
 
-  // flag to indicate if close has been called, after calling close the first
-  // time this is set to true and the close method becomes a no-op
+  /*
+   * Flag to indicate if close has been called, after calling close the first
+   * time this is set to true and the close method becomes a no-op.
+   */
   private boolean closed = false;
 
-  public QueryContext(final UserSession session, QueryId queryId, final DrillbitContext drllbitContext) {
-    super();
-    this.queryId = queryId;
+  public QueryContext(final UserSession session, final DrillbitContext drllbitContext) {
     this.drillbitContext = drllbitContext;
-    this.workBus = drllbitContext.getWorkBus();
     this.session = session;
-    this.queryOptions = new QueryOptionManager(session.getOptions());
-    this.plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry());
+    queryOptions = new QueryOptionManager(session.getOptions());
+    plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry());
     plannerSettings.setNumEndPoints(drillbitContext.getBits().size());
-    this.table = new DrillOperatorTable(getFunctionRegistry());
+    table = new DrillOperatorTable(getFunctionRegistry());
 
-    long queryStartTime = System.currentTimeMillis();
-    int timeZone = DateUtility.getIndex(System.getProperty("user.timezone"));
-    this.queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);
+    final long queryStartTime = System.currentTimeMillis();
+    final int timeZone = DateUtility.getIndex(System.getProperty("user.timezone"));
+    queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);
 
     try {
-      this.allocator = drllbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES, MAX_OFF_HEAP_ALLOCATION_IN_BYTES, false);
+      allocator = drllbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES,
+          MAX_OFF_HEAP_ALLOCATION_IN_BYTES, false);
     } catch (OutOfMemoryException e) {
       throw new DrillRuntimeException("Error creating off-heap allocator for planning context.",e);
     }
     // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available
-    this.bufferManager = new BufferManager(this.allocator, null);
+    bufferManager = new BufferManager(this.allocator, null);
   }
 
 
@@ -95,7 +90,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
     return plannerSettings;
   }
 
-  public UserSession getSession(){
+  public UserSession getSession() {
     return session;
   }
 
@@ -103,18 +98,18 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
     return allocator;
   }
 
-  public SchemaPlus getNewDefaultSchema(){
-    SchemaPlus rootSchema = getRootSchema();
-    SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
-    if(defaultSchema == null){
+  public SchemaPlus getNewDefaultSchema() {
+    final SchemaPlus rootSchema = getRootSchema();
+    final SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
+    if (defaultSchema == null) {
       return rootSchema;
-    }else{
-      return defaultSchema;
     }
+
+    return defaultSchema;
   }
 
-  public SchemaPlus getRootSchema(){
-    SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
+  public SchemaPlus getRootSchema() {
+    final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
     drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
     return rootSchema;
   }
@@ -123,43 +118,23 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
     return queryOptions;
   }
 
-  public OptionManager getSessionOptions() {
-    return session.getOptions();
-  }
-
-  public DrillbitEndpoint getCurrentEndpoint(){
+  public DrillbitEndpoint getCurrentEndpoint() {
     return drillbitContext.getEndpoint();
   }
 
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  public StoragePluginRegistry getStorage(){
+  public StoragePluginRegistry getStorage() {
     return drillbitContext.getStorage();
   }
 
-  public Collection<DrillbitEndpoint> getActiveEndpoints(){
+  public Collection<DrillbitEndpoint> getActiveEndpoints() {
     return drillbitContext.getBits();
   }
 
-  public PhysicalPlanReader getPlanReader(){
-    return drillbitContext.getPlanReader();
-  }
-
-  public DataConnectionCreator getDataConnectionsPool(){
-    return drillbitContext.getDataConnectionsPool();
-  }
-
-  public DrillConfig getConfig(){
+  public DrillConfig getConfig() {
     return drillbitContext.getConfig();
   }
 
-  public WorkEventBus getWorkBus(){
-    return workBus;
-  }
-
-  public FunctionImplementationRegistry getFunctionRegistry(){
+  public FunctionImplementationRegistry getFunctionRegistry() {
     return drillbitContext.getFunctionImplementationRegistry();
   }
 
@@ -167,10 +142,6 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
     return table;
   }
 
-  public ClusterCoordinator getClusterCoordinator() {
-    return drillbitContext.getClusterCoordinator();
-  }
-
   @Override
   public QueryDateTimeInfo getQueryDateTimeInfo() {
     return queryDateTimeInfo;
@@ -183,10 +154,13 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
 
   @Override
   public void close() throws Exception {
-    if (!closed) {
-      // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available
-      bufferManager.close();
-      allocator.close();
+    try {
+      if (!closed) {
+        // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available
+        bufferManager.close();
+        allocator.close();
+      }
+    } finally {
       closed = true;
     }
   }

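The QueryContext.close() change wraps the body in try/finally so the closed flag is set even when a resource fails to close, keeping close() a no-op on any later call. The pattern in isolation (names here are illustrative, not Drill's API):

```java
// Illustrative close-once wrapper: the finally block guarantees that a
// failed first close() still marks the object closed, so a retry will not
// double-release the underlying resource.
public class CloseOnce implements AutoCloseable {
  private final AutoCloseable resource;
  private boolean closed = false;

  public CloseOnce(final AutoCloseable resource) {
    this.resource = resource;
  }

  public boolean isClosed() {
    return closed;
  }

  @Override
  public void close() throws Exception {
    try {
      if (!closed) {
        resource.close();
      }
    } finally {
      closed = true; // set even if resource.close() threw
    }
  }

  public static void main(String[] args) {
    final CloseOnce c = new CloseOnce(() -> { throw new IllegalStateException("boom"); });
    try {
      c.close();
    } catch (Exception e) {
      System.out.println(e.getMessage()); // boom
    }
    System.out.println(c.isClosed()); // true
    try {
      c.close(); // second call is a no-op and does not throw
      System.out.println("no-op ok");
    } catch (Exception e) {
      System.out.println("unexpected");
    }
  }
}
```

Without the finally, a throw from bufferManager.close() would leave closed == false, and a second close() attempt would release the allocator twice; the try/finally removes that window.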
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 9f89f24..b1a71a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -29,6 +29,7 @@ import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.GroupingAggregate;
 import org.apache.drill.common.logical.data.Join;
 import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.logical.data.Order;
 import org.apache.drill.common.logical.data.Order.Ordering;
@@ -52,10 +53,8 @@ import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.WindowPOP;
-import org.apache.drill.exec.rpc.user.UserServer;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
-import org.apache.drill.exec.work.foreman.ForemanException;
 import org.eigenbase.rel.RelFieldCollation.Direction;
 import org.eigenbase.rel.RelFieldCollation.NullDirection;
 
@@ -63,87 +62,49 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
-public class BasicOptimizer extends Optimizer{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
+public class BasicOptimizer extends Optimizer {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
 
-  private DrillConfig config;
-  private QueryContext context;
-  private UserServer.UserClientConnection userSession;
+  private final QueryContext queryContext;
 
-  public BasicOptimizer(DrillConfig config, QueryContext context, UserServer.UserClientConnection userSession){
-    this.config = config;
-    this.context = context;
-    this.userSession = userSession;
-    logCurrentOptionValues();
-  }
-
-  private void logCurrentOptionValues(){
-//    Iterator<DrillOptionValue> optionVals = userSession.getSessionOptionIterator();
-//    DrillOptionValue val = null;
-//    String output = "";
-//    output += "SessionOptions: {\n";
-//    for ( ;optionVals.hasNext(); val = optionVals.next()){
-//      if (val != null) {
-//        output += val.getOptionName() + ":" + val.getValue() + ",\n";
-//      }
-//    }
-//    output += "}";
-//    logger.debug(output);
-  }
-
-  /**
-   * Get the current value of an option. Session options override global options.
-   *
-   * @param name - the name of the option
-   * @return - value of the option
-   */
-  private Object getOptionValue(String name) {
-//    Object val = userSession.getSessionLevelOption(name);
-//    if (val == null) {
-//      context.getOptionValue(name);
-//    }
-//    return val;
-    return null;
+  public BasicOptimizer(final QueryContext queryContext) {
+    this.queryContext = queryContext;
   }
 
   @Override
-  public void init(DrillConfig config) {
-
+  public void init(final DrillConfig config) {
   }
 
   @Override
-  public PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) throws OptimizerException{
-    Object obj = new Object();
-    Collection<SinkOperator> roots = plan.getGraph().getRoots();
-    List<PhysicalOperator> physOps = new ArrayList<PhysicalOperator>(roots.size());
-    LogicalConverter converter = new LogicalConverter(plan);
-    for ( SinkOperator op : roots){
-      PhysicalOperator pop  = op.accept(converter, obj);
+  public PhysicalPlan optimize(final OptimizationContext context, final LogicalPlan plan)
+      throws OptimizerException {
+    final Object obj = new Object();
+    final Collection<SinkOperator> roots = plan.getGraph().getRoots();
+    final List<PhysicalOperator> physOps = new ArrayList<>(roots.size());
+    final LogicalConverter converter = new LogicalConverter(plan);
+
+    for (SinkOperator op : roots) {
+      final PhysicalOperator pop  = op.accept(converter, obj);
       physOps.add(pop);
     }
 
-    PlanProperties props = PlanProperties.builder()
+    final PlanProperties logicalProperties = plan.getProperties();
+    final PlanProperties props = PlanProperties.builder()
         .type(PlanProperties.PlanType.APACHE_DRILL_PHYSICAL)
-        .version(plan.getProperties().version)
-        .generator(plan.getProperties().generator)
+        .version(logicalProperties.version)
+        .generator(logicalProperties.generator)
         .options(new JSONOptions(context.getOptions().getOptionList())).build();
-    PhysicalPlan p = new PhysicalPlan(props, physOps);
+    final PhysicalPlan p = new PhysicalPlan(props, physOps);
     return p;
-    //return new PhysicalPlan(props, physOps);
-  }
-
-  @Override
-  public void close() {
-
   }
 
   public static class BasicOptimizationContext implements OptimizationContext {
-
-    private OptionManager ops;
-    public BasicOptimizationContext(QueryContext c){
-      this.ops = c.getOptions();
+    private final OptionManager ops;
+    public BasicOptimizationContext(final QueryContext c) {
+      ops = c.getOptions();
     }
 
     @Override
@@ -159,64 +120,65 @@ public class BasicOptimizer extends Optimizer{
 
   private class LogicalConverter extends AbstractLogicalVisitor<PhysicalOperator, Object, OptimizerException> {
 
-    // storing a reference to the plan for access to other elements outside of the query graph
-    // such as the storage engine configs
-    LogicalPlan logicalPlan;
+    /*
+     * Store a reference to the plan for access to other elements outside of the query graph
+     * such as the storage engine configs.
+     */
+    private final LogicalPlan logicalPlan;
 
-    public LogicalConverter(LogicalPlan logicalPlan){
+    public LogicalConverter(final LogicalPlan logicalPlan) {
       this.logicalPlan = logicalPlan;
     }
 
     @Override
     public PhysicalOperator visitGroupingAggregate(GroupingAggregate groupBy, Object value) throws OptimizerException {
-
-      List<Ordering> orderDefs = Lists.newArrayList();
-
-
+      final List<Ordering> orderDefs = Lists.newArrayList();
       PhysicalOperator input = groupBy.getInput().accept(this, value);
 
-      if(groupBy.getKeys().length > 0){
-        for(NamedExpression e : groupBy.getKeys()){
+      if (groupBy.getKeys().length > 0) {
+        for(NamedExpression e : groupBy.getKeys()) {
           orderDefs.add(new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST));
         }
         input = new Sort(input, orderDefs, false);
       }
 
-      StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
+      final StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
       return sa;
     }
 
     @Override
-    public PhysicalOperator visitWindow(Window window, Object value) throws OptimizerException {
+    public PhysicalOperator visitWindow(final Window window, final Object value) throws OptimizerException {
       PhysicalOperator input = window.getInput().accept(this, value);
-
-      List<Ordering> ods = Lists.newArrayList();
+      final List<Ordering> ods = Lists.newArrayList();
 
       input = new Sort(input, ods, false);
 
-      return new WindowPOP(input, window.getWithins(), window.getAggregations(), window.getOrderings(), window.getStart(), window.getEnd());
+      return new WindowPOP(input, window.getWithins(), window.getAggregations(),
+          window.getOrderings(), window.getStart(), window.getEnd());
     }
 
     @Override
-    public PhysicalOperator visitOrder(Order order, Object value) throws OptimizerException {
-      PhysicalOperator input = order.getInput().accept(this, value);
-      List<Ordering> ods = Lists.newArrayList();
+    public PhysicalOperator visitOrder(final Order order, final Object value) throws OptimizerException {
+      final PhysicalOperator input = order.getInput().accept(this, value);
+      final List<Ordering> ods = Lists.newArrayList();
       for (Ordering o : order.getOrderings()){
         ods.add(o);
       }
+
       return new SelectionVectorRemover(new Sort(input, ods, false));
     }
 
     @Override
-    public PhysicalOperator visitLimit(org.apache.drill.common.logical.data.Limit limit, Object value) throws OptimizerException {
-      PhysicalOperator input = limit.getInput().accept(this, value);
+    public PhysicalOperator visitLimit(final org.apache.drill.common.logical.data.Limit limit,
+        final Object value) throws OptimizerException {
+      final PhysicalOperator input = limit.getInput().accept(this, value);
       return new SelectionVectorRemover(new Limit(input, limit.getFirst(), limit.getLast()));
     }
 
     @Override
-    public PhysicalOperator visitJoin(Join join, Object value) throws OptimizerException {
+    public PhysicalOperator visitJoin(final Join join, final Object value) throws OptimizerException {
       PhysicalOperator leftOp = join.getLeft().accept(this, value);
-      List<Ordering> leftOrderDefs = Lists.newArrayList();
+      final List<Ordering> leftOrderDefs = Lists.newArrayList();
       for(JoinCondition jc : join.getConditions()){
         leftOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getLeft()));
       }
@@ -224,28 +186,28 @@ public class BasicOptimizer extends Optimizer{
       leftOp = new SelectionVectorRemover(leftOp);
 
       PhysicalOperator rightOp = join.getRight().accept(this, value);
-      List<Ordering> rightOrderDefs = Lists.newArrayList();
+      final List<Ordering> rightOrderDefs = Lists.newArrayList();
       for(JoinCondition jc : join.getConditions()){
         rightOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getRight()));
       }
       rightOp = new Sort(rightOp, rightOrderDefs, false);
       rightOp = new SelectionVectorRemover(rightOp);
 
-      MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJoinType());
+      final MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()),
+          join.getJoinType());
       return new SelectionVectorRemover(mjp);
     }
 
-
-
     @Override
-    public PhysicalOperator visitScan(Scan scan, Object obj) throws OptimizerException {
-      StoragePluginConfig config = logicalPlan.getStorageEngineConfig(scan.getStorageEngine());
+    public PhysicalOperator visitScan(final Scan scan, final Object obj) throws OptimizerException {
+      final StoragePluginConfig config = logicalPlan.getStorageEngineConfig(scan.getStorageEngine());
       if(config == null) {
-        throw new OptimizerException(String.format("Logical plan referenced the storage engine config %s but the logical plan didn't have that available as a config.", scan.getStorageEngine()));
+        throw new OptimizerException(
+            String.format("Logical plan referenced the storage engine config %s but the logical plan didn't have that available as a config.",
+                scan.getStorageEngine()));
       }
-      StoragePlugin storagePlugin;
       try {
-        storagePlugin = context.getStorage().getPlugin(config);
+        final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config);
         return storagePlugin.getPhysicalScan(scan.getSelection());
       } catch (IOException | ExecutionSetupException e) {
         throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
@@ -253,27 +215,27 @@ public class BasicOptimizer extends Optimizer{
     }
 
     @Override
-    public PhysicalOperator visitStore(Store store, Object obj) throws OptimizerException {
-      if (!store.iterator().hasNext()) {
+    public PhysicalOperator visitStore(final Store store, final Object obj) throws OptimizerException {
+      final Iterator<LogicalOperator> iterator = store.iterator();
+      if (!iterator.hasNext()) {
         throw new OptimizerException("Store node in logical plan does not have a child.");
       }
-      return new Screen(store.iterator().next().accept(this, obj), context.getCurrentEndpoint());
+      return new Screen(iterator.next().accept(this, obj), queryContext.getCurrentEndpoint());
     }
 
     @Override
-    public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException {
-      return new org.apache.drill.exec.physical.config.Project(Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj));
+    public PhysicalOperator visitProject(final Project project, final Object obj) throws OptimizerException {
+      return new org.apache.drill.exec.physical.config.Project(
+          Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj));
     }
 
     @Override
-    public PhysicalOperator visitFilter(Filter filter, Object obj) throws OptimizerException {
-      TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType();
+    public PhysicalOperator visitFilter(final Filter filter, final Object obj) throws OptimizerException {
+      final TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType();
       b.setMode(DataMode.REQUIRED);
       b.setMinorType(MinorType.BIGINT);
-      PhysicalOperator child = filter.iterator().next().accept(this, obj);
+      final PhysicalOperator child = filter.iterator().next().accept(this, obj);
       return new SelectionVectorRemover(new org.apache.drill.exec.physical.config.Filter(child, filter.getExpr(), 1.0f));
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
index 979c5e2..9e650b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
@@ -22,18 +22,14 @@ import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.exec.physical.PhysicalPlan;
 
 public class IdentityOptimizer extends Optimizer {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IdentityOptimizer.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IdentityOptimizer.class);
 
   @Override
-  public void init(DrillConfig config) {
+  public void init(final DrillConfig config) {
   }
 
   @Override
-  public PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) {
+  public PhysicalPlan optimize(final OptimizationContext context, final LogicalPlan plan) {
     return null;
   }
-
-  @Override
-  public void close() {
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
index 34d0622..aed6299 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.opt;
 
-import java.io.Closeable;
-
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.DrillConfigurationException;
 import org.apache.drill.common.logical.LogicalPlan;
@@ -26,22 +24,19 @@ import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.server.options.OptionManager;
 
-public abstract class Optimizer implements Closeable{
-
+public abstract class Optimizer {
   public static String OPTIMIZER_IMPL_KEY = "drill.exec.optimizer.implementation";
 
   public abstract void init(DrillConfig config);
-
   public abstract PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) throws OptimizerException;
-  public abstract void close();
 
-  public static Optimizer getOptimizer(DrillConfig config) throws DrillConfigurationException{
-    Optimizer o = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class);
-    o.init(config);
-    return o;
+  public static Optimizer getOptimizer(final DrillConfig config) throws DrillConfigurationException {
+    final Optimizer optimizer = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class);
+    optimizer.init(config);
+    return optimizer;
   }
 
-  public interface OptimizationContext{
+  public interface OptimizationContext {
     public int getPriority();
     public OptionManager getOptions();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 412da85..a00df9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
 public abstract class BaseRootExec implements RootExec {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRootExec.class);
 
   protected OperatorStats stats = null;
   protected OperatorContext oContext = null;

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index a644c34..8fd68b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl;
 
-
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 
 /**
@@ -26,8 +24,6 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 * output nodes and storage nodes.  They are the driving force behind the completion of a query.
  */
 public interface RootExec {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootExec.class);
-
   /**
    * Do the next batch of work.
    * @return Whether or not additional batches of work are necessary.  False means that this fragment is done.
@@ -44,5 +40,4 @@ public interface RootExec {
    * @param handle
    */
   public void receivingFragmentFinished(FragmentHandle handle);
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index d884200..2d1a136 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -58,7 +57,7 @@ public class ScreenCreator implements RootCreator<Screen>{
 
 
   static class ScreenRoot extends BaseRootExec {
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
+//    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
     volatile boolean ok = true;
 
     private final SendingAccountor sendCount = new SendingAccountor();
@@ -67,7 +66,6 @@ public class ScreenCreator implements RootCreator<Screen>{
     final FragmentContext context;
     final UserClientConnection connection;
     private RecordMaterializer materializer;
-    private boolean first = true;
 
     public enum Metric implements MetricDef {
       BYTES_SENT;
@@ -146,7 +144,7 @@ public class ScreenCreator implements RootCreator<Screen>{
       }
       case OK_NEW_SCHEMA:
         materializer = new VectorRecordMaterializer(context, incoming);
-        // fall through.
+        //$FALL-THROUGH$
       case OK:
 //        context.getStats().batchesCompleted.inc(1);
 //        context.getStats().recordsCompleted.inc(incoming.getRecordCount());
@@ -160,7 +158,6 @@ public class ScreenCreator implements RootCreator<Screen>{
         }
         sendCount.increment();
 
-        first = false;
         return true;
       default:
         throw new UnsupportedOperationException();
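Aside (not from the patch): the `//$FALL-THROUGH$` marker added above is an Eclipse-style annotation that suppresses the compiler's fall-through warning where the omission of `break` is deliberate. A small self-contained sketch of the pattern, with hypothetical outcome strings mirroring the `OK_NEW_SCHEMA`/`OK` cases:

```java
public class FallThroughDemo {
    // Deliberate fall-through: the new-schema case does extra setup
    // (materialization), then shares the common send path with the OK case.
    static String handle(String outcome) {
        final StringBuilder sb = new StringBuilder();
        switch (outcome) {
        case "OK_NEW_SCHEMA":
            sb.append("materialize;");
            //$FALL-THROUGH$
        case "OK":
            sb.append("send");
            break;
        default:
            throw new UnsupportedOperationException(outcome);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(handle("OK_NEW_SCHEMA")); // materialize;send
        System.out.println(handle("OK"));            // send
    }
}
```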

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
index 3920f9c..8794188 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
@@ -46,6 +46,7 @@ public class SendingAccountor {
       batchesSent.set(0);
     } catch (InterruptedException e) {
       logger.warn("Failure while waiting for send complete.", e);
+      // TODO InterruptedException
     }
   }
 }
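Aside (not part of the patch): the `// TODO InterruptedException` markers sprinkled through this commit flag places where an `InterruptedException` is caught, logged, and swallowed. The conventional fix is to restore the thread's interrupt status so callers can still observe the interruption; a minimal sketch (class and method names hypothetical):

```java
import java.util.concurrent.CountDownLatch;

public class InterruptDemo {
    // Wait on a latch; on interrupt, restore the interrupt flag instead of
    // silently swallowing the exception, so callers can react to it.
    static boolean awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final Thread waiter = new Thread(() -> {
            final boolean completed = awaitQuietly(latch);
            // The restored flag is still visible to the interrupted thread.
            System.out.println("completed=" + completed
                + " interrupted=" + Thread.currentThread().isInterrupted());
        });
        waiter.start();
        waiter.interrupt(); // interrupt the waiter before the latch opens
        waiter.join();
    }
}
```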

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index e6c3fba..2a59e22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -20,31 +20,21 @@ package org.apache.drill.exec.physical.impl.materialize;
 import io.netty.buffer.ByteBuf;
 
 import java.util.Arrays;
-import java.util.List;
 
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.proto.UserBitShared.SerializedField;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-
-import com.google.common.collect.Lists;
 
 public class QueryWritableBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
 
   private final QueryResult header;
   private final ByteBuf[] buffers;
 
-
   public QueryWritableBatch(QueryResult header, ByteBuf... buffers) {
-    super();
     this.header = header;
     this.buffers = buffers;
   }
 
-  public ByteBuf[] getBuffers(){
+  public ByteBuf[] getBuffers() {
     return buffers;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 352e7ae..42b1080 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -307,6 +307,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       logger.error("Failure while building final partition table.", ex);
       context.fail(ex);
       return false;
+      // TODO InterruptedException
     }
     return true;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 4c9b33b..c50cb8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -73,6 +73,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
         context.fail(e);
       }
       return IterOutcome.STOP;
+      // TODO InterruptedException
     } finally {
       stats.stopWait();
     }
@@ -147,6 +148,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
         }
       } catch (InterruptedException e) {
         logger.warn("Producer thread is interrupted.", e);
+        // TODO InterruptedException
       } finally {
         if (stop) {
           try {
@@ -154,6 +156,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
             queue.put(new RecordBatchDataWrapper(null, true, false));
           } catch (InterruptedException e) {
             logger.error("Unable to enqueue the last batch indicator. Something is broken.", e);
+            // TODO InterruptedException
           }
         }
         if (wrapper!=null) {
@@ -181,6 +184,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
       producer.join();
     } catch (InterruptedException e) {
       logger.warn("Interrupted while waiting for producer thread");
+      // TODO InterruptedException
     }
   }
 
@@ -191,6 +195,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
       cleanUpLatch.await();
     } catch (InterruptedException e) {
       logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e);
+      // TODO InterruptedException
     } finally {
       super.cleanup();
       clearQueue();

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
index 2436a0e..bc9ed74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
@@ -42,6 +42,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
     if (root == null) {
       root = o;
     }
+    // TODO should complain otherwise
   }
 
   public void addSendExchange(Exchange e, Fragment sendingToFragment) throws ForemanSetupException{

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index eebd40e..12043ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -33,7 +33,6 @@ import com.google.common.collect.Ordering;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.DrillStringUtils;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.ops.QueryDateTimeInfo;
 import org.apache.drill.exec.physical.EndpointAffinity;
@@ -145,7 +144,7 @@ public class SimpleParallelizer {
    * @param planningSet
    * @return Returns a list of leaf fragments in fragment dependency graph.
    */
-  private Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) {
+  private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) {
 
     // Set up dependency of fragments based on the affinity of exchange that separates the fragments.
     for(Wrapper currentFragmentWrapper : planningSet) {

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
index b5d3f4a..dc63ef9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
@@ -31,24 +31,21 @@ import org.eigenbase.sql.SqlLiteral;
 import org.eigenbase.sql.SqlNode;
 import org.eigenbase.sql.SqlSetOption;
 
-public class SetOptionHandler extends AbstractSqlHandler{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
-
-  QueryContext context;
+public class SetOptionHandler extends AbstractSqlHandler {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
 
+  private final QueryContext context;
 
   public SetOptionHandler(QueryContext context) {
-    super();
     this.context = context;
   }
 
-
   @Override
   public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
-    SqlSetOption option = unwrap(sqlNode, SqlSetOption.class);
-    String scope = option.getScope();
-    String name = option.getName();
-    SqlNode value = option.getValue();
+    final SqlSetOption option = unwrap(sqlNode, SqlSetOption.class);
+    final String scope = option.getScope();
+    final String name = option.getName();
+    final SqlNode value = option.getValue();
     OptionValue.OptionType type;
     if (value instanceof SqlLiteral) {
       switch (scope.toLowerCase()) {
@@ -70,9 +67,5 @@ public class SetOptionHandler extends AbstractSqlHandler{
     }
 
     return DirectPlan.createDirectPlan(context, true, String.format("%s updated.", name));
-
   }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index f358097..72ae130 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -43,7 +43,6 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
 
   private final Bootstrap b;
-  private volatile boolean connect = false;
   protected R connection;
   private final T handshakeType;
   private final Class<HANDSHAKE_RESPONSE> responseClass;
@@ -81,7 +80,6 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
                 new InboundHandler(connection), //
                 new RpcExceptionHandler() //
                 );
-            connect = true;
           }
         }); //
 
@@ -180,7 +178,6 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
         try {
           BasicClient.this.validateHandshake(value);
           BasicClient.this.finalizeConnection(value, connection);
-          BasicClient.this.connect = true;
           l.connectionSucceeded(connection);
 //          logger.debug("Handshake completed succesfully.");
         } catch (RpcException ex) {
@@ -218,6 +215,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
       connection.getChannel().close().get();
     } catch (InterruptedException | ExecutionException e) {
      logger.warn("Failure while shutting {}", this.getClass().getName(), e);
+      // TODO InterruptedException
     }
   }
 

