kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [1/2] kudu git commit: [rebalancer] location-aware rebalancer (part 5/n)
Date Tue, 30 Oct 2018 00:25:45 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 34bb7f93b -> f731ea004


[rebalancer] location-aware rebalancer (part 5/n)

Added LocationBalancingAlgo and corresponding unit tests.

Change-Id: I7ffff8446fec8b8f80b7c6112bdd9d53f3dbf506
Reviewed-on: http://gerrit.cloudera.org:8080/11746
Tested-by: Alexey Serbin <aserbin@cloudera.com>
Reviewed-by: Will Berkeley <wdberkeley@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/87084c10
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/87084c10
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/87084c10

Branch: refs/heads/master
Commit: 87084c108e1836ebb7811eace93836ee872a253e
Parents: 34bb7f9
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Fri Oct 19 22:59:26 2018 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Mon Oct 29 23:26:48 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/rebalance_algo-test.cc | 317 ++++++++++++++++++++++++++++-
 src/kudu/tools/rebalance_algo.cc      | 220 ++++++++++++++++++++
 src/kudu/tools/rebalance_algo.h       |  90 +++++++-
 3 files changed, 623 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/87084c10/src/kudu/tools/rebalance_algo-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo-test.cc b/src/kudu/tools/rebalance_algo-test.cc
index 212819f..3271339 100644
--- a/src/kudu/tools/rebalance_algo-test.cc
+++ b/src/kudu/tools/rebalance_algo-test.cc
@@ -25,6 +25,7 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -33,6 +34,7 @@
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
@@ -53,11 +55,20 @@ struct TestClusterConfig;
     } \
   } while (false)
 
+#define VERIFY_LOCATION_BALANCING_MOVES(test_config) \
+  do { \
+    for (auto idx = 0; idx < ARRAYSIZE((test_config)); ++idx) { \
+      SCOPED_TRACE(Substitute("test config index: $0", idx)); \
+      NO_FATALS(VerifyLocationRebalancingMoves((test_config)[idx])); \
+    } \
+  } while (false)
+
 using std::endl;
 using std::ostream;
 using std::ostringstream;
 using std::set;
 using std::string;
+using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
@@ -68,12 +79,19 @@ struct TablePerServerReplicas {
   const string table_id;
 
   // Number of replicas of this table on each server in the cluster.
+  // By definition, the indices in this container correspond to indices
+  // in TestClusterConfig::tserver_uuids.
   const vector<size_t> num_replicas_by_server;
 };
 
 // Structure to describe rebalancing-related state of the cluster expressively
 // enough for the tests.
 struct TestClusterConfig {
+  // Distribution of tablet servers by locations. If the map is empty, it's
+  // interpreted as if the cluster does not have any locations specified
+  // (i.e. all the tablet servers are all in the same unnamed location).
+  const unordered_map<string, set<string>> servers_by_location;
+
   // UUIDs of tablet servers; every element must be unique.
   const vector<string> tserver_uuids;
 
@@ -148,6 +166,17 @@ void ClusterConfigToClusterInfo(const TestClusterConfig& tcc,
     table_info_by_skew.emplace(max_count - min_count, std::move(info));
   }
 
+  // TODO(aserbin): add a consistency check on location-related fields.
+  auto& locality = result.locality;
+  locality.servers_by_location = tcc.servers_by_location;
+  for (const auto& elem : tcc.servers_by_location) {
+    const auto& location = elem.first;
+    const auto& server_ids = elem.second;
+    for (const auto& server_id : server_ids) {
+      EmplaceOrDie(&locality.location_by_ts_id, server_id, location);
+    }
+  }
+
   *cluster_info = std::move(result);
 }
 
@@ -163,6 +192,18 @@ void VerifyRebalancingMoves(const TestClusterConfig& cfg) {
   EXPECT_EQ(cfg.expected_moves, moves);
 }
 
