kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [kudu] 01/02: KUDU-2069 p1: add persistent tserver maintenance mode
Date Wed, 18 Sep 2019 22:21:41 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f5658f2d9cb28eedbd30ab44e57363427df877c5
Author: Andrew Wong <awong@apache.org>
AuthorDate: Sat Sep 7 23:37:02 2019 -0700

    KUDU-2069 p1: add persistent tserver maintenance mode
    
    Adds tablet server states to the master, with which to represent tablet
    server maintenance mode (and in the future, decommissioning).
    
    The on-disk state is represented as an entry in the system catalog
    table, keyed by the tserver UUID. The in-memory state is maintained by
    the TSManager as a mapping from tserver UUID to the new TServerStatePB
    enum.
    
    Note that this patch only introduces the states internally and doesn't
    expose a way for users to set them.
    
    Testing:
    - test to check that maintenance mode is maintained through a restart of
      the master
    - test to ensure that repeated setting of state does not affect the
      in-memory tserver states
    - test that concurrently updates tserver state and ensures that the
      results in-memory match with what's on-disk
    
    Change-Id: Ib669b43b3cee171c4c7dbd54041e29c30cb9f767
    Reviewed-on: http://gerrit.cloudera.org:8080/14217
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/master/CMakeLists.txt     |   1 +
 src/kudu/master/catalog_manager.cc |  11 ++
 src/kudu/master/master.proto       |  21 ++++
 src/kudu/master/sys_catalog.cc     |  60 ++++++---
 src/kudu/master/sys_catalog.h      |  34 ++++++
 src/kudu/master/ts_manager.cc      |  64 +++++++++-
 src/kudu/master/ts_manager.h       |  33 ++++-
 src/kudu/master/ts_state-test.cc   | 241 +++++++++++++++++++++++++++++++++++++
 8 files changed, 444 insertions(+), 21 deletions(-)

diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index fc6aa05..39092fd 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -92,6 +92,7 @@ ADD_KUDU_TEST(placement_policy-test)
 ADD_KUDU_TEST(sentry_authz_provider-test NUM_SHARDS 8)
 ADD_KUDU_TEST(sys_catalog-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(ts_descriptor-test DATA_FILES ../scripts/first_argument.sh)
+ADD_KUDU_TEST(ts_state-test)
 
 #########################################
 # kudu-master
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 08e435c..0933d89 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1111,6 +1111,17 @@ void CatalogManager::PrepareForLeadershipTask() {
       }
     }
 
