kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject kudu git commit: KUDU-1962. Fix NPE when master returns SERVER_TOO_BUSY
Date Fri, 31 Mar 2017 16:11:26 GMT
Repository: kudu
Updated Branches:
  refs/heads/branch-1.3.x 00813f96b -> 5020a5039


KUDU-1962. Fix NPE when master returns SERVER_TOO_BUSY

If the master's RPC queue overflows, it returns a TOO_BUSY error to the
clients. We were not properly handling this in the case of
ConnectToMaster RPCs.

This adds a new unit test which provokes the behavior by setting a short
queue and injecting latency into tablet location lookups while starting
several threads which make clients and perform lookups. This also fixes
the issue by reverting back to passing the placeholder 'masterTable'
into these RPCs.

Longer term we should avoid this 'masterTable' hack, since the master is
not a table, but given that this causes serious problems, a more
targeted fix is desirable for 1.3.1 and 1.4.0.

Change-Id: Ic4aba2e39f05488da500c18247c72181c8e9dceb
Reviewed-on: http://gerrit.cloudera.org:8080/6516
Reviewed-by: Jean-Daniel Cryans <jdcryans@apache.org>
Tested-by: Kudu Jenkins
(cherry picked from commit bee8e875e21acb1d36f104f8388fc9ed6c963821)
Reviewed-on: http://gerrit.cloudera.org:8080/6519


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5020a503
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5020a503
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5020a503

Branch: refs/heads/branch-1.3.x
Commit: 5020a503959849e75b3ba85f7e2a948caf6e504d
Parents: 00813f9
Author: Todd Lipcon <todd@apache.org>
Authored: Thu Mar 30 12:18:14 2017 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu Mar 30 22:50:26 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  3 +-
 .../apache/kudu/client/ConnectToCluster.java    |  7 +-
 .../kudu/client/ConnectToMasterRequest.java     |  5 +-
 .../java/org/apache/kudu/client/KuduClient.java |  4 +-
 .../org/apache/kudu/client/BaseKuduTest.java    |  2 +-
 .../org/apache/kudu/client/MiniKuduCluster.java | 12 +++
 .../apache/kudu/client/TestHandleTooBusy.java   | 83 ++++++++++++++++++++
 .../apache/kudu/client/TestMasterFailover.java  | 80 ++++++++++++-------
 8 files changed, 160 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5020a503/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index f1c8adc..a621348 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1114,7 +1114,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return An initialized Deferred object to hold the response.
    */
   Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