+// Similar to VerifyRebalancingMoves(), but related to locations rebalancing.
+void VerifyLocationRebalancingMoves(const TestClusterConfig& cfg) {
+  vector<TableReplicaMove> moves;
+  {
+    ClusterInfo ci;
+    ClusterConfigToClusterInfo(cfg, &ci);
+    LocationBalancingAlgo algo;
+    ASSERT_OK(algo.GetNextMoves(ci, 0, &moves));
+  }
+  EXPECT_EQ(cfg.expected_moves, moves);
+}
+
 // Is 'cbi' balanced according to the two-dimensional greedy algorithm?
 bool IsBalanced(const ClusterBalanceInfo& cbi) {
   if (cbi.table_info_by_skew.empty()) {
@@ -223,7 +264,7 @@ TEST(RebalanceAlgoUnitTest, NoTableSkewInClusterBalanceInfoGetNextMoves) {
 
 // Test the behavior of the internal (non-public) algorithm's method
 // GetNextMove() when no input information is given.
-TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfoGetNextMove) {
+TEST(RebalanceAlgoUnitTest, EmptyBalanceInfoGetNextMove) {
   boost::optional<TableReplicaMove> move;
   const ClusterInfo info;
   const auto s = TwoDimensionalGreedyAlgo().GetNextMove(info, &move);
@@ -231,6 +272,10 @@ TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfoGetNextMove) {
   EXPECT_EQ(boost::none, move);
 }
 
+// Workaround for older libstdc++ (like on RH/CentOS 6). In case of newer
+// libstdc++/libc++ '{}' works as needed for an empty unordered map.
+static const decltype(TestClusterConfig::servers_by_location) kNoLocations;
+
 // Various scenarios of balanced configurations where no moves are expected
 // to happen.
 TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
@@ -238,6 +283,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
   const TestClusterConfig kConfigs[] = {
     {
       // A single tablet server with a single replica of the only table.
+      kNoLocations,
       { "0", },
       {
         { "A", { 1 } },
@@ -245,6 +291,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
     },
     {
       // A single tablet server in the cluster that hosts all replicas.
+      kNoLocations,
       { "0", },
       {
         { "A", { 1 } },
@@ -254,6 +301,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
     },
     {
       // Single table and 2 TS: 100 and 99 replicas at each.
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 100, 99, } },
@@ -261,6 +309,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
     },
     {
       // Table- and cluster-wise balanced configuration with one-off skew.
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 1, 1, } },
@@ -271,6 +320,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
       // A configuration which has zero skew cluster-wise, while the table-wise
       // balance has one-off skew: the algorithm should not try to correct
       // the latter.
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 1, 2, } },
@@ -280,6 +330,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
       },
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 1, 0, 0, } },
@@ -290,6 +341,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
     {
       // A simple balanced case: 3 tablet servers, 3 tables with
       // one replica per server.
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 1, 1, 1, } },
@@ -298,6 +350,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
       },
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 0, 1, 1, } },
@@ -306,6 +359,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
       },
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 2, 1, 1, } },
@@ -314,6 +368,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
       },
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 1, 1, 0, } },
@@ -325,6 +380,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
       },
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 1, 0, 1, } },
@@ -332,6 +388,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
       },
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "B", { 1, 0, 1, } },
@@ -339,6 +396,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
       },
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 2, 2, 1, } },
@@ -346,6 +404,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
       },
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 2, 2, 1, } },
@@ -353,6 +412,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
       },
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 2, 2, 1, } },
@@ -361,6 +421,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
       },
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 0, 1, 0, } },
@@ -378,6 +439,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
 TEST(RebalanceAlgoUnitTest, TableWiseBalanced) {
   const TestClusterConfig kConfigs[] = {
     {
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 100, 99, } },
@@ -386,6 +448,7 @@ TEST(RebalanceAlgoUnitTest, TableWiseBalanced) {
       { { "A", "0", "1" }, }
     },
     {
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 1, 2, } },
@@ -396,6 +459,7 @@ TEST(RebalanceAlgoUnitTest, TableWiseBalanced) {
       { { "A", "1", "0" }, }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 1, 0, 0, } },
@@ -405,6 +469,7 @@ TEST(RebalanceAlgoUnitTest, TableWiseBalanced) {
       { { "A", "0", "2" }, }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 1, 1, 1, } },
@@ -414,6 +479,7 @@ TEST(RebalanceAlgoUnitTest, TableWiseBalanced) {
       { { "B", "2", "0" }, }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 1, 1, 0, } },
@@ -423,6 +489,7 @@ TEST(RebalanceAlgoUnitTest, TableWiseBalanced) {
       { { "B", "0", "1" }, }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "C", { 1, 0, 1, } },
@@ -442,6 +509,7 @@ TEST(RebalanceAlgoUnitTest, OneMoveNoCycling) {
   // that's why multiples of virtually same configuration.
   const TestClusterConfig kConfigs[] = {
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 1, 0, 1, } },
@@ -451,6 +519,7 @@ TEST(RebalanceAlgoUnitTest, OneMoveNoCycling) {
       { { "A", "0", "1" }, }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 1, 0, 1, } },