+    static const char* const kTServerStatesDescription =
+        "Initializing in-progress tserver states";
+    LOG(INFO) << kTServerStatesDescription << "...";
+    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() + kTServerStatesDescription) {
+      if (!check(std::bind(&TSManager::ReloadTServerStates, master_->ts_manager(),
+                           sys_catalog_.get()),
+                 *consensus, term, kTServerStatesDescription).ok()) {
+        return;
+      }
+    }
+
     if (hms_catalog_) {
       static const char* const kNotificationLogEventIdDescription =
           "Loading latest processed Hive Metastore notification log event ID";
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 73a656d..a932232 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -224,6 +224,13 @@ message SysNotificationLogEventIdPB {
   optional int64 latest_notification_log_event_id = 1;
 }
 
+// The on-disk entry in the sys.catalog table to represent the existence of
+// on-going tserver state (e.g. maintenance mode).
+message SysTServerStateEntryPB {
+  // TODO(awong): consider adding a timestamp here.
+  optional TServerStatePB state = 1;
+}
+
 ////////////////////////////////////////////////////////////
 // RPCs
 ////////////////////////////////////////////////////////////
@@ -812,6 +819,20 @@ message ListTabletServersResponsePB {
   repeated Entry servers = 2;
 }
 
+// Representation of the state of a tablet server.
+// TODO(KUDU-1827): add state for decommissioning.
+enum TServerStatePB {
+  // Default value for backwards compatibility.
+  UNKNOWN_STATE = 0;
+
+  // No state for the tserver.
+  NONE = 1;
+
+  // New replicas are not added to the tserver, and failed replicas on the
+  // tserver are not re-replicated.
+  MAINTENANCE_MODE = 2;
+}
+
 // GetMasterRegistrationRequest/Response: get the instance id and
 // HTTP/RPC addresses for this Master server.
 message GetMasterRegistrationRequestPB {
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index fbc6e9e..372294b 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -468,20 +468,8 @@ Status SysCatalogTable::SyncWrite(const WriteRequestPB *req, WriteResponsePB *re
   return Status::OK();
 }
 
-// Schema for the unified SysCatalogTable:
-//
-// (entry_type, entry_id) -> metadata
-//
-// entry_type is a enum defined in sys_tables. It indicates
-// whether an entry is a table or a tablet.
-//
-// entry_type is the first part of a compound key as to allow
-// efficient scans of entries of only a single type (e.g., only
-// scan all of the tables, or only scan all of the tablets).
-//
-// entry_id is either a table id or a tablet id. For tablet entries,
-// the table id that the tablet is associated with is stored in the
-// protobuf itself.
+// Schema for the unified SysCatalogTable. See the comment in the header for
+// more details.
 Schema SysCatalogTable::BuildTableSchema() {
   SchemaBuilder builder;
   CHECK_OK(builder.AddKeyColumn(kSysCatalogTableColType, INT8));
@@ -584,6 +572,15 @@ string SysCatalogTable::TskSeqNumberToEntryId(int64_t seq_number) {
   return entry_id;
 }
 
+Status SysCatalogTable::VisitTServerStates(TServerStateVisitor* visitor) {
+  TRACE_EVENT0("master", "SysCatalogTable::VisitTServerStates");
+  const auto processor = [&] (const string& entry_id,
+                              const SysTServerStateEntryPB& entry_data) {
+    return visitor->Visit(entry_id, entry_data);
+  };
+  return ProcessRows<SysTServerStateEntryPB, TSERVER_STATE>(processor);
+}
+
 Status SysCatalogTable::VisitTables(TableVisitor* visitor) {
   TRACE_EVENT0("master", "SysCatalogTable::VisitTables");
   auto processor = [&](
@@ -763,6 +760,41 @@ Status SysCatalogTable::RemoveTskEntries(const set<string>& entry_ids) {
   return SyncWrite(&req, &resp);
 }
 
+Status SysCatalogTable::WriteTServerState(const string& tserver_id,
+                                          const SysTServerStateEntryPB& entry) {
+  DCHECK(!tserver_id.empty());
+  WriteRequestPB req;
+  req.set_tablet_id(kSysCatalogTabletId);
+  RETURN_NOT_OK(SchemaToPB(schema_, req.mutable_schema()));
+  KuduPartialRow row(&schema_);
+  RETURN_NOT_OK(row.SetInt8(kSysCatalogTableColType, TSERVER_STATE));
+  RETURN_NOT_OK(row.SetString(kSysCatalogTableColId, tserver_id));
+
+  faststring metadata_buf;
+  pb_util::SerializeToString(entry, &metadata_buf);
+  RETURN_NOT_OK(row.SetStringNoCopy(kSysCatalogTableColMetadata, metadata_buf));
+
+  RowOperationsPBEncoder enc(req.mutable_row_operations());
+  enc.Add(RowOperationsPB::INSERT, row);
+
+  WriteResponsePB resp;
+  return SyncWrite(&req, &resp);
+}
+
+Status SysCatalogTable::RemoveTServerState(const string& tserver_id) {
+  WriteRequestPB req;
+  req.set_tablet_id(kSysCatalogTabletId);
+  RowOperationsPBEncoder enc(req.mutable_row_operations());
+  RETURN_NOT_OK(SchemaToPB(schema_, req.mutable_schema()));
+  KuduPartialRow row(&schema_);
+  RETURN_NOT_OK(row.SetInt8(kSysCatalogTableColType, TSERVER_STATE));
+  RETURN_NOT_OK(row.SetStringNoCopy(kSysCatalogTableColId, tserver_id));
+  enc.Add(RowOperationsPB::DELETE, row);
+
+  WriteResponsePB resp;
+  return SyncWrite(&req, &resp);
+}
+
 // ==================================================================
 // Tablet related methods
 // ==================================================================
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index 64dd893..89b6451 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -59,6 +59,7 @@ namespace master {
 
 class Master;
 class SysCertAuthorityEntryPB;
+class SysTServerStateEntryPB;
 class SysTablesEntryPB;
 class SysTabletsEntryPB;
 class SysTskEntryPB;
@@ -92,6 +93,13 @@ class TskEntryVisitor {
                        const SysTskEntryPB& metadata) = 0;
 };
 
+class TServerStateVisitor {
+ public:
+  virtual ~TServerStateVisitor() = default;
+  virtual Status Visit(const std::string& tserver_id,
+                       const SysTServerStateEntryPB& metadata) = 0;
+};
+
 // SysCatalogTable is a Kudu table that keeps track of the following
 // system information:
 //   * table metadata
@@ -100,11 +108,26 @@ class TskEntryVisitor {
 //   * Kudu IPKI root CA cert's private key
 //   * TSK (Token Signing Key) entries
 //   * Latest handled Hive Metastore notification log event ID
+//   * tserver state (e.g. maintenance mode)
 //
 // The essential properties of the SysCatalogTable are:
 //   * SysCatalogTable has only one tablet.
 //   * SysCatalogTable is managed by the master and not exposed to the user
 //     as a "normal table", instead we have Master APIs to query the table.
+//
+// It has the schema:
+//
+//    (entry_type INT8, entry_id STRING) -> metadata STRING
+//
+//  * entry_type is one of CatalogEntryType. It indicates whether an entry is a
+//    table, tablet, or some other piece of metadata. It is the first part of
+//    the compound key to allow efficient scans of entries of only a single
+//    type (e.g. only scan all of the tables, or only scan all of the tablets).
+//  * entry_id is the ID of the entry type (e.g. table ID or tablet ID for
+//    table and tablet entries respectively).
+//  * metadata is a string containing a protobuf message specific to the
+//    entry_type. These are defined in master.proto under "Sys Tables
+//    Metadata".
 class SysCatalogTable {
  public:
   // Magic ID of the system tablet.
@@ -125,6 +148,7 @@ class SysCatalogTable {
     CERT_AUTHORITY_INFO = 3,  // Kudu's root certificate authority entry.
     TSK_ENTRY = 4,            // Token Signing Key entry.
     HMS_NOTIFICATION_LOG = 5, // HMS notification log latest event ID.
+    TSERVER_STATE = 6,        // TServer state.
   };
 
   // 'leader_cb_' is invoked whenever this node is elected as a leader
@@ -172,6 +196,9 @@ class SysCatalogTable {
   // Scan for TSK-related entries in the system table.
   Status VisitTskEntries(TskEntryVisitor* visitor);
 
+  // Scan for tserver state entries in the system table.
+  Status VisitTServerStates(TServerStateVisitor* visitor);
+
   // Get the latest processed HMS notification log event ID.
   Status GetLatestNotificationLogEventId(int64_t* event_id) WARN_UNUSED_RESULT;
 
@@ -190,6 +217,13 @@ class SysCatalogTable {
   // entry identifiers must not be empty.
   Status RemoveTskEntries(const std::set<std::string>& entry_ids);
 
+  // Add a tserver state entry to the system table.
+  Status WriteTServerState(const std::string& tserver_id,
+                           const SysTServerStateEntryPB& entry);
+
+  // Remove a tserver state entry from the system table.
+  Status RemoveTServerState(const std::string& tserver_id);
+
   // Return the underlying TabletReplica instance hosting the metadata.
   // This should be used with caution -- typically the various methods
   // above should be used rather than directly accessing the replica.
diff --git a/src/kudu/master/ts_manager.cc b/src/kudu/master/ts_manager.cc
index f7f8bbf..6eef6c1 100644
--- a/src/kudu/master/ts_manager.cc
+++ b/src/kudu/master/ts_manager.cc
@@ -33,6 +33,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/location_cache.h"
+#include "kudu/master/sys_catalog.h"
 #include "kudu/master/ts_descriptor.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
@@ -55,6 +56,7 @@ METRIC_DEFINE_gauge_int32(server, cluster_replica_skew,
                           "the least replicas.");
 
 using kudu::pb_util::SecureShortDebugString;
+using std::lock_guard;
 using std::shared_ptr;
 using std::string;
 using strings::Substitute;
@@ -62,9 +64,32 @@ using strings::Substitute;
 namespace kudu {
 namespace master {
 
+class TServerStateLoader : public TServerStateVisitor {
+ public:
+  explicit TServerStateLoader(TSManager* ts_manager)
+      : ts_manager_(ts_manager) {}
+
+  Status Visit(const std::string& tserver_id,
+               const SysTServerStateEntryPB& metadata) override {
+    ts_manager_->ts_state_lock_.AssertAcquiredForWriting();
+    TServerStatePB state = metadata.state();
+    if (state == TServerStatePB::UNKNOWN_STATE) {
+      LOG(WARNING) << Substitute("ignoring unknown tserver state: $0", metadata.state());
+      return Status::OK();
+    }
+    DCHECK_NE(TServerStatePB::NONE, state);
+    InsertOrDie(&ts_manager_->ts_state_by_uuid_, tserver_id, state);
+    return Status::OK();
+  }
+
+ private:
+  TSManager* ts_manager_;
+};
+
 TSManager::TSManager(LocationCache* location_cache,
                      const scoped_refptr<MetricEntity>& metric_entity)
-    : location_cache_(location_cache) {
+    : ts_state_lock_(RWMutex::Priority::PREFER_READING),
+      location_cache_(location_cache) {
   METRIC_cluster_replica_skew.InstantiateFunctionGauge(
       metric_entity,
       Bind(&TSManager::ClusterSkew, Unretained(this)))
@@ -145,7 +170,7 @@ Status TSManager::RegisterTS(const NodeInstancePB& instance,
   shared_ptr<TSDescriptor> descriptor;
   bool new_tserver = false;
   {
-    std::lock_guard<rw_spinlock> l(lock_);
+    lock_guard<rw_spinlock> l(lock_);
     auto* descriptor_ptr = FindOrNull(servers_by_id_, uuid);
     if (descriptor_ptr) {
       descriptor = *descriptor_ptr;
@@ -190,6 +215,41 @@ int TSManager::GetCount() const {
   return servers_by_id_.size();
 }
 
+Status TSManager::SetTServerState(const string& ts_uuid,
+                                  TServerStatePB ts_state,
+                                  SysCatalogTable* sys_catalog) {
+  lock_guard<RWMutex> l(ts_state_lock_);
+  auto existing_state = FindWithDefault(ts_state_by_uuid_, ts_uuid, TServerStatePB::NONE);
+  if (existing_state == ts_state) {
+    return Status::OK();
+  }
+  if (ts_state == TServerStatePB::NONE) {
+    RETURN_NOT_OK_PREPEND(sys_catalog->RemoveTServerState(ts_uuid),
+        Substitute("Failed to remove tserver state for $0", ts_uuid));
+    ts_state_by_uuid_.erase(ts_uuid);
+    return Status::OK();
+  }
+  SysTServerStateEntryPB pb;
+  pb.set_state(ts_state);
+  RETURN_NOT_OK_PREPEND(sys_catalog->WriteTServerState(ts_uuid, pb),
+      Substitute("Failed to set tserver state for $0 to $1",
+                 ts_uuid, TServerStatePB_Name(ts_state)));
+  InsertOrUpdate(&ts_state_by_uuid_, ts_uuid, ts_state);
+  return Status::OK();
+}
+
+TServerStatePB TSManager::GetTServerState(const string& ts_uuid) const {
+  shared_lock<RWMutex> l(ts_state_lock_);
+  return FindWithDefault(ts_state_by_uuid_, ts_uuid, TServerStatePB::NONE);
+}
+
+Status TSManager::ReloadTServerStates(SysCatalogTable* sys_catalog) {
+  lock_guard<RWMutex> l(ts_state_lock_);
+  ts_state_by_uuid_ = {};
+  TServerStateLoader loader(this);
+  return sys_catalog->VisitTServerStates(&loader);
+}
+
 int TSManager::ClusterSkew() const {
   int min_count = std::numeric_limits<int>::max();
   int max_count = 0;
diff --git a/src/kudu/master/ts_manager.h b/src/kudu/master/ts_manager.h
index 359432a..cd803fa 100644
--- a/src/kudu/master/ts_manager.h
+++ b/src/kudu/master/ts_manager.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_MASTER_TS_MANAGER_H
-#define KUDU_MASTER_TS_MANAGER_H
+#pragma once
 
 #include <memory>
 #include <string>
@@ -23,9 +22,11 @@
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/master/master.pb.h"
 #include "kudu/master/ts_descriptor.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/rw_mutex.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -37,6 +38,7 @@ class ServerRegistrationPB;
 namespace master {
 
 class LocationCache;
+class SysCatalogTable;
 
 // Tracks the servers that the master has heard from, along with their
 // last heartbeat, etc.
@@ -68,7 +70,7 @@ class TSManager {
   // Returns false if the TS has never registered.
   // Otherwise, *desc is set and returns true.
   bool LookupTSByUUID(const std::string& uuid,
-                        std::shared_ptr<TSDescriptor>* desc) const;
+                      std::shared_ptr<TSDescriptor>* desc) const;
 
   // Register or re-register a tablet server with the manager.
   //
@@ -89,17 +91,39 @@ class TSManager {
   // Get the TS count.
   int GetCount() const;
 
+  // Sets the tserver state for the given tserver, persisting it to disk.
+  Status SetTServerState(const std::string& ts_uuid,
+                         TServerStatePB ts_state,
+                         SysCatalogTable* sys_catalog);
+
+  // Return the tserver state for the given tablet server UUID, or NONE if one
+  // doesn't exist.
+  TServerStatePB GetTServerState(const std::string& ts_uuid) const;
+
+  // Resets the tserver states and reloads them from disk.
+  Status ReloadTServerStates(SysCatalogTable* sys_catalog);
+
  private:
+  friend class TServerStateLoader;
+
   int ClusterSkew() const;
 
   mutable rw_spinlock lock_;
 
   FunctionGaugeDetacher metric_detacher_;
 
+  // TODO(awong): add a map from HostPort to descriptor so we aren't forced to
+  // know UUIDs up front, e.g. if specifying a given tablet server for
+  // maintenance mode, it'd be easier for users to specify the HostPort.
   typedef std::unordered_map<
-    std::string, std::shared_ptr<TSDescriptor>> TSDescriptorMap;
+      std::string, std::shared_ptr<TSDescriptor>> TSDescriptorMap;
   TSDescriptorMap servers_by_id_;
 
+  mutable RWMutex ts_state_lock_;
+  // Maps from the UUIDs of tablet servers to their tserver state, if any.
+  // Note: the states don't necessarily belong to registered tablet servers.
+  std::unordered_map<std::string, TServerStatePB> ts_state_by_uuid_;
+
   LocationCache* location_cache_;
 
   DISALLOW_COPY_AND_ASSIGN(TSManager);
@@ -108,4 +132,3 @@ class TSManager {
 } // namespace master
 } // namespace kudu
 
-#endif
diff --git a/src/kudu/master/ts_state-test.cc b/src/kudu/master/ts_state-test.cc
new file mode 100644
index 0000000..c0c45ab
--- /dev/null
+++ b/src/kudu/master/ts_state-test.cc
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/replica_management.pb.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/master/ts_manager.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using kudu::consensus::ReplicaManagementInfoPB;
+using kudu::rpc::Messenger;
+using kudu::rpc::MessengerBuilder;
+using kudu::rpc::RpcController;
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace master {
+
+namespace {
+const char* kTServer = "tserver";
+
+TServerStatePB PickRandomState() {
+  switch (rand() % 2) {
+    case 0:
+      return TServerStatePB::NONE;
+    case 1:
+      return TServerStatePB::MAINTENANCE_MODE;
+  }
+  return TServerStatePB::NONE;
+}
+
+} // anonymous namespace
+
+class TServerStateTest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    NO_FATALS(ResetMaster());
+  }
+
+  void TearDown() override {
+    mini_master_->Shutdown();
+    KuduTest::TearDown();
+  }
+
+  // Restarts the master and resets any affiliated references to it.
+  void ResetMaster() {
+    mini_master_.reset(new MiniMaster(GetTestPath("master"),
+                                      HostPort("127.0.0.1", 0)));
+    ASSERT_OK(mini_master_->Start());
+    Master* master = mini_master_->master();
+    ASSERT_OK(master->WaitUntilCatalogManagerIsLeaderAndReadyForTests(
+        MonoDelta::FromSeconds(30)));
+    ts_manager_ = master->ts_manager();
+    MessengerBuilder builder("client");
+    ASSERT_OK(builder.Build(&client_messenger_));
+    proxy_.reset(new MasterServiceProxy(client_messenger_,
+                                        mini_master_->bound_rpc_addr(),
+                                        mini_master_->bound_rpc_addr().host()));
+  }
+
+  // Sets the tserver state for the given tablet server to 'state'.
+  Status SetTServerState(const string& tserver_uuid, TServerStatePB state) {
+    Master* master = mini_master_->master();
+    return master->ts_manager()->SetTServerState(
+        tserver_uuid, state, master->catalog_manager()->sys_catalog());
+  }
+
+  // Pretends to be a tserver by sending a heartbeat to the master from the
+  // given tserver.
+  Status SendHeartbeat(const string& tserver) {
+    TSHeartbeatRequestPB req;
+    NodeInstancePB* ts_instance = req.mutable_common()->mutable_ts_instance();
+    ts_instance->set_permanent_uuid(tserver);
+    ts_instance->set_instance_seqno(0);
+    req.mutable_replica_management_info()->set_replacement_scheme(
+        ReplicaManagementInfoPB::PREPARE_REPLACEMENT_BEFORE_EVICTION);
+    ServerRegistrationPB* reg = req.mutable_registration();
+    HostPortPB* rpc_hp = reg->add_rpc_addresses();
+    rpc_hp->set_host(Substitute("$0-host", tserver));
+    rpc_hp->set_port(7051);
+    TabletReportPB* tablet_report = req.mutable_tablet_report();
+    tablet_report->set_sequence_number(0);
+    tablet_report->set_is_incremental(false);
+
+    RpcController rpc;
+    TSHeartbeatResponsePB resp_pb;
+    return proxy_->TSHeartbeat(req, &resp_pb, &rpc);
+  }
+
+ protected:
+  unique_ptr<MiniMaster> mini_master_;
+  TSManager* ts_manager_;
+  unique_ptr<MasterServiceProxy> proxy_;
+  shared_ptr<Messenger> client_messenger_;
+};
+
+// Basic test that sets some tserver states for registered and unregistered
+// tablet servers, and checks that they still exist after restarting.
+TEST_F(TServerStateTest, TestReloadTServerState) {
+  // Sanity check that we've got no tservers yet.
+  ASSERT_EQ(0, ts_manager_->GetCount());
+
+  // Register a tserver and then set maintenance mode.
+  ASSERT_OK(SendHeartbeat(kTServer));
+  ASSERT_EQ(1, ts_manager_->GetCount());
+  ASSERT_OK(SetTServerState(kTServer, TServerStatePB::MAINTENANCE_MODE));
+  ASSERT_EQ(TServerStatePB::MAINTENANCE_MODE, ts_manager_->GetTServerState(kTServer));
+
+  // Restart the master; the maintenance mode should still be there, even if
+  // the tablet server hasn't re-registered.
+  NO_FATALS(ResetMaster());
+  ASSERT_EQ(TServerStatePB::MAINTENANCE_MODE, ts_manager_->GetTServerState(kTServer));
+  ASSERT_EQ(0, ts_manager_->GetCount());
+
+  // When the tserver registers, maintenance mode should still be there.
+  ASSERT_OK(SendHeartbeat(kTServer));
+  ASSERT_EQ(1, ts_manager_->GetCount());
+
+  // When maintenance mode is turned off, this should be reflected on the
+  // master, even after another restart.
+  ASSERT_OK(SetTServerState(kTServer, TServerStatePB::NONE));
+  ASSERT_EQ(TServerStatePB::NONE, ts_manager_->GetTServerState(kTServer));
+  NO_FATALS(ResetMaster());
+  ASSERT_EQ(TServerStatePB::NONE, ts_manager_->GetTServerState(kTServer));
+}
+
+// Test that setting tserver states that are already set end up as no-ops for
+// both registered and unregistered tservers.
+TEST_F(TServerStateTest, TestRepeatedTServerStates) {
+  // We should start out with no tserver states.
+  ASSERT_EQ(TServerStatePB::NONE, ts_manager_->GetTServerState(kTServer));
+
+  // And "exiting" the tserver state should be a no-op.
+  ASSERT_OK(SetTServerState(kTServer, TServerStatePB::NONE));
+  ASSERT_EQ(TServerStatePB::NONE, ts_manager_->GetTServerState(kTServer));
+
+  // The same should be true for entering a tserver state.
+  ASSERT_OK(SetTServerState(kTServer, TServerStatePB::MAINTENANCE_MODE));
+  ASSERT_EQ(TServerStatePB::MAINTENANCE_MODE, ts_manager_->GetTServerState(kTServer));
+  ASSERT_OK(SetTServerState(kTServer, TServerStatePB::MAINTENANCE_MODE));
+  ASSERT_EQ(TServerStatePB::MAINTENANCE_MODE, ts_manager_->GetTServerState(kTServer));
+
+  // Now do the same thing with the tserver registered.
+  ASSERT_OK(SetTServerState(kTServer, TServerStatePB::NONE));
+  ASSERT_OK(SendHeartbeat(kTServer));
+  ASSERT_EQ(1, ts_manager_->GetCount());
+  ASSERT_EQ(TServerStatePB::NONE, ts_manager_->GetTServerState(kTServer));
+
+  // Exiting the tserver state should be a no-op.
+  ASSERT_OK(SetTServerState(kTServer, TServerStatePB::NONE));
+  ASSERT_EQ(TServerStatePB::NONE, ts_manager_->GetTServerState(kTServer));
+
+  // Setting maintenance mode on a tserver that's already in maintenance mode
+  // should also no-op.
+  ASSERT_OK(SetTServerState(kTServer, TServerStatePB::MAINTENANCE_MODE));
+  ASSERT_EQ(TServerStatePB::MAINTENANCE_MODE, ts_manager_->GetTServerState(kTServer));
+  ASSERT_OK(SetTServerState(kTServer, TServerStatePB::MAINTENANCE_MODE));
+  ASSERT_EQ(TServerStatePB::MAINTENANCE_MODE, ts_manager_->GetTServerState(kTServer));
+}
+
+// Test that setting both in-memory and on-disk tserver state is atomic.
+TEST_F(TServerStateTest, TestConcurrentSetTServerState) {
+  const int kNumTServers = 10;
+  const int kNumThreadsPerTServer = 10;
+  vector<thread> threads;
+  vector<string> tservers(kNumTServers);
+  for (int i = 0; i < kNumTServers; i++) {
+    tservers[i] = Substitute("$0-$1", kTServer, i);
+  }
+  // Spin up a bunch of threads that contend for setting the state for a
+  // limited number of tablet servers.
+  Barrier b(kNumThreadsPerTServer * kNumTServers);
+  for (int i = 0; i < kNumThreadsPerTServer; i++) {
+    for (const auto& ts : tservers) {
+      threads.emplace_back([&, ts] {
+        b.Wait();
+        CHECK_OK(SetTServerState(ts, PickRandomState()));
+      });
+    }
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  // Ensure that the in-memory state matches the on-disk state by reloading the
+  // master state from disk and checking it against the original in-memory
+  // state.
+  vector<TServerStatePB> in_memory_states(tservers.size());
+  for (int i = 0; i < tservers.size(); i++) {
+    in_memory_states[i] = ts_manager_->GetTServerState(tservers[i]);
+  }
+  NO_FATALS(ResetMaster());
+  for (int i = 0; i < tservers.size(); i++) {
+    ASSERT_EQ(in_memory_states[i], ts_manager_->GetTServerState(tservers[i]));
+  }
+}
+
+} // namespace master
+} // namespace kudu


Mime
View raw message