kudu-commits mailing list archives

From a...@apache.org
Subject [kudu] 01/04: KUDU-2069 p4: stop replication from failed servers in maintenance mode
Date Tue, 01 Oct 2019 06:05:25 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5316a89dfd13c36eef078b32043f161e6d0bbf01
Author: Andrew Wong <awong@apache.org>
AuthorDate: Mon Sep 16 14:40:21 2019 -0700

    KUDU-2069 p4: stop replication from failed servers in maintenance mode
    
    When determining whether a replica needs to be added, we may now
    consider a set of UUIDs that are allowed to be in a "bad" state while
    not counting towards being under-replicated.
    
    Since the goal of maintenance mode is to cope with unexpected failures,
    "healthy" movement, e.g. through tooling that uses REPLACE and PROMOTE
    tagging, is still allowed to and from tservers in maintenance mode.
    
    Testing:
    - a unit test is added to exercise the new quorum logic to ignore
      certain failed UUIDs, taking into account REPLACE and PROMOTE replicas
    - integration tests are added to test:
      - behavior with RF=3 through restarts of the master
      - behavior when running move_replica tooling
      - behavior with RF=5 with background failures
    
    Change-Id: I9a63b55011d16900c0d27eac0eb75880074204db
    Reviewed-on: http://gerrit.cloudera.org:8080/14222
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
    Tested-by: Andrew Wong <awong@cloudera.com>
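    
    Below is a minimal usage sketch of the new optional parameter (an
    illustration for reviewers, not code from this patch). It mirrors the
    unit-test pattern in quorum_util-test.cc and assumes that file's helpers
    (AddPeer, the V member-type shorthand, and the '+'/'-' health markers)
    are in scope:
    
        RaftConfigPB config;
        AddPeer(&config, "A", V, '+');  // healthy voter
        AddPeer(&config, "B", V, '-');  // failed voter whose tserver is in maintenance mode
        AddPeer(&config, "C", V, '+');  // healthy voter
    
        // Without the ignore set, the failed voter makes the tablet
        // under-replicated and a healthy majority exists, so a replica is added.
        bool add_by_default = ShouldAddReplica(config, 3);            // true
    
        // With "B" ignored, its failure doesn't count towards
        // under-replication, so no replica is added.
        bool add_when_ignored = ShouldAddReplica(config, 3, { "B" }); // false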
---
 src/kudu/consensus/quorum_util-test.cc             | 128 ++++++
 src/kudu/consensus/quorum_util.cc                  |  42 +-
 src/kudu/consensus/quorum_util.h                   |  20 +-
 src/kudu/integration-tests/CMakeLists.txt          |   2 +
 .../integration-tests/maintenance_mode-itest.cc    | 457 +++++++++++++++++++++
 src/kudu/master/catalog_manager.cc                 |   5 +-
 src/kudu/master/ts_manager.cc                      |  18 +-
 src/kudu/master/ts_manager.h                       |   9 +
 8 files changed, 657 insertions(+), 24 deletions(-)

diff --git a/src/kudu/consensus/quorum_util-test.cc b/src/kudu/consensus/quorum_util-test.cc
index c478be8..a0b3851 100644
--- a/src/kudu/consensus/quorum_util-test.cc
+++ b/src/kudu/consensus/quorum_util-test.cc
@@ -561,6 +561,134 @@ TEST(QuorumUtilTest, ShouldAddReplica) {
   }
 }
 
+// Test that when tablet replicas are ignored for underreplication (e.g. due to
+// maintenance mode of a tablet server), the decision to add a replica will
+// actually ignore failures as appropriate.
+TEST(QuorumUtilTest, ShouldAddReplicaIgnoreFailures) {
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '?');
+    AddPeer(&config, "B", V, '-');
+    AddPeer(&config, "C", V, '+');
+    // The failed server is ignored, and doesn't count towards being
+    // under-replicated. Note: The server with unknown health also doesn't
+    // count towards being under-replicated.
+    EXPECT_FALSE(ShouldAddReplica(config, 3, { "B" }));
+    // While the server with unknown health doesn't count towards being
+    // under-replicated, the failed server does. But since we require a
+    // majority to add replicas, we can't add a replica.
+    EXPECT_FALSE(ShouldAddReplica(config, 3, { "A" }));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '?');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '+');
+    // This is healthy, with or without ignoring failures.
+    EXPECT_FALSE(ShouldAddReplica(config, 3));
+    EXPECT_FALSE(ShouldAddReplica(config, 3, { "A" }));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '-');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '+');
+    // But when a healthy server is in maintenance mode, we should consider the
+    // unhealthy server as failed and add a replica.
+    EXPECT_TRUE(ShouldAddReplica(config, 3, { "B" }));
+    // When the unhealthy server is in maintenance mode, we shouldn't add a
+    // replica, since none of the three servers is then considered failed.
+    EXPECT_FALSE(ShouldAddReplica(config, 3, { "A" }));
+    // And when everything is in maintenance mode, we shouldn't add a replica
+    // even though a majority exists.
+    EXPECT_FALSE(ShouldAddReplica(config, 3, { "A", "B", "C" }));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '-');
+    AddPeer(&config, "B", V, '-');
+    AddPeer(&config, "C", V, '+');
+    // A majority doesn't exist, so no matter what failures are being ignored,
+    // we will not add a replica.
+    EXPECT_FALSE(ShouldAddReplica(config, 3, { "A" }));
+    EXPECT_FALSE(ShouldAddReplica(config, 3, { "A", "B" }));
+    EXPECT_FALSE(ShouldAddReplica(config, 3, { "A", "B", "C" }));
+  }
+  {
+    // Ignored servers shouldn't change the decision when we really are
+    // under-replicated.
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '-');
+    AddPeer(&config, "B", V, '+');
+
+    // No majority present.
+    EXPECT_FALSE(ShouldAddReplica(config, 3, { "A" }));
+    EXPECT_FALSE(ShouldAddReplica(config, 3, { "B" }));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '-');
+    AddPeer(&config, "B", V, '-');
+    AddPeer(&config, "C", V, '+');
+    AddPeer(&config, "D", V, '+');
+    AddPeer(&config, "E", V, '+');
+    // When both failed replicas are being ignored, we shouldn't add a replica.
+    EXPECT_FALSE(ShouldAddReplica(config, 5, { "A", "B" }));
+    // When only one of them is ignored, we should.
+    EXPECT_TRUE(ShouldAddReplica(config, 5, { "A" }));
+  }
+}
+
+// Test that when tablet replicas are ignored for underreplication, replace is
+// still honored as appropriate.
+TEST(QuorumUtilTest, ShouldAddReplicaHonorReplaceWhenIgnoringFailures) {
+  // Even if the replica to replace is meant to be ignored on failure, we
+  // should honor the replacement and try to add a replica.
+  for (char health : { '+', '-', '?' }) {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, health, {{"REPLACE", true}});
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '+');
+    EXPECT_TRUE(ShouldAddReplica(config, 3, { "A" }));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+', {{"REPLACE", true}});
+    AddPeer(&config, "B", V, '-');
+    AddPeer(&config, "C", V, '+');
+    // Ignoring failures shouldn't impede our ability to add a replica when the
+    // "ignored" server is actually healthy.
+    EXPECT_TRUE(ShouldAddReplica(config, 3, { "A" }));
+  }
+}
+
+TEST(QuorumUtilTest, ShouldAddReplicaHonorPromoteWhenIgnoringFailures) {
+  // If one of our replicas to promote has failed, and we are supposed to
+  // ignore its failure, we should not add a replica because of it.
+  // And if they're healthy or unknown, we also shouldn't add a replica.
+  for (char health : { '+', '-', '?' }) {
+    {
+      RaftConfigPB config;
+      AddPeer(&config, "A", N, health, {{"PROMOTE", true}});
+      AddPeer(&config, "B", V, '+');
+      AddPeer(&config, "C", V, '+');
+      EXPECT_FALSE(ShouldAddReplica(config, 3, { "A" }));
+    }
+    {
+      RaftConfigPB config;
+      AddPeer(&config, "A", N, health, {{"PROMOTE", true}});
+      AddPeer(&config, "B", N, '-', {{"PROMOTE", true}});
+      AddPeer(&config, "C", V, '+');
+      AddPeer(&config, "D", V, '+');
+      AddPeer(&config, "E", V, '+');
+      EXPECT_FALSE(ShouldAddReplica(config, 5, { "A", "B" }));
+      // But when there is a failure that isn't supposed to be ignored (B), we
+      // should add a replica.
+      EXPECT_TRUE(ShouldAddReplica(config, 5, { "A" }));
+    }
+  }
+}
+
 // Verify logic of the kudu::consensus::ShouldEvictReplica(), anticipating
 // removal of a voter replica.
 TEST(QuorumUtilTest, ShouldEvictReplicaVoters) {
diff --git a/src/kudu/consensus/quorum_util.cc b/src/kudu/consensus/quorum_util.cc
index 40f09cb..dccf0fa 100644
--- a/src/kudu/consensus/quorum_util.cc
+++ b/src/kudu/consensus/quorum_util.cc
@@ -22,6 +22,7 @@
 #include <queue>
 #include <set>
 #include <string>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -44,6 +45,7 @@ using std::pair;
 using std::priority_queue;
 using std::set;
 using std::string;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -418,8 +420,9 @@ string DiffConsensusStates(const ConsensusStatePB& old_state,
 
 // The decision is based on:
 //
-//   * the number of voter replicas in definitively bad shape and replicas
-//     marked with the REPLACE attribute
+//   * the number of voter replicas that aren't ignored (i.e. that aren't
+//     allowed to be in bad shape) and that are definitively in bad shape or
+//     marked with the REPLACE attribute.
 //
 //   * the number of non-voter replicas marked with the PROMOTE=true attribute
 //     in good or possibly good state.
@@ -433,7 +436,8 @@ string DiffConsensusStates(const ConsensusStatePB& old_state,
 // TODO(aserbin): add a test scenario for the leader replica's logic to cover
 //                the latter case.
 bool ShouldAddReplica(const RaftConfigPB& config,
-                      int replication_factor) {
+                      int replication_factor,
+                      const unordered_set<string>& uuids_ignored_for_underreplication) {
   int num_voters_total = 0;
   int num_voters_healthy = 0;
   int num_voters_need_replacement = 0;
@@ -445,12 +449,24 @@ bool ShouldAddReplica(const RaftConfigPB& config,
   VLOG(2) << "config to evaluate: " << SecureDebugString(config);
   for (const RaftPeerPB& peer : config.peers()) {
     const auto overall_health = peer.health_report().overall_health();
+    const auto& peer_uuid = peer.permanent_uuid();
+    bool ignore_failure_for_underreplication = ContainsKey(uuids_ignored_for_underreplication,
+                                                           peer_uuid);
+    if (VLOG_IS_ON(1) && ignore_failure_for_underreplication) {
+      VLOG(1) << Substitute("ignoring $0 if failed", peer_uuid);
+    }
     switch (peer.member_type()) {
       case RaftPeerPB::VOTER:
         ++num_voters_total;
-        if (peer.attrs().replace() ||
-            overall_health == HealthReportPB::FAILED ||
-            overall_health == HealthReportPB::FAILED_UNRECOVERABLE) {
+        if (peer.attrs().replace()) {
+          ++num_voters_need_replacement;
+        } else if (overall_health == HealthReportPB::FAILED ||
+                   overall_health == HealthReportPB::FAILED_UNRECOVERABLE) {
+          // If the failed peer should be ignored, e.g. the server is in
+          // maintenance mode, don't count it towards under-replication.
+          if (ignore_failure_for_underreplication) {
+            continue;
+          }
           ++num_voters_need_replacement;
         }
         if (overall_health == HealthReportPB::HEALTHY) {
@@ -458,17 +474,19 @@ bool ShouldAddReplica(const RaftConfigPB& config,
         }
         break;
       case RaftPeerPB::NON_VOTER:
-        if (peer.attrs().promote() &&
-            overall_health != HealthReportPB::FAILED &&
-            overall_health != HealthReportPB::FAILED_UNRECOVERABLE) {
-          // A replica with HEALTHY or UNKNOWN overall health status
-          // is considered as a replica to promote: a new non-voter replica is
+        if (peer.attrs().promote()) {
+          // A replica with HEALTHY or UNKNOWN overall health status is
+          // considered as a replica to promote: a new non-voter replica is
           // added with UNKNOWN health status. If such a replica is not
           // responsive for a long time, then its state will change to
           // HealthReportPB::FAILED after some time and it will be evicted. But
           // before that, it's considered as a candidate for promotion in the
           // code below.
-          ++num_non_voters_to_promote;
+          if ((overall_health != HealthReportPB::FAILED &&
+               overall_health != HealthReportPB::FAILED_UNRECOVERABLE) ||
+              ignore_failure_for_underreplication) {
+            ++num_non_voters_to_promote;
+          }
         }
         break;
       default:
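
The final decision is not shown in this hunk; a simplified, self-contained
sketch of how the tallies above combine (an approximation consistent with the
header comment and the unit tests, not the verbatim function body, with
MajoritySize standing in for the real quorum-size computation) is:

    static int MajoritySize(int num_voters) {
      return num_voters / 2 + 1;
    }

    // Sketch: add a replica only if a healthy majority of voters exists and
    // the usable replicas (voters that are neither failed-and-unignored nor
    // marked REPLACE, plus promotable non-voters) fall short of the
    // replication factor.
    static bool ShouldAddReplicaSketch(int num_voters_total,
                                       int num_voters_healthy,
                                       int num_voters_need_replacement,
                                       int num_non_voters_to_promote,
                                       int replication_factor) {
      const bool has_healthy_majority =
          num_voters_healthy >= MajoritySize(num_voters_total);
      const bool is_under_replicated = replication_factor >
          num_voters_total - num_voters_need_replacement + num_non_voters_to_promote;
      return has_healthy_majority && is_under_replicated;
    }
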
diff --git a/src/kudu/consensus/quorum_util.h b/src/kudu/consensus/quorum_util.h
index cc11319..e6afbaa 100644
--- a/src/kudu/consensus/quorum_util.h
+++ b/src/kudu/consensus/quorum_util.h
@@ -14,11 +14,10 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-#ifndef KUDU_CONSENSUS_QUORUM_UTIL_H_
-#define KUDU_CONSENSUS_QUORUM_UTIL_H_
+#pragma once
 
 #include <string>
+#include <unordered_set>
 
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/util/status.h"
@@ -106,12 +105,16 @@ std::string DiffConsensusStates(const ConsensusStatePB& old_state,
 std::string DiffRaftConfigs(const RaftConfigPB& old_config,
                             const RaftConfigPB& new_config);
 
-// Return 'true' iff the specified tablet configuration is under-replicated
-// given the 'replication_factor' and a healthy majority exists. The decision
-// is based on the health information provided by the Raft configuration in the
-// 'config' parameter.
+// Return 'true' iff there is a quorum and the specified tablet configuration
+// is under-replicated given the 'replication_factor', ignoring failures of
+// the UUIDs in 'uuids_ignored_for_underreplication'.
+//
+// The decision is based on the health information provided by the Raft
+// configuration in the 'config' parameter.
 bool ShouldAddReplica(const RaftConfigPB& config,
-                      int replication_factor);
+                      int replication_factor,
+                      const std::unordered_set<std::string>& uuids_ignored_for_underreplication =
+                          std::unordered_set<std::string>());
 
 // Check if the given Raft configuration contains at least one extra replica
 // which should (and can) be removed in accordance with the specified
@@ -126,4 +129,3 @@ bool ShouldEvictReplica(const RaftConfigPB& config,
 }  // namespace consensus
 }  // namespace kudu
 
-#endif /* KUDU_CONSENSUS_QUORUM_UTIL_H_ */
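
A small usage note on the declaration above: the new argument defaults to an
empty set, so existing call sites are unchanged. Assuming a RaftConfigPB named
config is in scope, the two calls below are equivalent (no failures ignored):

    bool a = ShouldAddReplica(config, 3);
    bool b = ShouldAddReplica(config, 3, std::unordered_set<std::string>());
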
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index dd8b54f..cd103ab 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -45,6 +45,7 @@ target_link_libraries(itest_util
   kudu_curl_util
   kudu_fs
   kudu_test_util
+  kudu_tools_test_util
   kudu_tools_util
   security_test_util)
 add_dependencies(itest_util
@@ -81,6 +82,7 @@ ADD_KUDU_TEST(fuzz-itest RUN_SERIAL true)
 ADD_KUDU_TEST(heavy-update-compaction-itest RUN_SERIAL true)
 ADD_KUDU_TEST(linked_list-test RUN_SERIAL true)
 ADD_KUDU_TEST(log-rolling-itest)
+ADD_KUDU_TEST(maintenance_mode-itest)
 ADD_KUDU_TEST(master_cert_authority-itest PROCESSORS 2)
 ADD_KUDU_TEST(master_failover-itest NUM_SHARDS 4 PROCESSORS 3)
 ADD_KUDU_TEST_DEPENDENCIES(master_failover-itest
diff --git a/src/kudu/integration-tests/maintenance_mode-itest.cc b/src/kudu/integration-tests/maintenance_mode-itest.cc
new file mode 100644
index 0000000..6d620f4
--- /dev/null
+++ b/src/kudu/integration-tests/maintenance_mode-itest.cc
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tools/tool_test_util.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+METRIC_DECLARE_entity(server);
+METRIC_DECLARE_histogram(
+    handler_latency_kudu_tserver_TabletCopyService_BeginTabletCopySession);
+
+using kudu::master::ChangeTServerStateRequestPB;
+using kudu::master::ChangeTServerStateResponsePB;
+using kudu::cluster::ExternalDaemon;
+using kudu::cluster::ExternalMiniClusterOptions;
+using kudu::cluster::ExternalTabletServer;
+using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::HealthReportPB;
+using kudu::consensus::IncludeHealthReport;
+using kudu::itest::GetInt64Metric;
+using kudu::master::MasterServiceProxy;
+using kudu::master::TServerStateChangePB;
+using kudu::tools::RunKuduTool;
+using std::pair;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
+
+namespace kudu {
+namespace itest {
+
+typedef pair<unordered_map<string, TServerDetails*>, unique_ptr<ValueDeleter>> MapAndDeleter;
+static const vector<string> kTServerFlags = {
+  // Set a low unavailability timeout so replicas are considered failed and can
+  // be re-replicated more quickly.
+  "--raft_heartbeat_interval_ms=100",
+  "--follower_unavailable_considered_failed_sec=2",
+  // Disable log GC in case our write workloads lead to eviction because
+  // consensus will consider replicas that are too far behind unrecoverable
+  // and will evict them regardless of maintenance mode.
+  "--enable_log_gc=false",
+};
+static const MonoDelta kDurationForSomeHeartbeats = MonoDelta::FromSeconds(3);
+
+class MaintenanceModeITest : public ExternalMiniClusterITestBase {
+ public:
+  void SetUpCluster(int num_tservers) {
+    ExternalMiniClusterOptions opts;
+    opts.num_tablet_servers = num_tservers;
+    opts.extra_master_flags = { "--master_support_maintenance_mode=true" };
+    opts.extra_tserver_flags = kTServerFlags;
+    NO_FATALS(StartClusterWithOpts(std::move(opts)));
+    const auto& addr = cluster_->master(0)->bound_rpc_addr();
+    m_proxy_.reset(new MasterServiceProxy(cluster_->messenger(), addr, addr.host()));
+  }
+
+  // Perform the given state change on the given tablet server.
+  Status ChangeTServerState(const string& uuid, TServerStateChangePB::StateChange change) {
+    ChangeTServerStateRequestPB req;
+    ChangeTServerStateResponsePB resp;
+    TServerStateChangePB* state_change = req.mutable_change();
+    state_change->set_uuid(uuid);
+    state_change->set_change(change);
+    rpc::RpcController rpc;
+    return cluster_->master_proxy()->ChangeTServerState(req, &resp, &rpc);
+  }
+
+  // Checks whether tablet copies have started.
+  void ExpectStartedTabletCopies(bool should_have_started) {
+    bool has_started = false;
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      ExternalTabletServer* tserver = cluster_->tablet_server(i);
+      if (tserver->IsShutdown()) {
+        continue;
+      }
+      int64_t copies_started = 0;
+      ASSERT_OK(GetInt64Metric(tserver->bound_http_hostport(), &METRIC_ENTITY_server,
+          /*entity_id=*/nullptr,
+          &METRIC_handler_latency_kudu_tserver_TabletCopyService_BeginTabletCopySession,
+          "total_count", &copies_started));
+      if (copies_started > 0) {
+        has_started = true;
+        break;
+      }
+    }
+    ASSERT_EQ(should_have_started, has_started);
+  }
+
+  // Return the number of failed replicas in the cluster, according to the
+  // tablet leaders.
+  Status GetNumFailedReplicas(const unordered_map<string, TServerDetails*>& ts_map,
+                              int* num_replicas_failed) {
+    int num_failed = 0;
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      ExternalTabletServer* tserver = cluster_->tablet_server(i);
+      if (tserver->IsShutdown()) {
+        continue;
+      }
+      const string& uuid = tserver->uuid();
+      const TServerDetails* ts_details = FindOrDie(ts_map, uuid);
+      vector<string> tablet_ids;
+      RETURN_NOT_OK(ListRunningTabletIds(ts_details, MonoDelta::FromSeconds(30), &tablet_ids));
+      for (const auto& tablet_id : tablet_ids) {
+        ConsensusStatePB consensus_state;
+        RETURN_NOT_OK(GetConsensusState(ts_details, tablet_id, MonoDelta::FromSeconds(30),
+            IncludeHealthReport::INCLUDE_HEALTH_REPORT, &consensus_state));
+        // Only consider the health states reported by the leaders.
+        if (consensus_state.leader_uuid() != uuid) {
+          continue;
+        }
+        // Go through all the peers and tally up any that are failed.
+        const auto& committed_config = consensus_state.committed_config();
+        for (int p = 0; p < committed_config.peers_size(); p++) {
+          const auto& peer = committed_config.peers(p);
+          if (peer.has_health_report() &&
+              peer.health_report().overall_health() == HealthReportPB::FAILED) {
+            num_failed++;
+          }
+        }
+      }
+    }
+    *num_replicas_failed = num_failed;
+    return Status::OK();
+  }
+
+  void AssertEventuallyNumFailedReplicas(const unordered_map<string, TServerDetails*>& ts_map,
+                                         int expected_failed) {
+    ASSERT_EVENTUALLY([&] {
+      int num_failed;
+      ASSERT_OK(GetNumFailedReplicas(ts_map, &num_failed));
+      ASSERT_EQ(expected_failed, num_failed);
+    });
+  }
+
+  void GenerateTServerMap(MapAndDeleter* map_and_deleter) {
+    unordered_map<string, TServerDetails*> ts_map;
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_OK(CreateTabletServerMap(m_proxy_, cluster_->messenger(), &ts_map));
+      auto cleanup = MakeScopedCleanup([&] {
+        STLDeleteValues(&ts_map);
+      });
+      ASSERT_EQ(cluster_->num_tablet_servers(), ts_map.size());
+      cleanup.cancel();
+    });
+    map_and_deleter->first = std::move(ts_map);
+    map_and_deleter->second.reset(new ValueDeleter(&map_and_deleter->first));
+  }
+
+ protected:
+  shared_ptr<MasterServiceProxy> m_proxy_;
+};
+
+class MaintenanceModeRF3ITest : public MaintenanceModeITest {
+ public:
+  void SetUp() override {
+    SKIP_IF_SLOW_NOT_ALLOWED();
+    NO_FATALS(MaintenanceModeITest::SetUp());
+    NO_FATALS(SetUpCluster(3));
+  }
+  void TearDown() override {
+    SKIP_IF_SLOW_NOT_ALLOWED();
+    NO_FATALS(MaintenanceModeITest::TearDown());
+  }
+};
+
+// Test that placing a tablet server in maintenance mode leads to the failed
+// replicas on that server not being re-replicated.
+TEST_F(MaintenanceModeRF3ITest, TestFailedTServerInMaintenanceModeDoesntRereplicate) {
+  // This test will sleep a bit to ensure the master has had some time to
+  // receive heartbeats.
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  const int kNumTablets = 6;
+
+  // Create the table with three tablet servers and then add one so we're
+  // guaranteed that the replicas are all on the first three servers.
+  TestWorkload create_table(cluster_.get());
+  create_table.set_num_tablets(kNumTablets);
+  create_table.Setup();
+  create_table.Start();
+  // Add a server so there's one we could move to after bringing down a
+  // tserver.
+  ASSERT_OK(cluster_->AddTabletServer());
+  MapAndDeleter ts_map_and_deleter;
+  NO_FATALS(GenerateTServerMap(&ts_map_and_deleter));
+  const auto& ts_map = ts_map_and_deleter.first;
+
+  // Do a sanity check that all our replicas are healthy.
+  NO_FATALS(AssertEventuallyNumFailedReplicas(ts_map, 0));
+
+  // Put one of the servers in maintenance mode.
+  ExternalTabletServer* maintenance_ts = cluster_->tablet_server(0);
+  const string maintenance_uuid = maintenance_ts->uuid();
+  ASSERT_OK(ChangeTServerState(maintenance_uuid, TServerStateChangePB::ENTER_MAINTENANCE_MODE));
+
+  // Bringing the tablet server down shouldn't lead to re-replication.
+  //
+  // Note: it's possible to set up re-replication scenarios in other ways (e.g.
+  // by hitting an IO error, or falling too far behind when replicating); these
+  // should all be treated the same way by virtue of them all using the same
+  // health reporting.
+  NO_FATALS(maintenance_ts->Shutdown());
+
+  // Now wait a bit for this failure to make its way to the master. The failure
+  // shouldn't have led to any re-replication.
+  SleepFor(kDurationForSomeHeartbeats);
+  NO_FATALS(AssertEventuallyNumFailedReplicas(ts_map, kNumTablets));
+  NO_FATALS(ExpectStartedTabletCopies(/*should_have_started*/false));
+
+  // Restarting the masters shouldn't lead to re-replication either, even
+  // though the tablet server is still down.
+  NO_FATALS(cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY));
+  ASSERT_OK(cluster_->master()->Restart());
+  SleepFor(kDurationForSomeHeartbeats);
+  NO_FATALS(ExpectStartedTabletCopies(/*should_have_started*/false));
+
+  // Now bring the server back up and wait for it to become healthy. It should
+  // be able to do this without tablet copies.
+  ASSERT_OK(maintenance_ts->Restart());
+  NO_FATALS(AssertEventuallyNumFailedReplicas(ts_map, 0));
+
+  // Since our server is healthy, leaving maintenance mode shouldn't trigger
+  // any re-replication either.
+  ASSERT_OK(ChangeTServerState(maintenance_uuid, TServerStateChangePB::EXIT_MAINTENANCE_MODE));
+  SleepFor(kDurationForSomeHeartbeats);
+  NO_FATALS(ExpectStartedTabletCopies(/*should_have_started*/false));
+
+  // All the while, our workload should not have been interrupted. Assert
+  // eventually to wait for the rows to converge.
+  NO_FATALS(create_table.StopAndJoin());
+  ClusterVerifier v(cluster_.get());
+  ASSERT_EVENTUALLY([&] {
+    NO_FATALS(v.CheckRowCount(create_table.table_name(),
+                              ClusterVerifier::EXACTLY, create_table.rows_inserted()));
+  });
+}
+
+TEST_F(MaintenanceModeRF3ITest, TestMaintenanceModeDoesntObstructMove) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  TestWorkload create_table(cluster_.get());
+  create_table.set_num_tablets(1);
+  create_table.Setup();
+
+  // Add a tablet server.
+  ASSERT_OK(cluster_->AddTabletServer());
+  const string& added_uuid = cluster_->tablet_server(3)->uuid();
+  MapAndDeleter ts_map_and_deleter;
+  NO_FATALS(GenerateTServerMap(&ts_map_and_deleter));
+  const auto& ts_map = ts_map_and_deleter.first;
+
+  // Put a tablet server into maintenance mode.
+  const string maintenance_uuid = cluster_->tablet_server(0)->uuid();
+  const TServerDetails* maintenance_details = FindOrDie(ts_map, maintenance_uuid);
+  vector<string> mnt_tablet_ids;
+  ASSERT_OK(ListRunningTabletIds(maintenance_details, MonoDelta::FromSeconds(30), &mnt_tablet_ids));
+  ASSERT_EQ(1, mnt_tablet_ids.size());
+
+  ASSERT_OK(ChangeTServerState(maintenance_uuid, TServerStateChangePB::ENTER_MAINTENANCE_MODE));
+
+  // While the maintenance mode tserver is still online, move a tablet from it.
+  // This should succeed, because maintenance mode will not obstruct manual
+  // movement of replicas.
+  {
+    vector<string> move_cmd = {
+      "tablet",
+      "change_config",
+      "move_replica",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      mnt_tablet_ids[0],
+      maintenance_uuid,
+      added_uuid,
+    };
+    string stdout, stderr;
+    ASSERT_OK(RunKuduTool(move_cmd, &stdout, &stderr));
+  }
+  const TServerDetails* added_details = FindOrDie(ts_map, added_uuid);
+  ASSERT_EVENTUALLY([&] {
+    vector<string> added_tablet_ids;
+    ASSERT_OK(ListRunningTabletIds(added_details, MonoDelta::FromSeconds(30), &added_tablet_ids));
+    ASSERT_EQ(1, added_tablet_ids.size());
+  });
+}
+
+// Test that the health state FAILED_UNRECOVERABLE (e.g. if there's a disk
+// error, or if a replica is lagging too much) is still re-replicated during
+// maintenance mode.
+TEST_F(MaintenanceModeRF3ITest, TestMaintenanceModeDoesntObstructFailedUnrecoverable) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  TestWorkload create_table(cluster_.get());
+  create_table.set_num_tablets(1);
+  create_table.Setup();
+  create_table.Start();
+
+  // Add a tablet server.
+  ASSERT_OK(cluster_->AddTabletServer());
+  const string& added_uuid = cluster_->tablet_server(3)->uuid();
+  MapAndDeleter ts_map_and_deleter;
+  NO_FATALS(GenerateTServerMap(&ts_map_and_deleter));
+  const auto& ts_map = ts_map_and_deleter.first;
+
+  // Put a tablet server into maintenance mode.
+  ExternalDaemon* maintenance_ts = cluster_->tablet_server(0);
+  const string maintenance_uuid = maintenance_ts->uuid();
+  const TServerDetails* maintenance_details = FindOrDie(ts_map, maintenance_uuid);
+  vector<string> mnt_tablet_ids;
+  ASSERT_OK(ListRunningTabletIds(maintenance_details, MonoDelta::FromSeconds(30), &mnt_tablet_ids));
+  ASSERT_OK(ChangeTServerState(maintenance_uuid, TServerStateChangePB::ENTER_MAINTENANCE_MODE));
+
+  // Now fail the tablet on the server in maintenance mode by injecting a disk
+  // error. Also speed up flushes so we actually hit an IO error.
+  ASSERT_OK(cluster_->SetFlag(maintenance_ts, "flush_threshold_secs", "1"));
+  ASSERT_OK(cluster_->SetFlag(maintenance_ts, "env_inject_eio_globs",
+      JoinPathSegments(maintenance_ts->data_dirs()[0], "**")));
+  ASSERT_OK(cluster_->SetFlag(maintenance_ts, "env_inject_eio", "1"));
+
+  // Eventually the disk failure will be noted and a copy will be made at the
+  // added server.
+  const TServerDetails* added_details = FindOrDie(ts_map, added_uuid);
+  ASSERT_EVENTUALLY([&] {
+    vector<string> added_tablet_ids;
+    ASSERT_OK(ListRunningTabletIds(added_details, MonoDelta::FromSeconds(30), &added_tablet_ids));
+    ASSERT_EQ(1, added_tablet_ids.size());
+  });
+  NO_FATALS(create_table.StopAndJoin());
+  ClusterVerifier v(cluster_.get());
+  ASSERT_EVENTUALLY([&] {
+    NO_FATALS(v.CheckRowCount(create_table.table_name(),
+                              ClusterVerifier::EXACTLY, create_table.rows_inserted()));
+  });
+}
+
+class MaintenanceModeRF5ITest : public MaintenanceModeITest {
+ public:
+  void SetUp() override {
+    SKIP_IF_SLOW_NOT_ALLOWED();
+    NO_FATALS(MaintenanceModeITest::SetUp());
+    NO_FATALS(SetUpCluster(5));
+  }
+  void TearDown() override {
+    SKIP_IF_SLOW_NOT_ALLOWED();
+    NO_FATALS(MaintenanceModeITest::TearDown());
+  }
+};
+
+// Test that a table with RF=5 will still be available through the failure of
+// two nodes if one is put in maintenance mode.
+TEST_F(MaintenanceModeRF5ITest, TestBackgroundFailureDuringMaintenanceMode) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  // Create some tables with RF=5.
+  const int kNumTablets = 3;
+  TestWorkload create_table(cluster_.get());
+  create_table.set_num_tablets(kNumTablets);
+  create_table.set_num_replicas(5);
+  create_table.Setup();
+  create_table.Start();
+
+  // Add a server so we have one empty server to replicate to.
+  ASSERT_OK(cluster_->AddTabletServer());
+  MapAndDeleter ts_map_and_deleter;
+  NO_FATALS(GenerateTServerMap(&ts_map_and_deleter));
+  const auto& ts_map = ts_map_and_deleter.first;
+
+  // Do a sanity check that all our replicas are healthy.
+  NO_FATALS(AssertEventuallyNumFailedReplicas(ts_map, 0));
+
+  // Enter maintenance mode on a tserver and shut it down.
+  ExternalTabletServer* maintenance_ts = cluster_->tablet_server(0);
+  const string maintenance_uuid = maintenance_ts->uuid();
+  ASSERT_OK(ChangeTServerState(maintenance_uuid, TServerStateChangePB::ENTER_MAINTENANCE_MODE));
+  NO_FATALS(maintenance_ts->Shutdown());
+  SleepFor(kDurationForSomeHeartbeats);
+
+  // Wait for the failure to be recognized by the other replicas.
+  NO_FATALS(AssertEventuallyNumFailedReplicas(ts_map, kNumTablets));
+  NO_FATALS(ExpectStartedTabletCopies(/*should_have_started*/false));
+
+  // Now kill another server. We should be able to see some copies.
+  NO_FATALS(cluster_->tablet_server(1)->Shutdown());
+  ASSERT_EVENTUALLY([&] {
+    NO_FATALS(ExpectStartedTabletCopies(/*should_have_started*/true));
+  });
+  NO_FATALS(AssertEventuallyNumFailedReplicas(ts_map, 0));
+  // The previously empty tablet server should hold all the replicas that were
+  // re-replicated.
+  const TServerDetails* added_details = FindOrDie(ts_map, cluster_->tablet_server(5)->uuid());
+  ASSERT_EVENTUALLY([&] {
+    vector<string> added_tablet_ids;
+    ASSERT_OK(ListRunningTabletIds(added_details, MonoDelta::FromSeconds(30), &added_tablet_ids));
+    ASSERT_EQ(kNumTablets, added_tablet_ids.size());
+  });
+  // Now exit maintenance mode and restart the maintenance tserver. The
+  // original tablets on the maintenance mode tserver should still exist.
+  ASSERT_OK(ChangeTServerState(maintenance_uuid, TServerStateChangePB::EXIT_MAINTENANCE_MODE));
+  ASSERT_OK(maintenance_ts->Restart());
+  SleepFor(kDurationForSomeHeartbeats);
+  const TServerDetails* maintenance_details = FindOrDie(ts_map, maintenance_uuid);
+  vector<string> mnt_tablet_ids;
+  ASSERT_OK(ListRunningTabletIds(maintenance_details, MonoDelta::FromSeconds(30), &mnt_tablet_ids));
+  ASSERT_EQ(kNumTablets, mnt_tablet_ids.size());
+
+  // All the while, our workload should not have been interrupted. Assert
+  // eventually to wait for the rows to converge.
+  NO_FATALS(create_table.StopAndJoin());
+  ClusterVerifier v(cluster_.get());
+  ASSERT_EVENTUALLY([&] {
+    NO_FATALS(v.CheckRowCount(create_table.table_name(),
+                              ClusterVerifier::EXACTLY, create_table.rows_inserted()));
+  });
+}
+
+} // namespace itest
+} // namespace kudu
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 83bea9e..ab1c7ad 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -3989,6 +3989,8 @@ Status CatalogManager::ProcessTabletReport(
   // 3. Process each tablet. This may not be in the order that the tablets
   // appear in 'full_report', but that has no bearing on correctness.
   vector<scoped_refptr<TabletInfo>> mutated_tablets;
+  unordered_set<string> uuids_ignored_for_underreplication =
+      master_->ts_manager()->GetUuidsToIgnoreForUnderreplication();
   for (const auto& e : tablet_infos) {
     const string& tablet_id = e.first;
     const scoped_refptr<TabletInfo>& tablet = e.second;
@@ -4199,7 +4201,8 @@ Status CatalogManager::ProcessTabletReport(
           rpcs.emplace_back(new AsyncEvictReplicaTask(
               master_, tablet, cstate, std::move(to_evict)));
         } else if (FLAGS_master_add_server_when_underreplicated &&
-                   ShouldAddReplica(config, replication_factor)) {
+                   ShouldAddReplica(config, replication_factor,
+                                    uuids_ignored_for_underreplication)) {
           rpcs.emplace_back(new AsyncAddReplicaTask(
               master_, tablet, cstate, RaftPeerPB::NON_VOTER, &rng_));
         }
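
One design point worth noting in the hunk above: the maintenance-mode UUID set
is fetched from the TSManager once per tablet report and reused for every
tablet in that report, so the tserver-state lock is taken once rather than
once per tablet. A condensed sketch of the flow, with the surrounding eviction
and config-change handling elided (names as in the hunk):

    unordered_set<string> uuids_ignored_for_underreplication =
        master_->ts_manager()->GetUuidsToIgnoreForUnderreplication();
    for (const auto& e : tablet_infos) {
      // ... per-tablet processing elided ...
      if (FLAGS_master_add_server_when_underreplicated &&
          ShouldAddReplica(config, replication_factor,
                           uuids_ignored_for_underreplication)) {
        rpcs.emplace_back(new AsyncAddReplicaTask(
            master_, tablet, cstate, RaftPeerPB::NON_VOTER, &rng_));
      }
    }
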
diff --git a/src/kudu/master/ts_manager.cc b/src/kudu/master/ts_manager.cc
index 7106ed3..79d3088 100644
--- a/src/kudu/master/ts_manager.cc
+++ b/src/kudu/master/ts_manager.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <limits>
 #include <mutex>
+#include <utility>
 
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
@@ -57,6 +58,7 @@ METRIC_DEFINE_gauge_int32(server, cluster_replica_skew,
 
 using kudu::pb_util::SecureShortDebugString;
 using std::lock_guard;
+using std::unordered_set;
 using std::shared_ptr;
 using std::string;
 using strings::Substitute;
@@ -202,10 +204,22 @@ int TSManager::GetCount() const {
   return servers_by_id_.size();
 }
 
+unordered_set<string> TSManager::GetUuidsToIgnoreForUnderreplication() const {
+  unordered_set<string> uuids;
+  shared_lock<RWMutex> tsl(ts_state_lock_);
+  uuids.reserve(ts_state_by_uuid_.size());
+  for (const auto& ts_and_state : ts_state_by_uuid_) {
+    if (ts_and_state.second == TServerStatePB::MAINTENANCE_MODE) {
+      uuids.emplace(ts_and_state.first);
+    }
+  }
+  return uuids;
+}
+
 void TSManager::GetDescriptorsAvailableForPlacement(TSDescriptorVector* descs) const {
   descs->clear();
-  shared_lock<rw_spinlock> l(lock_);
   shared_lock<RWMutex> tsl(ts_state_lock_);
+  shared_lock<rw_spinlock> l(lock_);
   descs->reserve(servers_by_id_.size());
   for (const TSDescriptorMap::value_type& entry : servers_by_id_) {
     const shared_ptr<TSDescriptor>& ts = entry.second;
@@ -272,7 +286,7 @@ int TSManager::ClusterSkew() const {
 }
 
 bool TSManager::AvailableForPlacementUnlocked(const TSDescriptor& ts) const {
-  DCHECK(lock_.is_locked());
+  ts_state_lock_.AssertAcquired();
   // TODO(KUDU-1827): this should also be used when decommissioning a server.
   if (GetTServerStateUnlocked(ts.permanent_uuid()) == TServerStatePB::MAINTENANCE_MODE) {
     return false;
diff --git a/src/kudu/master/ts_manager.h b/src/kudu/master/ts_manager.h
index 7d3603f..d1cc925 100644
--- a/src/kudu/master/ts_manager.h
+++ b/src/kudu/master/ts_manager.h
@@ -19,6 +19,7 @@
 #include <memory>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
@@ -90,6 +91,11 @@ class TSManager {
   // replication to them (e.g. maintenance mode).
   void GetDescriptorsAvailableForPlacement(TSDescriptorVector* descs) const;
 
+  // Return the UUIDs of any tablet servers that can be in a failed state without
+  // counting towards under-replication (e.g. because they're in maintenance
+  // mode).
+  std::unordered_set<std::string> GetUuidsToIgnoreForUnderreplication() const;
+
   // Get the TS count.
   int GetCount() const;
 
@@ -129,7 +135,10 @@ class TSManager {
       std::string, std::shared_ptr<TSDescriptor>> TSDescriptorMap;
   TSDescriptorMap servers_by_id_;
 
+  // Protects 'ts_state_by_uuid_'. If both 'ts_state_lock_' and 'lock_' are to
+  // be taken, 'ts_state_lock_' must be taken first.
   mutable RWMutex ts_state_lock_;
+
   // Maps from the UUIDs of tablet servers to their tserver state, if any.
   // Note: the states don't necessarily belong to registered tablet servers.
   std::unordered_map<std::string, TServerStatePB> ts_state_by_uuid_;

