kudu-commits mailing list archives

From ale...@apache.org
Subject [kudu] 02/02: [consensus] KUDU-2947 fix voting in case of slow WAL
Date Sat, 21 Sep 2019 04:37:57 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ee22ddcc734ab4947218c670d5cfddd61fe90fbb
Author: Alexey Serbin <alexey@apache.org>
AuthorDate: Wed Sep 18 09:52:44 2019 -0700

    [consensus] KUDU-2947 fix voting in case of slow WAL
    
    Before this patch, a follower replica might grant a 'yes' vote right
    after processing recent Raft transactions even if the currently
    established leader was alive and well, in cases where persisting a Raft
    transaction to the WAL takes longer than the leader election timeout.
    
    That could lead to multiple successive election rounds even when there
    was no actual reason to re-elect the leader replica.
    
    This patch also adds a test to verify the behavior of follower tablet
    replicas under the conditions described above.  The test failed before
    the patch and passes now.
    
    This is a follow-up to 4870ef20b9a0c729c1d7f16c016c7d91b193e46f.
    
    Change-Id: I7c061b498e727a1a11e94e03c55530eeebfdf8dd
    Reviewed-on: http://gerrit.cloudera.org:8080/14260
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Andrew Wong <awong@cloudera.com>
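
For illustration only, below is a minimal, self-contained C++ sketch of the
vote-withholding idea the patch applies (a hypothetical VoteWithholder class,
not Kudu's actual RaftConsensus API): the withhold deadline is refreshed both
when an update from the leader is processed and while the replica is still
waiting on its own WAL append, and it is only ever pushed forward via
std::max, so a vote request arriving before the deadline is rejected unless
the candidate asks to ignore a live leader.

#include <algorithm>
#include <chrono>
#include <iostream>
#include <mutex>

using Clock = std::chrono::steady_clock;

// Hypothetical helper, not part of Kudu: tracks until when votes are withheld.
class VoteWithholder {
 public:
  explicit VoteWithholder(std::chrono::milliseconds min_election_timeout)
      : min_election_timeout_(min_election_timeout) {}

  // Called when an update from the current leader is processed, and also
  // periodically while waiting for a slow local WAL append to finish.
  void WithholdVotes() {
    std::lock_guard<std::mutex> l(lock_);
    // Never shrink the window: keep the later of the current deadline and
    // now + minimum election timeout.
    withhold_votes_until_ = std::max(withhold_votes_until_,
                                     Clock::now() + min_election_timeout_);
  }

  // True if a vote request should be rejected because a live leader is
  // assumed to exist (unless the candidate explicitly ignores live leaders).
  bool ShouldRejectVote(bool ignore_live_leader) {
    std::lock_guard<std::mutex> l(lock_);
    return !ignore_live_leader && Clock::now() < withhold_votes_until_;
  }

 private:
  const std::chrono::milliseconds min_election_timeout_;
  std::mutex lock_;
  Clock::time_point withhold_votes_until_{};  // defaults to epoch: votes allowed
};

int main() {
  // 200 ms heartbeat x 3 missed heartbeat periods, as in the test below.
  VoteWithholder w(std::chrono::milliseconds(600));
  std::cout << w.ShouldRejectVote(false) << "\n";  // 0: no leader seen yet
  w.WithholdVotes();                               // leader update (or WAL wait)
  std::cout << w.ShouldRejectVote(false) << "\n";  // 1: vote withheld
  std::cout << w.ShouldRejectVote(true) << "\n";   // 0: ignore_live_leader set
  return 0;
}
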
---
 src/kudu/consensus/raft_consensus.cc               |  24 ++-
 src/kudu/consensus/raft_consensus.h                |   6 +
 .../raft_consensus_election-itest.cc               | 162 ++++++++++++++++++++-
 3 files changed, 188 insertions(+), 4 deletions(-)

diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index a7b2057..358fa77 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -1436,7 +1436,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     queue_->UpdateLastIndexAppendedToLeader(request->last_idx_appended_to_leader());
 
     // Also prohibit voting for anyone for the minimum election timeout.
-    withhold_votes_until_ = MonoTime::Now() + MinimumElectionTimeout();
+    WithholdVotesUnlocked();
 
     // 1 - Early commit pending (and committed) transactions
 
@@ -1596,9 +1596,15 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       s = log_synchronizer.WaitFor(
           MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
       // If just waiting for our log append to finish lets snooze the timer.
-      // We don't want to fire leader election because we're waiting on our own log.
+      // We don't want to fire leader election nor accept vote requests because
+      // we're still processing the Raft message from the leader,
+      // waiting on our own log.
       if (s.IsTimedOut()) {
         SnoozeFailureDetector();
+        {
+          LockGuard l(lock_);
+          WithholdVotesUnlocked();
+        }
       }
     } while (s.IsTimedOut());
     RETURN_NOT_OK(s);
@@ -1723,9 +1729,15 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request,
   // will change to eject the abandoned node, but until that point, we don't want the
   // abandoned follower to disturb the other nodes.
   //
+  // 3) Other dynamic scenarios with a stale former leader
+  // This is a generalization of case 1. It's possible that a stale former
+  // leader detects it's no longer the leader at some point, but a majority
+  // of replicas has already elected a new leader.
+  //
   // See also https://ramcloud.stanford.edu/~ongaro/thesis.pdf
   // section 4.2.3.
-  if (!request->ignore_live_leader() && MonoTime::Now() < withhold_votes_until_) {
+  if (PREDICT_TRUE(!request->ignore_live_leader()) &&
+      MonoTime::Now() < withhold_votes_until_) {
     return RequestVoteRespondLeaderIsAlive(request, response);
   }
 
@@ -2848,6 +2860,12 @@ void RaftConsensus::SnoozeFailureDetector(boost::optional<string> reason_for_log
   }
 }
 
+void RaftConsensus::WithholdVotesUnlocked() {
+  DCHECK(lock_.is_locked());
+  withhold_votes_until_ = std::max(withhold_votes_until_,
+                                   MonoTime::Now() + MinimumElectionTimeout());
+}
+
 MonoDelta RaftConsensus::MinimumElectionTimeout() const {
   int32_t failure_timeout = FLAGS_leader_failure_max_missed_heartbeat_periods *
       FLAGS_raft_heartbeat_interval_ms;
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index de12806..b2afafb 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -643,6 +643,12 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   void SnoozeFailureDetector(boost::optional<std::string> reason_for_log = boost::none,
                              boost::optional<MonoDelta> delta = boost::none);
 
+  // Update the vote withholding interval, extending it to at least the
+  // minimum election timeout from now, i.e. 'FLAGS_raft_heartbeat_interval_ms' *
+  // 'FLAGS_leader_failure_max_missed_heartbeat_periods' milliseconds.
+  // This method is safe to call even on a leader replica.
+  void WithholdVotesUnlocked();
+
   // Return the minimum election timeout. Due to backoff and random
   // jitter, election timeouts may be longer than this.
   MonoDelta MinimumElectionTimeout() const;
diff --git a/src/kudu/integration-tests/raft_consensus_election-itest.cc b/src/kudu/integration-tests/raft_consensus_election-itest.cc
index 63c2ecf..96c459c 100644
--- a/src/kudu/integration-tests/raft_consensus_election-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_election-itest.cc
@@ -29,6 +29,8 @@
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/consensus/opid_util.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -61,16 +63,21 @@ METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
 METRIC_DECLARE_gauge_int64(raft_term);
 
 using kudu::cluster::ExternalTabletServer;
+using kudu::consensus::COMMITTED_OPID;
 using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::MakeOpId;
+using kudu::consensus::OpId;
 using kudu::consensus::RaftPeerPB;
 using kudu::itest::AddServer;
 using kudu::itest::GetConsensusState;
+using kudu::itest::GetLastOpIdForReplica;
 using kudu::itest::GetReplicaStatusAndCheckIfLeader;
 using kudu::itest::LeaderStepDown;
 using kudu::itest::RemoveServer;
+using kudu::itest::RequestVote;
 using kudu::itest::StartElection;
-using kudu::itest::TabletServerMap;
 using kudu::itest::TServerDetails;
+using kudu::itest::TabletServerMap;
 using kudu::itest::WaitUntilLeader;
 using kudu::itest::WriteSimpleTestRow;
 using kudu::pb_util::SecureShortDebugString;
@@ -564,6 +571,159 @@ TEST_F(RaftConsensusElectionITest, TombstonedVoteAfterFailedTabletCopy) {
   workload.StopAndJoin();
 }
 
+// A test scenario to verify that a disruptive server doesn't start needless
+// elections when it takes a long time to replicate Raft transactions
+// from leader to follower replicas (e.g., due to slowness in WAL IO ops).
+TEST_F(RaftConsensusElectionITest, DisruptiveServerAndSlowWAL) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  // Shorten the heartbeat interval for faster test run time.
+  const auto kHeartbeatIntervalMs = 200;
+  const auto kMaxMissedHeartbeatPeriods = 3;
+  const vector<string> ts_flags {
+    Substitute("--raft_heartbeat_interval_ms=$0", kHeartbeatIntervalMs),
+    Substitute("--leader_failure_max_missed_heartbeat_periods=$0",
+               kMaxMissedHeartbeatPeriods),
+  };
+  NO_FATALS(BuildAndStart(ts_flags));
+
+  // Sanity check: this scenario assumes there are 3 tablet servers. The test
+  // table is created with RF=FLAGS_num_replicas.
+  ASSERT_EQ(3, FLAGS_num_replicas);
+  ASSERT_EQ(3, tablet_servers_.size());
+
+  // A convenience array to access each tablet server as TServerDetails.
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  ASSERT_EQ(cluster_->num_tablet_servers(), tservers.size());
+
+  // The leadership might fluctuate before shutting down the third tablet
+  // server, so ASSERT_EVENTUALLY() below is for those rare cases.
+  //
+  // However, after one of the tablet servers is shutdown, the leadership should
+  // not fluctuate because:
+  //   1) only two voters out of three are alive
+  //   2) current leader should not be disturbed by any rogue votes -- that's
+  //      the whole premise of this test scenario
+  //
+  // So, for this scenario the leadership can fluctuate only if Raft heartbeats
+  // sent from leader to follower replicas are significantly delayed or dropped.
+  // However, minicluster components send heartbeats via the loopback interface,
+  // so no real network layer that might significantly delay heartbeats
+  // is involved. Also, the consensus RPC queues should not overflow
+  // in this scenario because the number of consensus RPCs is relatively low.
+  TServerDetails* leader_tserver = nullptr;
+  TServerDetails* other_tserver = nullptr;
+  TServerDetails* shutdown_tserver = nullptr;
+  ASSERT_EVENTUALLY([&] {
+    // This is a clean-up in case of retry.
+    if (shutdown_tserver != nullptr) {
+      auto* ts = cluster_->tablet_server_by_uuid(shutdown_tserver->uuid());
+      if (ts->IsShutdown()) {
+        ASSERT_OK(ts->Restart());
+      }
+    }
+    for (auto idx = 0; idx < cluster_->num_tablet_servers(); ++idx) {
+      auto* ts = cluster_->tablet_server(idx);
+      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency", "false"));
+    }
+    leader_tserver = nullptr;
+    other_tserver = nullptr;
+    shutdown_tserver = nullptr;
+
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader_tserver));
+    ASSERT_OK(WriteSimpleTestRow(leader_tserver, tablet_id_,
+                                 RowOperationsPB::UPSERT, 0, 0, "", kTimeout));
+    OpId op_id;
+    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver,
+                                    consensus::COMMITTED_OPID, kTimeout,
+                                    &op_id));
+    ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_,
+                                    op_id.index()));
+    // Shutdown one tablet server that doesn't host the leader replica of the
+    // target tablet and inject WAL latency to others.
+    for (const auto& server : tservers) {
+      auto* ts = cluster_->tablet_server_by_uuid(server->uuid());
+      if (server->uuid() != leader_tserver->uuid() && shutdown_tserver == nullptr) {
+        shutdown_tserver = server;
+        continue;
+      }
+      if (server->uuid() != leader_tserver->uuid() && other_tserver == nullptr) {
+        other_tserver = server;
+      }
+      // For this scenario it's important to reserve some inactivity intervals
+      // for the follower between processing Raft messages from the leader.
+      // If a vote request arrives while the replica is busy processing a Raft
+      // message from the leader, it is rejected with a 'busy' status before
+      // evaluating the vote withholding interval.
+      const auto mult = (server->uuid() == leader_tserver->uuid()) ? 2 : 1;
+      const auto latency_ms = mult *
+          kHeartbeatIntervalMs * kMaxMissedHeartbeatPeriods;
+      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency_ms_mean",
+                                  std::to_string(latency_ms)));
+      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency_ms_stddev", "0"));
+      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency", "true"));
+    }
+
+    // Shutdown the third tablet server.
+    cluster_->tablet_server_by_uuid(shutdown_tserver->uuid())->Shutdown();
+
+    // Sanity check: make sure the leadership hasn't changed since the leader
+    // has been determined.
+    TServerDetails* current_leader;
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &current_leader));
+    ASSERT_EQ(cluster_->tablet_server_index_by_uuid(leader_tserver->uuid()),
+              cluster_->tablet_server_index_by_uuid(current_leader->uuid()));
+  });
+
+  // Get the Raft term from the established leader.
+  ConsensusStatePB cstate;
+  ASSERT_OK(GetConsensusState(leader_tserver, tablet_id_, kTimeout,
+                              consensus::EXCLUDE_HEALTH_REPORT, &cstate));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableId);
+  workload.set_timeout_allowed(true);
+  workload.set_network_error_allowed(true);
+  workload.set_already_present_allowed(true);
+  workload.set_num_read_threads(0);
+  workload.set_num_write_threads(1);
+  workload.set_write_batch_size(1);
+  // Make a 'space' for the artificial vote requests (see below) to arrive
+  // while there is no activity on RaftConsensus::Update().
+  workload.set_write_interval_millis(kHeartbeatIntervalMs);
+  workload.Setup();
+  workload.Start();
+
+  // Issue rogue vote requests, imitating a disruptive tablet replica.
+  const auto& shutdown_server_uuid = shutdown_tserver->uuid();
+  const auto next_term = cstate.current_term() + 1;
+  const auto targets = { leader_tserver, other_tserver };
+  for (auto i = 0; i < 100; ++i) {
+    SleepFor(MonoDelta::FromMilliseconds(kHeartbeatIntervalMs / 4));
+    for (const auto* ts : targets) {
+      auto s = RequestVote(ts, tablet_id_, shutdown_server_uuid,
+                           next_term, MakeOpId(next_term + i, 0),
+                           /*ignore_live_leader=*/ false,
+                           /*is_pre_election=*/ false,
+                           kTimeout);
+      // Neither leader nor follower replica should grant 'yes' vote
+      // since the healthy leader is there and doing well, sending Raft
+      // transactions to be replicated.
+      ASSERT_TRUE(s.IsInvalidArgument() || s.IsServiceUnavailable())
+          << s.ToString();
+      ASSERT_STR_MATCHES(s.ToString(),
+          "("
+              "because replica is either leader or "
+              "believes a valid leader to be alive"
+          "|"
+              "because replica is already servicing an update "
+              "from a current leader or another vote"
+          ")");
+    }
+  }
+}
+
 }  // namespace tserver
 }  // namespace kudu
 

