kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [2/2] kudu git commit: [rebalancing] add a rebalancing algorithm
Date Thu, 07 Jun 2018 04:13:41 GMT
[rebalancing] add a rebalancing algorithm

This changelist introduces the high-level rebalancing algorithm
and corresponding unit tests. It contains both an abstraction for
rebalancing algorithms in general and an implementation of a
greedy algorithm that rebalances a cluster by trying to equalize
the number of replicas per tablet server for each table and for the
cluster as a whole.

The algorithm just identifies what moves should be done given an
arrangement of a cluster. The 'engine' that performs moves
will be added in a subsequent changelist.

Change-Id: I5a8050ee79117a2ae2f6f88740ed25656946cfb4
Reviewed-on: http://gerrit.cloudera.org:8080/10336
Reviewed-by: Mike Percy <mpercy@apache.org>
Reviewed-by: Will Berkeley <wdberkeley@gmail.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ccdcf6ce
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ccdcf6ce
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ccdcf6ce

Branch: refs/heads/master
Commit: ccdcf6cea99d1b0fa0ba1106e793b31aa607ba06
Parents: 8719014
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Mon May 7 15:33:11 2018 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Thu Jun 7 04:09:18 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/CMakeLists.txt         |  15 +
 src/kudu/tools/rebalance_algo-test.cc | 629 +++++++++++++++++++++++++++++
 src/kudu/tools/rebalance_algo.cc      | 393 ++++++++++++++++++
 src/kudu/tools/rebalance_algo.h       | 176 ++++++++
 4 files changed, 1213 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdcf6ce/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index f88e8f4..ea38500 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -81,6 +81,18 @@ target_link_libraries(ksck
 )
 
 #######################################
+# kudu_tools_rebalance
+#######################################
+
+add_library(kudu_tools_rebalance
+  rebalance_algo.cc
+)
+target_link_libraries(kudu_tools_rebalance
+  kudu_common
+  ${KUDU_BASE_LIBS}
+)
+
+#######################################
 # kudu
 #######################################
 
