drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [2/2] drill git commit: DRILL-3242: Update RPC layer so that requests and response are managed on a secondary thread. - Create a separate serialized executor for fragment receiverFinished events. - Update serialized executor to pool object creation. - En
Date Mon, 02 Nov 2015 19:37:14 GMT
DRILL-3242: Update RPC layer so that requests and responses are managed on a secondary thread.
- Create a separate serialized executor for fragment receiverFinished events.
- Update serialized executor to pool object creation.
- Ensure that the FragmentExecutor acceptExternalEvents countdown occurs when the only execution is a cancellation.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e7db9dca
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e7db9dca
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e7db9dca

Branch: refs/heads/master
Commit: e7db9dcacbc39c4797de1aa29b119a7428451dea
Parents: 839f8da
Author: Jacques Nadeau <jacques@apache.org>
Authored: Sun May 24 22:01:49 2015 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Mon Nov 2 08:07:49 2015 -0800

----------------------------------------------------------------------
 .../apache/drill/common/SerializedExecutor.java | 112 +++++---
 .../apache/drill/exec/client/DrillClient.java   |  23 +-
 .../apache/drill/exec/ops/FragmentContext.java  |   9 +-
 .../java/org/apache/drill/exec/rpc/RpcBus.java  | 273 +++++++++++++++----
 .../org/apache/drill/exec/rpc/RpcConfig.java    |  19 +-
 .../drill/exec/rpc/control/ControlClient.java   |  13 +-
 .../exec/rpc/control/ControlRpcConfig.java      |   5 +-
 .../drill/exec/rpc/control/ControlServer.java   |   6 +-
 .../apache/drill/exec/rpc/data/DataClient.java  |  11 +-
 .../exec/rpc/data/DataConnectionCreator.java    |   1 -
 .../exec/rpc/data/DataConnectionManager.java    |   1 -
 .../exec/rpc/data/DataResponseHandler.java      |   7 +-
 .../drill/exec/rpc/data/DataRpcConfig.java      |   5 +-
 .../apache/drill/exec/rpc/data/DataServer.java  |   6 +-
 .../apache/drill/exec/rpc/user/UserClient.java  |   6 +-
 .../drill/exec/rpc/user/UserRpcConfig.java      |   5 +-
 .../apache/drill/exec/rpc/user/UserServer.java  |   5 +-
 .../drill/exec/server/BootStrapContext.java     |  45 ++-
 .../drill/exec/server/DrillbitContext.java      |  14 +-
 .../drill/exec/service/ServiceEngine.java       |  16 +-
 .../org/apache/drill/exec/work/WorkManager.java |  54 +---
 .../exec/work/fragment/FragmentExecutor.java    |  62 ++++-
 .../exec/physical/impl/TestOptiqPlans.java      |  11 +-
 23 files changed, 507 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/common/src/main/java/org/apache/drill/common/SerializedExecutor.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/SerializedExecutor.java b/common/src/main/java/org/apache/drill/common/SerializedExecutor.java
index 203be49..6a3a823 100644
--- a/common/src/main/java/org/apache/drill/common/SerializedExecutor.java
+++ b/common/src/main/java/org/apache/drill/common/SerializedExecutor.java
@@ -17,79 +17,113 @@
  */
 package org.apache.drill.common;
 
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
 import java.util.LinkedList;
 import java.util.concurrent.Executor;
 
 /**
- * Serializes execution of multiple submissions to a single target, while
- * still using a thread pool to execute those submissions. Provides an
- * implicit queueing capability for a single target that requires any commands
- * that execute against it to be serialized.
+ * Serializes execution of multiple submissions to a single target, while still
+ * using a thread pool to execute those submissions. Provides an implicit
+ * queueing capability for a single target that requires any commands that
+ * execute against it to be serialized.
  */
