kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jtbirds...@apache.org
Subject [1/2] kudu git commit: [rebalancer] 'auto' mode for RF=1 tablet movements
Date Thu, 05 Jul 2018 17:48:52 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 0dcf16673 -> c5df00c8f


[rebalancer] 'auto' mode for RF=1 tablet movements

Added logic to automatically determine whether the tool should
move replicas of tablets with RF=1.  This is determined by examining the
Kudu version and the replica management scheme in the cluster; the version
indicates whether the fix for KUDU-2443 applies.  The fix is present
whenever the version is newer than 1.7.0 (i.e. 1.7.1 or later).

This patch also contains a unit test for the newly introduced
Is343SchemeCluster() utility function and an integration test scenario
RebalancerAndSingleReplicaTablets.  Also, the already existing
RebalanceParamTest provides additional coverage for the functionality
in case of --move_single_replicas=auto (the default setting).

Change-Id: Id5a06b137cb34d9351f7e2fb819ed52790e2ee7e
Reviewed-on: http://gerrit.cloudera.org:8080/10612
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a1558eed
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a1558eed
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a1558eed

Branch: refs/heads/master
Commit: a1558eeded3d5140fc7ef9ac84d722117dec822c
Parents: 0dcf166
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Tue Jun 5 15:48:48 2018 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Wed Jul 4 05:29:37 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/kudu-admin-test.cc     | 107 ++++++++++++++++++-
 src/kudu/tools/kudu-tool-test.cc      | 154 ++++++++++++++++++----------
 src/kudu/tools/rebalance-test.cc      | 142 ++++++++++++++++++++++----
 src/kudu/tools/rebalancer.h           |   2 +-
 src/kudu/tools/tool_action_cluster.cc | 158 ++++++++++++++++++++++++++---
 src/kudu/tools/tool_replica_util.cc   |  48 ++++++++-
 src/kudu/tools/tool_replica_util.h    |  13 ++-
 7 files changed, 528 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a1558eed/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index 856f1f4..e084753 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -108,6 +108,7 @@ using std::deque;
 using std::ostringstream;
 using std::string;
 using std::thread;
+using std::tuple;
 using std::unique_ptr;
 using std::vector;
 using strings::Split;
@@ -253,7 +254,7 @@ enum class DownTS {
 };
 class MoveTabletParamTest :
     public AdminCliTest,
