kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [2/3] kudu git commit: [rebalancer] location-aware rebalancer (part 9/n)
Date Tue, 06 Nov 2018 22:38:30 GMT
[rebalancer] location-aware rebalancer (part 9/n)

Updated the reporting functionality of the rebalancer tool to output
information on placement policy violations and other details relevant
to location-aware clusters.

Added one simple integration test as well.

Change-Id: I8407e9f8cf6b41a6aeb075372d852125d9739e08
Reviewed-on: http://gerrit.cloudera.org:8080/11862
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wdberkeley@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8e9345a7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8e9345a7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8e9345a7

Branch: refs/heads/master
Commit: 8e9345a79849ed3e96a85dc5240e0a4e709b2055
Parents: e172df4
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Fri Oct 26 18:25:24 2018 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Tue Nov 6 21:59:21 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/placement_policy_util-test.cc |  38 +--
 src/kudu/tools/placement_policy_util.cc      |   2 +-
 src/kudu/tools/placement_policy_util.h       |   1 -
 src/kudu/tools/rebalancer.cc                 | 332 +++++++++++++++-------
 src/kudu/tools/rebalancer.h                  |  15 +
 src/kudu/tools/rebalancer_tool-test.cc       | 208 ++++++++++++--
 6 files changed, 456 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util-test.cc b/src/kudu/tools/placement_policy_util-test.cc
index 7d48cbe..cccd611 100644
--- a/src/kudu/tools/placement_policy_util-test.cc
+++ b/src/kudu/tools/placement_policy_util-test.cc
@@ -139,19 +139,19 @@ void ClusterConfigToClusterPlacementInfo(const TestClusterConfig& tcc,
   *tpi = std::move(result_tpi);
 }
 
-// TODO(aserbin): is it needed at all?
 bool operator==(const PlacementPolicyViolationInfo& lhs,
                 const PlacementPolicyViolationInfo& rhs) {
   return lhs.tablet_id == rhs.tablet_id &&
       lhs.majority_location == rhs.majority_location &&
       lhs.replicas_num_at_majority_location ==
-          rhs.replicas_num_at_majority_location &&
-      lhs.replication_factor == rhs.replication_factor;
+          rhs.replicas_num_at_majority_location;
 }
 
 ostream& operator<<(ostream& s, const PlacementPolicyViolationInfo& info) {
   s << "{tablet_id: " << info.tablet_id
-    << ", location: " << info.majority_location << "}";
+    << ", location: " << info.majority_location
+    << ", replicas_num_at_majority_location: "
+    << info.replicas_num_at_majority_location << "}";
   return s;
 }
 
@@ -327,7 +327,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
         { "D", {} },
         { "E", {} },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 3 }, },
       { { "t0", "C" }, }
     },
 
@@ -345,7 +345,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
         { "B", { "t0", } },
         { "C", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 2 }, },
       {},
     },
 
@@ -364,7 +364,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
         { "C", { "t0", } },
         { "D", {} },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 2 }, },
       { { "t0", "B" }, }
     },
   };
@@ -390,7 +390,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) {
         { "D", { "t1", "x1", } }, { "E", { "t1", } },
         { "F", { "t1", } },
       },
-      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "L0", 3 }, { "t1", "L1", 2 }, },
       { { "t0", "B" }, { "t1", "E" } }
     },
 
@@ -410,7 +410,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) {
         { "D", { "t1", "t2", } }, { "E", { "t1", "t3", } },
         { "F", { "t1", "t2", "t3", } },
       },
-      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "L0", 3 }, { "t1", "L1", 2 }, },
       { { "t0", "B" }, { "t1", "E" } }
     },
   };
@@ -441,7 +441,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
         { "E", { "t0", } },
         { "F", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 3 }, },
       {},
     },
     // One RF=7 tablet with the distribution of its replica placement violating
@@ -467,7 +467,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
         { "G", { "t0", } },
         { "H", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 4 }, },
       {},
     },
     {
@@ -485,7 +485,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
         { "D", { "t0", } }, { "E", { "t0", } },
         { "F", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 3 }, },
       {}
     },
   };