-    return ConnectToCluster.run(masterAddresses, parentRpc, connectionCache,
+    // TODO(todd): stop using this 'masterTable' hack.
+    return ConnectToCluster.run(masterTable, masterAddresses, parentRpc, connectionCache,
         defaultAdminOperationTimeoutMs)
         .addCallback(
             new Callback<Master.GetTableLocationsResponsePB, ConnectToClusterResponse>() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5020a503/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index 018b531..ca4b270 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -79,11 +79,12 @@ final class ConnectToCluster {
   }
 
   private static Deferred<ConnectToMasterResponsePB> connectToMaster(
+      final KuduTable masterTable,
       final TabletClient masterClient, KuduRpc<?> parentRpc,
       long defaultTimeoutMs) {
     // TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
     // basically reuse in some way the master permits.
-    final ConnectToMasterRequest rpc = new ConnectToMasterRequest();
+    final ConnectToMasterRequest rpc = new ConnectToMasterRequest(masterTable);
     if (parentRpc != null) {
       rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
       rpc.setParentRpc(parentRpc);
@@ -124,6 +125,7 @@ final class ConnectToCluster {
    * Locate the leader master and retrieve the cluster information
    * (see {@link ConnectToClusterResponse}.
    *
+   * @param masterTable the "placeholder" table used by AsyncKuduClient
    * @param masterAddresses the addresses of masters to fetch from
    * @param parentRpc RPC that prompted a master lookup, can be null
    * @param connCache the client's connection cache, used for creating connections
@@ -132,6 +134,7 @@ final class ConnectToCluster {
    * @return a Deferred object for the cluster connection status
    */
   public static Deferred<ConnectToClusterResponse> run(
+      KuduTable masterTable,
       List<HostAndPort> masterAddresses,
       KuduRpc<?> parentRpc,
       ConnectionCache connCache,
@@ -150,7 +153,7 @@ final class ConnectToCluster {
         Status statusIOE = Status.IOError(message);
         d = Deferred.fromError(new NonRecoverableException(statusIOE));
       } else {
-        d = connectToMaster(client, parentRpc, defaultTimeoutMs);
+        d = connectToMaster(masterTable, client, parentRpc, defaultTimeoutMs);
       }
       d.addCallbacks(connector.callbackForNode(hostAndPort),
           connector.errbackForNode(hostAndPort));

http://git-wip-us.apache.org/repos/asf/kudu/blob/5020a503/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
index 00e4537..0a74ce9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
@@ -51,8 +51,9 @@ public class ConnectToMasterRequest extends KuduRpc<ConnectToMasterResponsePB> {
    */
   private String method = CONNECT_TO_MASTER;
 
-  public ConnectToMasterRequest() {
-    super(null); // no KuduTable
+  public ConnectToMasterRequest(KuduTable masterTable) {
+    super(masterTable);
+    // TODO(todd): get rid of 'masterTable' hack
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/5020a503/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index b23e5c2..fc2e4da 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executor;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.stumbleupon.async.Deferred;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +42,8 @@ public class KuduClient implements AutoCloseable {
 
   public static final Logger LOG = LoggerFactory.getLogger(AsyncKuduClient.class);
 
-  private final AsyncKuduClient asyncClient;
+  @VisibleForTesting
+  final AsyncKuduClient asyncClient;
 
   KuduClient(AsyncKuduClient asyncClient) {
     this.asyncClient = asyncClient;

http://git-wip-us.apache.org/repos/asf/kudu/blob/5020a503/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index c87b706..83d2492 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -61,7 +61,7 @@ public class BaseKuduTest {
   // the seed it picks so that you can re-run tests with it.
   private static final Random randomForTSRestart = new Random();
 
-  private static MiniKuduCluster miniCluster;
+  protected static MiniKuduCluster miniCluster;
 
   // Expose the MiniKuduCluster builder so that subclasses can alter the builder.
   protected static final MiniKuduCluster.MiniKuduClusterBuilder miniClusterBuilder =

http://git-wip-us.apache.org/repos/asf/kudu/blob/5020a503/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index 006057c..0888441 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -333,6 +333,18 @@ public class MiniKuduCluster implements AutoCloseable {
   }
 
   /**
+   * Restart any master processes which are not currently running.
+   */
+  public void restartDeadMasters() throws Exception {
+    for (HostAndPort hostAndPort : masterHostPorts) {
+      if (!masterProcesses.containsKey(hostAndPort.getPort())) {
+        restartDeadProcessOnPort(hostAndPort.getPort(), masterProcesses);
+      }
+    }
+  }
+
+
+  /**
    * Starts a previously killed tablet server process on the specified port.
    * @param port which port the TS was listening on for RPCs
    * @throws Exception

http://git-wip-us.apache.org/repos/asf/kudu/blob/5020a503/java/kudu-client/src/test/java/org/apache/kudu/client/TestHandleTooBusy.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHandleTooBusy.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHandleTooBusy.java
new file mode 100644
index 0000000..0d8cd55
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHandleTooBusy.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests which provoke RPC queue overflow errors on the server side
+ * to ensure that we properly handle them in the client.
+ */
+public class TestHandleTooBusy extends BaseKuduTest {
+  private static final String TABLE_NAME = "TestHandleTooBusy";
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    miniClusterBuilder
+      // Short queue to provoke overflow.
+      .addMasterFlag("--rpc_service_queue_length=1")
+      // Low number of service threads, so things stay in the queue.
+      .addMasterFlag("--rpc_num_service_threads=3")
+      // inject latency so lookups process slowly.
+      .addMasterFlag("--master_inject_latency_on_tablet_lookups_ms=100");
+
+    BaseKuduTest.setUpBeforeClass();
+  }
+
+  /**
+   * Provoke overflows in the master RPC queue while connecting to the master
+   * and performing location lookups.
+   */
+  @Test(timeout=60000)
+  public void testMasterLookupOverflow() throws Exception {
+    createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
+    ExecutorService exec = Executors.newCachedThreadPool();
+    List<Future<Void>> futures = Lists.newArrayList();
+    for (int thr = 0; thr < 10; thr++) {
+      futures.add(exec.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          for (int i = 0; i < 5; i++) {
+            try (KuduClient c = new KuduClient.KuduClientBuilder(miniCluster.getMasterAddresses())
+                .build()) {
+              KuduTable table = c.openTable(TABLE_NAME);
+              for (int j = 0; j < 5; j++) {
+                KuduScanToken.KuduScanTokenBuilder scanBuilder = c.newScanTokenBuilder(table);
+                scanBuilder.build();
+                c.asyncClient.emptyTabletsCacheForTable(table.getTableId());
+              }
+            }
+          }
+          return null;
+        }
+      }));
+    }
+    for (Future<Void> f : futures) {
+      f.get();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/5020a503/java/kudu-client/src/test/java/org/apache/kudu/client/TestMasterFailover.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMasterFailover.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMasterFailover.java
index cda6bb2..aede668 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMasterFailover.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMasterFailover.java
@@ -18,50 +18,72 @@ package org.apache.kudu.client;
 
 import static org.junit.Assert.assertEquals;
 
-import org.junit.BeforeClass;
+import org.junit.After;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
 /**
  * Tests {@link AsyncKuduClient} with multiple masters.
  */
 public class TestMasterFailover extends BaseKuduTest {
-  private static final Logger LOG = LoggerFactory.getLogger(TestMasterFailover.class);
-  private static final String TABLE_NAME =
-      TestMasterFailover.class.getName() + "-" + System.currentTimeMillis();
+  enum KillBefore {
+    CREATE_CLIENT,
+    CREATE_TABLE,
+    OPEN_TABLE,
+    SCAN_TABLE
+  }
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    BaseKuduTest.setUpBeforeClass();
-    createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
+  @After
+  public void restartDeadServers() throws Exception {
+    miniCluster.restartDeadMasters();
   }
 
   @Test(timeout = 30000)
-  public void testKillLeader() throws Exception {
+  public void testKillLeaderBeforeCreateClient() throws Exception {
+    doTestKillLeader(KillBefore.CREATE_CLIENT);
+  }
+  @Test(timeout = 30000)
+  public void testKillLeaderBeforeCreateTable() throws Exception {
+    doTestKillLeader(KillBefore.CREATE_TABLE);
+  }
+  @Test(timeout = 30000)
+  public void testKillLeaderBeforeOpenTable() throws Exception {
+    doTestKillLeader(KillBefore.OPEN_TABLE);
+  }
+  @Test(timeout = 30000)
+  public void testKillLeaderBeforeScanTable() throws Exception {
+    doTestKillLeader(KillBefore.SCAN_TABLE);
+  }
+
+  private void doTestKillLeader(KillBefore killBefore) throws Exception {
+    String tableName = "TestMasterFailover-killBefore=" + killBefore;
     int countMasters = masterHostPorts.size();
     if (countMasters < 3) {
-      LOG.info("This test requires at least 3 master servers, but only " + countMasters +
-          " are specified.");
-      return;
+      throw new Exception("This test requires at least 3 master servers, but only "
+        + countMasters + " are specified.");
     }
-    killMasterLeader();
 
-    // Test that we can open a previously created table after killing the leader master.
-    KuduTable table = openTable(TABLE_NAME);
-    assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
+    if (killBefore == KillBefore.CREATE_CLIENT) {
+      killMasterLeader();
+    }
+    try (KuduClient c = new KuduClient.KuduClientBuilder(miniCluster.getMasterAddresses())
+          .build()) {
+      if (killBefore == KillBefore.CREATE_TABLE) {
+        killMasterLeader();
+      }
 
-    // Test that we can create a new table when one of the masters is down.
-    String newTableName = TABLE_NAME + "-afterLeaderIsDead";
-    createTable(newTableName, basicSchema, getBasicCreateTableOptions());
-    table = openTable(newTableName);
-    assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
+      createTable(tableName, basicSchema, getBasicCreateTableOptions());
 
-    // Test that we can initialize a client when one of the masters specified in the
-    // connection string is down.
-    AsyncKuduClient newClient = new AsyncKuduClient.AsyncKuduClientBuilder(masterAddresses).build();
-    table = newClient.openTable(newTableName).join(DEFAULT_SLEEP);
-    assertEquals(0, countRowsInScan(newClient.newScannerBuilder(table).build()));
+      if (killBefore == KillBefore.OPEN_TABLE) {
+        killMasterLeader();
+      }
+
+      // Test that we can open a previously created table after killing the leader master.
+      KuduTable table = openTable(tableName);
+
+      if (killBefore == KillBefore.SCAN_TABLE) {
+        killMasterLeader();
+      }
+      assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
+    }
   }
 }


Mime
View raw message