kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject [1/2] incubator-kudu git commit: [java client] RPCs can get lost in a TabletClient race
Date Thu, 30 Jun 2016 17:22:59 GMT
Repository: incubator-kudu
Updated Branches:
  refs/heads/master 1b7abe081 -> e0257ea41


[java client] RPCs can get lost in a TabletClient race

We saw hangs after running ITBLL for hours. It turns out that the recent
fixes in TabletClient introduced a new race condition. rpcs_inflight
is cleaned up in cleanup() by copying all of its elements and then
calling clear(). Even though this is done under a lock, that lock isn't
protecting rpcs_inflight, so an RPC added concurrently between the copy
and the clear() can be cleared without having been copied out, and is
then lost.

I haven't been able to reproduce this race in unit tests, but this change
fixed the ITBLL hangs.

Change-Id: Iaff89eb832d0d6f0dede198661856fae1a8585a0
Reviewed-on: http://gerrit.cloudera.org:8080/3541
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/6583dffe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/6583dffe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/6583dffe

Branch: refs/heads/master
Commit: 6583dffef68ae6bf13aa682a366ced92f222391d
Parents: 1b7abe0
Author: Jean-Daniel Cryans <jdcryans@apache.org>
Authored: Wed Jun 29 18:35:19 2016 -0700
Committer: Jean-Daniel Cryans <jdcryans@apache.org>
Committed: Thu Jun 30 17:20:24 2016 +0000

----------------------------------------------------------------------
 .../java/org/kududb/client/TabletClient.java    | 31 +++++++++++++-------
 1 file changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6583dffe/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
index ff47189..4d664a9 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
@@ -155,6 +155,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     if (!rpc.deadlineTracker.hasDeadline()) {
       LOG.warn(getPeerUuidLoggingString() + " sending an rpc without a timeout " + rpc);
     }
+    Pair<ChannelBuffer, Integer> encodedRpcAndId = null;
     if (chan != null) {
       if (!rpc.getRequiredFeatures().isEmpty() &&
           !secureRpcHelper.getServerFeatures().contains(
@@ -163,14 +164,14 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
             "the server does not support the APPLICATION_FEATURE_FLAGS RPC feature"));
       }
 
-      final ChannelBuffer serialized = encode(rpc);
-      if (serialized == null) {  // Error during encoding.
+      encodedRpcAndId = encode(rpc);
+      if (encodedRpcAndId == null) {  // Error during encoding.
         return;  // Stop here.  RPC has been failed already.
       }
 
       final Channel chan = this.chan;  // Volatile read.
       if (chan != null) {  // Double check if we disconnected during encode().
-        Channels.write(chan, serialized);
+        Channels.write(chan, encodedRpcAndId.getFirst());
         return;
       }
     }
@@ -182,10 +183,11 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
         tryAgain = true;
       // Check if we got disconnected.
       } else if (dead) {
-        if (rpcs_inflight.isEmpty()) {
-          LOG.debug("rpcs_inflight is empty and this TabletClient is dead, will assume that this " +
-              "RPC was taken care of already {}", rpc);
-        } else {
+        // We got disconnected during the process of encoding this rpc, but we need to check if
+        // cleanup() already took care of calling failOrRetryRpc() for us. If it did, the entry we
+        // added in rpcs_inflight will be missing. If not, we have to call failOrRetryRpc()
+        // ourselves after this synchronized block.
+        if (encodedRpcAndId != null && rpcs_inflight.containsKey(encodedRpcAndId.getSecond())) {
           failRpc = true;
         }
       } else {
@@ -209,7 +211,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     }
   }
 
-  private <R> ChannelBuffer encode(final KuduRpc<R> rpc) {
+  private <R> Pair<ChannelBuffer, Integer> encode(final KuduRpc<R> rpc) {
     final int rpcid = this.rpcid.incrementAndGet();
     ChannelBuffer payload;
     final String service = rpc.serviceName();
@@ -261,7 +263,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
 
     payload = secureRpcHelper.wrap(payload);
 
-    return payload;
+    return new Pair<>(payload, rpcid);
   }
 
   /**
@@ -674,8 +676,15 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
       }
       dead = true;
      rpcs = pending_rpcs == null ? new ArrayList<KuduRpc<?>>(rpcs_inflight.size()) : pending_rpcs;
-      rpcs.addAll(rpcs_inflight.values());
-      rpcs_inflight.clear();
+
+      for (Iterator<KuduRpc<?>> iterator = rpcs_inflight.values().iterator(); iterator.hasNext();) {
+        KuduRpc<?> rpc = iterator.next();
+        rpcs.add(rpc);
+        iterator.remove();
+      }
+      // After this, rpcs_inflight might still have entries since they could have been added
+      // concurrently, and those RPCs will be handled by their caller in sendRpc.
+
       pending_rpcs = null;
     }
     final ConnectionResetException exception =


Mime
View raw message