@@ -525,7 +525,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
         { "C", { "t1", } },
         { "D", { "t1", } },
       },
-      { { "t0", "L0" }, { "t1", "L0" }, },
+      { { "t0", "L0", 2 }, { "t1", "L0", 4 }, },
       {}
     },
     {
@@ -541,7 +541,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
         { "A", { "t0", } }, { "B", { "t0", } },
         { "D", { "t1", } }, { "E", { "t1", } },
       },
-      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
       {}
     },
     {
@@ -558,7 +558,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
         { "A", { "t0", "t1", } }, { "B", { "t0", "t1", } },
         { "D", { "t1", } }, { "E", { "t1", } },
       },
-      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
       {}
     },
     {
@@ -574,7 +574,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
         { "A", { "t0", } }, { "B", { "t0", } },
         { "C", { "t1", } }, { "D", { "t1", } },
       },
-      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
       {}
     },
     {
@@ -592,7 +592,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
         { "D", { "t0", } },
         { "F", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 2 }, },
       {}
     },
   };
@@ -616,7 +616,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRF) {
         { "D", { "t0", } }, { "F", { "t0", } },
         { "H", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 3 }, },
       { { "t0", "B" }, }
     },
     {
@@ -635,7 +635,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRF) {
         { "G", { "t0", } },
         { "H", { "t0", } },
       },
-      { { "t0", "L1" }, },
+      { { "t0", "L1", 4 }, },
       { { "t0", "D" }, }
     },
   };

http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util.cc b/src/kudu/tools/placement_policy_util.cc
index be1b502..f5ab790 100644
--- a/src/kudu/tools/placement_policy_util.cc
+++ b/src/kudu/tools/placement_policy_util.cc
@@ -333,7 +333,7 @@ Status DetectPlacementPolicyViolations(
           tablet_id, max_replicas_num, rep_factor, max_replicas_location);
     }
     if (is_policy_violated) {
-      info.push_back({ tablet_id, max_replicas_location });
+      info.push_back({ tablet_id, max_replicas_location, max_replicas_num });
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util.h b/src/kudu/tools/placement_policy_util.h
index e54848d..2938d17 100644
--- a/src/kudu/tools/placement_policy_util.h
+++ b/src/kudu/tools/placement_policy_util.h
@@ -86,7 +86,6 @@ Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info,
 struct PlacementPolicyViolationInfo {
   std::string tablet_id;
   std::string majority_location;
-  int replication_factor;
   int replicas_num_at_majority_location;
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
index 4d2d769..46f21f6 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer.cc
@@ -57,6 +57,7 @@ using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
 using std::accumulate;
 using std::endl;
+using std::back_inserter;
 using std::inserter;
 using std::ostream;
 using std::map;
@@ -69,6 +70,7 @@ using std::shared_ptr;
 using std::sort;
 using std::string;
 using std::to_string;
+using std::transform;
 using std::unordered_map;
 using std::unordered_set;
 using std::vector;
@@ -105,7 +107,7 @@ Rebalancer::Rebalancer(const Config& config)
     : config_(config) {
 }
 
-Status Rebalancer::PrintStats(std::ostream& out) {
+Status Rebalancer::PrintStats(ostream& out) {
   // First, report on the current balance state of the cluster.
   RETURN_NOT_OK(RefreshKsckResults());
   const KsckResults& results = ksck_->results();
@@ -116,103 +118,49 @@ Status Rebalancer::PrintStats(std::ostream& out) {
   ClusterInfo ci;
   RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
 
-  // Per-server replica distribution stats.
-  {
-    out << "Per-server replica distribution summary:" << endl;
-    DataTable summary({"Statistic", "Value"});
-
-    const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
-    if (servers_load_info.empty()) {
-      summary.AddRow({ "N/A", "N/A" });
-    } else {
-      const int64_t total_replica_count = accumulate(
-          servers_load_info.begin(), servers_load_info.end(), 0L,
-          [](int64_t sum, const pair<int32_t, string>& elem) {
-            return sum + elem.first;
-          });
-
-      const auto min_replica_count = servers_load_info.begin()->first;
-      const auto max_replica_count = servers_load_info.rbegin()->first;
-      const double avg_replica_count =
-          1.0 * total_replica_count / servers_load_info.size();
-
-      summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) });
-      summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) });
-      summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) });
-    }
-    RETURN_NOT_OK(summary.PrintTo(out));
-    out << endl;
-
-    if (config_.output_replica_distribution_details) {
-      const auto& tserver_summaries = results.tserver_summaries;
-      unordered_map<string, string> tserver_endpoints;
-      for (const auto& summary : tserver_summaries) {
-        tserver_endpoints.emplace(summary.uuid, summary.address);
-      }
-
-      out << "Per-server replica distribution details:" << endl;
-      DataTable servers_info({ "UUID", "Address", "Replica Count" });
-      for (const auto& elem : servers_load_info) {
-        const auto& id = elem.second;
-        servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) });
-      }
-      RETURN_NOT_OK(servers_info.PrintTo(out));
-      out << endl;
-    }
+  const auto& ts_id_by_location = ci.locality.servers_by_location;
+  if (ts_id_by_location.empty()) {
+    // Nothing to report about: there are no tablet servers reported.
+    out << "an empty cluster" << endl;
+    return Status::OK();
   }
 
