kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [2/9] kudu git commit: KUDU-1358 (part 1): master should accept heartbeat even if follower
Date Mon, 01 Aug 2016 22:11:39 GMT
KUDU-1358 (part 1): master should accept heartbeat even if follower

This patch changes the master's heartbeat acceptance code so that heartbeats
are not rejected outright if the master is a follower. To be specific,
tablet reports are ignored, but heartbeats are processed just enough to warm
the TSDescriptor cache. That way, if this master is elected leader, it can
respond to a CreateTable() even before the first round of heartbeats.

I reduced the complexity of the "should this tserver register or send a full
tablet report?" dance by removing TSDescriptor.has_tablet_report_. It was
used to guarantee a full tablet report in the event that 1) the tserver is
sending incremental tablet reports, and 2) the master has already registered
the tserver. I don't think this exact sequence of events is actually
possible; the only way a master can "lose" a cached TSDescriptor is if the
master is restarted, at which point it loses the tserver registration too.
Plus, all the unit tests passed (in slow mode).

I also snuck in a fix to TSManager::RegisterTS: it wasn't actually returning
a TSDescriptor in its out parameter.

Change-Id: I578674927b65b4171e8437de8515130e4a0ed139
Reviewed-on: http://gerrit.cloudera.org:8080/3609
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8618ae2d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8618ae2d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8618ae2d

