drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [27/45] drill git commit: DRILL-3987: (MOVE) Extract RPC, memory-base and memory-impl as separate modules.
Date Fri, 13 Nov 2015 02:37:57 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
deleted file mode 100644
index cf09be3..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
-import java.net.SocketAddress;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
-
-import com.google.protobuf.Internal.EnumLite;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-/**
- * Base class for RPC clients. It configures a Netty {@link Bootstrap} whose channel pipeline decodes, encodes and
- * dispatches RPC messages, sends the initial handshake once the socket is connected and, when the RPC configuration
- * declares a timeout, pings the server whenever the channel has been write-idle for a fraction of that timeout.
- *
- * @param <T> enum type identifying the RPC message types
- * @param <R> connection type produced by {@link #initRemoteConnection(SocketChannel)}
- * @param <HANDSHAKE_SEND> handshake message sent to the server
- * @param <HANDSHAKE_RESPONSE> handshake message received from the server
- */
-public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
-    extends RpcBus<T, R> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
-
-  // The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
-  // example, if timeout is set to 30 seconds and we set percentage to 0.5, then if no write has happened within 15
-  // seconds, the idle state handler will send a ping message.
-  private static final double PERCENT_TIMEOUT_BEFORE_SENDING_PING = 0.5;
-
-  private final Bootstrap b;
-  // Assigned from the channel initializer each time the bootstrap creates a channel; null until then.
-  protected R connection;
-  private final T handshakeType;
-  private final Class<HANDSHAKE_RESPONSE> responseClass;
-  private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
-
-  // Null when the RPC configuration has no timeout, in which case no pings are sent.
-  private final IdlePingHandler pingHandler;
-
-  /**
-   * Creates the client and prepares (but does not open) its connection bootstrap.
-   *
-   * @param rpcMapping RPC configuration; its optional timeout (in seconds) drives the ping interval
-   * @param alloc Netty allocator installed on the channel
-   * @param eventLoopGroup event loop group the channel runs on
-   * @param handshakeType RPC type used for the handshake message
-   * @param responseClass class of the expected handshake response
-   * @param handshakeParser parser for the handshake response
-   */
-  public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
-      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
-    super(rpcMapping);
-    this.responseClass = responseClass;
-    this.handshakeType = handshakeType;
-    this.handshakeParser = handshakeParser;
-
-    if (rpcMapping.hasTimeout()) {
-      // getTimeout() is in seconds; ping after the configured fraction of it has elapsed without a write.
-      final long pingIntervalMillis =
-          (long) (rpcMapping.getTimeout() * 1000.0 * PERCENT_TIMEOUT_BEFORE_SENDING_PING);
-      this.pingHandler = new IdlePingHandler(pingIntervalMillis);
-    } else {
-      this.pingHandler = null;
-    }
-
-    b = new Bootstrap() //
-        .group(eventLoopGroup) //
-        .channel(TransportCheck.getClientSocketChannel()) //
-        .option(ChannelOption.ALLOCATOR, alloc) //
-        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
-        .option(ChannelOption.SO_REUSEADDR, true)
-        .option(ChannelOption.SO_RCVBUF, 1 << 17) //
-        .option(ChannelOption.SO_SNDBUF, 1 << 17) //
-        .option(ChannelOption.TCP_NODELAY, true)
-        .handler(new ChannelInitializer<SocketChannel>() {
-
-          @Override
-          protected void initChannel(SocketChannel ch) throws Exception {
-            // logger.debug("initializing client connection.");
-            connection = initRemoteConnection(ch);
-
-            ch.closeFuture().addListener(getCloseHandler(ch, connection));
-
-            final ChannelPipeline pipe = ch.pipeline();
-
-            pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator()));
-            pipe.addLast("message-decoder", new RpcDecoder("c-" + rpcConfig.getName()));
-            pipe.addLast("protocol-encoder", new RpcEncoder("c-" + rpcConfig.getName()));
-            pipe.addLast("handshake-handler", new ClientHandshakeHandler());
-
-            // NOTE(review): the same handler instance is added to every channel this bootstrap creates; this
-            // presumably relies on each client opening a single connection - confirm before reusing a client.
-            if(pingHandler != null){
-              pipe.addLast("idle-state-handler", pingHandler);
-            }
-
-            pipe.addLast("message-handler", new InboundHandler(connection));
-            pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
-          }
-        }); //
-
-    // if(TransportCheck.SUPPORTS_EPOLL){
-    // b.option(EpollChannelOption.SO_REUSEPORT, true); //
-    // }
-  }
-
-  /**
-   * Records the channel's local and remote addresses. This base implementation returns {@code null}; the channel
-   * initializer dereferences the returned connection immediately, so subclasses must override this method, call
-   * {@code super.initRemoteConnection(channel)} and return a real connection.
-   *
-   * @param channel the newly created channel
-   * @return {@code null} in this base implementation
-   */
-  public R initRemoteConnection(SocketChannel channel){
-    local=channel.localAddress();
-    remote=channel.remoteAddress();
-    return null;
-  }
-
-  private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
-
-  /**
-   * Handler that watches for situations where we haven't written to the socket within the configured idle period.
-   * When that happens, we send a PING message to the server to state that we are still alive. If the ping cannot be
-   * written, the connection is closed.
-   */
-  private class IdlePingHandler extends IdleStateHandler {
-
-    private final GenericFutureListener<Future<? super Void>> pingFailedHandler = new GenericFutureListener<Future<? super Void>>() {
-      @Override
-      public void operationComplete(Future<? super Void> future) throws Exception {
-        if (!future.isSuccess()) {
-          logger.error("Unable to maintain connection {}.  Closing connection.", connection.getName());
-          connection.close();
-        }
-      }
-    };
-
-    /**
-     * @param idleWaitInMillis write-idle period after which a ping is sent
-     */
-    public IdlePingHandler(long idleWaitInMillis) {
-      // Only the writer-idle timer is armed; reader-idle and all-idle are disabled (0).
-      super(0, idleWaitInMillis, 0, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
-      if (evt.state() == IdleState.WRITER_IDLE) {
-        ctx.writeAndFlush(PING_MESSAGE).addListener(pingFailedHandler);
-      }
-    }
-  }
-
-  /**
-   * Supplies the length-prefixed protobuf decoder installed first in the pipeline.
-   *
-   * @param allocator allocator of the connection being initialized
-   * @return the decoder to install
-   */
-  public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator);
-
-  /**
-   * @return {@code true} when a connection exists, it has a channel and that channel is active
-   */
-  public boolean isActive() {
-    return connection != null
-        && connection.getChannel() != null
-        && connection.getChannel().isActive();
-  }
-
-  /**
-   * Checks the handshake response received from the server.
-   *
-   * @param validateHandshake the handshake response to check
-   * @throws RpcException if the response is not acceptable; reported to the connection listener as a
-   *           handshake-validation failure
-   */
-  protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;
-
-  /**
-   * Invoked after {@link #validateHandshake} succeeded and before the connection listener is told that the connection
-   * succeeded.
-   *
-   * @param handshake the validated handshake response
-   * @param connection the connection the handshake was performed on
-   */
-  protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
-
-  /**
-   * Sends a message over this client's connection and reports the outcome to {@code listener}.
-   */
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
-      T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
-    super.send(listener, connection, rpcType, protobufBody, clazz, dataBodies);
-  }
-
-  /**
-   * Sends a message over this client's connection.
-   *
-   * @return a future for the response
-   */
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
-      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
-    return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
-  }
-
-  @Override
-  public boolean isClient() {
-    return true;
-  }
-
-  /**
-   * Starts connecting to {@code host:port}. The call returns immediately; connection and handshake outcomes are
-   * delivered to {@code connectionListener}.
-   *
-   * @param connectionListener receives the success or failure of connection establishment and handshake
-   * @param handshakeValue handshake message to send once the socket is connected
-   * @param host server host
-   * @param port server port
-   */
-  protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue,
-      String host, int port) {
-    ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
-    b.connect(host, port).addListener(cml.connectionHandler);
-  }
-
-  /**
-   * Bundles the two listeners involved in establishing a connection: one for the socket connect and one for the
-   * handshake exchange. Both report to the same {@link RpcConnectionHandler}.
-   */
-  private class ConnectionMultiListener {
-    private final RpcConnectionHandler<R> l;
-    private final HANDSHAKE_SEND handshakeValue;
-
-    public ConnectionMultiListener(RpcConnectionHandler<R> l, HANDSHAKE_SEND handshakeValue) {
-      assert l != null;
-      assert handshakeValue != null;
-
-      this.l = l;
-      this.handshakeValue = handshakeValue;
-    }
-
-    public final ConnectionHandler connectionHandler = new ConnectionHandler();
-    public final HandshakeSendHandler handshakeSendHandler = new HandshakeSendHandler();
-
-    /**
-     * Manages connection establishment outcomes.
-     */
-    private class ConnectionHandler implements GenericFutureListener<ChannelFuture> {
-
-      @Override
-      public void operationComplete(ChannelFuture future) throws Exception {
-        boolean isInterrupted = false;
-
-        // We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
-        // So there is no point propagating the interruption as failure immediately.
-        long remainingWaitTimeMills = 120000;
-        long startTime = System.currentTimeMillis();
-        // logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
-        while(true) {
-          try {
-            future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
-            if (future.isSuccess()) {
-              SocketAddress remote = future.channel().remoteAddress();
-              SocketAddress local = future.channel().localAddress();
-              setAddresses(remote, local);
-              // send a handshake on the current thread. This is the only time we will send from within the event thread.
-              // We can do this because the connection will not be backed up.
-              send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
-            } else {
-              l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
-            }
-            // logger.debug("Handshake queued for send.");
-            break;
-          } catch (final InterruptedException interruptEx) {
-            // Charge the time already spent against the budget, then retry unless the budget is exhausted.
-            remainingWaitTimeMills -= (System.currentTimeMillis() - startTime);
-            startTime = System.currentTimeMillis();
-            isInterrupted = true;
-            if (remainingWaitTimeMills < 1) {
-              l.connectionFailed(FailureType.CONNECTION, interruptEx);
-              break;
-            }
-            // Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMills.
-          } catch (final Exception ex) {
-            logger.error("Failed to establish connection", ex);
-            l.connectionFailed(FailureType.CONNECTION, ex);
-            break;
-          }
-        }
-
-        if (isInterrupted) {
-          // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-          // interruption and respond to it if it wants to.
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-
-    /**
-     * manages handshake outcomes.
-     */
-    private class HandshakeSendHandler implements RpcOutcomeListener<HANDSHAKE_RESPONSE> {
-
-      @Override
-      public void failed(RpcException ex) {
-        logger.debug("Failure while initiating handshake", ex);
-        l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
-      }
-
-      @Override
-      public void success(HANDSHAKE_RESPONSE value, ByteBuf buffer) {
-        // logger.debug("Handshake received. {}", value);
-        try {
-          BasicClient.this.validateHandshake(value);
-          BasicClient.this.finalizeConnection(value, connection);
-          l.connectionSucceeded(connection);
-          // logger.debug("Handshake completed succesfully.");
-        } catch (RpcException ex) {
-          l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex);
-        }
-      }
-
-      @Override
-      public void interrupted(final InterruptedException ex) {
-        logger.warn("Interrupted while waiting for handshake response", ex);
-        l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
-      }
-    }
-
-  }
-
-  /**
-   * Pipeline handler that hands the server's handshake response to the outcome registered for the handshake request.
-   */
-  private class ClientHandshakeHandler extends AbstractHandshakeHandler<HANDSHAKE_RESPONSE> {
-
-    public ClientHandshakeHandler() {
-      super(BasicClient.this.handshakeType, BasicClient.this.handshakeParser);
-    }
-
-    @Override
-    protected final void consumeHandshake(ChannelHandlerContext ctx, HANDSHAKE_RESPONSE msg) throws Exception {
-      // remove the handshake information from the queue so it doesn't sit there forever.
-      RpcOutcome<HANDSHAKE_RESPONSE> response = queue.getFuture(handshakeType.getNumber(), coordinationId,
-          responseClass);
-      response.set(msg, null);
-    }
-
-  }
-
-  /**
-   * Forwards the auto-read setting to the current connection.
-   *
-   * @param enableAutoRead value passed to the connection's {@code setAutoRead}
-   */
-  public void setAutoRead(boolean enableAutoRead) {
-    connection.setAutoRead(enableAutoRead);
-  }
-
-  /**
-   * Closes the connection's channel and waits for the close to complete. Does nothing when no connection (or no
-   * channel) was ever established. Failures are logged rather than thrown; the thread's interrupt status is restored
-   * only when the wait was actually interrupted.
-   */
-  public void close() {
-    logger.debug("Closing client");
-    final R current = connection;
-    if (current == null || current.getChannel() == null) {
-      // Never connected: there is no channel to close.
-      return;
-    }
-    try {
-      current.getChannel().close().get();
-    } catch (final InterruptedException e) {
-      logger.warn("Failure while shutting {}", this.getClass().getName(), e);
-
-      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-      // interruption and respond to it if it wants to.
-      Thread.currentThread().interrupt();
-    } catch (final ExecutionException e) {
-      // The close itself failed; this is not an interruption, so the interrupt flag is left untouched.
-      logger.warn("Failure while shutting {}", this.getClass().getName(), e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
deleted file mode 100644
index c194b5e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
-import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-
-import com.google.protobuf.Internal.EnumLite;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-/**
- * A {@link BasicClient} whose connection type is fixed to {@link ServerConnection}. Incoming messages are forwarded to
- * {@link #handleReponse}, which sees the connection only as a {@link ConnectionThrottle}.
- *
- * @param <T> enum type identifying the RPC message types
- * @param <HANDSHAKE_SEND> handshake message sent to the server
- * @param <HANDSHAKE_RESPONSE> handshake message received from the server
- */
-public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
-    extends BasicClient<T, ServerConnection, HANDSHAKE_SEND, HANDSHAKE_RESPONSE> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
-
-  private final String connectionName;
-  private BufferAllocator alloc;
-
-  /**
-   * @param rpcMapping RPC configuration passed to the parent client
-   * @param alloc allocator handed to every connection; its underlying allocator is given to the parent client
-   * @param eventLoopGroup event loop group passed to the parent client
-   * @param handshakeType RPC type used for the handshake message
-   * @param responseClass class of the expected handshake response
-   * @param handshakeParser parser for the handshake response
-   * @param connectionName name given to connections created by this client
-   */
-  public BasicClientWithConnection(RpcConfig rpcMapping, BufferAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
-      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser, String connectionName) {
-    super(rpcMapping, alloc.getUnderlyingAllocator(), eventLoopGroup, handshakeType, responseClass, handshakeParser);
-    this.connectionName = connectionName;
-    this.alloc = alloc;
-  }
-
-  /**
-   * Delegates to {@link #handleReponse}, passing the connection as a {@link ConnectionThrottle}.
-   */
-  @Override
-  protected Response handle(ServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
-    final ConnectionThrottle throttle = (ConnectionThrottle) connection;
-    return handleReponse(throttle, rpcType, pBody, dBody);
-  }
-
-  /**
-   * Handles one incoming message.
-   *
-   * @param throttle the connection the message arrived on, viewed as a throttle
-   * @param rpcType numeric RPC type of the message
-   * @param pBody protobuf body of the message
-   * @param dBody data body of the message
-   * @return the response to send back
-   * @throws RpcException if the message cannot be handled
-   */
-  protected abstract Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ;
-
-  /**
-   * Calls the parent implementation and then builds the {@link ServerConnection} for the channel.
-   */
-  @Override
-  public ServerConnection initRemoteConnection(SocketChannel channel) {
-    // The parent's return value is intentionally discarded; the call is made for its side effects only.
-    super.initRemoteConnection(channel);
-    final ServerConnection created = new ServerConnection(connectionName, channel, alloc);
-    return created;
-  }
-
-  /**
-   * A {@link RemoteConnection} that carries the allocator supplied by the owning client.
-   */
-  public static class ServerConnection extends RemoteConnection{
-
-    private final BufferAllocator allocator;
-
-    public ServerConnection(String name, SocketChannel channel, BufferAllocator alloc) {
-      super(channel, name);
-      this.allocator = alloc;
-    }
-
-    /**
-     * @return the allocator given at construction time
-     */
-    @Override
-    public BufferAllocator getAllocator() {
-      return allocator;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
deleted file mode 100644
index df12fe7..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import java.io.IOException;
-import java.net.BindException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-
-import com.google.protobuf.Internal.EnumLite;
-import com.google.common.base.Stopwatch;
-import com.google.common.io.Closeables;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-/**
- * A server is bound to a port and is responsible for responding to various type of requests. In some cases, the inbound
- * requests will generate more than one outbound request.
- */
-public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection> extends RpcBus<T, C> {
-  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-
-  // Pipeline name under which the read-timeout handler is registered (only when the RPC config has a timeout).
-  protected static final String TIMEOUT_HANDLER = "timeout-handler";
-
-  private final ServerBootstrap b;
-  private final EventLoopGroup eventLoopGroup;
-
-  /**
-   * Creates the server and prepares (but does not bind) its bootstrap.
-   *
-   * @param rpcMapping RPC configuration; when it declares a timeout (in seconds) a read-timeout handler is installed
-   *          on every accepted channel
-   * @param alloc Netty allocator installed on accepted channels
-   * @param eventLoopGroup event loop group the server runs on; it is shut down by {@link #close()}
-   */
-  public BasicServer(final RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
-    super(rpcMapping);
-    this.eventLoopGroup = eventLoopGroup;
-
-    b = new ServerBootstrap()
-        .channel(TransportCheck.getServerSocketChannel())
-        .option(ChannelOption.SO_BACKLOG, 1000)
-        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30*1000)
-        .option(ChannelOption.TCP_NODELAY, true)
-        .option(ChannelOption.SO_REUSEADDR, true)
-        .option(ChannelOption.SO_RCVBUF, 1 << 17)
-        .option(ChannelOption.SO_SNDBUF, 1 << 17)
-        .group(eventLoopGroup) //
-        .childOption(ChannelOption.ALLOCATOR, alloc)
-
-        // .handler(new LoggingHandler(LogLevel.INFO))
-
-        .childHandler(new ChannelInitializer<SocketChannel>() {
-          @Override
-          protected void initChannel(SocketChannel ch) throws Exception {
-//            logger.debug("Starting initialization of server connection.");
-            C connection = initRemoteConnection(ch);
-            ch.closeFuture().addListener(getCloseHandler(ch, connection));
-
-            final ChannelPipeline pipe = ch.pipeline();
-            pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator(), getOutOfMemoryHandler()));
-            pipe.addLast("message-decoder", new RpcDecoder("s-" + rpcConfig.getName()));
-            pipe.addLast("protocol-encoder", new RpcEncoder("s-" + rpcConfig.getName()));
-            pipe.addLast("handshake-handler", getHandshakeHandler(connection));
-
-            if (rpcMapping.hasTimeout()) {
-              pipe.addLast(TIMEOUT_HANDLER,
-                  new LogggingReadTimeoutHandler(connection, rpcMapping.getTimeout()));
-            }
-
-            pipe.addLast("message-handler", new InboundHandler(connection));
-            pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
-//            logger.debug("Server connection initialization completed.");
-          }
-        });
-
-//     if(TransportCheck.SUPPORTS_EPOLL){
-//       b.option(EpollChannelOption.SO_REUSEPORT, true); //
-//     }
-  }
-
-  /**
-   * Read-timeout handler that logs which connection timed out before delegating to the standard timeout handling.
-   * It uses the enclosing server's connection type rather than declaring a type parameter of its own.
-   */
-  private class LogggingReadTimeoutHandler extends ReadTimeoutHandler {
-
-    private final C connection;
-    private final int timeoutSeconds;
-
-    /**
-     * @param connection connection whose name is logged on timeout
-     * @param timeoutSeconds read timeout in seconds
-     */
-    public LogggingReadTimeoutHandler(C connection, int timeoutSeconds) {
-      super(timeoutSeconds);
-      this.connection = connection;
-      this.timeoutSeconds = timeoutSeconds;
-    }
-
-    @Override
-    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
-      logger.info("RPC connection {} timed out.  Timeout was set to {} seconds. Closing connection.", connection.getName(),
-          timeoutSeconds);
-      super.readTimedOut(ctx);
-    }
-
-  }
-
-  /**
-   * @return the out-of-memory handler given to the protocol decoder; the shared default instance unless overridden
-   */
-  public OutOfMemoryHandler getOutOfMemoryHandler() {
-    return OutOfMemoryHandler.DEFAULT_INSTANCE;
-  }
-
-  /**
-   * Currently a no-op.
-   * NOTE(review): presumably meant to remove the {@link #TIMEOUT_HANDLER} entry from a pipeline - confirm intent.
-   */
-  protected void removeTimeoutHandler() {
-
-  }
-
-  /**
-   * Supplies the length-prefixed protobuf decoder installed first in every accepted channel's pipeline.
-   *
-   * @param allocator allocator of the connection being initialized
-   * @param outOfMemoryHandler handler obtained from {@link #getOutOfMemoryHandler()}
-   * @return the decoder to install
-   */
-  public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler);
-
-  @Override
-  public boolean isClient() {
-    return false;
-  }
-
-  /**
-   * Supplies the handshake handler installed in the pipeline of the given connection.
-   *
-   * @param connection the connection being initialized
-   * @return the handshake handler to install
-   */
-  protected abstract ServerHandshakeHandler<?> getHandshakeHandler(C connection);
-
-  /**
-   * Handshake handler that answers an inbound handshake with the message produced by
-   * {@link #getHandshakeResponse(MessageLite)}.
-   *
-   * @param <T> type of the inbound handshake message
-   */
-  protected static abstract class ServerHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
-
-    public ServerHandshakeHandler(EnumLite handshakeType, Parser<T> parser) {
-      super(handshakeType, parser);
-    }
-
-    @Override
-    protected void consumeHandshake(ChannelHandlerContext ctx, T inbound) throws Exception {
-      OutboundRpcMessage msg = new OutboundRpcMessage(RpcMode.RESPONSE, this.handshakeType, coordinationId,
-          getHandshakeResponse(inbound));
-      ctx.writeAndFlush(msg);
-    }
-
-    /**
-     * Builds the handshake response for an inbound handshake.
-     *
-     * @param inbound the handshake received from the client
-     * @return the message to send back
-     * @throws Exception if no response can be produced
-     */
-    public abstract MessageLite getHandshakeResponse(T inbound) throws Exception;
-
-  }
-
-  /**
-   * This base implementation returns {@code null}.
-   */
-  @Override
-  protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
-    return null;
-  }
-
-  /**
-   * This base implementation returns {@code null}.
-   */
-  @Override
-  protected Response handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
-    return null;
-  }
-
-  @Override
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
-      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
-    return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
-  }
-
-  @Override
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
-      C connection, T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
-    super.send(listener, connection, rpcType, protobufBody, clazz, dataBodies);
-  }
-
-  /**
-   * Records the channel's local and remote addresses. This base implementation returns {@code null}; the channel
-   * initializer dereferences the returned connection immediately, so subclasses must override this method and return
-   * a real connection.
-   */
-  @Override
-  public C initRemoteConnection(SocketChannel channel) {
-    local = channel.localAddress();
-    remote = channel.remoteAddress();
-    return null;
-  }
-
-  /**
-   * Binds the server socket, blocking until the bind completes.
-   *
-   * @param initialPort first port to try
-   * @param allowPortHunting when {@code true}, a {@link BindException} makes the server try the next higher port
-   * @return the port the server was bound to
-   * @throws DrillbitStartupException declared for callers; bind failures are reported as an unchecked
-   *           {@link UserException}
-   */
-  public int bind(final int initialPort, boolean allowPortHunting) throws DrillbitStartupException {
-    int port = initialPort - 1;
-    while (true) {
-      try {
-        b.bind(++port).sync();
-        break;
-      } catch (Exception e) {
-        // TODO(DRILL-3026):  Revisit:  Exception is not (always) BindException.
-        // One case is "java.io.IOException: bind() failed: Address already in
-        // use".
-        if (e instanceof BindException && allowPortHunting) {
-          continue;
-        }
-        if (e instanceof InterruptedException) {
-          // sync() was interrupted: keep the interrupt visible to callers even though we report a bind failure.
-          Thread.currentThread().interrupt();
-        }
-        final UserException bindException =
-            UserException
-              .resourceError( e )
-              .addContext( "Server type", getClass().getSimpleName() )
-              .message( "Drillbit could not bind to port %s.", port )
-              .build(logger);
-        throw bindException;
-      }
-    }
-
-    logger.debug("Server of type {} started on port {}.", getClass().getSimpleName(), port);
-    return port;
-  }
-
-  /**
-   * Shuts down the event loop group and waits for the shutdown to finish. Failures are logged rather than thrown;
-   * the thread's interrupt status is restored only when the wait was actually interrupted.
-   */
-  @Override
-  public void close() throws IOException {
-    try {
-      Stopwatch watch = new Stopwatch().start();
-      // this takes 1s to complete
-      // known issue: https://github.com/netty/netty/issues/2545
-      eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).get();
-      long elapsed = watch.elapsed(MILLISECONDS);
-      if (elapsed > 500) {
-        logger.info("closed eventLoopGroup {} in {} ms", eventLoopGroup, elapsed);
-      }
-    } catch (final InterruptedException e) {
-      logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e);
-
-      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-      // interruption and respond to it if it wants to.
-      Thread.currentThread().interrupt();
-    } catch (final ExecutionException e) {
-      // The shutdown itself failed; this is not an interruption, so the interrupt flag is left untouched.
-      logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelClosedException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelClosedException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelClosedException.java
deleted file mode 100644
index b68efae..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelClosedException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-/**
- * An {@link RpcException} subtype which, judging by its name, reports that a channel was closed. It adds no state or
- * behavior of its own; each constructor simply forwards to the matching parent constructor.
- */
-public class ChannelClosedException extends RpcException {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChannelClosedException.class);
-
-  /** Creates an exception without message or cause. */
-  public ChannelClosedException() {
-    super();
-  }
-
-  /**
-   * @param message detail message
-   */
-  public ChannelClosedException(String message) {
-    super(message);
-  }
-
-  /**
-   * @param cause underlying cause
-   */
-  public ChannelClosedException(Throwable cause) {
-    super(cause);
-  }
-
-  /**
-   * @param message detail message
-   * @param cause underlying cause
-   */
-  public ChannelClosedException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
deleted file mode 100644
index 3b95dd8..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.GenericFutureListener;
-
-/**
- * A {@link ChannelFuture} listener that additionally exposes a coordination id.
- */
-public interface ChannelListenerWithCoordinationId extends GenericFutureListener<ChannelFuture> {
-
-  /**
-   * @return the coordination id associated with this listener
-   */
-  int getCoordinationId();
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
deleted file mode 100644
index 5a5bbab..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
-
-/**
- * Keeps track of the RPCs in flight on one socket. Each request is registered under a coordination
- * id taken from a wrapping counter and stays in the map until its response is claimed through
- * {@link #getFuture}, it is failed through {@link #updateFailedFuture}, or its send fails.
- */
-public class CoordinationQueue {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
-
-  private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
-  private final Map<Integer, RpcOutcome<?>> map;
-
-  public CoordinationQueue(int segmentSize, int segmentCount) {
-    map = new ConcurrentHashMap<Integer, RpcOutcome<?>>(segmentSize, 0.75f, segmentCount);
-  }
-
-  /** Reports {@code ex} to every registered outcome (entries stay in the map); no-op for null. */
-  void channelClosed(Throwable ex) {
-    if (ex == null) {
-      return;
-    }
-    // An RpcException is forwarded as-is; any other throwable is wrapped in one.
-    final RpcException failure = (ex instanceof RpcException) ? (RpcException) ex : new RpcException(ex);
-    for (RpcOutcome<?> pending : map.values()) {
-      pending.setException(failure);
-    }
-  }
-
-  /**
-   * Creates a listener for a new request and registers it under the next coordination id.
-   * @throws IllegalStateException if that id is still occupied by an entry that was never removed
-   */
-  public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz, RemoteConnection connection) {
-    final int id = circularInt.getNext();
-    final RpcListener<V> listener = new RpcListener<V>(handler, clazz, id, connection);
-    if (map.put(id, listener) != null) {
-      throw new IllegalStateException(
-          "You attempted to reuse a coordination id when the previous coordination id has not been removed.  This is likely rpc future callback memory leak.");
-    }
-    return listener;
-  }
-
-  /** Registered entry: forwards the response, or a failure, to the caller's handler. */
-  private class RpcListener<T> implements ChannelListenerWithCoordinationId, RpcOutcome<T>{
-    final RpcOutcomeListener<T> handler;
-    final Class<T> clazz;
-    final int coordinationId;
-    final RemoteConnection connection;
-
-    public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinationId, RemoteConnection connection) {
-      super();
-      this.handler = handler;
-      this.clazz = clazz;
-      this.coordinationId = coordinationId;
-      this.connection = connection;
-    }
-
-    @Override
-    public int getCoordinationId() {
-      return coordinationId;
-    }
-
-    @Override
-    public Class<T> getOutcomeType() {
-      return clazz;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (future.isSuccess()) {
-        return;
-      }
-      // Unregister first; then an inactive channel is reported to the handler, otherwise we throw.
-      removeFromMap(coordinationId);
-      if (!future.channel().isActive()) {
-        setException(new ChannelClosedException());
-        return;
-      }
-      throw new RpcException("Future failed");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void set(Object value, ByteBuf buffer) {
-      assert clazz.isAssignableFrom(value.getClass());
-      final T typed = (T) value;
-      handler.success(typed, buffer);
-    }
-
-    @Override
-    public void setException(Throwable t) {
-      handler.failed(RpcException.mapException(t));
-    }
-  }
-
-  /** Removes and returns the entry for {@code coordinationId}; a missing entry is an error. */
-  private RpcOutcome<?> removeFromMap(int coordinationId) {
-    final RpcOutcome<?> removed = map.remove(coordinationId);
-    if (removed != null) {
-      return removed;
-    }
-    throw new IllegalStateException(
-        "Attempting to retrieve an rpc that wasn't first stored in the rpc coordination queue.  This would most likely happen if you're opposite endpoint sent multiple messages on the same coordination id.");
-  }
-
-  /**
-   * Claims (and removes) the outcome registered under {@code coordinationId}.
-   * @throws IllegalStateException if no entry exists or its outcome type is not exactly {@code clazz}
-   */
-  public <V> RpcOutcome<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
-    final RpcOutcome<?> rpc = removeFromMap(coordinationId);
-    final Class<?> outcomeClass = rpc.getOutcomeType();
-    if (outcomeClass == clazz) {
-      @SuppressWarnings("unchecked")
-      final RpcOutcome<V> typed = (RpcOutcome<V>) rpc;
-      return typed;
-    }
-    throw new IllegalStateException(String.format(
-        "RPC Engine had a submission and response configuration mismatch.  The RPC request that you submitted was defined with an expected response type of %s.  However, "
-            + "when the response returned, a call to getResponseDefaultInstance() with Rpc number %d provided an expected class of %s.  This means either your submission uses the wrong type definition"
-            + "or your getResponseDefaultInstance() method responds the wrong instance type ",
-        clazz.getCanonicalName(), rpcType, outcomeClass.getCanonicalName()));
-  }
-
-  /** Fails the outcome registered under {@code coordinationId} with the given remote error. */
-  public void updateFailedFuture(int coordinationId, DrillPBError failure) {
-    try {
-      removeFromMap(coordinationId).setException(new UserRemoteException(failure));
-    } catch(Exception ex) {
-      logger.warn("Failed to remove from map.  Not a problem since we were updating on failed future.", ex);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
deleted file mode 100644
index d044432..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-
-import com.google.common.util.concurrent.CheckedFuture;
-
-public interface DrillRpcFuture<T> extends CheckedFuture<T, RpcException> {
-  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFuture.class); // implicitly public static final
-
-  ByteBuf getBuffer(); // returns a ByteBuf; what it holds is up to the implementation
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
deleted file mode 100644
index cbe63c6..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.AbstractFuture;
-
-/** Checked future that is completed through the {@link RpcOutcomeListener} callbacks it implements. */
-class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>, RpcOutcomeListener<V>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFutureImpl.class);
-  private volatile ByteBuf buffer; // assigned by success(); null before that
-
-  public DrillRpcFutureImpl() {
-    super(new InnerFuture<V>());
-  }
-
-  private static class InnerFuture<T> extends AbstractFuture<T> { // opens the setters up to the outer class
-    void setValue(T value) {
-      super.set(value);
-    }
-
-    @Override
-    protected boolean setException(Throwable t) {
-      return super.setException(t);
-    }
-  }
-
-  private InnerFuture<V> inner() {
-    return (InnerFuture<V>) delegate(); // the delegate is the InnerFuture given to super()
-  }
-
-  @Override
-  protected RpcException mapException(Exception ex) {
-    return RpcException.mapException(ex);
-  }
-
-  @Override
-  public void success(V value, ByteBuf buffer) {
-    this.buffer = buffer;
-    inner().setValue(value);
-  }
-
-  @Override
-  public void failed(RpcException ex) {
-    inner().setException(ex);
-  }
-
-  @Override
-  public void interrupted(final InterruptedException ex) {
-    inner().cancel(true); // an interrupt is passed on as cancellation of the delegate
-  }
-
-  @Override
-  public ByteBuf getBuffer() {
-    return buffer;
-  }
-
-  public void release() { // releases the stored buffer, if there is one
-    if (buffer != null) {
-      buffer.release();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
deleted file mode 100644
index 53d0772..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.protobuf.MessageLite;
-
-/** {@link RpcCommand} that publishes, through {@link #getFuture()}, whatever {@link #doRpcCall} reports. */
-public abstract class FutureBitCommand<T extends MessageLite, C extends RemoteConnection> implements RpcCommand<T,C> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FutureBitCommand.class);
-
-  protected final SettableFuture<T> settableFuture;
-  private final RpcCheckedFuture<T> parentFuture;
-
-  public FutureBitCommand() {
-    settableFuture = SettableFuture.create();
-    parentFuture = new RpcCheckedFuture<T>(settableFuture);
-  }
-
-  public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, C connection);
-
-  /** @return the checked future that wraps {@code settableFuture} */
-  public DrillRpcFuture<T> getFuture() {
-    return parentFuture;
-  }
-
-  @Override
-  public void connectionSucceeded(C connection) {
-    connectionAvailable(connection);
-  }
-
-  @Override
-  public void connectionAvailable(C connection) {
-    doRpcCall(new DeferredRpcOutcome(), connection);
-  }
-
-  @Override
-  public void connectionFailed(FailureType type, Throwable t) {
-    final String message = String.format("Command failed while establishing connection.  Failure type %s.", type);
-    settableFuture.setException(RpcException.mapException(message, t));
-  }
-
-  /** Listener handed to {@link #doRpcCall}; it moves each outcome onto {@code settableFuture}. */
-  private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
-    @Override
-    public void success(T value, ByteBuf buf) {
-      settableFuture.set(value);
-    }
-
-    @Override
-    public void failed(RpcException ex) {
-      settableFuture.setException(ex);
-    }
-
-    @Override
-    public void interrupted(final InterruptedException e) {
-      // An interrupt while the command runs is treated as a failure of the command.
-      logger.warn("Interrupted while running the command", e);
-      failed(new RpcException(e));
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
deleted file mode 100644
index d739034..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-
-import java.io.InputStream;
-
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-
-/** Received RPC message holding a protobuf body ({@code pBody}) and an optional second buffer ({@code dBody}). */
-public class InboundRpcMessage extends RpcMessage{
-  public ByteBuf pBody;
-  public ByteBuf dBody;
-
-  public InboundRpcMessage(RpcMode mode, int rpcType, int coordinationId, ByteBuf pBody, ByteBuf dBody) {
-    super(mode, rpcType, coordinationId);
-    this.pBody = pBody;
-    this.dBody = dBody;
-  }
-
-  /** @return the capacity of {@code pBody}, plus the capacity of {@code dBody} when it is non-null */
-  @Override
-  public int getBodySize() {
-    final int protobufCapacity = pBody.capacity();
-    return (dBody == null) ? protobufCapacity : protobufCapacity + dBody.capacity();
-  }
-
-  /** Releases each of the two buffers that is non-null, {@code pBody} first. */
-  @Override
-  void release() {
-    for (ByteBuf body : new ByteBuf[] {pBody, dBody}) {
-      if (body != null) {
-        body.release();
-      }
-    }
-  }
-
-  /** @return a new input stream over {@code pBody} */
-  public InputStream getProtobufBodyAsIS() {
-    return new ByteBufInputStream(pBody);
-  }
-
-  @Override
-  public String toString() {
-    return "InboundRpcMessage [pBody=" + pBody + ", mode=" + mode + ", rpcType=" + rpcType
-        + ", coordinationId=" + coordinationId + ", dBody=" + dBody + "]";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
deleted file mode 100644
index 92022b0..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-
-import com.google.protobuf.MessageLite;
-
-/**
- * {@link RpcCommand} that reports both {@link #doRpcCall} outcomes and connection failures to one listener.
- */
-public abstract class ListeningCommand<T extends MessageLite, C extends RemoteConnection> implements RpcCommand<T, C> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListeningCommand.class);
-  private final RpcOutcomeListener<T> listener;
-
-  public ListeningCommand(RpcOutcomeListener<T> listener) {
-    this.listener = listener;
-  }
-
-  public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, C connection);
-
-  @Override
-  public void connectionSucceeded(C connection) {
-    connectionAvailable(connection);
-  }
-
-  @Override
-  public void connectionAvailable(C connection) {
-    doRpcCall(new DeferredRpcOutcome(), connection);
-  }
-
-  @Override
-  public void connectionFailed(FailureType type, Throwable t) {
-    final String message = String.format("Command failed while establishing connection.  Failure type %s.", type);
-    listener.failed(RpcException.mapException(message, t));
-  }
-
-  /** Listener passed to {@link #doRpcCall}; forwards every callback unchanged to {@code listener}. */
-  private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
-    @Override
-    public void success(T value, ByteBuf buf) {
-      listener.success(value, buf);
-    }
-
-    @Override
-    public void failed(RpcException ex) {
-      listener.failed(ex);
-    }
-
-    @Override
-    public void interrupted(final InterruptedException e) {
-      listener.interrupted(e);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
deleted file mode 100644
index e7580b2..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * {@link ThreadFactory} that names the threads it creates sequentially: each name is the prefix
- * given at construction time followed by a running number that is private to the instance.
- * Created threads have the daemon bit set, and an attempt is made to give them priority
- * {@link Thread#MAX_PRIORITY}.
- *
- * <p>Concurrency: see {@link #newThread(Runnable)}.</p>
- */
-public class NamedThreadFactory implements ThreadFactory {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NamedThreadFactory.class);
-
-  private final AtomicInteger nextId = new AtomicInteger(); // source of the numeric name suffix
-  private final String prefix;
-
-  /**
-   * Constructor.
-   *
-   * @param prefix text placed in front of the number in every thread name this factory produces
-   */
-  public NamedThreadFactory(final String prefix) {
-    this.prefix = prefix;
-  }
-
-  /**
-   * Creates a daemon thread for {@code runnable}, named with this instance's prefix followed by
-   * its next sequential integer (the first thread gets 1).
-   *
-   * <p>Concurrency: thread-safe. Concurrent calls receive different numbers, and a call that
-   * starts after another call has completed receives a later number than that call. Among
-   * calls that overlap, the order of the numbers is not defined.</p>
-   *
-   * @param runnable the task the new thread will run
-   * @return the new thread; this method does not start it
-   */
-  @Override
-  public Thread newThread(final Runnable runnable) {
-    final String name = prefix + nextId.incrementAndGet();
-    final Thread created = new Thread(runnable, name);
-    created.setDaemon(true);
-    raiseToMaxPriority(created);
-    return created;
-  }
-
-  /**
-   * Best effort: an exception while changing the priority is logged and otherwise ignored.
-   */
-  private static void raiseToMaxPriority(final Thread thread) {
-    try {
-      if (thread.getPriority() != Thread.MAX_PRIORITY) {
-        thread.setPriority(Thread.MAX_PRIORITY);
-      }
-    } catch (Exception ignored) {
-      // Doesn't matter even if failed to set.
-      logger.info("ignored exception " + ignored);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java
deleted file mode 100644
index 5d7db47..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-/** Callback with a single {@link #handle()} method; {@link #DEFAULT_INSTANCE} is a stock implementation. */
-public interface OutOfMemoryHandler {
-  void handle();
-
-  /** Handler whose {@code handle()} always throws {@link UnsupportedOperationException}. */
-  OutOfMemoryHandler DEFAULT_INSTANCE = new OutOfMemoryHandler() {
-    @Override
-    public void handle() {
-      throw new UnsupportedOperationException();
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
deleted file mode 100644
index 5eda350..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.Internal.EnumLite;
-import com.google.protobuf.MessageLite;
-
-/** Outgoing RPC message: a protobuf body plus the readable buffers among the supplied {@code dBodies}. */
-public class OutboundRpcMessage extends RpcMessage {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutboundRpcMessage.class);
-
-  final MessageLite pBody;
-  public ByteBuf[] dBodies;
-
-  public OutboundRpcMessage(RpcMode mode, EnumLite rpcType, int coordinationId, MessageLite pBody, ByteBuf... dBodies) {
-    this(mode, rpcType.getNumber(), coordinationId, pBody, dBodies);
-  }
-
-  /** Buffers without readable bytes are released here and left out of {@link #dBodies}. */
-  OutboundRpcMessage(RpcMode mode, int rpcTypeNumber, int coordinationId, MessageLite pBody, ByteBuf... dBodies) {
-    super(mode, rpcTypeNumber, coordinationId);
-    this.pBody = pBody;
-
-    // Netty doesn't traditionally release the reference on an unreadable buffer, so a buffer with
-    // nothing to read is released right away; otherwise sending empty vectors leaks memory.
-    final List<ByteBuf> readable = Lists.newArrayList();
-    for (ByteBuf candidate : dBodies) {
-      if (candidate.readableBytes() != 0) {
-        readable.add(candidate);
-      } else {
-        candidate.release();
-      }
-    }
-    this.dBodies = readable.toArray(new ByteBuf[readable.size()]);
-  }
-
-  /** @return serialized size of {@code pBody} + {@code RpcEncoder.getRawVarintSize} of that size + {@link #getRawBodySize()} */
-  @Override
-  public int getBodySize() {
-    final int protobufSize = pBody.getSerializedSize();
-    return protobufSize + RpcEncoder.getRawVarintSize(protobufSize) + getRawBodySize();
-  }
-
-  /** @return total readable bytes across {@code dBodies}, or 0 when the array is null */
-  public int getRawBodySize() {
-    if (dBodies == null) {
-      return 0;
-    }
-    int total = 0;
-    for (ByteBuf body : dBodies) {
-      if (RpcConstants.EXTRA_DEBUGGING) {
-        logger.debug("Reader Index {}, Writer Index {}", body.readerIndex(), body.writerIndex());
-      }
-      total += body.readableBytes();
-    }
-    return total;
-  }
-
-  /** Releases every buffer in {@code dBodies}; does nothing when the array is null. */
-  @Override
-  void release() {
-    if (dBodies == null) {
-      return;
-    }
-    for (ByteBuf body : dBodies) {
-      body.release();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "OutboundRpcMessage [pBody=" + pBody + ", mode=" + mode + ", rpcType=" + rpcType
-        + ", coordinationId=" + coordinationId + ", dBodies=" + Arrays.toString(dBodies) + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
deleted file mode 100644
index 401663d..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Atomic counter whose {@link #getNext()} results are never negative: they count up to
- * {@code Integer.MAX_VALUE}, wrap around to 0 and then count up again.
- */
-public class PositiveAtomicInteger {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PositiveAtomicInteger.class);
-
-  private final AtomicInteger internal = new AtomicInteger(0);
-
-  /** @return the next value of the sequence 1, 2, ..., Integer.MAX_VALUE, 0, 1, ... */
-  public int getNext(){
-    // The raw counter overflows into the negative range. Clearing the sign bit maps a negative
-    // value i to i + 2^31 and leaves a non-negative value untouched, so the result is always
-    // within 0..Integer.MAX_VALUE.
-    return internal.incrementAndGet() & Integer.MAX_VALUE;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
deleted file mode 100644
index 3dfe03f..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-import io.netty.handler.codec.CorruptedFrameException;
-
-import java.util.List;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-
-import com.google.protobuf.CodedInputStream;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-
-/**
- * Modified version of {@link io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder} that avoids bytebuf copy.
- * See the documentation there.
- */
-public class ProtobufLengthDecoder extends ByteToMessageDecoder {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtobufLengthDecoder.class);
-
-  private BufferAllocator allocator;
-  private OutOfMemoryHandler outOfMemoryHandler;
-
-  public ProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
-    super();
-    this.allocator = allocator;
-    this.outOfMemoryHandler = outOfMemoryHandler;
-  }
-
-
-  @Override
-  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
-    if (!ctx.channel().isOpen()) {
-      if (in.readableBytes() > 0) {
-        logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
-      }
-      in.skipBytes(in.readableBytes());
-      return;
-    }
-
-    in.markReaderIndex();
-    final byte[] buf = new byte[5];
-    for (int i = 0; i < buf.length; i++) {
-      if (!in.isReadable()) {
-        in.resetReaderIndex();
-        return;
-      }
-
-      buf[i] = in.readByte();
-      if (buf[i] >= 0) {
-
-        int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
-
-        if (length < 0) {
-          throw new CorruptedFrameException("negative length: " + length);
-        }
-        if (length == 0) {
-          throw new CorruptedFrameException("Received a message of length 0.");
-        }
-
-        if (in.readableBytes() < length) {
-          in.resetReaderIndex();
-          return;
-        } else {
-          // need to make buffer copy, otherwise netty will try to refill this buffer if we move the readerIndex forward...
-          // TODO: Can we avoid this copy?
-          ByteBuf outBuf;
-          try {
-            outBuf = allocator.buffer(length);
-          } catch (OutOfMemoryException e) {
-            logger.warn("Failure allocating buffer on incoming stream due to memory limits.  Current Allocation: {}.", allocator.getAllocatedMemory());
-            in.resetReaderIndex();
-            outOfMemoryHandler.handle();
-            return;
-          }
-          outBuf.writeBytes(in, in.readerIndex(), length);
-
-          in.skipBytes(length);
-
-          if (RpcConstants.EXTRA_DEBUGGING) {
-            logger.debug(String.format(
-                "ReaderIndex is %d after length header of %d bytes and frame body of length %d bytes.",
-                in.readerIndex(), i + 1, length));
-          }
-
-          out.add(outBuf);
-          return;
-        }
-      }
-    }
-
-    // Couldn't find the byte whose MSB is off.
-    throw new CorruptedFrameException("length wider than 32-bit");
-
-  }
-
-  @Override
-  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
-    ctx.fireChannelReadComplete();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
deleted file mode 100644
index d62b6f2..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.GenericFutureListener;
-
-import java.io.Closeable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.protobuf.MessageLite;
-
-/**
- * Manages all connections between two particular bits.
- */
-public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConnection, OUTBOUND_HANDSHAKE extends MessageLite>
-    implements Closeable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReconnectingConnection.class);
-
-  private final AtomicReference<CONNECTION_TYPE> connectionHolder = new AtomicReference<CONNECTION_TYPE>();
-  private final String host;
-  private final int port;
-  private final OUTBOUND_HANDSHAKE handshake;
-
-  public ReconnectingConnection(OUTBOUND_HANDSHAKE handshake, String host, int port) {
-    Preconditions.checkNotNull(host);
-    Preconditions.checkArgument(port > 0);
-    this.host = host;
-    this.port = port;
-    this.handshake = handshake;
-  }
-
-  protected abstract BasicClient<?, CONNECTION_TYPE, OUTBOUND_HANDSHAKE, ?> getNewClient();
-
-  public <R extends MessageLite, C extends RpcCommand<R, CONNECTION_TYPE>> void runCommand(C cmd) {
-//    if(logger.isDebugEnabled()) logger.debug(String.format("Running command %s sending to host %s:%d", cmd, host, port));
-    CONNECTION_TYPE connection = connectionHolder.get();
-    if (connection != null) {
-      if (connection.isActive()) {
-        cmd.connectionAvailable(connection);
-//        logger.debug("Connection available and active, command run inline.");
-        return;
-      } else {
-        // remove the old connection. (don't worry if we fail since someone else should have done it.)
-        connectionHolder.compareAndSet(connection, null);
-      }
-    }
-
-    /**
-     * We've arrived here without a connection, let's make sure only one of us makes a connection. (fyi, another
-     * endpoint could create a reverse connection.)
-     **/
-    synchronized (this) {
-      connection = connectionHolder.get();
-      if (connection != null) {
-        cmd.connectionAvailable(connection);
-
-      } else {
-//        logger.debug("No connection active, opening client connection.");
-        BasicClient<?, CONNECTION_TYPE, OUTBOUND_HANDSHAKE, ?> client = getNewClient();
-        ConnectionListeningFuture<R, C> future = new ConnectionListeningFuture<R, C>(cmd);
-        client.connectAsClient(future, handshake, host, port);
-        future.waitAndRun();
-//        logger.debug("Connection available and active, command now being run inline.");
-      }
-      return;
-
-    }
-  }
-
-  public class ConnectionListeningFuture<R extends MessageLite, C extends RpcCommand<R, CONNECTION_TYPE>> extends
-      AbstractFuture<CONNECTION_TYPE> implements RpcConnectionHandler<CONNECTION_TYPE> {
-
-    private C cmd;
-
-    public ConnectionListeningFuture(C cmd) {
-      super();
-      this.cmd = cmd;
-    }
-
-    /**
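-     * Called by runCommand(): blocks until the connection attempt completes or the wait fails, then passes the new connection (or the wait failure) to the command.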
-     */
-    public void waitAndRun() {
-      boolean isInterrupted = false;
-
-      // We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
-      // So there is no point propagating the interruption as failure immediately.
-      long remainingWaitTimeMills = 120000;
-      long startTime = System.currentTimeMillis();
-
-      while(true) {
-        try {
-          //        logger.debug("Waiting for connection.");
-          CONNECTION_TYPE connection = this.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
-
-          if (connection == null) {
-            //          logger.debug("Connection failed.");
-          } else {
-            //          logger.debug("Connection received. {}", connection);
-            cmd.connectionSucceeded(connection);
-            //          logger.debug("Finished connection succeeded activity.");
-          }
-          break;
-        } catch (final InterruptedException interruptEx) {
-          remainingWaitTimeMills -= (System.currentTimeMillis() - startTime);
-          startTime = System.currentTimeMillis();
-          isInterrupted = true;
-          if (remainingWaitTimeMills < 1) {
-            cmd.connectionFailed(FailureType.CONNECTION, interruptEx);
-            break;
-          }
-          // Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMills.
-        } catch (final ExecutionException | TimeoutException ex) {
-          logger.error("Failed to establish connection", ex);
-          cmd.connectionFailed(FailureType.CONNECTION, ex);
-          break;
-        }
-      }
-
-      if (isInterrupted) {
-        // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-        // interruption and respond to it if it wants to.
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    @Override
-    public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
-      set(null);
-      cmd.connectionFailed(type, t);
-    }
-
-    @Override
-    public void connectionSucceeded(CONNECTION_TYPE incoming) {
-      CONNECTION_TYPE connection = connectionHolder.get();
-      while (true) {
-        boolean setted = connectionHolder.compareAndSet(null, incoming);
-        if (setted) {
-          connection = incoming;
-          break;
-        }
-        connection = connectionHolder.get();
-        if (connection != null) {
-          break;
-        }
-      }
-
-      if (connection != incoming) {
-        // close the incoming because another channel was created in the mean time (unless this is a self connection).
-        logger.debug("Closing incoming connection because a connection was already set.");
-        incoming.getChannel().close();
-      }
-      set(connection);
-
-    }
-
-  }
-
-  /** Factory for close handlers **/
-  public class CloseHandlerCreator {
-    public GenericFutureListener<ChannelFuture> getHandler(CONNECTION_TYPE connection,
-        GenericFutureListener<ChannelFuture> parent) {
-      return new CloseHandler(connection, parent);
-    }
-  }
-
-  /**
-   * Listens for connection closes and clears connection holder.
-   */
-  protected class CloseHandler implements GenericFutureListener<ChannelFuture> {
-    private CONNECTION_TYPE connection;
-    private GenericFutureListener<ChannelFuture> parent;
-
-    public CloseHandler(CONNECTION_TYPE connection, GenericFutureListener<ChannelFuture> parent) {
-      super();
-      this.connection = connection;
-      this.parent = parent;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      connectionHolder.compareAndSet(connection, null);
-      parent.operationComplete(future);
-    }
-
-  }
-
-  public CloseHandlerCreator getCloseHandlerCreator() {
-    return new CloseHandlerCreator();
-  }
-
-  public void addExternalConnection(CONNECTION_TYPE connection) {
-    // if the connection holder is not set, set it to this incoming connection. We'll simply ignore if already set.
-    this.connectionHolder.compareAndSet(null, connection);
-  }
-
-  @Override
-  public void close() {
-    CONNECTION_TYPE c = connectionHolder.getAndSet(null);
-    if (c != null) {
-      c.getChannel().close();
-    }
-  }
-
-  /**
-   * Decorate a connection creation so that we capture a success and keep it available for future requests. If we have
-   * raced and another is already available... we return that one and close things down on this one.
-   */
-  private class ConnectionListeningDecorator implements RpcConnectionHandler<CONNECTION_TYPE> {
-
-    private final RpcConnectionHandler<CONNECTION_TYPE> delegate;
-
-    public ConnectionListeningDecorator(RpcConnectionHandler<CONNECTION_TYPE> delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    public void connectionSucceeded(CONNECTION_TYPE incoming) {
-      CONNECTION_TYPE connection = connectionHolder.get();
-      while (true) {
-        boolean setted = connectionHolder.compareAndSet(null, incoming);
-        if (setted) {
-          connection = incoming;
-          break;
-        }
-        connection = connectionHolder.get();
-        if (connection != null) {
-          break;
-        }
-      }
-
-      if (connection == incoming) {
-        delegate.connectionSucceeded(connection);
-      } else {
-        // close the incoming because another channel was created in the mean time (unless this is a self connection).
-        logger.debug("Closing incoming connection because a connection was already set.");
-        incoming.getChannel().close();
-        delegate.connectionSucceeded(connection);
-      }
-    }
-
-    @Override
-    public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
-      delegate.connectionFailed(type, t);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
deleted file mode 100644
index 30abcc4..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
-import java.util.concurrent.ExecutionException;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-
-public abstract class RemoteConnection implements ConnectionThrottle, AutoCloseable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
-  private final Channel channel;
-  private final WriteManager writeManager;
-  private String name;
-  private final String clientName;
-
-  public boolean inEventLoop(){
-    return channel.eventLoop().inEventLoop();
-  }
-
-  public RemoteConnection(SocketChannel channel, String name) {
-    super();
-    this.channel = channel;
-    this.clientName = name;
-    this.writeManager = new WriteManager();
-    channel.pipeline().addLast(new BackPressureHandler());
-    channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
-      public void operationComplete(Future<? super Void> future) throws Exception {
-        // this could possibly overrelease but it doesn't matter since we're only going to do this to ensure that we
-        // fail out any pending messages
-        writeManager.disable();
-        writeManager.setWritable(true);
-      }
-    });
-
-  }
-
-  public String getName() {
-    if(name == null){
-      name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), clientName);
-    }
-    return name;
-  }
-
-  public abstract BufferAllocator getAllocator();
-
-  public final Channel getChannel() {
-    return channel;
-  }
-
-  public boolean blockOnNotWritable(RpcOutcomeListener<?> listener){
-    try{
-      writeManager.waitForWritable();
-      return true;
-    }catch(final InterruptedException e){
-      listener.interrupted(e);
-
-      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-      // interruption and respond to it if it wants to.
-      Thread.currentThread().interrupt();
-
-      return false;
-    }
-  }
-
-  public void setAutoRead(boolean enableAutoRead){
-    channel.config().setAutoRead(enableAutoRead);
-  }
-
-  public boolean isActive(){
-    return channel.isActive();
-  }
-
-  /**
-   * The write manager is responsible for controlling whether or not a write can be sent.  It controls whether or not to block a sender if we have tcp backpressure on the receive side.
-   */
-  private static class WriteManager{
-    private final ResettableBarrier barrier = new ResettableBarrier();
-    private volatile boolean disabled = false;
-
-    public WriteManager(){
-      barrier.openBarrier();
-    }
-
-    public void waitForWritable() throws InterruptedException{
-      barrier.await();
-    }
-
-    public void setWritable(boolean isWritable){
-      if(isWritable){
-        barrier.openBarrier();
-      } else if (!disabled) {
-        barrier.closeBarrier();
-      }
-
-    }
-
-    public void disable() {
-      disabled = true;
-    }
-  }
-
-  private class BackPressureHandler extends ChannelInboundHandlerAdapter{
-
-    @Override
-    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-//      logger.debug("Channel writability changed.", ctx.channel().isWritable());
-      writeManager.setWritable(ctx.channel().isWritable());
-      ctx.fireChannelWritabilityChanged();
-    }
-
-  }
-
-  @Override
-  public void close() {
-    try {
-      if (channel.isActive()) {
-        channel.close().get();
-      }
-    } catch (final InterruptedException | ExecutionException e) {
-      logger.warn("Caught exception while closing channel.", e);
-
-      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-      // interruption and respond to it if it wants to.
-      Thread.currentThread().interrupt();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java
deleted file mode 100644
index a2a6d2a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-
-/**
- * Modified implementation of countdown latch that allows a barrier to be unilaterally opened and closed.  All others simply wait when it is closed.  Is initialized in a closed state.
- */
-public class ResettableBarrier {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResettableBarrier.class);
-
-  private final InternalSynchronizer sync = new InternalSynchronizer();
-
-  public ResettableBarrier() {
-  }
-
-  private static final class InternalSynchronizer extends AbstractQueuedSynchronizer {
-
-    private InternalSynchronizer() {
-      setState(1);
-    }
-
-    @Override
-    protected int tryAcquireShared(int acquires) {
-      assert acquires == 1;
-      return (getState() == 0) ? 1 : -1;
-    }
-
-    @Override
-    protected boolean tryReleaseShared(int releases) {
-      assert releases == 1;
-
-      while(true) {
-        int c = getState();
-        if (c == 0) {
-          return false;
-        }
-        int nextc = c - 1;
-        if (compareAndSetState(c, nextc)) {
-          return nextc == 0;
-        }
-      }
-    }
-
-    protected void reset() {
-      setState(1);
-    }
-
-  }
-
-  public void await() throws InterruptedException {
-//    logger.debug("awaiting barrier interruptibly.");
-    sync.acquireSharedInterruptibly(1);
-  }
-
-  public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
-//    logger.debug("awaiting barrier with timeout {}.", timeout);
-    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
-  }
-
-  public void openBarrier() {
-//    logger.debug("opening barrier.");
-    sync.releaseShared(1);
-  }
-
-  public void closeBarrier() {
-//    logger.debug("closing barrier.");
-    sync.reset();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
deleted file mode 100644
index b48adec..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-
-import com.google.protobuf.Internal.EnumLite;
-import com.google.protobuf.MessageLite;
-
-public class Response {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Response.class);
-
-  public EnumLite rpcType;
-  public MessageLite pBody;
-  public ByteBuf[] dBodies;
-
-  public Response(EnumLite rpcType, MessageLite pBody, ByteBuf... dBodies) {
-    super();
-    this.rpcType = rpcType;
-    this.pBody = pBody;
-    this.dBodies = dBodies;
-  }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResponseSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResponseSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResponseSender.java
deleted file mode 100644
index 6dc9ae1..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResponseSender.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-
-public interface ResponseSender {
-  public void send(Response r);
-
-}


Mime
View raw message