@@ -460,6 +529,7 @@ TEST(RebalanceAlgoUnitTest, OneMoveNoCycling) {
       { { "A", "0", "1" }, }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "B", { 1, 0, 1, } },
@@ -469,6 +539,7 @@ TEST(RebalanceAlgoUnitTest, OneMoveNoCycling) {
       { { "B", "0", "1" }, }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "B", { 1, 0, 1, } },
@@ -478,6 +549,7 @@ TEST(RebalanceAlgoUnitTest, OneMoveNoCycling) {
       { { "B", "0", "1" }, }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "C", { 1, 0, 1, } },
@@ -487,6 +559,7 @@ TEST(RebalanceAlgoUnitTest, OneMoveNoCycling) {
       { { "C", "0", "1" }, }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "C", { 1, 0, 1, } },
@@ -505,6 +578,7 @@ TEST(RebalanceAlgoUnitTest, OneMoveNoCycling) {
 TEST(RebalanceAlgoUnitTest, ClusterWiseBalanced) {
   const TestClusterConfig kConfigs[] = {
     {
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 2, 0, } },
@@ -515,6 +589,7 @@ TEST(RebalanceAlgoUnitTest, ClusterWiseBalanced) {
       }
     },
     {
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 1, 2, } },
@@ -527,6 +602,7 @@ TEST(RebalanceAlgoUnitTest, ClusterWiseBalanced) {
       }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 2, 1, 0, } },
@@ -538,6 +614,7 @@ TEST(RebalanceAlgoUnitTest, ClusterWiseBalanced) {
       }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 2, 1, 0, } },
@@ -558,6 +635,7 @@ TEST(RebalanceAlgoUnitTest, ClusterWiseBalanced) {
 TEST(RebalanceAlgoUnitTest, FewMoves) {
   const TestClusterConfig kConfigs[] = {
     {
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 2, 0, } },
@@ -565,6 +643,7 @@ TEST(RebalanceAlgoUnitTest, FewMoves) {
       { { "A", "0", "1" }, }
     },
     {
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 3, 0, } },
@@ -572,6 +651,7 @@ TEST(RebalanceAlgoUnitTest, FewMoves) {
       { { "A", "0", "1" }, }
     },
     {
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 4, 0, } },
@@ -582,6 +662,7 @@ TEST(RebalanceAlgoUnitTest, FewMoves) {
       }
     },
     {
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 1, 2, } },
@@ -593,6 +674,7 @@ TEST(RebalanceAlgoUnitTest, FewMoves) {
       }
     },
     {
+      kNoLocations,
       { "0", "1", },
       {
         { "A", { 4, 0, } },
@@ -605,6 +687,7 @@ TEST(RebalanceAlgoUnitTest, FewMoves) {
       }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 4, 2, 0, } },
@@ -618,6 +701,7 @@ TEST(RebalanceAlgoUnitTest, FewMoves) {
       }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 2, 1, 0, } },
@@ -631,6 +715,7 @@ TEST(RebalanceAlgoUnitTest, FewMoves) {
       }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 5, 1, 0, } },