-  // Per-table replica distribution stats.
-  {
-    out << "Per-table replica distribution summary:" << endl;
-    DataTable summary({ "Replica Skew", "Value" });
-    const auto& table_skew_info = ci.balance.table_info_by_skew;
-    if (table_skew_info.empty()) {
-      summary.AddRow({ "N/A", "N/A" });
-    } else {
-      const auto min_table_skew = table_skew_info.begin()->first;
-      const auto max_table_skew = table_skew_info.rbegin()->first;
-      const int64_t sum_table_skew = accumulate(
-          table_skew_info.begin(), table_skew_info.end(), 0L,
-          [](int64_t sum, const pair<int32_t, TableBalanceInfo>& elem) {
-            return sum + elem.first;
-          });
-      double avg_table_skew = 1.0 * sum_table_skew / table_skew_info.size();
-
-      summary.AddRow({ "Minimum", to_string(min_table_skew) });
-      summary.AddRow({ "Maximum", to_string(max_table_skew) });
-      summary.AddRow({ "Average", to_string(avg_table_skew) });
-    }
-    RETURN_NOT_OK(summary.PrintTo(out));
-    out << endl;
+  if (ts_id_by_location.size() == 1) {
+    // That's about printing information about the whole cluster.
+    return PrintLocationBalanceStats(ts_id_by_location.begin()->first,
+                                     raw_info, ci, out);
+  }
 
-    if (config_.output_replica_distribution_details) {
-      const auto& table_summaries = results.table_summaries;
-      unordered_map<string, const KsckTableSummary*> table_info;
-      for (const auto& summary : table_summaries) {
-        table_info.emplace(summary.id, &summary);
-      }
-      out << "Per-table replica distribution details:" << endl;
-      DataTable skew(
-          { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
-      for (const auto& elem : table_skew_info) {
-        const auto& table_id = elem.second.table_id;
-        const auto it = table_info.find(table_id);
-        const auto* table_summary =
-            (it == table_info.end()) ? nullptr : it->second;
-        const auto& table_name = table_summary ? table_summary->name : "";
-        const auto total_replica_count = table_summary
-            ? table_summary->replication_factor * table_summary->TotalTablets()
-            : 0;
-        skew.AddRow({ table_id,
-                      to_string(total_replica_count),
-                      to_string(elem.first),
-                      table_name });
-      }
-      RETURN_NOT_OK(skew.PrintTo(out));
-      out << endl;
-    }
+  // The stats are more detailed in the case of a multi-location cluster.
+  DCHECK_GT(ts_id_by_location.size(), 1);
+
+  // 1. Print information about cross-location balance.
+  RETURN_NOT_OK(PrintCrossLocationBalanceStats(ci, out));
+
+  // 2. Iterating over locations in the cluster, print per-location balance
+  //    information. Since the ts_id_by_location is not sorted, let's first
+  //    create a sorted list of locations so the output would be sorted by
+  //    location.
+  vector<string> locations;
+  locations.reserve(ts_id_by_location.size());
+  transform(ts_id_by_location.cbegin(), ts_id_by_location.cend(),
+            back_inserter(locations),
+            [](const unordered_map<string, set<string>>::value_type& elem) {
+              return elem.first;
+            });
+  sort(locations.begin(), locations.end());
+
+  for (const auto& location : locations) {
+    ClusterRawInfo raw_info;
+    RETURN_NOT_OK(KsckResultsToClusterRawInfo(location, results, &raw_info));
+    ClusterInfo ci;
+    RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
+    RETURN_NOT_OK(PrintLocationBalanceStats(location, raw_info, ci, out));
   }
 
+  // 3. Print information about placement policy violations.
+  RETURN_NOT_OK(PrintPolicyViolationInfo(raw_info, out));
+
   return Status::OK();
 }
 
@@ -538,6 +486,194 @@ Status Rebalancer::FilterCrossLocationTabletCandidates(
   return Status::OK();
 }
 
+Status Rebalancer::PrintCrossLocationBalanceStats(const ClusterInfo& ci,
+                                                  ostream& out) const {
+  // Print location load information.
+  map<string, int64_t> replicas_num_by_location;
+  for (const auto& elem : ci.balance.servers_by_total_replica_count) {
+    const auto& location = FindOrDie(ci.locality.location_by_ts_id, elem.second);
+    LookupOrEmplace(&replicas_num_by_location, location, 0) += elem.first;
+  }
+  out << "Locations load summary:" << endl;
+  DataTable location_load_summary({"Location", "Load"});
+  for (const auto& elem : replicas_num_by_location) {
+    const auto& location = elem.first;
+    const auto servers_num =
+        FindOrDie(ci.locality.servers_by_location, location).size();
+    CHECK_GT(servers_num, 0);
+    double location_load = static_cast<double>(elem.second) / servers_num;
+    location_load_summary.AddRow({ location, to_string(location_load) });
+  }
+  RETURN_NOT_OK(location_load_summary.PrintTo(out));
+  out << endl;
+
+  return Status::OK();
+}
+
+Status Rebalancer::PrintLocationBalanceStats(const string& location,
+                                             const ClusterRawInfo& raw_info,
+                                             const ClusterInfo& ci,
+                                             ostream& out) const {
+  if (!location.empty()) {
+    out << "--------------------------------------------------" << endl;
+    out << "Location: " << location << endl;
+    out << "--------------------------------------------------" << endl;
+  }
+
+  // Per-server replica distribution stats.
+  {
+    out << "Per-server replica distribution summary:" << endl;
+    DataTable summary({"Statistic", "Value"});
+
+    const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
+    if (servers_load_info.empty()) {
+      summary.AddRow({ "N/A", "N/A" });
+    } else {
+      const int64_t total_replica_count = accumulate(
+          servers_load_info.begin(), servers_load_info.end(), 0L,
+          [](int64_t sum, const pair<int32_t, string>& elem) {
+            return sum + elem.first;
+          });
+
+      const auto min_replica_count = servers_load_info.begin()->first;
+      const auto max_replica_count = servers_load_info.rbegin()->first;
+      const double avg_replica_count =
+          1.0 * total_replica_count / servers_load_info.size();
+
+      summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) });
+      summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) });
+      summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) });
+    }
+    RETURN_NOT_OK(summary.PrintTo(out));
+    out << endl;
+
+    if (config_.output_replica_distribution_details) {
+      const auto& tserver_summaries = raw_info.tserver_summaries;
+      unordered_map<string, string> tserver_endpoints;
+      for (const auto& summary : tserver_summaries) {
+        tserver_endpoints.emplace(summary.uuid, summary.address);
+      }
+
+      out << "Per-server replica distribution details:" << endl;
+      DataTable servers_info({ "UUID", "Address", "Replica Count" });
+      for (const auto& elem : servers_load_info) {
+        const auto& id = elem.second;
+        servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) });
+      }
+      RETURN_NOT_OK(servers_info.PrintTo(out));
+      out << endl;
+    }
+  }
+
+  // Per-table replica distribution stats.
+  {
+    out << "Per-table replica distribution summary:" << endl;
+    DataTable summary({ "Replica Skew", "Value" });
+    const auto& table_skew_info = ci.balance.table_info_by_skew;
+    if (table_skew_info.empty()) {
+      summary.AddRow({ "N/A", "N/A" });
+    } else {
+      const auto min_table_skew = table_skew_info.begin()->first;
+      const auto max_table_skew = table_skew_info.rbegin()->first;
+      const int64_t sum_table_skew = accumulate(
+          table_skew_info.begin(), table_skew_info.end(), 0L,
+          [](int64_t sum, const pair<int32_t, TableBalanceInfo>& elem) {
+            return sum + elem.first;
+          });
+      double avg_table_skew = 1.0 * sum_table_skew / table_skew_info.size();
+
+      summary.AddRow({ "Minimum", to_string(min_table_skew) });
+      summary.AddRow({ "Maximum", to_string(max_table_skew) });
+      summary.AddRow({ "Average", to_string(avg_table_skew) });
+    }
+    RETURN_NOT_OK(summary.PrintTo(out));
+    out << endl;
+
+    if (config_.output_replica_distribution_details) {
+      const auto& table_summaries = raw_info.table_summaries;
+      unordered_map<string, const KsckTableSummary*> table_info;
+      for (const auto& summary : table_summaries) {
+        table_info.emplace(summary.id, &summary);
+      }
+      out << "Per-table replica distribution details:" << endl;
+      DataTable skew(
+          { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
+      for (const auto& elem : table_skew_info) {
+        const auto& table_id = elem.second.table_id;
+        const auto it = table_info.find(table_id);
+        const auto* table_summary =
+            (it == table_info.end()) ? nullptr : it->second;
+        const auto& table_name = table_summary ? table_summary->name : "";
+        const auto total_replica_count = table_summary
+            ? table_summary->replication_factor * table_summary->TotalTablets()
+            : 0;
+        skew.AddRow({ table_id,
+                      to_string(total_replica_count),
+                      to_string(elem.first),
+                      table_name });
+      }
+      RETURN_NOT_OK(skew.PrintTo(out));
+      out << endl;
+    }
+  }
+
+  return Status::OK();
+}
+
+Status Rebalancer::PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
+                                            ostream& out) const {
+  TabletsPlacementInfo placement_info;
+  RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &placement_info));
+  vector<PlacementPolicyViolationInfo> ppvi;
+  RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi));
+  out << "Placement policy violations:" << endl;
+  if (ppvi.empty()) {
+    out << "  none" << endl << endl;;
+    return Status::OK();
+  }
+
+  if (config_.output_replica_distribution_details) {
+    DataTable stats(
+        { "Location", "Table Name", "Tablet", "RF", "Replicas at location" });
+    for (const auto& info : ppvi) {
+      const auto& table_id = FindOrDie(placement_info.tablet_to_table_id,
+                                       info.tablet_id);
+      const auto& table_info = FindOrDie(placement_info.tables_info, table_id);
+      stats.AddRow({ info.majority_location,
+                     table_info.name,
+                     info.tablet_id,
+                     to_string(table_info.replication_factor),
+                     to_string(info.replicas_num_at_majority_location) });
+    }
+    RETURN_NOT_OK(stats.PrintTo(out));
+  } else {
+    DataTable summary({ "Location",
+                        "Number of non-complying tables",
+                        "Number of non-complying tablets" });
+    typedef pair<unordered_set<string>, unordered_set<string>> TableTabletIds;
+    // Location --> sets of identifiers of tables and tablets hosted by the
+    // tablet servers at the location. The summary is sorted by location.
+    map<string, TableTabletIds> info_by_location;
+    for (const auto& info : ppvi) {
+      const auto& table_id = FindOrDie(placement_info.tablet_to_table_id,
+                                       info.tablet_id);
+      auto& elem = LookupOrEmplace(&info_by_location,
+                                   info.majority_location, TableTabletIds());
+      elem.first.emplace(table_id);
+      elem.second.emplace(info.tablet_id);
+    }
+    for (const auto& elem : info_by_location) {
+      summary.AddRow({ elem.first,
+                       to_string(elem.second.first.size()),
+                       to_string(elem.second.second.size()) });
+    }
+    RETURN_NOT_OK(summary.PrintTo(out));
+  }
+  out << endl;
+
+  return Status::OK();
+}
+
 Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
                                     const MovesInProgress& moves_in_progress,
                                     ClusterInfo* info) const {
@@ -1084,11 +1220,11 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
     return Status::OK();
   }
   unordered_set<string> tablets_in_move;
