drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject drill git commit: DRILL-3714: Avoid cascading disconnection when a single connection is broken.
Date Wed, 13 Apr 2016 21:42:25 GMT
Repository: drill
Updated Branches:
  refs/heads/master e9b6e8f3d -> e4725ea53


DRILL-3714: Avoid cascading disconnection when a single connection is broken.

- Move coordination id management from the RpcBus level to the connection level.
- Rename CoordinationQueue to a more appropriate name: RequestIdMap
- Simplify the locking and reduce the memory overhead of RequestIdMap. Previously, this map was
accessed by a large number of threads concurrently; with this change it is accessed by at most
two threads. Rather than paying the memory overhead of ConcurrentHashMap, switch to a simple
locking approach, since contention should be low.
- Rename all methods associated with coordination for clarity and add javadocs to them.
Move these methods to RemoteConnection.
- Consolidate the two different close handlers into a single, ordered close handler managed
inside the connection.
- Improve the javadoc for RemoteConnection.close().
- Add some precondition checks.
- Update the HPPC version in the base memory module, since it conflicted with the HPPC version
used by the java-exec module.

This closes #463.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e4725ea5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e4725ea5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e4725ea5

Branch: refs/heads/master
Commit: e4725ea53dce58499974be93b3da87388e7df51c
Parents: e9b6e8f
Author: Jacques Nadeau <jacques@apache.org>
Authored: Tue Apr 5 16:36:25 2016 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Wed Apr 13 14:39:12 2016 -0700

----------------------------------------------------------------------
 exec/memory/base/pom.xml                        |   2 +-
 .../org/apache/drill/exec/rpc/BasicClient.java  |  18 +-
 .../org/apache/drill/exec/rpc/BasicServer.java  |   5 -
 .../drill/exec/rpc/CoordinationQueue.java       | 160 ----------------
 .../drill/exec/rpc/PositiveAtomicInteger.java   |  39 ----
 .../apache/drill/exec/rpc/RemoteConnection.java | 111 ++++++++---
 .../org/apache/drill/exec/rpc/RequestIdMap.java | 192 +++++++++++++++++++
 .../java/org/apache/drill/exec/rpc/RpcBus.java  |  35 ++--
 8 files changed, 296 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e4725ea5/exec/memory/base/pom.xml
----------------------------------------------------------------------
diff --git a/exec/memory/base/pom.xml b/exec/memory/base/pom.xml
index 804cdd6..54335a1 100644
--- a/exec/memory/base/pom.xml
+++ b/exec/memory/base/pom.xml
@@ -36,7 +36,7 @@
     <dependency>
       <groupId>com.carrotsearch</groupId>
       <artifactId>hppc</artifactId>
-      <version>0.5.2</version>
+      <version>0.7.1</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e4725ea5/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index ed6e791..0a501fd 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
@@ -95,7 +96,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
             pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator()));
             pipe.addLast("message-decoder", new RpcDecoder("c-" + rpcConfig.getName()));
             pipe.addLast("protocol-encoder", new RpcEncoder("c-" + rpcConfig.getName()));
-            pipe.addLast("handshake-handler", new ClientHandshakeHandler());
+            pipe.addLast("handshake-handler", new ClientHandshakeHandler(connection));
 
             if(pingHandler != null){
               pipe.addLast("idle-state-handler", pingHandler);
@@ -168,11 +169,6 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
   }
 
-  @Override
-  public boolean isClient() {
-    return true;
-  }
-
   protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND
handshakeValue,
       String host, int port) {
     ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
@@ -282,15 +278,19 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
 
   private class ClientHandshakeHandler extends AbstractHandshakeHandler<HANDSHAKE_RESPONSE>
{
 
-    public ClientHandshakeHandler() {
+    private final R connection;
+
+    public ClientHandshakeHandler(R connection) {
       super(BasicClient.this.handshakeType, BasicClient.this.handshakeParser);
+      Preconditions.checkNotNull(connection);
+      this.connection = connection;
     }
 
     @Override
     protected final void consumeHandshake(ChannelHandlerContext ctx, HANDSHAKE_RESPONSE msg)
throws Exception {
       // remove the handshake information from the queue so it doesn't sit there forever.
-      RpcOutcome<HANDSHAKE_RESPONSE> response = queue.getFuture(handshakeType.getNumber(),
coordinationId,
-          responseClass);
+      final RpcOutcome<HANDSHAKE_RESPONSE> response =
+          connection.getAndRemoveRpcOutcome(handshakeType.getNumber(), coordinationId, responseClass);
       response.set(msg, null);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e4725ea5/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 27364af..b54d73e 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -133,11 +133,6 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 
   public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler
outOfMemoryHandler);
 
-  @Override
-  public boolean isClient() {
-    return false;
-  }
-
   protected abstract ServerHandshakeHandler<?> getHandshakeHandler(C connection);
 
   protected static abstract class ServerHandshakeHandler<T extends MessageLite> extends
AbstractHandshakeHandler<T> {

http://git-wip-us.apache.org/repos/asf/drill/blob/e4725ea5/exec/rpc/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
deleted file mode 100644
index 5a5bbab..0000000
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
-
-/**
- * Manages the creation of rpc futures for a particular socket.
- */
-public class CoordinationQueue {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
-
-  private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
-  private final Map<Integer, RpcOutcome<?>> map;
-
-  public CoordinationQueue(int segmentSize, int segmentCount) {
-    map = new ConcurrentHashMap<Integer, RpcOutcome<?>>(segmentSize, 0.75f, segmentCount);
-  }
-
-  void channelClosed(Throwable ex) {
-    if (ex != null) {
-      RpcException e;
-      if (ex instanceof RpcException) {
-        e = (RpcException) ex;
-      } else {
-        e = new RpcException(ex);
-      }
-      for (RpcOutcome<?> f : map.values()) {
-        f.setException(e);
-      }
-    }
-  }
-
-  public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler,
Class<V> clazz, RemoteConnection connection) {
-    int i = circularInt.getNext();
-    RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection);
-    Object old = map.put(i, future);
-    if (old != null) {
-      throw new IllegalStateException(
-          "You attempted to reuse a coordination id when the previous coordination id has
not been removed.  This is likely rpc future callback memory leak.");
-    }
-    return future;
-  }
-
-  private class RpcListener<T> implements ChannelListenerWithCoordinationId, RpcOutcome<T>{
-    final RpcOutcomeListener<T> handler;
-    final Class<T> clazz;
-    final int coordinationId;
-    final RemoteConnection connection;
-
-    public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinationId,
RemoteConnection connection) {
-      super();
-      this.handler = handler;
-      this.clazz = clazz;
-      this.coordinationId = coordinationId;
-      this.connection = connection;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-
-      if (!future.isSuccess()) {
-        removeFromMap(coordinationId);
-        if (future.channel().isActive()) {
-           throw new RpcException("Future failed") ;
-        } else {
-          setException(new ChannelClosedException());
-        }
-      }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void set(Object value, ByteBuf buffer) {
-      assert clazz.isAssignableFrom(value.getClass());
-      handler.success( (T) value, buffer);
-    }
-
-    @Override
-    public void setException(Throwable t) {
-      handler.failed(RpcException.mapException(t));
-    }
-
-    @Override
-    public Class<T> getOutcomeType() {
-      return clazz;
-    }
-
-    @Override
-    public int getCoordinationId() {
-      return coordinationId;
-    }
-
-  }
-
-  private RpcOutcome<?> removeFromMap(int coordinationId) {
-    RpcOutcome<?> rpc = map.remove(coordinationId);
-    if (rpc == null) {
-      throw new IllegalStateException(
-          "Attempting to retrieve an rpc that wasn't first stored in the rpc coordination
queue.  This would most likely happen if you're opposite endpoint sent multiple messages on
the same coordination id.");
-    }
-    return rpc;
-  }
-
-  public <V> RpcOutcome<V> getFuture(int rpcType, int coordinationId, Class<V>
clazz) {
-
-    RpcOutcome<?> rpc = removeFromMap(coordinationId);
-    // logger.debug("Got rpc from map {}", rpc);
-    Class<?> outcomeClass = rpc.getOutcomeType();
-
-    if (outcomeClass != clazz) {
-      throw new IllegalStateException(
-          String
-              .format(
-                  "RPC Engine had a submission and response configuration mismatch.  The
RPC request that you submitted was defined with an expected response type of %s.  However,
"
-                      + "when the response returned, a call to getResponseDefaultInstance()
with Rpc number %d provided an expected class of %s.  This means either your submission uses
the wrong type definition"
-                      + "or your getResponseDefaultInstance() method responds the wrong instance
type ",
-                  clazz.getCanonicalName(), rpcType, outcomeClass.getCanonicalName()));
-    }
-
-    @SuppressWarnings("unchecked")
-    RpcOutcome<V> crpc = (RpcOutcome<V>) rpc;
-
-    // logger.debug("Returning casted future");
-    return crpc;
-  }
-
-  public void updateFailedFuture(int coordinationId, DrillPBError failure) {
-    // logger.debug("Updating failed future.");
-    try {
-      RpcOutcome<?> rpc = removeFromMap(coordinationId);
-      rpc.setException(new UserRemoteException(failure));
-    } catch(Exception ex) {
-      logger.warn("Failed to remove from map.  Not a problem since we were updating on failed
future.", ex);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/e4725ea5/exec/rpc/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
deleted file mode 100644
index 401663d..0000000
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/*
- * An atomic integer that only ever returns 0 > MAX_INT and then starts over.  Should
never has a negative overflow.
- */
-public class PositiveAtomicInteger {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PositiveAtomicInteger.class);
-
-  private final AtomicInteger internal = new AtomicInteger(0);
-
-  public int getNext(){
-    int i = internal.addAndGet(1);
-    if(i < 0){
-      return i + (-Integer.MIN_VALUE);
-    }else{
-      return i;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/e4725ea5/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 561f0a4..ad68140 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -21,21 +21,21 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
 import java.util.concurrent.ExecutionException;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 
 public abstract class RemoteConnection implements ConnectionThrottle, AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
   private final Channel channel;
   private final WriteManager writeManager;
-  private String name;
+  private final RequestIdMap requestIdMap = new RequestIdMap();
   private final String clientName;
 
-  public boolean inEventLoop(){
+  private String name;
+
+  public boolean inEventLoop() {
     return channel.eventLoop().inEventLoop();
   }
 
@@ -45,19 +45,10 @@ public abstract class RemoteConnection implements ConnectionThrottle,
AutoClosea
     this.clientName = name;
     this.writeManager = new WriteManager();
     channel.pipeline().addLast(new BackPressureHandler());
-    channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>()
{
-      public void operationComplete(Future<? super Void> future) throws Exception {
-        // this could possibly overrelease but it doesn't matter since we're only going to
do this to ensure that we
-        // fail out any pending messages
-        writeManager.disable();
-        writeManager.setWritable(true);
-      }
-    });
-
   }
 
   public String getName() {
-    if(name == null){
+    if (name == null) {
       name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(),
clientName);
     }
     return name;
@@ -69,14 +60,15 @@ public abstract class RemoteConnection implements ConnectionThrottle,
AutoClosea
     return channel;
   }
 
-  public boolean blockOnNotWritable(RpcOutcomeListener<?> listener){
-    try{
+  public boolean blockOnNotWritable(RpcOutcomeListener<?> listener) {
+    try {
       writeManager.waitForWritable();
       return true;
-    }catch(final InterruptedException e){
+    } catch (final InterruptedException e) {
       listener.interrupted(e);
 
-      // Preserve evidence that the interruption occurred so that code higher up on the call
stack can learn of the
+      // Preserve evidence that the interruption occurred so that code higher up
+      // on the call stack can learn of the
       // interruption and respond to it if it wants to.
       Thread.currentThread().interrupt();
 
@@ -84,31 +76,33 @@ public abstract class RemoteConnection implements ConnectionThrottle,
AutoClosea
     }
   }
 
-  public void setAutoRead(boolean enableAutoRead){
+  public void setAutoRead(boolean enableAutoRead) {
     channel.config().setAutoRead(enableAutoRead);
   }
 
-  public boolean isActive(){
+  public boolean isActive() {
     return channel.isActive();
   }
 
   /**
-   * The write manager is responsible for controlling whether or not a write can be sent.
 It controls whether or not to block a sender if we have tcp backpressure on the receive side.
+   * The write manager is responsible for controlling whether or not a write can
+   * be sent. It controls whether or not to block a sender if we have tcp
+   * backpressure on the receive side.
    */
-  private static class WriteManager{
+  private static class WriteManager {
     private final ResettableBarrier barrier = new ResettableBarrier();
     private volatile boolean disabled = false;
 
-    public WriteManager(){
+    public WriteManager() {
       barrier.openBarrier();
     }
 
-    public void waitForWritable() throws InterruptedException{
+    public void waitForWritable() throws InterruptedException {
       barrier.await();
     }
 
-    public void setWritable(boolean isWritable){
-      if(isWritable){
+    public void setWritable(boolean isWritable) {
+      if (isWritable) {
         barrier.openBarrier();
       } else if (!disabled) {
         barrier.closeBarrier();
@@ -121,17 +115,73 @@ public abstract class RemoteConnection implements ConnectionThrottle,
AutoClosea
     }
   }
 
-  private class BackPressureHandler extends ChannelInboundHandlerAdapter{
+  private class BackPressureHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-//      logger.debug("Channel writability changed.", ctx.channel().isWritable());
       writeManager.setWritable(ctx.channel().isWritable());
       ctx.fireChannelWritabilityChanged();
     }
 
   }
 
+  /**
+   * For incoming messages, remove the outcome listener and return it. Can only be done once
per coordinationId
+   * creation. CoordinationId's are recycled so they will show up once we run through all
4B of them.
+   * @param rpcType The rpc type associated with the coordination.
+   * @param coordinationId The coordination id that was returned with the listener was created.
+   * @param clazz The class that is expected in response.
+   * @return An RpcOutcome associated with the provided coordinationId.
+   */
+  <V> RpcOutcome<V> getAndRemoveRpcOutcome(int rpcType, int coordinationId, Class<V>
clazz) {
+    return requestIdMap.getAndRemoveRpcOutcome(rpcType, coordinationId, clazz);
+  }
+
+  /**
+   * Create a new rpc listener that will be notified when the response is returned.
+   * @param handler The outcome handler to be notified when the response arrives.
+   * @param clazz The Class associated with the response object.
+   * @return The new listener. Also carries the coordination id for use in the rpc message.
+   */
+  <V> ChannelListenerWithCoordinationId createNewRpcListener(RpcOutcomeListener<V>
handler, Class<V> clazz) {
+    return requestIdMap.createNewRpcListener(handler, clazz, this);
+  }
+
+  /**
+   * Inform the local outcome listener that the remote operation could not be handled.
+   * @param coordinationId The id that failed.
+   * @param failure The failure that occurred.
+   */
+  void recordRemoteFailure(int coordinationId, DrillPBError failure) {
+    requestIdMap.recordRemoteFailure(coordinationId, failure);
+  }
+
+  /**
+   * Called from the RpcBus's channel close handler to close all remaining
+   * resources associated with this connection. Ensures that any pending
+   * back-pressure items are also unblocked so they can be thrown away.
+   *
+   * @param ex
+   *          The exception that caused the channel to close.
+   */
+  void channelClosed(RpcException ex) {
+    // this could possibly overrelease but it doesn't matter since we're only
+    // going to do this to ensure that we
+    // fail out any pending messages
+    writeManager.disable();
+    writeManager.setWritable(true);
+
+    // ensure outstanding requests are cleaned up.
+    requestIdMap.channelClosed(ex);
+  }
+
+  /**
+   * Connection consumer wants to close connection. Initiate connection close
+   * and complete. This is a blocking call that ensures that the connection is
+   * closed before returning. As part of this call, the channel close handler
+   * will be triggered which will call channelClosed() above. The latter will
+   * happen in a separate thread while this method is blocking.
+   */
   @Override
   public void close() {
     try {
@@ -141,7 +191,8 @@ public abstract class RemoteConnection implements ConnectionThrottle,
AutoClosea
     } catch (final InterruptedException | ExecutionException e) {
       logger.warn("Caught exception while closing channel.", e);
 
-      // Preserve evidence that the interruption occurred so that code higher up on the call
stack can learn of the
+      // Preserve evidence that the interruption occurred so that code higher up
+      // on the call stack can learn of the
       // interruption and respond to it if it wants to.
       Thread.currentThread().interrupt();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e4725ea5/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
new file mode 100644
index 0000000..a9c3012
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+
+import com.carrotsearch.hppc.IntObjectHashMap;
+import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+import com.google.common.base.Preconditions;
+
+/**
+ * Manages the creation of rpc futures for a particular socket <--> socket
+ * connection. Generally speaking, there will be two threads working with this
+ * class (the socket thread and the Request generating thread). Synchronization
+ * is simple with the map being the only thing that is protected. Everything
+ * else works via Atomic variables.
+ */
+class RequestIdMap {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestIdMap.class);
+
+  private final AtomicInteger lastCoordinationId = new AtomicInteger();
+  private final AtomicBoolean isOpen = new AtomicBoolean(true);
+
+  /** Access to map must be protected. **/
+  private final IntObjectHashMap<RpcOutcome<?>> map;
+
+  public RequestIdMap() {
+    map = new IntObjectHashMap<RpcOutcome<?>>();
+  }
+
+  void channelClosed(Throwable ex) {
+    isOpen.set(false);
+    if (ex != null) {
+      final RpcException e = RpcException.mapException(ex);
+      synchronized (map) {
+        map.forEach(new SetExceptionProcedure(e));
+        map.clear();
+      }
+    }
+  }
+
+  private class SetExceptionProcedure implements IntObjectProcedure<RpcOutcome<?>>
{
+    final RpcException exception;
+
+    public SetExceptionProcedure(RpcException exception) {
+      this.exception = exception;
+    }
+
+    @Override
+    public void apply(int key, RpcOutcome<?> value) {
+      try{
+        value.setException(exception);
+      }catch(Exception e){
+        logger.warn("Failure while attempting to fail rpc response.", e);
+      }
+    }
+
+  }
+
+  public <V> ChannelListenerWithCoordinationId createNewRpcListener(RpcOutcomeListener<V>
handler, Class<V> clazz,
+      RemoteConnection connection) {
+    final int i = lastCoordinationId.incrementAndGet();
+    final RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection);
+    final Object old;
+    synchronized (map) {
+      Preconditions.checkArgument(isOpen.get(),
+          "Attempted to send a message when connection is no longer valid.");
+      old = map.put(i, future);
+    }
+    Preconditions.checkArgument(old == null,
+        "You attempted to reuse a coordination id when the previous coordination id has not
been removed.  "
+        + "This is likely rpc future callback memory leak.");
+    return future;
+  }
+
+  private class RpcListener<T> implements ChannelListenerWithCoordinationId, RpcOutcome<T>
{
+    final RpcOutcomeListener<T> handler;
+    final Class<T> clazz;
+    final int coordinationId;
+    final RemoteConnection connection;
+
+    public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinationId,
RemoteConnection connection) {
+      super();
+      this.handler = handler;
+      this.clazz = clazz;
+      this.coordinationId = coordinationId;
+      this.connection = connection;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+
+      if (!future.isSuccess()) {
+        removeFromMap(coordinationId);
+        if (future.channel().isActive()) {
+          throw new RpcException("Future failed");
+        } else {
+          setException(new ChannelClosedException());
+        }
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void set(Object value, ByteBuf buffer) {
+      assert clazz.isAssignableFrom(value.getClass());
+      handler.success((T) value, buffer);
+    }
+
+    @Override
+    public void setException(Throwable t) {
+      handler.failed(RpcException.mapException(t));
+    }
+
+    @Override
+    public Class<T> getOutcomeType() {
+      return clazz;
+    }
+
+    @Override
+    public int getCoordinationId() {
+      return coordinationId;
+    }
+
+  }
+
+  private RpcOutcome<?> removeFromMap(int coordinationId) {
+    final RpcOutcome<?> rpc;
+    synchronized (map) {
+      rpc = map.remove(coordinationId);
+    }
+    if (rpc == null) {
+      throw new IllegalStateException(
+          "Attempting to retrieve an rpc that wasn't first stored in the rpc coordination
queue.  This would most likely happen if you're opposite endpoint sent multiple messages on
the same coordination id.");
+    }
+    return rpc;
+  }
+
+  public <V> RpcOutcome<V> getAndRemoveRpcOutcome(int rpcType, int coordinationId,
Class<V> clazz) {
+
+    RpcOutcome<?> rpc = removeFromMap(coordinationId);
+    // logger.debug("Got rpc from map {}", rpc);
+    Class<?> outcomeClass = rpc.getOutcomeType();
+
+    if (outcomeClass != clazz) {
+      throw new IllegalStateException(String.format(
+          "RPC Engine had a submission and response configuration mismatch.  The RPC request
that you submitted was defined with an expected response type of %s.  However, "
+              + "when the response returned, a call to getResponseDefaultInstance() with
Rpc number %d provided an expected class of %s.  This means either your submission uses the
wrong type definition"
+              + "or your getResponseDefaultInstance() method responds the wrong instance
type ",
+          clazz.getCanonicalName(), rpcType, outcomeClass.getCanonicalName()));
+    }
+
+    @SuppressWarnings("unchecked")
+    RpcOutcome<V> crpc = (RpcOutcome<V>) rpc;
+
+    // logger.debug("Returning casted future");
+    return crpc;
+  }
+
+  public void recordRemoteFailure(int coordinationId, DrillPBError failure) {
+    // logger.debug("Updating failed future.");
+    try {
+      RpcOutcome<?> rpc = removeFromMap(coordinationId);
+      rpc.setException(new UserRemoteException(failure));
+    } catch (Exception ex) {
+      logger.warn("Failed to remove from map.  Not a problem since we were updating on failed
future.", ex);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e4725ea5/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 5cc10a8..c360e51 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -59,8 +59,6 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
   private static final OutboundRpcMessage PONG = new OutboundRpcMessage(RpcMode.PONG, 0, 0, Acks.OK);
   private static final boolean ENABLE_SEPARATE_THREADS = "true".equals(System.getProperty("drill.enable_rpc_offload"));
 
-  protected final CoordinationQueue queue = new CoordinationQueue(16, 16);
-
   protected abstract MessageLite getResponseDefaultInstance(int rpcType) throws RpcException;
 
   protected void handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException{
@@ -69,8 +67,6 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   protected abstract Response handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
 
-  public abstract boolean isClient();
-
   protected final RpcConfig rpcConfig;
 
   protected volatile SocketAddress local;
@@ -120,7 +116,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
       assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
 
       Preconditions.checkNotNull(protobufBody);
-      ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz, connection);
+      ChannelListenerWithCoordinationId futureListener = connection.createNewRpcListener(listener, clazz);
       OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
       ChannelFuture channelFuture = connection.getChannel().writeAndFlush(m);
       channelFuture.addListener(futureListener);
@@ -129,6 +125,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     } catch (Exception | AssertionError e) {
       listener.failed(new RpcException("Failure sending message.", e));
     } finally {
+
       if (!completed) {
         if (pBuffer != null) {
           pBuffer.release();
@@ -158,22 +155,16 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
-      String msg;
+      final String msg;
+
       if(local!=null) {
         msg = String.format("Channel closed %s <--> %s.", local, remote);
       }else{
         msg = String.format("Channel closed %s <--> %s.", future.channel().localAddress(), future.channel().remoteAddress());
       }
 
-      if (RpcBus.this.isClient()) {
-        if(local != null) {
-          logger.info(String.format(msg));
-        }
-      } else {
-        queue.channelClosed(new ChannelClosedException(msg));
-      }
-
-      clientConnection.close();
+      final ChannelClosedException ex = future.cause() != null ? new ChannelClosedException(msg, future.cause()) : new ChannelClosedException(msg);
+      clientConnection.channelClosed(ex);
     }
 
   }
@@ -182,9 +173,6 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     return new ChannelClosedHandler(clientConnection, channel);
   }
 
-  private interface Recyclable {
-    public void recycle();
-  }
 
   private class ResponseSenderImpl implements ResponseSender {
 
@@ -261,6 +249,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
     public InboundHandler(C connection) {
       super();
+      Preconditions.checkNotNull(connection);
       this.connection = connection;
       final Executor underlyingExecutor = ENABLE_SEPARATE_THREADS ? rpcConfig.getExecutor() : new SameExecutor();
       this.exec = new RpcEventHandler(underlyingExecutor);
@@ -286,13 +275,13 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
           break;
 
         case RESPONSE:
-          ResponseEvent respEvent = new ResponseEvent(msg.rpcType, msg.coordinationId, msg.pBody, msg.dBody);
+          ResponseEvent respEvent = new ResponseEvent(connection, msg.rpcType, msg.coordinationId, msg.pBody, msg.dBody);
           exec.execute(respEvent);
           break;
 
         case RESPONSE_FAILURE:
           DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
-          queue.updateFailedFuture(msg.coordinationId, failure);
+          connection.recordRemoteFailure(msg.coordinationId, failure);
           if (RpcConstants.EXTRA_DEBUGGING) {
             logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
           }
@@ -398,12 +387,14 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     private final int coordinationId;
     private final ByteBuf pBody;
     private final ByteBuf dBody;
+    private final C connection;
 
-    public ResponseEvent(int rpcType, int coordinationId, ByteBuf pBody, ByteBuf dBody) {
+    public ResponseEvent(C connection, int rpcType, int coordinationId, ByteBuf pBody, ByteBuf dBody) {
       this.rpcType = rpcType;
       this.coordinationId = coordinationId;
       this.pBody = pBody;
       this.dBody = dBody;
+      this.connection = connection;
 
       if(pBody != null){
         pBody.retain();
@@ -418,7 +409,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
       try {
         MessageLite m = getResponseDefaultInstance(rpcType);
         assert rpcConfig.checkReceive(rpcType, m.getClass());
-        RpcOutcome<?> rpcFuture = queue.getFuture(rpcType, coordinationId, m.getClass());
+        RpcOutcome<?> rpcFuture = connection.getAndRemoveRpcOutcome(rpcType, coordinationId, m.getClass());
         Parser<?> parser = m.getParserForType();
         Object value = parser.parseFrom(new ByteBufInputStream(pBody, pBody.readableBytes()));
         rpcFuture.set(value, dBody);


Mime
View raw message