drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [23/45] drill git commit: DRILL-3987: (MOVE) Extract RPC, memory-base and memory-impl as separate modules.
Date Fri, 13 Nov 2015 02:37:53 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
new file mode 100644
index 0000000..92022b0
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+import com.google.protobuf.MessageLite;
+
+/**
+ * An {@link RpcCommand} that forwards the outcome of its RPC call, or a failure to establish a connection,
+ * to the {@link RpcOutcomeListener} supplied at construction time.
+ *
+ * @param <T> type of the value delivered to the listener on success
+ * @param <C> type of connection the command is run against
+ */
+public abstract class ListeningCommand<T extends MessageLite, C extends RemoteConnection> implements RpcCommand<T, C> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListeningCommand.class);
+
+  private final RpcOutcomeListener<T> listener;
+
+  /**
+   * @param listener listener that receives the outcome of this command
+   */
+  public ListeningCommand(RpcOutcomeListener<T> listener) {
+    this.listener = listener;
+  }
+
+  /**
+   * Performs the RPC call for this command.
+   *
+   * @param outcomeListener listener handed to the call; it relays every callback to this command's listener
+   * @param connection connection to run the call against
+   */
+  public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, C connection);
+
+  @Override
+  public void connectionSucceeded(C connection) {
+    // A newly established connection is treated exactly like one that was already available.
+    connectionAvailable(connection);
+  }
+
+  @Override
+  public void connectionAvailable(C connection) {
+    final RpcOutcomeListener<T> forwarder = new ForwardingOutcomeListener();
+    doRpcCall(forwarder, connection);
+  }
+
+  @Override
+  public void connectionFailed(FailureType type, Throwable t) {
+    final String message =
+        String.format("Command failed while establishing connection.  Failure type %s.", type);
+    listener.failed(RpcException.mapException(message, t));
+  }
+
+  /** Relays each outcome callback, unchanged, to the listener of the enclosing command. */
+  private class ForwardingOutcomeListener implements RpcOutcomeListener<T> {
+
+    @Override
+    public void success(T value, ByteBuf buf) {
+      listener.success(value, buf);
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      listener.failed(ex);
+    }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      listener.interrupted(e);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
new file mode 100644
index 0000000..e7580b2
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link ThreadFactory} for {@link java.util.concurrent.ExecutorService}s that names threads sequentially.
+ * Creates threads named with the prefix specified at construction time. Created threads
+ * have the daemon bit set, and the factory attempts to give them priority {@link Thread#MAX_PRIORITY}.
+ *
+ * <p>An instance creates names with an instance-specific prefix suffixed with sequential
+ * integers.</p>
+ *
+ * <p>Concurrency: See {@link #newThread}.</p>
+ */
+public class NamedThreadFactory implements ThreadFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NamedThreadFactory.class);
+  private final AtomicInteger nextId = new AtomicInteger(); // used to generate unique ids
+  private final String prefix;
+
+  /**
+   * Constructor.
+   *
+   * @param prefix the string prefix that will be used to name threads created by this factory
+   */
+  public NamedThreadFactory(final String prefix) {
+    this.prefix = prefix;
+  }
+
+  /**
+   * Creates a sequentially named daemon thread running a given Runnable.
+   *
+   * <p>The thread's name is this instance's prefix concatenated with this instance's next sequential
+   * integer; the first thread created by an instance gets the number 1.</p>
+   *
+   * <p>Concurrency: Thread-safe. Concurrent calls get different numbers, and calls started after other
+   * calls complete get later/higher numbers than those other calls. For concurrent calls, however, the
+   * order of numbers is not defined.</p>
+   *
+   * @param runnable the task the new thread will run
+   * @return a new, unstarted daemon thread
+   */
+  @Override
+  public Thread newThread(final Runnable runnable) {
+    final Thread thread = new Thread(runnable, prefix + nextId.incrementAndGet());
+    thread.setDaemon(true);
+
+    try {
+      if (thread.getPriority() != Thread.MAX_PRIORITY) {
+        thread.setPriority(Thread.MAX_PRIORITY);
+      }
+    } catch (Exception ignored) {
+      // Raising the priority is best-effort: the thread is still usable at its current priority.
+      // Pass the exception as the trailing argument so that its stack trace is logged as well.
+      logger.info("ignored exception", ignored);
+    }
+    return thread;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java
new file mode 100644
index 0000000..5d7db47
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+/**
+ * Callback to run when an out-of-memory condition is encountered.
+ */
+public interface OutOfMemoryHandler {
+
+  /**
+   * Reacts to the out-of-memory condition.
+   */
+  void handle();
+
+  /**
+   * Default handler, which does not support handling at all: its {@link #handle()} always throws
+   * {@link UnsupportedOperationException}.
+   */
+  OutOfMemoryHandler DEFAULT_INSTANCE = new OutOfMemoryHandler() {
+    @Override
+    public void handle() {
+      throw new UnsupportedOperationException();
+    }
+  };
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
new file mode 100644
index 0000000..5eda350
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+
+/**
+ * An {@link RpcMessage} made of a protobuf body ({@code pBody}) plus zero or more raw data buffers
+ * ({@code dBodies}).
+ */
+public class OutboundRpcMessage extends RpcMessage {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutboundRpcMessage.class);
+
+  final MessageLite pBody;
+  public ByteBuf[] dBodies;
+
+  /**
+   * Creates a message whose RPC type number is taken from {@code rpcType}.
+   */
+  public OutboundRpcMessage(RpcMode mode, EnumLite rpcType, int coordinationId, MessageLite pBody, ByteBuf... dBodies) {
+    this(mode, rpcType.getNumber(), coordinationId, pBody, dBodies);
+  }
+
+  /**
+   * Creates a message for an explicit RPC type number. Data buffers with no readable bytes are released
+   * immediately and are not kept in {@link #dBodies}.
+   */
+  OutboundRpcMessage(RpcMode mode, int rpcTypeNumber, int coordinationId, MessageLite pBody, ByteBuf... dBodies) {
+    super(mode, rpcTypeNumber, coordinationId);
+    this.pBody = pBody;
+
+    // Netty doesn't traditionally release the reference on an unreadable buffer.  However, we need to, so
+    // that if we send an empty or unwritable buffer, we still release.  Otherwise we get weird memory leaks
+    // when sending empty vectors.
+    final List<ByteBuf> retained = Lists.newArrayList();
+    for (final ByteBuf body : dBodies) {
+      if (body.readableBytes() != 0) {
+        retained.add(body);
+        continue;
+      }
+      body.release();
+    }
+    this.dBodies = retained.toArray(new ByteBuf[retained.size()]);
+  }
+
+  /**
+   * @return serialized size of the protobuf body, plus the size of the varint encoding that size, plus
+   *         {@link #getRawBodySize()}
+   */
+  @Override
+  public int getBodySize() {
+    final int protoSize = pBody.getSerializedSize();
+    return protoSize + RpcEncoder.getRawVarintSize(protoSize) + getRawBodySize();
+  }
+
+  /**
+   * @return total number of readable bytes across all data buffers, or 0 when there are none
+   */
+  public int getRawBodySize() {
+    if (dBodies == null) {
+      return 0;
+    }
+
+    int total = 0;
+    for (final ByteBuf body : dBodies) {
+      if (RpcConstants.EXTRA_DEBUGGING) {
+        logger.debug("Reader Index {}, Writer Index {}", body.readerIndex(), body.writerIndex());
+      }
+      total += body.readableBytes();
+    }
+    return total;
+  }
+
+  @Override
+  public String toString() {
+    return "OutboundRpcMessage [pBody=" + pBody
+        + ", mode=" + mode
+        + ", rpcType=" + rpcType
+        + ", coordinationId=" + coordinationId
+        + ", dBodies=" + Arrays.toString(dBodies) + "]";
+  }
+
+  /** Releases every data buffer still held by this message. */
+  @Override
+  void release() {
+    if (dBodies == null) {
+      return;
+    }
+    for (final ByteBuf body : dBodies) {
+      body.release();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
new file mode 100644
index 0000000..401663d
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A counter whose {@link #getNext()} never returns a negative value: results run up to
+ * {@code Integer.MAX_VALUE} and then start over at 0 when the underlying counter overflows.
+ */
+public class PositiveAtomicInteger {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PositiveAtomicInteger.class);
+
+  private final AtomicInteger internal = new AtomicInteger(0);
+
+  /**
+   * Advances the counter by one and returns the new value with its sign bit cleared.
+   *
+   * @return the next value, always in the range 0 to {@code Integer.MAX_VALUE}
+   */
+  public int getNext() {
+    // Clearing the sign bit leaves non-negative values untouched and maps a negative value to the
+    // same result as adding Integer.MIN_VALUE to it (two's-complement wrap-around).
+    return internal.incrementAndGet() & Integer.MAX_VALUE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
new file mode 100644
index 0000000..3dfe03f
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.CorruptedFrameException;
+
+import java.util.List;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+
+import com.google.protobuf.CodedInputStream;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+
+/**
+ * Modified version of {@link io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder}; see the
+ * documentation there. Each frame is a varint32 length header followed by that many bytes of body. The
+ * body is emitted in a buffer obtained from the configured {@link BufferAllocator}, and the configured
+ * {@link OutOfMemoryHandler} is invoked when that allocation fails.
+ */
+public class ProtobufLengthDecoder extends ByteToMessageDecoder {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtobufLengthDecoder.class);
+
+  private final BufferAllocator allocator;
+  private final OutOfMemoryHandler outOfMemoryHandler;
+
+  /**
+   * @param allocator allocator used for the buffer that receives each frame body
+   * @param outOfMemoryHandler handler invoked when the allocator cannot supply a buffer
+   */
+  public ProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+    super();
+    this.allocator = allocator;
+    this.outOfMemoryHandler = outOfMemoryHandler;
+  }
+
+  /**
+   * Decodes at most one frame from {@code in}. When the length header or the body is not yet fully
+   * readable, the reader index is reset and nothing is emitted.
+   *
+   * @throws CorruptedFrameException if the length is negative, zero, or wider than five varint bytes
+   */
+  @Override
+  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+    if (!ctx.channel().isOpen()) {
+      final int pending = in.readableBytes();
+      if (pending > 0) {
+        logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", pending);
+      }
+      in.skipBytes(in.readableBytes());
+      return;
+    }
+
+    in.markReaderIndex();
+    final byte[] header = new byte[5];
+    for (int idx = 0; idx < header.length; idx++) {
+      if (!in.isReadable()) {
+        // The length header is incomplete; start over once more bytes have arrived.
+        in.resetReaderIndex();
+        return;
+      }
+
+      header[idx] = in.readByte();
+      if (header[idx] < 0) {
+        // MSB is set, so the varint continues in the next byte.
+        continue;
+      }
+
+      final int length = CodedInputStream.newInstance(header, 0, idx + 1).readRawVarint32();
+      if (length < 0) {
+        throw new CorruptedFrameException("negative length: " + length);
+      }
+      if (length == 0) {
+        throw new CorruptedFrameException("Received a message of length 0.");
+      }
+
+      if (in.readableBytes() < length) {
+        // The body has not fully arrived yet; start over once more bytes have arrived.
+        in.resetReaderIndex();
+        return;
+      }
+
+      // need to make buffer copy, otherwise netty will try to refill this buffer if we move the readerIndex forward...
+      // TODO: Can we avoid this copy?
+      ByteBuf frame;
+      try {
+        frame = allocator.buffer(length);
+      } catch (OutOfMemoryException e) {
+        logger.warn("Failure allocating buffer on incoming stream due to memory limits.  Current Allocation: {}.", allocator.getAllocatedMemory());
+        in.resetReaderIndex();
+        outOfMemoryHandler.handle();
+        return;
+      }
+
+      // This writeBytes variant does not advance the reader index of 'in', hence the explicit skip.
+      frame.writeBytes(in, in.readerIndex(), length);
+      in.skipBytes(length);
+
+      if (RpcConstants.EXTRA_DEBUGGING) {
+        logger.debug(String.format(
+            "ReaderIndex is %d after length header of %d bytes and frame body of length %d bytes.",
+            in.readerIndex(), idx + 1, length));
+      }
+
+      out.add(frame);
+      return;
+    }
+
+    // Couldn't find the byte whose MSB is off.
+    throw new CorruptedFrameException("length wider than 32-bit");
+  }
+
+  /** Passes the read-complete event straight on to the next handler in the pipeline. */
+  @Override
+  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+    ctx.fireChannelReadComplete();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
new file mode 100644
index 0000000..d62b6f2
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.Closeable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.protobuf.MessageLite;
+
+/**
+ * Manages the connection to one particular host and port. At most one connection is held at a time; when
+ * none is available, a new client connection is opened on demand by {@link #runCommand}.
+ *
+ * @param <CONNECTION_TYPE> type of connection being managed
+ * @param <OUTBOUND_HANDSHAKE> type of the handshake message passed to the client when connecting
+ */
+public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConnection, OUTBOUND_HANDSHAKE extends MessageLite>
+    implements Closeable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReconnectingConnection.class);
+
+  private final AtomicReference<CONNECTION_TYPE> connectionHolder = new AtomicReference<CONNECTION_TYPE>();
+  private final String host;
+  private final int port;
+  private final OUTBOUND_HANDSHAKE handshake;
+
+  /**
+   * @param handshake handshake message passed to the client when a connection is opened
+   * @param host remote host; must not be null
+   * @param port remote port; must be greater than zero
+   */
+  public ReconnectingConnection(OUTBOUND_HANDSHAKE handshake, String host, int port) {
+    Preconditions.checkNotNull(host);
+    Preconditions.checkArgument(port > 0);
+    this.host = host;
+    this.port = port;
+    this.handshake = handshake;
+  }
+
+  /**
+   * @return a new client used to open a connection to the remote endpoint
+   */
+  protected abstract BasicClient<?, CONNECTION_TYPE, OUTBOUND_HANDSHAKE, ?> getNewClient();
+
+  /**
+   * Runs a command against the held connection, opening a new connection first when none is held.
+   *
+   * <p>When a connection has to be opened, this call blocks until the attempt completes (see
+   * {@link ConnectionListeningFuture#waitAndRun()}); the command is notified of success or failure.</p>
+   *
+   * @param cmd the command to run
+   */
+  public <R extends MessageLite, C extends RpcCommand<R, CONNECTION_TYPE>> void runCommand(C cmd) {
+    CONNECTION_TYPE connection = connectionHolder.get();
+    if (connection != null) {
+      if (connection.isActive()) {
+        cmd.connectionAvailable(connection);
+        return;
+      }
+      // Remove the old connection. (Don't worry if we fail, since someone else should have done it.)
+      connectionHolder.compareAndSet(connection, null);
+    }
+
+    // We've arrived here without a connection, let's make sure only one of us makes a connection. (FYI,
+    // another endpoint could create a reverse connection.)
+    synchronized (this) {
+      connection = connectionHolder.get();
+      if (connection != null) {
+        cmd.connectionAvailable(connection);
+      } else {
+        BasicClient<?, CONNECTION_TYPE, OUTBOUND_HANDSHAKE, ?> client = getNewClient();
+        ConnectionListeningFuture<R, C> future = new ConnectionListeningFuture<R, C>(cmd);
+        client.connectAsClient(future, handshake, host, port);
+        future.waitAndRun();
+      }
+    }
+  }
+
+  /**
+   * Stores {@code incoming} in the connection holder unless a connection is already held.
+   *
+   * @param incoming the newly available connection
+   * @return the connection held once this call completes: {@code incoming} if it was stored, otherwise the
+   *         connection that was already present
+   */
+  private CONNECTION_TYPE setOrGetExisting(CONNECTION_TYPE incoming) {
+    while (true) {
+      if (connectionHolder.compareAndSet(null, incoming)) {
+        return incoming;
+      }
+      final CONNECTION_TYPE existing = connectionHolder.get();
+      if (existing != null) {
+        return existing;
+      }
+      // The holder was cleared between the two calls above; try again.
+    }
+  }
+
+  /**
+   * Future completed by the connection handler callbacks; {@link #waitAndRun()} blocks on it and then runs
+   * the command on the resulting connection.
+   */
+  public class ConnectionListeningFuture<R extends MessageLite, C extends RpcCommand<R, CONNECTION_TYPE>> extends
+      AbstractFuture<CONNECTION_TYPE> implements RpcConnectionHandler<CONNECTION_TYPE> {
+
+    private final C cmd;
+
+    public ConnectionListeningFuture(C cmd) {
+      super();
+      this.cmd = cmd;
+    }
+
+    /**
+     * Called by {@link ReconnectingConnection#runCommand} after the connection attempt has been started.
+     * Waits for the attempt to complete and, on success, runs the command on the connection. If the wait
+     * was interrupted, the thread's interrupt status is restored before returning.
+     */
+    public void waitAndRun() {
+      boolean isInterrupted = false;
+
+      // We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
+      // So there is no point propagating the interruption as failure immediately.
+      long remainingWaitTimeMillis = 120000;
+      long startTime = System.currentTimeMillis();
+
+      while (true) {
+        try {
+          CONNECTION_TYPE connection = this.get(remainingWaitTimeMillis, TimeUnit.MILLISECONDS);
+
+          // A null result means connectionFailed() completed this future and already notified the command.
+          if (connection != null) {
+            cmd.connectionSucceeded(connection);
+          }
+          break;
+        } catch (final InterruptedException interruptEx) {
+          // Read the clock once so that no elapsed time is lost between the two updates.
+          final long now = System.currentTimeMillis();
+          remainingWaitTimeMillis -= (now - startTime);
+          startTime = now;
+          isInterrupted = true;
+          if (remainingWaitTimeMillis < 1) {
+            cmd.connectionFailed(FailureType.CONNECTION, interruptEx);
+            break;
+          }
+          // Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMillis.
+        } catch (final ExecutionException | TimeoutException ex) {
+          logger.error("Failed to establish connection", ex);
+          cmd.connectionFailed(FailureType.CONNECTION, ex);
+          break;
+        }
+      }
+
+      if (isInterrupted) {
+        // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+        // interruption and respond to it if it wants to.
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override
+    public void connectionFailed(RpcConnectionHandler.FailureType type, Throwable t) {
+      // Complete the future with null so that waitAndRun() stops waiting, then report the failure.
+      set(null);
+      cmd.connectionFailed(type, t);
+    }
+
+    @Override
+    public void connectionSucceeded(CONNECTION_TYPE incoming) {
+      final CONNECTION_TYPE connection = setOrGetExisting(incoming);
+
+      if (connection != incoming) {
+        // close the incoming because another channel was created in the mean time (unless this is a self connection).
+        logger.debug("Closing incoming connection because a connection was already set.");
+        incoming.getChannel().close();
+      }
+      set(connection);
+    }
+
+  }
+
+  /** Factory for close handlers **/
+  public class CloseHandlerCreator {
+    public GenericFutureListener<ChannelFuture> getHandler(CONNECTION_TYPE connection,
+        GenericFutureListener<ChannelFuture> parent) {
+      return new CloseHandler(connection, parent);
+    }
+  }
+
+  /**
+   * Listens for connection closes and clears connection holder.
+   */
+  protected class CloseHandler implements GenericFutureListener<ChannelFuture> {
+    private final CONNECTION_TYPE connection;
+    private final GenericFutureListener<ChannelFuture> parent;
+
+    public CloseHandler(CONNECTION_TYPE connection, GenericFutureListener<ChannelFuture> parent) {
+      super();
+      this.connection = connection;
+      this.parent = parent;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      // Only clear the holder if it still holds the connection this handler was created for.
+      connectionHolder.compareAndSet(connection, null);
+      parent.operationComplete(future);
+    }
+
+  }
+
+  public CloseHandlerCreator getCloseHandlerCreator() {
+    return new CloseHandlerCreator();
+  }
+
+  /**
+   * Offers a connection that was established elsewhere.
+   *
+   * @param connection the connection to hold if none is currently held
+   */
+  public void addExternalConnection(CONNECTION_TYPE connection) {
+    // if the connection holder is not set, set it to this incoming connection. We'll simply ignore if already set.
+    this.connectionHolder.compareAndSet(null, connection);
+  }
+
+  /** Clears the connection holder and closes the channel of the connection it held, if any. */
+  @Override
+  public void close() {
+    CONNECTION_TYPE c = connectionHolder.getAndSet(null);
+    if (c != null) {
+      c.getChannel().close();
+    }
+  }
+
+  /**
+   * Decorate a connection creation so that we capture a success and keep it available for future requests. If we have
+   * raced and another is already available... we return that one and close things down on this one.
+   */
+  private class ConnectionListeningDecorator implements RpcConnectionHandler<CONNECTION_TYPE> {
+
+    private final RpcConnectionHandler<CONNECTION_TYPE> delegate;
+
+    public ConnectionListeningDecorator(RpcConnectionHandler<CONNECTION_TYPE> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public void connectionSucceeded(CONNECTION_TYPE incoming) {
+      final CONNECTION_TYPE connection = setOrGetExisting(incoming);
+
+      if (connection != incoming) {
+        // close the incoming because another channel was created in the mean time (unless this is a self connection).
+        logger.debug("Closing incoming connection because a connection was already set.");
+        incoming.getChannel().close();
+      }
+      delegate.connectionSucceeded(connection);
+    }
+
+    @Override
+    public void connectionFailed(RpcConnectionHandler.FailureType type, Throwable t) {
+      delegate.connectionFailed(type, t);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
new file mode 100644
index 0000000..561f0a4
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+
+/**
+ * Base class for a connection that wraps a single netty {@link Channel}. It installs a handler that tracks channel
+ * writability so that senders can block (see {@link #blockOnNotWritable(RpcOutcomeListener)}) while the channel is
+ * not writable.
+ */
+public abstract class RemoteConnection implements ConnectionThrottle, AutoCloseable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
+  private final Channel channel;
+  private final WriteManager writeManager;
+  // Built lazily by getName(); stays null until the name is first requested.
+  private String name;
+  private final String clientName;
+
+  /** Returns whether the calling thread is the event loop thread of the underlying channel. */
+  public boolean inEventLoop(){
+    return channel.eventLoop().inEventLoop();
+  }
+
+  /**
+   * Wraps the given channel, appends a back pressure handler to its pipeline and registers a close listener that
+   * releases any sender still waiting on the write manager.
+   *
+   * @param channel the channel this connection communicates over
+   * @param name    client name; only used to build the value returned by {@link #getName()}
+   */
+  public RemoteConnection(SocketChannel channel, String name) {
+    super();
+    this.channel = channel;
+    this.clientName = name;
+    this.writeManager = new WriteManager();
+    channel.pipeline().addLast(new BackPressureHandler());
+    channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
+      @Override
+      public void operationComplete(Future<? super Void> future) throws Exception {
+        // this could possibly overrelease but it doesn't matter since we're only going to do this to ensure that we
+        // fail out any pending messages
+        writeManager.disable();
+        writeManager.setWritable(true);
+      }
+    });
+
+  }
+
+  /**
+   * Returns a display name of the form {@code local <--> remote (clientName)}. The value is computed on first use
+   * from the channel's addresses at that moment and then reused.
+   */
+  public String getName() {
+    if(name == null){
+      name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), clientName);
+    }
+    return name;
+  }
+
+  public abstract BufferAllocator getAllocator();
+
+  /** Returns the underlying channel. */
+  public final Channel getChannel() {
+    return channel;
+  }
+
+  /**
+   * Blocks the calling thread until the write manager reports the connection as writable.
+   *
+   * @param listener notified through {@code interrupted(e)} if the wait is interrupted
+   * @return true if the wait completed normally, false if the thread was interrupted while waiting (the thread's
+   *         interrupt flag is set again before returning)
+   */
+  public boolean blockOnNotWritable(RpcOutcomeListener<?> listener){
+    try{
+      writeManager.waitForWritable();
+      return true;
+    }catch(final InterruptedException e){
+      listener.interrupted(e);
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
+      return false;
+    }
+  }
+
+  /** Forwards the auto-read setting to the channel configuration. */
+  public void setAutoRead(boolean enableAutoRead){
+    channel.config().setAutoRead(enableAutoRead);
+  }
+
+  /** Returns whether the underlying channel is active. */
+  public boolean isActive(){
+    return channel.isActive();
+  }
+
+  /**
+   * The write manager is responsible for controlling whether or not a write can be sent.  It controls whether or not
+   * to block a sender if we have tcp backpressure on the receive side.
+   */
+  private static class WriteManager{
+    private final ResettableBarrier barrier = new ResettableBarrier();
+    // Once set, the barrier can no longer be closed; it can still be opened.
+    private volatile boolean disabled = false;
+
+    public WriteManager(){
+      // Start out writable.
+      barrier.openBarrier();
+    }
+
+    /** Waits until the barrier is open. */
+    public void waitForWritable() throws InterruptedException{
+      barrier.await();
+    }
+
+    /** Opens the barrier when writable; closes it when not writable, unless this manager has been disabled. */
+    public void setWritable(boolean isWritable){
+      if(isWritable){
+        barrier.openBarrier();
+      } else if (!disabled) {
+        barrier.closeBarrier();
+      }
+
+    }
+
+    /** Prevents any further closing of the barrier. */
+    public void disable() {
+      disabled = true;
+    }
+  }
+
+  /** Mirrors the channel's writability into the write manager and passes the event along the pipeline. */
+  private class BackPressureHandler extends ChannelInboundHandlerAdapter{
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+//      logger.debug("Channel writability changed.", ctx.channel().isWritable());
+      writeManager.setWritable(ctx.channel().isWritable());
+      ctx.fireChannelWritabilityChanged();
+    }
+
+  }
+
+  /**
+   * Closes the channel if it is active and waits for the close to finish. Failures are logged rather than thrown.
+   * The thread's interrupt flag is set again only when the wait was actually interrupted.
+   */
+  @Override
+  public void close() {
+    try {
+      if (channel.isActive()) {
+        channel.close().get();
+      }
+    } catch (final InterruptedException e) {
+      logger.warn("Caught exception while closing channel.", e);
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+    } catch (final ExecutionException e) {
+      // The close operation itself failed; nobody interrupted this thread, so the interrupt flag is left alone.
+      logger.warn("Caught exception while closing channel.", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java
new file mode 100644
index 0000000..a2a6d2a
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * Modified implementation of countdown latch that allows a barrier to be unilaterally opened and closed.  All others
+ * simply wait when it is closed.  Is initialized in a closed state.
+ *
+ * <p>The synchronizer state is 1 while the barrier is closed and 0 while it is open.</p>
+ */
+public class ResettableBarrier {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResettableBarrier.class);
+
+  private final InternalSynchronizer sync = new InternalSynchronizer();
+
+  public ResettableBarrier() {
+  }
+
+  /** Synchronizer whose shared acquire succeeds only while the state is 0. */
+  private static final class InternalSynchronizer extends AbstractQueuedSynchronizer {
+
+    private InternalSynchronizer() {
+      // Start closed.
+      setState(1);
+    }
+
+    /** Succeeds (returns 1) when the state is 0, otherwise fails (returns -1); never modifies the state. */
+    @Override
+    protected int tryAcquireShared(int acquires) {
+      assert acquires == 1;
+      return (getState() == 0) ? 1 : -1;
+    }
+
+    /**
+     * Decrements the state towards 0.
+     *
+     * @return true when this call moved the state to 0, false when the state already was 0
+     */
+    @Override
+    protected boolean tryReleaseShared(int releases) {
+      assert releases == 1;
+
+      // Retry until the compare-and-set wins or the state is observed to be 0.
+      while(true) {
+        int c = getState();
+        if (c == 0) {
+          return false;
+        }
+        int nextc = c - 1;
+        if (compareAndSetState(c, nextc)) {
+          return nextc == 0;
+        }
+      }
+    }
+
+    /** Puts the state back to 1, i.e. closed. */
+    protected void reset() {
+      setState(1);
+    }
+
+  }
+
+  /** Waits, interruptibly, until the barrier is open. */
+  public void await() throws InterruptedException {
+//    logger.debug("awaiting barrier interruptibly.");
+    sync.acquireSharedInterruptibly(1);
+  }
+
+  /**
+   * Waits, interruptibly, until the barrier is open or the timeout elapses.
+   *
+   * @return the result of the timed shared acquire on the synchronizer
+   */
+  public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+//    logger.debug("awaiting barrier with timeout {}.", timeout);
+    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+  }
+
+  /** Opens the barrier; has no effect on the state when it is already open. */
+  public void openBarrier() {
+//    logger.debug("opening barrier.");
+    sync.releaseShared(1);
+  }
+
+  /** Closes the barrier so that subsequent waiters block until it is opened again. */
+  public void closeBarrier() {
+//    logger.debug("closing barrier.");
+    sync.reset();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/Response.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/Response.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/Response.java
new file mode 100644
index 0000000..b48adec
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/Response.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+
+/**
+ * Simple holder for the parts of an rpc response: the rpc type, the protobuf body and zero or more data buffers.
+ */
+public class Response {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Response.class);
+
+  /** Rpc type of the response. */
+  public EnumLite rpcType;
+
+  /** Protobuf body of the response. */
+  public MessageLite pBody;
+
+  /** Data buffers that accompany the protobuf body. */
+  public ByteBuf[] dBodies;
+
+  /**
+   * @param rpcType rpc type of the response
+   * @param pBody   protobuf body of the response
+   * @param dBodies data buffers that accompany the body
+   */
+  public Response(EnumLite rpcType, MessageLite pBody, ByteBuf... dBodies) {
+    this.rpcType = rpcType;
+    this.pBody = pBody;
+    this.dBodies = dBodies;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ResponseSender.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ResponseSender.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ResponseSender.java
new file mode 100644
index 0000000..6dc9ae1
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ResponseSender.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+
+/**
+ * Callback through which a request handler hands back its {@link Response}.
+ */
+public interface ResponseSender {
+
+  /**
+   * @param r the response to send
+   */
+  void send(Response r);
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
new file mode 100644
index 0000000..61922a1
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.Closeable;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.drill.common.SerializedExecutor;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+/**
+ * The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a
+ * system.
+ *
+ * @param <T>
+ */
+public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> implements Closeable {
+  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+
+  private static final OutboundRpcMessage PONG = new OutboundRpcMessage(RpcMode.PONG, 0, 0, Acks.OK);
+
+  protected final CoordinationQueue queue = new CoordinationQueue(16, 16);
+
+  protected abstract MessageLite getResponseDefaultInstance(int rpcType) throws RpcException;
+
+  protected void handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException{
+    sender.send(handle(connection, rpcType, pBody, dBody));
+  }
+
+  protected abstract Response handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
+
+  public abstract boolean isClient();
+
+  protected final RpcConfig rpcConfig;
+
+  protected volatile SocketAddress local;
+  protected volatile SocketAddress remote;
+
+
+  public RpcBus(RpcConfig rpcConfig) {
+    this.rpcConfig = rpcConfig;
+  }
+
+  protected void setAddresses(SocketAddress remote, SocketAddress local){
+    this.remote = remote;
+    this.local = local;
+  }
+
+  <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
+      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+    DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>();
+    this.send(rpcFuture, connection, rpcType, protobufBody, clazz, dataBodies);
+    return rpcFuture;
+  }
+
+  /**
+   * Sends a request with {@code allowInEventLoop} set to false, i.e. sending from the rpc event thread is rejected.
+   */
+  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
+      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+    final boolean allowInEventLoop = false;
+    send(listener, connection, rpcType, protobufBody, clazz, allowInEventLoop, dataBodies);
+  }
+
+  /**
+   * Writes a request to the connection's channel and registers {@code listener} for its outcome.
+   *
+   * <p>Unless {@code allowInEventLoop} is set, the call first blocks until the connection is writable; if that wait
+   * is interrupted, nothing is sent. Whenever the message is not handed to the channel (interruption or any failure
+   * inside this method), the supplied data buffers are released here.</p>
+   *
+   * @param listener         notified with {@code failed(...)} if preparing or writing the message throws
+   * @param connection       connection to send on
+   * @param rpcType          rpc type of the request
+   * @param protobufBody     protobuf body of the request; must not be null
+   * @param clazz            expected response class
+   * @param allowInEventLoop when false, calling from the connection's event loop thread is rejected
+   * @param dataBodies       data buffers to send along with the body
+   * @throws IllegalArgumentException if called from the event loop thread while {@code allowInEventLoop} is false
+   */
+  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
+      SEND protobufBody, Class<RECEIVE> clazz, boolean allowInEventLoop, ByteBuf... dataBodies) {
+
+    Preconditions
+        .checkArgument(
+            allowInEventLoop || !connection.inEventLoop(),
+            "You attempted to send while inside the rpc event thread.  This isn't allowed because sending will block if the channel is backed up.");
+
+    boolean completed = false;
+
+    try {
+
+      if (!allowInEventLoop && !connection.blockOnNotWritable(listener)) {
+        // if we're in not in the event loop and we're interrupted while blocking, skip sending this message.
+        return;
+      }
+
+      // Validate the body before anything dereferences it.
+      Preconditions.checkNotNull(protobufBody);
+      assert !Arrays.asList(dataBodies).contains(null);
+      assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
+
+      ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz, connection);
+      OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
+      ChannelFuture channelFuture = connection.getChannel().writeAndFlush(m);
+      channelFuture.addListener(futureListener);
+      channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+      completed = true;
+    } catch (Exception | AssertionError e) {
+      listener.failed(new RpcException("Failure sending message.", e));
+    } finally {
+      if (!completed && dataBodies != null) {
+        for (ByteBuf b : dataBodies) {
+          // A null entry is exactly what the assertion above rejects; skip it so the remaining buffers still get
+          // released instead of failing with a NullPointerException half way through.
+          if (b != null) {
+            b.release();
+          }
+        }
+      }
+    }
+  }
+
+  public abstract C initRemoteConnection(SocketChannel channel);
+
+  /**
+   * Listener for a channel's close future. On the server side it fails everything outstanding in the coordination
+   * queue with a {@link ChannelClosedException}; on the client side it only logs. In both cases the associated
+   * connection is closed afterwards.
+   */
+  public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
+
+    final C clientConnection;
+    private final Channel channel;
+
+    public ChannelClosedHandler(C clientConnection, Channel channel) {
+      this.channel = channel;
+      this.clientConnection = clientConnection;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      // Prefer the addresses recorded on the bus; fall back to what the channel reports.
+      String msg;
+      if(local!=null) {
+        msg = String.format("Channel closed %s <--> %s.", local, remote);
+      }else{
+        msg = String.format("Channel closed %s <--> %s.", future.channel().localAddress(), future.channel().remoteAddress());
+      }
+
+      if (RpcBus.this.isClient()) {
+        if(local != null) {
+          // msg is already formatted. Running it through String.format again would treat any '%' contained in an
+          // address as a format specifier and throw.
+          logger.info(msg);
+        }
+      } else {
+        queue.channelClosed(new ChannelClosedException(msg));
+      }
+
+      clientConnection.close();
+    }
+
+  }
+
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) {
+    return new ChannelClosedHandler(clientConnection, channel);
+  }
+
+  private interface Recyclable {
+    public void recycle();
+  }
+
+  /**
+   * Reusable {@link ResponseSender} that writes a single response (or failure) for one coordination id and then
+   * recycles its owner. It has to be re-armed through {@link #set(RemoteConnection, int)} before each use.
+   */
+  private class ResponseSenderImpl implements ResponseSender {
+
+    private RemoteConnection connection;
+    private int coordinationId;
+    // Guards against a sender being used more than once per set(...) call.
+    private final AtomicBoolean sent = new AtomicBoolean(false);
+    // Recycled after every send attempt, successful or not.
+    private final Recyclable recyclable;
+
+    public ResponseSenderImpl(Recyclable recyclable) {
+      this.recyclable = recyclable;
+    }
+
+    /** Points this sender at a connection and coordination id and marks it as not yet used. */
+    void set(RemoteConnection connection, int coordinationId){
+      this.connection = connection;
+      this.coordinationId = coordinationId;
+      sent.set(false);
+    }
+
+    /**
+     * Writes the response to the connection's channel.
+     *
+     * @throws IllegalStateException if this sender was already used since the last {@code set(...)}
+     */
+    public void send(Response r) {
+      try {
+        assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass());
+        sendOnce();
+        OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, coordinationId,
+            r.pBody, r.dBodies);
+        if (RpcConstants.EXTRA_DEBUGGING) {
+          logger.debug("Adding message to outbound buffer. {}", outMessage);
+        }
+        logger.debug("Sending response with Sender {}", System.identityHashCode(this));
+        connection.getChannel().writeAndFlush(outMessage);
+      } finally {
+        // Runs even when the send failed or was rejected.
+        recyclable.recycle();
+      }
+    }
+
+    /**
+     * Ensures that each sender is only used once.
+     */
+    private void sendOnce() {
+      if (!sent.compareAndSet(false, true)) {
+        throw new IllegalStateException("Attempted to utilize a sender multiple times.");
+      }
+    }
+
+    /**
+     * Converts the exception into a system-error {@link UserException} and writes it to the channel as a
+     * RESPONSE_FAILURE message for the current coordination id.
+     */
+    void sendFailure(UserRpcException e){
+      try {
+        sendOnce();
+        UserException uex = UserException.systemError(e)
+            .addIdentity(e.getEndpoint())
+            .build(logger);
+
+        logger.error("Unexpected Error while handling request message", e);
+
+        OutboundRpcMessage outMessage = new OutboundRpcMessage(
+            RpcMode.RESPONSE_FAILURE,
+            0,
+            coordinationId,
+            uex.getOrCreatePBError(false)
+            );
+
+        if (RpcConstants.EXTRA_DEBUGGING) {
+          logger.debug("Adding message to outbound buffer. {}", outMessage);
+        }
+        connection.getChannel().writeAndFlush(outMessage);
+      } finally {
+        recyclable.recycle();
+      }
+    }
+
+  }
+
+
+  /**
+   * Pipeline handler that dispatches each inbound rpc message by mode: requests and responses are queued on a
+   * serialized executor, failures are applied to the coordination queue directly, and pings are answered with a pong.
+   */
+  protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> {
+
+    private final RpcEventHandler exec;
+    private final C connection;
+
+    public InboundHandler(C connection) {
+      super();
+      this.connection = connection;
+      this.exec = new RpcEventHandler(rpcConfig.getExecutor());
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final List<Object> output) throws Exception {
+      if (!ctx.channel().isOpen()) {
+        return;
+      }
+      if (RpcConstants.EXTRA_DEBUGGING) {
+        logger.debug("Received message {}", msg);
+      }
+      final Channel channel = connection.getChannel();
+      final Stopwatch watch = new Stopwatch().start();
+
+      try{
+
+        switch (msg.mode) {
+        case REQUEST:
+          // The event retains the message bodies in set(...), so releasing msg below is safe.
+          RequestEvent reqEvent = requestRecycler.get();
+          reqEvent.set(msg.coordinationId, connection, msg.rpcType, msg.pBody, msg.dBody);
+          exec.execute(reqEvent);
+          break;
+
+        case RESPONSE:
+          ResponseEvent respEvent = responseRecycler.get();
+          respEvent.set(msg.rpcType, msg.coordinationId, msg.pBody, msg.dBody);
+          exec.execute(respEvent);
+          break;
+
+        case RESPONSE_FAILURE:
+          DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+          queue.updateFailedFuture(msg.coordinationId, failure);
+          if (RpcConstants.EXTRA_DEBUGGING) {
+            // Both arguments need a placeholder, otherwise the failure is silently dropped from the log line.
+            logger.debug("Updated rpc future with coordinationId {} with failure {}", msg.coordinationId, failure);
+          }
+          break;
+
+        case PING:
+          channel.writeAndFlush(PONG);
+          break;
+
+        case PONG:
+          // noop.
+          break;
+
+        default:
+          throw new UnsupportedOperationException();
+        }
+      } finally {
+        // Warn when dispatching took longer than the threshold (system property, 500ms by default).
+        long time = watch.elapsed(TimeUnit.MILLISECONDS);
+        long delayThreshold = Integer.parseInt(System.getProperty("drill.exec.rpcDelayWarning", "500"));
+        if (time > delayThreshold) {
+          logger.warn(String.format(
+              "Message of mode %s of rpc type %d took longer than %dms.  Actual duration was %dms.",
+              msg.mode, msg.rpcType, delayThreshold, time));
+        }
+        msg.release();
+      }
+    }
+  }
+
+  /**
+   * Parses the content of {@code pBody} with the given protobuf parser.
+   *
+   * @throws RpcException if the parser reports an {@link InvalidProtocolBufferException}
+   */
+  public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException {
+    final ByteBufInputStream stream = new ByteBufInputStream(pBody);
+    try {
+      return parser.parseFrom(stream);
+    } catch (InvalidProtocolBufferException e) {
+      final String parserType = parser.getClass().getCanonicalName();
+      throw new RpcException(String.format("Failure while decoding message with parser of type. %s", parserType), e);
+    }
+  }
+
+  /**
+   * Serialized executor used for rpc events. It is named after the rpc configuration and logs any failure reported
+   * through {@link #runException(Runnable, Throwable)}.
+   */
+  class RpcEventHandler extends SerializedExecutor {
+
+    public RpcEventHandler(Executor underlyingExecutor) {
+      super(rpcConfig.getName() + "-rpc-event-queue", underlyingExecutor);
+    }
+
+    @Override
+    protected void runException(Runnable command, Throwable t) {
+      logger.error("Failure while running rpc command.", t);
+    }
+
+  }
+
+  private final Recycler<RequestEvent> requestRecycler = new Recycler<RequestEvent>() {
+    @Override
+    protected RequestEvent newObject(Handle handle) {
+      return new RequestEvent(handle);
+    }
+  };
+
+  /**
+   * Recyclable task that runs the request handler for one inbound request. The event owns a sender that is handed
+   * to the handler; the sender recycles this event once it has been used.
+   */
+  private class RequestEvent implements Runnable, Recyclable {
+    private final ResponseSenderImpl sender;
+    private final Handle handle;
+    private C connection;
+    private int rpcType;
+    private ByteBuf pBody;
+    private ByteBuf dBody;
+
+    RequestEvent(Handle handle){
+      this.handle = handle;
+      sender = new ResponseSenderImpl(this);
+    }
+
+    /**
+     * Prepares this event for one request and re-arms its sender. Non-null bodies are retained here and released
+     * again at the end of {@link #run()}.
+     */
+    public void set(int coordinationId, C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) {
+      this.connection = connection;
+      this.rpcType = rpcType;
+      this.pBody = pBody;
+      this.dBody = dBody;
+      sender.set(connection, coordinationId);
+
+      if(pBody != null){
+        pBody.retain();
+      }
+
+      if(dBody != null){
+        dBody.retain();
+      }
+    }
+
+    @Override
+    public void run() {
+      try {
+        handle(connection, rpcType, pBody, dBody, sender);
+      } catch (UserRpcException e) {
+        // Reported back to the requester as a failure response.
+        sender.sendFailure(e);
+      } catch (Exception e) {
+        // Only logged; no response is written on this path.
+        logger.error("Failure while handling message.", e);
+      }finally{
+        // Balance the retain() calls made in set(...).
+        if(pBody != null){
+          pBody.release();
+        }
+
+        if(dBody != null){
+          dBody.release();
+        }
+      }
+
+    }
+
+    @Override
+    public void recycle() {
+      // We must defer recycling until the sender has been used.
+      requestRecycler.recycle(this, handle);
+    }
+
+
+  }
+
+  private final Recycler<ResponseEvent> responseRecycler = new Recycler<ResponseEvent>() {
+    @Override
+    protected ResponseEvent newObject(Handle handle) {
+      return new ResponseEvent(handle);
+    }
+  };
+
+  /**
+   * Recyclable task that parses one inbound response and hands the result to the outcome registered in the
+   * coordination queue for its coordination id.
+   */
+  private class ResponseEvent implements Runnable {
+    private final Handle handle;
+
+    private int rpcType;
+    private int coordinationId;
+    private ByteBuf pBody;
+    private ByteBuf dBody;
+
+    public ResponseEvent(Handle handle){
+      this.handle = handle;
+    }
+
+    /**
+     * Prepares this event for one response. Non-null bodies are retained here and released again at the end of
+     * {@link #run()}.
+     */
+    public void set(int rpcType, int coordinationId, ByteBuf pBody, ByteBuf dBody) {
+      this.rpcType = rpcType;
+      this.coordinationId = coordinationId;
+      this.pBody = pBody;
+      this.dBody = dBody;
+
+      if(pBody != null){
+        pBody.retain();
+      }
+
+      if(dBody != null){
+        dBody.retain();
+      }
+    }
+
+    public void run(){
+      try {
+        // The default instance supplies both the expected class and the parser for this rpc type.
+        MessageLite m = getResponseDefaultInstance(rpcType);
+        assert rpcConfig.checkReceive(rpcType, m.getClass());
+        RpcOutcome<?> rpcFuture = queue.getFuture(rpcType, coordinationId, m.getClass());
+        Parser<?> parser = m.getParserForType();
+        Object value = parser.parseFrom(new ByteBufInputStream(pBody, pBody.readableBytes()));
+        rpcFuture.set(value, dBody);
+        if (RpcConstants.EXTRA_DEBUGGING) {
+          logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+        }
+      } catch (Exception ex) {
+        // Only logged; nothing is propagated to the caller of run().
+        logger.error("Failure while handling response.", ex);
+      }finally{
+        // Balance the retain() calls made in set(...), then hand this event back to the recycler.
+        if(pBody != null){
+          pBody.release();
+        }
+
+        if(dBody != null){
+          dBody.release();
+        }
+
+        responseRecycler.recycle(this, handle);
+      }
+
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
new file mode 100644
index 0000000..11b07ad
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Checked future that maps failures to {@link RpcException} and can carry the data buffer that accompanied a
+ * response.
+ */
+public class RpcCheckedFuture<T> extends AbstractCheckedFuture<T, RpcException> implements DrillRpcFuture<T>{
+
+  // Buffer recorded by set(...); null until then.
+  volatile ByteBuf buffer;
+
+  public RpcCheckedFuture(ListenableFuture<T> delegate) {
+    super(delegate);
+  }
+
+  /**
+   * Records the data buffer that came with the response. Only the buffer is stored; {@code obj} is not used here,
+   * the value of the future comes from the delegate passed to the constructor.
+   */
+  public void set(T obj, ByteBuf buffer){
+    this.buffer = buffer;
+  }
+
+  @Override
+  protected RpcException mapException(Exception e) {
+    return RpcException.mapException(e);
+  }
+
+  /**
+   * Returns the buffer recorded through {@link #set(Object, ByteBuf)}, or null if none has been recorded.
+   * Previously this always returned null, which made the stored buffer unreachable.
+   * NOTE(review): confirm callers' expectations about who releases the returned buffer.
+   */
+  @Override
+  public ByteBuf getBuffer() {
+    return buffer;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
new file mode 100644
index 0000000..93b901c
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import com.google.protobuf.MessageLite;
+
+/**
+ * A connection handler that additionally gets told when a connection is available.
+ */
+public interface RpcCommand<T extends MessageLite, C extends RemoteConnection> extends RpcConnectionHandler<C>{
+
+  /**
+   * @param connection the connection that is available
+   */
+  void connectionAvailable(C connection);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
new file mode 100644
index 0000000..22b253a
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+
+/**
+ * Describes an RPC protocol: its name, timeout, executor and the message types that may be sent
+ * and received. Instances are assembled with {@link #newBuilder()}.
+ *
+ * <p>All fields are final and the lookup tables are copied into immutable maps, but the
+ * {@link RpcMessageType} entries they hold remain mutable through their setters.
+ */
+public class RpcConfig {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConfig.class);
+
+  private final String name;
+  private final int timeout;
+  /** Message definitions keyed by the enum value used when sending. */
+  private final Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap;
+  /** Message definitions keyed by the number of the receive enum value. */
+  private final Map<Integer, RpcMessageType<?, ?, ?>> receiveMap;
+  private final Executor executor;
+
+  private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap,
+      Map<Integer, RpcMessageType<?, ?, ?>> receiveMap, int timeout, Executor executor) {
+    Preconditions.checkNotNull(executor, "Executor must be defined.");
+    this.name = name;
+    this.timeout = timeout;
+    this.sendMap = ImmutableMap.copyOf(sendMap);
+    this.receiveMap = ImmutableMap.copyOf(receiveMap);
+    this.executor = executor;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public int getTimeout() {
+    return timeout;
+  }
+
+  /**
+   * @return {@code true} when the configured timeout is greater than zero
+   */
+  public boolean hasTimeout() {
+    return timeout > 0;
+  }
+
+  public Executor getExecutor() {
+    return executor;
+  }
+
+  /**
+   * Verifies that an inbound message of the given rpc type number is defined and that its
+   * registered return class is exactly {@code receiveClass}.
+   *
+   * @param rpcType numeric rpc type, looked up in the receive table
+   * @param receiveClass class the caller intends to receive
+   * @return {@code true} whenever no exception is thrown
+   * @throws IllegalStateException if the type is unknown or the class does not match
+   */
+  public boolean checkReceive(int rpcType, Class<?> receiveClass) {
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug("Checking reception for rpcType {} and receive class {}.", rpcType, receiveClass);
+    }
+    RpcMessageType<?,?,?> type = receiveMap.get(rpcType);
+    if (type == null) {
+      throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc receive type number of %s.", name, rpcType));
+    }
+
+    if (receiveClass != type.getRet()) {
+      throw new IllegalStateException(String.format("%s: The definition for receive doesn't match implementation code.  The definition is %s however the current receive for this type was of type %s.", name, type, receiveClass.getCanonicalName()));
+    }
+    return true;
+  }
+
+  /**
+   * Verifies that an outbound message is defined for {@code send} and that both its registered
+   * send class and return class are exactly the ones supplied.
+   *
+   * @param send enum value identifying the outbound rpc type
+   * @param sendClass class of the object being sent
+   * @param receiveClass class of the expected response
+   * @return {@code true} whenever no exception is thrown
+   * @throws IllegalStateException if the type is unknown or either class does not match
+   */
+  public boolean checkSend(EnumLite send, Class<?> sendClass, Class<?> receiveClass) {
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug("Checking send classes for send RpcType {}.  Send Class is {} and Receive class is {}.", send, sendClass, receiveClass);
+    }
+    RpcMessageType<?,?,?> type = sendMap.get(send);
+    if (type == null) {
+      throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc send type of %s.", name, send));
+    }
+
+    if (type.getSend() != sendClass) {
+      throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code.  The definition is %s however the current send is trying to send an object of type %s.", name, type, sendClass.getCanonicalName()));
+    }
+    if (type.getRet() != receiveClass) {
+      throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code.  The definition is %s however the current send is trying to setup an expected reception of an object of type %s.", name, type, receiveClass.getCanonicalName()));
+    }
+
+    return true;
+  }
+
+  /**
+   * Verifies that a response of the given type is defined (looked up by the enum's number in the
+   * receive table) and that its registered return class is exactly {@code responseClass}.
+   *
+   * @param responseType enum value identifying the response rpc type
+   * @param responseClass class of the object being sent as the response
+   * @return {@code true} whenever no exception is thrown
+   * @throws IllegalStateException if the type is unknown or the class does not match
+   */
+  public boolean checkResponseSend(EnumLite responseType, Class<?> responseClass) {
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug("Checking response send of type {} with response class of {}.", responseType, responseClass);
+    }
+    RpcMessageType<?,?,?> type = receiveMap.get(responseType.getNumber());
+    if (type == null) {
+      throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc response of type %s.", name, responseType));
+    }
+    if (type.getRet() != responseClass) {
+      throw new IllegalStateException(String.format("%s: The definition for the response doesn't match implementation code.  The definition is %s however the current response is trying to response with an object of type %s.", name, type, responseClass.getCanonicalName()));
+    }
+
+    return true;
+  }
+
+  /**
+   * Pairs a send enum value and message class with the receive enum value and message class that
+   * belong to the same rpc exchange.
+   *
+   * @param <SEND> message class that is sent
+   * @param <RECEIVE> message class that is returned
+   * @param <T> enum type identifying the rpc types
+   */
+  public static class RpcMessageType<SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite>{
+    private T sendEnum;
+    private Class<SEND> send;
+    private T receiveEnum;
+    private Class<RECEIVE> ret;
+    public RpcMessageType(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> ret) {
+      super();
+      this.sendEnum = sendEnum;
+      this.send = send;
+      this.receiveEnum = receiveEnum;
+      this.ret = ret;
+    }
+    public Class<SEND> getSend() {
+      return send;
+    }
+    public void setSend(Class<SEND> send) {
+      this.send = send;
+    }
+    public T getSendEnum() {
+      return sendEnum;
+    }
+    public void setSendEnum(T sendEnum) {
+      this.sendEnum = sendEnum;
+    }
+    public Class<RECEIVE> getRet() {
+      return ret;
+    }
+    public void setRet(Class<RECEIVE> ret) {
+      this.ret = ret;
+    }
+    public T getReceiveEnum() {
+      return receiveEnum;
+    }
+    public void setReceiveEnum(T receiveEnum) {
+      this.receiveEnum = receiveEnum;
+    }
+    @Override
+    public String toString() {
+      return "RpcMessageType [sendEnum=" + sendEnum + ", send=" + send + ", receiveEnum=" + receiveEnum + ", ret="
+          + ret + "]";
+    }
+
+  }
+
+  /**
+   * @return a fresh, empty builder
+   */
+  public static RpcConfigBuilder newBuilder() {
+    return new RpcConfigBuilder();
+  }
+
+  /**
+   * Fluent builder for {@link RpcConfig}. A name, a non-negative timeout and an executor must be
+   * supplied before {@link #build()} succeeds.
+   */
+  public static class RpcConfigBuilder {
+    private String name;
+    // -1 marks "not set"; build() rejects it.
+    private int timeout = -1;
+    private Executor executor;
+    private Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap = Maps.newHashMap();
+    private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap();
+
+    private RpcConfigBuilder() {
+    }
+
+    public RpcConfigBuilder name(String name) {
+      this.name = name;
+      return this;
+    }
+
+    /**
+     * @param timeoutInSeconds timeout value; zero disables the timeout (see {@link #build()})
+     */
+    public RpcConfigBuilder timeout(int timeoutInSeconds) {
+      this.timeout = timeoutInSeconds;
+      return this;
+    }
+
+    /**
+     * Registers one rpc exchange. The definition is stored under {@code sendEnum} in the send
+     * table and under {@code receiveEnum.getNumber()} in the receive table; an earlier entry
+     * with the same key is replaced.
+     */
+    public <SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite>  RpcConfigBuilder add(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> rec) {
+      RpcMessageType<SEND, RECEIVE, T> type = new RpcMessageType<SEND, RECEIVE, T>(sendEnum, send, receiveEnum, rec);
+      this.sendMap.put(sendEnum, type);
+      this.receiveMap.put(receiveEnum.getNumber(), type);
+      return this;
+    }
+
+    public RpcConfigBuilder executor(Executor executor) {
+      this.executor = executor;
+      return this;
+    }
+
+    /**
+     * Creates the configuration.
+     *
+     * @throws IllegalArgumentException if the timeout was never set (or is negative) or the name
+     *         is missing
+     * @throws NullPointerException if no executor was supplied
+     */
+    public RpcConfig build() {
+      Preconditions.checkArgument(timeout > -1, "Timeout must be a positive number or zero for disabled.");
+      Preconditions.checkArgument(name != null, "RpcConfig name must be set.");
+      return new RpcConfig(name, sendMap, receiveMap, timeout, executor);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
new file mode 100644
index 0000000..7618231
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+public interface RpcConnectionHandler<T extends RemoteConnection> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConnectionHandler.class);
+
+  public static enum FailureType{CONNECTION, HANDSHAKE_COMMUNICATION, HANDSHAKE_VALIDATION}
+
+  public void connectionSucceeded(T connection);
+  public void connectionFailed(FailureType type, Throwable t);
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
new file mode 100644
index 0000000..4be365c
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+public class RpcConstants {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConstants.class);
+
+  private RpcConstants(){}
+
+  public static final boolean SOME_DEBUGGING = false;
+  public static final boolean EXTRA_DEBUGGING = false;
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
new file mode 100644
index 0000000..ac48187
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.CorruptedFrameException;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
+
+/**
+ * Converts a previously length adjusted buffer into an RpcMessage.
+ *
+ * <p>The frame layout read here is: header tag, length-delimited {@link RpcHeader}, protobuf body
+ * tag, varint length and protobuf body bytes, then optionally a raw body tag, varint length and
+ * the raw body bytes, which must fill the rest of the buffer.
+ */
+class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
+  final org.slf4j.Logger logger;
+
+  /** Number of messages decoded so far; only used in error messages. */
+  private final AtomicLong messageCounter = new AtomicLong();
+
+  /**
+   * @param name suffix appended to the logger name of this decoder
+   */
+  public RpcDecoder(String name) {
+    this.logger = org.slf4j.LoggerFactory.getLogger(RpcDecoder.class.getCanonicalName() + "-" + name);
+  }
+
+
+  /**
+   * Decodes one complete frame into an {@link InboundRpcMessage} and appends it to {@code out}.
+   * Nothing is produced when the channel is no longer open.
+   *
+   * <p>The body slices are retained before they are handed over. If decoding fails after a slice
+   * was retained, the retained references are released again so a corrupted frame does not leak
+   * the underlying buffer.
+   *
+   * @throws CorruptedFrameException if a tag, the header or the raw body length is not as expected
+   */
+  @Override
+  protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
+    if (!ctx.channel().isOpen()) {
+      return;
+    }
+
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug("Inbound rpc message received.");
+    }
+
+    // now, we know the entire message is in the buffer and the buffer is constrained to this message. Additionally,
+    // this process should avoid reading beyond the end of this buffer so we inform the ByteBufInputStream to throw an
+    // exception if we go beyond readable bytes (as opposed to blocking).
+    final ByteBufInputStream is = new ByteBufInputStream(buffer, buffer.readableBytes());
+
+    // read the rpc header, saved in delimited format.
+    checkTag(is, RpcEncoder.HEADER_TAG);
+    final RpcHeader header = RpcHeader.parseDelimitedFrom(is);
+    if (header == null) {
+      // parseDelimitedFrom yields null when the stream is already exhausted; fail here instead of
+      // with a NullPointerException once the bodies have been retained.
+      throw new CorruptedFrameException("Expected to read an rpc header but reached the end of the buffer.");
+    }
+
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug(" post header read index {}", buffer.readerIndex());
+    }
+
+    // read the protobuf body into a buffer.
+    checkTag(is, RpcEncoder.PROTOBUF_BODY_TAG);
+    final int pBodyLength = readRawVarint32(is);
+    final ByteBuf pBody = buffer.slice(buffer.readerIndex(), pBodyLength);
+    buffer.skipBytes(pBodyLength);
+    pBody.retain(1);
+
+    ByteBuf dBody = null;
+    boolean handedOff = false;
+    try {
+      if (RpcConstants.EXTRA_DEBUGGING) {
+        logger.debug("Read protobuf body of length {} into buffer {}.", pBodyLength, pBody);
+      }
+
+      if (RpcConstants.EXTRA_DEBUGGING) {
+        logger.debug("post protobufbody read index {}", buffer.readerIndex());
+      }
+
+      int dBodyLength = 0;
+
+      // read the data body.
+      if (buffer.readableBytes() > 0) {
+
+        if (RpcConstants.EXTRA_DEBUGGING) {
+          logger.debug("Reading raw body, buffer has {} bytes available, is available {}.", buffer.readableBytes(), is.available());
+        }
+        checkTag(is, RpcEncoder.RAW_BODY_TAG);
+        dBodyLength = readRawVarint32(is);
+        if (buffer.readableBytes() != dBodyLength) {
+          throw new CorruptedFrameException(String.format("Expected to receive a raw body of %d bytes but received a buffer with %d bytes.", dBodyLength, buffer.readableBytes()));
+        }
+        final ByteBuf rawSlice = buffer.slice();
+        rawSlice.retain(1);
+        // only remember the slice once it has actually been retained.
+        dBody = rawSlice;
+        if (RpcConstants.EXTRA_DEBUGGING) {
+          logger.debug("Read raw body of {}", dBody);
+        }
+
+      }else{
+        if (RpcConstants.EXTRA_DEBUGGING) {
+          logger.debug("No need to read raw body, no readable bytes left.");
+        }
+      }
+
+
+      // return the rpc message.
+      InboundRpcMessage m = new InboundRpcMessage(header.getMode(), header.getRpcType(), header.getCoordinationId(),
+          pBody, dBody);
+
+      // move the reader index forward so the next rpc call won't try to work with it.
+      buffer.skipBytes(dBodyLength);
+      messageCounter.incrementAndGet();
+      if (RpcConstants.SOME_DEBUGGING) {
+        logger.debug("Inbound Rpc Message Decoded {}.", m);
+      }
+      out.add(m);
+      handedOff = true;
+    } finally {
+      if (!handedOff) {
+        // decoding failed after retaining: give back the references taken above.
+        pBody.release();
+        if (dBody != null) {
+          dBody.release();
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Reads one varint from the stream and checks that it equals {@code expectedTag}.
+   *
+   * @throws CorruptedFrameException if a different value was read
+   * @throws IOException if reading from the stream fails
+   */
+  private void checkTag(ByteBufInputStream is, int expectedTag) throws IOException {
+    int actualTag = readRawVarint32(is);
+    if (actualTag != expectedTag) {
+      throw new CorruptedFrameException(String.format("Expected to read a tag of %d but actually received a value of %d.  Happened after reading %d message.", expectedTag, actualTag, messageCounter.get()));
+    }
+  }
+
+  /**
+   * Reads a base-128 varint of up to 32 bits from the stream. Each byte contributes its low seven
+   * bits, least significant group first; a set high bit means another byte follows. If the fifth
+   * byte still has its high bit set, up to five further bytes are consumed and ignored.
+   *
+   * @return the decoded value
+   * @throws CorruptedFrameException if no terminating byte is found within those extra bytes
+   * @throws IOException if reading from the stream fails
+   */
+  // Taken from CodedInputStream and modified to enable ByteBufInterface.
+  public static int readRawVarint32(ByteBufInputStream is) throws IOException {
+    byte tmp = is.readByte();
+    if (tmp >= 0) {
+      return tmp;
+    }
+    int result = tmp & 0x7f;
+    if ((tmp = is.readByte()) >= 0) {
+      result |= tmp << 7;
+    } else {
+      result |= (tmp & 0x7f) << 7;
+      if ((tmp = is.readByte()) >= 0) {
+        result |= tmp << 14;
+      } else {
+        result |= (tmp & 0x7f) << 14;
+        if ((tmp = is.readByte()) >= 0) {
+          result |= tmp << 21;
+        } else {
+          result |= (tmp & 0x7f) << 21;
+          result |= (tmp = is.readByte()) << 28;
+          if (tmp < 0) {
+            // Discard upper 32 bits.
+            for (int i = 0; i < 5; i++) {
+              if (is.readByte() >= 0) {
+                return result;
+              }
+            }
+            throw new CorruptedFrameException("Encountered a malformed varint.");
+          }
+        }
+      }
+    }
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
new file mode 100644
index 0000000..f9da6f1
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.CompleteRpcMessage;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
+
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
+
+/**
+ * Converts an RPCMessage into wire format.
+ *
+ * <p>The frame written is: varint total length, header tag, varint header length and header,
+ * protobuf body tag, varint length and protobuf body, then, only when the message carries raw
+ * data, a raw body tag, varint length and the raw data buffers.
+ */
+class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{
+  final org.slf4j.Logger logger;
+
+  static final int HEADER_TAG = makeTag(CompleteRpcMessage.HEADER_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+  static final int PROTOBUF_BODY_TAG = makeTag(CompleteRpcMessage.PROTOBUF_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+  static final int RAW_BODY_TAG = makeTag(CompleteRpcMessage.RAW_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+  static final int HEADER_TAG_LENGTH = getRawVarintSize(HEADER_TAG);
+  static final int PROTOBUF_BODY_TAG_LENGTH = getRawVarintSize(PROTOBUF_BODY_TAG);
+  static final int RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG);
+
+  /**
+   * @param name suffix appended to the logger name of this encoder
+   */
+  public RpcEncoder(String name) {
+    this.logger = org.slf4j.LoggerFactory.getLogger(RpcEncoder.class.getCanonicalName() + "-" + name);
+  }
+
+  /**
+   * Serializes {@code msg} and appends the resulting buffer to {@code out}. When the message has
+   * raw data bodies they are not copied; they are appended to the serialized prefix as components
+   * of a composite buffer.
+   *
+   * <p>If the channel is no longer open the message is released and nothing is added to
+   * {@code out}. If serialization fails, the buffer allocated here is released before the
+   * exception propagates; the message and its data bodies are left untouched in that case.
+   */
+  @Override
+  protected void encode(ChannelHandlerContext ctx, OutboundRpcMessage msg, List<Object> out) throws Exception {
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug("Rpc Encoder called with msg {}", msg);
+    }
+
+    if (!ctx.channel().isOpen()) {
+      // nothing is written for a closed channel; the message's buffers are given back.
+      logger.debug("Channel closed, skipping encode.");
+      msg.release();
+      return;
+    }
+
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug("Encoding outbound message {}", msg);
+    }
+    // first we build the RpcHeader
+    RpcHeader header = RpcHeader.newBuilder() //
+        .setMode(msg.mode) //
+        .setCoordinationId(msg.coordinationId) //
+        .setRpcType(msg.rpcType).build();
+
+    // figure out the full length
+    int headerLength = header.getSerializedSize();
+    int protoBodyLength = msg.pBody.getSerializedSize();
+    int rawBodyLength = msg.getRawBodySize();
+    int fullLength = //
+        HEADER_TAG_LENGTH + getRawVarintSize(headerLength) + headerLength +   //
+        PROTOBUF_BODY_TAG_LENGTH + getRawVarintSize(protoBodyLength) + protoBodyLength; //
+
+    if (rawBodyLength > 0) {
+      fullLength += (RAW_BODY_TAG_LENGTH + getRawVarintSize(rawBodyLength) + rawBodyLength);
+    }
+
+    ByteBuf buf = ctx.alloc().buffer();
+    boolean handedOff = false;
+    try {
+      OutputStream os = new ByteBufOutputStream(buf);
+      CodedOutputStream cos = CodedOutputStream.newInstance(os);
+
+      // write full length first (this is length delimited stream).
+      cos.writeRawVarint32(fullLength);
+
+      // write header
+      cos.writeRawVarint32(HEADER_TAG);
+      cos.writeRawVarint32(headerLength);
+      header.writeTo(cos);
+
+      // write protobuf body length and body
+      cos.writeRawVarint32(PROTOBUF_BODY_TAG);
+      cos.writeRawVarint32(protoBodyLength);
+      msg.pBody.writeTo(cos);
+
+      // if exists, write data body and tag.
+      if (rawBodyLength > 0) {
+        if(RpcConstants.EXTRA_DEBUGGING) {
+          logger.debug("Writing raw body of size {}", rawBodyLength);
+        }
+
+        cos.writeRawVarint32(RAW_BODY_TAG);
+        cos.writeRawVarint32(rawBodyLength);
+        cos.flush(); // need to flush so that dbody goes after if cos is caching.
+
+        CompositeByteBuf cbb = new CompositeByteBuf(buf.alloc(), true, msg.dBodies.length + 1);
+        cbb.addComponent(buf);
+        int bufLength = buf.readableBytes();
+        for (ByteBuf b : msg.dBodies) {
+          cbb.addComponent(b);
+          bufLength += b.readableBytes();
+        }
+        // addComponent does not advance the writer index, so set it to cover all components.
+        cbb.writerIndex(bufLength);
+        out.add(cbb);
+      } else {
+        cos.flush();
+        out.add(buf);
+      }
+      handedOff = true;
+
+      if (RpcConstants.SOME_DEBUGGING) {
+        logger.debug("Wrote message length {}:{} bytes (head:body).  Message: {}", getRawVarintSize(fullLength), fullLength, msg);
+      }
+      if (RpcConstants.EXTRA_DEBUGGING) {
+        logger.debug("Sent message.  Ending writer index was {}.", buf.writerIndex());
+      }
+    } finally {
+      if (!handedOff) {
+        // encoding failed before the buffer reached 'out'; nobody else will free it.
+        buf.release();
+      }
+    }
+  }
+
+  /** Makes a tag value given a field number and wire type, copied from WireFormat since it isn't public.  */
+  static int makeTag(final int fieldNumber, final int wireType) {
+    return (fieldNumber << 3) | wireType;
+  }
+
+  /**
+   * Computes how many bytes {@code value} occupies when written as a base-128 varint: one byte
+   * per started group of seven bits, treating the value as unsigned.
+   *
+   * @return a count between 1 and 5
+   */
+  public static int getRawVarintSize(int value) {
+    int count = 0;
+    while (true) {
+      if ((value & ~0x7F) == 0) {
+        count++;
+        return count;
+      } else {
+        count++;
+        value >>>= 7;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcException.java
new file mode 100644
index 0000000..a6d0e8e
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcException.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.drill.common.exceptions.DrillIOException;
+import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+
+/**
+ * Parent class for all rpc exceptions.
+ *
+ * <p>Messages passed to the constructors are run through
+ * {@link DrillStringUtils#unescapeJava(String)} before being stored.
+ */
+public class RpcException extends DrillIOException{
+  private static final long serialVersionUID = -5964230316010502319L;
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcException.class);
+
+  public RpcException() {
+    super();
+  }
+
+  public RpcException(String message, Throwable cause) {
+    super(format(message), cause);
+  }
+
+  private static String format(String message) {
+    return DrillStringUtils.unescapeJava(message);
+  }
+
+  public RpcException(String message) {
+    super(format(message));
+  }
+
+  public RpcException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Converts any throwable into an {@code RpcException}. Enclosing {@link ExecutionException}
+   * wrappers are stripped first; if what remains already is an {@code RpcException} it is
+   * returned as is, otherwise it becomes the cause of a new one.
+   *
+   * @param t the throwable to convert
+   * @return {@code t} (unwrapped) itself or a new exception wrapping it
+   */
+  public static RpcException mapException(Throwable t) {
+    final Throwable root = unwrapExecutionExceptions(t);
+    if (root instanceof RpcException) {
+      return ((RpcException) root);
+    }
+    return new RpcException(root);
+  }
+
+  /**
+   * Creates a new {@code RpcException} with the given message whose cause is {@code t} with any
+   * enclosing {@link ExecutionException} wrappers stripped.
+   *
+   * @param message message for the new exception
+   * @param t the throwable to use as the cause
+   */
+  public static RpcException mapException(String message, Throwable t) {
+    return new RpcException(message, unwrapExecutionExceptions(t));
+  }
+
+  /**
+   * Walks down the cause chain while the current throwable is an {@link ExecutionException}.
+   * An {@code ExecutionException} without a cause is kept, so the only available exception is
+   * not replaced by {@code null}.
+   */
+  private static Throwable unwrapExecutionExceptions(Throwable t) {
+    while (t instanceof ExecutionException && t.getCause() != null) {
+      t = t.getCause();
+    }
+    return t;
+  }
+
+  /**
+   * @return {@code false} in this base class
+   */
+  public boolean isRemote(){
+    return false;
+  }
+
+  /**
+   * Not supported by this base class.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public DrillPBError getRemoteError(){
+    throw new UnsupportedOperationException();
+  }
+}


Mime
View raw message