-  std::transform(scheduled_moves_.begin(), scheduled_moves_.end(),
-                 inserter(tablets_in_move, tablets_in_move.begin()),
-                 [](const MovesInProgress::value_type& elem) {
-                   return elem.first;
-                 });
+  transform(scheduled_moves_.begin(), scheduled_moves_.end(),
+            inserter(tablets_in_move, tablets_in_move.begin()),
+            [](const MovesInProgress::value_type& elem) {
+              return elem.first;
+            });
   for (const auto& move : moves) {
     vector<string> tablet_ids;
     RETURN_NOT_OK(FindReplicas(move, raw_info, &tablet_ids));

http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index 7bb0d73..cbaef49 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -442,6 +442,21 @@ class Rebalancer {
       const TableReplicaMove& move,
       std::vector<std::string>* tablet_ids);
 
+  // Print information on the cross-location balance.
+  Status PrintCrossLocationBalanceStats(const ClusterInfo& ci,
+                                        std::ostream& out) const;
+
+  // Print statistics for the specified location. If 'location' is an empty
+  // string, that's about printing the cluster-wide stats for a cluster that
+  // doesn't have any locations defined.
+  Status PrintLocationBalanceStats(const std::string& location,
+                                   const ClusterRawInfo& raw_info,
+                                   const ClusterInfo& ci,
+                                   std::ostream& out) const;
+
+  Status PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
+                                  std::ostream& out) const;
+
   // Convert the 'raw' information about the cluster into information suitable
   // for the input of the high-level rebalancing algorithm.
   // The 'moves_in_progress' parameter contains information on the replica moves

