kudu-commits mailing list archives

From jdcry...@apache.org
Subject kudu git commit: KUDU-1894 fixed deadlock in client.Connection
Date Mon, 28 Aug 2017 21:58:46 GMT
Repository: kudu
Updated Branches:
  refs/heads/master b365ad0bd -> 82a8e9f99


KUDU-1894 fixed deadlock in client.Connection

Due to the reverse order of acquiring the Connection.lock and some
lower-level Netty locks, Connection.enqueueMessage() could deadlock if
a ChannelDisconnected/ChannelClosed event arrived while it was in the
middle of sending a message.
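A minimal, self-contained sketch of the lock-order inversion described
above, assuming two plain ReentrantLocks stand in for Connection.lock and
a lower-level Netty channel lock (the class and lock names are
hypothetical, not Kudu or Netty code). Each thread takes its first lock,
waits until the other thread holds its own first lock, then tries the
second lock with tryLock() and a timeout instead of lock(), so the demo
reports the inversion rather than hanging:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class LockOrderInversionDemo {
  // Hypothetical stand-ins: connectionLock plays Connection.lock,
  // nettyLock plays a lower-level Netty channel lock.
  static final ReentrantLock connectionLock = new ReentrantLock();
  static final ReentrantLock nettyLock = new ReentrantLock();

  /** Holds 'first', then tries 'second'; records whether 'second' was obtained. */
  static Thread worker(ReentrantLock first, ReentrantLock second,
                       CountDownLatch bothHoldFirst, CountDownLatch bothTried,
                       AtomicBoolean gotSecond) {
    return new Thread(() -> {
      first.lock();
      try {
        bothHoldFirst.countDown();
        bothHoldFirst.await();  // wait until the other thread holds its first lock
        // With a plain second.lock() here, both threads would block forever.
        boolean ok = second.tryLock(100, TimeUnit.MILLISECONDS);
        if (ok) {
          second.unlock();
        }
        gotSecond.set(ok);
        bothTried.countDown();
        bothTried.await();      // keep holding 'first' until the other thread has tried too
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        first.unlock();
      }
    });
  }

  /** Returns true if each thread was blocked by the lock the other one held. */
  public static boolean runInversion() throws InterruptedException {
    CountDownLatch bothHoldFirst = new CountDownLatch(2);
    CountDownLatch bothTried = new CountDownLatch(2);
    AtomicBoolean enqueueGotNetty = new AtomicBoolean();
    AtomicBoolean closeGotConnection = new AtomicBoolean();
    // enqueueMessage()-like path: Connection.lock first, then the Netty lock.
    Thread enqueue = worker(connectionLock, nettyLock, bothHoldFirst, bothTried, enqueueGotNetty);
    // channelClosed() -> cleanup()-like path: Netty lock first, then Connection.lock.
    Thread close = worker(nettyLock, connectionLock, bothHoldFirst, bothTried, closeGotConnection);
    enqueue.start();
    close.start();
    enqueue.join();
    close.join();
    return !enqueueGotNetty.get() && !closeGotConnection.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("each path blocked by the other: " + runInversion());
  }
}
```

With blocking lock() calls in place of tryLock(), the same schedule would
leave both threads waiting on each other indefinitely, which is the
situation the commit removes.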

The idea is to not hold the Connection.lock while invoking the
Connection.sendCallToWire() method and, overall, avoid doing any
calls to Netty while holding that lock.
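The rule above can be sketched as follows, assuming a hypothetical
NoCallsUnderLock class whose writeToWire() method stands in for
sendCallToWire()/Channels.write(). The bookkeeping that needs the lock
(registering the callback in the in-flight map, as the enqueueMessage()
hunk below does) happens under the lock; the outbound write happens only
after the lock is released. Invoking the response callback outside the
lock is an extra assumption of this sketch, not something this commit
changes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Hypothetical, simplified model of "no Netty calls under Connection.lock". */
public class NoCallsUnderLock {
  private final ReentrantLock lock = new ReentrantLock();
  // Guarded by 'lock': callId -> callback to run when the response arrives.
  private final Map<Integer, Consumer<String>> inflight = new HashMap<>();
  // Records, for each write, whether the lock was held at that moment.
  public final List<Boolean> lockHeldDuringWrite = new ArrayList<>();

  // Stand-in for Channels.write(channel, msg).
  private void writeToWire(int callId) {
    lockHeldDuringWrite.add(lock.isHeldByCurrentThread());
  }

  public void enqueue(int callId, Consumer<String> cb) {
    lock.lock();
    try {
      // Register the callback before writing, so a fast response can find it.
      Consumer<String> previous = inflight.put(callId, cb);
      if (previous != null) {
        throw new IllegalStateException("duplicate call id " + callId);
      }
    } finally {
      lock.unlock();
    }
    // The outbound call happens only after the lock has been released.
    writeToWire(callId);
  }

  public void onResponse(int callId, String body) {
    Consumer<String> cb;
    lock.lock();
    try {
      cb = inflight.remove(callId);
    } finally {
      lock.unlock();
    }
    if (cb != null) {
      cb.accept(body);  // assumption of this sketch: callbacks also run outside the lock
    }
  }

  public int inflightCount() {
    lock.lock();
    try {
      return inflight.size();
    } finally {
      lock.unlock();
    }
  }
}
```

Registering before writing matters because once the write is issued, the
response may be processed on another thread before the writing thread
runs again.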

To test the changes, I ran the ITClient test via dist-test, applying
Todd's WIP patch on top: https://gerrit.cloudera.org/#/c/7579/

The test passed 3572 out of 3572 times:
  http://dist-test.cloudera.org/job?job_id=aserbin.1503526685.24101
  http://dist-test.cloudera.org/job?job_id=aserbin.1503527340.4787
  http://dist-test.cloudera.org/job?job_id=aserbin.1503527848.6921

Change-Id: I1a0edc3e3accbcff60f2cde641ee470312bbd27a
Reviewed-on: http://gerrit.cloudera.org:8080/7765
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <danburkert@apache.org>
Reviewed-by: Jean-Daniel Cryans <jdcryans@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/82a8e9f9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/82a8e9f9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/82a8e9f9

Branch: refs/heads/master
Commit: 82a8e9f990c9f4d7713d0ce3cc166b44ee2a9b3e
Parents: b365ad0
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Mon Aug 21 16:28:14 2017 -0700
Committer: Jean-Daniel Cryans <jdcryans@apache.org>
Committed: Mon Aug 28 21:58:22 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/kudu/client/Connection.java | 75 ++++++++++++++------
 1 file changed, 54 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/82a8e9f9/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index aa10e2e..c9cd594 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -265,18 +265,41 @@ class Connection extends SimpleChannelUpstreamHandler {
       try {
         Preconditions.checkState(state == State.NEGOTIATING);
         Preconditions.checkState(inflightMessages.isEmpty());
-
-        state = State.READY;
         negotiationResult = (Negotiator.Success) m;
-        List<QueuedMessage> queued = Preconditions.checkNotNull(queuedMessages);
-        // The queuedMessages should not be used anymore once the connection is negotiated.
-        queuedMessages = null;
-        // Send out all the enqueued messages. This is done while holding the lock to preserve
-        // the sequence of the already enqueued and being-enqueued-right-now messages and simplify
-        // the logic which checks for the consistency using Preconditions.checkXxx() methods.
-        for (final QueuedMessage qm : queued) {
-          sendCallToWire(qm.message, qm.cb);
+
+        // Before switching to the READY state, it's necessary to empty the queuedMessages. There
+        // might be concurrent activity on adding new messages into the queue if enqueueMessage()
+        // is called in the middle.
+        while (!queuedMessages.isEmpty()) {
+          // Register the messages into the inflightMessages before sending them to the wire. This
+          // is to be able to invoke appropriate callback when the response received. This should
+          // be done under the lock since the inflightMessages itself does not provide any
+          // concurrency guarantees.
+          List<QueuedMessage> queued = queuedMessages;
+          for (final QueuedMessage qm : queued) {
+            Callback<Void, CallResponseInfo> empty = inflightMessages.put(
+                qm.message.getHeaderBuilder().getCallId(), qm.cb);
+            Preconditions.checkState(empty == null);
+          }
+          queuedMessages = Lists.newArrayList();
+
+          lock.unlock();
+          try {
+            // Send out the enqueued messages while not holding the lock. This is to avoid
+            // deadlock if channelDisconnected/channelClosed event happens and cleanup() is called.
+            for (final QueuedMessage qm : queued) {
+              sendCallToWire(qm.message);
+            }
+          } finally {
+            lock.lock();
+          }
         }
+
+        assert queuedMessages.isEmpty();
+        queuedMessages = null;
+        // Set the state to READY -- that means the incoming messages should be no longer put into
+        // the queuedMessages, but sent to wire right away (see the enqueueMessage() for details).
+        state = State.READY;
       } finally {
         lock.unlock();
       }
@@ -477,11 +500,24 @@ class Connection extends SimpleChannelUpstreamHandler {
         return;
       }
 
-      // It's time to initiate sending the message over the wire.
-      sendCallToWire(msg, cb);
+      assert state == State.READY;
+      // Register the message into the inflightMessages before sending it to the wire.
+      final Callback<Void, CallResponseInfo> empty = inflightMessages.put(callId, cb);
+      Preconditions.checkState(empty == null);
     } finally {
       lock.unlock();
     }
+
+    // It's time to initiate sending the message over the wire. This is done outside of the lock
+    // to prevent deadlocks due to the reverse order of locking while working with Connection.lock
+    // and the lower-level Netty locks. The other order of taking those two locks could happen
+    // upon receiving ChannelDisconnected or ChannelClosed events. Upon receiving those events,
+    // the low-level Netty lock is held and the channelDisconnected()/channelClosed() methods
+    // would call the cleanup() method. In its turn, the cleanup() method tries to acquire the
+    // Connection.lock lock, while the low-level Netty lock might be already acquired.
+    //
+    // More details and an example of a stack trace is available in KUDU-1894 comments.
+    sendCallToWire(msg);
   }
 
   /**
@@ -561,18 +597,15 @@ class Connection extends SimpleChannelUpstreamHandler {
     }
   }
 
-  /** Start sending the message to the server over the wire. */
-  @GuardedBy("lock")
-  private void sendCallToWire(final RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb) {
-    Preconditions.checkState(lock.isHeldByCurrentThread());
-    Preconditions.checkState(state == State.READY);
-
+  /**
+   * Start sending the message to the server over the wire. It's crucial to not hold the lock
+   * while doing so: see enqueueMessage() and KUDU-1894 for details.
+   */
+  private void sendCallToWire(final RpcOutboundMessage msg) {
+    assert !lock.isHeldByCurrentThread();
     if (LOG.isTraceEnabled()) {
       LOG.trace("{} sending {}", getLogPrefix(), msg);
     }
-    final int callId = msg.getHeaderBuilder().getCallId();
-    final Callback<Void, CallResponseInfo> empty = inflightMessages.put(callId, cb);
-    Preconditions.checkState(empty == null);
     Channels.write(channel, msg);
   }
 


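The first hunk drains queuedMessages in batches: under the lock it swaps
out the current batch, then releases the lock to write that batch,
re-acquires it, and repeats until nothing new was enqueued in the
meantime; only then does it switch to READY. A simplified model of that
loop, with hypothetical names (DrainThenReady, sendToWire(), a List
standing in for the channel) and without the inflightMessages
registration, might look like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical model of the negotiation-complete hunk: drain in batches, then READY. */
public class DrainThenReady {
  enum State { NEGOTIATING, READY }

  private final ReentrantLock lock = new ReentrantLock();
  private State state = State.NEGOTIATING;          // guarded by lock
  private List<String> queued = new ArrayList<>();  // guarded by lock; null once READY
  public final List<String> wire = new ArrayList<>(); // stand-in for the Netty channel
  private Runnable onWrite = () -> { };             // test hook: runs after each write

  public void setOnWrite(Runnable hook) {
    onWrite = hook;
  }

  private void sendToWire(String msg) {
    if (lock.isHeldByCurrentThread()) {
      throw new IllegalStateException("must not hold the lock while writing");
    }
    wire.add(msg);
    onWrite.run();
  }

  public void enqueue(String msg) {
    lock.lock();
    try {
      if (state != State.READY) {
        queued.add(msg);  // still negotiating: park the message
        return;
      }
    } finally {
      lock.unlock();
    }
    sendToWire(msg);      // READY: write outside the lock
  }

  public void negotiationDone() {
    lock.lock();
    try {
      // New messages may be queued while the lock is released below, so loop until empty.
      while (!queued.isEmpty()) {
        List<String> batch = queued;
        queued = new ArrayList<>();
        lock.unlock();
        try {
          for (String msg : batch) {
            sendToWire(msg);
          }
        } finally {
          lock.lock();
        }
      }
      queued = null;
      state = State.READY;  // only now do new messages bypass the queue
    } finally {
      lock.unlock();
    }
  }
}
```

Switching to READY only after the queue is observed empty under the lock
keeps the ordering intact: a message enqueued while a batch is being
written lands in the fresh queue and goes out in the next iteration,
before anything sent directly after the state change.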