@@ -642,6 +727,7 @@ TEST(RebalanceAlgoUnitTest, FewMoves) {
       }
     },
     {
+      kNoLocations,
       { "0", "1", "2", },
       {
         { "A", { 5, 1, 0, } },
@@ -664,6 +750,7 @@ TEST(RebalanceAlgoUnitTest, FewMoves) {
 // make them balanced moving many replicas around.
 TEST(RebalanceAlgoUnitTest, ManyMoves) {
   const TestClusterConfig kConfig = {
+    kNoLocations,
     { "0", "1", "2", },
     {
       { "A", { 100, 400, 100, } },
@@ -720,6 +807,7 @@ TEST(RebalanceAlgoUnitTest, RandomizedTest) {
       });
     }
     TestClusterConfig cfg{
+      kNoLocations,
       std::move(tserver_uuids),
       std::move(table_replicas),
       {}  // This tests checks achievement of balance, not the path to it.
@@ -740,11 +828,236 @@ TEST(RebalanceAlgoUnitTest, RandomizedTest) {
       while (!IsBalanced(ci.balance)) {
         ASSERT_OK(algo.GetNextMove(ci, &move));
         ASSERT_OK(TwoDimensionalGreedyAlgo::ApplyMove(*move, &ci.balance));
-        ASSERT_GE(num_moves_ub, ++num_moves) << "Too many moves! The algorithm is likely stuck";
+        ASSERT_GE(num_moves_ub, ++num_moves)
+            << "Too many moves! The algorithm is likely stuck";
       }
     }
   }
 }
 
+// Location-based rebalancing, the case of few moves because of slight (if any)
+// location load imbalance.
+TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
+  const TestClusterConfig kConfigs[] = {
+    {
+      {
+        { "L0", { "0", "1", }, },
+        { "L1", { "2", }, },
+      },
+      { "0", "1", "2", },
+      { { "A", { 1, 0, 0, } }, },
+      {}
+    },
+    {
+      {
+        { "L0", { "0", "1", }, },
+        { "L1", { "2", }, },
+      },
+      { "0", "1", "2", },
+      { { "A", { 0, 0, 1, } }, },
+      {}
+    },
+    {
+      {
+        { "L0", { "0", "1", }, },
+        { "L1", { "2", }, },
+      },
+      { "0", "1", "2", },
+      { { "A", { 1, 1, 0, } }, },
+      {}
+    },
+    {
+      {
+        { "L0", { "0", "1", }, },
+        { "L1", { "2", }, },
+      },
+      { "0", "1", "2", },
+      { { "A", { 1, 1, 1, } }, },
+      {}
+    },
+    {
+      {
+        { "L0", { "0", "1", }, },
+        { "L1", { "2", }, },
+      },
+      { "0", "1", "2", },
+      { { "A", { 2, 1, 0, } }, },
+      { { "A", "0", "2" }, }
+    },
+    {
+      {
+        { "L0", { "0", "1", }, },
+        { "L1", { "2", }, },
+      },
+      { "0", "1", "2", },
+      { { "A", { 1, 1, 2, } }, },
+      {}
+    },
+    {
+      {
+        { "L0", { "0", "1", }, },
+        { "L1", { "2", }, },
+      },
+      { "0", "1", "2", },
+      { { "A", { 2, 1, 3, } }, },
+      { { "A", "2", "1" }, }
+    },
+    {
+      {
+        { "L0", { "0", "1", }, },
+        { "L1", { "2", }, },
+      },
+      { "0", "1", "2", },
+      { { "A", { 2, 4, 0, } }, },
+      {
+        { "A", "1", "2" },
+        { "A", "1", "2" },
+      }
+    },
+    {
+      {
+        { "L0", { "0", "1", }, },
+        { "L1", { "2", "3", "4", "5", }, },
+      },
+      { "0", "1", "2", "3", "4", "5" },
+      { { "A", { 1, 1, 1, 1, 1, 1, } }, },
+      {}
+    },
+    {
+      {
+        { "L0", { "0", "1", }, },
+        { "L1", { "2", "3", "4", "5", }, },
+      },
+      { "0", "1", "2", "3", "4", "5" },
+      { { "A", { 2, 0, 4, 0, 0, 0, } }, },
+      {}
+    },
+    {
+      {
+        { "L0", { "0", }, },
+        { "L1", { "1", "2", "3", "4", "5", }, },
+      },
+      { "0", "1", "2", "3", "4", "5", },
+      { { "A", { 0, 1, 1, 1, 1, 1, } }, },
+      {}
+    },
+    {
+      {
+        { "L0", { "0", }, },
+        { "L1", { "1", "2", "3", "4", "5", }, },
+      },
+      { "0", "1", "2", "3", "4", "5", },
+      { { "A", { 0, 5, 0, 0, 0, 0, } }, },
+      {}
+    },
+    {
+      {
+        { "L0", { "0", }, },
+        { "L1", { "1", "2", "3", "4", "5", }, },
+      },
+      { "0", "1", "2", "3", "4", "5", },
+      { { "A", { 2, 1, 1, 1, 1, 0, } }, },
+      { { "A", "0", "5" }, }
+    },
+  };
+  VERIFY_LOCATION_BALANCING_MOVES(kConfigs);
+}
+
+// A simple location-based rebalancing scenario, a single table.
+TEST(RebalanceAlgoUnitTest, LocationBalancingSimpleST) {
+  const TestClusterConfig kConfigs[] = {
+    {
+      {
+        { "L0", { "0", }, },
+        { "L1", { "1", }, },
+        { "L2", { "2", }, },
+      },
+      { "0", "1", "2", },
+      { { "A", { 2, 1, 0, } }, },
+      { { "A", "0", "2" }, }
+    },
+    {
+      {
+        { "L0", { "0", }, },
+        { "L1", { "1", }, },
+        { "L2", { "2", }, },
+      },
+      { "0", "1", "2", },
+      { { "A", { 6, 0, 0, } }, },
+      // TODO(aserbin): what about ordering?
+      {
+        { "A", "0", "2" },
+        { "A", "0", "1" },
+        { "A", "0", "1" },
+        { "A", "0", "2" },
+      }
+    },
+    {
+      {
+        { "L0", { "0", }, },
+        { "L1", { "1", }, },
+        { "L2", { "2", }, },
+      },
+      { "0", "1", "2", },
+      {
+        { "A", { 1, 0, 0, } },
+      },
+      {}
+    },
+  };
+  VERIFY_LOCATION_BALANCING_MOVES(kConfigs);
+}
+
+// A simple location-based rebalancing scenario, multiple tables.
+TEST(RebalanceAlgoUnitTest, LocationBalancingSimpleMT) {
+  const TestClusterConfig kConfigs[] = {
+    {
+      {
+        { "L0", { "0", }, },
+        { "L1", { "1", }, },
+        { "L2", { "2", }, },
+      },
+      { "0", "1", "2", },
+      {
+        { "A", { 2, 1, 1, } },
+        { "B", { 0, 0, 2, } },
+      },
+      { { "B", "2", "1" }, }
+    },
+    {
+      {
+        { "L0", { "0", }, },
+        { "L1", { "1", }, },
+        { "L2", { "2", }, },
+      },
+      { "0", "1", "2", },
+      {
+        { "A", { 2, 1, 0, } },
+        { "B", { 0, 0, 3, } },
+      },
+      {
+        { "B", "2", "1" },
+        { "B", "2", "0" },
+        { "A", "0", "2" },
+      }
+    },
+    {
+      {
+        { "L0", { "0", }, },
+        { "L1", { "1", }, },
+        { "L2", { "2", }, },
+      },
+      { "0", "1", "2", },
+      {
+        { "A", { 1, 0, 0, } },
+        { "B", { 1, 1, 2, } },
+        { "C", { 10, 9, 10, } },
+      },
+      {}
+    },
+  };
+  VERIFY_LOCATION_BALANCING_MOVES(kConfigs);
+}
+
 } // namespace tools
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/87084c10/src/kudu/tools/rebalance_algo.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo.cc b/src/kudu/tools/rebalance_algo.cc
index c788761..a006258 100644
--- a/src/kudu/tools/rebalance_algo.cc
+++ b/src/kudu/tools/rebalance_algo.cc
@@ -18,28 +18,35 @@
 #include "kudu/tools/rebalance_algo.h"
 
 #include <algorithm>
+#include <cmath>
+#include <functional>
 #include <iostream>
 #include <iterator>
 #include <limits>
 #include <random>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/status.h"
 
 using std::back_inserter;
+using std::endl;
+using std::multimap;
 using std::numeric_limits;
 using std::ostringstream;
 using std::set_intersection;
 using std::shuffle;
 using std::sort;
 using std::string;
+using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
@@ -396,5 +403,218 @@ Status TwoDimensionalGreedyAlgo::GetMinMaxLoadedServers(
   return Status::OK();
 }
 
+Status LocationBalancingAlgo::GetNextMove(
+    const ClusterInfo& cluster_info,
+    boost::optional<TableReplicaMove>* move) {
+  DCHECK(move);
+  *move = boost::none;
+
+  // Per-table information on locations load.
+  // TODO(aserbin): maybe, move this container into ClusterInfo?
+  unordered_map<string, multimap<double, string>> location_load_info_by_table;
+
+  // A dictionary to map location-wise load imbalance into table identifier.
+  // The most imbalanced tables come last.
+  multimap<double, string> table_id_by_load_imbalance;
+  for (const auto& elem : cluster_info.balance.table_info_by_skew) {
+    const auto& table_info = elem.second;
+    // Number of replicas of all tablets comprising the table, per location.
+    unordered_map<string, int32_t> replica_num_per_location;
+    for (const auto& elem : table_info.servers_by_replica_count) {
+      auto replica_count = elem.first;
+      const auto& ts_id = elem.second;
+      const auto& location =
+          FindOrDie(cluster_info.locality.location_by_ts_id, ts_id);
+      LookupOrEmplace(&replica_num_per_location, location, 0) += replica_count;
+    }
+    multimap<double, string> location_by_load;
+    for (const auto& elem : replica_num_per_location) {
+      const auto& location = elem.first;
+      double replica_num = static_cast<double>(elem.second);
+      auto ts_num = FindOrDie(cluster_info.locality.servers_by_location,
+                              location).size();
+      CHECK_NE(0, ts_num);
+      location_by_load.emplace(replica_num / ts_num, location);
+    }
+
+    const auto& table_id = table_info.table_id;
+    const auto load_min = location_by_load.cbegin()->first;
+    const auto load_max = location_by_load.crbegin()->first;
+    const auto imbalance = load_max - load_min;
+    DCHECK(!std::isnan(imbalance));
+    table_id_by_load_imbalance.emplace(imbalance, table_id);
+    EmplaceOrDie(&location_load_info_by_table,
+                 table_id, std::move(location_by_load));
+  }
+
+  string imbalanced_table_id;
+  if (!IsBalancingNeeded(table_id_by_load_imbalance, &imbalanced_table_id)) {
+    // Nothing to do: all tables are location-balanced enough.
+    return Status::OK();
+  }
+
+  // Work on the most location-wise unbalanced tables first.
+  const auto& load_info = FindOrDie(
+      location_load_info_by_table, imbalanced_table_id);
+
+  vector<string> loc_loaded_least;
+  {
+    const auto min_range = load_info.equal_range(load_info.cbegin()->first);
+    for (auto it = min_range.first; it != min_range.second; ++it) {
+      loc_loaded_least.push_back(it->second);
+    }
+  }
+  DCHECK(!loc_loaded_least.empty());
+
+  vector<string> loc_loaded_most;
+  {
+    const auto max_range = load_info.equal_range(load_info.crbegin()->first);
+    for (auto it = max_range.first; it != max_range.second; ++it) {
+      loc_loaded_most.push_back(it->second);
+    }
+  }
+  DCHECK(!loc_loaded_most.empty());
+
+  if (PREDICT_FALSE(VLOG_IS_ON(1))) {
+    ostringstream s;
+    s << "[ ";
+    for (const auto& loc : loc_loaded_least) {
+      s << loc << " ";
+    }
+    s << "]";
+    VLOG(1) << "loc_loaded_least: " << s.str();
+
+    s.str("");
+    s << "[ ";
+    for (const auto& loc : loc_loaded_most) {
+      s << loc << " ";
+    }
+    s << "]";
+    VLOG(1) << "loc_leaded_most: " << s.str();
+  }
+
+  return FindBestMove(imbalanced_table_id, loc_loaded_least, loc_loaded_most,
+                      cluster_info, move);
+}
+
+bool LocationBalancingAlgo::IsBalancingNeeded(
+    const TableByLoadImbalance& imbalance_info,
+    string* most_imbalanced_table_id) {
+  if (PREDICT_FALSE(VLOG_IS_ON(1))) {
+    ostringstream ss;
+    ss << "Table imbalance report: " << endl;
+    for (const auto& elem : imbalance_info) {
+      ss << "  " << elem.second << ": " << elem.first << endl;
+    }
+    VLOG(1) << ss.str();
+  }
+
+  if (imbalance_info.empty()) {
+    // Nothing to do -- an empty cluster.
+    return false;
+  }
+
+  // Evaluate the maximum existing imbalance: is it possible to move replicas
+  // between tablet servers in different locations to make the skew less?
+  //
+  // TODO(aserbin): detect 'good enough' vs ideal cases, like (b) vs (a) in
+  //                the class-wide comment. In other words, find the minimum
+  //                load imbalance down to which it makes sense to try
+  //                cross-location rebalancing. Probably, it should be a policy
+  //                wrt what to prefer: ideal location-wide balance or minimum
+  //                number of replica moves between locations?
+  //
+  // The information on the most imbalanced table is in the last element
+  // of the map.
+  const auto it = imbalance_info.crbegin();
+  const auto imbalance = it->first;
+  if (imbalance > 1) {
+    *most_imbalanced_table_id = it->second;
+    return true;
+  }
+  return false;
+}
+
+// Given the set of the most and the least table-wise loaded locations, choose
+// the source and destination tablet server to move a replica of the specified
+// table to improve per-table location load balance as much as possible.
+Status LocationBalancingAlgo::FindBestMove(
+    const string& table_id,
+    const vector<string>& loc_loaded_least,
+    const vector<string>& loc_loaded_most,
+    const ClusterInfo& cluster_info,
+    boost::optional<TableReplicaMove>* move) {
+  // Among the available candidate locations, prefer those having the most and
+  // least loaded tablet servers in terms of total number of hosted replicas.
+  // The rationale is that the per-table location load is a relative metric
+  // (i.e. number of table replicas / number of tablet servers), but it's
+  // always beneficial to have less loaded servers in absolute terms.
+  //
+  // If there are multiple candidate tablet servers with the same extremum load,
+  // choose among them randomly.
+  //
+  // TODO(aserbin): implement fine-grained logic to select the best move among
+  //                the available candidates, if multiple choices are available.
+  //                For example, among candidates with the same number of
+  //                replicas, prefer candidates where the movement from one
+  //                server to another also improves the table-wise skew within
+  //                the destination location.
+  //
+
+  // Building auxiliary containers.
+  // TODO(aserbin): refactor and move some of those into the ClusterBalanceInfo.
+  typedef std::unordered_map<std::string, int32_t> ServerLoadMap;
+  ServerLoadMap load_by_ts;
+  for (const auto& elem : cluster_info.balance.servers_by_total_replica_count) {
+    EmplaceOrDie(&load_by_ts, elem.second, elem.first);
+  }
+
+  // Least loaded tablet servers from the destination locations.
+  multimap<int32_t, string> ts_id_by_load_least;
+  for (const auto& loc : loc_loaded_least) {
+    const auto& loc_ts_ids =
+        FindOrDie(cluster_info.locality.servers_by_location, loc);
+    for (const auto& ts_id : loc_ts_ids) {
+      ts_id_by_load_least.emplace(FindOrDie(load_by_ts, ts_id), ts_id);
+    }
+  }
+  // TODO(aserbin): separate into a function or lambda.
+  const auto min_load = ts_id_by_load_least.cbegin()->first;
+  const auto min_range = ts_id_by_load_least.equal_range(min_load);
+  auto it_min = min_range.first;
+#if 0
+  // TODO(aserbin): add randomness
+  const auto distance_min = distance(min_range.first, min_range.second);
+  std::advance(it_min, Uniform(distance_min));
+  CHECK_NE(min_range.second, it_min);
+#endif
+  const auto& dst_ts_id = it_min->second;
+
+  // Most loaded tablet servers from the source locations.
+  multimap<int32_t, string, std::greater<int32_t>> ts_id_by_load_most;
+  for (const auto& loc : loc_loaded_most) {
+    const auto& loc_ts_ids =
+        FindOrDie(cluster_info.locality.servers_by_location, loc);
+    for (const auto& ts_id : loc_ts_ids) {
+      ts_id_by_load_most.emplace(FindOrDie(load_by_ts, ts_id), ts_id);
+    }
+  }
+  const auto max_load = ts_id_by_load_most.cbegin()->first;
+  const auto max_range = ts_id_by_load_most.equal_range(max_load);
+  auto it_max = max_range.first;
+#if 0
+  // TODO(aserbin): add randomness
+  const auto distance_max = distance(max_range.first, max_range.second);
+  std::advance(it_max, Uniform(distance_max));
+  CHECK_NE(max_range.second, it_max);
+#endif
+  const auto& src_ts_id = it_max->second;
+  CHECK_NE(src_ts_id, dst_ts_id);
+
+  *move = { table_id, src_ts_id, dst_ts_id };
+
+  return Status::OK();
+}
+
 } // namespace tools
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/87084c10/src/kudu/tools/rebalance_algo.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo.h b/src/kudu/tools/rebalance_algo.h
index a32fdc7..af9dde0 100644
--- a/src/kudu/tools/rebalance_algo.h
+++ b/src/kudu/tools/rebalance_algo.h
@@ -81,10 +81,9 @@ struct ClusterLocalityInfo {
 };
 
 // Information on a cluster as input for various rebalancing algorithms.