http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer_tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index 60e9f45..9fcd997 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -25,6 +25,7 @@
 #include <string>
 #include <thread>
 #include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -88,6 +89,7 @@ using std::thread;
 using std::tuple;
 using std::unique_ptr;
 using std::unordered_map;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -207,28 +209,14 @@ TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
   ASSERT_STR_MATCHES(err, err_msg_pattern);
 }
 
-// Create tables with unbalanced replica distribution: useful in
-// rebalancer-related tests.
-static Status CreateUnbalancedTables(
+static Status CreateTables(
     cluster::ExternalMiniCluster* cluster,
     client::KuduClient* client,
     const Schema& table_schema,
     const string& table_name_pattern,
     int num_tables,
     int rep_factor,
-    int tserver_idx_from,
-    int tserver_num,
-    int tserver_unresponsive_ms,
     vector<string>* table_names = nullptr) {
-  // Keep running only some tablet servers and shut down the rest.
-  for (auto i = tserver_idx_from; i < tserver_num; ++i) {
-    cluster->tablet_server(i)->Shutdown();
-  }
-
-  // Wait for the catalog manager to understand that not all tablet servers
-  // are available.
-  SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
-
   // Create tables with their tablet replicas landing only on the tablet servers
   // which are up and running.
   auto client_schema = KuduSchema::FromSchema(table_schema);
@@ -253,6 +241,32 @@ static Status CreateUnbalancedTables(
     }
   }
 
+  return Status::OK();
+}
+
+// Create tables with unbalanced replica distribution: useful in
+// rebalancer-related tests.
+static Status CreateUnbalancedTables(
+    cluster::ExternalMiniCluster* cluster,
+    client::KuduClient* client,
+    const Schema& table_schema,
+    const string& table_name_pattern,
+    int num_tables,
+    int rep_factor,
+    int tserver_idx_from,
+    int tserver_num,
+    int tserver_unresponsive_ms,
+    vector<string>* table_names = nullptr) {
+  // Keep running only some tablet servers and shut down the rest.
+  for (auto i = tserver_idx_from; i < tserver_num; ++i) {
+    cluster->tablet_server(i)->Shutdown();
+  }
+
+  // Wait for the catalog manager to understand that not all tablet servers
+  // are available.
+  SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
+  RETURN_NOT_OK(CreateTables(cluster, client, table_schema, table_name_pattern,
+                             num_tables, rep_factor, table_names));
   for (auto i = tserver_idx_from; i < tserver_num; ++i) {
     RETURN_NOT_OK(cluster->tablet_server(i)->Restart());
   }
@@ -404,9 +418,13 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
   static const char* const kExitOnSignalStr;
   static const char* const kTableNamePattern;
 
+  // Working around limitations of older libstdc++.
+  static const unordered_set<string> kEmptySet;
+
   void Prepare(const vector<string>& extra_tserver_flags = {},
                const vector<string>& extra_master_flags = {},
                const LocationInfo& location_info = {},
+               const unordered_set<string>& empty_locations = kEmptySet,
                vector<string>* created_tables_names = nullptr) {
     const auto& scheme_flag = Substitute(
         "--raft_prepare_replacement_before_eviction=$0", is_343_scheme());
@@ -420,12 +438,60 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
 
     FLAGS_num_tablet_servers = num_tservers_;
     FLAGS_num_replicas = rep_factor_;
-    NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info));
+    NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info,
+                            /*create_table=*/ false));
+
+    if (location_info.empty()) {
+      ASSERT_OK(CreateUnbalancedTables(
+          cluster_.get(), client_.get(), schema_, kTableNamePattern,
+          num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
+          tserver_unresponsive_ms_, created_tables_names));
+    } else {
+      ASSERT_OK(CreateTablesExcludingLocations(empty_locations,
+                                               created_tables_names));
+    }
+  }
+
+  // Create tables placing their tablet replicas everywhere but not at the
+  // tablet servers in the specified locations. This is similar to
+  // CreateUnbalancedTables() but the set of tablet servers to avoid is defined
+  // by the set of the specified locations.
+  Status CreateTablesExcludingLocations(
+      const unordered_set<string>& excluded_locations,
+      vector<string>* table_names = nullptr) {
+    // Shutdown all tablet servers in the specified locations so no tablet
+    // replicas would be hosted by those servers.
+    unordered_set<string> seen_locations;
+    if (!excluded_locations.empty()) {
+      for (const auto& elem : tablet_servers_) {
+        auto* ts = elem.second;
+        if (ContainsKey(excluded_locations, ts->location)) {
+          cluster_->tablet_server_by_uuid(ts->uuid())->Shutdown();
+          EmplaceIfNotPresent(&seen_locations, ts->location);
+        }
+      }
+    }
+    // Sanity check: every specified location should have been seen, otherwise
+    // something is wrong with the tablet servers' registration.
+    CHECK_EQ(excluded_locations.size(), seen_locations.size());
+
+    // Wait for the catalog manager to understand that not all tablet servers
+    // are available.
+    SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms_ / 4));
+    RETURN_NOT_OK(CreateTables(cluster_.get(), client_.get(), schema_,
+                               kTableNamePattern, num_tables_, rep_factor_,
+                               table_names));
+    // Start tablet servers at the excluded locations.
+    if (!excluded_locations.empty()) {
+      for (const auto& elem : tablet_servers_) {
+        auto* ts = elem.second;
+        if (ContainsKey(excluded_locations, ts->location)) {
+          RETURN_NOT_OK(cluster_->tablet_server_by_uuid(ts->uuid())->Restart());
+        }
+      }
+    }
 
