hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-14241 Fix deadlock during cluster shutdown due to concurrent connection close
Date Wed, 19 Aug 2015 16:01:25 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.0 7cde21a0a -> de5bdbe6b


HBASE-14241 Fix deadlock during cluster shutdown due to concurrent connection close


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/de5bdbe6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/de5bdbe6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/de5bdbe6

Branch: refs/heads/branch-1.0
Commit: de5bdbe6b2e4e494f0ec8d06c1ab8cc51ee9c187
Parents: 7cde21a
Author: tedyu <yuzhihong@gmail.com>
Authored: Wed Aug 19 09:01:24 2015 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Wed Aug 19 09:01:24 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  | 108 +++++++++++++------
 1 file changed, 75 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/de5bdbe6/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 1913027..f4fd844 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -150,6 +150,18 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
   }
 
+  /*
+   * This is the return value from {@link #waitForWork()} indicating whether run() method should:
+   * read response
+   * close the connection
+   * take no action - connection would be closed by others
+   */
+  private enum WaitForWorkResult {
+    READ_RESPONSE,
+    CALLER_SHOULD_CLOSE,
+    CLOSED
+  }
+
   /** Thread that reads responses and notifies callers.  Each connection owns a
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
@@ -241,12 +253,13 @@ public class RpcClientImpl extends AbstractRpcClient {
        */
       @Override
       public void run() {
+        boolean closeBySelf = false;
         while (!shouldCloseConnection.get()) {
           CallFuture cts = null;
           try {
             cts = callsToWrite.take();
           } catch (InterruptedException e) {
-            markClosed(new InterruptedIOException());
+            closeBySelf = markClosed(new InterruptedIOException());
           }
 
           if (cts == null || cts == CallFuture.DEATH_PILL) {
@@ -267,11 +280,14 @@ public class RpcClientImpl extends AbstractRpcClient {
           } catch (IOException e) {
             LOG.warn("call write error for call #" + cts.call.id + ", message =" + e.getMessage());
             cts.call.setException(e);
-            markClosed(e);
+            closeBySelf = markClosed(e);
           }
         }
 
         cleanup();
+        if (closeBySelf) {
+          close();
+        }
       }
 
       /**
@@ -504,27 +520,28 @@ public class RpcClientImpl extends AbstractRpcClient {
      * it is idle too long, it is marked as to be closed,
      * or the client is marked as not running.
      *
-     * @return true if it is time to read a response; false otherwise.
+     * @return WaitForWorkResult indicating whether it is time to read response;
+     * if the caller should close; or otherwise
      */
-    protected synchronized boolean waitForWork() throws InterruptedException {
+    protected synchronized WaitForWorkResult waitForWork() throws InterruptedException {
       // beware of the concurrent access to the calls list: we can add calls, but as well
       //  remove them.
       long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
 
       while (true) {
         if (shouldCloseConnection.get()) {
-          return false;
+          return WaitForWorkResult.CLOSED;
         }
 
         if (!running.get()) {
-          markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
-          return false;
+          if (markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"))) {
+            return WaitForWorkResult.CALLER_SHOULD_CLOSE;
+          }
+          return WaitForWorkResult.CLOSED;
         }
 
         if (!calls.isEmpty()) {
-          // shouldCloseConnection can be set to true by a parallel thread here. The caller
-          //  will need to check anyway.
-          return true;
+          return WaitForWorkResult.READ_RESPONSE;
         }
 
         if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
@@ -532,9 +549,11 @@ public class RpcClientImpl extends AbstractRpcClient {
           // We expect the number of calls to be zero here, but actually someone can
           //  adds a call at the any moment, as there is no synchronization between this task
           //  and adding new calls. It's not a big issue, but it will get an exception.
-          markClosed(new IOException(
-              "idle connection closed with " + calls.size() + " pending request(s)"));
-          return false;
+          if (markClosed(new IOException(
+              "idle connection closed with " + calls.size() + " pending request(s)"))) {
+            return WaitForWorkResult.CALLER_SHOULD_CLOSE;
+          }
+          return WaitForWorkResult.CLOSED;
         }
 
         wait(Math.min(minIdleTimeBeforeClose, 1000));
@@ -551,23 +570,37 @@ public class RpcClientImpl extends AbstractRpcClient {
         LOG.trace(getName() + ": starting, connections " + connections.size());
       }
 
+      WaitForWorkResult result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
       try {
-        while (waitForWork()) { // Wait here for work - read or close connection
-          readResponse();
+        result = waitForWork(); // Wait here for work - read or close connection
+        while (result == WaitForWorkResult.READ_RESPONSE) {
+          if (readResponse()) {
+            // shouldCloseConnection is set to true by readResponse(). Close the connection
+            result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
+          } else {
+            result = waitForWork();
+          }
         }
       } catch (InterruptedException t) {
         if (LOG.isTraceEnabled()) {
           LOG.trace(getName() + ": interrupted while waiting for call responses");
         }
-        markClosed(ExceptionUtil.asInterrupt(t));
+        if (markClosed(ExceptionUtil.asInterrupt(t))) {
+          // shouldCloseConnection is set to true. Close connection
+          result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
+        }
       } catch (Throwable t) {
         if (LOG.isDebugEnabled()) {
           LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
         }
-        markClosed(new IOException("Unexpected throwable while waiting call responses", t));
+        if (markClosed(new IOException("Unexpected throwable while waiting call responses", t))) {
+          // shouldCloseConnection is set to true. Close connection
+          result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
+        }
+      }
+      if (result == WaitForWorkResult.CALLER_SHOULD_CLOSE) {
+        close();
       }
-
-      close();
 
       if (LOG.isTraceEnabled()) {
         LOG.trace(getName() + ": stopped, connections " + connections.size());
@@ -696,8 +729,9 @@ public class RpcClientImpl extends AbstractRpcClient {
         }
         IOException e = new FailedServerException(
             "This server is in the failed servers list: " + server);
-        markClosed(e);
-        close();
+        if (markClosed(e)) {
+          close();
+        }
         throw e;
       }
 
@@ -775,8 +809,9 @@ public class RpcClientImpl extends AbstractRpcClient {
             e = new IOException("Could not set up IO Streams to " + server, t);
           }
         }
-        markClosed(e);
-        close();
+        if (markClosed(e)) {
+          close();
+        }
         throw e;
       }
     }
@@ -916,9 +951,10 @@ public class RpcClientImpl extends AbstractRpcClient {
 
     /* Receive a response.
      * Because only one receiver, so no synchronization on in.
+     * @return true if connection should be closed by caller
      */
-    protected void readResponse() {
-      if (shouldCloseConnection.get()) return;
+    protected boolean readResponse() {
+      if (shouldCloseConnection.get()) return false;
       Call call = null;
       boolean expectedCall = false;
       try {
@@ -940,14 +976,14 @@ public class RpcClientImpl extends AbstractRpcClient {
           int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
           int whatIsLeftToRead = totalSize - readSoFar;
           IOUtils.skipFully(in, whatIsLeftToRead);
-          return;
+          return false;
         }
         if (responseHeader.hasException()) {
           ExceptionResponse exceptionResponse = responseHeader.getException();
           RemoteException re = createRemoteException(exceptionResponse);
           call.setException(re);
           if (isFatalConnectionException(exceptionResponse)) {
-            markClosed(re);
+            return markClosed(re);
           }
         } else {
           Message value = null;
@@ -974,11 +1010,12 @@ public class RpcClientImpl extends AbstractRpcClient {
           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
         } else {
           // Treat this as a fatal condition and close this connection
-          markClosed(e);
+          return markClosed(e);
         }
       } finally {
         cleanupCalls(false);
       }
+      return false;
     }
 
     /**
@@ -1004,18 +1041,22 @@ public class RpcClientImpl extends AbstractRpcClient {
           e.getStackTrace(), doNotRetry);
     }
 
-    protected synchronized void markClosed(IOException e) {
+    /*
+     * @return true if shouldCloseConnection is set true by this thread; false otherwise
+     */
+    protected boolean markClosed(IOException e) {
       if (e == null) throw new NullPointerException();
 
-      if (shouldCloseConnection.compareAndSet(false, true)) {
+      boolean ret = shouldCloseConnection.compareAndSet(false, true);
+      if (ret) {
         if (LOG.isTraceEnabled()) {
           LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
         }
         if (callSender != null) {
           callSender.close();
         }
-        notifyAll();
       }
+      return ret;
     }
 
 
@@ -1114,8 +1155,9 @@ public class RpcClientImpl extends AbstractRpcClient {
         // In case the CallSender did not setupIOStreams() yet, the Connection may not be started
         // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
         if (!conn.isAlive()) {
-          conn.markClosed(new InterruptedIOException("RpcClient is closing"));
-          conn.close();
+          if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
+            conn.close();
+          }
         }
       }
     }


Mime
View raw message