-    public ::testing::WithParamInterface<std::tuple<Kudu1097, DownTS>> {
+    public ::testing::WithParamInterface<tuple<Kudu1097, DownTS>> {
 };
 
 TEST_P(MoveTabletParamTest, Test) {
@@ -1426,7 +1427,7 @@ static Status CreateUnbalancedTables(
 // every table being rebalanced. This test covers different replication factors.
 class RebalanceParamTest :
     public AdminCliTest,
-    public ::testing::WithParamInterface<std::tuple<int, Kudu1097>> {
+    public ::testing::WithParamInterface<tuple<int, Kudu1097>> {
 };
 INSTANTIATE_TEST_CASE_P(, RebalanceParamTest,
     ::testing::Combine(::testing::Values(1, 2, 3, 5),
@@ -1493,7 +1494,7 @@ TEST_P(RebalanceParamTest, Rebalance) {
     "cluster",
     "rebalance",
     cluster_->master()->bound_rpc_addr().ToString(),
-    "--move_single_replicas",
+    "--move_single_replicas=enabled",
   };
 
   {
@@ -1534,7 +1535,7 @@ TEST_P(RebalanceParamTest, Rebalance) {
     ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err;
     ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
         << "stderr: " << err;
-    // The cluster is un-balanced, so many replicas should have been moved.
+    // The cluster was un-balanced, so many replicas should have been moved.
     ASSERT_STR_NOT_CONTAINS(out, "(moved 0 replicas)");
   }
 
@@ -1748,7 +1749,6 @@ TEST_P(DDLDuringRebalancingTest, TablesCreatedAndDeletedDuringRebalancing) {
     "cluster",
     "rebalance",
     cluster_->master()->bound_rpc_addr().ToString(),
-    "--move_single_replicas",
   };
 
   // Run the rebalancer concurrently with the DDL operations. The second run
@@ -2054,5 +2054,102 @@ TEST_P(TserverAddedDuringRebalancingTest, TserverStarts) {
   NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
 }
 
+// A test to verify how the rebalancer handles replicas of single-replica
+// tablets in case of various values of the '--move_single_replicas' flag
+// and replica management schemes. Replicas of RF=1 tablets are expected to be
+// moved if the flag is 'enabled', or if it's 'auto' and the cluster runs the
+// 3-4-3 scheme (Kudu1097::Enable); otherwise the rebalancer must skip them.
+class RebalancerAndSingleReplicaTablets :
+    public AdminCliTest,
+    public ::testing::WithParamInterface<tuple<string, Kudu1097>> {
+};
+INSTANTIATE_TEST_CASE_P(, RebalancerAndSingleReplicaTablets,
+    ::testing::Combine(::testing::Values("auto", "enabled", "disabled"),
+                       ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)));
+TEST_P(RebalancerAndSingleReplicaTablets, SingleReplicasStayOrMove) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  constexpr auto kRepFactor = 1;
+  // Half of the tablet servers are stopped while the tables are created, so
+  // restarting them afterwards leaves the replica distribution unbalanced.
+  constexpr auto kNumTservers = 2 * (kRepFactor + 1);
+  constexpr auto kNumTables = kNumTservers;
+  constexpr auto kTserverUnresponsiveMs = 3000;
+  const auto& param = GetParam();
+  const auto& move_single_replica = std::get<0>(param);
+  // Kudu1097::Enable runs the cluster with the 3-4-3 replica management
+  // scheme, i.e. with --raft_prepare_replacement_before_eviction turned on.
+  const auto is_343_scheme = (std::get<1>(param) == Kudu1097::Enable);
+  const string table_name_pattern = "rebalance_test_table_$0";
+  const vector<string> master_flags = {
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+    Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs),
+  };
+  const vector<string> tserver_flags = {
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+  };
+
+  FLAGS_num_tablet_servers = kNumTservers;
+  FLAGS_num_replicas = kRepFactor;
+  NO_FATALS(BuildAndStart(tserver_flags, master_flags));
+
+  // Keep running only (kRepFactor + 1) tablet servers and shut down the rest.
+  for (auto i = kRepFactor + 1; i < kNumTservers; ++i) {
+    cluster_->tablet_server(i)->Shutdown();
+  }
+
+  // Wait for the catalog manager to understand that only (kRepFactor + 1)
+  // tablet servers are available: sleep for 5/4 of the unresponsiveness
+  // timeout configured for the master above.
+  SleepFor(MonoDelta::FromMilliseconds(5 * kTserverUnresponsiveMs / 4));
+
+  // Create a few tables with their tablet replicas landing only on those
+  // (kRepFactor + 1) running tablet servers.
+  KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
+  for (auto i = 0; i < kNumTables; ++i) {
+    const string table_name = Substitute(table_name_pattern, i);
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    ASSERT_OK(table_creator->table_name(table_name)
+              .schema(&client_schema)
+              .add_hash_partitions({ "key" }, 3)
+              .num_replicas(kRepFactor)
+              .Create());
+    ASSERT_OK(RunKuduTool({
+      "perf",
+      "loadgen",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      Substitute("--table_name=$0", table_name),
+      // Don't need much data in there.
+      "--num_threads=1",
+      "--num_rows_per_thread=1",
+    }));
+  }
+  // Bring back the stopped tablet servers: they host none of the replicas
+  // created above, so the cluster is now unbalanced.
+  for (auto i = kRepFactor + 1; i < kNumTservers; ++i) {
+    ASSERT_OK(cluster_->tablet_server(i)->Restart());
+  }
+
+  // Run the rebalancer with the '--move_single_replicas' value under test.
+  string out;
+  string err;
+  const Status s = RunKuduTool({
+    "cluster",
+    "rebalance",
+    cluster_->master()->bound_rpc_addr().ToString(),
+    Substitute("--move_single_replicas=$0", move_single_replica),
+  }, &out, &err);
+  ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err;
+  ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced");
+  if (move_single_replica == "enabled" ||
+      (move_single_replica == "auto" && is_343_scheme)) {
+    // Should move appropriate replicas of single-replica tablets.
+    ASSERT_STR_NOT_CONTAINS(out,
+        "rebalancing is complete: cluster is balanced (moved 0 replicas)");
+    ASSERT_STR_NOT_CONTAINS(err, "has single replica, skipping");
+  } else {
+    // Replicas of single-replica tablets must stay in place, and the tool
+    // must report that it skipped them.
+    ASSERT_STR_CONTAINS(out,
+        "rebalancing is complete: cluster is balanced (moved 0 replicas)");
+    ASSERT_STR_MATCHES(err, "tablet .* of table '.*' (.*) has single replica, skipping");
+  }
+
+  // Make sure no server crashed and the cluster passes ClusterVerifier checks.
+  NO_FATALS(cluster_->AssertNoCrashes());
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+}
+
 } // namespace tools
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/a1558eed/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index e90c94a..10963c4 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -96,6 +96,7 @@
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tools/tool.pb.h"
 #include "kudu/tools/tool_action_common.h"
+#include "kudu/tools/tool_replica_util.h"
 #include "kudu/tools/tool_test_util.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
@@ -127,42 +128,46 @@ DECLARE_string(block_manager);
 METRIC_DECLARE_counter(bloom_lookups);
 METRIC_DECLARE_entity(tablet);
 
-namespace kudu {
-
-namespace tserver {
-class TabletServerServiceProxy;
-}
-
-namespace tools {
-
-using cfile::CFileWriter;
-using cfile::StringDataGenerator;
-using cfile::WriterOptions;
-using client::KuduClient;
-using client::KuduClientBuilder;
-using client::KuduSchema;
-using client::KuduSchemaBuilder;
-using client::KuduTable;
-using client::sp::shared_ptr;
-using cluster::ExternalMiniCluster;
-using cluster::ExternalMiniClusterOptions;
-using cluster::ExternalTabletServer;
-using cluster::InternalMiniCluster;
-using cluster::InternalMiniClusterOptions;
-using consensus::OpId;
-using consensus::RECEIVED_OPID;
-using consensus::ReplicateRefPtr;
-using consensus::ReplicateMsg;
-using fs::BlockDeletionTransaction;
-using fs::FsReport;
-using fs::WritableBlock;
-using hms::HmsClient;
-using hms::HmsClientOptions;
-using itest::MiniClusterFsInspector;
-using itest::TServerDetails;
-using log::Log;
-using log::LogOptions;
-using rpc::RpcController;
+using kudu::cfile::CFileWriter;
+using kudu::cfile::StringDataGenerator;
+using kudu::cfile::WriterOptions;
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduTable;
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
+using kudu::cluster::ExternalTabletServer;
+using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
+using kudu::consensus::OpId;
+using kudu::consensus::RECEIVED_OPID;
+using kudu::consensus::ReplicateMsg;
+using kudu::consensus::ReplicateRefPtr;
+using kudu::fs::BlockDeletionTransaction;
+using kudu::fs::FsReport;
+using kudu::fs::WritableBlock;
+using kudu::hms::HmsClient;
+using kudu::hms::HmsClientOptions;
+using kudu::itest::MiniClusterFsInspector;
+using kudu::itest::TServerDetails;
+using kudu::log::Log;
+using kudu::log::LogOptions;
+using kudu::rpc::RpcController;
+using kudu::tablet::LocalTabletWriter;
+using kudu::tablet::Tablet;
+using kudu::tablet::TabletDataState;
+using kudu::tablet::TabletHarness;
+using kudu::tablet::TabletMetadata;
+using kudu::tablet::TabletReplica;
+using kudu::tablet::TabletSuperBlockPB;
+using kudu::tserver::DeleteTabletRequestPB;
+using kudu::tserver::DeleteTabletResponsePB;
+using kudu::tserver::ListTabletsResponsePB;
+using kudu::tserver::MiniTabletServer;
+using kudu::tserver::WriteRequestPB;
 using std::back_inserter;
 using std::copy;
 using std::make_pair;
@@ -173,20 +178,9 @@ using std::unique_ptr;
 using std::unordered_map;
 using std::vector;
 using strings::Substitute;
-using tablet::LocalTabletWriter;
-using tablet::Tablet;
-using tablet::TabletDataState;
-using tablet::TabletHarness;
-using tablet::TabletMetadata;
-using tablet::TabletReplica;
-using tablet::TabletSuperBlockPB;
-using tserver::DeleteTabletRequestPB;
-using tserver::DeleteTabletResponsePB;
-using tserver::MiniTabletServer;
-using tserver::WriteRequestPB;
-using tserver::ListTabletsRequestPB;
-using tserver::ListTabletsResponsePB;
-using tserver::TabletServerServiceProxy;
+
+namespace kudu {
+namespace tools {
 
 class ToolTest : public KuduTest {
  public:
@@ -1416,7 +1410,7 @@ void ToolTest::RunLoadgen(int num_tservers,
         ColumnSchema("binary_val", BINARY),
       }, 1);
 
-    shared_ptr<client::KuduClient> client;
+    shared_ptr<KuduClient> client;
     ASSERT_OK(cluster_->CreateClient(nullptr, &client));
     KuduSchema client_schema(client::KuduSchemaFromSchema(kSchema));
     unique_ptr<client::KuduTableCreator> table_creator(
@@ -2132,7 +2126,7 @@ Status CreateHmsTable(HmsClient* client,
   }
 
   hive::EnvironmentContext env_ctx;
-  env_ctx.__set_properties({ std::make_pair(hms::HmsClient::kKuduMasterEventKey, "true") });
+  env_ctx.__set_properties({ make_pair(HmsClient::kKuduMasterEventKey, "true") });
   return client->CreateTable(table, env_ctx);
 }
 
@@ -2777,13 +2771,13 @@ TEST_P(ControlShellToolTest, TestControlShell) {
 
   // Create a table.
   {
-    client::KuduClientBuilder client_builder;
+    KuduClientBuilder client_builder;
     for (const auto& e : masters) {
       HostPort hp;
       ASSERT_OK(HostPortFromPB(e.bound_rpc_address(), &hp));
       client_builder.add_master_server_addr(hp.ToString());
     }
-    shared_ptr<client::KuduClient> client;
+    shared_ptr<KuduClient> client;
     ASSERT_OK(client_builder.Build(&client));
     KuduSchemaBuilder schema_builder;
     schema_builder.AddColumn("foo")
@@ -3121,7 +3115,7 @@ TEST_F(ToolTest, TestFsAddRemoveDataDirEndToEnd) {
   ASSERT_STR_CONTAINS(s.ToString(), "one or more data dirs may have been removed");
 
   // Delete the second table and wait for all of its tablets to be deleted.
-  shared_ptr<client::KuduClient> client;
+  shared_ptr<KuduClient> client;
   ASSERT_OK(mini_cluster_->CreateClient(nullptr, &client));
   ASSERT_OK(client->DeleteTable(kTableBar));
   ASSERT_EVENTUALLY([&]{
@@ -3254,7 +3248,7 @@ TEST_F(ToolTest, TestReplaceTablet) {
 
   // Sanity check: there should be no more rows than we inserted before the replace.
   // TODO(wdberkeley): Should also be possible to keep inserting through a replace.
-  client::sp::shared_ptr<client::KuduTable> workload_table;
+  client::sp::shared_ptr<KuduTable> workload_table;
   ASSERT_OK(workload.client()->OpenTable(workload.table_name(), &workload_table));
   ASSERT_GE(workload.rows_inserted(), CountTableRows(workload_table.get()));
 }
@@ -3328,5 +3322,53 @@ TEST_F(ToolTest, TestParseStacks) {
                      "invalid JSON payload.*lacks ending quotation");
 }
 
+// Verifies Is343SchemeCluster() against a cluster whose masters and tablet
+// servers run with --raft_prepare_replacement_before_eviction set to the test
+// parameter, i.e. with or without the 3-4-3 replica management scheme.
+class Is343ReplicaUtilTest :
+    public ToolTest,
+    public ::testing::WithParamInterface<bool> {
+};
+INSTANTIATE_TEST_CASE_P(, Is343ReplicaUtilTest, ::testing::Bool());
+TEST_P(Is343ReplicaUtilTest, Is343Cluster) {
+  constexpr auto kReplicationFactor = 3;
+  const auto is_343_scheme = GetParam();
+
+  ExternalMiniClusterOptions opts;
+  opts.num_tablet_servers = kReplicationFactor;
+  opts.extra_master_flags = {
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+  };
+  opts.extra_tserver_flags = {
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+  };
+  NO_FATALS(StartExternalMiniCluster(opts));
+  const auto& master_addr = cluster_->master()->bound_rpc_addr().ToString();
+
+  // An explicitly specified empty tablet identifier must yield NotFound.
+  {
+    const string empty_name = "";
+    bool is_343 = false;
+    const auto s = Is343SchemeCluster({ master_addr }, empty_name, &is_343);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
+
+  // No tablet specified and no tables in the cluster yet: the scheme cannot
+  // be determined, so the call must report Incomplete.
+  {
+    bool is_343 = false;
+    const auto s = Is343SchemeCluster({ master_addr }, boost::none, &is_343);
+    ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "not a single table found");
+  }
+
+  // Create a table.
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(kReplicationFactor);
+  workload.set_table_name("is_343_test_table");
+  workload.Setup();
+
+  // With a table present, the detected scheme must match the flag the cluster
+  // was started with.
+  {
+    bool is_343 = false;
+    const auto s = Is343SchemeCluster({ master_addr }, boost::none, &is_343);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    ASSERT_EQ(is_343_scheme, is_343);
+  }
+}
+
 } // namespace tools
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/a1558eed/src/kudu/tools/rebalance-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance-test.cc b/src/kudu/tools/rebalance-test.cc
index 3f3f6f3..ec59f55 100644
--- a/src/kudu/tools/rebalance-test.cc
+++ b/src/kudu/tools/rebalance-test.cc
@@ -28,7 +28,6 @@
 
 #include <gtest/gtest.h>
 
-#include "kudu/gutil/macros.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tools/ksck_results.h"
 #include "kudu/tools/rebalance_algo.h"
@@ -64,6 +63,7 @@ struct KsckTabletSummaryInput {
 
+// Per-table input used by GenerateKsckResults() to fill in a KsckTableSummary.
 struct KsckTableSummaryInput {
   std::string id;
+  // Copied into KsckTableSummary::replication_factor.
+  int replication_factor;
 };
 
 // The input to build KsckResults data. Contains relevant sub-fields of the
@@ -114,7 +114,8 @@ KsckResults GenerateKsckResults(KsckResultsInput input) {
     for (const auto& summary_input : input.table_summaries) {
       KsckTableSummary summary;
       summary.id = summary_input.id;
-      summaries.emplace_back(summary);
+      summary.replication_factor = summary_input.replication_factor;
+      summaries.emplace_back(std::move(summary));
     }
   }
   return results;
@@ -208,9 +209,37 @@ ostream& operator<<(ostream& s, const ClusterBalanceInfo& info) {
   return s;
 }
 
-// Test converting KsckResults result into ClusterBalanceInfo.
-TEST(KuduKsckRebalanceTest, KsckResultsToClusterBalanceInfo) {
-  const KsckResultsTestConfig kConfigs[] = {
+// Fixture for the tests converting KsckResults into ClusterBalanceInfo.
+// Rebalancer befriends this fixture (see rebalancer.h), so RunTest() can call
+// Rebalancer::KsckResultsToClusterBalanceInfo(); the TEST_F-generated
+// subclasses do not inherit that friendship, hence the helper lives here.
+class KsckResultsToClusterBalanceInfoTest : public ::testing::Test {
+ protected:
+  // For every entry of 'test_configs': generates ksck results from the entry's
+  // input, converts them with a Rebalancer built from 'rebalancer_cfg' (with
+  // no moves in progress) and compares the output with the entry's reference
+  // ClusterBalanceInfo.
+  void RunTest(const Rebalancer::Config& rebalancer_cfg,
+               const vector<KsckResultsTestConfig>& test_configs) {
+    // size_t index: avoids a signed/unsigned comparison with size().
+    for (size_t idx = 0; idx < test_configs.size(); ++idx) {
+      SCOPED_TRACE(Substitute("test config index: $0", idx));
+      const auto& cfg = test_configs[idx];
+      auto ksck_results = GenerateKsckResults(cfg.input);
+
+      Rebalancer rebalancer(rebalancer_cfg);
+      ClusterBalanceInfo cbi;
+      ASSERT_OK(rebalancer.KsckResultsToClusterBalanceInfo(
+          ksck_results, Rebalancer::MovesInProgress(), &cbi));
+      ASSERT_EQ(cfg.ref_balance_info, cbi);
+    }
+  }
+};
+
+// Test converting KsckResults result into ClusterBalanceInfo when movement
+// of RF=1 replicas is allowed.
+TEST_F(KsckResultsToClusterBalanceInfoTest, MoveRf1Replicas) {
+  const Rebalancer::Config rebalancer_config = {
+    {},     // master_addresses
+    {},     // table_filters
+    5,      // max_moves_per_server
+    30,     // max_staleness_interval_sec
+    0,      // max_run_time_sec
+    true,   // move_rf1_replicas
+  };
+
+  const vector<KsckResultsTestConfig> test_configs = {
     // Empty
     {
       {},
@@ -221,7 +250,7 @@ TEST(KuduKsckRebalanceTest, KsckResultsToClusterBalanceInfo) {
       {
         { { "ts_0" }, },
         { { "tablet_0", "table_a", { { "ts_0", true }, }, }, },
-        { { "table_a" }, },
+        { { "table_a", 1 }, },
       },
       {
         { { 0, { "table_a", { { 1, "ts_0" }, } } }, },
@@ -237,7 +266,7 @@ TEST(KuduKsckRebalanceTest, KsckResultsToClusterBalanceInfo) {
           { "tablet_a0", "table_a", { { "ts_1", true }, }, },
           { "tablet_a0", "table_a", { { "ts_2", true }, }, },
         },
-        { { "table_a", } },
+        { { "table_a", 3 } },
       },
       {
         {
@@ -263,7 +292,7 @@ TEST(KuduKsckRebalanceTest, KsckResultsToClusterBalanceInfo) {
           { "tablet_b_0", "table_b", { { "ts_0", true }, }, },
           { "tablet_c_0", "table_c", { { "ts_0", true }, }, },
         },
-        { { { "table_a" }, { "table_b" }, { "table_c" }, } },
+        { { { "table_a", 1 }, { "table_b", 1 }, { "table_c", 1 }, } },
       },
       {
         {
@@ -304,7 +333,7 @@ TEST(KuduKsckRebalanceTest, KsckResultsToClusterBalanceInfo) {
           { "tablet_c_0", "table_c", { { "ts_1", true }, }, },
           { "tablet_c_1", "table_c", { { "ts_1", true }, }, },
         },
-        { { { "table_a" }, { "table_b" }, { "table_c" }, } },
+        { { { "table_a", 3 }, { "table_b", 1 }, { "table_c", 1 }, } },
       },
       {
         {
@@ -331,16 +360,91 @@ TEST(KuduKsckRebalanceTest, KsckResultsToClusterBalanceInfo) {
     },
   };
 
-  for (auto idx = 0; idx < arraysize(kConfigs); ++idx) {
-    SCOPED_TRACE(Substitute("test config index: $0", idx)); \
-    const auto& cfg = kConfigs[idx];
-    auto ksck_results = GenerateKsckResults(cfg.input);
-    ClusterBalanceInfo cbi;
-    Rebalancer rebalancer({});
-    ASSERT_OK(rebalancer.KsckResultsToClusterBalanceInfo(
-        ksck_results, Rebalancer::MovesInProgress(), &cbi));
-    ASSERT_EQ(cfg.ref_balance_info, cbi);
-  }
+  NO_FATALS(RunTest(rebalancer_config, test_configs));
+}
+
+// Test converting KsckResults into ClusterBalanceInfo when movement of
+// RF=1 replicas is disabled. Replicas of RF=1 tablets are expected to be left
+// out of the resulting balance info: tablet servers hosting only such replicas
+// are still listed, but with a replica count of 0.
+TEST_F(KsckResultsToClusterBalanceInfoTest, DoNotMoveRf1Replicas) {
+  const Rebalancer::Config rebalancer_config = {
+    {},     // master_addresses
+    {},     // table_filters
+    5,      // max_moves_per_server
+    30,     // max_staleness_interval_sec
+    0,      // max_run_time_sec
+    false,  // move_rf1_replicas
+  };
+
+  const vector<KsckResultsTestConfig> test_configs = {
+    // Empty input produces empty balance info.
+    {
+      {},
+      {}
+    },
+    // One tserver, one table, one tablet, RF=1.
+    {
+      {
+        { { "ts_0" }, },
+        { { "tablet_0", "table_a", { { "ts_0", true }, }, }, },
+        { { "table_a", 1 }, },
+      },
+      {
+        {},
+        { { 0, "ts_0" }, }
+      }
+    },
+    // Two tservers, two tables, RF=1.
+    {
+      {
+        { { "ts_0" }, { "ts_1" }, },
+        {
+          { "tablet_a0", "table_a", { { "ts_0", true }, }, },
+          { "tablet_b0", "table_b", { { "ts_0", true }, }, },
+          { "tablet_b1", "table_b", { { "ts_1", true }, }, },
+        },
+        { { "table_a", 1 }, { "table_b", 1 } },
+      },
+      {
+        {},
+        { { 0, "ts_1" }, { 0, "ts_0" }, }
+      }
+    },
+    // table_a: 1 tablet with RF=3
+    // table_b: 3 tablets with RF=1
+    // table_c: 2 tablets with RF=1
+    // Only table_a's replicas are expected to be accounted for.
+    {
+      {
+        { { "ts_0" }, { "ts_1" }, { "ts_2" }, },
+        {
+          { "tablet_a_0", "table_a", { { "ts_0", true }, }, },
+          { "tablet_a_0", "table_a", { { "ts_1", true }, }, },
+          { "tablet_a_0", "table_a", { { "ts_2", true }, }, },
+          { "tablet_b_0", "table_b", { { "ts_0", true }, }, },
+          { "tablet_b_1", "table_b", { { "ts_0", true }, }, },
+          { "tablet_b_2", "table_b", { { "ts_0", true }, }, },
+          { "tablet_c_0", "table_c", { { "ts_1", true }, }, },
+          { "tablet_c_1", "table_c", { { "ts_1", true }, }, },
+        },
+        { { { "table_a", 3 }, { "table_b", 1 }, { "table_c", 1 }, } },
+      },
+      {
+        {
+          {
+            0, {
+              "table_a", {
+                { 1, "ts_2" }, { 1, "ts_1" }, { 1, "ts_0" },
+              }
+            }
+          },
+        },
+        {
+          { 1, "ts_2" }, { 1, "ts_1" }, { 1, "ts_0" },
+        },
+      }
+    },
+  };
+
+  NO_FATALS(RunTest(rebalancer_config, test_configs));
 }
 
 } // namespace tools

http://git-wip-us.apache.org/repos/asf/kudu/blob/a1558eed/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index e18c84a..3f7f0b1 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -229,7 +229,7 @@ class Rebalancer {
     client::sp::shared_ptr<client::KuduClient> client_;
   };
 
-  FRIEND_TEST(KuduKsckRebalanceTest, KsckResultsToClusterBalanceInfo);
+  friend class KsckResultsToClusterBalanceInfoTest;
 
   // Convert ksck results into cluster balance information suitable for the
   // input of the high-level rebalancing algorithm. The 'moves_in_progress'

http://git-wip-us.apache.org/repos/asf/kudu/blob/a1558eed/src/kudu/tools/tool_action_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index 010cdcd..e420518 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -15,35 +15,39 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <cstddef>
+#include <algorithm>
+#include <cstdlib>
 #include <iostream>
-#include <map>
+#include <iterator>
 #include <memory>
-#include <set>
 #include <string>
-#include <type_traits>
+#include <tuple>
 #include <unordered_map>
-#include <utility>
 #include <vector>
 
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tools/ksck.h"
 #include "kudu/tools/ksck_remote.h"
+#include "kudu/tools/ksck_results.h"
 #include "kudu/tools/rebalancer.h"
 #include "kudu/tools/tool_action.h"
 #include "kudu/tools/tool_action_common.h"
+#include "kudu/tools/tool_replica_util.h"
 #include "kudu/util/status.h"
+#include "kudu/util/version_util.h"
 
 using std::cout;
 using std::endl;
-using std::multimap;
-using std::set;
+using std::make_tuple;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -84,14 +88,38 @@ DEFINE_int64(max_run_time_sec, 0,
              "Maximum time to run the rebalancing, in seconds. Specifying 0 "
              "means not imposing any limit on the rebalancing run time.");
 
-DEFINE_bool(move_single_replicas, false,
-            "Whether to move single replica tablets (i.e. replicas of tablets "
-            "of replication factor 1).");
+DEFINE_string(move_single_replicas, "auto",
+              "Whether to move single replica tablets (i.e. replicas of tablets "
+              "of replication factor 1). Acceptable values are: "
+              "'auto', 'enabled', 'disabled'. The value of 'auto' means "
+              "turn it on/off depending on the replica management scheme "
+              "and Kudu version.");
 
 DEFINE_bool(output_replica_distribution_details, false,
             "Whether to output details on per-table and per-server "
             "replica distribution");
 
+// gflags validator for --move_single_replicas: accepts 'auto', 'enabled' and
+// 'disabled' (compared case-insensitively); logs an error for anything else.
+static bool ValidateMoveSingleReplicas(const char* flag_name,
+                                       const string& flag_value) {
+  const vector<string> allowed_values = { "auto", "enabled", "disabled" };
+  const bool is_allowed = std::any_of(
+      allowed_values.begin(), allowed_values.end(),
+      [&](const string& v) { return boost::iequals(v, flag_value); });
+  if (is_allowed) {
+    return true;
+  }
+
+  std::ostringstream msg;
+  msg << "'" << flag_value << "': unsupported value for --" << flag_name
+      << " flag; should be one of ";
+  for (const auto& v : allowed_values) {
+    msg << v << " ";
+  }
+  LOG(ERROR) << msg.str();
+  return false;
+}
+DEFINE_validator(move_single_replicas, &ValidateMoveSingleReplicas);
+
 namespace kudu {
 namespace tools {
 
@@ -111,6 +139,99 @@ Status RunKsck(const RunnerContext& context) {
   return ksck->RunAndPrintResults();
 }
 
+// Returns true if 'version_str' parses as a Kudu version of 1.7.1 or newer,
+// i.e. a release containing the KUDU-2443 fix needed to move RF=1 replicas.
+// A version string that fails to parse is treated as not supporting it.
+bool VersionSupportsRF1Movement(const string& version_str) {
+  Version parsed;
+  const bool parsed_ok = ParseVersion(version_str, &parsed).ok();
+  return parsed_ok &&
+      make_tuple(parsed.major, parsed.minor, parsed.maintenance) >=
+      make_tuple(1, 7, 1);
+}
+
+// Decides whether it makes sense to move replicas of single-replica tablets;
+// the decision is stored into 'move_single_replicas', which must not be null.
+// Explicit 'enabled'/'disabled' values of --move_single_replicas are honored
+// as is. With 'auto', the movement is enabled only if every master and tablet
+// server reported by ksck runs version 1.7.1 or newer and the cluster uses
+// the 3-4-3 replica management scheme. Returns a non-OK status only if
+// building the ksck cluster fails; other problems just disable the movement.
+//
+// Moving replicas of tablets with replication factor one (a.k.a. non-replicated
+// tablets) is tricky because of the following:
+//
+//   * The sequence of Raft configuration updates when moving a tablet replica
+//     in case of the 3-2-3 replica management scheme.
+//
+//   * KUDU-2443: even with the 3-4-3 replica management scheme, moving of
+//     non-replicated tablets is not possible for the versions prior to the fix.
+//
+Status EvaluateMoveSingleReplicasFlag(const vector<string>& master_addresses,
+                                      bool* move_single_replicas) {
+  DCHECK(move_single_replicas);
+  if (!boost::iequals(FLAGS_move_single_replicas, "auto")) {
+    if (boost::iequals(FLAGS_move_single_replicas, "enabled")) {
+      *move_single_replicas = true;
+    } else {
+      DCHECK(boost::iequals(FLAGS_move_single_replicas, "disabled"));
+      *move_single_replicas = false;
+    }
+    return Status::OK();
+  }
+
+  shared_ptr<KsckCluster> cluster;
+  RETURN_NOT_OK_PREPEND(RemoteKsckCluster::Build(master_addresses, &cluster),
+                        "unable to build KsckCluster");
+  shared_ptr<Ksck> ksck(new Ksck(cluster));
+
+  // Ignoring the result of the Ksck::Run() method: it's possible the cluster
+  // is not completely healthy but rebalancing can proceed; for example,
+  // if a leader election is occurring.
+  ignore_result(ksck->Run());
+  const auto& ksck_results = ksck->results();
+
+  // Every server must report a version supporting RF=1 movement. Iterate over
+  // pointers: a braced list of the vectors themselves would copy both of them.
+  for (const auto* summaries : { &ksck_results.tserver_summaries,
+                                 &ksck_results.master_summaries }) {
+    for (const auto& summary : *summaries) {
+      if (!summary.version) {
+        LOG(INFO) << "no version information from some servers; "
+                  << "not rebalancing single-replica tablets as the result";
+        *move_single_replicas = false;
+        return Status::OK();
+      }
+      if (!VersionSupportsRF1Movement(*summary.version)) {
+        LOG(INFO) << "found Kudu server of version '" << *summary.version
+                  << "'; not rebalancing single-replica tablets as a result";
+        *move_single_replicas = false;
+        return Status::OK();
+      }
+    }
+  }
+
+  // Now check for the replica management scheme. If it's the 3-2-3 scheme,
+  // don't move replicas of non-replicated (a.k.a. RF=1) tablets. The reasoning
+  // is simple: in Raft it's necessary to get acknowledgement from the majority
+  // of voter replicas to commit a write operation. In case of the 3-2-3 scheme
+  // and non-replicated tablets the majority is two out of two tablet replicas
+  // because the destination replica is added as a voter. In case of huge amount
+  // of data in the tablet or frequent updates, it might take a long time for the
+  // destination replica to catch up. During that time the tablet would not be
+  // available. The idea is to reduce the risk of unintended unavailability
+  // unless it's explicitly requested by the operator.
+  //
+  // Any tablet can serve as the probe. If ksck found none, boost::none is
+  // passed and Is343SchemeCluster() presumably looks for a table on its own.
+  boost::optional<string> tid;
+  if (!ksck_results.tablet_summaries.empty()) {
+    tid = ksck_results.tablet_summaries.front().id;
+  }
+  bool is_343_scheme = false;
+  const auto s = Is343SchemeCluster(master_addresses, tid, &is_343_scheme);
+  if (!s.ok()) {
+    LOG(WARNING) << s.ToString() << ": failed to get information "
+        "on the replica management scheme; not rebalancing "
+        "single-replica tablets as the result";
+    *move_single_replicas = false;
+    return Status::OK();
+  }
+
+  *move_single_replicas = is_343_scheme;
+  return Status::OK();
+}
+
 // Rebalance the cluster. The process is run step-by-step, where at each step
 // a new batch of move operations is output by the algorithm. As many as
 // possible replica movements from one batch are performed concurrently, while
@@ -119,13 +240,24 @@ Status RunKsck(const RunnerContext& context) {
 // can be the source and the destination of no more than the specified number of
 // move operations.
 Status RunRebalance(const RunnerContext& context) {
+  const vector<string> master_addresses = Split(
+      FindOrDie(context.required_args, kMasterAddressesArg), ",");
+  const vector<string> table_filters =
+      Split(FLAGS_tables, ",", strings::SkipEmpty());
+
+  // Evaluate --move_single_replicas flag: decide whether to enable or disable
+  // moving of single-replica tablets based on the reported version of the
+  // Kudu components.
+  bool move_single_replicas = false;
+  RETURN_NOT_OK(EvaluateMoveSingleReplicasFlag(master_addresses,
+                                               &move_single_replicas));
   Rebalancer rebalancer(Rebalancer::Config(
-      std::move(Split(FindOrDie(context.required_args, kMasterAddressesArg), ",")),
-      std::move(Split(FLAGS_tables, ",", strings::SkipEmpty())),
+      master_addresses,
+      table_filters,
       FLAGS_max_moves_per_server,
       FLAGS_max_staleness_interval_sec,
       FLAGS_max_run_time_sec,
-      FLAGS_move_single_replicas,
+      move_single_replicas,
       FLAGS_output_replica_distribution_details));
 
   // Print info on pre-rebalance distribution of replicas.

http://git-wip-us.apache.org/repos/asf/kudu/blob/a1558eed/src/kudu/tools/tool_replica_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_replica_util.cc b/src/kudu/tools/tool_replica_util.cc
index 37b429c..3001350 100644
--- a/src/kudu/tools/tool_replica_util.cc
+++ b/src/kudu/tools/tool_replica_util.cc
@@ -299,7 +299,7 @@ Status CheckCompleteMove(const vector<string>& master_addresses,
         return Status::OK();
       }
       // Make sure the current leader has asserted its leadership before sending
-      // it to ChangeConfig request.
+      // it the ChangeConfig request.
       OpId opid;
       RETURN_NOT_OK(GetLastCommittedOpId(tablet_id, leader_uuid, leader_hp,
                                          client->default_admin_operation_timeout(),
@@ -499,6 +499,52 @@ Status DoChangeConfig(const vector<string>& master_addresses,
   return Status::OK();
 }
 
+// Determines the replica management scheme by calling GetConsensusState() on
+// the leader replica of one tablet. This could alternatively be implemented
+// using the GetFlags API, but the GetFlags RPC is not supported on all versions
+// with which the rebalancing tool would like to be compatible, and this PB-based
+// approach is less fragile than the string matching needed for GetFlags on old versions.
+Status Is343SchemeCluster(const vector<string>& master_addresses,
+                          const boost::optional<string>& tablet_id_in,
+                          bool* is_343_scheme) {
+  client::sp::shared_ptr<client::KuduClient> client;
+  RETURN_NOT_OK(client::KuduClientBuilder()
+                .master_server_addrs(master_addresses)
+                .Build(&client));
+  string tablet_id;
+  if (tablet_id_in) {  // caller already knows a tablet: use it as is
+    tablet_id = *tablet_id_in;
+  } else {
+    vector<string> table_names;
+    RETURN_NOT_OK(client->ListTables(&table_names));
+    if (table_names.empty()) {
+      return Status::Incomplete("not a single table found");
+    }
+    // No tablet given: pick one via the first scan token of the first table.
+    const auto& table_name = table_names.front();
+    client::sp::shared_ptr<client::KuduTable> client_table;
+    RETURN_NOT_OK(client->OpenTable(table_name, &client_table));
+    vector<client::KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    client::KuduScanTokenBuilder builder(client_table.get());
+    RETURN_NOT_OK(builder.Build(&tokens));
+    if (tokens.empty()) {
+      return Status::Incomplete(Substitute(
+          "table '$0': not a single scan token returned", table_name));
+    }
+    tablet_id = tokens.front()->tablet().id();
+  }
+  // Ask the tablet's leader replica for its consensus state via a proxy.
+  string leader_uuid;
+  HostPort leader_hp;
+  RETURN_NOT_OK(GetTabletLeader(client, tablet_id, &leader_uuid, &leader_hp));
+  unique_ptr<consensus::ConsensusServiceProxy> proxy;
+  RETURN_NOT_OK(BuildProxy(leader_hp.host(), leader_hp.port(), &proxy));
+  return GetConsensusState(proxy, tablet_id, leader_uuid,
+                           client->default_admin_operation_timeout(),
+                           nullptr /* consensus_state */, is_343_scheme);
+}
+
 } // namespace tools
 } // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/a1558eed/src/kudu/tools/tool_replica_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_replica_util.h b/src/kudu/tools/tool_replica_util.h
index 12f92a6..19267a4 100644
--- a/src/kudu/tools/tool_replica_util.h
+++ b/src/kudu/tools/tool_replica_util.h
@@ -83,7 +83,7 @@ Status GetTabletLeader(
 //
 // The function returns Status::OK() if the 'is_complete' and
 // 'completion_status' contain valid information, otherwise it returns first
-// non-OK status encountered while trying to status of the replica move.
+// non-OK status encountered while trying to get status of the replica movement.
 //
 // If the source replica happens to be a leader, this function asks it to step
 // down. Also, in case of 3-2-3 replica management mode, this function removes
@@ -127,5 +127,16 @@ Status DoChangeConfig(const std::vector<std::string>& master_addresses,
     const boost::optional<int64_t>& cas_opid_idx = boost::none,
     bool* cas_failed = nullptr);
 
+// Check whether the cluster with the specified master addresses uses the
+// 3-4-3 replica management scheme, as reported by the leader of one tablet.
+// Returns Status::Incomplete() if there is not enough information available
+// to determine the scheme (e.g., not a single table exists in the cluster);
+// other non-OK statuses (e.g., from contacting the tablet leader) are returned.
+// If the identifier of an existing tablet is known, pass it via 'tablet_id_in'.
+// On success, 'is_343_scheme' is set to 'true' if the cluster uses 3-4-3.
+Status Is343SchemeCluster(const std::vector<std::string>& master_addresses,
+                          const boost::optional<std::string>& tablet_id_in,
+                          bool* is_343_scheme);
+
 } // namespace tools
 } // namespace kudu


Mime
View raw message