-    ASSERT_OK(CreateUnbalancedTables(
-        cluster_.get(), client_.get(), schema_, kTableNamePattern,
-        num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
-        tserver_unresponsive_ms_, created_tables_names));
+    return Status::OK();
   }
 
   // When the rebalancer starts moving replicas, ksck detects corruption
@@ -454,6 +520,7 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase
{
 };
 const char* const RebalancingTest::kExitOnSignalStr = "kudu: process exited on signal";
 const char* const RebalancingTest::kTableNamePattern = "rebalance_test_table_$0";
+const unordered_set<string> RebalancingTest::kEmptySet = unordered_set<string>();
 
 typedef testing::WithParamInterface<Kudu1097> Kudu1097ParamTest;
 
@@ -1185,7 +1252,7 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) {
 
   const LocationInfo location_info = { { "/A", 2 }, { "/B", 2 }, { "/C", 2 }, };
   vector<string> table_names;
-  NO_FATALS(Prepare({}, {}, location_info, &table_names));
+  NO_FATALS(Prepare({}, {}, location_info, kEmptySet, &table_names));
 
   const vector<string> tool_args = {
     "cluster",
@@ -1261,5 +1328,104 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) {
   }
 }
 
+class LocationAwareBalanceInfoTest : public RebalancingTest {
+ public:
+  LocationAwareBalanceInfoTest()
+      : RebalancingTest(/*num_tables=*/ 1,
+                        /*rep_factor=*/ 3,
+                        /*num_tservers=*/ 5) {
+  }
+
+  bool is_343_scheme() const override {
+    // This fixture's tests apply to the 3-4-3 replica management scheme only.
+    return true;
+  }
+};
+
+// Verify the '--report_only' output of the location-aware rebalancer for a
+// three-location cluster in which location /C is expected to host no replicas.
+TEST_F(LocationAwareBalanceInfoTest, ReportOnly) {
+  static const char kReferenceOutput[] =
+    R"***(Locations load summary:
+ Location |   Load
+----------+----------
+ /A       | 3.000000
+ /B       | 3.000000
+ /C       | 0.000000
+
+--------------------------------------------------
+Location: /A
+--------------------------------------------------
+Per-server replica distribution summary:
+       Statistic       |  Value
+-----------------------+----------
+ Minimum Replica Count | 3
+ Maximum Replica Count | 3
+ Average Replica Count | 3.000000
+
+Per-table replica distribution summary:
+ Replica Skew |  Value
+--------------+----------
+ Minimum      | 0
+ Maximum      | 0
+ Average      | 0.000000
+
+--------------------------------------------------
+Location: /B
+--------------------------------------------------
+Per-server replica distribution summary:
+       Statistic       |  Value
+-----------------------+----------
+ Minimum Replica Count | 3
+ Maximum Replica Count | 3
+ Average Replica Count | 3.000000
+
+Per-table replica distribution summary:
+ Replica Skew |  Value
+--------------+----------
+ Minimum      | 0
+ Maximum      | 0
+ Average      | 0.000000
+
+--------------------------------------------------
+Location: /C
+--------------------------------------------------
+Per-server replica distribution summary:
+       Statistic       |  Value
+-----------------------+----------
+ Minimum Replica Count | 0
+ Maximum Replica Count | 0
+ Average Replica Count | 0.000000
+
+Per-table replica distribution summary:
+ Replica Skew | Value
+--------------+-------
+ N/A          | N/A
+
+Placement policy violations:
+ Location | Number of non-complying tables | Number of non-complying tablets
+----------+--------------------------------+---------------------------------
+ /B       | 1                              | 3
+)***";
+
+  const LocationInfo location_info = { { "/A", 1 }, { "/B", 2 }, { "/C", 2 }, };
+  NO_FATALS(Prepare({}, {}, location_info, { "/C" }));
+
+  string out;
+  string err;
+  Status s = RunKuduTool({
+    "cluster",
+    "rebalance",
+    cluster_->master()->bound_rpc_addr().ToString(),
+    "--report_only",
+  }, &out, &err);
+  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+  // The tool's output must contain the reference report verbatim.
+  ASSERT_STR_CONTAINS(out, kReferenceOutput);
+  // With --report_only, no rebalancing run should be reported as complete.
+  ASSERT_STR_NOT_CONTAINS(out, "rebalancing is complete:")
+      << ToolRunInfo(s, out, err);
+}
+
 } // namespace tools
 } // namespace kudu


Mime
View raw message