From: mpercy@apache.org
To: commits@kudu.apache.org
Subject: kudu git commit: [tools] Add a 'kudu tablet relocate' tool
Date: Thu, 27 Jul 2017 19:06:26 +0000 (UTC)

Repository: kudu
Updated Branches:
  refs/heads/master 0ec793e32 -> 5b7626e3a

[tools] Add a 'kudu tablet relocate' tool

This patch adds a relocate tool that moves a tablet replica from one
tablet server to another.

Usage: kudu tablet change_config move_replica <master_addresses> <tablet_id> <from_ts_uuid> <to_ts_uuid>

It works by adding <to_ts_uuid> to the configuration, waiting for it to
finish its tablet copy, then removing <from_ts_uuid>. In the typical case,
this means the number of replicas will change 3 -> 4 -> 3, and therefore
the number of tolerable faults is still 1 while the new replica bootstraps,
even though there is one more replica that can fail. As a result of this
extra fragility, the tool requires the tablet to be in "perfect health"
when it runs, meaning ksck returns no errors for the tablet, and it
requires the same after the copy is complete but before removing a
replica. This probably limits the usefulness of the tool to rebalancing
replicas of a healthy tablet.

Additionally, this makes some minimal changes to ksck to allow it to print
to output streams other than stdout. The purpose is to allow the output to
be suppressed when running the tool, since the use of ksck is an
implementation detail and its output is noisy.

Change-Id: I3b7a7243333ba6e6a3d6fce96b220224d6e38a84
Reviewed-on: http://gerrit.cloudera.org:8080/7444
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy

Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5b7626e3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5b7626e3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5b7626e3

Branch: refs/heads/master
Commit: 5b7626e3a8250d797f2fc2001c8ee7634a39d18b
Parents: 0ec793e
Author: Will Berkeley
Authored: Fri Jul 14 11:03:01 2017 -0700
Committer: Mike Percy
Committed: Thu Jul 27 18:39:02 2017 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck-test.cc          |  13 +-
 src/kudu/tools/ksck.cc               |  69 +++-----
 src/kudu/tools/ksck.h                |  25 ++-
 src/kudu/tools/ksck_remote-test.cc   |  14 +-
 src/kudu/tools/kudu-admin-test.cc    |  78 +++++++++
 src/kudu/tools/kudu-tool-test.cc     |   3 +-
 src/kudu/tools/tool_action_tablet.cc | 274 ++++++++++++++++++++++++++----
 7 files changed, 371 insertions(+), 105 deletions(-)
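
As a rough illustration of the fault-tolerance arithmetic in the commit message, the sketch below computes the Raft majority size and the number of tolerable faults for a given number of voters. `MajoritySize` and `TolerableFaults` are hypothetical helper names introduced here for illustration; they are not functions from the Kudu code base.

```cpp
#include <cassert>

// Hypothetical helper: votes needed for a Raft majority among num_voters voters.
constexpr int MajoritySize(int num_voters) {
  return num_voters / 2 + 1;
}

// Hypothetical helper: how many voters may fail while a majority stays available.
constexpr int TolerableFaults(int num_voters) {
  return num_voters - MajoritySize(num_voters);
}
```

Under this arithmetic a 3-voter and a 4-voter configuration both tolerate a single fault, but the 4-voter configuration has one more replica that can fail, which appears to be the extra fragility the message refers to.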
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/ksck-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc index 496726f..517d71c 100644 --- a/src/kudu/tools/ksck-test.cc +++ b/src/kudu/tools/ksck-test.cc @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -#include #include #include @@ -40,10 +39,6 @@ using std::string; using std::unordered_map; using strings::Substitute; -// Import this symbol from ksck.cc so we can introspect the -// errors being written to stderr. -extern std::ostream* g_err_stream; - class MockKsckTabletServer : public KsckTabletServer { public: explicit MockKsckTabletServer(const string& uuid) @@ -121,7 +116,7 @@ class KsckTest : public KuduTest { KsckTest() : master_(new MockKsckMaster()), cluster_(new KsckCluster(static_pointer_cast(master_))), - ksck_(new Ksck(cluster_)) { + ksck_(new Ksck(cluster_, &err_stream_)) { FLAGS_color = "never"; unordered_map> tablet_servers; for (int i = 0; i < 3; i++) { @@ -130,12 +125,6 @@ class KsckTest : public KuduTest { InsertOrDie(&tablet_servers, ts->uuid(), ts); } master_->tablet_servers_.swap(tablet_servers); - - g_err_stream = &err_stream_; - } - - ~KsckTest() { - g_err_stream = NULL; } protected: http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/ksck.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc index 9e5acf1..766f008 100644 --- a/src/kudu/tools/ksck.cc +++ b/src/kudu/tools/ksck.cc @@ -32,7 +32,6 @@ #include "kudu/gutil/strings/human_readable.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/strings/util.h" -#include "kudu/tools/color.h" #include "kudu/tools/tool_action_common.h" #include "kudu/util/atomic.h" #include 
"kudu/util/blocking_queue.h" @@ -72,25 +71,6 @@ using std::stringstream; using std::unordered_map; using strings::Substitute; -// The stream to write output to. If this is NULL, defaults to cout. -// This is used by tests to capture output. -ostream* g_err_stream = NULL; - -// Print an informational message to cout. -static ostream& Out() { - return (g_err_stream ? *g_err_stream : cout); -} - -// Print a warning message to cout. -static ostream& Warn() { - return Out() << Color(AnsiCode::YELLOW, "WARNING: "); -} - -// Print an error message to cout. -static ostream& Error() { - return Out() << Color(AnsiCode::RED, "WARNING: "); -} - namespace { // Return true if 'str' matches any of the patterns in 'patterns', or if // 'patterns' is empty. @@ -198,11 +178,10 @@ Status Ksck::FetchInfoFromTabletServers() { if (bad_servers.Load() == 0) { Out() << Substitute("Fetched info from all $0 Tablet Servers", servers_count) << endl; return Status::OK(); - } else { - Warn() << Substitute("Fetched info from $0 Tablet Servers, $1 weren't reachable", - servers_count - bad_servers.Load(), bad_servers.Load()) << endl; - return Status::NetworkError("Not all Tablet Servers are reachable"); } + Warn() << Substitute("Fetched info from $0 Tablet Servers, $1 weren't reachable", + servers_count - bad_servers.Load(), bad_servers.Load()) << endl; + return Status::NetworkError("Not all Tablet Servers are reachable"); } Status Ksck::ConnectToTabletServer(const shared_ptr& ts) { @@ -247,11 +226,10 @@ Status Ksck::CheckTablesConsistency() { if (bad_tables_count == 0) { Out() << Substitute("The metadata for $0 table(s) is HEALTHY", tables_checked) << endl; return Status::OK(); - } else { - Warn() << Substitute("$0 out of $1 table(s) are not in a healthy state", - bad_tables_count, tables_checked) << endl; - return Status::Corruption(Substitute("$0 table(s) are bad", bad_tables_count)); } + Warn() << Substitute("$0 out of $1 table(s) are not in a healthy state", + bad_tables_count, tables_checked) 
<< endl; + return Status::Corruption(Substitute("$0 table(s) are bad", bad_tables_count)); } // Class to act as a collector of scan results. @@ -289,10 +267,10 @@ class ChecksumResultReporter : public RefCountedThreadSafeWaitFor(options.timeout); + bool timed_out = !reporter->WaitFor(options.timeout, out_); // Even if we timed out, print the checksum results that we did get. ChecksumResultReporter::TabletResultMap checksums = reporter->checksums(); @@ -584,21 +562,20 @@ bool Ksck::VerifyTable(const shared_ptr& table) { Color(AnsiCode::GREEN, "HEALTHY"), tablets.size()) << endl; return true; - } else { - if (result_counts[CheckResult::UNAVAILABLE] > 0) { - Out() << Substitute("Table $0 has $1 $2 tablet(s)", - table->name(), - result_counts[CheckResult::UNAVAILABLE], - Color(AnsiCode::RED, "unavailable")) << endl; - } - if (result_counts[CheckResult::UNDER_REPLICATED] > 0) { - Out() << Substitute("Table $0 has $1 $2 tablet(s)", - table->name(), - result_counts[CheckResult::UNDER_REPLICATED], - Color(AnsiCode::YELLOW, "under-replicated")) << endl; - } - return false; } + if (result_counts[CheckResult::UNAVAILABLE] > 0) { + Out() << Substitute("Table $0 has $1 $2 tablet(s)", + table->name(), + result_counts[CheckResult::UNAVAILABLE], + Color(AnsiCode::RED, "unavailable")) << endl; + } + if (result_counts[CheckResult::UNDER_REPLICATED] > 0) { + Out() << Substitute("Table $0 has $1 $2 tablet(s)", + table->name(), + result_counts[CheckResult::UNDER_REPLICATED], + Color(AnsiCode::YELLOW, "under-replicated")) << endl; + } + return false; } namespace { http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/ksck.h ---------------------------------------------------------------------- diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h index 38171a2..805bbed 100644 --- a/src/kudu/tools/ksck.h +++ b/src/kudu/tools/ksck.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -33,6 +34,7 @@ #include "kudu/common/schema.h" #include 
"kudu/consensus/consensus.service.h" #include "kudu/tablet/tablet.pb.h" +#include "kudu/tools/color.h" #include "kudu/util/countdown_latch.h" #include "kudu/util/locks.h" #include "kudu/util/status.h" @@ -377,8 +379,10 @@ class KsckCluster { // Externally facing class to run checks against the provided cluster. class Ksck { public: - explicit Ksck(std::shared_ptr cluster) - : cluster_(std::move(cluster)) {} + explicit Ksck(std::shared_ptr cluster, std::ostream* out = &std::cout) + : cluster_(std::move(cluster)), + out_(out) {} + ~Ksck() {} // Set whether ksck should verify that each of the tablet's raft configurations @@ -446,12 +450,29 @@ class Ksck { CheckResult VerifyTablet(const std::shared_ptr& tablet, int table_num_replicas); + // Print an informational message to this instance's output stream. + ostream& Out() { + return *out_; + } + + // Print an error message to this instance's output stream. + ostream& Error() { + return (*out_) << Color(AnsiCode::RED, "ERROR: "); + } + + // Print a warning message to this instance's output stream. + ostream& Warn() { + return (*out_) << Color(AnsiCode::YELLOW, "WARNING: "); + } + const std::shared_ptr cluster_; bool check_replica_count_ = true; vector table_filters_; vector tablet_id_filters_; + std::ostream* out_; + DISALLOW_COPY_AND_ASSIGN(Ksck); }; } // namespace tools http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/ksck_remote-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc index 1b7e2ab..7c33940 100644 --- a/src/kudu/tools/ksck_remote-test.cc +++ b/src/kudu/tools/ksck_remote-test.cc @@ -16,12 +16,10 @@ // under the License. 
#include -#include #include #include "kudu/client/client.h" -#include "kudu/gutil/strings/substitute.h" #include "kudu/integration-tests/internal_mini_cluster.h" #include "kudu/master/mini_master.h" #include "kudu/tools/data_gen_util.h" @@ -46,11 +44,6 @@ using client::sp::shared_ptr; using std::string; using std::unique_ptr; using std::vector; -using strings::Substitute; - -// Import this symbol from ksck.cc so we can introspect the -// errors being written to stderr. -extern std::ostream* g_err_stream; static const char *kTableName = "ksck-test-table"; @@ -62,11 +55,6 @@ class RemoteKsckTest : public KuduTest { b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull(); CHECK_OK(b.Build(&schema_)); - g_err_stream = &err_stream_; - } - - ~RemoteKsckTest() { - g_err_stream = NULL; } virtual void SetUp() OVERRIDE { @@ -110,7 +98,7 @@ class RemoteKsckTest : public KuduTest { std::shared_ptr master; ASSERT_OK(RemoteKsckMaster::Build(master_addresses, &master)); std::shared_ptr cluster(new KsckCluster(master)); - ksck_.reset(new Ksck(cluster)); + ksck_.reset(new Ksck(cluster, &err_stream_)); } virtual void TearDown() OVERRIDE { http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/kudu-admin-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc index 376d9c1..379959d 100644 --- a/src/kudu/tools/kudu-admin-test.cc +++ b/src/kudu/tools/kudu-admin-test.cc @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
+#include +#include #include #include @@ -48,9 +50,11 @@ using itest::TabletServerMap; using itest::TServerDetails; using itest::WAIT_FOR_LEADER; using itest::WaitForReplicasReportedToMaster; +using itest::WaitForServersToAgree; using itest::WaitUntilCommittedOpIdIndexIs; using itest::WaitUntilTabletInState; using itest::WaitUntilTabletRunning; +using std::deque; using std::string; using std::vector; using strings::Substitute; @@ -167,6 +171,80 @@ TEST_F(AdminCliTest, TestChangeConfig) { MonoDelta::FromSeconds(10))); } +// Test relocating replicas while running a workload. +// 1. Instantiate external mini cluster with 5 TS. +// 2. Create a table with 3 replicas. +// 3. Start a workload. +// 4. Using the CLI, move the 3 replicas around the 5 TS. +// 5. Profit! +TEST_F(AdminCliTest, TestMoveTablet) { + FLAGS_num_tablet_servers = 5; + FLAGS_num_replicas = 3; + NO_FATALS(BuildAndStart()); + + vector tservers; + AppendKeysFromMap(tablet_servers_, &tservers); + ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); + + deque active_tservers; + for (auto iter = tablet_replicas_.find(tablet_id_); iter != tablet_replicas_.cend(); ++iter) { + active_tservers.push_back(iter->second->uuid()); + } + ASSERT_EQ(FLAGS_num_replicas, active_tservers.size()); + + deque inactive_tservers; + std::sort(tservers.begin(), tservers.end()); + std::sort(active_tservers.begin(), active_tservers.end()); + std::set_difference(tservers.cbegin(), tservers.cend(), + active_tservers.cbegin(), active_tservers.cend(), + std::back_inserter(inactive_tservers)); + ASSERT_EQ(FLAGS_num_tablet_servers - FLAGS_num_replicas, inactive_tservers.size()); + + // The workload is light (1 thread, 1 op batches) so that new replicas + // bootstrap and converge quickly. 
+ TestWorkload workload(cluster_.get()); + workload.set_table_name(kTableId); + workload.set_num_replicas(FLAGS_num_replicas); + workload.set_num_write_threads(1); + workload.set_write_batch_size(1); + workload.Setup(); + workload.Start(); + + // Assuming no ad hoc leadership changes, 3 moves guarantee the leader is moved at least once. + int num_moves = AllowSlowTests() ? 3 : 1; + for (int i = 0; i < num_moves; i++) { + const string remove = active_tservers.front(); + const string add = inactive_tservers.front(); + ASSERT_OK(Subprocess::Call({ + GetKuduCtlAbsolutePath(), + "tablet", + "change_config", + "move_replica", + cluster_->master()->bound_rpc_addr().ToString(), + tablet_id_, + remove, + add + })); + active_tservers.pop_front(); + active_tservers.push_back(add); + inactive_tservers.pop_front(); + inactive_tservers.push_back(remove); + + // Allow the added server time to catch up so it applies the newest configuration. + // If we don't wait, the initial ksck of move_replica can fail with consensus conflict. 
+ TabletServerMap active_tservers_map; + for (const string& uuid : active_tservers) { + InsertOrDie(&active_tservers_map, uuid, tablet_servers_[uuid]); + } + ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), active_tservers_map, + tablet_id_, 1)); + } + workload.StopAndJoin(); + + ClusterVerifier v(cluster_.get()); + NO_FATALS(v.CheckCluster()); +} + Status RunUnsafeChangeConfig(const string& tablet_id, const string& dst_host, vector peer_uuid_list) { http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/kudu-tool-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 6edc4e1..1375ddd 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -480,6 +480,7 @@ TEST_F(ToolTest, TestModeHelp) { const vector kChangeConfigModeRegexes = { "add_replica.*Add a new replica", "change_replica_type.*Change the type of an existing replica", + "move_replica.*Move a tablet replica", "remove_replica.*Remove an existing replica" }; NO_FATALS(RunTestHelp("tablet change_config", kChangeConfigModeRegexes)); @@ -1195,7 +1196,7 @@ TEST_F(ToolTest, TestLocalReplicaOps) { } // Create and start Kudu mini cluster, optionally creating a table in the DB, -// and then run 'kudu test loadgen ...' utility against it. +// and then run 'kudu perf loadgen ...' utility against it. 
void ToolTest::RunLoadgen(int num_tservers, const vector& tool_args, const string& table_name) { http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/tool_action_tablet.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_tablet.cc b/src/kudu/tools/tool_action_tablet.cc index 0ebc710..fefc43a 100644 --- a/src/kudu/tools/tool_action_tablet.cc +++ b/src/kudu/tools/tool_action_tablet.cc @@ -18,6 +18,8 @@ #include "kudu/tools/tool_action.h" #include +#include +#include #include #include #include @@ -33,11 +35,19 @@ #include "kudu/gutil/strings/substitute.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/server/server_base.pb.h" +#include "kudu/tools/ksck.h" +#include "kudu/tools/ksck_remote.h" #include "kudu/tools/tool_action_common.h" +#include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/status.h" #include "kudu/util/string_case.h" +DEFINE_int64(move_copy_timeout_sec, 600, + "Number of seconds to wait for tablet copy to complete when relocating a tablet"); +DEFINE_int64(move_leader_timeout_sec, 10, + "Number of seconds to wait for a leader when relocating a leader tablet"); + namespace kudu { namespace tools { @@ -47,10 +57,17 @@ using client::KuduTablet; using client::KuduTabletServer; using consensus::ChangeConfigType; using consensus::ConsensusServiceProxy; +using consensus::ConsensusStatePB; +using consensus::GetConsensusStateRequestPB; +using consensus::GetConsensusStateResponsePB; +using consensus::GetLastOpIdRequestPB; +using consensus::GetLastOpIdResponsePB; +using consensus::OpId; using consensus::RaftPeerPB; using rpc::RpcController; using std::cout; using std::endl; +using std::shared_ptr; using std::string; using std::unique_ptr; using std::vector; @@ -59,7 +76,9 @@ using strings::Substitute; namespace { const char* const kReplicaTypeArg = "replica_type"; -const char* const kReplicaUuidArg = "replica_uuid"; +const char* const kTsUuidArg 
= "ts_uuid"; +const char* const kFromTsUuidArg = "from_ts_uuid"; +const char* const kToTsUuidArg = "to_ts_uuid"; Status GetRpcAddressForTS(const client::sp::shared_ptr& client, const string& uuid, @@ -101,25 +120,53 @@ Status GetTabletLeader(const client::sp::shared_ptr& client, "No leader replica found for tablet $0", tablet_id)); } -Status ChangeConfig(const RunnerContext& context, ChangeConfigType cc_type) { - // Parse and validate arguments. - RaftPeerPB peer_pb; - const string& master_addresses_str = FindOrDie(context.required_args, - kMasterAddressesArg); - vector master_addresses = strings::Split(master_addresses_str, ","); - const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg); - const string& replica_uuid = FindOrDie(context.required_args, kReplicaUuidArg); - if (cc_type == consensus::ADD_SERVER || cc_type == consensus::CHANGE_ROLE) { - const string& replica_type = FindOrDie(context.required_args, kReplicaTypeArg); - string uppercase_peer_type; - ToUpperCase(replica_type, &uppercase_peer_type); - RaftPeerPB::MemberType member_type_val; - if (!RaftPeerPB::MemberType_Parse(uppercase_peer_type, &member_type_val)) { - return Status::InvalidArgument("Unrecognized peer type", replica_type); - } - peer_pb.set_member_type(member_type_val); +Status DoKsckForTablet(const vector& master_addresses, const string& tablet_id) { + shared_ptr master; + RETURN_NOT_OK(RemoteKsckMaster::Build(master_addresses, &master)); + shared_ptr cluster(new KsckCluster(master)); + + // Print to an unopened ofstream to discard ksck output. + // See https://stackoverflow.com/questions/8243743. 
+ std::ofstream null_stream; + Ksck ksck(cluster, &null_stream); + ksck.set_tablet_id_filters({ tablet_id }); + RETURN_NOT_OK(ksck.CheckMasterRunning()); + RETURN_NOT_OK(ksck.FetchTableAndTabletInfo()); + RETURN_NOT_OK(ksck.FetchInfoFromTabletServers()); + return ksck.CheckTablesConsistency(); +} + +Status WaitForCleanKsck(const vector& master_addresses, + const string& tablet_id, + const MonoDelta& timeout) { + Status s; + MonoTime deadline = MonoTime::Now() + timeout; + while (MonoTime::Now() < deadline) { + s = DoKsckForTablet(master_addresses, tablet_id); + if (s.ok()) return s; + SleepFor(MonoDelta::FromMilliseconds(1000)); + } + return s.CloneAndPrepend("timed out with ksck errors remaining: last error"); +} + +Status DoChangeConfig(const vector& master_addresses, + const string& tablet_id, + const string& replica_uuid, + const boost::optional& member_type, + ChangeConfigType cc_type) { + if (cc_type == consensus::REMOVE_SERVER && member_type) { + return Status::InvalidArgument("cannot supply Raft member type when removing a server"); + } + if ((cc_type == consensus::ADD_SERVER || cc_type == consensus::CHANGE_ROLE) && !member_type) { + return Status::InvalidArgument( + "must specify member type when adding a server or changing member type"); } + + RaftPeerPB peer_pb; peer_pb.set_permanent_uuid(replica_uuid); + if (member_type) { + peer_pb.set_member_type(*member_type); + } client::sp::shared_ptr client; RETURN_NOT_OK(KuduClientBuilder() @@ -156,6 +203,27 @@ Status ChangeConfig(const RunnerContext& context, ChangeConfigType cc_type) { return Status::OK(); } +Status ChangeConfig(const RunnerContext& context, ChangeConfigType cc_type) { + const string& master_addresses_str = FindOrDie(context.required_args, + kMasterAddressesArg); + vector master_addresses = strings::Split(master_addresses_str, ","); + const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg); + const string& replica_uuid = FindOrDie(context.required_args, kTsUuidArg); + 
boost::optional member_type; + if (cc_type == consensus::ADD_SERVER || cc_type == consensus::CHANGE_ROLE) { + const string& replica_type = FindOrDie(context.required_args, kReplicaTypeArg); + string uppercase_peer_type; + ToUpperCase(replica_type, &uppercase_peer_type); + RaftPeerPB::MemberType member_type_val; + if (!RaftPeerPB::MemberType_Parse(uppercase_peer_type, &member_type_val)) { + return Status::InvalidArgument("Unrecognized peer type", replica_type); + } + member_type = member_type_val; + } + + return DoChangeConfig(master_addresses, tablet_id, replica_uuid, member_type, cc_type); +} + Status AddReplica(const RunnerContext& context) { return ChangeConfig(context, consensus::ADD_SERVER); } @@ -168,6 +236,25 @@ Status RemoveReplica(const RunnerContext& context) { return ChangeConfig(context, consensus::REMOVE_SERVER); } +Status DoLeaderStepDown(const client::sp::shared_ptr& client, const string& tablet_id, + const string& leader_uuid, const HostPort& leader_hp) { + unique_ptr proxy; + RETURN_NOT_OK(BuildProxy(leader_hp.host(), leader_hp.port(), &proxy)); + + consensus::LeaderStepDownRequestPB req; + consensus::LeaderStepDownResponsePB resp; + RpcController rpc; + rpc.set_timeout(client->default_admin_operation_timeout()); + req.set_dest_uuid(leader_uuid); + req.set_tablet_id(tablet_id); + + RETURN_NOT_OK(proxy->LeaderStepDown(req, &resp, &rpc)); + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + return Status::OK(); +} + Status LeaderStepDown(const RunnerContext& context) { const string& master_addresses_str = FindOrDie(context.required_args, kMasterAddressesArg); @@ -189,23 +276,126 @@ Status LeaderStepDown(const RunnerContext& context) { } RETURN_NOT_OK(s); - unique_ptr proxy; - RETURN_NOT_OK(BuildProxy(leader_hp.host(), leader_hp.port(), &proxy)); - - consensus::LeaderStepDownRequestPB req; - consensus::LeaderStepDownResponsePB resp; - RpcController rpc; - rpc.set_timeout(client->default_admin_operation_timeout()); - 
req.set_dest_uuid(leader_uuid); - req.set_tablet_id(tablet_id); + return DoLeaderStepDown(client, tablet_id, leader_uuid, leader_hp); +} - RETURN_NOT_OK(proxy->LeaderStepDown(req, &resp, &rpc)); +Status GetConsensusState(const unique_ptr& proxy, + const string& tablet_id, + const string& replica_uuid, + const MonoDelta& timeout, + ConsensusStatePB* consensus_state) { + GetConsensusStateRequestPB req; + GetConsensusStateResponsePB resp; + RpcController controller; + controller.set_timeout(timeout); + req.set_dest_uuid(replica_uuid); + req.add_tablet_ids(tablet_id); + RETURN_NOT_OK(proxy->GetConsensusState(req, &resp, &controller)); if (resp.has_error()) { return StatusFromPB(resp.error().status()); } + if (resp.tablets_size() == 0) { + return Status::NotFound("tablet not found:", tablet_id); + } + DCHECK_EQ(1, resp.tablets_size()); + *consensus_state = resp.tablets(0).cstate(); return Status::OK(); } +Status GetLastCommittedOpId(const string& tablet_id, const string& replica_uuid, + const HostPort& replica_hp, const MonoDelta& timeout, OpId* opid) { + GetLastOpIdRequestPB req; + GetLastOpIdResponsePB resp; + RpcController controller; + controller.set_timeout(timeout); + req.set_tablet_id(tablet_id); + req.set_dest_uuid(replica_uuid); + req.set_opid_type(consensus::COMMITTED_OPID); + + unique_ptr proxy; + RETURN_NOT_OK(BuildProxy(replica_hp.host(), replica_hp.port(), &proxy)); + RETURN_NOT_OK(proxy->GetLastOpId(req, &resp, &controller)); + *opid = resp.opid(); + return Status::OK(); +} + +Status ChangeLeader(const client::sp::shared_ptr& client, const string& tablet_id, + const string& old_leader_uuid, const HostPort& old_leader_hp, + const MonoDelta& timeout) { + unique_ptr proxy; + RETURN_NOT_OK(BuildProxy(old_leader_hp.host(), old_leader_hp.port(), &proxy)); + ConsensusStatePB cstate; + RETURN_NOT_OK(GetConsensusState(proxy, tablet_id, old_leader_uuid, + client->default_admin_operation_timeout(), &cstate)); + int64 current_term = -1; + MonoTime deadline = 
MonoTime::Now() + timeout; + + // First, check the leader and, if it's the old leader, ask it to step down. + // Repeat until we time out or get a different leader. + while (MonoTime::Now() < deadline) { + RETURN_NOT_OK(GetConsensusState(proxy, tablet_id, old_leader_uuid, + client->default_admin_operation_timeout(), &cstate)); + + if (cstate.current_term() > current_term && cstate.has_leader_uuid()) { + current_term = cstate.current_term(); + if (cstate.leader_uuid() != old_leader_uuid) { + break; + } + RETURN_NOT_OK(DoLeaderStepDown(client, tablet_id, old_leader_uuid, old_leader_hp)); + } + SleepFor(MonoDelta::FromMilliseconds(1000)); + } + + // Second, once we have a new leader, wait for it to assert leadership by replicating an op + // in the current term to the old leader. + OpId opid; + while (MonoTime::Now() < deadline) { + RETURN_NOT_OK(GetLastCommittedOpId(tablet_id, old_leader_uuid, old_leader_hp, + client->default_admin_operation_timeout(), &opid)); + if (opid.term() == current_term) { + return Status::OK(); + } + SleepFor(MonoDelta::FromMilliseconds(500)); + } + + return Status::TimedOut("unable to change leadership in time"); +} + +Status MoveReplica(const RunnerContext &context) { + const string& master_addresses_str = FindOrDie(context.required_args, kMasterAddressesArg); + vector master_addresses = strings::Split(master_addresses_str, ","); + const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg); + const string& rem_replica_uuid = FindOrDie(context.required_args, kFromTsUuidArg); + const string& add_replica_uuid = FindOrDie(context.required_args, kToTsUuidArg); + + // Check the tablet is in perfect health and, if so, add the new server. 
+ RETURN_NOT_OK_PREPEND(DoKsckForTablet(master_addresses, tablet_id), + "ksck pre-move health check failed"); + RETURN_NOT_OK(DoChangeConfig(master_addresses, tablet_id, add_replica_uuid, + RaftPeerPB::VOTER, consensus::ADD_SERVER)); + + // Wait until the tablet copy completes and the tablet returns to perfect health. + MonoDelta copy_timeout = MonoDelta::FromSeconds(FLAGS_move_copy_timeout_sec); + RETURN_NOT_OK_PREPEND(WaitForCleanKsck(master_addresses, tablet_id, copy_timeout), + "failed waiting for clean ksck after add server"); + + // Finally, remove the chosen replica. + // If it is the leader, it will be asked to step down. + client::sp::shared_ptr client; + RETURN_NOT_OK(KuduClientBuilder().master_server_addrs(master_addresses).Build(&client)); + string leader_uuid; + HostPort leader_hp; + RETURN_NOT_OK(GetTabletLeader(client, tablet_id, &leader_uuid, &leader_hp)); + if (rem_replica_uuid == leader_uuid) { + RETURN_NOT_OK_PREPEND(ChangeLeader(client, tablet_id, + leader_uuid, leader_hp, + MonoDelta::FromSeconds(FLAGS_move_leader_timeout_sec)), + "failed changing leadership from the replica to be removed"); + } + return DoChangeConfig(master_addresses, tablet_id, rem_replica_uuid, + boost::none, consensus::REMOVE_SERVER); +} + } // anonymous namespace unique_ptr BuildTabletMode() { @@ -214,7 +404,8 @@ unique_ptr BuildTabletMode() { .Description("Add a new replica to a tablet's Raft configuration") .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc }) .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc }) - .AddRequiredParameter({ kReplicaUuidArg, "New replica's UUID" }) + .AddRequiredParameter({ kTsUuidArg, + "UUID of the tablet server that should host the new replica" }) .AddRequiredParameter( { kReplicaTypeArg, "New replica's type. Must be VOTER or NON-VOTER." 
}) @@ -226,7 +417,8 @@ unique_ptr BuildTabletMode() { "Change the type of an existing replica in a tablet's Raft configuration") .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc }) .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc }) - .AddRequiredParameter({ kReplicaUuidArg, "Existing replica's UUID" }) + .AddRequiredParameter({ kTsUuidArg, + "UUID of the tablet server hosting the existing replica" }) .AddRequiredParameter( { kReplicaTypeArg, "Existing replica's new type. Must be VOTER or NON-VOTER." }) @@ -237,7 +429,26 @@ unique_ptr BuildTabletMode() { .Description("Remove an existing replica from a tablet's Raft configuration") .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc }) .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc }) - .AddRequiredParameter({ kReplicaUuidArg, "Existing replica's UUID" }) + .AddRequiredParameter({ kTsUuidArg, + "UUID of the tablet server hosting the existing replica" }) + .Build(); + + const string move_extra_desc = "The replica move tool effectively moves a " + "replica from one tablet server to another by adding a replica to the " + "new server and then removing it from the old one. It requires that " + "ksck return no errors when run against the target tablet. If the move " + "fails, the user should wait for any tablet copy to complete, and, if " + "the copy succeeds, use remove_replica manually. 
If the copy fails, the " + "new replica will be deleted automatically after some time, and then the " + "move can be retried."; + unique_ptr move_replica = + ActionBuilder("move_replica", &MoveReplica) + .Description("Move a tablet replica from one tablet server to another") + .ExtraDescription(move_extra_desc) + .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc }) + .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc }) + .AddRequiredParameter({ kFromTsUuidArg, "UUID of the tablet server to move from" }) + .AddRequiredParameter({ kToTsUuidArg, "UUID of the tablet server to move to" }) .Build(); unique_ptr leader_step_down = @@ -252,6 +463,7 @@ unique_ptr BuildTabletMode() { .Description("Change a tablet's Raft configuration") .AddAction(std::move(add_replica)) .AddAction(std::move(change_replica_type)) + .AddAction(std::move(move_replica)) .AddAction(std::move(remove_replica)) .Build();
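
The retry-until-deadline loop that `WaitForCleanKsck` uses in the diff above can be sketched in standard C++ as follows. This is a minimal sketch, not the Kudu implementation: `PollResult` is a hypothetical stand-in for `kudu::Status`, `WaitForCleanCheck` is a hypothetical name, and `std::chrono` replaces Kudu's `MonoTime`/`MonoDelta`.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>
#include <thread>

// Hypothetical stand-in for kudu::Status: 'ok' plus a message on failure.
struct PollResult {
  bool ok;
  std::string message;
};

// Runs 'check' repeatedly until it succeeds or 'timeout' elapses, sleeping
// 'interval' between attempts. On timeout, returns the last failure with a
// prefix, mirroring the CloneAndPrepend() call in the diff.
PollResult WaitForCleanCheck(const std::function<PollResult()>& check,
                             std::chrono::milliseconds timeout,
                             std::chrono::milliseconds interval) {
  // Start from a failure so a zero timeout cannot be reported as success.
  PollResult last{false, "check never ran"};
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  while (std::chrono::steady_clock::now() < deadline) {
    last = check();
    if (last.ok) return last;
    std::this_thread::sleep_for(interval);
  }
  last.message = "timed out with errors remaining: last error: " + last.message;
  return last;
}
```

A caller would pass a callable that performs one health check, for example a lambda wrapping a ksck-like check for a single tablet, together with a timeout corresponding to `--move_copy_timeout_sec`. Initializing the result to a failure is a choice made in this sketch so that a loop that never runs is not mistaken for a healthy tablet.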