@@ -112,6 +124,7 @@ target_link_libraries(kudu
   kudu_client_test_util
   kudu_common
   kudu_fs
+  kudu_tools_rebalance
   kudu_util
   log
   master
@@ -141,6 +154,7 @@ set(KUDU_TEST_LINK_LIBS
   ksck
   kudu_tools_util
   kudu_tools_test_util
+  kudu_tools_rebalance
   itest_util
   mini_cluster
   ${KUDU_MIN_TEST_LIBS})
@@ -158,4 +172,5 @@ ADD_KUDU_TEST_DEPENDENCIES(kudu-tool-test
 ADD_KUDU_TEST(kudu-ts-cli-test)
 ADD_KUDU_TEST_DEPENDENCIES(kudu-ts-cli-test
   kudu)
+ADD_KUDU_TEST(rebalance_algo-test)
 ADD_KUDU_TEST(tool_action-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdcf6ce/src/kudu/tools/rebalance_algo-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo-test.cc b/src/kudu/tools/rebalance_algo-test.cc
new file mode 100644
index 0000000..3da13e1
--- /dev/null
+++ b/src/kudu/tools/rebalance_algo-test.cc
@@ -0,0 +1,629 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/rebalance_algo.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <iostream>
+#include <iterator>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+namespace kudu {
+namespace tools {
+struct TestClusterConfig;
+}  // namespace tools
+}  // namespace kudu
+
+// Runs VerifyRebalancingMoves() on every element of 'test_config' in order,
+// wrapping each run in a SCOPED_TRACE that reports the element's index so a
+// failure can be attributed to a particular configuration.
+// NOTE(review): 'test_config' is passed to ARRAYSIZE (kudu/gutil/macros.h),
+// so it is presumably required to be a real array rather than a pointer.
+// The index is a size_t to avoid comparing a signed int against the
+// array size.
+#define VERIFY_MOVES(test_config) \
+  do { \
+    for (size_t idx = 0; idx < ARRAYSIZE((test_config)); ++idx) { \
+      SCOPED_TRACE(Substitute("test config index: $0", idx)); \
+      NO_FATALS(VerifyRebalancingMoves((test_config)[idx])); \
+    } \
+  } while (false)
+
+using std::ostream;
+using std::set;
+using std::sort;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tools {
+
+// One table of a test cluster: its identifier plus the number of its replicas
+// hosted by each tablet server. Instances are brace-initialized in the test
+// tables below as { table_id, { count, count, ... } }, so the field order
+// matters.
+struct TablePerServerReplicas {
+  // Identifier of the table.
+  const string table_id;
+
+  // Number of replicas of this table on each server in the cluster.
+  // Element i is paired with TestClusterConfig::tserver_uuids[i] when the
+  // configuration is converted into a ClusterBalanceInfo.
+  const vector<size_t> num_replicas_by_server;
+};
+
+// Structure to describe rebalancing-related state of the cluster expressively
+// enough for the tests.
+//
+// Instances are brace-initialized as
+//   { tserver_uuids, table_replicas, expected_moves }
+// so the field order matters. Several test configurations omit the trailing
+// 'expected_moves' initializer, which leaves the vector empty.
+struct TestClusterConfig {
+  // UUIDs of tablet servers; every element must be unique.
+  const vector<string> tserver_uuids;
+
+  // Distribution of tablet replicas across the tablet servers. The following
+  // constraints should be in place:
+  //   * for each t in table_replicas:
+  //       t.num_replicas_by_server.size() == tserver_uuids.size()
+  const vector<TablePerServerReplicas> table_replicas;
+
+  // The expected replica movements: the reference output of the algorithm
+  // to compare with. The comparison done by VerifyRebalancingMoves() is
+  // element-wise, so the order of the moves is significant.
+  const vector<TableReplicaMove> expected_moves;
+};
+
+// Two moves compare equal when their table, source and destination all match.
+bool operator==(const TableReplicaMove& lhs, const TableReplicaMove& rhs) {
+  if (!(lhs.table_id == rhs.table_id)) {
+    return false;
+  }
+  if (!(lhs.from == rhs.from)) {
+    return false;
+  }
+  return lhs.to == rhs.to;
+}
+
+// Streams a move in the form "<table_id>:<from>-><to>"; used by gtest when
+// reporting mismatching move sequences.
+ostream& operator<<(ostream& o, const TableReplicaMove& move) {
+  return o << move.table_id << ":" << move.from << "->" << move.to;
+}
+
+// Transform the definition of the test cluster into the ClusterBalanceInfo
+// that is consumed by the rebalancing algorithm.
+//
+// The configuration is validated first and any violation CHECK-fails:
+//   * every table lists a replica count for every tablet server
+//   * table identifiers are unique
+//   * tablet server UUIDs are unique
+//   * a configuration that has tables has at least one tablet server
+// On success '*cbi' is overwritten with the converted information.
+void ClusterConfigToClusterBalanceInfo(const TestClusterConfig& tcc,
+                                       ClusterBalanceInfo* cbi) {
+  const vector<string>& tserver_uuids = tcc.tserver_uuids;
+
+  // First verify that the configuration of the test cluster is valid.
+  set<string> table_ids;
+  for (const auto& table_replica_info : tcc.table_replicas) {
+    CHECK_EQ(tserver_uuids.size(),
+             table_replica_info.num_replicas_by_server.size());
+    table_ids.emplace(table_replica_info.table_id);
+  }
+  CHECK_EQ(table_ids.size(), tcc.table_replicas.size());
+  {
+    // Check for uniqueness of the tablet servers' identifiers.
+    set<string> uuids(tserver_uuids.begin(), tserver_uuids.end());
+    CHECK_EQ(tserver_uuids.size(), uuids.size());
+  }
+  // The skew computation below reads the first and the last element of the
+  // per-table map, which has one entry per tablet server: with tables but no
+  // servers that map would be empty and dereferencing its iterators would be
+  // undefined behavior.
+  CHECK(tcc.table_replicas.empty() || !tserver_uuids.empty())
+      << "tables are defined but there are no tablet servers";
+
+  ClusterBalanceInfo result;
+  for (size_t tserver_idx = 0; tserver_idx < tserver_uuids.size();
+       ++tserver_idx) {
+    // Total replica count at the tablet server.
+    size_t count = 0;
+    for (const auto& table_replica_info : tcc.table_replicas) {
+      count += table_replica_info.num_replicas_by_server[tserver_idx];
+    }
+    result.servers_by_total_replica_count.emplace(count,
+                                                  tserver_uuids[tserver_idx]);
+  }
+
+  auto& table_info_by_skew = result.table_info_by_skew;
+  for (const auto& table_replica_info : tcc.table_replicas) {
+    // Replicas of the current table per tablet server.
+    const vector<size_t>& replicas_count =
+        table_replica_info.num_replicas_by_server;
+    TableBalanceInfo info;
+    info.table_id = table_replica_info.table_id;
+    for (size_t tserver_idx = 0; tserver_idx < replicas_count.size();
+         ++tserver_idx) {
+      auto count = replicas_count[tserver_idx];
+      info.servers_by_replica_count.emplace(count, tserver_uuids[tserver_idx]);
+    }
+    // The map is ordered by replica count, so its ends hold the extremes.
+    size_t max_count = info.servers_by_replica_count.rbegin()->first;
+    size_t min_count = info.servers_by_replica_count.begin()->first;
+    CHECK_GE(max_count, min_count);
+    table_info_by_skew.emplace(max_count - min_count, std::move(info));
+  }
+  *cbi = std::move(result);
+}
+
+// Converts 'cfg' into balance information, runs the greedy algorithm in
+// PICK_FIRST mode passing 0 as the maximum number of moves, and expects the
+// produced sequence of moves to be exactly cfg.expected_moves.
+void VerifyRebalancingMoves(const TestClusterConfig& cfg) {
+  ClusterBalanceInfo balance_info;
+  ClusterConfigToClusterBalanceInfo(cfg, &balance_info);
+
+  TwoDimensionalGreedyAlgo greedy(
+      TwoDimensionalGreedyAlgo::EqualSkewOption::PICK_FIRST);
+  vector<TableReplicaMove> actual_moves;
+  ASSERT_OK(greedy.GetNextMoves(std::move(balance_info), 0, &actual_moves));
+
+  EXPECT_EQ(cfg.expected_moves, actual_moves);
+}
+
+// When the balance information is empty the algorithm is expected to return
+// Status::InvalidArgument and to leave the output list of moves empty.
+TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfo) {
+  ClusterBalanceInfo empty_info;
+  vector<TableReplicaMove> produced_moves;
+  TwoDimensionalGreedyAlgo algo;
+
+  const Status s = algo.GetNextMoves(empty_info, 1, &produced_moves);
+
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  EXPECT_TRUE(produced_moves.empty());
+}
+
+// Various scenarios of balanced configurations where no moves are expected
+// to happen.
+//
+// Each configuration lists the tablet server UUIDs followed by the per-table
+// replica counts; the expected moves are omitted, i.e. left empty.
+TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
+  // The configurations are already balanced, no moves should be attempted.
+  const TestClusterConfig kConfigs[] = {
+    {
+      // A single tablet server with a single replica of the only table.
+      { "0", },
+      {
+        { "A", { 1 } },
+      },
+    },
+    {
+      // A single tablet server in the cluster that hosts all replicas.
+      { "0", },
+      {
+        { "A", { 1 } },
+        { "B", { 10 } },
+        { "C", { 100 } },
+      },
+    },
+    {
+      // Single table and 2 TS: 100 and 99 replicas at each.
+      { "0", "1", },
+      {
+        { "A", { 100, 99, } },
+      },
+    },
+    {
+      // Table- and cluster-wise balanced configuration with one-off skew.
+      { "0", "1", },
+      {
+        { "A", { 1, 1, } },
+        { "B", { 1, 2, } },
+      },
+    },
+    {
+      // A configuration which has zero skew cluster-wise, while the table-wise
+      // balance has one-off skew: the algorithm should not try to correct
+      // the latter.
+      { "0", "1", },
+      {
+        { "A", { 1, 2, } },
+        { "B", { 1, 2, } },
+        { "C", { 1, 0, } },
+        { "D", { 1, 0, } },
+      },
+    },
+    {
+      // Three single-replica tables, each hosted by a different server.
+      // Per-server totals: 1, 1, 1.
+      { "0", "1", "2", },
+      {
+        { "A", { 1, 0, 0, } },
+        { "B", { 0, 1, 0, } },
+        { "C", { 0, 0, 1, } },
+      },
+    },
+    {
+      // A simple balanced case: 3 tablet servers, 3 tables with
+      // one replica per server.
+      { "0", "1", "2", },
+      {
+        { "A", { 1, 1, 1, } },
+        { "B", { 1, 1, 1, } },
+        { "C", { 1, 1, 1, } },
+      },
+    },
+    {
+      // Every table misses a replica on a different server.
+      // Per-server totals: 2, 2, 2.
+      { "0", "1", "2", },
+      {
+        { "A", { 0, 1, 1, } },
+        { "B", { 1, 0, 1, } },
+        { "C", { 1, 1, 0, } },
+      },
+    },
+    {
+      // Every table has one extra replica on a different server.
+      // Per-server totals: 4, 4, 4.
+      { "0", "1", "2", },
+      {
+        { "A", { 2, 1, 1, } },
+        { "B", { 1, 2, 1, } },
+        { "C", { 1, 1, 2, } },
+      },
+    },
+    {
+      // Six tables, each present on two of the three servers.
+      // Per-server totals: 4, 4, 4.
+      { "0", "1", "2", },
+      {
+        { "A", { 1, 1, 0, } },
+        { "B", { 1, 1, 0, } },
+        { "C", { 1, 0, 1, } },
+        { "D", { 1, 0, 1, } },
+        { "E", { 0, 1, 1, } },
+        { "F", { 0, 1, 1, } },
+      },
+    },
+    {
+      // Per-server totals: 2, 1, 1.
+      { "0", "1", "2", },
+      {
+        { "A", { 1, 0, 1, } },
+        { "B", { 1, 1, 0, } },
+      },
+    },
+    {
+      // Same distribution as the previous configuration, with the table
+      // identifiers swapped. Per-server totals: 2, 1, 1.
+      { "0", "1", "2", },
+      {
+        { "B", { 1, 0, 1, } },
+        { "A", { 1, 1, 0, } },
+      },
+    },
+    {
+      // Per-server totals: 3, 2, 2.
+      { "0", "1", "2", },
+      {
+        { "A", { 2, 2, 1, } },
+        { "B", { 1, 0, 1, } },
+      },
+    },
+    {
+      // Per-server totals: 3, 3, 2.
+      { "0", "1", "2", },
+      {
+        { "A", { 2, 2, 1, } },
+        { "B", { 1, 1, 1, } },
+      },
+    },
+    {
+      // Per-server totals: 2, 2, 3.
+      { "0", "1", "2", },
+      {
+        { "A", { 2, 2, 1, } },
+        { "B", { 0, 0, 1, } },
+        { "C", { 0, 0, 1, } },
+      },
+    },
+    {
+      // Per-server totals: 2, 1, 2.
+      { "0", "1", "2", },
+      {
+        { "A", { 0, 1, 0, } },
+        { "B", { 1, 0, 1, } },
+        { "C", { 1, 0, 1, } },
+      },
+    },
+  };
+  VERIFY_MOVES(kConfigs);
+}
+
+// Set of scenarios where the distribution of replicas is table-wise balanced
+// but not yet cluster-wise balanced, requiring just a few replica moves
+// to achieve both table- and cluster-wise balance state.
+//
+// The last element of every configuration is the expected sequence of moves,
+// each written as { table, source server, destination server }.
+TEST(RebalanceAlgoUnitTest, TableWiseBalanced) {
+  const TestClusterConfig kConfigs[] = {
+    {
+      // Per-server totals: 200, 198.
+      { "0", "1", },
+      {
+        { "A", { 100, 99, } },
+        { "B", { 100, 99, } },
+      },
+      { { "A", "0", "1" }, }
+    },
+    {
+      // Per-server totals: 3, 5.
+      { "0", "1", },
+      {
+        { "A", { 1, 2, } },
+        { "B", { 1, 2, } },
+        { "C", { 1, 0, } },
+        { "D", { 0, 1, } },
+      },
+      { { "A", "1", "0" }, }
+    },
+    {
+      // Per-server totals: 2, 1, 0.
+      { "0", "1", "2", },
+      {
+        { "A", { 1, 0, 0, } },
+        { "B", { 0, 1, 0, } },
+        { "C", { 1, 0, 0, } },
+      },
+      { { "A", "0", "2" }, }
+    },
+    {
+      // Per-server totals: 1, 2, 3.
+      { "0", "1", "2", },
+      {
+        { "A", { 1, 1, 1, } },
+        { "B", { 0, 1, 1, } },
+        { "C", { 0, 0, 1, } },
+      },
+      { { "B", "2", "0" }, }
+    },
+    {
+      // Per-server totals: 3, 1, 2.
+      { "0", "1", "2", },
+      {
+        { "A", { 1, 1, 0, } },
+        { "B", { 1, 0, 1, } },
+        { "C", { 1, 0, 1, } },
+      },
+      { { "B", "0", "1" }, }
+    },
+    {
+      // Same per-server totals as the previous configuration (3, 1, 2) with
+      // the table identifiers assigned in the reverse order.
+      { "0", "1", "2", },
+      {
+        { "C", { 1, 0, 1, } },
+        { "B", { 1, 0, 1, } },
+        { "A", { 1, 1, 0, } },
+      },
+      { { "C", "0", "1" }, }
+    },
+  };
+  VERIFY_MOVES(kConfigs);
+}
+
+// Simple table-wise balanced configurations which need exactly one move
+// to become cluster-wise balanced as well.
+//
+// In every configuration the per-server totals are 3, 1, 2 and a single move
+// from server "0" to server "1" is expected.
+TEST(RebalanceAlgoUnitTest, OneMoveNoCycling) {
+  // The internals of the algorithm might depend on the ordering of table
+  // identifiers: that's why there are multiple variants of virtually the
+  // same configuration, differing only in how the identifiers are assigned.
+  const TestClusterConfig kConfigs[] = {
+    {
+      { "0", "1", "2", },
+      {
+        { "A", { 1, 0, 1, } },
+        { "B", { 1, 0, 1, } },
+        { "C", { 1, 1, 0, } },
+      },
+      { { "A", "0", "1" }, }
+    },
+    {
+      { "0", "1", "2", },
+      {
+        { "A", { 1, 0, 1, } },
+        { "C", { 1, 0, 1, } },
+        { "B", { 1, 1, 0, } },
+      },
+      { { "A", "0", "1" }, }
+    },
+    {
+      { "0", "1", "2", },
+      {
+        { "B", { 1, 0, 1, } },
+        { "C", { 1, 0, 1, } },
+        { "A", { 1, 1, 0, } },
+      },
+      { { "B", "0", "1" }, }
+    },
+    {
+      { "0", "1", "2", },
+      {
+        { "B", { 1, 0, 1, } },
+        { "A", { 1, 0, 1, } },
+        { "C", { 1, 1, 0, } },
+      },
+      { { "B", "0", "1" }, }
+    },
+    {
+      { "0", "1", "2", },
+      {
+        { "C", { 1, 0, 1, } },
+        { "A", { 1, 0, 1, } },
+        { "B", { 1, 1, 0, } },
+      },
+      { { "C", "0", "1" }, }
+    },
+    {
+      { "0", "1", "2", },
+      {
+        { "C", { 1, 0, 1, } },
+        { "B", { 1, 0, 1, } },
+        { "A", { 1, 1, 0, } },
+      },
+      { { "C", "0", "1" }, }
+    },
+  };
+  VERIFY_MOVES(kConfigs);
+}
+
+// Set of scenarios where the distribution of table replicas is cluster-wise
+// balanced, but not table-wise balanced, requiring just a few moves to make it
+// both table- and cluster-wise balanced.
+//
+// The last element of every configuration is the expected sequence of moves,
+// each written as { table, source server, destination server }; the order of
+// the moves is part of the expectation.
+TEST(RebalanceAlgoUnitTest, ClusterWiseBalanced) {
+  const TestClusterConfig kConfigs[] = {
+    {
+      // Per-server totals: 3, 2.
+      { "0", "1", },
+      {
+        { "A", { 2, 0, } },
+        { "B", { 1, 2, } },
+      },
+      {
+        { "A", "0", "1" },
+      }
+    },
+    {
+      // Per-server totals: 4, 4.
+      { "0", "1", },
+      {
+        { "A", { 1, 2, } },
+        { "B", { 2, 0, } },
+        { "C", { 1, 2, } },
+      },
+      {
+        { "B", "0", "1" },
+        { "A", "1", "0" },
+      }
+    },
+    {
+      // Per-server totals: 2, 2, 2.
+      { "0", "1", "2", },
+      {
+        { "A", { 2, 1, 0, } },
+        { "B", { 0, 1, 2, } },
+      },
+      {
+        { "A", "0", "2" },
+        { "B", "2", "0" },
+      }
+    },
+    {
+      // Per-server totals: 3, 3, 4.
+      { "0", "1", "2", },
+      {
+        { "A", { 2, 1, 0, } },
+        { "B", { 0, 1, 2, } },
+        { "C", { 1, 1, 2, } },
+      },
+      {
+        { "A", "0", "2" },
+        { "B", "2", "0" },
+      }
+    },
+  };
+  VERIFY_MOVES(kConfigs);
+}
+
+// Unbalanced (both table- and cluster-wise) configurations which are simple
+// enough to become balanced after moving just a few replicas.
+//
+// The last element of every configuration is the expected sequence of moves,
+// each written as { table, source server, destination server }; the order of
+// the moves is part of the expectation.
+TEST(RebalanceAlgoUnitTest, FewMoves) {
+  const TestClusterConfig kConfigs[] = {
+    {
+      { "0", "1", },
+      {
+        { "A", { 2, 0, } },
+      },
+      { { "A", "0", "1" }, }
+    },
+    {
+      // An odd number of replicas: a single move leaves a skew of one.
+      { "0", "1", },
+      {
+        { "A", { 3, 0, } },
+      },
+      { { "A", "0", "1" }, }
+    },
+    {
+      { "0", "1", },
+      {
+        { "A", { 4, 0, } },
+      },
+      {
+        { "A", "0", "1" },
+        { "A", "0", "1" },
+      }
+    },
+    {
+      { "0", "1", },
+      {
+        { "A", { 1, 2, } },
+        { "B", { 2, 0, } },
+        { "C", { 2, 1, } },
+      },
+      {
+        { "B", "0", "1" },
+      }
+    },
+    {
+      { "0", "1", },
+      {
+        { "A", { 4, 0, } },
+        { "B", { 1, 3, } },
+      },
+      {
+        { "A", "0", "1" },
+        { "B", "1", "0" },
+        { "A", "0", "1" },
+      }
+    },
+    {
+      { "0", "1", "2", },
+      {
+        { "A", { 4, 2, 0, } },
+        { "B", { 2, 1, 0, } },
+        { "C", { 1, 1, 1, } },
+      },
+      {
+        { "A", "0", "2" },
+        { "B", "0", "2" },
+        { "A", "0", "2" },
+      }
+    },
+    {
+      { "0", "1", "2", },
+      {
+        { "A", { 2, 1, 0, } },
+        { "B", { 3, 2, 1, } },
+        { "C", { 2, 3, 5, } },
+      },
+      {
+        { "C", "2", "0" },
+        { "A", "0", "2" },
+        { "B", "0", "2" },
+      }
+    },
+    {
+      { "0", "1", "2", },
+      {
+        { "A", { 5, 1, 0, } },
+      },
+      {
+        { "A", "0", "2" },
+        { "A", "0", "1" },
+        { "A", "0", "2" },
+      }
+    },
+    {
+      // Two tables with mirrored distributions.
+      { "0", "1", "2", },
+      {
+        { "A", { 5, 1, 0, } },
+        { "B", { 0, 1, 5, } },
+      },
+      {
+        { "A", "0", "2" },
+        { "B", "2", "0" },
+        { "A", "0", "1" },
+        { "B", "2", "1" },
+        { "A", "0", "2" },
+        { "B", "2", "0" },
+      }
+    },
+  };
+  VERIFY_MOVES(kConfigs);
+}
+
+// A single table with 100, 400 and 100 replicas on three tablet servers:
+// the algorithm is expected to produce 200 moves away from server "1",
+// alternating the destination between servers "0" and "2".
+TEST(RebalanceAlgoUnitTest, ManyMoves) {
+  const TestClusterConfig kConfig = {
+    { "0", "1", "2", },
+    {
+      { "A", { 100, 400, 100, } },
+    },
+  };
+  constexpr size_t kExpectedMovesNum = 200;
+
+  // Reference sequence: moves at even positions go to "0", at odd ones to "2".
+  vector<TableReplicaMove> ref_moves;
+  for (size_t i = 0; i < kExpectedMovesNum; ++i) {
+    const char* const dst = (i % 2) ? "2" : "0";
+    ref_moves.push_back({ "A", "1", dst });
+  }
+
+  ClusterBalanceInfo cbi;
+  ClusterConfigToClusterBalanceInfo(kConfig, &cbi);
+
+  vector<TableReplicaMove> moves;
+  TwoDimensionalGreedyAlgo algo(
+      TwoDimensionalGreedyAlgo::EqualSkewOption::PICK_FIRST);
+  ASSERT_OK(algo.GetNextMoves(cbi, 0, &moves));
+  EXPECT_EQ(ref_moves, moves);
+}
+
+} // namespace tools
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdcf6ce/src/kudu/tools/rebalance_algo.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo.cc b/src/kudu/tools/rebalance_algo.cc
new file mode 100644
index 0000000..ec099be
--- /dev/null
+++ b/src/kudu/tools/rebalance_algo.cc
@@ -0,0 +1,393 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/rebalance_algo.h"
+
+#include <algorithm>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <memory>
+#include <random>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+
+using std::back_inserter;
+using std::cout;
+using std::endl;
+using std::make_pair;
+using std::multimap;
+using std::ostringstream;
+using std::set_intersection;
+using std::shared_ptr;
+using std::shuffle;
+using std::sort;
+using std::string;
+using std::unordered_map;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tools {
+
+namespace {
+
+// Records in 'm' the move of one replica from tablet server 'src' to tablet
+// server 'dst': the count of 'src' goes down by one and the count of 'dst'
+// goes up by one.
+//
+// Returns Status::NotFound if 'src' or 'dst' has no entry in 'm'; in that case
+// an entry that was found for the other server is put back with its original
+// count. When both are missing, 'src' is the one reported.
+Status MoveOneReplica(const string& src,
+                      const string& dst,
+                      ServersByCountMap* m) {
+  bool has_src = false;
+  bool has_dst = false;
+  int32_t src_count = 0;
+  int32_t dst_count = 0;
+
+  // Pull the entries of both servers out of the map, remembering the counts.
+  auto it = m->begin();
+  while (it != m->end()) {
+    const bool is_src = (it->second == src);
+    if (!is_src && it->second != dst) {
+      ++it;
+      continue;
+    }
+    if (is_src) {
+      has_src = true;
+      src_count = it->first;
+    } else {
+      DCHECK_EQ(dst, it->second);
+      has_dst = true;
+      dst_count = it->first;
+    }
+    it = m->erase(it);
+  }
+
+  if (has_src && has_dst) {
+    // Re-insert both servers with the counts adjusted for the move.
+    m->emplace(src_count - 1, src);
+    m->emplace(dst_count + 1, dst);
+    return Status::OK();
+  }
+
+  // One of the servers is unknown: put back whatever has been removed so the
+  // container keeps its original data.
+  if (has_src) {
+    m->emplace(src_count, src);
+  }
+  if (has_dst) {
+    m->emplace(dst_count, dst);
+  }
+  return Status::NotFound("no per-server counts for replica",
+                          has_src ? dst : src);
+}
+} // anonymous namespace
+
+// Computes a sequence of replica moves for the cluster described by
+// 'cluster_info' and stores it in '*moves' (whose previous contents are
+// discarded). At most 'max_moves_num' moves are produced; 0 means "as many as
+// possible". The input is not modified: every move is applied to an internal
+// copy before the next one is requested. The first error returned by
+// GetNextMove() or ApplyMove() is propagated.
+Status RebalancingAlgo::GetNextMoves(const ClusterBalanceInfo& cluster_info,
+                                     int max_moves_num,
+                                     vector<TableReplicaMove>* moves) {
+  DCHECK_LE(0, max_moves_num);
+  DCHECK(moves);
+
+  // Value of '0' is a shortcut for 'the possible maximum'.
+  const int limit = (max_moves_num == 0)
+      ? std::numeric_limits<int>::max()
+      : max_moves_num;
+
+  // Working copy of the balance information which accumulates the moves.
+  ClusterBalanceInfo working_info(cluster_info);
+  moves->clear();
+
+  int produced = 0;
+  while (produced < limit) {
+    boost::optional<TableReplicaMove> next;
+    RETURN_NOT_OK(GetNextMove(working_info, &next));
+    if (!next) {
+      // The algorithm has nothing more to suggest.
+      break;
+    }
+    RETURN_NOT_OK(ApplyMove(*next, &working_info));
+    moves->push_back(std::move(*next));
+    ++produced;
+  }
+  return Status::OK();
+}
+
+// Updates '*cluster_info' as if 'move' has been performed: adjusts the
+// cluster-wide per-server totals, the per-server counts of the moved table,
+// and re-files the table under its new skew.
+//
+// Returns Status::NotFound if the table or one of the servers is not present
+// in the balance information; '*cluster_info' is left untouched in that case.
+Status RebalancingAlgo::ApplyMove(const TableReplicaMove& move,
+                                  ClusterBalanceInfo* cluster_info) {
+  // All changes go into a scratch copy which replaces '*cluster_info' only
+  // after every step has succeeded.
+  ClusterBalanceInfo updated(*DCHECK_NOTNULL(cluster_info));
+
+  // Step 1: the cluster-wide totals.
+  RETURN_NOT_OK_PREPEND(
+      MoveOneReplica(move.from, move.to,
+                     &updated.servers_by_total_replica_count),
+      Substitute("missing information on table $0", move.table_id));
+
+  // Step 2: take the entry of the affected table out of the by-skew
+  // container: its key is about to change.
+  auto& tables_by_skew = updated.table_info_by_skew;
+  auto entry = tables_by_skew.begin();
+  while (entry != tables_by_skew.end() &&
+         entry->second.table_id != move.table_id) {
+    ++entry;
+  }
+  if (entry == tables_by_skew.end()) {
+    return Status::NotFound(Substitute(
+        "missing table info for table $0", move.table_id));
+  }
+  TableBalanceInfo moved_table;
+  std::swap(entry->second, moved_table);
+  tables_by_skew.erase(entry);
+
+  // Step 3: the per-server counts of the table.
+  RETURN_NOT_OK_PREPEND(
+      MoveOneReplica(move.from, move.to,
+                     &moved_table.servers_by_replica_count),
+      Substitute("missing information on table $0", move.table_id));
+
+  // Step 4: recompute the skew and put the table back under the new key.
+  const auto highest = moved_table.servers_by_replica_count.rbegin()->first;
+  const auto lowest = moved_table.servers_by_replica_count.begin()->first;
+  DCHECK_GE(highest, lowest);
+
+  const int32_t new_skew = highest - lowest;
+  tables_by_skew.emplace(new_skew, std::move(moved_table));
+
+  // Publish the result.
+  std::swap(*cluster_info, updated);
+  return Status::OK();
+}
+
+// Creates the algorithm with the specified way of choosing among equally
+// skewed candidates. The random generator (used for shuffling candidates in
+// the PICK_RANDOM mode) is seeded with a value obtained from the random
+// device.
+// NOTE(review): seeding 'generator_' from 'random_device_' relies on
+// 'random_device_' being declared before 'generator_' in the class; the
+// declaration is not visible here -- confirm.
+TwoDimensionalGreedyAlgo::TwoDimensionalGreedyAlgo(EqualSkewOption opt)
+    : equal_skew_opt_(opt),
+      random_device_(),
+      generator_(random_device_()) {
+}
+
+// Picks the next replica move for the cluster described by 'cluster_info'.
+//
+// On success '*move' contains the move, or boost::none if there is nothing to
+// move. Returns Status::InvalidArgument if 'cluster_info' has no table
+// information, no per-server totals, or a candidate table without per-server
+// replica counts; '*move' is boost::none in that case as well.
+//
+// Only the tables having the maximum skew are considered as candidates. They
+// are examined in the order they are stored in the container and the first
+// one yielding a move wins.
+Status TwoDimensionalGreedyAlgo::GetNextMove(
+    const ClusterBalanceInfo& cluster_info,
+    boost::optional<TableReplicaMove>* move) {
+  DCHECK(move);
+  // Set the output to none: this fits the short-circuit cases when there is
+  // an issue with the parameters or there aren't any moves to return.
+  *move = boost::none;
+
+  // Due to the nature of the table_info_by_skew container, the very last
+  // range represents the most unbalanced tables.
+  const auto& table_info_by_skew = cluster_info.table_info_by_skew;
+  if (table_info_by_skew.empty()) {
+    return Status::InvalidArgument("no table balance information");
+  }
+  const auto max_table_skew = table_info_by_skew.rbegin()->first;
+
+  const auto& servers_by_total_replica_count =
+      cluster_info.servers_by_total_replica_count;
+  if (servers_by_total_replica_count.empty()) {
+    return Status::InvalidArgument("no per-server replica count information");
+  }
+  // Cluster-wide skew: the difference between the highest and the lowest
+  // total replica count among the servers.
+  const auto max_server_skew =
+      servers_by_total_replica_count.rbegin()->first -
+      servers_by_total_replica_count.begin()->first;
+
+  if (max_table_skew == 0) {
+    // Every table is balanced and any move will unbalance a table, so there
+    // is no potential for the greedy algorithm to balance the cluster.
+    return Status::OK();
+  }
+  if (max_table_skew <= 1 && max_server_skew <= 1) {
+    // Every table is balanced and the cluster as a whole is balanced.
+    return Status::OK();
+  }
+
+  // Among the tables with maximum skew, attempt to pick a table where there is
+  // a move that improves the table skew and the cluster skew, if possible. If
+  // not, attempt to pick a move that improves the table skew. If all tables
+  // are balanced, attempt to pick a move that preserves table balance and
+  // improves cluster skew.
+  const auto range = table_info_by_skew.equal_range(max_table_skew);
+  for (auto it = range.first; it != range.second; ++it) {
+    const TableBalanceInfo& tbi = it->second;
+    const auto& servers_by_table_replica_count = tbi.servers_by_replica_count;
+    if (servers_by_table_replica_count.empty()) {
+      return Status::InvalidArgument(Substitute(
+          "no information on replicas of table $0", tbi.table_id));
+    }
+
+    const auto min_replica_count = servers_by_table_replica_count.begin()->first;
+    const auto max_replica_count = servers_by_table_replica_count.rbegin()->first;
+    VLOG(1) << Substitute(
+        "balancing table $0 with replica count skew $1 "
+        "(min_replica_count: $2, max_replica_count: $3)",
+        tbi.table_id, table_info_by_skew.rbegin()->first,
+        min_replica_count, max_replica_count);
+
+    // Compute the intersection of the tablet servers most loaded for the table
+    // with the tablet servers most loaded overall, and likewise for least loaded.
+    // These are our ideal candidates for moving from and to, respectively.
+    int32_t max_count_table;
+    int32_t max_count_total;
+    vector<string> max_loaded;
+    vector<string> max_loaded_intersection;
+    RETURN_NOT_OK(GetIntersection(
+        ExtremumType::MAX,
+        servers_by_table_replica_count, servers_by_total_replica_count,
+        &max_count_table, &max_count_total,
+        &max_loaded, &max_loaded_intersection));
+    int32_t min_count_table;
+    int32_t min_count_total;
+    vector<string> min_loaded;
+    vector<string> min_loaded_intersection;
+    RETURN_NOT_OK(GetIntersection(
+        ExtremumType::MIN,
+        servers_by_table_replica_count, servers_by_total_replica_count,
+        &min_count_table, &min_count_total,
+        &min_loaded, &min_loaded_intersection));
+
+    VLOG(1) << Substitute("table-wise  : min_count: $0, max_count: $1",
+                          min_count_table, max_count_table);
+    VLOG(1) << Substitute("cluster-wise: min_count: $0, max_count: $1",
+                          min_count_total, max_count_total);
+    // Build the textual dump of the intersections only when it is going to
+    // be logged.
+    if (PREDICT_FALSE(VLOG_IS_ON(1))) {
+      ostringstream s;
+      s << "[ ";
+      for (const auto& e : max_loaded_intersection) {
+        s << e << " ";
+      }
+      s << "]";
+      VLOG(1) << "max_loaded_intersection: " << s.str();
+
+      s.str("");
+      s << "[ ";
+      for (const auto& e : min_loaded_intersection) {
+        s << e << " ";
+      }
+      s << "]";
+      VLOG(1) << "min_loaded_intersection: " << s.str();
+    }
+    // Do not move replicas of a balanced table if the least (most) loaded
+    // servers overall do not intersect the servers hosting the least (most)
+    // replicas of the table. Moving a replica in that case might keep the
+    // cluster skew the same or make it worse while keeping the table balanced.
+    if ((max_count_table <= min_count_table + 1) &&
+        (min_loaded_intersection.empty() || max_loaded_intersection.empty())) {
+      continue;
+    }
+    if (equal_skew_opt_ == EqualSkewOption::PICK_RANDOM) {
+      shuffle(min_loaded.begin(), min_loaded.end(), generator_);
+      shuffle(min_loaded_intersection.begin(), min_loaded_intersection.end(),
+              generator_);
+      shuffle(max_loaded.begin(), max_loaded.end(), generator_);
+      shuffle(max_loaded_intersection.begin(), max_loaded_intersection.end(),
+              generator_);
+    }
+    // Prefer a server that is extreme both table-wise and cluster-wise (i.e.
+    // one from the intersection); otherwise fall back to a server that is
+    // extreme only table-wise. Note the asymmetry: the destination is the
+    // first element of its list, while the source is the last element.
+    const auto& min_loaded_uuid = min_loaded_intersection.empty()
+        ? min_loaded.front() : min_loaded_intersection.front();
+    const auto& max_loaded_uuid = max_loaded_intersection.empty()
+        ? max_loaded.back() : max_loaded_intersection.back();
+    VLOG(1) << Substitute("min_loaded_uuid: $0, max_loaded_uuid: $1",
+                          min_loaded_uuid, max_loaded_uuid);
+    if (min_loaded_uuid == max_loaded_uuid) {
+      // Nothing to move.
+      continue;
+    }
+
+    // Move a replica of the selected table from a most loaded server to a
+    // least loaded server.
+    *move = { tbi.table_id, max_loaded_uuid, min_loaded_uuid };
+    break;
+  }
+
+  return Status::OK();
+}
+
+// For the given kind of extremum (MIN or MAX) finds
+//   * the servers with the extreme replica count for the table: output into
+//     '*server_uuids' (sorted), the count itself into '*replica_count_table'
+//   * the servers with the extreme total replica count: the count is output
+//     into '*replica_count_total'
+//   * the servers present in both sets: output into '*intersection' (sorted)
+//
+// Returns Status::InvalidArgument if either of the input containers is empty.
+Status TwoDimensionalGreedyAlgo::GetIntersection(
+    ExtremumType extremum,
+    const ServersByCountMap& servers_by_table_replica_count,
+    const ServersByCountMap& servers_by_total_replica_count,
+    int32_t* replica_count_table,
+    int32_t* replica_count_total,
+    vector<string>* server_uuids,
+    vector<string>* intersection) {
+  DCHECK(extremum == ExtremumType::MIN || extremum == ExtremumType::MAX);
+  DCHECK(replica_count_table);
+  DCHECK(replica_count_total);
+  DCHECK(server_uuids);
+  DCHECK(intersection);
+  if (servers_by_table_replica_count.empty()) {
+    return Status::InvalidArgument("no information on table replica count");
+  }
+  if (servers_by_total_replica_count.empty()) {
+    return Status::InvalidArgument("no information on total replica count");
+  }
+
+  // Servers which are extreme table-wise; sorted as required by
+  // set_intersection().
+  vector<string> table_extreme;
+  RETURN_NOT_OK(GetMinMaxLoadedServers(
+      servers_by_table_replica_count, extremum, replica_count_table,
+      &table_extreme));
+  sort(table_extreme.begin(), table_extreme.end());
+
+  // Servers which are extreme cluster-wise, sorted likewise.
+  vector<string> total_extreme;
+  RETURN_NOT_OK(GetMinMaxLoadedServers(
+      servers_by_total_replica_count, extremum, replica_count_total,
+      &total_extreme));
+  sort(total_extreme.begin(), total_extreme.end());
+
+  vector<string> common;
+  set_intersection(
+      table_extreme.begin(), table_extreme.end(),
+      total_extreme.begin(), total_extreme.end(),
+      back_inserter(common));
+
+  // Hand the results over to the caller.
+  intersection->swap(common);
+  server_uuids->swap(table_extreme);
+
+  return Status::OK();
+}
+
+// Finds the minimum (for ExtremumType::MIN) or maximum (otherwise) replica
+// count in 'servers_by_replica_count', stores it in '*replica_count' and
+// appends the identifiers of all the servers having that count to
+// '*server_uuids'.
+//
+// Returns Status::InvalidArgument if the container is empty.
+Status TwoDimensionalGreedyAlgo::GetMinMaxLoadedServers(
+    const ServersByCountMap& servers_by_replica_count,
+    ExtremumType extremum,
+    int32_t* replica_count,
+    vector<string>* server_uuids) {
+  DCHECK(extremum == ExtremumType::MIN || extremum == ExtremumType::MAX);
+  DCHECK(replica_count);
+  DCHECK(server_uuids);
+
+  if (servers_by_replica_count.empty()) {
+    return Status::InvalidArgument("no balance information");
+  }
+
+  // The container is ordered by count: the extremes are at its ends.
+  const bool want_min = (extremum == ExtremumType::MIN);
+  const auto extreme_count = want_min
+      ? servers_by_replica_count.begin()->first
+      : servers_by_replica_count.rbegin()->first;
+
+  const auto matching = servers_by_replica_count.equal_range(extreme_count);
+  for (auto it = matching.first; it != matching.second; ++it) {
+    server_uuids->push_back(it->second);
+  }
+  *replica_count = extreme_count;
+
+  return Status::OK();
+}
+
+} // namespace tools
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdcf6ce/src/kudu/tools/rebalance_algo.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo.h b/src/kudu/tools/rebalance_algo.h
new file mode 100644
index 0000000..c3543fd
--- /dev/null
+++ b/src/kudu/tools/rebalance_algo.h
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "kudu/util/status.h"
+
+namespace boost {
+template <class T> class optional;  // Forward declaration: used via pointer.
+} // namespace boost
+
+namespace kudu {
+namespace tools {
+
+// A multimap from a replica count to the id of a server having that count.
+// The "reversed" relationship facilitates finding max- and min-count servers.
+typedef std::multimap<int32_t, std::string> ServersByCountMap;
+
+// Balance information for a table.
+struct TableBalanceInfo {
+  std::string table_id;  // Identifier of the table described.
+
+  // Mapping table replica count -> tablet server identifier. Being a multimap,
+  // it may hold several tablet servers under the same count.
+  // The table replica count of a tablet server is the number of replicas of
+  // this table hosted on that tablet server.
+  ServersByCountMap servers_by_replica_count;
+};
+
+// Balance information for a cluster.
+struct ClusterBalanceInfo {
+  // Mapping table skew -> table balance information. The "reversed"
+  // relationship facilitates finding the most and least skewed tables.
+  //
+  // The skew of a table is defined as the difference between the table
+  // replica counts of its most and least occupied tablet servers. Skew is
+  // considered improved either when the skew itself decreases, or when the
+  // number of pairs of tablet servers exhibiting the max skew decreases.
+  std::multimap<int32_t, TableBalanceInfo> table_info_by_skew;
+
+  // Mapping total replica count -> tablet server identifier.
+  //
+  // The total replica count of a tablet server is defined as the total number
+  // of replicas hosted on the tablet server.
+  ServersByCountMap servers_by_total_replica_count;
+};
+
+// A directive to move some replica of a table between two tablet servers.
+struct TableReplicaMove {
+  std::string table_id; // Identifier of the table whose replica is moved.
+  std::string from;     // Unique identifier of the source tablet server.
+  std::string to;       // Unique identifier of the target tablet server.
+};
+
+// A rebalancing algorithm: given the balance state of a cluster, it proposes
+// replica moves to balance it. What "balance" means depends on the algorithm.
+class RebalancingAlgo {
+ public:
+  virtual ~RebalancingAlgo() = default;
+
+  // The top-level method of the algorithm. Using information on the current
+  // balance state of the cluster in 'cluster_info', the algorithm populates
+  // the output parameter 'moves' with no more than 'max_moves_num' replica
+  // moves that aim to balance the cluster. Due to code conventions, 'int' is
+  // used instead of 'size_t' as the type of 'max_moves_num'; 'max_moves_num'
+  // must be a non-negative value, where the value of '0' is a shortcut for
+  // 'the possible maximum'.
+  //
+  // Once this method returns Status::OK() and leaves 'moves' empty, the cluster
+  // is considered balanced.
+  //
+  // 'moves' must be non-NULL.
+  virtual Status GetNextMoves(const ClusterBalanceInfo& cluster_info,
+                              int max_moves_num,
+                              std::vector<TableReplicaMove>* moves);
+ protected:
+  // Get the next rebalancing move from the algorithm. If there is no such move,
+  // the 'move' output parameter is set to 'boost::none'. Every concrete
+  // algorithm must implement this method.
+  // 'move' must be non-NULL.
+  virtual Status GetNextMove(const ClusterBalanceInfo& cluster_info,
+                             boost::optional<TableReplicaMove>* move) = 0;
+
+  // Update the balance state in 'cluster_info' with the outcome of the move
+  // 'move', i.e. as if the move has been performed. 'cluster_info' is an
+  // in-out parameter.
+  // 'cluster_info' must be non-NULL.
+  static Status ApplyMove(const TableReplicaMove& move,
+                          ClusterBalanceInfo* cluster_info);
+};
+
+// A two-dimensional greedy rebalancing algorithm. From among moves that
+// decrease the skew of a most skewed table, it prefers ones that reduce the
+// skew of the cluster. A cluster is considered balanced when the skew of every
+// table is <= 1 and the skew of the cluster is <= 1.
+//
+// The skew of the cluster is defined as the difference between the maximum
+// total replica count over all tablet servers and the minimum total replica
+// count over all tablet servers.
+class TwoDimensionalGreedyAlgo : public RebalancingAlgo {
+ public:
+  // Policies for picking one element among several equal-skew choices.
+  enum class EqualSkewOption {
+    PICK_FIRST,
+    PICK_RANDOM,
+  };
+  explicit TwoDimensionalGreedyAlgo(
+      EqualSkewOption opt = EqualSkewOption::PICK_RANDOM);
+
+  Status GetNextMove(const ClusterBalanceInfo& cluster_info,
+                     boost::optional<TableReplicaMove>* move) override;
+
+ private:
+  enum class ExtremumType { MAX, MIN, };
+
+  // Compute the intersection of the least or most loaded tablet servers for a
+  // table with the least or most loaded tablet servers in the cluster:
+  // 'intersection' is populated with the ids of the tablet servers in the
+  // intersection of those two sets. The minimum or maximum number of replicas
+  // per tablet server for the table and for the cluster are output into
+  // 'replica_count_table' and 'replica_count_total', respectively. Identifiers
+  // of the tablet servers with '*replica_count_table' replicas of the table are
+  // output into the 'server_uuids' parameter. Both 'server_uuids' and
+  // 'intersection' are output sorted. Whether to consider most or least loaded
+  // servers is controlled by 'extremum'. An empty 'intersection' on return
+  // means the two sets of extremal servers have no server in common: in that
+  // case optimizing the load by table would not affect the extreme load by
+  // server.
+  // None of the output parameters may be NULL.
+  Status GetIntersection(
+      ExtremumType extremum,
+      const ServersByCountMap& servers_by_table_replica_count,
+      const ServersByCountMap& servers_by_total_replica_count,
+      int32_t* replica_count_table,
+      int32_t* replica_count_total,
+      std::vector<std::string>* server_uuids,
+      std::vector<std::string>* intersection);
+
+  // As determined by 'extremum', find the tablet servers with the minimum or
+  // maximum count in 'servers_by_replica_count'. The extremal count is output
+  // into 'replica_count', while the ids of the servers having that count are
+  // appended to 'server_uuids'. Returns Status::InvalidArgument if
+  // 'servers_by_replica_count' is empty.
+  // Neither of the output parameters may be NULL.
+  Status GetMinMaxLoadedServers(
+      const ServersByCountMap& servers_by_replica_count,
+      ExtremumType extremum,
+      int32_t* replica_count,
+      std::vector<std::string>* server_uuids);
+
+  const EqualSkewOption equal_skew_opt_;  // Policy for equal-skew choices.
+  std::random_device random_device_;  // NOTE(review): likely seeds generator_.
+  std::mt19937 generator_;  // NOTE(review): likely serves PICK_RANDOM; confirm.
+};
+
+} // namespace tools
+} // namespace kudu


Mime
View raw message