Branch: refs/heads/master
Commit: 8618ae2d6ce867879fe550c107f6178b167d507f
Parents: 8d89d5f
Author: Adar Dembo <adar@cloudera.com>
Authored: Fri Jun 10 16:42:58 2016 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Sun Jul 31 23:44:58 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/alter_table-test.cc  |  1 -
 .../master_replication-itest.cc                 | 49 +++++++++--
 src/kudu/integration-tests/mini_cluster.cc      | 35 +++++---
 src/kudu/integration-tests/mini_cluster.h       | 16 +++-
 src/kudu/integration-tests/registration-test.cc |  6 +-
 .../integration-tests/table_locations-itest.cc  |  3 +-
 src/kudu/master/catalog_manager.cc              |  8 --
 src/kudu/master/catalog_manager.h               | 24 +++++-
 src/kudu/master/master-test.cc                  | 70 ++++++++++++++--
 src/kudu/master/master_service.cc               | 86 ++++++++------------
 src/kudu/master/ts_descriptor.cc                | 19 +----
 src/kudu/master/ts_descriptor.h                 | 10 +--
 src/kudu/master/ts_manager.cc                   | 18 ++--
 13 files changed, 223 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/alter_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 8c97a39..85321ff 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -108,7 +108,6 @@ class AlterTableTest : public KuduTest {
     opts.num_tablet_servers = num_replicas();
     cluster_.reset(new MiniCluster(env_.get(), opts));
     ASSERT_OK(cluster_->Start());
-    ASSERT_OK(cluster_->WaitForTabletServerCount(num_replicas()));
 
     CHECK_OK(KuduClientBuilder()
              .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr_str())

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/master_replication-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master_replication-itest.cc b/src/kudu/integration-tests/master_replication-itest.cc
index 6c5c429..47e47ea 100644
--- a/src/kudu/integration-tests/master_replication-itest.cc
+++ b/src/kudu/integration-tests/master_replication-itest.cc
@@ -26,7 +26,10 @@
 #include "kudu/integration-tests/mini_cluster.h"
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
+#include "kudu/master/master.proxy.h"
 #include "kudu/master/mini_master.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "kudu/util/test_util.h"
 
 using std::vector;
@@ -65,7 +68,6 @@ class MasterReplicationTest : public KuduTest {
     KuduTest::SetUp();
     cluster_.reset(new MiniCluster(env_.get(), opts_));
     ASSERT_OK(cluster_->Start());
-    ASSERT_OK(cluster_->WaitForTabletServerCount(kNumTabletServerReplicas));
   }
 
   virtual void TearDown() OVERRIDE {
@@ -79,7 +81,6 @@ class MasterReplicationTest : public KuduTest {
   Status RestartCluster() {
     cluster_->Shutdown();
     RETURN_NOT_OK(cluster_->Start());
-    RETURN_NOT_OK(cluster_->WaitForTabletServerCount(kNumTabletServerReplicas));
     return Status::OK();
   }
 
@@ -89,7 +90,6 @@ class MasterReplicationTest : public KuduTest {
     SleepFor(MonoDelta::FromMilliseconds(millis));
     LOG(INFO) << "Attempting to start the cluster...";
     CHECK_OK(cluster_->Start());
-    CHECK_OK(cluster_->WaitForTabletServerCount(kNumTabletServerReplicas));
   }
 
   void ListMasterServerAddrs(vector<string>* out) {
@@ -152,8 +152,6 @@ TEST_F(MasterReplicationTest, TestSysTablesReplication) {
   ASSERT_OK(CreateClient(&client));
   ASSERT_OK(CreateTable(client, kTableId1));
 
-  ASSERT_OK(cluster_->WaitForTabletServerCount(kNumTabletServerReplicas));
-
   // Repeat the same for the second table.
   ASSERT_OK(CreateTable(client, kTableId2));
   ASSERT_NO_FATAL_FAILURE(VerifyTableExists(kTableId2));
@@ -211,5 +209,46 @@ TEST_F(MasterReplicationTest, TestCycleThroughAllMasters) {
   ASSERT_OK(ThreadJoiner(start_thread.get()).Join());
 }
 
+// Test that every master accepts heartbeats, and that a heartbeat to any
+// master updates its TSDescriptor cache.
+TEST_F(MasterReplicationTest, TestHeartbeatAcceptedByAnyMaster) {
+  // Register a fake tserver with every master.
+  TSToMasterCommonPB common;
+  common.mutable_ts_instance()->set_permanent_uuid("fake-ts-uuid");
+  common.mutable_ts_instance()->set_instance_seqno(1);
+  TSRegistrationPB fake_reg;
+  HostPortPB* pb = fake_reg.add_rpc_addresses();
+  pb->set_host("localhost");
+  pb->set_port(1000);
+  pb = fake_reg.add_http_addresses();
+  pb->set_host("localhost");
+  pb->set_port(2000);
+  std::shared_ptr<rpc::Messenger> messenger;
+  rpc::MessengerBuilder bld("Client");
+  ASSERT_OK(bld.Build(&messenger));
+  for (int i = 0; i < cluster_->num_masters(); i++) {
+    TSHeartbeatRequestPB req;
+    TSHeartbeatResponsePB resp;
+    rpc::RpcController rpc;
+
+    req.mutable_common()->CopyFrom(common);
+    req.mutable_registration()->CopyFrom(fake_reg);
+
+    MasterServiceProxy proxy(messenger,
+                             cluster_->mini_master(i)->bound_rpc_addr());
+
+    // All masters (including followers) should accept the heartbeat.
+    ASSERT_OK(proxy.TSHeartbeat(req, &resp, &rpc));
+    SCOPED_TRACE(resp.DebugString());
+    ASSERT_FALSE(resp.has_error());
+  }
+
+  // Now each master should have four registered tservers.
+  vector<std::shared_ptr<TSDescriptor>> descs;
+  ASSERT_OK(cluster_->WaitForTabletServerCount(
+      kNumTabletServerReplicas + 1,
+      MiniCluster::MatchMode::DO_NOT_MATCH_TSERVERS, &descs));
+}
+
 } // namespace master
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/mini_cluster.cc b/src/kudu/integration-tests/mini_cluster.cc
index 2b5ef67..62e4531 100644
--- a/src/kudu/integration-tests/mini_cluster.cc
+++ b/src/kudu/integration-tests/mini_cluster.cc
@@ -264,12 +264,13 @@ Status MiniCluster::WaitForReplicaCount(const string& tablet_id,
 }
 
 Status MiniCluster::WaitForTabletServerCount(int count) {
-  vector<shared_ptr<master::TSDescriptor> > descs;
-  return WaitForTabletServerCount(count, &descs);
+  vector<shared_ptr<master::TSDescriptor>> descs;
+  return WaitForTabletServerCount(count, MatchMode::MATCH_TSERVERS, &descs);
 }
 
 Status MiniCluster::WaitForTabletServerCount(int count,
-                                             vector<shared_ptr<TSDescriptor> >*
descs) {
+                                             MatchMode mode,
+                                             vector<shared_ptr<TSDescriptor>>*
descs) {
   Stopwatch sw;
   sw.start();
   while (sw.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) {
@@ -279,15 +280,27 @@ Status MiniCluster::WaitForTabletServerCount(int count,
       // Do a second step of verification to verify that the descs that we got
       // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
       int match_count = 0;
-      for (const shared_ptr<TSDescriptor>& desc : *descs) {
-        for (auto mini_tablet_server : mini_tablet_servers_) {
-          auto ts = mini_tablet_server->server();
-          if (ts->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
-              ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
-            match_count++;
-            break;
+      switch (mode) {
+        case MatchMode::MATCH_TSERVERS:
+          // GetAllDescriptors() may return servers that are no longer online.
+          // Do a second step of verification to verify that the descs that we got
+          // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
+          for (const shared_ptr<TSDescriptor>& desc : *descs) {
+            for (auto mini_tablet_server : mini_tablet_servers_) {
+              auto ts = mini_tablet_server->server();
+              if (ts->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
+                  ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
+                match_count++;
+                break;
+              }
+            }
           }
-        }
+          break;
+        case MatchMode::DO_NOT_MATCH_TSERVERS:
+          match_count = descs->size();
+          break;
+        default:
+          LOG(FATAL) << "Invalid match mode";
       }
 
       if (match_count == count) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/mini_cluster.h b/src/kudu/integration-tests/mini_cluster.h
index e682316..12a5211 100644
--- a/src/kudu/integration-tests/mini_cluster.h
+++ b/src/kudu/integration-tests/mini_cluster.h
@@ -141,9 +141,21 @@ class MiniCluster {
   // Wait until the number of registered tablet servers reaches the given
   // count. Returns Status::TimedOut if the desired count is not achieved
   // within kRegistrationWaitTimeSeconds.
+  enum class MatchMode {
+    // Ensure that the tservers retrieved from each master match up against the
+    // tservers defined in this cluster. The matching is done via
+    // NodeInstancePBs comparisons. If even one match fails, the retrieved
+    // response is considered to be malformed and is retried.
+    //
+    // Note: tservers participate in matching even if they are shut down.
+    MATCH_TSERVERS,
+
+    // Do not perform any matching on the retrieved tservers.
+    DO_NOT_MATCH_TSERVERS,
+  };
   Status WaitForTabletServerCount(int count);
-  Status WaitForTabletServerCount(int count,
-                                  std::vector<std::shared_ptr<master::TSDescriptor>
>* descs);
+  Status WaitForTabletServerCount(int count, MatchMode mode,
+                                  std::vector<std::shared_ptr<master::TSDescriptor>>*
descs);
 
   // Create a client configured to talk to this cluster. Builder may contain
   // override options for the client. The master address will be overridden to

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/registration-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/registration-test.cc b/src/kudu/integration-tests/registration-test.cc
index b0a1497..9eeca5f 100644
--- a/src/kudu/integration-tests/registration-test.cc
+++ b/src/kudu/integration-tests/registration-test.cc
@@ -96,7 +96,8 @@ class RegistrationTest : public KuduTest {
 TEST_F(RegistrationTest, TestTSRegisters) {
   // Wait for the TS to register.
   vector<shared_ptr<TSDescriptor> > descs;
-  ASSERT_OK(cluster_->WaitForTabletServerCount(1, &descs));
+  ASSERT_OK(cluster_->WaitForTabletServerCount(
+      1, MiniCluster::MatchMode::MATCH_TSERVERS, &descs));
   ASSERT_EQ(1, descs.size());
 
   // Verify that the registration is sane.
@@ -123,7 +124,6 @@ TEST_F(RegistrationTest, TestTSRegisters) {
 
 // Test starting multiple tablet servers and ensuring they both register with the master.
 TEST_F(RegistrationTest, TestMultipleTS) {
-  ASSERT_OK(cluster_->WaitForTabletServerCount(1));
   ASSERT_OK(cluster_->AddTabletServer());
   ASSERT_OK(cluster_->WaitForTabletServerCount(2));
 }
@@ -135,8 +135,6 @@ TEST_F(RegistrationTest, TestTabletReports) {
   string tablet_id_1;
   string tablet_id_2;
 
-  ASSERT_OK(cluster_->WaitForTabletServerCount(1));
-
   MiniTabletServer* ts = cluster_->mini_tablet_server(0);
   string ts_root = cluster_->GetTabletServerFsRoot(0);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/table_locations-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/table_locations-itest.cc b/src/kudu/integration-tests/table_locations-itest.cc
index 6dfbe56..3f90b18 100644
--- a/src/kudu/integration-tests/table_locations-itest.cc
+++ b/src/kudu/integration-tests/table_locations-itest.cc
@@ -61,13 +61,12 @@ class TableLocationsTest : public KuduTest {
 
     cluster_.reset(new MiniCluster(env_.get(), opts));
     ASSERT_OK(cluster_->Start());
-    ASSERT_OK(cluster_->WaitForTabletServerCount(kNumTabletServers));
 
     // Create a client proxy to the master.
     MessengerBuilder bld("Client");
     ASSERT_OK(bld.Build(&client_messenger_));
     proxy_.reset(new MasterServiceProxy(client_messenger_,
-                                        cluster_->leader_mini_master()->bound_rpc_addr()));
+                                        cluster_->mini_master()->bound_rpc_addr()));
   }
 
   void TearDown() override {

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 6568637..7642ddd 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1485,12 +1485,6 @@ Status CatalogManager::ProcessTabletReport(TSDescriptor* ts_desc,
     VLOG(2) << "Received tablet report from " <<
       RequestorString(rpc) << ": " << report.DebugString();
   }
-  if (!ts_desc->has_tablet_report() && report.is_incremental()) {
-    string msg = "Received an incremental tablet report when a full one was needed";
-    LOG(WARNING) << "Invalid tablet report from " << RequestorString(rpc) <<
": "
-                 << msg;
-    return Status::IllegalState(msg);
-  }
 
   // TODO: on a full tablet report, we may want to iterate over the tablets we think
   // the server should have, compare vs the ones being reported, and somehow mark
@@ -1503,8 +1497,6 @@ Status CatalogManager::ProcessTabletReport(TSDescriptor* ts_desc,
                           Substitute("Error handling $0", reported.ShortDebugString()));
   }
 
-  ts_desc->set_has_tablet_report(true);
-
   if (report.updated_tablets_size() > 0) {
     background_tasks_->WakeIfHasPendingUpdates();
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 0196d8d..88a941b 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -344,6 +344,29 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
     shared_lock<RWMutex> leader_shared_lock_;
     Status catalog_status_;
     Status leader_status_;
+
+    DISALLOW_COPY_AND_ASSIGN(ScopedLeaderSharedLock);
+  };
+
+  // Temporarily forces the catalog manager to be a follower. Only for tests!
+  class ScopedLeaderDisablerForTests {
+   public:
+
+    explicit ScopedLeaderDisablerForTests(CatalogManager* catalog)
+        : catalog_(catalog),
+        old_leader_ready_term_(catalog->leader_ready_term_) {
+      catalog_->leader_ready_term_ = -1;
+    }
+
+    ~ScopedLeaderDisablerForTests() {
+      catalog_->leader_ready_term_ = old_leader_ready_term_;
+    }
+
+   private:
+    CatalogManager* catalog_;
+    int64_t old_leader_ready_term_;
+
+    DISALLOW_COPY_AND_ASSIGN(ScopedLeaderDisablerForTests);
   };
 
   explicit CatalogManager(Master *master);
@@ -678,7 +701,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   std::unordered_set<std::string> reserved_table_names_;
 
   Master *master_;
-  Atomic32 closing_;
   ObjectIdGenerator oid_generator_;
 
   // Random number generator used for selecting replica locations.

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/master-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 87fb301..8775af1 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -130,8 +130,10 @@ TEST_F(MasterTest, TestRegisterAndHeartbeat) {
     req.mutable_common()->CopyFrom(common);
     ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
 
+    ASSERT_TRUE(resp.leader_master());
     ASSERT_TRUE(resp.needs_reregister());
     ASSERT_TRUE(resp.needs_full_tablet_report());
+    ASSERT_FALSE(resp.has_tablet_report());
   }
 
   vector<shared_ptr<TSDescriptor> > descs;
@@ -154,11 +156,12 @@ TEST_F(MasterTest, TestRegisterAndHeartbeat) {
     req.mutable_registration()->CopyFrom(fake_reg);
     ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
 
+    ASSERT_TRUE(resp.leader_master());
     ASSERT_FALSE(resp.needs_reregister());
-    ASSERT_TRUE(resp.needs_full_tablet_report());
+    ASSERT_FALSE(resp.needs_full_tablet_report());
+    ASSERT_FALSE(resp.has_tablet_report());
   }
 
-  descs.clear();
   master_->ts_manager()->GetAllDescriptors(&descs);
   ASSERT_EQ(1, descs.size()) << "Should have registered the TS";
   TSRegistrationPB reg;
@@ -179,12 +182,33 @@ TEST_F(MasterTest, TestRegisterAndHeartbeat) {
     req.mutable_registration()->CopyFrom(fake_reg);
     ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
 
+    ASSERT_TRUE(resp.leader_master());
     ASSERT_FALSE(resp.needs_reregister());
-    ASSERT_TRUE(resp.needs_full_tablet_report());
+    ASSERT_FALSE(resp.needs_full_tablet_report());
+    ASSERT_FALSE(resp.has_tablet_report());
+  }
+
+  // If we send the registration RPC while the master isn't the leader, it
+  // shouldn't ask for a full tablet report.
+  {
+    CatalogManager::ScopedLeaderDisablerForTests o(master_->catalog_manager());
+    TSHeartbeatRequestPB req;
+    TSHeartbeatResponsePB resp;
+    RpcController rpc;
+    req.mutable_common()->CopyFrom(common);
+    req.mutable_registration()->CopyFrom(fake_reg);
+    ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+
+    ASSERT_FALSE(resp.leader_master());
+    ASSERT_FALSE(resp.needs_reregister());
+    ASSERT_FALSE(resp.needs_full_tablet_report());
+    ASSERT_FALSE(resp.has_tablet_report());
   }
 
-  // Now send a tablet report
+  // Send a full tablet report, but with the master as a follower. The
+  // report will be ignored.
   {
+    CatalogManager::ScopedLeaderDisablerForTests o(master_->catalog_manager());
     TSHeartbeatRequestPB req;
     TSHeartbeatResponsePB resp;
     RpcController rpc;
@@ -194,11 +218,47 @@ TEST_F(MasterTest, TestRegisterAndHeartbeat) {
     tr->set_sequence_number(0);
     ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
 
+    ASSERT_FALSE(resp.leader_master());
+    ASSERT_FALSE(resp.needs_reregister());
+    ASSERT_FALSE(resp.needs_full_tablet_report());
+    ASSERT_FALSE(resp.has_tablet_report());
+  }
+
+  // Now send a full report with the master as leader. The master will process
+  // it; this is reflected in the response.
+  {
+    TSHeartbeatRequestPB req;
+    TSHeartbeatResponsePB resp;
+    RpcController rpc;
+    req.mutable_common()->CopyFrom(common);
+    TabletReportPB* tr = req.mutable_tablet_report();
+    tr->set_is_incremental(false);
+    tr->set_sequence_number(0);
+    ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+
+    ASSERT_TRUE(resp.leader_master());
+    ASSERT_FALSE(resp.needs_reregister());
+    ASSERT_FALSE(resp.needs_full_tablet_report());
+    ASSERT_TRUE(resp.has_tablet_report());
+  }
+
+  // Having sent a full report, an incremental report will also be processed.
+  {
+    TSHeartbeatRequestPB req;
+    TSHeartbeatResponsePB resp;
+    RpcController rpc;
+    req.mutable_common()->CopyFrom(common);
+    TabletReportPB* tr = req.mutable_tablet_report();
+    tr->set_is_incremental(true);
+    tr->set_sequence_number(0);
+    ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+
+    ASSERT_TRUE(resp.leader_master());
     ASSERT_FALSE(resp.needs_reregister());
     ASSERT_FALSE(resp.needs_full_tablet_report());
+    ASSERT_TRUE(resp.has_tablet_report());
   }
 
-  descs.clear();
   master_->ts_manager()->GetAllDescriptors(&descs);
   ASSERT_EQ(1, descs.size()) << "Should still only have one TS registered";
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 9e7b08f..23dc37b 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -45,6 +45,7 @@ using consensus::RaftPeerPB;
 using std::string;
 using std::vector;
 using std::shared_ptr;
+using strings::Substitute;
 
 namespace {
 
@@ -82,78 +83,62 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
   if (!l.CheckIsInitializedOrRespond(resp, rpc)) {
     return;
   }
+  bool is_leader_master = l.leader_status().ok();
 
+  // 2. All responses contain this.
   resp->mutable_master_instance()->CopyFrom(server_->instance_pb());
-  if (!l.leader_status().ok()) {
-    // For the time being, ignore heartbeats sent to non-leader distributed
-    // masters.
-    //
-    // TODO KUDU-493 Allow all master processes to receive heartbeat
-    // information: by having the TabletServers send heartbeats to all
-    // masters, or by storing heartbeat information in a replicated
-    // SysTable.
-    LOG(WARNING) << "Received a heartbeat, but this Master instance is not a leader
or a "
-                 << "single Master: " << l.leader_status().ToString();
-    resp->set_leader_master(false);
-    rpc->RespondSuccess();
-    return;
-  }
-  resp->set_leader_master(true);
+  resp->set_leader_master(is_leader_master);
 
+  // 3. Register or look up the tserver.
   shared_ptr<TSDescriptor> ts_desc;
-  // If the TS is registering, register in the TS manager.
   if (req->has_registration()) {
     Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(),
                                                  req->registration(),
                                                  &ts_desc);
     if (!s.ok()) {
-      LOG(WARNING) << "Unable to register tablet server (" << rpc->requestor_string()
<< "): "
-                   << s.ToString();
+      LOG(WARNING) << Substitute("Unable to register tserver ($0): $1",
+                                 rpc->requestor_string(), s.ToString());
       // TODO: add service-specific errors
       rpc->RespondFailure(s);
       return;
     }
+  } else {
+    Status s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
+    if (s.IsNotFound()) {
+      LOG(INFO) << Substitute("Got heartbeat from unknown tserver ($0) as $1; "
+          "Asking this server to re-register.",
+          req->common().ts_instance().ShortDebugString(), rpc->requestor_string());
+      resp->set_needs_reregister(true);
+
+      // Don't bother asking for a full tablet report if we're a follower;
+      // it'll just get ignored anyway.
+      resp->set_needs_full_tablet_report(is_leader_master);
+
+      rpc->RespondSuccess();
+      return;
+    } else if (!s.ok()) {
+      LOG(WARNING) << Substitute("Unable to look up tserver for heartbeat "
+          "request $0 from $1: $2", req->DebugString(),
+          rpc->requestor_string(), s.ToString());
+      rpc->RespondFailure(s.CloneAndPrepend("Unable to lookup tserver"));
+      return;
+    }
   }
 
-  // TODO: KUDU-86 if something fails after this point the TS will not be able
-  //       to register again.
-
-  // Look up the TS -- if it just registered above, it will be found here.
-  // This allows the TS to register and tablet-report in the same RPC.
-  Status s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
-  if (s.IsNotFound()) {
-    LOG(INFO) << "Got heartbeat from  unknown tablet server { "
-              << req->common().ts_instance().ShortDebugString()
-              << " } as " << rpc->requestor_string()
-              << "; Asking this server to re-register.";
-    resp->set_needs_reregister(true);
-    resp->set_needs_full_tablet_report(true);
-    rpc->RespondSuccess();
-    return;
-  } else if (!s.ok()) {
-    LOG(WARNING) << "Unable to look up tablet server for heartbeat request "
-                 << req->DebugString() << " from " << rpc->requestor_string()
-                 << "\nStatus: " << s.ToString();
-    rpc->RespondFailure(s.CloneAndPrepend("Unable to lookup TS"));
-    return;
-  }
-
+  // 4. Update tserver soft state based on the heartbeat contents.
   ts_desc->UpdateHeartbeatTime();
   ts_desc->set_num_live_replicas(req->num_live_tablets());
 
-  if (req->has_tablet_report()) {
-    s = server_->catalog_manager()->ProcessTabletReport(
-      ts_desc.get(), req->tablet_report(), resp->mutable_tablet_report(), rpc);
+  // 5. Only leaders handle tablet reports.
+  if (is_leader_master && req->has_tablet_report()) {
+    Status s = server_->catalog_manager()->ProcessTabletReport(
+        ts_desc.get(), req->tablet_report(), resp->mutable_tablet_report(), rpc);
     if (!s.ok()) {
       rpc->RespondFailure(s.CloneAndPrepend("Failed to process tablet report"));
       return;
     }
   }
 
-  if (!ts_desc->has_tablet_report()) {
-    resp->set_needs_full_tablet_report(true);
-  }
-
   rpc->RespondSuccess();
 }
 
@@ -297,11 +282,6 @@ void MasterServiceImpl::GetTableSchema(const GetTableSchemaRequestPB*
req,
 void MasterServiceImpl::ListTabletServers(const ListTabletServersRequestPB* req,
                                           ListTabletServersResponsePB* resp,
                                           rpc::RpcContext* rpc) {
-  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
-  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
-    return;
-  }
-
   vector<std::shared_ptr<TSDescriptor> > descs;
   server_->ts_manager()->GetAllDescriptors(&descs);
   for (const std::shared_ptr<TSDescriptor>& desc : descs) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/ts_descriptor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_descriptor.cc b/src/kudu/master/ts_descriptor.cc
index 80f500e..003faeb 100644
--- a/src/kudu/master/ts_descriptor.cc
+++ b/src/kudu/master/ts_descriptor.cc
@@ -28,6 +28,7 @@
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/util/net/net_util.h"
 
+using std::make_shared;
 using std::shared_ptr;
 
 namespace kudu {
@@ -35,8 +36,8 @@ namespace master {
 
 Status TSDescriptor::RegisterNew(const NodeInstancePB& instance,
                                  const TSRegistrationPB& registration,
-                                 gscoped_ptr<TSDescriptor>* desc) {
-  gscoped_ptr<TSDescriptor> ret(new TSDescriptor(instance.permanent_uuid()));
+                                 shared_ptr<TSDescriptor>* desc) {
+  shared_ptr<TSDescriptor> ret(make_shared<TSDescriptor>(instance.permanent_uuid()));
   RETURN_NOT_OK(ret->Register(instance, registration));
   desc->swap(ret);
   return Status::OK();
@@ -46,7 +47,6 @@ TSDescriptor::TSDescriptor(std::string perm_id)
     : permanent_uuid_(std::move(perm_id)),
       latest_seqno_(-1),
       last_heartbeat_(MonoTime::Now(MonoTime::FINE)),
-      has_tablet_report_(false),
       recent_replica_creations_(0),
       last_replica_creations_decay_(MonoTime::Now(MonoTime::FINE)),
       num_live_replicas_(0) {
@@ -87,9 +87,6 @@ Status TSDescriptor::Register(const NodeInstancePB& instance,
   }
 
   latest_seqno_ = instance.instance_seqno();
-  // After re-registering, make the TS re-report its tablets.
-  has_tablet_report_ = false;
-
   registration_.reset(new TSRegistrationPB(registration));
   ts_admin_proxy_.reset();
   consensus_proxy_.reset();
@@ -113,16 +110,6 @@ int64_t TSDescriptor::latest_seqno() const {
   return latest_seqno_;
 }
 
-bool TSDescriptor::has_tablet_report() const {
-  std::lock_guard<simple_spinlock> l(lock_);
-  return has_tablet_report_;
-}
-
-void TSDescriptor::set_has_tablet_report(bool has_report) {
-  std::lock_guard<simple_spinlock> l(lock_);
-  has_tablet_report_ = has_report;
-}
-
 void TSDescriptor::DecayRecentReplicaCreationsUnlocked() {
   // In most cases, we won't have any recent replica creations, so
   // we don't need to bother calling the clock, etc.

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/ts_descriptor.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_descriptor.h b/src/kudu/master/ts_descriptor.h
index 3522d83..1a571f2 100644
--- a/src/kudu/master/ts_descriptor.h
+++ b/src/kudu/master/ts_descriptor.h
@@ -23,6 +23,7 @@
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/make_shared.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 
@@ -55,7 +56,7 @@ class TSDescriptor {
  public:
   static Status RegisterNew(const NodeInstancePB& instance,
                             const TSRegistrationPB& registration,
-                            gscoped_ptr<TSDescriptor>* desc);
+                            std::shared_ptr<TSDescriptor>* desc);
 
   virtual ~TSDescriptor();
 
@@ -73,9 +74,6 @@ class TSDescriptor {
   const std::string &permanent_uuid() const { return permanent_uuid_; }
   int64_t latest_seqno() const;
 
-  bool has_tablet_report() const;
-  void set_has_tablet_report(bool has_report);
-
   // Copy the current registration info into the given PB object.
   // A safe copy is returned because the internal Registration object
   // may be mutated at any point if the tablet server re-registers.
@@ -132,9 +130,6 @@ class TSDescriptor {
   // The last time a heartbeat was received for this node.
   MonoTime last_heartbeat_;
 
-  // Set to true once this instance has reported all of its tablets.
-  bool has_tablet_report_;
-
   // The number of times this tablet server has recently been selected to create a
   // tablet replica. This value decays back to 0 over time.
   double recent_replica_creations_;
@@ -148,6 +143,7 @@ class TSDescriptor {
   std::shared_ptr<tserver::TabletServerAdminServiceProxy> ts_admin_proxy_;
   std::shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy_;
 
+  ALLOW_MAKE_SHARED(TSDescriptor);
   DISALLOW_COPY_AND_ASSIGN(TSDescriptor);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/ts_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_manager.cc b/src/kudu/master/ts_manager.cc
index 59b2b53..f7601ac 100644
--- a/src/kudu/master/ts_manager.cc
+++ b/src/kudu/master/ts_manager.cc
@@ -21,6 +21,7 @@
 #include <vector>
 
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/ts_descriptor.h"
 #include "kudu/util/flag_tags.h"
@@ -34,6 +35,7 @@ TAG_FLAG(tserver_unresponsive_timeout_ms, advanced);
 using std::shared_ptr;
 using std::string;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace master {
@@ -75,16 +77,18 @@ Status TSManager::RegisterTS(const NodeInstancePB& instance,
   const string& uuid = instance.permanent_uuid();
 
   if (!ContainsKey(servers_by_id_, uuid)) {
-    gscoped_ptr<TSDescriptor> new_desc;
+    shared_ptr<TSDescriptor> new_desc;
     RETURN_NOT_OK(TSDescriptor::RegisterNew(instance, registration, &new_desc));
-    InsertOrDie(&servers_by_id_, uuid, shared_ptr<TSDescriptor>(new_desc.release()));
-    LOG(INFO) << "Registered new tablet server { " << instance.ShortDebugString()
-              << " } with Master";
+    InsertOrDie(&servers_by_id_, uuid, new_desc);
+    LOG(INFO) << Substitute("Registered new tserver $0 with Master",
+                            instance.ShortDebugString());
+    desc->swap(new_desc);
   } else {
-    const shared_ptr<TSDescriptor>& found = FindOrDie(servers_by_id_, uuid);
+    shared_ptr<TSDescriptor> found(FindOrDie(servers_by_id_, uuid));
     RETURN_NOT_OK(found->Register(instance, registration));
-    LOG(INFO) << "Re-registered known tablet server { " << instance.ShortDebugString()
-              << " } with Master";
+    LOG(INFO) << Substitute("Re-registered known tserver $0 with Master",
+                            instance.ShortDebugString());
+    desc->swap(found);
   }
 
   return Status::OK();


Mime
View raw message