-public class SerializedExecutor implements Executor {
+public abstract class SerializedExecutor implements Executor {
+
+  private final Recycler<RunnableProcessor> processors = new Recycler<RunnableProcessor>() {
+    @Override
+    protected RunnableProcessor newObject(Handle handle) {
+      return new RunnableProcessor(handle);
+    }
+  };
+
   private boolean isProcessing = false;
   private final LinkedList<Runnable> queuedRunnables = new LinkedList<>();
   private final Executor underlyingExecutor;
+  private final String name;
 
   /**
    * Constructor.
    *
-   * @param underlyingExecutor underlying executor to use to execute commands
-   *   submitted to this SerializedExecutor
+   * @param underlyingExecutor
+   *          underlying executor to use to execute commands submitted to this
+   *          SerializedExecutor
    */
-  public SerializedExecutor(Executor underlyingExecutor) {
+  public SerializedExecutor(String name, Executor underlyingExecutor) {
     this.underlyingExecutor = underlyingExecutor;
+    this.name = name;
   }
 
   /**
-   * An exception occurred in the last command executed; this reports that
-   * to the subclass of SerializedExecutor.
+   * An exception occurred in the last command executed; this reports that to
+   * the subclass of SerializedExecutor.
    *
-   * <p>The default implementation of this method throws an exception, which
-   * is considered an error (see below). Implementors have two alternatives:
-   * Arrange not to throw from your commands' run(), or if they do,
-   * provide an override of this method that handles any exception that
-   * is thrown.</p>
+   * <p>
+   * The default implementation of this method throws an exception, which is
+   * considered an error (see below). Implementors have two alternatives:
+   * Arrange not to throw from your commands' run(), or if they do, provide an
+   * override of this method that handles any exception that is thrown.
+   * </p>
    *
-   * <p>It is an error for this to throw an exception, and doing so will
-   * terminate the thread with an IllegalStateException. Derivers must
-   * handle any reported exceptions in other ways.</p>
+   * <p>
+   * It is an error for this to throw an exception, and doing so will terminate
+   * the thread with an IllegalStateException. Derivers must handle any reported
+   * exceptions in other ways.
+   * </p>
    *
-   * @param command the command that caused the exception
-   * @param t the exception
+   * @param command
+   *          the command that caused the exception
+   * @param t
+   *          the exception
    */
-  protected void runException(Runnable command, Throwable t) {
-    throw new IllegalStateException("unhandled exception thrown by command");
-  }
+  protected abstract void runException(Runnable command, Throwable t);
 
   private class RunnableProcessor implements Runnable {
+    private final Handle handle;
+
     private Runnable command;
 
-    public RunnableProcessor(Runnable command) {
+    public RunnableProcessor(Handle handle) {
+      this.handle = handle;
+    }
+
+    public Runnable set(Runnable command) {
       this.command = command;
+      return this;
     }
 
     @Override
     public void run() {
-      while (true) {
-        try {
-          command.run();
-        } catch(Exception | AssertionError e) {
+      final Thread currentThread = Thread.currentThread();
+      final String originalThreadName = currentThread.getName();
+      currentThread.setName(name);
+
+      try {
+        while (true) {
           try {
-            runException(command, e);
-          } catch(Exception | AssertionError ee) {
-            throw new IllegalStateException("Exception handler threw an exception", ee);
+            command.run();
+          } catch (Exception | AssertionError e) {
+            try {
+              runException(command, e);
+            } catch (Exception | AssertionError ee) {
+              throw new IllegalStateException("Exception handler threw an exception", ee);
+            }
           }
-        }
 
-        synchronized (queuedRunnables) {
-          if (queuedRunnables.isEmpty()) {
-            isProcessing = false;
-            break;
-          }
+          synchronized (queuedRunnables) {
+            if (queuedRunnables.isEmpty()) {
+              isProcessing = false;
+              break;
+            }
 
-          command = queuedRunnables.removeFirst();
+            command = queuedRunnables.removeFirst();
+          }
         }
+      } finally {
+        currentThread.setName(originalThreadName);
+        command = null;
+        processors.recycle(this, handle);
       }
     }
   }
@@ -105,6 +139,6 @@ public class SerializedExecutor implements Executor {
       isProcessing = true;
     }
 
-    underlyingExecutor.execute(new RunnableProcessor(command));
+    underlyingExecutor.execute(processors.get().set(command));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index bf6c8be..08663eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -30,6 +30,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.DrillAutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
@@ -54,6 +58,7 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
 import org.apache.drill.exec.rpc.ChannelClosedException;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.TransportCheck;
@@ -86,6 +91,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
   private final boolean ownsAllocator;
   private final boolean isDirectConnection; // true if the connection bypasses zookeeper and connects directly to a drillbit
   private EventLoopGroup eventLoopGroup;
+  private ExecutorService executor;
 
   public DrillClient() throws OutOfMemoryException {
     this(DrillConfig.create(), false);
@@ -212,7 +218,18 @@ public class DrillClient implements Closeable, ConnectionThrottle {
     }
 
     eventLoopGroup = createEventLoop(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-");
-    client = new UserClient(config, supportComplexTypes, allocator, eventLoopGroup);
+    executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        new NamedThreadFactory("drill-client-executor-")) {
+      @Override
+      protected void afterExecute(final Runnable r, final Throwable t) {
+        if (t != null) {
+          logger.error("{}.run() leaked an exception.", r.getClass().getName(), t);
+        }
+        super.afterExecute(r, t);
+      }
+    };
+    client = new UserClient(config, supportComplexTypes, allocator, eventLoopGroup, executor);
     logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
     connect(endpoint);
     connected = true;
@@ -280,6 +297,10 @@ public class DrillClient implements Closeable, ConnectionThrottle {
       eventLoopGroup.shutdownGracefully();
     }
 
+    if (executor != null) {
+      executor.shutdownNow();
+    }
+
     // TODO:  Did DRILL-1735 changes cover this TODO?:
     // TODO: fix tests that fail when this is called.
     //allocator.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 02580fc..c974652 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -22,9 +22,9 @@ import io.netty.buffer.DrillBuf;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 import org.apache.calcite.schema.SchemaPlus;
-
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
@@ -35,8 +35,8 @@ import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -84,6 +84,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   private ExecutorState executorState;
   private final ExecutionControls executionControls;
 
+
   private final SendingAccountor sendingAccountor = new SendingAccountor();
   private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
     @Override
@@ -429,6 +430,10 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
         "option is set to true.", PlannerSettings.CONSTANT_FOLDING.getOptionName()));
   }
 
+  public Executor getExecutor(){
+    return context.getExecutor();
+  }
+
   /**
    * Wait for ack that all outgoing batches have been sent
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index c643ac5..61922a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -25,14 +25,19 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
 import java.net.SocketAddress;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.drill.common.SerializedExecutor;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
@@ -53,6 +58,8 @@ import com.google.protobuf.Parser;
 public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> implements Closeable {
   final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
 
+  private static final OutboundRpcMessage PONG = new OutboundRpcMessage(RpcMode.PONG, 0, 0, Acks.OK);
+
   protected final CoordinationQueue queue = new CoordinationQueue(16, 16);
 
   protected abstract MessageLite getResponseDefaultInstance(int rpcType) throws RpcException;
@@ -176,39 +183,89 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     return new ChannelClosedHandler(clientConnection, channel);
   }
 
+  private interface Recyclable {
+    public void recycle();
+  }
+
   private class ResponseSenderImpl implements ResponseSender {
 
-    RemoteConnection connection;
-    int coordinationId;
+    private RemoteConnection connection;
+    private int coordinationId;
+    private final AtomicBoolean sent = new AtomicBoolean(false);
+    private final Recyclable recyclable;
 
-    public ResponseSenderImpl(RemoteConnection connection, int coordinationId) {
-      super();
+    public ResponseSenderImpl(Recyclable recyclable) {
+      this.recyclable = recyclable;
+    }
+
+    void set(RemoteConnection connection, int coordinationId){
       this.connection = connection;
       this.coordinationId = coordinationId;
+      sent.set(false);
     }
 
     public void send(Response r) {
-      assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass());
-      OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, coordinationId,
-          r.pBody, r.dBodies);
-      if (RpcConstants.EXTRA_DEBUGGING) {
-        logger.debug("Adding message to outbound buffer. {}", outMessage);
+      try {
+        assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass());
+        sendOnce();
+        OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, coordinationId,
+            r.pBody, r.dBodies);
+        if (RpcConstants.EXTRA_DEBUGGING) {
+          logger.debug("Adding message to outbound buffer. {}", outMessage);
+        }
+        logger.debug("Sending response with Sender {}", System.identityHashCode(this));
+        connection.getChannel().writeAndFlush(outMessage);
+      } finally {
+        recyclable.recycle();
+      }
+    }
+
+    /**
+     * Ensures that each sender is only used once.
+     */
+    private void sendOnce() {
+      if (!sent.compareAndSet(false, true)) {
+        throw new IllegalStateException("Attempted to utilize a sender multiple times.");
+      }
+    }
+
+    void sendFailure(UserRpcException e){
+      try {
+        sendOnce();
+        UserException uex = UserException.systemError(e)
+            .addIdentity(e.getEndpoint())
+            .build(logger);
+
+        logger.error("Unexpected Error while handling request message", e);
+
+        OutboundRpcMessage outMessage = new OutboundRpcMessage(
+            RpcMode.RESPONSE_FAILURE,
+            0,
+            coordinationId,
+            uex.getOrCreatePBError(false)
+            );
+
+        if (RpcConstants.EXTRA_DEBUGGING) {
+          logger.debug("Adding message to outbound buffer. {}", outMessage);
+        }
+        connection.getChannel().writeAndFlush(outMessage);
+      } finally {
+        recyclable.recycle();
       }
-      logger.debug("Sending response with Sender {}", System.identityHashCode(this));
-      connection.getChannel().writeAndFlush(outMessage);
     }
 
   }
 
-  private static final OutboundRpcMessage PONG = new OutboundRpcMessage(RpcMode.PONG, 0, 0, Acks.OK);
 
   protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> {
 
-
+    private final RpcEventHandler exec;
     private final C connection;
+
     public InboundHandler(C connection) {
       super();
       this.connection = connection;
+      this.exec = new RpcEventHandler(rpcConfig.getExecutor());
     }
 
     @Override
@@ -225,48 +282,16 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
       try{
 
         switch (msg.mode) {
-        case REQUEST: {
-          // handle message and ack.
-
-          try {
-            ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId);
-            handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
-          } catch (UserRpcException e) {
-            UserException uex = UserException.systemError(e).addIdentity(e.getEndpoint()).build(logger);
-
-            logger.error("Unexpected Error while handling request message", e);
-
-            OutboundRpcMessage outMessage = new OutboundRpcMessage(
-                RpcMode.RESPONSE_FAILURE,
-                0,
-                msg.coordinationId,
-                uex.getOrCreatePBError(false)
-                );
-
-            if (RpcConstants.EXTRA_DEBUGGING) {
-              logger.debug("Adding message to outbound buffer. {}", outMessage);
-            }
-
-            channel.writeAndFlush(outMessage);
-          }
+        case REQUEST:
+          RequestEvent reqEvent = requestRecycler.get();
+          reqEvent.set(msg.coordinationId, connection, msg.rpcType, msg.pBody, msg.dBody);
+          exec.execute(reqEvent);
           break;
-        }
 
         case RESPONSE:
-          try {
-            MessageLite m = getResponseDefaultInstance(msg.rpcType);
-            assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
-            RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
-            Parser<?> parser = m.getParserForType();
-            Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
-            rpcFuture.set(value, msg.dBody);
-            if (RpcConstants.EXTRA_DEBUGGING) {
-              logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
-            }
-          } catch (Exception ex) {
-            logger.error("Failure while handling response.", ex);
-            throw ex;
-          }
+          ResponseEvent respEvent = responseRecycler.get();
+          respEvent.set(msg.rpcType, msg.coordinationId, msg.pBody, msg.dBody);
+          exec.execute(respEvent);
           break;
 
         case RESPONSE_FAILURE:
@@ -278,7 +303,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
           break;
 
         case PING:
-          connection.getChannel().writeAndFlush(PONG);
+          channel.writeAndFlush(PONG);
           break;
 
         case PONG:
@@ -301,7 +326,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     }
   }
 
-  public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{
+  public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException {
     try {
       ByteBufInputStream is = new ByteBufInputStream(pBody);
       return parser.parseFrom(is);
@@ -310,4 +335,144 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     }
   }
 
+  class RpcEventHandler extends SerializedExecutor {
+
+    public RpcEventHandler(Executor underlyingExecutor) {
+      super(rpcConfig.getName() + "-rpc-event-queue", underlyingExecutor);
+    }
+
+    @Override
+    protected void runException(Runnable command, Throwable t) {
+      logger.error("Failure while running rpc command.", t);
+    }
+
+  }
+
+  private final Recycler<RequestEvent> requestRecycler = new Recycler<RequestEvent>() {
+    @Override
+    protected RequestEvent newObject(Handle handle) {
+      return new RequestEvent(handle);
+    }
+  };
+
+  private class RequestEvent implements Runnable, Recyclable {
+    private final ResponseSenderImpl sender;
+    private final Handle handle;
+    private C connection;
+    private int rpcType;
+    private ByteBuf pBody;
+    private ByteBuf dBody;
+
+    RequestEvent(Handle handle){
+      this.handle = handle;
+      sender = new ResponseSenderImpl(this);
+    }
+
+    public void set(int coordinationId, C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) {
+      this.connection = connection;
+      this.rpcType = rpcType;
+      this.pBody = pBody;
+      this.dBody = dBody;
+      sender.set(connection, coordinationId);
+
+      if(pBody != null){
+        pBody.retain();
+      }
+
+      if(dBody != null){
+        dBody.retain();
+      }
+    }
+
+    @Override
+    public void run() {
+      try {
+        handle(connection, rpcType, pBody, dBody, sender);
+      } catch (UserRpcException e) {
+        sender.sendFailure(e);
+      } catch (Exception e) {
+        logger.error("Failure while handling message.", e);
+      }finally{
+        if(pBody != null){
+          pBody.release();
+        }
+
+        if(dBody != null){
+          dBody.release();
+        }
+      }
+
+    }
+
+    @Override
+    public void recycle() {
+      // We must defer recycling until the sender has been used.
+      requestRecycler.recycle(this, handle);
+    }
+
+
+  }
+
+  private final Recycler<ResponseEvent> responseRecycler = new Recycler<ResponseEvent>() {
+    @Override
+    protected ResponseEvent newObject(Handle handle) {
+      return new ResponseEvent(handle);
+    }
+  };
+
+  private class ResponseEvent implements Runnable {
+    private final Handle handle;
+
+    private int rpcType;
+    private int coordinationId;
+    private ByteBuf pBody;
+    private ByteBuf dBody;
+
+    public ResponseEvent(Handle handle){
+      this.handle = handle;
+    }
+
+    public void set(int rpcType, int coordinationId, ByteBuf pBody, ByteBuf dBody) {
+      this.rpcType = rpcType;
+      this.coordinationId = coordinationId;
+      this.pBody = pBody;
+      this.dBody = dBody;
+
+      if(pBody != null){
+        pBody.retain();
+      }
+
+      if(dBody != null){
+        dBody.retain();
+      }
+    }
+
+    public void run(){
+      try {
+        MessageLite m = getResponseDefaultInstance(rpcType);
+        assert rpcConfig.checkReceive(rpcType, m.getClass());
+        RpcOutcome<?> rpcFuture = queue.getFuture(rpcType, coordinationId, m.getClass());
+        Parser<?> parser = m.getParserForType();
+        Object value = parser.parseFrom(new ByteBufInputStream(pBody, pBody.readableBytes()));
+        rpcFuture.set(value, dBody);
+        if (RpcConstants.EXTRA_DEBUGGING) {
+          logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+        }
+      } catch (Exception ex) {
+        logger.error("Failure while handling response.", ex);
+      }finally{
+        if(pBody != null){
+          pBody.release();
+        }
+
+        if(dBody != null){
+          dBody.release();
+        }
+
+        responseRecycler.recycle(this, handle);
+      }
+
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
index ab6c375..22b253a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.rpc;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
@@ -32,13 +33,16 @@ public class RpcConfig {
   private final int timeout;
   private final Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap;
   private final Map<Integer, RpcMessageType<?, ?, ?>> receiveMap;
+  private final Executor executor;
 
   private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap,
-      Map<Integer, RpcMessageType<?, ?, ?>> receiveMap, int timeout) {
+      Map<Integer, RpcMessageType<?, ?, ?>> receiveMap, int timeout, Executor executor) {
+    Preconditions.checkNotNull(executor, "Executor must be defined.");
     this.name = name;
     this.timeout = timeout;
     this.sendMap = ImmutableMap.copyOf(sendMap);
     this.receiveMap = ImmutableMap.copyOf(receiveMap);
+    this.executor = executor;
   }
 
   public String getName() {
@@ -52,6 +56,11 @@ public class RpcConfig {
   public boolean hasTimeout() {
     return timeout > 0;
   }
+
+  public Executor getExecutor() {
+    return executor;
+  }
+
   public boolean checkReceive(int rpcType, Class<?> receiveClass) {
     if (RpcConstants.EXTRA_DEBUGGING) {
       logger.debug(String.format("Checking reception for rpcType %d and receive class %s.", rpcType, receiveClass));
@@ -152,6 +161,7 @@ public class RpcConfig {
   public static class RpcConfigBuilder {
     private String name;
     private int timeout = -1;
+    private Executor executor;
     private Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap = Maps.newHashMap();
     private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap();
 
@@ -175,10 +185,15 @@ public class RpcConfig {
       return this;
     }
 
+    public RpcConfigBuilder executor(Executor executor) {
+      this.executor = executor;
+      return this;
+    }
+
     public RpcConfig build() {
       Preconditions.checkArgument(timeout > -1, "Timeout must be a positive number or zero for disabled.");
       Preconditions.checkArgument(name != null, "RpcConfig name must be set.");
-      return new RpcConfig(name, sendMap, receiveMap, timeout);
+      return new RpcConfig(name, sendMap, receiveMap, timeout, executor);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index 159f1df..915509f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -40,7 +40,7 @@ import com.google.protobuf.MessageLite;
 
 public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake>{
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
 
   private final ControlMessageHandler handler;
   private final DrillbitEndpoint remoteEndpoint;
@@ -49,9 +49,14 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
   private final DrillbitEndpoint localIdentity;
   private final BufferAllocator allocator;
 
-  public ControlClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, ControlMessageHandler handler, BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
-    super(ControlRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
-        .getBitLoopGroup(), RpcType.HANDSHAKE, BitControlHandshake.class, BitControlHandshake.PARSER);
+  public ControlClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, ControlMessageHandler handler,
+      BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
+    super(ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
+        context.getAllocator().getUnderlyingAllocator(),
+        context.getBitLoopGroup(),
+        RpcType.HANDSHAKE,
+        BitControlHandshake.class,
+        BitControlHandshake.PARSER);
     this.localIdentity = localEndpoint;
     this.remoteEndpoint = remoteEndpoint;
     this.handler = handler;

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index 0cfa56e..44046c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import java.util.concurrent.Executor;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
@@ -35,9 +37,10 @@ import org.apache.drill.exec.rpc.RpcConfig;
 public class ControlRpcConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlRpcConfig.class);
 
-  public static RpcConfig getMapping(DrillConfig config) {
+  public static RpcConfig getMapping(DrillConfig config, Executor executor) {
     return RpcConfig.newBuilder()
         .name("CONTROL")
+        .executor(executor)
         .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
         .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
         .add(RpcType.REQ_INITIALIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 98ce9e1..81c886a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -44,8 +44,10 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
   private BufferAllocator allocator;
 
   public ControlServer(ControlMessageHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry) {
-    super(ControlRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
-        .getBitLoopGroup());
+    super(
+        ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
+        context.getAllocator().getUnderlyingAllocator(),
+        context.getBitLoopGroup());
     this.handler = handler;
     this.connectionRegistry = connectionRegistry;
     this.allocator = context.getAllocator();

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index 544bab9..d7611b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -40,16 +40,19 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClient.class);
 
-  private final DrillbitEndpoint remoteEndpoint;
   private volatile DataClientConnection connection;
   private final BufferAllocator allocator;
   private final DataConnectionManager.CloseHandlerCreator closeHandlerFactory;
 
 
   public DataClient(DrillbitEndpoint remoteEndpoint, BootStrapContext context, DataConnectionManager.CloseHandlerCreator closeHandlerFactory) {
-    super(DataRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
-        .getBitClientLoopGroup(), RpcType.HANDSHAKE, BitServerHandshake.class, BitServerHandshake.PARSER);
-    this.remoteEndpoint = remoteEndpoint;
+    super(
+        DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
+        context.getAllocator().getUnderlyingAllocator(),
+        context.getBitClientLoopGroup(),
+        RpcType.HANDSHAKE,
+        BitServerHandshake.class,
+        BitServerHandshake.PARSER);
     this.closeHandlerFactory = closeHandlerFactory;
     this.allocator = context.getAllocator();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index ddb7c8e..5874b31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.server.BootStrapContext;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
index 8a947a9..267b7e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.rpc.data;
 
 import org.apache.drill.exec.proto.BitData.BitClientHandshake;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.RpcChannel;
 import org.apache.drill.exec.rpc.ReconnectingConnection;
 import org.apache.drill.exec.server.BootStrapContext;

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
index 721b83e..39dcc90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
@@ -17,15 +17,12 @@
  */
 package org.apache.drill.exec.rpc.data;
 
-import java.io.IOException;
-
 import io.netty.buffer.DrillBuf;
 
+import java.io.IOException;
+
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.rpc.RemoteConnection;
-import org.apache.drill.exec.rpc.ResponseSender;
-import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 
 public interface DataResponseHandler {

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
index c5cf498..cd3cdfe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.rpc.data;
 
+import java.util.concurrent.Executor;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.BitData.BitClientHandshake;
@@ -31,9 +33,10 @@ import org.apache.drill.exec.rpc.RpcConfig;
 public class DataRpcConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataRpcConfig.class);
 
-  public static RpcConfig getMapping(DrillConfig config) {
+  public static RpcConfig getMapping(DrillConfig config, Executor executor) {
     return RpcConfig.newBuilder()
         .name("DATA")
+        .executor(executor)
         .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
         .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class)
         .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 4908c18..ed69699 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -57,8 +57,10 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
   private final DataResponseHandler dataHandler;
 
   public DataServer(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler) {
-    super(DataRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
-        .getBitLoopGroup());
+    super(
+        DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
+        context.getAllocator().getUnderlyingAllocator(),
+        context.getBitLoopGroup());
     this.context = context;
     this.workBus = workBus;
     this.dataHandler = dataHandler;

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index dde3e49..84c98b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.rpc.user;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 
+import java.util.concurrent.Executor;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -51,9 +53,9 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
   private boolean supportComplexTypes = true;
 
   public UserClient(DrillConfig config, boolean supportComplexTypes, BufferAllocator alloc,
-      EventLoopGroup eventLoopGroup) {
+      EventLoopGroup eventLoopGroup, Executor eventExecutor) {
     super(
-        UserRpcConfig.getMapping(config),
+        UserRpcConfig.getMapping(config, eventExecutor),
         alloc,
         eventLoopGroup,
         RpcType.HANDSHAKE,

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 3f8122d..22d3634 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.rpc.user;
 
+import java.util.concurrent.Executor;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -32,10 +34,11 @@ import org.apache.drill.exec.rpc.RpcConfig;
 public class UserRpcConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcConfig.class);
 
-  public static RpcConfig getMapping(DrillConfig config) {
+  public static RpcConfig getMapping(DrillConfig config, Executor executor) {
     return RpcConfig.newBuilder()
         .name("USER")
         .timeout(config.getInt(ExecConstants.USER_RPC_TIMEOUT))
+        .executor(executor)
         .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit
         .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) // user to bit
         .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index beb3416..ddba213 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -25,6 +25,7 @@ import io.netty.channel.socket.SocketChannel;
 
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -68,8 +69,8 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
   final UserAuthenticator authenticator;
 
   public UserServer(DrillConfig config, ScanResult classpathScan, BufferAllocator alloc, EventLoopGroup eventLoopGroup,
-      UserWorker worker) throws DrillbitStartupException {
-    super(UserRpcConfig.getMapping(config), alloc.getUnderlyingAllocator(), eventLoopGroup);
+      UserWorker worker, Executor executor) throws DrillbitStartupException {
+    super(UserRpcConfig.getMapping(config, executor), alloc.getUnderlyingAllocator(), eventLoopGroup);
     this.worker = worker;
     this.alloc = alloc;
     // TODO: move this up

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index ad757ef..3f6814e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -19,6 +19,11 @@ package org.apache.drill.exec.server;
 
 import io.netty.channel.EventLoopGroup;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.drill.common.DrillAutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -26,6 +31,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.exec.rpc.TransportCheck;
 
 import com.codahale.metrics.MetricRegistry;
@@ -39,6 +45,7 @@ public class BootStrapContext implements AutoCloseable {
   private final MetricRegistry metrics;
   private final BufferAllocator allocator;
   private final ScanResult classpathScan;
+  private final ExecutorService executor;
 
   public BootStrapContext(DrillConfig config, ScanResult classpathScan) {
     this.config = config;
@@ -47,6 +54,21 @@ public class BootStrapContext implements AutoCloseable {
     this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitClient-");
     this.metrics = DrillMetrics.getInstance();
     this.allocator = RootAllocatorFactory.newRoot(config);
+    this.executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        new NamedThreadFactory("drill-executor-")) {
+      @Override
+      protected void afterExecute(final Runnable r, final Throwable t) {
+        if (t != null) {
+          logger.error("{}.run() leaked an exception.", r.getClass().getName(), t);
+        }
+        super.afterExecute(r, t);
+      }
+    };
+  }
+
+  public ExecutorService getExecutor() {
+    return executor;
   }
 
   public DrillConfig getConfig() {
@@ -80,7 +102,28 @@ public class BootStrapContext implements AutoCloseable {
     } catch (Error | Exception e) {
       logger.warn("failure resetting metrics.", e);
     }
-    loop.shutdownGracefully();
+
+    if (executor != null) {
+      executor.shutdown(); // Disable new tasks from being submitted
+      try {
+        // Wait a while for existing tasks to terminate
+        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+          executor.shutdownNow(); // Cancel currently executing tasks
+          // Wait a while for tasks to respond to being cancelled
+          if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+            logger.error("Pool did not terminate");
+          }
+        }
+      } catch (InterruptedException ie) {
+        logger.warn("Executor interrupted while awaiting termination");
+
+        // (Re-)Cancel if current thread also interrupted
+        executor.shutdownNow();
+        // Preserve interrupt status
+        Thread.currentThread().interrupt();
+      }
+    }
+
     DrillAutoCloseables.closeNoChecked(allocator);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 8a3a888..c1f2e5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -17,9 +17,8 @@
  */
 package org.apache.drill.exec.server;
 
-import io.netty.channel.EventLoopGroup;
-
 import static com.google.common.base.Preconditions.checkNotNull;
+import io.netty.channel.EventLoopGroup;
 
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
@@ -43,7 +42,6 @@ import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 
 import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Preconditions;
 
 public class DrillbitContext {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
@@ -61,9 +59,9 @@ public class DrillbitContext {
   private final SystemOptionManager systemOptions;
   private final PStoreProvider provider;
   private final CodeCompiler compiler;
-  private final ExecutorService executor;
-  private final LogicalPlanPersistence lpPersistence;
   private final ScanResult classpathScan;
+  private final LogicalPlanPersistence lpPersistence;
+
 
   public DrillbitContext(
       DrillbitEndpoint endpoint,
@@ -72,8 +70,7 @@ public class DrillbitContext {
       Controller controller,
       DataConnectionCreator connectionsPool,
       WorkEventBus workBus,
-      PStoreProvider provider,
-      ExecutorService executor) {
+      PStoreProvider provider) {
     this.classpathScan = context.getClasspathScan();
     this.workBus = workBus;
     this.controller = checkNotNull(controller);
@@ -82,7 +79,6 @@ public class DrillbitContext {
     this.connectionsPool = checkNotNull(connectionsPool);
     this.endpoint = checkNotNull(endpoint);
     this.provider = provider;
-    this.executor = executor;
     this.lpPersistence = new LogicalPlanPersistence(context.getConfig(), classpathScan);
     this.storagePlugins = new StoragePluginRegistry(this); // TODO change constructor
     this.reader = new PhysicalPlanReader(context.getConfig(), classpathScan, lpPersistence, endpoint, storagePlugins);
@@ -169,7 +165,7 @@ public class DrillbitContext {
   }
 
   public ExecutorService getExecutor() {
-    return executor;
+    return context.getExecutor();
   }
 
   public LogicalPlanPersistence getLpPersistence() {

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index 8c347e8..d3b4128 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -18,17 +18,17 @@
 package org.apache.drill.exec.service;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import io.netty.channel.EventLoopGroup;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.channel.EventLoopGroup;
-
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
@@ -61,7 +61,13 @@ public class ServiceEngine implements Closeable{
       WorkEventBus workBus, DataResponseHandler dataHandler, boolean allowPortHunting) throws DrillbitStartupException {
     final EventLoopGroup eventLoopGroup = TransportCheck.createEventLoopGroup(
         context.getConfig().getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-");
-    this.userServer = new UserServer(context.getConfig(), context.getClasspathScan(), context.getAllocator(), eventLoopGroup, userWorker);
+    this.userServer = new UserServer(
+        context.getConfig(),
+        context.getClasspathScan(),
+        context.getAllocator(),
+        eventLoopGroup,
+        userWorker,
+        context.getExecutor());
     this.controller = new ControllerImpl(context, controlMessageHandler, allowPortHunting);
     this.dataPool = new DataConnectionCreator(context, workBus, dataHandler, allowPortHunting);
     this.config = context.getConfig();
@@ -89,8 +95,8 @@ public class ServiceEngine implements Closeable{
     return controller;
   }
 
-  private void submit(ExecutorService p, final String name, final Closeable c) {
-    p.submit(new Runnable() {
+  private void submit(Executor p, final String name, final Closeable c) {
+    p.execute(new Runnable() {
       @Override
       public void run() {
         Stopwatch watch = new Stopwatch().start();

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 2e51e38..9ee65b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -21,14 +21,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import org.apache.drill.common.SelfCleaningRunnable;
 import org.apache.drill.common.concurrent.ExtendedLatch;
-import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -37,7 +33,6 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
@@ -84,7 +79,7 @@ public class WorkManager implements AutoCloseable {
   private final UserWorker userWorker;
   private final WorkerBee bee;
   private final WorkEventBus workBus;
-  private final ExecutorService executor;
+  private final Executor executor;
   private final StatusThread statusThread;
 
   /**
@@ -96,25 +91,7 @@ public class WorkManager implements AutoCloseable {
     this.bContext = context;
     bee = new WorkerBee(); // TODO should this just be an interface?
     workBus = new WorkEventBus(); // TODO should this just be an interface?
-
-    /*
-     * TODO
-     * This executor isn't bounded in any way and could create an arbitrarily large number of
-     * threads, possibly choking the machine. We should really put an upper bound on the number of
-     * threads that can be created. Ideally, this might be computed based on the number of cores or
-     * some similar metric; ThreadPoolExecutor can impose an upper bound, and might be a better choice.
-     */
-    executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-        new NamedThreadFactory("WorkManager-")) {
-            @Override
-            protected void afterExecute(final Runnable r, final Throwable t) {
-              if(t != null){
-                logger.error("{}.run() leaked an exception.", r.getClass().getName(), t);
-              }
-              super.afterExecute(r, t);
-            }
-      };
-
+    executor = context.getExecutor();
 
     // TODO references to this escape here (via WorkerBee) before construction is done
     controlMessageWorker = new ControlMessageHandler(bee); // TODO getFragmentRunner(), getForemanForQueryId()
@@ -129,7 +106,7 @@ public class WorkManager implements AutoCloseable {
       final DataConnectionCreator data,
       final ClusterCoordinator coord,
       final PStoreProvider provider) {
-    dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider, executor);
+    dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider);
     statusThread.start();
 
     // TODO remove try block once metrics moved from singleton, For now catch to avoid unit test failures
@@ -147,7 +124,7 @@ public class WorkManager implements AutoCloseable {
     }
   }
 
-  public ExecutorService getExecutor() {
+  public Executor getExecutor() {
     return executor;
   }
 
@@ -174,27 +151,6 @@ public class WorkManager implements AutoCloseable {
   @Override
   public void close() throws Exception {
 
-    if (executor != null) {
-      executor.shutdown(); // Disable new tasks from being submitted
-      try {
-        // Wait a while for existing tasks to terminate
-        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-          executor.shutdownNow(); // Cancel currently executing tasks
-          // Wait a while for tasks to respond to being cancelled
-          if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-            logger.error("Pool did not terminate");
-          }
-        }
-      } catch (InterruptedException ie) {
-        logger.warn("Executor interrupted while awaiting termination");
-
-        // (Re-)Cancel if current thread also interrupted
-        executor.shutdownNow();
-        // Preserve interrupt status
-        Thread.currentThread().interrupt();
-      }
-    }
-
     if (!runningFragments.isEmpty()) {
       logger.warn("Closing WorkManager but there are {} running fragments.", runningFragments.size());
       if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 6d03f01..094316e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -20,10 +20,12 @@ package org.apache.drill.exec.work.fragment;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.drill.common.DeferredException;
+import org.apache.drill.common.SerializedExecutor;
 import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -62,6 +64,7 @@ public class FragmentExecutor implements Runnable {
   private final DeferredException deferredException = new DeferredException();
   private final PlanFragment fragment;
   private final FragmentRoot rootOperator;
+  private final ReceiverExecutor receiverExecutor;
 
   private volatile RootExec root;
   private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
@@ -97,6 +100,7 @@ public class FragmentExecutor implements Runnable {
     this.fragment = fragment;
     this.rootOperator = rootOperator;
     this.fragmentName = QueryIdHelper.getQueryIdentifier(context.getHandle());
+    this.receiverExecutor = new ReceiverExecutor(fragmentName, fragmentContext.getExecutor());
 
     context.setExecutorState(new ExecutorStateImpl());
   }
@@ -139,7 +143,7 @@ public class FragmentExecutor implements Runnable {
    * so we need to be careful about the state transitions that can result.
    */
   public void cancel() {
-    final boolean thisIsOnlyThread = this.hasCloseoutThread.compareAndSet(false, true);
+    final boolean thisIsOnlyThread = hasCloseoutThread.compareAndSet(false, true);
 
     if (!thisIsOnlyThread) {
       acceptExternalEvents.awaitUninterruptibly();
@@ -162,10 +166,12 @@ public class FragmentExecutor implements Runnable {
         }
       }
     } else {
+      // countdown so receiver fragment finished can proceed.
+      acceptExternalEvents.countDown();
+
       updateState(FragmentState.CANCELLATION_REQUESTED);
       cleanup(FragmentState.FINISHED);
     }
-
   }
 
   private void cleanup(FragmentState state) {
@@ -194,15 +200,7 @@ public class FragmentExecutor implements Runnable {
    * @param handle The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
    */
   public void receivingFragmentFinished(final FragmentHandle handle) {
-    acceptExternalEvents.awaitUninterruptibly();
-    if (root != null) {
-      logger.info("Applying request for early sender termination for {} -> {}.",
-          QueryIdHelper.getFragmentId(this.getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
-      root.receivingFragmentFinished(handle);
-    } else {
-      logger.warn("Dropping request for early fragment termination for path {} -> {} as no root exec exists.",
-          QueryIdHelper.getFragmentId(this.getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
-    }
+    receiverExecutor.submitReceiverFinished(handle);
   }
 
   @Override
@@ -476,4 +474,46 @@ public class FragmentExecutor implements Runnable {
       }
     }
   }
+
+  private class ReceiverExecutor extends SerializedExecutor {
+
+    public ReceiverExecutor(String name, Executor underlyingExecutor) {
+      super(name, underlyingExecutor);
+    }
+
+    @Override
+    protected void runException(Runnable command, Throwable t) {
+      logger.error("Failure running with exception of command {}", command, t);
+    }
+
+    public void submitReceiverFinished(FragmentHandle handle){
+      execute(new ReceiverFinished(handle));
+    }
+  }
+
+  private class ReceiverFinished implements Runnable {
+    final FragmentHandle handle;
+
+    public ReceiverFinished(FragmentHandle handle) {
+      super();
+      this.handle = handle;
+    }
+
+    @Override
+    public void run() {
+      acceptExternalEvents.awaitUninterruptibly();
+
+      if (root != null) {
+        logger.info("Applying request for early sender termination for {} -> {}.",
+            QueryIdHelper.getFragmentId(getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
+        root.receivingFragmentFinished(handle);
+      } else {
+        logger.warn("Dropping request for early fragment termination for path {} -> {} as no root exec exists.",
+            QueryIdHelper.getFragmentId(getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
+      }
+
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e7db9dca/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index f0ea8cc..94aa84e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -39,7 +39,6 @@ import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -104,8 +103,14 @@ public class TestOptiqPlans extends ExecTest {
       }
     };
     final RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
-    final DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller,
-        com, workBus, new LocalPStoreProvider(config), null);
+    final DrillbitContext bitContext = new DrillbitContext(
+        DrillbitEndpoint.getDefaultInstance(),
+        context,
+        coord,
+        controller,
+        com,
+        workBus,
+        new LocalPStoreProvider(config));
     final QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(),
         bitContext);
     final PhysicalPlanReader reader = bitContext.getPlanReader();


Mime
View raw message