Repository: kudu
Updated Branches:
refs/heads/master 37b89924e -> 79a255bbf
KUDU-1097 (patch 3): Implement promotion of NON_VOTER replicas
This patch implements automatic leader-side promotion of NON_VOTER
replicas that have their "promote" attribute set. This occurs when the
NON_VOTER in question catches up to within a certain number of WAL
entries of the commit index, as tracked by the leader (the default is
100, controlled by --consensus_promotion_max_wal_entries_behind).
This patch also adds some infrastructure to support this functionality:
* New notification callback so the consensus queue can tell
RaftConsensus when to promote a non-voter.
* Support for including RaftAttrPB attributes in CHANGE_REPLICA_TYPE.
* A simple non-voter promotion test.
More comprehensive end-to-end testing is needed before enabling this
feature by default.
Change-Id: Ife6f59658cb2f38d087d57c244ad811010a91fef
Reviewed-on: http://gerrit.cloudera.org:8080/8633
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/79a255bb
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/79a255bb
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/79a255bb
Branch: refs/heads/master
Commit: 79a255bbf4bf52f5d62feba213b21c9ad2fd9ffe
Parents: 37b8992
Author: Mike Percy <mpercy@apache.org>
Authored: Wed Nov 22 15:30:11 2017 -0800
Committer: Mike Percy <mpercy@apache.org>
Committed: Mon Nov 27 09:08:04 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/consensus_queue.cc | 65 ++++++++++++++++
src/kudu/consensus/consensus_queue.h | 13 ++++
src/kudu/consensus/raft_consensus.cc | 78 +++++++++++++++++++-
src/kudu/consensus/raft_consensus.h | 10 +++
.../integration-tests/cluster_itest_util.cc | 2 +
src/kudu/integration-tests/cluster_itest_util.h | 1 +
.../raft_config_change-itest.cc | 63 ++++++++++++++++
.../integration-tests/raft_consensus-itest.cc | 5 +-
8 files changed, 232 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/79a255bb/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index b3897f7..e1c329a 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -69,6 +69,13 @@ DEFINE_int32(consensus_inject_latency_ms_in_notifications, 0,
TAG_FLAG(consensus_inject_latency_ms_in_notifications, hidden);
TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe);
+DEFINE_int32(consensus_promotion_max_wal_entries_behind, 100,
+ "The number of WAL entries that a NON_VOTER with PROMOTE=true can "
+ "be behind the leader's committed index and be considered ready "
+ "for promotion to VOTER.");
+TAG_FLAG(consensus_promotion_max_wal_entries_behind, advanced);
+TAG_FLAG(consensus_promotion_max_wal_entries_behind, experimental);
+
DECLARE_int32(consensus_rpc_timeout_ms);
DECLARE_bool(safe_time_advancement_without_writes);
DECLARE_bool(raft_prepare_replacement_before_eviction);
@@ -896,6 +903,39 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
peer->last_received = status.last_received();
peer->next_index = peer->last_received.index() + 1;
+ // Since the peer has a prefix of our log, if we are the leader and they
+ // are a non-voter then it may be time to promote them to a voter.
+ //
+ // TODO(mpercy): Factor this out into a dedicated function.
+
+ int64_t entries_behind = queue_state_.committed_index - peer->last_received.index();
+ if (queue_state_.mode == PeerMessageQueue::LEADER &&
+ entries_behind <= FLAGS_consensus_promotion_max_wal_entries_behind) {
+ RaftPeerPB* peer_pb;
+ // TODO(mpercy): It would be more efficient to cache the member type
+ // in the TrackedPeer data structure. The downside is that we'd end up
+ // with multiple sources of truth that would need to be kept in sync.
+ Status s = GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()),
+ peer->uuid, &peer_pb);
+ if (s.ok() &&
+ peer_pb &&
+ peer_pb->member_type() == RaftPeerPB::NON_VOTER &&
+ peer_pb->attrs().promote()) {
+ // This peer is ready to promote.
+ //
+ // TODO(mpercy): Should we introduce a function SafeToPromote() that
+ // does the same calculation as SafeToEvict() but for adding a VOTER?
+ //
+ // Note: we can pass in the active config's 'opid_index' as the
+ // committed config's opid_index because if we're in the middle of a
+ // config change, this requested config change will be rejected
+ // anyway.
+ NotifyObserversOfPeerToPromote(peer->uuid,
+ queue_state_.current_term,
+ queue_state_.active_config->opid_index());
+ }
+ }
+
} else if (!OpIdEquals(status.last_received_current_leader(), MinimumOpId())) {
// Their log may have diverged from ours, however we are in the process
// of replicating our ops to them, so continue doing so. Eventually, we
@@ -1245,6 +1285,31 @@ void PeerMessageQueue::NotifyObserversOfFailedFollowerTask(const string& uuid,
}
}
+void PeerMessageQueue::NotifyObserversOfPeerToPromote(const string& peer_uuid,
+ int64_t term,
+ int64_t committed_config_opid_index) {
+ WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
+ Bind(&PeerMessageQueue::NotifyObserversOfPeerToPromoteTask,
+ Unretained(this), peer_uuid, term, committed_config_opid_index)),
+ LogPrefixUnlocked() + "unable to notify RaftConsensus of peer to promote");
+
+}
+
+void PeerMessageQueue::NotifyObserversOfPeerToPromoteTask(const string& peer_uuid,
+ int64_t term,
+ int64_t committed_config_opid_index) {
+ MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
+ std::vector<PeerMessageQueueObserver*> observers_copy;
+ {
+ std::lock_guard<simple_spinlock> lock(queue_lock_);
+ observers_copy = observers_;
+ }
+ for (PeerMessageQueueObserver* observer : observers_copy) {
+ observer->NotifyPeerToPromote(peer_uuid, term, committed_config_opid_index);
+ }
+
+}
+
void PeerMessageQueue::NotifyObserversOfPeerHealthChange() {
WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
Bind(&PeerMessageQueue::NotifyObserversOfPeerHealthChangeTask, Unretained(this))),
http://git-wip-us.apache.org/repos/asf/kudu/blob/79a255bb/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 871089b..59d0f19 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -454,6 +454,13 @@ class PeerMessageQueue {
int64_t term,
const std::string& reason);
+ void NotifyObserversOfPeerToPromote(const std::string& peer_uuid,
+ int64_t term,
+ int64_t committed_config_opid_index);
+ void NotifyObserversOfPeerToPromoteTask(const std::string& peer_uuid,
+ int64_t term,
+ int64_t committed_config_opid_index);
+
void NotifyObserversOfPeerHealthChange();
void NotifyObserversOfPeerHealthChangeTask();
@@ -543,6 +550,12 @@ class PeerMessageQueueObserver {
int64_t term,
const std::string& reason) = 0;
+ // Notify the observer that the specified peer is ready to be promoted from
+ // NON_VOTER to VOTER.
+ virtual void NotifyPeerToPromote(const std::string& peer_uuid,
+ int64_t term,
+ int64_t committed_config_opid_index) = 0;
+
// Notify the observer that the health of one of the peers has changed.
virtual void NotifyPeerHealthChange() = 0;
http://git-wip-us.apache.org/repos/asf/kudu/blob/79a255bb/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 45066bb..f1e0650 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -776,7 +776,20 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
uuid,
committed_config,
reason)),
- LogPrefixThreadSafe() + "Unable to start RemoteFollowerTask");
+ LogPrefixThreadSafe() + "Unable to start TryRemoveFollowerTask");
+}
+
+void RaftConsensus::NotifyPeerToPromote(const std::string& peer_uuid,
+ int64_t term,
+ int64_t committed_config_opid_index) {
+ // Run the config change on the raft thread pool.
+ WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryPromoteNonVoterTask,
+ shared_from_this(),
+ peer_uuid,
+ term,
+ committed_config_opid_index)),
+ LogPrefixThreadSafe() + "Unable to start TryPromoteNonVoterTask");
+
}
void RaftConsensus::NotifyPeerHealthChange() {
@@ -798,6 +811,56 @@ void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
LogPrefixThreadSafe() + "Unable to remove follower " + uuid);
}
+void RaftConsensus::TryPromoteNonVoterTask(const std::string& peer_uuid,
+ int64_t term,
+ int64_t committed_config_opid_index) {
+ string msg = Substitute("attempt to promote peer $0: ", peer_uuid);
+ {
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ int64_t current_term = CurrentTermUnlocked();
+ if (term != current_term) {
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "requested in "
+ << "previous term " << term
+ << ", but a leader election "
+ << "likely occurred since then. Doing nothing.";
+ return;
+ }
+
+ int64_t current_committed_config_opid_index =
+ cmeta_->GetConfigOpIdIndex(COMMITTED_CONFIG);
+ if (committed_config_opid_index != current_committed_config_opid_index) {
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "notified when "
+ << "committed config had opid index "
+ << committed_config_opid_index << ", but now "
+ << "the committed config has opid index "
+ << current_committed_config_opid_index
+ << ". Doing nothing.";
+ return;
+ }
+
+ if (cmeta_->has_pending_config()) {
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "there is already a config change operation "
+ << "in progress. Unable to promote follower until it "
+ << "completes. Doing nothing.";
+ return;
+ }
+ }
+
+ ChangeConfigRequestPB req;
+ req.set_tablet_id(options_.tablet_id);
+ req.set_type(CHANGE_REPLICA_TYPE);
+ req.mutable_server()->set_permanent_uuid(peer_uuid);
+ req.mutable_server()->set_member_type(RaftPeerPB::VOTER);
+ req.mutable_server()->mutable_attrs()->set_promote(false);
+ req.set_cas_config_opid_index(committed_config_opid_index);
+ LOG(INFO) << LogPrefixThreadSafe() << "attempting to promote NON_VOTER "
+ << peer_uuid << " to VOTER";
+ boost::optional<TabletServerErrorPB::Code> error_code;
+ WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code),
+ LogPrefixThreadSafe() + Substitute("Unable to promote non-voter $0", peer_uuid));
+}
+
Status RaftConsensus::Update(const ConsensusRequestPB* request,
ConsensusResponsePB* response) {
update_calls_for_tests_.Increment();
@@ -1653,7 +1716,6 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
}
break;
- // TODO(mpercy): Support changing between VOTER and NON_VOTER.
case CHANGE_REPLICA_TYPE:
if (server.member_type() == RaftPeerPB::UNKNOWN_MEMBER_TYPE) {
return Status::InvalidArgument("Cannot change replica type to UNKNOWN_MEMBER_TYPE");
@@ -1673,7 +1735,17 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
return Status::InvalidArgument("Cannot change replica type to same type");
}
peer_pb->set_member_type(server.member_type());
- // TODO(mpercy): Copy over replica intentions when implemented.
+ // Override attributes only if they are explicitly passed in the request.
+ // TODO(mpercy): It seems cleaner to bulk-overwrite 'attrs' with
+ // whatever is passed into the request, but that would make it more
+ // complicated to correctly handle a non-voter that had both its
+ // "promote" and "replace" flags set.
+ if (server.attrs().has_promote()) {
+ peer_pb->mutable_attrs()->set_promote(server.attrs().promote());
+ }
+ if (server.attrs().has_replace()) {
+ peer_pb->mutable_attrs()->set_replace(server.attrs().replace());
+ }
break;
default:
http://git-wip-us.apache.org/repos/asf/kudu/blob/79a255bb/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index fb43e10..3bdb53a 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -325,6 +325,10 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
int64_t term,
const std::string& reason) override;
+ void NotifyPeerToPromote(const std::string& peer_uuid,
+ int64_t term,
+ int64_t committed_config_opid_index) override;
+
void NotifyPeerHealthChange() override;
// Return the log indexes which the consensus implementation would like to retain.
@@ -634,6 +638,12 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
const RaftConfigPB& committed_config,
const std::string& reason);
+ // Attempt to promote the given non-voter to a voter, if the 'term'
+ // and 'committed_config_opid_index' CAS parameters are both still current.
+ void TryPromoteNonVoterTask(const std::string& peer_uuid,
+ int64_t term,
+ int64_t committed_config_opid_index);
+
// Called when the failure detector expires.
// Submits ReportFailureDetectedTask() to a thread pool.
void ReportFailureDetected();
http://git-wip-us.apache.org/repos/asf/kudu/blob/79a255bb/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index 114c239..898394f 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -690,6 +690,7 @@ Status AddServer(const TServerDetails* leader,
const TServerDetails* replica_to_add,
consensus::RaftPeerPB::MemberType member_type,
const MonoDelta& timeout,
+ const consensus::RaftPeerAttrsPB& attrs,
const boost::optional<int64_t>& cas_config_index,
TabletServerErrorPB::Code* error_code) {
ChangeConfigRequestPB req;
@@ -699,6 +700,7 @@ Status AddServer(const TServerDetails* leader,
RaftPeerPB* peer = req.mutable_server();
peer->set_permanent_uuid(replica_to_add->uuid());
peer->set_member_type(member_type);
+ *peer->mutable_attrs() = attrs;
*peer->mutable_last_known_addr() = replica_to_add->registration.rpc_addresses(0);
if (cas_config_index) {
req.set_cas_config_opid_index(*cas_config_index);
http://git-wip-us.apache.org/repos/asf/kudu/blob/79a255bb/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index 9252dc9..8dcc5d7 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -276,6 +276,7 @@ Status AddServer(const TServerDetails* leader,
const TServerDetails* replica_to_add,
consensus::RaftPeerPB::MemberType member_type,
const MonoDelta& timeout,
+ const consensus::RaftPeerAttrsPB& attrs = {},
const boost::optional<int64_t>& cas_config_index = boost::none,
tserver::TabletServerErrorPB::Code* error_code = nullptr);
http://git-wip-us.apache.org/repos/asf/kudu/blob/79a255bb/src/kudu/integration-tests/raft_config_change-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_config_change-itest.cc b/src/kudu/integration-tests/raft_config_change-itest.cc
index 0ba04b8..b0f0cba 100644
--- a/src/kudu/integration-tests/raft_config_change-itest.cc
+++ b/src/kudu/integration-tests/raft_config_change-itest.cc
@@ -19,12 +19,16 @@
#include <ostream>
#include <string>
#include <unordered_map>
+#include <unordered_set>
+#include <utility>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
@@ -38,8 +42,11 @@
#include "kudu/util/test_util.h"
using kudu::consensus::COMMITTED_OPID;
+using kudu::consensus::RaftPeerAttrsPB;
+using kudu::consensus::RaftPeerPB;
using kudu::itest::TServerDetails;
using std::string;
+using std::unordered_set;
using std::vector;
using strings::Substitute;
@@ -151,4 +158,60 @@ TEST_F(RaftConfigChangeITest, TestKudu2147) {
ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
}
+// Test automatic promotion of a non-voter replica in a 3-4-3 re-replication
+// (KUDU-1097) paradigm.
+TEST_F(RaftConfigChangeITest, TestNonVoterPromotion) {
+ const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+ // Enable 3-4-3 re-replication.
+ NO_FATALS(StartCluster({"--raft_prepare_replacement_before_eviction=true"},
+ {"--raft_prepare_replacement_before_eviction=true",
+ "--catalog_manager_evict_excess_replicas=false"},
+ /*num_tablet_servers=*/ 4));
+ TestWorkload workload(cluster_.get());
+ workload.Setup();
+
+ // The table should initially have replicas on three tservers.
+ ASSERT_OK(inspect_->WaitForReplicaCount(3));
+ master::GetTableLocationsResponsePB table_locations;
+ ASSERT_OK(itest::GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName,
+ kTimeout, &table_locations));
+ ASSERT_EQ(1, table_locations.tablet_locations_size()); // Only 1 tablet.
+ ASSERT_EQ(3, table_locations.tablet_locations().begin()->replicas_size()); // 3 replicas.
+ string tablet_id = table_locations.tablet_locations().begin()->tablet_id();
+
+ // Find the TS that does not have a replica.
+ unordered_set<string> initial_replicas;
+ for (const auto& replica : table_locations.tablet_locations().begin()->replicas()) {
+ initial_replicas.insert(replica.ts_info().permanent_uuid());
+ }
+ TServerDetails* new_replica = nullptr;
+ for (const auto& entry : ts_map_) {
+ if (!ContainsKey(initial_replicas, entry.first)) {
+ new_replica = entry.second;
+ break;
+ }
+ }
+ ASSERT_NE(nullptr, new_replica);
+
+ TServerDetails* leader_replica = nullptr;
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_replica));
+ ASSERT_NE(new_replica, leader_replica);
+
+ // Add the 4th replica as a NON_VOTER with promote=true.
+ RaftPeerAttrsPB attrs;
+ attrs.set_promote(true);
+ ASSERT_OK(AddServer(leader_replica, tablet_id, new_replica,
+ RaftPeerPB::NON_VOTER, kTimeout, attrs));
+ });
+
+ // Wait for there to be 4 voters in the config.
+ ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(/*config_size=*/ 4,
+ leader_replica,
+ tablet_id,
+ kTimeout));
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+}
+
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/79a255bb/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index c31d7eb..3ceed43 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -101,6 +101,7 @@ using kudu::consensus::ConsensusServiceProxy;
using kudu::consensus::MajoritySize;
using kudu::consensus::MakeOpId;
using kudu::consensus::OpId;
+using kudu::consensus::RaftPeerAttrsPB;
using kudu::consensus::RaftPeerPB;
using kudu::consensus::ReplicateMsg;
using kudu::itest::AddServer;
@@ -1532,7 +1533,7 @@ TEST_F(RaftConsensusITest, TestAtomicAddRemoveServer) {
// Now, add the server back. Again, specifying something other than the
// latest committed_opid_index should fail.
s = AddServer(leader_tserver, tablet_id_, follower_ts, RaftPeerPB::VOTER,
- MonoDelta::FromSeconds(10), -1, &error_code);
+ MonoDelta::FromSeconds(10), {}, -1, &error_code);
ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code);
ASSERT_STR_CONTAINS(s.ToString(), "of -1 but the committed config has opid_index of 2");
@@ -1540,7 +1541,7 @@ TEST_F(RaftConsensusITest, TestAtomicAddRemoveServer) {
// The previous config change op is the latest entry in the log.
committed_opid_index = cur_log_index;
ASSERT_OK(AddServer(leader_tserver, tablet_id_, follower_ts, RaftPeerPB::VOTER,
- MonoDelta::FromSeconds(10), committed_opid_index));
+ MonoDelta::FromSeconds(10), {}, committed_opid_index));
InsertOrDie(&active_tablet_servers, follower_ts->uuid(), follower_ts);
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
|