-// As of now, contains only ClusterBalanceInfo, but ClusterLocalityInfo
-// is to be added once corresponding location-aware algorithms are implemented.
 struct ClusterInfo {
   ClusterBalanceInfo balance;
+  ClusterLocalityInfo locality;
 };
 
 // A directive to move some replica of a table between two tablet servers.
@@ -147,6 +146,7 @@ class TwoDimensionalGreedyAlgo : public RebalancingAlgo {
   explicit TwoDimensionalGreedyAlgo(
       EqualSkewOption opt = EqualSkewOption::PICK_RANDOM);
 
+ protected:
   Status GetNextMove(const ClusterInfo& cluster_info,
                      boost::optional<TableReplicaMove>* move) override;
 
@@ -154,6 +154,7 @@ class TwoDimensionalGreedyAlgo : public RebalancingAlgo {
   enum class ExtremumType { MAX, MIN, };
 
   FRIEND_TEST(RebalanceAlgoUnitTest, RandomizedTest);
+  FRIEND_TEST(RebalanceAlgoUnitTest, EmptyBalanceInfoGetNextMove);
   FRIEND_TEST(RebalanceAlgoUnitTest, EmptyClusterInfoGetNextMove);
 
   // Compute the intersection of the least or most loaded tablet servers for a
@@ -196,5 +197,90 @@ class TwoDimensionalGreedyAlgo : public RebalancingAlgo {
   std::mt19937 generator_;
 };
 
+// Algorithm to balance among locations in the cluster.
+//
+// The inter-location rebalancing is to minimize location load skew per table.
+// The idea is to equalize the density of the distribution of each table across
+// locations.
+//
+// Q: Why is it beneficial to equalize the density of table replicas across
+//    locations?
+// A: Assuming the homogeneous structure of the cluster (e.g., that's about
+//    having machines of the same hardware specs across the cluster) and
+//    uniform distribution of requests among all tables in the cluster
+//    (the latter is questionable, but in Kudu there isn't currently a way
+//    to specify any deviations anyway), that gives better usage
+//    of the available hardware resources.
+//
+// NOTE: probably, in the future we might add a notion of some preference in
+//       table placements regarding selected locations.
+//
+// Q: What is per-table location load skew?
+// A: Consider number of replicas per location for tablets comprising
+//    a table T. Assume we have locations L_0, ..., L_n, where
+//    replica_num(T, L_0), ..., replica_num(T, L_n) are numbers of replicas
+//    of T's tablets at corresponding locations. We want the following
+//    ratios to deviate from each other as little as possible:
+//
+//    replica_num(T, L_0) / ts_num(L_0), ..., replica_num(T, L_n) / ts_num(L_n)
+//
+// ******* Some Examples *******
+//
+// Table T of replication factor 5, and locations L_0, ..., L_4. Consider
+// the following tablet servers disposition:
+//
+//   ts_num(L_0): 2
+//   ts_num(L_1): 2
+//   ts_num(L_2): 1
+//   ts_num(L_3): 1
+//   ts_num(L_4): 1
+//
+// What distribution of replicas is preferred for a tablet t0 of table T?
+//  (a) { L_0: 1, L_1: 1, L_2: 1, L_3: 1, L_4: 1 }
+//          skew 0.5: { 0.5, 0.5, 1.0, 1.0, 1.0 }
+//
+//  (b) { L_0: 2, L_1: 2, L_2: 1, L_3: 0, L_4: 0 }
+//          skew 1.0 : { 1.0, 1.0, 1.0, 0.0, 0.0 }
+//
+// The main idea is to prevent moving tablets if the distribution is 'good
+// enough'. E.g., the distribution of (b) is acceptable if the rebalancer finds
+// the replicas already placed like that, and it should not try to move
+// the replicas to achieve the ideal distribution of (a).
+//
+// How about:
+//  (c) { L_0: 0, L_1: 0, L_2: 1, L_3: 2, L_4: 2 }
+//          skew 2.0: { 0.0, 0.0, 1.0, 2.0, 2.0 }
+//
+// We want to move replicas to make the distribution (c) more balanced;
+// 2 movements give us the 'ideal' location-wise replica placement.
+class LocationBalancingAlgo : public RebalancingAlgo {
+ protected:
+  Status GetNextMove(const ClusterInfo& cluster_info,
+                     boost::optional<TableReplicaMove>* move) override;
+ private:
+  FRIEND_TEST(RebalanceAlgoUnitTest, RandomizedTest);
+  typedef std::multimap<double, std::string> TableByLoadImbalance;
+
+  // Check if any rebalancing is needed across cluster locations based on the
+  // information provided by the 'imbalance_info' parameter. Returns 'true'
+  // if rebalancing is needed, 'false' otherwise. Upon returning 'true',
+  // the identifier of the most cross-location imbalanced table is output into
+  // the 'most_imbalanced_table_id' parameter (which must not be null).
+  static bool IsBalancingNeeded(
+      const TableByLoadImbalance& imbalance_info,
+      std::string* most_imbalanced_table_id);
+
+  // Given the set of the most and the least table-wise loaded locations, choose
+  // the source and destination tablet server to move a replica of the specified
+  // table to improve per-table location load balance as much as possible.
+  // If no replica can be moved to balance the load, the 'move' output parameter
+  // is set to 'boost::none'.
+  Status FindBestMove(const std::string& table_id,
+      const std::vector<std::string>& loc_loaded_least,
+      const std::vector<std::string>& loc_loaded_most,
+      const ClusterInfo& cluster_info,
+      boost::optional<TableReplicaMove>* move);
+};
+
 } // namespace tools
 } // namespace kudu


Mime
View raw message