drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sudhe...@apache.org
Subject [16/29] drill git commit: DRILL-4280: REFACTOR
Date Sat, 25 Feb 2017 07:18:09 GMT
DRILL-4280: REFACTOR

+ Extract RemoteConnection interface, and add AbstractRemoteConnection
+ Add ServerConnection and ClientConnection interfaces
+ Add RequestHandler interface to decouple connections from how requests are handled
+ Add NonTransientRpcException

+ Remove unused classes and methods
+ Code style changes


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d068b3e3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d068b3e3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d068b3e3

Branch: refs/heads/master
Commit: d068b3e3fe7a2d050e919cf3377f4bb526e3982a
Parents: e17baa8
Author: Sudheesh Katkam <sudheesh@apache.org>
Authored: Wed Jan 25 18:45:37 2017 -0800
Committer: Sudheesh Katkam <sudheesh@apache.org>
Committed: Fri Feb 24 19:01:41 2017 -0800

----------------------------------------------------------------------
 .../exec/work/fragment/FragmentManager.java     |   8 -
 .../work/fragment/NonRootFragmentManager.java   |  16 --
 .../exec/work/fragment/RootFragmentManager.java |  16 --
 .../exec/rpc/AbstractRemoteConnection.java      | 227 +++++++++++++++++++
 .../org/apache/drill/exec/rpc/BasicClient.java  |  88 ++++---
 .../exec/rpc/BasicClientWithConnection.java     |  76 -------
 .../org/apache/drill/exec/rpc/BasicServer.java  |  63 ++---
 .../apache/drill/exec/rpc/ClientConnection.java |  30 +++
 .../rpc/InvalidConnectionInfoException.java     |  29 ---
 .../exec/rpc/NonTransientRpcException.java      |  33 +++
 .../drill/exec/rpc/ReconnectingConnection.java  |  90 ++------
 .../apache/drill/exec/rpc/RemoteConnection.java | 225 +++---------------
 .../apache/drill/exec/rpc/RequestHandler.java   |  50 ++++
 .../java/org/apache/drill/exec/rpc/RpcBus.java  |  37 ++-
 .../drill/exec/rpc/RpcConnectionHandler.java    |   8 +-
 .../apache/drill/exec/rpc/ServerConnection.java |  37 +++
 16 files changed, 531 insertions(+), 502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index 20315e1..aaf80d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.data.IncomingDataBatch;
 
 /**
@@ -67,13 +66,6 @@ public interface FragmentManager {
 
   FragmentContext getFragmentContext();
 
-  void addConnection(RemoteConnection connection);
-
   void receivingFragmentFinished(final FragmentHandle handle);
 
-  /**
-   *  Sets autoRead property on all connections
-   * @param autoRead
-   */
-  void setAutoRead(boolean autoRead);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index b9cf8e8..3e7a693 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -18,15 +18,12 @@
 package org.apache.drill.exec.work.fragment;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.data.IncomingDataBatch;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
@@ -46,7 +43,6 @@ public class NonRootFragmentManager implements FragmentManager {
   private final FragmentHandle handle;
   private volatile boolean cancel = false;
   private final FragmentContext context;
-  private final List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
   private volatile boolean runnerRetrieved = false;
 
   public NonRootFragmentManager(final PlanFragment fragment, final DrillbitContext context)
@@ -128,16 +124,4 @@ public class NonRootFragmentManager implements FragmentManager {
     return context;
   }
 
-  @Override
-  public void addConnection(final RemoteConnection connection) {
-    connections.add(connection);
-  }
-
-  @Override
-  public void setAutoRead(final boolean autoRead) {
-    for (final RemoteConnection c : connections) {
-      c.setAutoRead(autoRead);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 0f7b10e..af81d17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -18,13 +18,10 @@
 package org.apache.drill.exec.work.fragment;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.data.IncomingDataBatch;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
@@ -36,7 +33,6 @@ public class RootFragmentManager implements FragmentManager {
   private final FragmentExecutor runner;
   private final FragmentHandle handle;
   private volatile boolean cancel = false;
-  private final List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
 
   public RootFragmentManager(final FragmentHandle handle, final IncomingBuffers buffers, final FragmentExecutor runner) {
     super();
@@ -90,16 +86,4 @@ public class RootFragmentManager implements FragmentManager {
     return runner.getContext();
   }
 
-  @Override
-  public void addConnection(final RemoteConnection connection) {
-    connections.add(connection);
-  }
-
-  @Override
-  public void setAutoRead(final boolean autoRead) {
-    for (final RemoteConnection c : connections) {
-      c.setAutoRead(autoRead);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
new file mode 100644
index 0000000..59ed85b
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+
+import java.net.SocketAddress;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+
+public abstract class AbstractRemoteConnection implements RemoteConnection {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractRemoteConnection.class);
+
+  private final Channel channel;
+  private final WriteManager writeManager;
+  private final RequestIdMap requestIdMap = new RequestIdMap();
+  private final String clientName;
+
+  private String name;
+
+  public AbstractRemoteConnection(SocketChannel channel, String name) {
+    this.channel = channel;
+    this.clientName = name;
+    this.writeManager = new WriteManager();
+    channel.pipeline().addLast(new BackPressureHandler());
+  }
+
+  @Override
+  public boolean inEventLoop() {
+    return channel.eventLoop().inEventLoop();
+  }
+
+  @Override
+  public String getName() {
+    if (name == null) {
+      name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), clientName);
+    }
+    return name;
+  }
+
+  @Override
+  public final Channel getChannel() {
+    return channel;
+  }
+
+  @Override
+  public boolean blockOnNotWritable(RpcOutcomeListener<?> listener) {
+    try {
+      writeManager.waitForWritable();
+      return true;
+    } catch (final InterruptedException e) {
+      listener.interrupted(e);
+
+      // Preserve evidence that the interruption occurred so that code higher up
+      // on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
+      return false;
+    }
+  }
+
+  @Override
+  public void setAutoRead(boolean enableAutoRead) {
+    channel.config().setAutoRead(enableAutoRead);
+  }
+
+  @Override
+  public boolean isActive() {
+    return (channel != null) && channel.isActive();
+  }
+
+  /**
+   * The write manager is responsible for controlling whether or not a write can
+   * be sent. It controls whether or not to block a sender if we have tcp
+   * backpressure on the receive side.
+   */
+  private static class WriteManager {
+    private final ResettableBarrier barrier = new ResettableBarrier();
+    private volatile boolean disabled = false;
+
+    public WriteManager() {
+      barrier.openBarrier();
+    }
+
+    public void waitForWritable() throws InterruptedException {
+      barrier.await();
+    }
+
+    public void setWritable(boolean isWritable) {
+      if (isWritable) {
+        barrier.openBarrier();
+      } else if (!disabled) {
+        barrier.closeBarrier();
+      }
+
+    }
+
+    public void disable() {
+      disabled = true;
+    }
+  }
+
+  private class BackPressureHandler extends ChannelInboundHandlerAdapter {
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+      writeManager.setWritable(ctx.channel().isWritable());
+      ctx.fireChannelWritabilityChanged();
+    }
+
+  }
+
+  /**
+   * For incoming messages, remove the outcome listener and return it. Can only be done once per coordinationId
+   * creation. CoordinationId's are recycled so they will show up once we run through all 4B of them.
+   * @param rpcType The rpc type associated with the coordination.
+   * @param coordinationId The coordination id that was returned when the listener was created.
+   * @param clazz The class that is expected in response.
+   * @return An RpcOutcome associated with the provided coordinationId.
+   */
+  @Override
+  public <V> RpcOutcome<V> getAndRemoveRpcOutcome(int rpcType, int coordinationId, Class<V> clazz) {
+    return requestIdMap.getAndRemoveRpcOutcome(rpcType, coordinationId, clazz);
+  }
+
+  /**
+   * Create a new rpc listener that will be notified when the response is returned.
+   * @param handler The outcome handler to be notified when the response arrives.
+   * @param clazz The Class associated with the response object.
+   * @return The new listener. Also carries the coordination id for use in the rpc message.
+   */
+  @Override
+  public <V> ChannelListenerWithCoordinationId createNewRpcListener(RpcOutcomeListener<V> handler, Class<V> clazz) {
+    return requestIdMap.createNewRpcListener(handler, clazz, this);
+  }
+
+  /**
+   * Inform the local outcome listener that the remote operation could not be handled.
+   * @param coordinationId The id that failed.
+   * @param failure The failure that occurred.
+   */
+  @Override
+  public void recordRemoteFailure(int coordinationId, DrillPBError failure) {
+    requestIdMap.recordRemoteFailure(coordinationId, failure);
+  }
+
+  /**
+   * Called from the RpcBus's channel close handler to close all remaining
+   * resources associated with this connection. Ensures that any pending
+   * back-pressure items are also unblocked so they can be thrown away.
+   *
+   * @param ex
+   *          The exception that caused the channel to close.
+   */
+  @Override
+  public void channelClosed(RpcException ex) {
+    // this could possibly overrelease but it doesn't matter since we're only
+    // going to do this to ensure that we
+    // fail out any pending messages
+    writeManager.disable();
+    writeManager.setWritable(true);
+
+    // ensure outstanding requests are cleaned up.
+    requestIdMap.channelClosed(ex);
+  }
+
+  @Override
+  public SocketAddress getRemoteAddress() {
+    return getChannel().remoteAddress();
+  }
+
+  /**
+   * Connection consumer wants to close connection. Initiate connection close
+   * and complete. This is a blocking call that ensures that the connection is
+   * closed before returning. As part of this call, the channel close handler
+   * will be triggered which will call channelClosed() above. The latter will
+   * happen in a separate thread while this method is blocking.
+   *
+   * <p>
+   *   The check for isActive is not required here since channel can be in OPEN state without being active. We want
+   *   to close in both the scenarios. A channel is in OPEN state when a socket is created for it before binding to an
+   *   address.
+   *   <li>
+   *      For connection oriented transport protocol channel moves to ACTIVE state when a connection is established
+   *      using this channel. We need to have channel in ACTIVE state NOT OPEN before we can send any message to
+   *      remote endpoint.
+   *   </li>
+   *   <li>
+   *      For connectionless transport protocol a sender can send data as soon as channel moves to OPEN state.
+   *   </li>
+   * </p>
+   */
+  @Override
+  public void close() {
+    try {
+      channel.close().get();
+    } catch (final InterruptedException | ExecutionException e) {
+      logger.warn("Caught exception while closing channel.", e);
+
+      // Preserve evidence that the interruption occurred so that code higher up
+      // on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index a3eb4cb..e418c5a 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -34,7 +34,6 @@ import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.net.SocketAddress;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -46,8 +45,16 @@ import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
-public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
-    extends RpcBus<T, R> {
+/**
+ *
+ * @param <T> handshake rpc type
+ * @param <CC> Client connection type
+ * @param <HS> Handshake send type
+ * @param <HR> Handshake receive type
+ */
+public abstract class BasicClient<T extends EnumLite, CC extends ClientConnection,
+                                  HS extends MessageLite, HR extends MessageLite>
+    extends RpcBus<T, CC> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
 
   // The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
@@ -56,21 +63,22 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
   private static final double PERCENT_TIMEOUT_BEFORE_SENDING_PING = 0.5;
 
   private final Bootstrap b;
-  protected R connection;
+  protected CC connection;
   private final T handshakeType;
-  private final Class<HANDSHAKE_RESPONSE> responseClass;
-  private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
+  private final Class<HR> responseClass;
+  private final Parser<HR> handshakeParser;
 
   private final IdlePingHandler pingHandler;
 
   public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
-      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
+                     Class<HR> responseClass, Parser<HR> handshakeParser) {
     super(rpcMapping);
     this.responseClass = responseClass;
     this.handshakeType = handshakeType;
     this.handshakeParser = handshakeParser;
-    final long timeoutInMillis = rpcMapping.hasTimeout() ? (long) (rpcMapping.getTimeout() * 1000.0 * PERCENT_TIMEOUT_BEFORE_SENDING_PING)
-        : -1;
+    final long timeoutInMillis = rpcMapping.hasTimeout() ?
+        (long) (rpcMapping.getTimeout() * 1000.0 * PERCENT_TIMEOUT_BEFORE_SENDING_PING) :
+        -1;
     this.pingHandler = rpcMapping.hasTimeout() ? new IdlePingHandler(timeoutInMillis) : null;
 
     b = new Bootstrap() //
@@ -103,7 +111,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
             }
 
             pipe.addLast("message-handler", new InboundHandler(connection));
-            pipe.addLast("exception-handler", new RpcExceptionHandler<R>(connection));
+            pipe.addLast("exception-handler", new RpcExceptionHandler<CC>(connection));
           }
         }); //
 
@@ -112,11 +120,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     // }
   }
 
-  public R initRemoteConnection(SocketChannel channel){
+  @Override
+  protected CC initRemoteConnection(SocketChannel channel){
     local=channel.localAddress();
     remote=channel.remoteAddress();
     return null;
-  };
+  }
 
   private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
 
@@ -153,31 +162,40 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     return (connection != null) && connection.isActive();
   }
 
-  protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;
+  protected abstract void validateHandshake(HR validateHandshake) throws RpcException;
 
-  protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
+  protected void finalizeConnection(HR handshake, CC connection) {
+    // no-op
+  }
 
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
-      T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+  public <SEND extends MessageLite, RECEIVE extends MessageLite>
+  void send(RpcOutcomeListener<RECEIVE> listener, T rpcType, SEND protobufBody,
+            Class<RECEIVE> clazz, ByteBuf... dataBodies) {
     super.send(listener, connection, rpcType, protobufBody, clazz, dataBodies);
   }
 
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
-      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+  public <SEND extends MessageLite, RECEIVE extends MessageLite>
+  DrillRpcFuture<RECEIVE> send(T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
     return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
   }
 
-  protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue,
-      String host, int port) {
+  // the command itself must be "run" by the caller (to avoid calling inEventLoop)
+  protected <M extends MessageLite> RpcCommand<M, CC>
+  getInitialCommand(final RpcCommand<M, CC> command) {
+    return command;
+  }
+
+  protected void connectAsClient(RpcConnectionHandler<CC> connectionListener, HS handshakeValue,
+                                 String host, int port) {
     ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
     b.connect(host, port).addListener(cml.connectionHandler);
   }
 
   private class ConnectionMultiListener {
-    private final RpcConnectionHandler<R> l;
-    private final HANDSHAKE_SEND handshakeValue;
+    private final RpcConnectionHandler<CC> l;
+    private final HS handshakeValue;
 
-    public ConnectionMultiListener(RpcConnectionHandler<R> l, HANDSHAKE_SEND handshakeValue) {
+    public ConnectionMultiListener(RpcConnectionHandler<CC> l, HS handshakeValue) {
       assert l != null;
       assert handshakeValue != null;
 
@@ -244,7 +262,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     /**
      * manages handshake outcomes.
      */
-    private class HandshakeSendHandler implements RpcOutcomeListener<HANDSHAKE_RESPONSE> {
+    private class HandshakeSendHandler implements RpcOutcomeListener<HR> {
 
       @Override
       public void failed(RpcException ex) {
@@ -253,14 +271,15 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
       }
 
       @Override
-      public void success(HANDSHAKE_RESPONSE value, ByteBuf buffer) {
+      public void success(HR value, ByteBuf buffer) {
         // logger.debug("Handshake received. {}", value);
         try {
-          BasicClient.this.validateHandshake(value);
-          BasicClient.this.finalizeConnection(value, connection);
+          validateHandshake(value);
+          finalizeConnection(value, connection);
           l.connectionSucceeded(connection);
           // logger.debug("Handshake completed succesfully.");
-        } catch (RpcException ex) {
+        } catch (Exception ex) {
+          logger.debug("Failure while validating handshake", ex);
           l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex);
         }
       }
@@ -271,23 +290,22 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
         l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
       }
     }
-
   }
 
-  private class ClientHandshakeHandler extends AbstractHandshakeHandler<HANDSHAKE_RESPONSE> {
+  private class ClientHandshakeHandler extends AbstractHandshakeHandler<HR> {
 
-    private final R connection;
+    private final CC connection;
 
-    public ClientHandshakeHandler(R connection) {
+    public ClientHandshakeHandler(CC connection) {
       super(BasicClient.this.handshakeType, BasicClient.this.handshakeParser);
       Preconditions.checkNotNull(connection);
       this.connection = connection;
     }
 
     @Override
-    protected final void consumeHandshake(ChannelHandlerContext ctx, HANDSHAKE_RESPONSE msg) throws Exception {
+    protected final void consumeHandshake(ChannelHandlerContext ctx, HR msg) throws Exception {
       // remove the handshake information from the queue so it doesn't sit there forever.
-      final RpcOutcome<HANDSHAKE_RESPONSE> response =
+      final RpcOutcome<HR> response =
           connection.getAndRemoveRpcOutcome(handshakeType.getNumber(), coordinationId, responseClass);
       response.set(msg, null);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
deleted file mode 100644
index bc79677..0000000
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
-
-import com.google.protobuf.Internal.EnumLite;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite> extends BasicClient<T, ServerConnection, HANDSHAKE_SEND, HANDSHAKE_RESPONSE>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
-
-  private BufferAllocator alloc;
-  private final String connectionName;
-
-  public BasicClientWithConnection(RpcConfig rpcMapping, BufferAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
-      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser, String connectionName) {
-    super(rpcMapping, alloc.getAsByteBufAllocator(), eventLoopGroup, handshakeType, responseClass, handshakeParser);
-    this.alloc = alloc;
-    this.connectionName = connectionName;
-  }
-
-  @Override
-  protected Response handle(ServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
-    return handleReponse( (ConnectionThrottle) connection, rpcType, pBody, dBody);
-  }
-
-  protected abstract Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ;
-
-  @Override
-  public ServerConnection initRemoteConnection(SocketChannel channel) {
-    super.initRemoteConnection(channel);
-    return new ServerConnection(connectionName, channel, alloc);
-  }
-
-  public static class ServerConnection extends RemoteConnection{
-
-    private final BufferAllocator alloc;
-
-    public ServerConnection(String name, SocketChannel channel, BufferAllocator alloc) {
-      super(channel, name);
-      this.alloc = alloc;
-    }
-
-    @Override
-    public BufferAllocator getAllocator() {
-      return alloc;
-    }
-
-
-
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index b54d73e..7437b65 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -44,15 +44,18 @@ import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
 /**
- * A server is bound to a port and is responsible for responding to various type of requests. In some cases, the inbound
- * requests will generate more than one outbound request.
+ * A server is bound to a port and is responsible for responding to various types of requests. In some cases,
+ * the inbound requests will generate more than one outbound request.
+ *
+ * @param <T> RPC type
+ * @param <SC> server connection type
  */
-public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection> extends RpcBus<T, C> {
+public abstract class BasicServer<T extends EnumLite, SC extends ServerConnection<SC>> extends RpcBus<T, SC> {
   final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
 
   protected static final String TIMEOUT_HANDLER = "timeout-handler";
 
-  private ServerBootstrap b;
+  private final ServerBootstrap b;
   private volatile boolean connect = false;
   private final EventLoopGroup eventLoopGroup;
 
@@ -77,7 +80,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
 //            logger.debug("Starting initialization of server connection.");
-            C connection = initRemoteConnection(ch);
+            SC connection = initRemoteConnection(ch);
             ch.closeFuture().addListener(getCloseHandler(ch, connection));
 
             final ChannelPipeline pipe = ch.pipeline();
@@ -88,11 +91,11 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 
             if (rpcMapping.hasTimeout()) {
               pipe.addLast(TIMEOUT_HANDLER,
-                  new LogggingReadTimeoutHandler(connection, rpcMapping.getTimeout()));
+                  new LoggingReadTimeoutHandler(connection, rpcMapping.getTimeout()));
             }
 
             pipe.addLast("message-handler", new InboundHandler(connection));
-            pipe.addLast("exception-handler", new RpcExceptionHandler<C>(connection));
+            pipe.addLast("exception-handler", new RpcExceptionHandler<>(connection));
 
             connect = true;
 //            logger.debug("Server connection initialization completed.");
@@ -104,11 +107,11 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 //     }
   }
 
-  private class LogggingReadTimeoutHandler extends ReadTimeoutHandler {
+  private class LoggingReadTimeoutHandler extends ReadTimeoutHandler {
 
-    private final C connection;
+    private final SC connection;
     private final int timeoutSeconds;
-    public LogggingReadTimeoutHandler(C connection, int timeoutSeconds) {
+    public LoggingReadTimeoutHandler(SC connection, int timeoutSeconds) {
       super(timeoutSeconds);
       this.connection = connection;
       this.timeoutSeconds = timeoutSeconds;
@@ -116,24 +119,20 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 
     @Override
     protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
-      logger.info("RPC connection {} timed out.  Timeout was set to {} seconds. Closing connection.", connection.getName(),
-          timeoutSeconds);
+      logger.info("RPC connection {} timed out.  Timeout was set to {} seconds. Closing connection.",
+          connection.getName(), timeoutSeconds);
       super.readTimedOut(ctx);
     }
 
   }
 
-  public OutOfMemoryHandler getOutOfMemoryHandler() {
+  protected OutOfMemoryHandler getOutOfMemoryHandler() {
     return OutOfMemoryHandler.DEFAULT_INSTANCE;
   }
 
-  protected void removeTimeoutHandler() {
-
-  }
-
-  public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler);
+  protected abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler);
 
-  protected abstract ServerHandshakeHandler<?> getHandshakeHandler(C connection);
+  protected abstract ServerHandshakeHandler<?> getHandshakeHandler(SC connection);
 
   protected static abstract class ServerHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
 
@@ -152,30 +151,16 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 
   }
 
-  @Override
-  protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
-    return null;
-  }
-
-  @Override
-  protected Response handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
-    return null;
-  }
-
-  @Override
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
-      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
-    return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
-  }
+  protected abstract MessageLite getResponseDefaultInstance(int rpcType) throws RpcException;
 
   @Override
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
-      C connection, T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
-    super.send(listener, connection, rpcType, protobufBody, clazz, dataBodies);
+  protected void handle(SC connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+                        ResponseSender sender) throws RpcException {
+    connection.getCurrentHandler().handle(connection, rpcType, pBody, dBody, sender);
   }
 
   @Override
-  public C initRemoteConnection(SocketChannel channel) {
+  protected SC initRemoteConnection(SocketChannel channel) {
     local = channel.localAddress();
     remote = channel.remoteAddress();
     return null;

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ClientConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ClientConnection.java
new file mode 100644
index 0000000..5393173
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ClientConnection.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import javax.security.sasl.SaslClient;
+
+public interface ClientConnection extends RemoteConnection {
+
+  // set only once
+  void setSaslClient(SaslClient saslClient);
+
+  // get only after setting
+  SaslClient getSaslClient();
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/InvalidConnectionInfoException.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/InvalidConnectionInfoException.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/InvalidConnectionInfoException.java
deleted file mode 100644
index c82a718..0000000
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/InvalidConnectionInfoException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-/**
- * Exception for malformed connection string from client
- */
-public class InvalidConnectionInfoException extends RpcException {
-
-  public InvalidConnectionInfoException(String message)
-  {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/NonTransientRpcException.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/NonTransientRpcException.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/NonTransientRpcException.java
new file mode 100644
index 0000000..014f21b
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/NonTransientRpcException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+public class NonTransientRpcException extends RpcException {
+
+  public NonTransientRpcException(String message) {
+    super(message);
+  }
+
+  public NonTransientRpcException(String format, Object... args) {
+    super(String.format(format, args));
+  }
+
+  public NonTransientRpcException(Throwable t) {
+    super(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index d62b6f2..a64a23b 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -33,16 +33,16 @@ import com.google.protobuf.MessageLite;
 /**
  * Manager all connections between two particular bits.
  */
-public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConnection, OUTBOUND_HANDSHAKE extends MessageLite>
+public abstract class ReconnectingConnection<C extends ClientConnection, HS extends MessageLite>
     implements Closeable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReconnectingConnection.class);
 
-  private final AtomicReference<CONNECTION_TYPE> connectionHolder = new AtomicReference<CONNECTION_TYPE>();
+  private final AtomicReference<C> connectionHolder = new AtomicReference<C>();
   private final String host;
   private final int port;
-  private final OUTBOUND_HANDSHAKE handshake;
+  private final HS handshake;
 
-  public ReconnectingConnection(OUTBOUND_HANDSHAKE handshake, String host, int port) {
+  public ReconnectingConnection(HS handshake, String host, int port) {
     Preconditions.checkNotNull(host);
     Preconditions.checkArgument(port > 0);
     this.host = host;
@@ -50,11 +50,11 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
     this.handshake = handshake;
   }
 
-  protected abstract BasicClient<?, CONNECTION_TYPE, OUTBOUND_HANDSHAKE, ?> getNewClient();
+  protected abstract BasicClient<?, C, HS, ?> getNewClient();
 
-  public <R extends MessageLite, C extends RpcCommand<R, CONNECTION_TYPE>> void runCommand(C cmd) {
+  public <T extends MessageLite, R extends RpcCommand<T, C>> void runCommand(R cmd) {
 //    if(logger.isDebugEnabled()) logger.debug(String.format("Running command %s sending to host %s:%d", cmd, host, port));
-    CONNECTION_TYPE connection = connectionHolder.get();
+    C connection = connectionHolder.get();
     if (connection != null) {
       if (connection.isActive()) {
         cmd.connectionAvailable(connection);
@@ -77,8 +77,8 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
 
       } else {
 //        logger.debug("No connection active, opening client connection.");
-        BasicClient<?, CONNECTION_TYPE, OUTBOUND_HANDSHAKE, ?> client = getNewClient();
-        ConnectionListeningFuture<R, C> future = new ConnectionListeningFuture<R, C>(cmd);
+        BasicClient<?, C, HS, ?> client = getNewClient();
+        ConnectionListeningFuture<T> future = new ConnectionListeningFuture<>(client.getInitialCommand(cmd));
         client.connectAsClient(future, handshake, host, port);
         future.waitAndRun();
 //        logger.debug("Connection available and active, command now being run inline.");
@@ -88,12 +88,13 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
     }
   }
 
-  public class ConnectionListeningFuture<R extends MessageLite, C extends RpcCommand<R, CONNECTION_TYPE>> extends
-      AbstractFuture<CONNECTION_TYPE> implements RpcConnectionHandler<CONNECTION_TYPE> {
+  public class ConnectionListeningFuture<R extends MessageLite>
+      extends AbstractFuture<C>
+      implements RpcConnectionHandler<C> {
 
-    private C cmd;
+    private RpcCommand<R, C> cmd;
 
-    public ConnectionListeningFuture(C cmd) {
+    public ConnectionListeningFuture(RpcCommand<R, C> cmd) {
       super();
       this.cmd = cmd;
     }
@@ -112,7 +113,7 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
       while(true) {
         try {
           //        logger.debug("Waiting for connection.");
-          CONNECTION_TYPE connection = this.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
+          C connection = this.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
 
           if (connection == null) {
             //          logger.debug("Connection failed.");
@@ -146,14 +147,14 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
     }
 
     @Override
-    public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
+    public void connectionFailed(FailureType type, Throwable t) {
       set(null);
       cmd.connectionFailed(type, t);
     }
 
     @Override
-    public void connectionSucceeded(CONNECTION_TYPE incoming) {
-      CONNECTION_TYPE connection = connectionHolder.get();
+    public void connectionSucceeded(C incoming) {
+      C connection = connectionHolder.get();
       while (true) {
         boolean setted = connectionHolder.compareAndSet(null, incoming);
         if (setted) {
@@ -179,8 +180,8 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
 
   /** Factory for close handlers **/
   public class CloseHandlerCreator {
-    public GenericFutureListener<ChannelFuture> getHandler(CONNECTION_TYPE connection,
-        GenericFutureListener<ChannelFuture> parent) {
+    public GenericFutureListener<ChannelFuture> getHandler(C connection,
+                                                           GenericFutureListener<ChannelFuture> parent) {
       return new CloseHandler(connection, parent);
     }
   }
@@ -189,10 +190,10 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
    * Listens for connection closes and clears connection holder.
    */
   protected class CloseHandler implements GenericFutureListener<ChannelFuture> {
-    private CONNECTION_TYPE connection;
+    private C connection;
     private GenericFutureListener<ChannelFuture> parent;
 
-    public CloseHandler(CONNECTION_TYPE connection, GenericFutureListener<ChannelFuture> parent) {
+    public CloseHandler(C connection, GenericFutureListener<ChannelFuture> parent) {
       super();
       this.connection = connection;
       this.parent = parent;
@@ -210,60 +211,17 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
     return new CloseHandlerCreator();
   }
 
-  public void addExternalConnection(CONNECTION_TYPE connection) {
+  public void addExternalConnection(C connection) {
     // if the connection holder is not set, set it to this incoming connection. We'll simply ignore if already set.
     this.connectionHolder.compareAndSet(null, connection);
   }
 
   @Override
   public void close() {
-    CONNECTION_TYPE c = connectionHolder.getAndSet(null);
+    C c = connectionHolder.getAndSet(null);
     if (c != null) {
       c.getChannel().close();
     }
   }
 
-  /**
-   * Decorate a connection creation so that we capture a success and keep it available for future requests. If we have
-   * raced and another is already available... we return that one and close things down on this one.
-   */
-  private class ConnectionListeningDecorator implements RpcConnectionHandler<CONNECTION_TYPE> {
-
-    private final RpcConnectionHandler<CONNECTION_TYPE> delegate;
-
-    public ConnectionListeningDecorator(RpcConnectionHandler<CONNECTION_TYPE> delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    public void connectionSucceeded(CONNECTION_TYPE incoming) {
-      CONNECTION_TYPE connection = connectionHolder.get();
-      while (true) {
-        boolean setted = connectionHolder.compareAndSet(null, incoming);
-        if (setted) {
-          connection = incoming;
-          break;
-        }
-        connection = connectionHolder.get();
-        if (connection != null) {
-          break;
-        }
-      }
-
-      if (connection == incoming) {
-        delegate.connectionSucceeded(connection);
-      } else {
-        // close the incoming because another channel was created in the mean time (unless this is a self connection).
-        logger.debug("Closing incoming connection because a connection was already set.");
-        incoming.getChannel().close();
-        delegate.connectionSucceeded(connection);
-      }
-    }
-
-    @Override
-    public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
-      delegate.connectionFailed(type, t);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index cd97ab7..ffb7429 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -18,203 +18,40 @@
 package org.apache.drill.exec.rpc;
 
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.socket.SocketChannel;
-import java.util.concurrent.ExecutionException;
-
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 
-public abstract class RemoteConnection implements ConnectionThrottle, AutoCloseable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
-  private final Channel channel;
-  private final WriteManager writeManager;
-  private final RequestIdMap requestIdMap = new RequestIdMap();
-  private final String clientName;
-
-  private String name;
-
-  public boolean inEventLoop() {
-    return channel.eventLoop().inEventLoop();
-  }
-
-  public RemoteConnection(SocketChannel channel, String name) {
-    super();
-    this.channel = channel;
-    this.clientName = name;
-    this.writeManager = new WriteManager();
-    channel.pipeline().addLast(new BackPressureHandler());
-  }
-
-  public String getName() {
-    if (name == null) {
-      name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), clientName);
-    }
-    return name;
-  }
-
-  public abstract BufferAllocator getAllocator();
-
-  public final Channel getChannel() {
-    return channel;
-  }
-
-  public boolean blockOnNotWritable(RpcOutcomeListener<?> listener) {
-    try {
-      writeManager.waitForWritable();
-      return true;
-    } catch (final InterruptedException e) {
-      listener.interrupted(e);
-
-      // Preserve evidence that the interruption occurred so that code higher up
-      // on the call stack can learn of the
-      // interruption and respond to it if it wants to.
-      Thread.currentThread().interrupt();
-
-      return false;
-    }
-  }
-
-  public void setAutoRead(boolean enableAutoRead) {
-    channel.config().setAutoRead(enableAutoRead);
-  }
-
-  public boolean isActive() {
-    return (channel != null) && channel.isActive();
-  }
-
-  /**
-   * The write manager is responsible for controlling whether or not a write can
-   * be sent. It controls whether or not to block a sender if we have tcp
-   * backpressure on the receive side.
-   */
-  private static class WriteManager {
-    private final ResettableBarrier barrier = new ResettableBarrier();
-    private volatile boolean disabled = false;
-
-    public WriteManager() {
-      barrier.openBarrier();
-    }
-
-    public void waitForWritable() throws InterruptedException {
-      barrier.await();
-    }
-
-    public void setWritable(boolean isWritable) {
-      if (isWritable) {
-        barrier.openBarrier();
-      } else if (!disabled) {
-        barrier.closeBarrier();
-      }
-
-    }
-
-    public void disable() {
-      disabled = true;
-    }
-  }
-
-  private class BackPressureHandler extends ChannelInboundHandlerAdapter {
-
-    @Override
-    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-      writeManager.setWritable(ctx.channel().isWritable());
-      ctx.fireChannelWritabilityChanged();
-    }
-
-  }
-
-  /**
-   * For incoming messages, remove the outcome listener and return it. Can only be done once per coordinationId
-   * creation. CoordinationId's are recycled so they will show up once we run through all 4B of them.
-   * @param rpcType The rpc type associated with the coordination.
-   * @param coordinationId The coordination id that was returned with the listener was created.
-   * @param clazz The class that is expected in response.
-   * @return An RpcOutcome associated with the provided coordinationId.
-   */
-  <V> RpcOutcome<V> getAndRemoveRpcOutcome(int rpcType, int coordinationId, Class<V> clazz) {
-    return requestIdMap.getAndRemoveRpcOutcome(rpcType, coordinationId, clazz);
-  }
-
-  /**
-   * Create a new rpc listener that will be notified when the response is returned.
-   * @param handler The outcome handler to be notified when the response arrives.
-   * @param clazz The Class associated with the response object.
-   * @return The new listener. Also carries the coordination id for use in the rpc message.
-   */
-  <V> ChannelListenerWithCoordinationId createNewRpcListener(RpcOutcomeListener<V> handler, Class<V> clazz) {
-    return requestIdMap.createNewRpcListener(handler, clazz, this);
-  }
-
-  /**
-   * Inform the local outcome listener that the remote operation could not be handled.
-   * @param coordinationId The id that failed.
-   * @param failure The failure that occurred.
-   */
-  void recordRemoteFailure(int coordinationId, DrillPBError failure) {
-    requestIdMap.recordRemoteFailure(coordinationId, failure);
-  }
-
-  /**
-   * Called from the RpcBus's channel close handler to close all remaining
-   * resources associated with this connection. Ensures that any pending
-   * back-pressure items are also unblocked so they can be thrown away.
-   *
-   * @param ex
-   *          The exception that caused the channel to close.
-   */
-  void channelClosed(RpcException ex) {
-    // this could possibly overrelease but it doesn't matter since we're only
-    // going to do this to ensure that we
-    // fail out any pending messages
-    writeManager.disable();
-    writeManager.setWritable(true);
-
-    // ensure outstanding requests are cleaned up.
-    requestIdMap.channelClosed(ex);
-  }
-
-  /**
-   * Closes all resources connected with current session.
-   * By default has no implementation.
-   */
-  public void closeSession() {
-  }
-
-  /**
-   * Connection consumer wants to close connection. Initiate connection close
-   * and complete. This is a blocking call that ensures that the connection is
-   * closed before returning. As part of this call, the channel close handler
-   * will be triggered which will call channelClosed() above. The latter will
-   * happen in a separate thread while this method is blocking.
-   *
-   * <p>
-   *   The check for isActive is not required here since channel can be in OPEN state without being active. We want
-   *   to close in both the scenarios. A channel is in OPEN state when a socket is created for it before binding to an
-   *   address.
-   *   <li>
-   *      For connection oriented transport protocol channel moves to ACTIVE state when a connection is established
-   *      using this channel. We need to have channel in ACTIVE state NOT OPEN before we can send any message to
-   *      remote endpoint.
-   *   </li>
-   *   <li>
-   *      For connectionless transport protocol a sender can send data as soon as channel moves to OPEN state.
-   *   </li>
-   * </p>
-   */
+import java.net.SocketAddress;
+
+public interface RemoteConnection extends ConnectionThrottle, AutoCloseable {
+
+  boolean inEventLoop();
+
+  String getName();
+
+  BufferAllocator getAllocator();
+
+  Channel getChannel();
+
+  boolean blockOnNotWritable(RpcOutcomeListener<?> listener);
+
+  boolean isActive();
+
+  // should be invoked only within package
+  <V> RpcOutcome<V> getAndRemoveRpcOutcome(int rpcType, int coordinationId, Class<V> clazz);
+
+  // should be invoked only within package
+  <V> ChannelListenerWithCoordinationId createNewRpcListener(RpcOutcomeListener<V> handler, Class<V> clazz);
+
+  // should be invoked only within package
+  void recordRemoteFailure(int coordinationId, DrillPBError failure);
+
+  // should be invoked only within package
+  void channelClosed(RpcException ex);
+
+  SocketAddress getRemoteAddress();
+
   @Override
-  public void close() {
-    try {
-      channel.close().get();
-    } catch (final InterruptedException | ExecutionException e) {
-      logger.warn("Caught exception while closing channel.", e);
-
-      // Preserve evidence that the interruption occurred so that code higher up
-      // on the call stack can learn of the
-      // interruption and respond to it if it wants to.
-      Thread.currentThread().interrupt();
-    }
-  }
+  void close();
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestHandler.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestHandler.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestHandler.java
new file mode 100644
index 0000000..44dd5b3
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Note that if a handler maintains any internal state, the state will be disposed if the handler on the connection
+ * changes. So handlers should not maintain state.
+ *
+ * @param <S> server connection type
+ */
+public interface RequestHandler<S extends ServerConnection<S>> {
+
+  /**
+   * Handle request of given type (rpcType) with message (pBody) and optional data (dBody)
+   * on the connection, and send the appropriate response using the given sender.
+   *
+   * The method must do one of three things:
+   * + send the response using {@link ResponseSender#send send}
+   * + throw UserRpcException, in which case a response will be sent using {@link ResponseSender#send send}
+   * + throw an Exception, in which case, the connection will be dropped
+   *
+   * @param connection remote connection
+   * @param rpcType    rpc type
+   * @param pBody      message
+   * @param dBody      data, maybe null
+   * @param sender     used to {@link ResponseSender#send send} the response
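+   * @throws RpcException if handling fails; a UserRpcException results in a response being sent,
+   *                      any other exception results in the connection being dropped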
+   */
+  void handle(S connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender)
+      throws RpcException;
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index cdb9c07..38cba49 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -51,7 +51,8 @@ import com.google.protobuf.Parser;
  * The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a
  * system.
  *
- * @param <T>
+ * @param <T> RPC type
+ * @param <C> Remote connection type
  */
 public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> implements Closeable {
   final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
@@ -61,11 +62,8 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   protected abstract MessageLite getResponseDefaultInstance(int rpcType) throws RpcException;
 
-  protected void handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException{
-    sender.send(handle(connection, rpcType, pBody, dBody));
-  }
-
-  protected abstract Response handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
+  protected abstract void handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender)
+      throws RpcException;
 
   protected final RpcConfig rpcConfig;
 
@@ -82,20 +80,23 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     this.local = local;
   }
 
-  <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
-      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+  public <SEND extends MessageLite, RECEIVE extends MessageLite>
+  DrillRpcFuture<RECEIVE> send(C connection, T rpcType, SEND protobufBody, Class<RECEIVE> clazz,
+                               ByteBuf... dataBodies) {
     DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>();
     this.send(rpcFuture, connection, rpcType, protobufBody, clazz, dataBodies);
     return rpcFuture;
   }
 
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
-      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+  public <SEND extends MessageLite, RECEIVE extends MessageLite>
+  void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType, SEND protobufBody, Class<RECEIVE> clazz,
+            ByteBuf... dataBodies) {
     send(listener, connection, rpcType, protobufBody, clazz, false, dataBodies);
   }
 
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
-      SEND protobufBody, Class<RECEIVE> clazz, boolean allowInEventLoop, ByteBuf... dataBodies) {
+  public <SEND extends MessageLite, RECEIVE extends MessageLite>
+  void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType, SEND protobufBody, Class<RECEIVE> clazz,
+            boolean allowInEventLoop, ByteBuf... dataBodies) {
 
     Preconditions
         .checkArgument(
@@ -141,7 +142,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     }
   }
 
-  public abstract C initRemoteConnection(SocketChannel channel);
+  protected abstract C initRemoteConnection(SocketChannel channel);
 
   public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
 
@@ -163,12 +164,10 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
         msg = String.format("Channel closed %s <--> %s.", future.channel().localAddress(), future.channel().remoteAddress());
       }
 
-      final ChannelClosedException ex = future.cause() != null ? new ChannelClosedException(msg, future.cause()) : new ChannelClosedException(msg);
-      try {
-        clientConnection.closeSession();
-      } finally {
-        clientConnection.channelClosed(ex);
-      }
+      final ChannelClosedException ex = future.cause() != null ?
+          new ChannelClosedException(msg, future.cause()) :
+          new ChannelClosedException(msg);
+      clientConnection.channelClosed(ex);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
index 7618231..7d158c1 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
@@ -18,11 +18,11 @@
 package org.apache.drill.exec.rpc;
 
 public interface RpcConnectionHandler<T extends RemoteConnection> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConnectionHandler.class);
 
-  public static enum FailureType{CONNECTION, HANDSHAKE_COMMUNICATION, HANDSHAKE_VALIDATION}
+  enum FailureType {CONNECTION, HANDSHAKE_COMMUNICATION, HANDSHAKE_VALIDATION, AUTHENTICATION}
 
-  public void connectionSucceeded(T connection);
-  public void connectionFailed(FailureType type, Throwable t);
+  void connectionSucceeded(T connection);
+
+  void connectionFailed(FailureType type, Throwable t);
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d068b3e3/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ServerConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ServerConnection.java
new file mode 100644
index 0000000..3cdfdb2
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ServerConnection.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+
+public interface ServerConnection<S extends ServerConnection<S>> extends RemoteConnection {
+
+  // init only once
+  void initSaslServer(String mechanismName) throws IOException;
+
+  // get only after setting
+  SaslServer getSaslServer();
+
+  void finalizeSaslSession() throws IOException;
+
+  RequestHandler<S> getCurrentHandler();
+
+  void changeHandlerTo(RequestHandler<S> handler);
+
+}


Mime
View raw message