kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From granthe...@apache.org
Subject [kudu] branch branch-1.8.x updated: KUDU-2710: Fix KeepAliveRequest retries
Date Wed, 27 Feb 2019 15:09:16 GMT
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch branch-1.8.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.8.x by this push:
     new 94c1b70  KUDU-2710: Fix KeepAliveRequest retries
94c1b70 is described below

commit 94c1b707faf383444bbc791e6ee1c70f5b108121
Author: Grant Henke <granthenke@apache.org>
AuthorDate: Mon Feb 25 21:17:01 2019 -0600

    KUDU-2710: Fix KeepAliveRequest retries
    
    Fixes KeepAliveRequest retries by adding a partitionKey
    implementation. Without it, a null partitionKey is passed
    and the client treats the request as targeting the master table.
    
    A follow-up patch should include changes to prevent issues
    like this in the future and to fix any remaining retry issues.
    This patch is kept small to ease backporting.
    
    Change-Id: I573c1b6a14a4277c0f837c189175ed91e4e9e933
    Reviewed-on: http://gerrit.cloudera.org:8080/12606
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../org/apache/kudu/client/AsyncKuduScanner.java    |  6 ++++++
 .../main/java/org/apache/kudu/client/RpcProxy.java  | 21 +++++++++++++++++++++
 .../java/org/apache/kudu/client/TestKuduClient.java |  8 ++++++++
 3 files changed, 35 insertions(+)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 71b1146..9f3c4c1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -864,6 +864,12 @@ public final class AsyncKuduScanner {
     }
 
     @Override
+    public byte[] partitionKey() {
+      // This key is used to lookup where the request needs to go
+      return pruner.nextPartitionKey();
+    }
+
+    @Override
     Pair<Void, Object> deserialize(final CallResponse callResponse,
                                    String tsUUID) throws KuduException {
       ScannerKeepAliveResponsePB.Builder builder = ScannerKeepAliveResponsePB.newBuilder();
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 347e3c8..5a2728d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -60,6 +60,9 @@ class RpcProxy {
 
   private static final Logger LOG = LoggerFactory.getLogger(RpcProxy.class);
 
+  private static int staticNumFail = 0;
+  private static Exception staticException = null;
+
   /** The reference to the top-level Kudu client object. */
   @Nonnull
   private final AsyncKuduClient client;
@@ -90,6 +93,18 @@ class RpcProxy {
   }
 
   /**
+   * Fails the next numFail RPCs by throwing the passed exception.
+   * @param numFail the number of RPCs to fail
+   * @param exception the exception to throw when failing an rpc
+   */
+  @InterfaceAudience.LimitedPrivate("Test")
+  static void failNextRpcs(int numFail, Exception exception) {
+    Preconditions.checkNotNull(exception);
+    staticNumFail = numFail;
+    staticException = exception;
+  }
+
+  /**
    * Send the specified RPC using the connection to the Kudu server.
    *
    * @param <R> type of the RPC
@@ -101,6 +116,12 @@ class RpcProxy {
                           final Connection connection,
                           final KuduRpc<R> rpc) {
     try {
+      // Throw an exception to enable testing failures. See `failNextRpcs`.
+      if (staticNumFail > 0) {
+        staticNumFail--;
+        LOG.warn("Forcing a failure on sendRpc: " + rpc);
+        throw staticException;
+      }
       if (!rpc.getRequiredFeatures().isEmpty()) {
         // An extra optimization: when the peer's features are already known, check that the server
         // supports feature flags, if those are required.
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 49357f6..9e2b696 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -45,6 +45,7 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -56,6 +57,7 @@ import com.stumbleupon.async.Deferred;
 import org.apache.kudu.test.KuduTestHarness;
 import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
 import org.apache.kudu.util.ClientTestUtil;
+import org.apache.kudu.util.RandomUtils;
 import org.apache.kudu.util.TimestampUtil;
 import org.junit.Before;
 import org.junit.Rule;
@@ -283,8 +285,14 @@ public class TestKuduClient {
     // Wait for longer than the scanner ttl calling keepAlive throughout.
     // Each loop sleeps 25% of the scanner ttl and we loop 10 times to ensure
     // we extend over 2x the scanner ttl.
+    Random random = RandomUtils.getRandom();
     for (int i = 0; i < 10; i++) {
       Thread.sleep(SHORT_SCANNER_TTL_MS / 4);
+      // Force 1/3 of the keepAlive requests to retry up to 3 times.
+      if (i % 3 == 0) {
+        RpcProxy.failNextRpcs(random.nextInt(4),
+            new RecoverableException(Status.ServiceUnavailable("testKeepAlive")));
+      }
       scanner.keepAlive();
     }
 


Mime
View raw message