kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject kudu git commit: [tools] Add a 'kudu tablet relocate' tool
Date Thu, 27 Jul 2017 19:06:26 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 0ec793e32 -> 5b7626e3a


[tools] Add a 'kudu tablet relocate' tool

This patch adds a move_replica tool that moves a tablet replica from
one tablet server to another. Usage:

kudu tablet change_config move_replica <master_addresses> <tablet_id>
<from_ts_uuid> <to_ts_uuid>

It works by adding <to_ts_uuid> to the configuration, waiting for
the tablet copy to the new replica to complete, then removing
<from_ts_uuid>. If the replica being removed is the leader, the
tool first asks it to step down and waits for a new leader to
assert itself. In the typical case, this means the number of
replicas will change 3 -> 4 -> 3. A 4-replica configuration needs
3 replicas for a majority and so tolerates only 1 fault, and the
new replica effectively counts as that fault until it finishes
bootstrapping; no other replica may fail in the meantime. As a
result of this extra fragility, the tool requires the tablet to be
in "perfect health" when it runs, meaning ksck returns no errors
for the tablet, and also requires the same after the copy is
complete but before removing a replica. This probably limits the
usefulness of the tool to rebalancing replicas within a healthy
tablet.

Additionally, this makes some minimal changes to ksck to allow
it to print to other output streams besides stdout. The purpose
was to allow the output to be suppressed when running the tool,
since the use of ksck is an implementation detail, and the
output is noisy.

Change-Id: I3b7a7243333ba6e6a3d6fce96b220224d6e38a84
Reviewed-on: http://gerrit.cloudera.org:8080/7444
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5b7626e3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5b7626e3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5b7626e3

Branch: refs/heads/master
Commit: 5b7626e3a8250d797f2fc2001c8ee7634a39d18b
Parents: 0ec793e
Author: Will Berkeley <wdberkeley@apache.org>
Authored: Fri Jul 14 11:03:01 2017 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Thu Jul 27 18:39:02 2017 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck-test.cc          |  13 +-
 src/kudu/tools/ksck.cc               |  69 +++-----
 src/kudu/tools/ksck.h                |  25 ++-
 src/kudu/tools/ksck_remote-test.cc   |  14 +-
 src/kudu/tools/kudu-admin-test.cc    |  78 +++++++++
 src/kudu/tools/kudu-tool-test.cc     |   3 +-
 src/kudu/tools/tool_action_tablet.cc | 274 ++++++++++++++++++++++++++----
 7 files changed, 371 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index 496726f..517d71c 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <iosfwd>
 #include <memory>
 #include <unordered_map>
 
@@ -40,10 +39,6 @@ using std::string;
 using std::unordered_map;
 using strings::Substitute;
 
