hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [30/50] hbase git commit: HBASE-15957 RpcClientImpl.close never ends in some circumstances
Date Sat, 11 Jun 2016 04:56:16 GMT
HBASE-15957 RpcClientImpl.close never ends in some circumstances

Signed-off-by: Enis Soztutar <enis@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da88b482
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da88b482
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da88b482

Branch: refs/heads/hbase-12439
Commit: da88b4824054f57fbcbc7795469ab2369a39b5ed
Parents: 376ad0d
Author: Sergey Soldatov <ssa@apache.org>
Authored: Sun Jun 5 23:46:03 2016 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Tue Jun 7 11:33:03 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |  5 ++-
 .../hbase/ipc/IntegrationTestRpcClient.java     | 35 ++++++++++++++++----
 2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/da88b482/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index d8c87e9..dc05af1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -1202,9 +1202,8 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
     if (connsToClose != null) {
       for (Connection conn : connsToClose) {
-        if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
-          conn.close();
-        }
+        conn.markClosed(new InterruptedIOException("RpcClient is closing"));
+        conn.close();
       }
     }
     // wait until all connections are closed

http://git-wip-us.apache.org/repos/asf/hbase/blob/da88b482/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index c28f3e6..6c0fbcc 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -290,6 +285,7 @@ public class IntegrationTestRpcClient {
   static class SimpleClient extends Thread {
     AbstractRpcClient rpcClient;
     AtomicBoolean running = new  AtomicBoolean(true);
+    AtomicBoolean sending = new AtomicBoolean(false);
     AtomicReference<Throwable> exception = new AtomicReference<>(null);
     Cluster cluster;
     String id;
@@ -319,6 +315,7 @@ public class IntegrationTestRpcClient {
           if (address == null) {
             throw new IOException("Listener channel is closed");
           }
+          sending.set(true);
           ret = (EchoResponseProto)
               rpcClient.callBlockingMethod(md, null, param, ret, user, address);
         } catch (Exception e) {
@@ -340,6 +337,9 @@ public class IntegrationTestRpcClient {
     void stopRunning() {
       running.set(false);
     }
+    boolean isSending() {
+      return sending.get();
+    }
 
     void rethrowException() throws Throwable {
       if (exception.get() != null) {
@@ -348,6 +348,29 @@ public class IntegrationTestRpcClient {
     }
   }
 
+  /*
+  Test that not started connections are successfully removed from connection pool when
+  rpc client is closing.
+   */
+  @Test (timeout = 30000)
+  public void testRpcWithWriteThread() throws IOException, InterruptedException {
+    LOG.info("Starting test");
+    Cluster cluster = new Cluster(1, 1);
+    cluster.startServer();
+    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
+    for(int i = 0; i <1000; i++) {
+      AbstractRpcClient rpcClient = createRpcClient(conf, true);
+      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
+      client.start();
+      while(!client.isSending()) {
+        Thread.sleep(1);
+      }
+      client.stopRunning();
+      rpcClient.close();
+    }
+  }
+
+
   @Test (timeout = 900000)
   public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
     for (int i = 0; i < numIterations; i++) {


Mime
View raw message