hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenh...@apache.org
Subject hbase git commit: HBASE-15278 AsyncRPCClient hangs if Connection closes before RPC call response
Date Sat, 30 Apr 2016 01:26:46 GMT
Repository: hbase
Updated Branches:
  refs/heads/master c236409c3 -> 01c0448cc


HBASE-15278 AsyncRPCClient hangs if Connection closes before RPC call response


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/01c0448c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/01c0448c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/01c0448c

Branch: refs/heads/master
Commit: 01c0448ccd943186ba8045074a59e53f8f08c364
Parents: c236409
Author: chenheng <chenheng@apache.org>
Authored: Sat Apr 30 09:27:32 2016 +0800
Committer: chenheng <chenheng@apache.org>
Committed: Sat Apr 30 09:27:32 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |  6 ++
 .../hbase/ipc/AsyncServerResponseHandler.java   |  8 +--
 .../hadoop/hbase/ipc/AbstractTestIPC.java       | 69 +++++++++++++++++++-
 3 files changed, 74 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/01c0448c/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 53eb824..ef3240c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -210,6 +210,12 @@ public class AsyncRpcChannel {
     ch.pipeline().addLast("frameDecoder",
       new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
     ch.pipeline().addLast(new AsyncServerResponseHandler(this));
+    ch.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        close(null);
+      }
+    });
     try {
       writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
         @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/01c0448c/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index e0c7586..5c604a4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.io.IOException;
-
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -105,11 +103,6 @@ public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<Byte
     channel.close(cause);
   }
 
-  @Override
-  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-    channel.close(new IOException("connection closed"));
-  }
-
   /**
    * @param e Proto exception
    * @return RemoteException made from passed <code>e</code>
@@ -123,4 +116,5 @@ public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<Byte
             e.getPort(), doNotRetry)
         : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/01c0448c/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 69c8fe2..bfbfa8c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -30,6 +30,8 @@ import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -39,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -158,6 +161,39 @@ public abstract class AbstractTestIPC {
     }
   }
 
+  static class TestFailingRpcServer extends TestRpcServer {
+
+    TestFailingRpcServer() throws IOException {
+      this(new FifoRpcScheduler(CONF, 1), CONF);
+    }
+
+    TestFailingRpcServer(Configuration conf) throws IOException {
+      this(new FifoRpcScheduler(conf, 1), conf);
+    }
+
+    TestFailingRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
+      super(scheduler, conf);
+    }
+
+    class FailingConnection extends Connection {
+      public FailingConnection(SocketChannel channel, long lastContact) {
+        super(channel, lastContact);
+      }
+      @Override
+      protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+        // this will throw exception after the connection header is read, and an RPC is sent
+        // from client
+        throw new DoNotRetryIOException("Failing for test");
+      }
+    }
+
+    @Override
+    protected Connection getConnection(SocketChannel channel, long time) {
+      return new FailingConnection(channel, time);
+    }
+
+  }
+
   protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf);
 
   /**
@@ -296,8 +332,8 @@ public abstract class AbstractTestIPC {
     }
   }
 
-  /** Tests that the rpc scheduler is called when requests arrive. */
-  @Test
+  /** Tests that RPC max request size is respected from the server side */
+  @Test (timeout = 30000)
   public void testRpcMaxRequestSize() throws IOException, InterruptedException {
     Configuration conf = new Configuration(CONF);
     conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100);
@@ -327,6 +363,35 @@ public abstract class AbstractTestIPC {
     }
   }
 
+  /** Tests that the connection closing is handled by the client with outstanding RPC calls */
+  @Test (timeout = 30000)
+  public void testConnectionCloseWithOutstandingRPCs() throws IOException, InterruptedException {
+    Configuration conf = new Configuration(CONF);
+
+    RpcServer rpcServer = new TestFailingRpcServer(conf);
+    AbstractRpcClient client = createRpcClient(conf);
+    try {
+      rpcServer.start();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
+      try {
+        client.call(new PayloadCarryingRpcController(
+          CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
+          md.getOutputType().toProto(), User.getCurrent(), address,
+          new MetricsConnection.CallStats());
+        fail("RPC should have failed because server closed connection");
+      } catch(IOException ex) {
+        // pass
+      }
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
   /**
    * Instance of RpcServer that echoes client hostAddress back to client
    */


Mime
View raw message