-// Import this symbol from ksck.cc so we can introspect the
-// errors being written to stderr.
-extern std::ostream* g_err_stream;
-
 class MockKsckTabletServer : public KsckTabletServer {
  public:
   explicit MockKsckTabletServer(const string& uuid)
@@ -121,7 +116,7 @@ class KsckTest : public KuduTest {
   KsckTest()
       : master_(new MockKsckMaster()),
         cluster_(new KsckCluster(static_pointer_cast<KsckMaster>(master_))),
-        ksck_(new Ksck(cluster_)) {
+        ksck_(new Ksck(cluster_, &err_stream_)) {
     FLAGS_color = "never";
     unordered_map<string, shared_ptr<KsckTabletServer>> tablet_servers;
     for (int i = 0; i < 3; i++) {
@@ -130,12 +125,6 @@ class KsckTest : public KuduTest {
       InsertOrDie(&tablet_servers, ts->uuid(), ts);
     }
     master_->tablet_servers_.swap(tablet_servers);
-
-    g_err_stream = &err_stream_;
-  }
-
-  ~KsckTest() {
-    g_err_stream = NULL;
   }
 
  protected:

http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 9e5acf1..766f008 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -32,7 +32,6 @@
 #include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
-#include "kudu/tools/color.h"
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/blocking_queue.h"
@@ -72,25 +71,6 @@ using std::stringstream;
 using std::unordered_map;
 using strings::Substitute;
 
-// The stream to write output to. If this is NULL, defaults to cout.
-// This is used by tests to capture output.
-ostream* g_err_stream = NULL;
-
-// Print an informational message to cout.
-static ostream& Out() {
-  return (g_err_stream ? *g_err_stream : cout);
-}
-
-// Print a warning message to cout.
-static ostream& Warn() {
-  return Out() << Color(AnsiCode::YELLOW, "WARNING: ");
-}
-
-// Print an error message to cout.
-static ostream& Error() {
-  return Out() << Color(AnsiCode::RED, "WARNING: ");
-}
-
 namespace {
 // Return true if 'str' matches any of the patterns in 'patterns', or if
 // 'patterns' is empty.
@@ -198,11 +178,10 @@ Status Ksck::FetchInfoFromTabletServers() {
   if (bad_servers.Load() == 0) {
     Out() << Substitute("Fetched info from all $0 Tablet Servers", servers_count) <<
endl;
     return Status::OK();
-  } else {
-    Warn() << Substitute("Fetched info from $0 Tablet Servers, $1 weren't reachable",
-                         servers_count - bad_servers.Load(), bad_servers.Load()) <<
endl;
-    return Status::NetworkError("Not all Tablet Servers are reachable");
   }
+  Warn() << Substitute("Fetched info from $0 Tablet Servers, $1 weren't reachable",
+                       servers_count - bad_servers.Load(), bad_servers.Load()) << endl;
+  return Status::NetworkError("Not all Tablet Servers are reachable");
 }
 
 Status Ksck::ConnectToTabletServer(const shared_ptr<KsckTabletServer>& ts) {
@@ -247,11 +226,10 @@ Status Ksck::CheckTablesConsistency() {
   if (bad_tables_count == 0) {
     Out() << Substitute("The metadata for $0 table(s) is HEALTHY", tables_checked)
<< endl;
     return Status::OK();
-  } else {
-    Warn() << Substitute("$0 out of $1 table(s) are not in a healthy state",
-                         bad_tables_count, tables_checked) << endl;
-    return Status::Corruption(Substitute("$0 table(s) are bad", bad_tables_count));
   }
+  Warn() << Substitute("$0 out of $1 table(s) are not in a healthy state",
+                       bad_tables_count, tables_checked) << endl;
+  return Status::Corruption(Substitute("$0 table(s) are bad", bad_tables_count));
 }
 
 // Class to act as a collector of scan results.
@@ -289,10 +267,10 @@ class ChecksumResultReporter : public RefCountedThreadSafe<ChecksumResultReporte
 
   // Blocks until either the number of results plus errors reported equals
   // num_tablet_replicas (from the constructor), or until the timeout expires,
-  // whichever comes first.
+  // whichever comes first. Progress messages are printed to 'out'.
   // Returns false if the timeout expired before all responses came in.
   // Otherwise, returns true.
-  bool WaitFor(const MonoDelta& timeout) const {
+  bool WaitFor(const MonoDelta& timeout, std::ostream* out) const {
     MonoTime start = MonoTime::Now();
     MonoTime deadline = start + timeout;
 
@@ -305,7 +283,7 @@ class ChecksumResultReporter : public RefCountedThreadSafe<ChecksumResultReporte
       done = responses_.WaitFor(MonoDelta::FromMilliseconds(std::min(rem_ms, 5000)));
       string status = done ? "finished in " : "running for ";
       int run_time_sec = (MonoTime::Now() - start).ToSeconds();
-      Out() << "Checksum " << status << run_time_sec << "s: "
+      (*out) << "Checksum " << status << run_time_sec << "s: "
              << responses_.count() << "/" << expected_count_ << "
replicas remaining ("
              << HumanReadableNumBytes::ToString(disk_bytes_summed_.Load()) <<
" from disk, "
              << HumanReadableInt::ToString(rows_summed_.Load()) << " rows summed)"
@@ -492,7 +470,7 @@ Status Ksck::ChecksumData(const ChecksumOptions& opts) {
     }
   }
 
-  bool timed_out = !reporter->WaitFor(options.timeout);
+  bool timed_out = !reporter->WaitFor(options.timeout, out_);
 
   // Even if we timed out, print the checksum results that we did get.
   ChecksumResultReporter::TabletResultMap checksums = reporter->checksums();
@@ -584,21 +562,20 @@ bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table)
{
                         Color(AnsiCode::GREEN, "HEALTHY"),
                         tablets.size()) << endl;
     return true;
-  } else {
-    if (result_counts[CheckResult::UNAVAILABLE] > 0) {
-      Out() << Substitute("Table $0 has $1 $2 tablet(s)",
-                          table->name(),
-                          result_counts[CheckResult::UNAVAILABLE],
-                          Color(AnsiCode::RED, "unavailable")) << endl;
-    }
-    if (result_counts[CheckResult::UNDER_REPLICATED] > 0) {
-      Out() << Substitute("Table $0 has $1 $2 tablet(s)",
-                          table->name(),
-                          result_counts[CheckResult::UNDER_REPLICATED],
-                          Color(AnsiCode::YELLOW, "under-replicated")) << endl;
-    }
-    return false;
   }
+  if (result_counts[CheckResult::UNAVAILABLE] > 0) {
+    Out() << Substitute("Table $0 has $1 $2 tablet(s)",
+                        table->name(),
+                        result_counts[CheckResult::UNAVAILABLE],
+                        Color(AnsiCode::RED, "unavailable")) << endl;
+  }
+  if (result_counts[CheckResult::UNDER_REPLICATED] > 0) {
+    Out() << Substitute("Table $0 has $1 $2 tablet(s)",
+                        table->name(),
+                        result_counts[CheckResult::UNDER_REPLICATED],
+                        Color(AnsiCode::YELLOW, "under-replicated")) << endl;
+  }
+  return false;
 }
 
 namespace {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/ksck.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index 38171a2..805bbed 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -22,6 +22,7 @@
 
 #include <boost/optional.hpp>
 #include <gtest/gtest_prod.h>
+#include <iostream>
 #include <map>
 #include <memory>
 #include <set>
@@ -33,6 +34,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/consensus/consensus.service.h"
 #include "kudu/tablet/tablet.pb.h"
+#include "kudu/tools/color.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/status.h"
@@ -377,8 +379,10 @@ class KsckCluster {
 // Externally facing class to run checks against the provided cluster.
 class Ksck {
  public:
-  explicit Ksck(std::shared_ptr<KsckCluster> cluster)
-      : cluster_(std::move(cluster)) {}
+  explicit Ksck(std::shared_ptr<KsckCluster> cluster, std::ostream* out = &std::cout)
+      : cluster_(std::move(cluster)),
+        out_(out) {}
+
   ~Ksck() {}
 
   // Set whether ksck should verify that each of the tablet's raft configurations
@@ -446,12 +450,29 @@ class Ksck {
   CheckResult VerifyTablet(const std::shared_ptr<KsckTablet>& tablet,
                            int table_num_replicas);
 
+  // Print an informational message to this instance's output stream.
+  ostream& Out() {
+    return *out_;
+  }
+
+  // Print an error message to this instance's output stream.
+  ostream& Error() {
+    return (*out_) << Color(AnsiCode::RED, "ERROR: ");
+  }
+
+  // Print a warning message to this instance's output stream.
+  ostream& Warn() {
+    return (*out_) << Color(AnsiCode::YELLOW, "WARNING: ");
+  }
+
   const std::shared_ptr<KsckCluster> cluster_;
 
   bool check_replica_count_ = true;
   vector<string> table_filters_;
   vector<string> tablet_id_filters_;
 
+  std::ostream* out_;
+
   DISALLOW_COPY_AND_ASSIGN(Ksck);
 };
 } // namespace tools

http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/ksck_remote-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc
index 1b7e2ab..7c33940 100644
--- a/src/kudu/tools/ksck_remote-test.cc
+++ b/src/kudu/tools/ksck_remote-test.cc
@@ -16,12 +16,10 @@
 // under the License.
 
 #include <memory>
-#include <sstream>
 
 #include <gtest/gtest.h>
 
 #include "kudu/client/client.h"
-#include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/internal_mini_cluster.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/tools/data_gen_util.h"
@@ -46,11 +44,6 @@ using client::sp::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
-using strings::Substitute;
-
-// Import this symbol from ksck.cc so we can introspect the
-// errors being written to stderr.
-extern std::ostream* g_err_stream;
 
 static const char *kTableName = "ksck-test-table";
 
@@ -62,11 +55,6 @@ class RemoteKsckTest : public KuduTest {
     b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
     b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
     CHECK_OK(b.Build(&schema_));
-    g_err_stream = &err_stream_;
-  }
-
-  ~RemoteKsckTest() {
-    g_err_stream = NULL;
   }
 
   virtual void SetUp() OVERRIDE {
@@ -110,7 +98,7 @@ class RemoteKsckTest : public KuduTest {
     std::shared_ptr<KsckMaster> master;
     ASSERT_OK(RemoteKsckMaster::Build(master_addresses, &master));
     std::shared_ptr<KsckCluster> cluster(new KsckCluster(master));
-    ksck_.reset(new Ksck(cluster));
+    ksck_.reset(new Ksck(cluster, &err_stream_));
   }
 
   virtual void TearDown() OVERRIDE {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index 376d9c1..379959d 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
+#include <deque>
 #include <string>
 #include <vector>
 
@@ -48,9 +50,11 @@ using itest::TabletServerMap;
 using itest::TServerDetails;
 using itest::WAIT_FOR_LEADER;
 using itest::WaitForReplicasReportedToMaster;
+using itest::WaitForServersToAgree;
 using itest::WaitUntilCommittedOpIdIndexIs;
 using itest::WaitUntilTabletInState;
 using itest::WaitUntilTabletRunning;
+using std::deque;
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -167,6 +171,80 @@ TEST_F(AdminCliTest, TestChangeConfig) {
                                                 MonoDelta::FromSeconds(10)));
 }
 
+// Test relocating replicas while running a workload.
+// 1. Instantiate external mini cluster with 5 TS.
+// 2. Create a table with 3 replicas.
+// 3. Start a workload.
+// 4. Using the CLI, move the 3 replicas around the 5 TS.
+// 5. Profit!
+TEST_F(AdminCliTest, TestMoveTablet) {
+  FLAGS_num_tablet_servers = 5;
+  FLAGS_num_replicas = 3;
+  NO_FATALS(BuildAndStart());
+
+  vector<string> tservers;
+  AppendKeysFromMap(tablet_servers_, &tservers);
+  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
+
+  deque<string> active_tservers;
+  for (auto iter = tablet_replicas_.find(tablet_id_); iter != tablet_replicas_.cend(); ++iter)
{
+    active_tservers.push_back(iter->second->uuid());
+  }
+  ASSERT_EQ(FLAGS_num_replicas, active_tservers.size());
+
+  deque<string> inactive_tservers;
+  std::sort(tservers.begin(), tservers.end());
+  std::sort(active_tservers.begin(), active_tservers.end());
+  std::set_difference(tservers.cbegin(), tservers.cend(),
+                      active_tservers.cbegin(), active_tservers.cend(),
+                      std::back_inserter(inactive_tservers));
+  ASSERT_EQ(FLAGS_num_tablet_servers - FLAGS_num_replicas, inactive_tservers.size());
+
+  // The workload is light (1 thread, 1 op batches) so that new replicas
+  // bootstrap and converge quickly.
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableId);
+  workload.set_num_replicas(FLAGS_num_replicas);
+  workload.set_num_write_threads(1);
+  workload.set_write_batch_size(1);
+  workload.Setup();
+  workload.Start();
+
+  // Assuming no ad hoc leadership changes, 3 guarantees the leader is moved at least once.
+  int num_moves = AllowSlowTests() ? 3 : 1;
+  for (int i = 0; i < num_moves; i++) {
+    const string remove = active_tservers.front();
+    const string add = inactive_tservers.front();
+    ASSERT_OK(Subprocess::Call({
+                                   GetKuduCtlAbsolutePath(),
+                                   "tablet",
+                                   "change_config",
+                                   "move_replica",
+                                   cluster_->master()->bound_rpc_addr().ToString(),
+                                   tablet_id_,
+                                   remove,
+                                   add
+                               }));
+    active_tservers.pop_front();
+    active_tservers.push_back(add);
+    inactive_tservers.pop_front();
+    inactive_tservers.push_back(remove);
+
+    // Allow the added server time to catch up so it applies the newest configuration.
+    // If we don't wait, the initial ksck of move_replica can fail with a consensus conflict.
+    TabletServerMap active_tservers_map;
+    for (const string& uuid : active_tservers) {
+      InsertOrDie(&active_tservers_map, uuid, tablet_servers_[uuid]);
+    }
+    ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), active_tservers_map,
+                                    tablet_id_, 1));
+  }
+  workload.StopAndJoin();
+
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+}
+
 Status RunUnsafeChangeConfig(const string& tablet_id,
                              const string& dst_host,
                              vector<string> peer_uuid_list) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 6edc4e1..1375ddd 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -480,6 +480,7 @@ TEST_F(ToolTest, TestModeHelp) {
     const vector<string> kChangeConfigModeRegexes = {
         "add_replica.*Add a new replica",
         "change_replica_type.*Change the type of an existing replica",
+        "move_replica.*Move a tablet replica",
         "remove_replica.*Remove an existing replica"
     };
     NO_FATALS(RunTestHelp("tablet change_config", kChangeConfigModeRegexes));
@@ -1195,7 +1196,7 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
 }
 
 // Create and start Kudu mini cluster, optionally creating a table in the DB,
-// and then run 'kudu test loadgen ...' utility against it.
+// and then run 'kudu perf loadgen ...' utility against it.
 void ToolTest::RunLoadgen(int num_tservers,
                           const vector<string>& tool_args,
                           const string& table_name) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5b7626e3/src/kudu/tools/tool_action_tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_tablet.cc b/src/kudu/tools/tool_action_tablet.cc
index 0ebc710..fefc43a 100644
--- a/src/kudu/tools/tool_action_tablet.cc
+++ b/src/kudu/tools/tool_action_tablet.cc
@@ -18,6 +18,8 @@
 #include "kudu/tools/tool_action.h"
 
 #include <algorithm>
+#include <boost/optional.hpp>
+#include <fstream>
 #include <iostream>
 #include <memory>
 #include <string>
@@ -33,11 +35,19 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/server/server_base.pb.h"
+#include "kudu/tools/ksck.h"
+#include "kudu/tools/ksck_remote.h"
 #include "kudu/tools/tool_action_common.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/string_case.h"
 
+DEFINE_int64(move_copy_timeout_sec, 600,
+             "Number of seconds to wait for tablet copy to complete when relocating a tablet");
+DEFINE_int64(move_leader_timeout_sec, 10,
+             "Number of seconds to wait for a leader when relocating a leader tablet");
+
 namespace kudu {
 namespace tools {
 
@@ -47,10 +57,17 @@ using client::KuduTablet;
 using client::KuduTabletServer;
 using consensus::ChangeConfigType;
 using consensus::ConsensusServiceProxy;
+using consensus::ConsensusStatePB;
+using consensus::GetConsensusStateRequestPB;
+using consensus::GetConsensusStateResponsePB;
+using consensus::GetLastOpIdRequestPB;
+using consensus::GetLastOpIdResponsePB;
+using consensus::OpId;
 using consensus::RaftPeerPB;
 using rpc::RpcController;
 using std::cout;
 using std::endl;
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -59,7 +76,9 @@ using strings::Substitute;
 namespace {
 
 const char* const kReplicaTypeArg = "replica_type";
-const char* const kReplicaUuidArg = "replica_uuid";
+const char* const kTsUuidArg = "ts_uuid";
+const char* const kFromTsUuidArg = "from_ts_uuid";
+const char* const kToTsUuidArg = "to_ts_uuid";
 
 Status GetRpcAddressForTS(const client::sp::shared_ptr<KuduClient>& client,
                           const string& uuid,
@@ -101,25 +120,53 @@ Status GetTabletLeader(const client::sp::shared_ptr<KuduClient>&
client,
       "No leader replica found for tablet $0", tablet_id));
 }
 
-Status ChangeConfig(const RunnerContext& context, ChangeConfigType cc_type) {
-  // Parse and validate arguments.
-  RaftPeerPB peer_pb;
-  const string& master_addresses_str = FindOrDie(context.required_args,
-                                                 kMasterAddressesArg);
-  vector<string> master_addresses = strings::Split(master_addresses_str, ",");
-  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
-  const string& replica_uuid = FindOrDie(context.required_args, kReplicaUuidArg);
-  if (cc_type == consensus::ADD_SERVER || cc_type == consensus::CHANGE_ROLE) {
-    const string& replica_type = FindOrDie(context.required_args, kReplicaTypeArg);
-    string uppercase_peer_type;
-    ToUpperCase(replica_type, &uppercase_peer_type);
-    RaftPeerPB::MemberType member_type_val;
-    if (!RaftPeerPB::MemberType_Parse(uppercase_peer_type, &member_type_val)) {
-      return Status::InvalidArgument("Unrecognized peer type", replica_type);
-    }
-    peer_pb.set_member_type(member_type_val);
+Status DoKsckForTablet(const vector<string>& master_addresses, const string&
tablet_id) {
+  shared_ptr<KsckMaster> master;
+  RETURN_NOT_OK(RemoteKsckMaster::Build(master_addresses, &master));
+  shared_ptr<KsckCluster> cluster(new KsckCluster(master));
+
+  // Print to an unopened ofstream to discard ksck output.
+  // See https://stackoverflow.com/questions/8243743.
+  std::ofstream null_stream;
+  Ksck ksck(cluster, &null_stream);
+  ksck.set_tablet_id_filters({ tablet_id });
+  RETURN_NOT_OK(ksck.CheckMasterRunning());
+  RETURN_NOT_OK(ksck.FetchTableAndTabletInfo());
+  RETURN_NOT_OK(ksck.FetchInfoFromTabletServers());
+  return ksck.CheckTablesConsistency();
+}
+
+Status WaitForCleanKsck(const vector<string>& master_addresses,
+                        const string& tablet_id,
+                        const MonoDelta& timeout) {
+  Status s;
+  MonoTime deadline = MonoTime::Now() + timeout;
+  while (MonoTime::Now() < deadline) {
+    s = DoKsckForTablet(master_addresses, tablet_id);
+    if (s.ok()) return s;
+    SleepFor(MonoDelta::FromMilliseconds(1000));
+  }
+  return s.CloneAndPrepend("timed out with ksck errors remaining: last error");
+}
+
+Status DoChangeConfig(const vector<string>& master_addresses,
+                      const string& tablet_id,
+                      const string& replica_uuid,
+                      const boost::optional<RaftPeerPB::MemberType>& member_type,
+                      ChangeConfigType cc_type) {
+  if (cc_type == consensus::REMOVE_SERVER && member_type) {
+    return Status::InvalidArgument("cannot supply Raft member type when removing a server");
+  }
+  if ((cc_type == consensus::ADD_SERVER || cc_type == consensus::CHANGE_ROLE) &&
!member_type) {
+    return Status::InvalidArgument(
+        "must specify member type when adding a server or changing member type");
   }
+
+  RaftPeerPB peer_pb;
   peer_pb.set_permanent_uuid(replica_uuid);
+  if (member_type) {
+    peer_pb.set_member_type(*member_type);
+  }
 
   client::sp::shared_ptr<KuduClient> client;
   RETURN_NOT_OK(KuduClientBuilder()
@@ -156,6 +203,27 @@ Status ChangeConfig(const RunnerContext& context, ChangeConfigType
cc_type) {
   return Status::OK();
 }
 
+Status ChangeConfig(const RunnerContext& context, ChangeConfigType cc_type) {
+  const string& master_addresses_str = FindOrDie(context.required_args,
+                                                 kMasterAddressesArg);
+  vector<string> master_addresses = strings::Split(master_addresses_str, ",");
+  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
+  const string& replica_uuid = FindOrDie(context.required_args, kTsUuidArg);
+  boost::optional<RaftPeerPB::MemberType> member_type;
+  if (cc_type == consensus::ADD_SERVER || cc_type == consensus::CHANGE_ROLE) {
+    const string& replica_type = FindOrDie(context.required_args, kReplicaTypeArg);
+    string uppercase_peer_type;
+    ToUpperCase(replica_type, &uppercase_peer_type);
+    RaftPeerPB::MemberType member_type_val;
+    if (!RaftPeerPB::MemberType_Parse(uppercase_peer_type, &member_type_val)) {
+      return Status::InvalidArgument("Unrecognized peer type", replica_type);
+    }
+    member_type = member_type_val;
+  }
+
+  return DoChangeConfig(master_addresses, tablet_id, replica_uuid, member_type, cc_type);
+}
+
 Status AddReplica(const RunnerContext& context) {
   return ChangeConfig(context, consensus::ADD_SERVER);
 }
@@ -168,6 +236,25 @@ Status RemoveReplica(const RunnerContext& context) {
   return ChangeConfig(context, consensus::REMOVE_SERVER);
 }
 
+Status DoLeaderStepDown(const client::sp::shared_ptr<KuduClient>& client, const
string& tablet_id,
+                        const string& leader_uuid, const HostPort& leader_hp) {
+  unique_ptr<ConsensusServiceProxy> proxy;
+  RETURN_NOT_OK(BuildProxy(leader_hp.host(), leader_hp.port(), &proxy));
+
+  consensus::LeaderStepDownRequestPB req;
+  consensus::LeaderStepDownResponsePB resp;
+  RpcController rpc;
+  rpc.set_timeout(client->default_admin_operation_timeout());
+  req.set_dest_uuid(leader_uuid);
+  req.set_tablet_id(tablet_id);
+
+  RETURN_NOT_OK(proxy->LeaderStepDown(req, &resp, &rpc));
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  return Status::OK();
+}
+
 Status LeaderStepDown(const RunnerContext& context) {
   const string& master_addresses_str = FindOrDie(context.required_args,
                                                  kMasterAddressesArg);
@@ -189,23 +276,126 @@ Status LeaderStepDown(const RunnerContext& context) {
   }
   RETURN_NOT_OK(s);
 
-  unique_ptr<ConsensusServiceProxy> proxy;
-  RETURN_NOT_OK(BuildProxy(leader_hp.host(), leader_hp.port(), &proxy));
-
-  consensus::LeaderStepDownRequestPB req;
-  consensus::LeaderStepDownResponsePB resp;
-  RpcController rpc;
-  rpc.set_timeout(client->default_admin_operation_timeout());
-  req.set_dest_uuid(leader_uuid);
-  req.set_tablet_id(tablet_id);
+  return DoLeaderStepDown(client, tablet_id, leader_uuid, leader_hp);
+}
 
-  RETURN_NOT_OK(proxy->LeaderStepDown(req, &resp, &rpc));
+Status GetConsensusState(const unique_ptr<ConsensusServiceProxy>& proxy,
+                         const string& tablet_id,
+                         const string& replica_uuid,
+                         const MonoDelta& timeout,
+                         ConsensusStatePB* consensus_state) {
+  GetConsensusStateRequestPB req;
+  GetConsensusStateResponsePB resp;
+  RpcController controller;
+  controller.set_timeout(timeout);
+  req.set_dest_uuid(replica_uuid);
+  req.add_tablet_ids(tablet_id);
+  RETURN_NOT_OK(proxy->GetConsensusState(req, &resp, &controller));
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
+  if (resp.tablets_size() == 0) {
+    return Status::NotFound("tablet not found:", tablet_id);
+  }
+  DCHECK_EQ(1, resp.tablets_size());
+  *consensus_state = resp.tablets(0).cstate();
   return Status::OK();
 }
 
+Status GetLastCommittedOpId(const string& tablet_id, const string& replica_uuid,
+                            const HostPort& replica_hp, const MonoDelta& timeout,
OpId* opid) {
+  GetLastOpIdRequestPB req;
+  GetLastOpIdResponsePB resp;
+  RpcController controller;
+  controller.set_timeout(timeout);
+  req.set_tablet_id(tablet_id);
+  req.set_dest_uuid(replica_uuid);
+  req.set_opid_type(consensus::COMMITTED_OPID);
+
+  unique_ptr<ConsensusServiceProxy> proxy;
+  RETURN_NOT_OK(BuildProxy(replica_hp.host(), replica_hp.port(), &proxy));
+  RETURN_NOT_OK(proxy->GetLastOpId(req, &resp, &controller));
+  *opid = resp.opid();
+  return Status::OK();
+}
+
+Status ChangeLeader(const client::sp::shared_ptr<KuduClient>& client, const string&
tablet_id,
+                    const string& old_leader_uuid, const HostPort& old_leader_hp,
+                    const MonoDelta& timeout) {
+  unique_ptr<ConsensusServiceProxy> proxy;
+  RETURN_NOT_OK(BuildProxy(old_leader_hp.host(), old_leader_hp.port(), &proxy));
+  ConsensusStatePB cstate;
+  RETURN_NOT_OK(GetConsensusState(proxy, tablet_id, old_leader_uuid,
+                                  client->default_admin_operation_timeout(), &cstate));
+  int64 current_term = -1;
+  MonoTime deadline = MonoTime::Now() + timeout;
+
+  // First, check the leader and, if it's the old leader, ask it to step down.
+  // Repeat until we time out or get a different leader.
+  while (MonoTime::Now() < deadline) {
+    RETURN_NOT_OK(GetConsensusState(proxy, tablet_id, old_leader_uuid,
+                                    client->default_admin_operation_timeout(), &cstate));
+
+    if (cstate.current_term() > current_term && cstate.has_leader_uuid()) {
+      current_term = cstate.current_term();
+      if (cstate.leader_uuid() != old_leader_uuid) {
+        break;
+      }
+      RETURN_NOT_OK(DoLeaderStepDown(client, tablet_id, old_leader_uuid, old_leader_hp));
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1000));
+  }
+
+  // Second, once we have a new leader, wait for it to assert leadership by replicating an
op
+  // in the current term to the old leader.
+  OpId opid;
+  while (MonoTime::Now() < deadline) {
+    RETURN_NOT_OK(GetLastCommittedOpId(tablet_id, old_leader_uuid, old_leader_hp,
+                                       client->default_admin_operation_timeout(), &opid));
+    if (opid.term() == current_term) {
+      return Status::OK();
+    }
+    SleepFor(MonoDelta::FromMilliseconds(500));
+  }
+
+  return Status::TimedOut("unable to change leadership in time");
+}
+
+Status MoveReplica(const RunnerContext &context) {
+  const string& master_addresses_str = FindOrDie(context.required_args, kMasterAddressesArg);
+  vector<string> master_addresses = strings::Split(master_addresses_str, ",");
+  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
+  const string& rem_replica_uuid = FindOrDie(context.required_args, kFromTsUuidArg);
+  const string& add_replica_uuid = FindOrDie(context.required_args, kToTsUuidArg);
+
+  // Check the tablet is in perfect health and, if so, add the new server.
+  RETURN_NOT_OK_PREPEND(DoKsckForTablet(master_addresses, tablet_id),
+                        "ksck pre-move health check failed");
+  RETURN_NOT_OK(DoChangeConfig(master_addresses, tablet_id, add_replica_uuid,
+                               RaftPeerPB::VOTER, consensus::ADD_SERVER));
+
+  // Wait until the tablet copy completes and the tablet returns to perfect health.
+  MonoDelta copy_timeout = MonoDelta::FromSeconds(FLAGS_move_copy_timeout_sec);
+  RETURN_NOT_OK_PREPEND(WaitForCleanKsck(master_addresses, tablet_id, copy_timeout),
+                        "failed waiting for clean ksck after add server");
+
+  // Finally, remove the chosen replica.
+  // If it is the leader, it will be asked to step down.
+  client::sp::shared_ptr<KuduClient> client;
+  RETURN_NOT_OK(KuduClientBuilder().master_server_addrs(master_addresses).Build(&client));
+  string leader_uuid;
+  HostPort leader_hp;
+  RETURN_NOT_OK(GetTabletLeader(client, tablet_id, &leader_uuid, &leader_hp));
+  if (rem_replica_uuid == leader_uuid) {
+    RETURN_NOT_OK_PREPEND(ChangeLeader(client, tablet_id,
+                                       leader_uuid, leader_hp,
+                                       MonoDelta::FromSeconds(FLAGS_move_leader_timeout_sec)),
+                          "failed changing leadership from the replica to be removed");
+  }
+  return DoChangeConfig(master_addresses, tablet_id, rem_replica_uuid,
+                        boost::none, consensus::REMOVE_SERVER);
+}
+
 } // anonymous namespace
 
 unique_ptr<Mode> BuildTabletMode() {
@@ -214,7 +404,8 @@ unique_ptr<Mode> BuildTabletMode() {
       .Description("Add a new replica to a tablet's Raft configuration")
       .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
       .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
-      .AddRequiredParameter({ kReplicaUuidArg, "New replica's UUID" })
+      .AddRequiredParameter({ kTsUuidArg,
+                              "UUID of the tablet server that should host the new replica" })
       .AddRequiredParameter(
           { kReplicaTypeArg, "New replica's type. Must be VOTER or NON-VOTER."
           })
@@ -226,7 +417,8 @@ unique_ptr<Mode> BuildTabletMode() {
           "Change the type of an existing replica in a tablet's Raft configuration")
       .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
       .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
-      .AddRequiredParameter({ kReplicaUuidArg, "Existing replica's UUID" })
+      .AddRequiredParameter({ kTsUuidArg,
+                              "UUID of the tablet server hosting the existing replica" })
       .AddRequiredParameter(
           { kReplicaTypeArg, "Existing replica's new type. Must be VOTER or NON-VOTER."
           })
@@ -237,7 +429,26 @@ unique_ptr<Mode> BuildTabletMode() {
       .Description("Remove an existing replica from a tablet's Raft configuration")
       .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
       .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
-      .AddRequiredParameter({ kReplicaUuidArg, "Existing replica's UUID" })
+      .AddRequiredParameter({ kTsUuidArg,
+                              "UUID of the tablet server hosting the existing replica" })
+      .Build();
+
+  const string move_extra_desc = "The replica move tool effectively moves a "
+      "replica from one tablet server to another by adding a replica to the "
+      "new server and then removing it from the old one. It requires that "
+      "ksck return no errors when run against the target tablet. If the move "
+      "fails, the user should wait for any tablet copy to complete, and, if "
+      "the copy succeeds, use remove_replica manually. If the copy fails, the "
+      "new replica will be deleted automatically after some time, and then the "
+      "move can be retried.";
+  unique_ptr<Action> move_replica =
+      ActionBuilder("move_replica", &MoveReplica)
+      .Description("Move a tablet replica from one tablet server to another")
+      .ExtraDescription(move_extra_desc)
+      .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
+      .AddRequiredParameter({ kFromTsUuidArg, "UUID of the tablet server to move from" })
+      .AddRequiredParameter({ kToTsUuidArg, "UUID of the tablet server to move to" })
       .Build();
 
   unique_ptr<Action> leader_step_down =
@@ -252,6 +463,7 @@ unique_ptr<Mode> BuildTabletMode() {
       .Description("Change a tablet's Raft configuration")
       .AddAction(std::move(add_replica))
       .AddAction(std::move(change_replica_type))
+      .AddAction(std::move(move_replica))
       .AddAction(std::move(remove_replica))
       .Build();
 


Mime
View raw message