[rebalancing] add a rebalancing algorithm
This changelist introduces the high-level rebalancing algorithm
and corresponding unit tests. It contains both an abstraction for
rebalancing algorithms in general and an implementation of a
greedy algorithm that rebalances a cluster by trying to equalize
the number of replicas per tablet server for each table and for the
cluster as a whole.
The algorithm only identifies which moves should be made, given the current
arrangement of replicas in a cluster. The 'engine' that actually performs
the moves will be added in a subsequent changelist.
Change-Id: I5a8050ee79117a2ae2f6f88740ed25656946cfb4
Reviewed-on: http://gerrit.cloudera.org:8080/10336
Reviewed-by: Mike Percy <mpercy@apache.org>
Reviewed-by: Will Berkeley <wdberkeley@gmail.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ccdcf6ce
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ccdcf6ce
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ccdcf6ce
Branch: refs/heads/master
Commit: ccdcf6cea99d1b0fa0ba1106e793b31aa607ba06
Parents: 8719014
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Mon May 7 15:33:11 2018 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Thu Jun 7 04:09:18 2018 +0000
----------------------------------------------------------------------
src/kudu/tools/CMakeLists.txt | 15 +
src/kudu/tools/rebalance_algo-test.cc | 629 +++++++++++++++++++++++++++++
src/kudu/tools/rebalance_algo.cc | 393 ++++++++++++++++++
src/kudu/tools/rebalance_algo.h | 176 ++++++++
4 files changed, 1213 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdcf6ce/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index f88e8f4..ea38500 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -81,6 +81,18 @@ target_link_libraries(ksck
)
#######################################
+# kudu_tools_rebalance
+#######################################
+
+add_library(kudu_tools_rebalance
+ rebalance_algo.cc
+)
+target_link_libraries(kudu_tools_rebalance
+ kudu_common
+ ${KUDU_BASE_LIBS}
+)
+
+#######################################
# kudu
#######################################
@@ -112,6 +124,7 @@ target_link_libraries(kudu
kudu_client_test_util
kudu_common
kudu_fs
+ kudu_tools_rebalance
kudu_util
log
master
@@ -141,6 +154,7 @@ set(KUDU_TEST_LINK_LIBS
ksck
kudu_tools_util
kudu_tools_test_util
+ kudu_tools_rebalance
itest_util
mini_cluster
${KUDU_MIN_TEST_LIBS})
@@ -158,4 +172,5 @@ ADD_KUDU_TEST_DEPENDENCIES(kudu-tool-test
ADD_KUDU_TEST(kudu-ts-cli-test)
ADD_KUDU_TEST_DEPENDENCIES(kudu-ts-cli-test
kudu)
+ADD_KUDU_TEST(rebalance_algo-test)
ADD_KUDU_TEST(tool_action-test)
http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdcf6ce/src/kudu/tools/rebalance_algo-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo-test.cc b/src/kudu/tools/rebalance_algo-test.cc
new file mode 100644
index 0000000..3da13e1
--- /dev/null
+++ b/src/kudu/tools/rebalance_algo-test.cc
@@ -0,0 +1,629 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/rebalance_algo.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <iostream>
+#include <iterator>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+namespace kudu {
+namespace tools {
+struct TestClusterConfig;
+} // namespace tools
+} // namespace kudu
+
+// Runs VerifyRebalancingMoves() for every element of the array 'test_config',
+// tagging any failure with the index of the offending element. The index is
+// a size_t because ARRAYSIZE() is computed from sizeof() and is therefore
+// unsigned; an 'int' index would be a signed/unsigned comparison.
+#define VERIFY_MOVES(test_config) \
+  do { \
+    for (size_t idx = 0; idx < ARRAYSIZE((test_config)); ++idx) { \
+      SCOPED_TRACE(Substitute("test config index: $0", idx)); \
+      NO_FATALS(VerifyRebalancingMoves((test_config)[idx])); \
+    } \
+  } while (false)
+
+using std::ostream;
+using std::set;
+using std::sort;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tools {
+
+// Replica layout of a single table: how many replicas of the table are hosted
+// by each tablet server of the test cluster.
+struct TablePerServerReplicas {
+  // Identifier of the table.
+  const std::string table_id;
+
+  // Element i is the number of replicas of this table at the i-th tablet
+  // server, with servers indexed as in TestClusterConfig::tserver_uuids.
+  const std::vector<size_t> num_replicas_by_server;
+};
+
+// Compact, test-friendly description of the rebalancing-related state of a
+// cluster, together with the moves the algorithm is expected to suggest.
+struct TestClusterConfig {
+  // Identifiers of the tablet servers; duplicates are not allowed.
+  const std::vector<std::string> tserver_uuids;
+
+  // Per-table replica distribution. Every entry 't' must satisfy
+  //   t.num_replicas_by_server.size() == tserver_uuids.size()
+  const std::vector<TablePerServerReplicas> table_replicas;
+
+  // Reference output: the replica moves the algorithm should produce, in the
+  // order it should produce them.
+  const std::vector<TableReplicaMove> expected_moves;
+};
+
+// Two moves are equal when they concern the same table and the same source
+// and destination tablet servers.
+bool operator==(const TableReplicaMove& lhs, const TableReplicaMove& rhs) {
+  if (lhs.table_id != rhs.table_id) {
+    return false;
+  }
+  if (lhs.from != rhs.from) {
+    return false;
+  }
+  return lhs.to == rhs.to;
+}
+
+// Prints a move as "<table_id>:<from>-><to>" so that EXPECT_EQ() failures on
+// vectors of moves are readable.
+ostream& operator<<(ostream& o, const TableReplicaMove& move) {
+  return o << move.table_id << ":" << move.from << "->" << move.to;
+}
+
+// Transforms the definition of the test cluster 'tcc' into the
+// ClusterBalanceInfo consumed by the rebalancing algorithm and stores the
+// result into 'cbi'. An invalid 'tcc' is a bug in the test itself, so the
+// configuration is validated with CHECK() before the conversion.
+void ClusterConfigToClusterBalanceInfo(const TestClusterConfig& tcc,
+                                       ClusterBalanceInfo* cbi) {
+  CHECK(cbi);
+  // First verify that the configuration of the test cluster is valid: every
+  // per-table vector covers all tablet servers and table identifiers are
+  // unique.
+  set<string> table_ids;
+  for (const auto& table_replica_info : tcc.table_replicas) {
+    CHECK_EQ(tcc.tserver_uuids.size(),
+             table_replica_info.num_replicas_by_server.size());
+    table_ids.emplace(table_replica_info.table_id);
+  }
+  CHECK_EQ(table_ids.size(), tcc.table_replicas.size());
+  {
+    // Check for uniqueness of the tablet servers' identifiers.
+    set<string> uuids(tcc.tserver_uuids.begin(), tcc.tserver_uuids.end());
+    CHECK_EQ(tcc.tserver_uuids.size(), uuids.size());
+  }
+  // Computing a table's skew below reads the first and the last element of
+  // its per-server map; with no tablet servers that map would be empty.
+  CHECK(tcc.table_replicas.empty() || !tcc.tserver_uuids.empty())
+      << "tables are defined, but there are no tablet servers";
+
+  ClusterBalanceInfo result;
+  for (size_t tserver_idx = 0; tserver_idx < tcc.tserver_uuids.size();
+       ++tserver_idx) {
+    // Total replica count at the tablet server, across all the tables.
+    size_t count = 0;
+    for (const auto& table_replica_info : tcc.table_replicas) {
+      count += table_replica_info.num_replicas_by_server[tserver_idx];
+    }
+    result.servers_by_total_replica_count.emplace(
+        count, tcc.tserver_uuids[tserver_idx]);
+  }
+
+  auto& table_info_by_skew = result.table_info_by_skew;
+  // Tables are inserted in the order they are listed in 'tcc'.
+  for (const auto& table_replica_info : tcc.table_replicas) {
+    // Replicas of the current table per tablet server.
+    const vector<size_t>& replicas_count =
+        table_replica_info.num_replicas_by_server;
+    TableBalanceInfo info;
+    info.table_id = table_replica_info.table_id;
+    for (size_t tserver_idx = 0; tserver_idx < replicas_count.size();
+         ++tserver_idx) {
+      info.servers_by_replica_count.emplace(replicas_count[tserver_idx],
+                                            tcc.tserver_uuids[tserver_idx]);
+    }
+    // The skew of the table: the difference between the replica counts at
+    // its most and least loaded tablet servers.
+    const size_t max_count = info.servers_by_replica_count.rbegin()->first;
+    const size_t min_count = info.servers_by_replica_count.begin()->first;
+    CHECK_GE(max_count, min_count);
+    table_info_by_skew.emplace(max_count - min_count, std::move(info));
+  }
+  *cbi = std::move(result);
+}
+
+// Runs the two-dimensional greedy algorithm on the cluster described by 'cfg'
+// and checks that the suggested moves are exactly cfg.expected_moves.
+void VerifyRebalancingMoves(const TestClusterConfig& cfg) {
+  ClusterBalanceInfo balance_info;
+  ClusterConfigToClusterBalanceInfo(cfg, &balance_info);
+
+  // PICK_FIRST disables shuffling of equally loaded candidates, so the
+  // output is deterministic and can be compared against a reference.
+  TwoDimensionalGreedyAlgo greedy_algo(
+      TwoDimensionalGreedyAlgo::EqualSkewOption::PICK_FIRST);
+  vector<TableReplicaMove> suggested_moves;
+  // A limit of 0 means 'no limit on the number of moves'.
+  ASSERT_OK(greedy_algo.GetNextMoves(std::move(balance_info), 0,
+                                     &suggested_moves));
+  EXPECT_EQ(cfg.expected_moves, suggested_moves);
+}
+
+// Without any balance information the algorithm must reject the input and
+// must not suggest any moves.
+TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfo) {
+  ClusterBalanceInfo no_info;
+  vector<TableReplicaMove> suggested_moves;
+  TwoDimensionalGreedyAlgo greedy_algo;
+  const Status s = greedy_algo.GetNextMoves(no_info, 1, &suggested_moves);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  EXPECT_TRUE(suggested_moves.empty());
+}
+
+// Various scenarios of balanced configurations where no moves are expected
+// to happen. In every configuration below, each table's replica counts differ
+// by at most one between any two tablet servers, and so do the total replica
+// counts of the tablet servers.
+TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
+ // The configurations are already balanced, no moves should be attempted.
+ const TestClusterConfig kConfigs[] = {
+ {
+ // A single tablet server with a single replica of the only table.
+ { "0", },
+ {
+ { "A", { 1 } },
+ },
+ },
+ {
+ // A single tablet server in the cluster that hosts all replicas.
+ { "0", },
+ {
+ { "A", { 1 } },
+ { "B", { 10 } },
+ { "C", { 100 } },
+ },
+ },
+ {
+ // Single table and 2 TS: 100 and 99 replicas at each.
+ { "0", "1", },
+ {
+ { "A", { 100, 99, } },
+ },
+ },
+ {
+ // Table- and cluster-wise balanced configuration with one-off skew.
+ { "0", "1", },
+ {
+ { "A", { 1, 1, } },
+ { "B", { 1, 2, } },
+ },
+ },
+ {
+ // A configuration which has zero skew cluster-wise, while the table-wise
+ // balance has one-off skew: the algorithm should not try to correct
+ // the latter.
+ { "0", "1", },
+ {
+ { "A", { 1, 2, } },
+ { "B", { 1, 2, } },
+ { "C", { 1, 0, } },
+ { "D", { 1, 0, } },
+ },
+ },
+ {
+ // Each table has a single replica; every server hosts exactly one replica.
+ { "0", "1", "2", },
+ {
+ { "A", { 1, 0, 0, } },
+ { "B", { 0, 1, 0, } },
+ { "C", { 0, 0, 1, } },
+ },
+ },
+ {
+ // A simple balanced case: 3 tablet servers, 3 tables with
+ // one replica per server.
+ { "0", "1", "2", },
+ {
+ { "A", { 1, 1, 1, } },
+ { "B", { 1, 1, 1, } },
+ { "C", { 1, 1, 1, } },
+ },
+ },
+ {
+ // One-off skewed tables; every server hosts 2 replicas in total.
+ { "0", "1", "2", },
+ {
+ { "A", { 0, 1, 1, } },
+ { "B", { 1, 0, 1, } },
+ { "C", { 1, 1, 0, } },
+ },
+ },
+ {
+ // One-off skewed tables; every server hosts 4 replicas in total.
+ { "0", "1", "2", },
+ {
+ { "A", { 2, 1, 1, } },
+ { "B", { 1, 2, 1, } },
+ { "C", { 1, 1, 2, } },
+ },
+ },
+ {
+ // Six one-off skewed tables; every server hosts 4 replicas in total.
+ { "0", "1", "2", },
+ {
+ { "A", { 1, 1, 0, } },
+ { "B", { 1, 1, 0, } },
+ { "C", { 1, 0, 1, } },
+ { "D", { 1, 0, 1, } },
+ { "E", { 0, 1, 1, } },
+ { "F", { 0, 1, 1, } },
+ },
+ },
+ {
+ // Total replica counts per server: 2, 1, 1.
+ { "0", "1", "2", },
+ {
+ { "A", { 1, 0, 1, } },
+ { "B", { 1, 1, 0, } },
+ },
+ },
+ {
+ // Same layouts as the previous configuration, table identifiers swapped.
+ { "0", "1", "2", },
+ {
+ { "B", { 1, 0, 1, } },
+ { "A", { 1, 1, 0, } },
+ },
+ },
+ {
+ // Total replica counts per server: 3, 2, 2.
+ { "0", "1", "2", },
+ {
+ { "A", { 2, 2, 1, } },
+ { "B", { 1, 0, 1, } },
+ },
+ },
+ {
+ // Total replica counts per server: 3, 3, 2.
+ { "0", "1", "2", },
+ {
+ { "A", { 2, 2, 1, } },
+ { "B", { 1, 1, 1, } },
+ },
+ },
+ {
+ // Total replica counts per server: 2, 2, 3.
+ { "0", "1", "2", },
+ {
+ { "A", { 2, 2, 1, } },
+ { "B", { 0, 0, 1, } },
+ { "C", { 0, 0, 1, } },
+ },
+ },
+ {
+ // Total replica counts per server: 2, 1, 2.
+ { "0", "1", "2", },
+ {
+ { "A", { 0, 1, 0, } },
+ { "B", { 1, 0, 1, } },
+ { "C", { 1, 0, 1, } },
+ },
+ },
+ };
+ VERIFY_MOVES(kConfigs);
+}
+
+// Set of scenarios where the distribution of replicas is table-wise balanced
+// but not yet cluster-wise balanced, requiring just a few replica moves
+// to achieve both table- and cluster-wise balance state. The per-server totals
+// noted below are the sums of the per-table replica counts.
+TEST(RebalanceAlgoUnitTest, TableWiseBalanced) {
+ const TestClusterConfig kConfigs[] = {
+ {
+ // Both tables are skewed towards server "0": totals are 200 and 198.
+ { "0", "1", },
+ {
+ { "A", { 100, 99, } },
+ { "B", { 100, 99, } },
+ },
+ { { "A", "0", "1" }, }
+ },
+ {
+ // Total replica counts per server: 3, 5.
+ { "0", "1", },
+ {
+ { "A", { 1, 2, } },
+ { "B", { 1, 2, } },
+ { "C", { 1, 0, } },
+ { "D", { 0, 1, } },
+ },
+ { { "A", "1", "0" }, }
+ },
+ {
+ // Total replica counts per server: 2, 1, 0.
+ { "0", "1", "2", },
+ {
+ { "A", { 1, 0, 0, } },
+ { "B", { 0, 1, 0, } },
+ { "C", { 1, 0, 0, } },
+ },
+ { { "A", "0", "2" }, }
+ },
+ {
+ // Total replica counts per server: 1, 2, 3.
+ { "0", "1", "2", },
+ {
+ { "A", { 1, 1, 1, } },
+ { "B", { 0, 1, 1, } },
+ { "C", { 0, 0, 1, } },
+ },
+ { { "B", "2", "0" }, }
+ },
+ {
+ // Total replica counts per server: 3, 1, 2.
+ { "0", "1", "2", },
+ {
+ { "A", { 1, 1, 0, } },
+ { "B", { 1, 0, 1, } },
+ { "C", { 1, 0, 1, } },
+ },
+ { { "B", "0", "1" }, }
+ },
+ {
+ // Same per-table layouts as the previous configuration, but the tables
+ // are listed in a different order, which changes the expected move.
+ { "0", "1", "2", },
+ {
+ { "C", { 1, 0, 1, } },
+ { "B", { 1, 0, 1, } },
+ { "A", { 1, 1, 0, } },
+ },
+ { { "C", "0", "1" }, }
+ },
+ };
+ VERIFY_MOVES(kConfigs);
+}
+
+// Simple table-wise balanced configurations that need just one move to
+// become cluster-wise balanced as well (total replica counts per server in
+// every configuration: 3, 1, 2).
+TEST(RebalanceAlgoUnitTest, OneMoveNoCycling) {
+ // The choices made by the algorithm might depend on the order in which the
+ // tables are listed (and on their identifiers), hence the permutations of
+ // essentially the same configuration. In each case, the expected move is for
+ // the first-listed table with the { 1, 0, 1 } layout.
+ const TestClusterConfig kConfigs[] = {
+ {
+ { "0", "1", "2", },
+ {
+ { "A", { 1, 0, 1, } },
+ { "B", { 1, 0, 1, } },
+ { "C", { 1, 1, 0, } },
+ },
+ { { "A", "0", "1" }, }
+ },
+ {
+ { "0", "1", "2", },
+ {
+ { "A", { 1, 0, 1, } },
+ { "C", { 1, 0, 1, } },
+ { "B", { 1, 1, 0, } },
+ },
+ { { "A", "0", "1" }, }
+ },
+ {
+ { "0", "1", "2", },
+ {
+ { "B", { 1, 0, 1, } },
+ { "C", { 1, 0, 1, } },
+ { "A", { 1, 1, 0, } },
+ },
+ { { "B", "0", "1" }, }
+ },
+ {
+ { "0", "1", "2", },
+ {
+ { "B", { 1, 0, 1, } },
+ { "A", { 1, 0, 1, } },
+ { "C", { 1, 1, 0, } },
+ },
+ { { "B", "0", "1" }, }
+ },
+ {
+ { "0", "1", "2", },
+ {
+ { "C", { 1, 0, 1, } },
+ { "A", { 1, 0, 1, } },
+ { "B", { 1, 1, 0, } },
+ },
+ { { "C", "0", "1" }, }
+ },
+ {
+ { "0", "1", "2", },
+ {
+ { "C", { 1, 0, 1, } },
+ { "B", { 1, 0, 1, } },
+ { "A", { 1, 1, 0, } },
+ },
+ { { "C", "0", "1" }, }
+ },
+ };
+ VERIFY_MOVES(kConfigs);
+}
+
+// Set of scenarios where the distribution of table replicas is cluster-wise
+// balanced, but not table-wise balanced, requiring just a few moves to make it
+// both table- and cluster-wise balanced.
+TEST(RebalanceAlgoUnitTest, ClusterWiseBalanced) {
+ const TestClusterConfig kConfigs[] = {
+ {
+ // Total replica counts per server: 3, 2.
+ { "0", "1", },
+ {
+ { "A", { 2, 0, } },
+ { "B", { 1, 2, } },
+ },
+ {
+ { "A", "0", "1" },
+ }
+ },
+ {
+ // Total replica counts per server: 4, 4.
+ { "0", "1", },
+ {
+ { "A", { 1, 2, } },
+ { "B", { 2, 0, } },
+ { "C", { 1, 2, } },
+ },
+ {
+ { "B", "0", "1" },
+ { "A", "1", "0" },
+ }
+ },
+ {
+ // Total replica counts per server: 2, 2, 2.
+ { "0", "1", "2", },
+ {
+ { "A", { 2, 1, 0, } },
+ { "B", { 0, 1, 2, } },
+ },
+ {
+ { "A", "0", "2" },
+ { "B", "2", "0" },
+ }
+ },
+ {
+ // Total replica counts per server: 3, 3, 4.
+ { "0", "1", "2", },
+ {
+ { "A", { 2, 1, 0, } },
+ { "B", { 0, 1, 2, } },
+ { "C", { 1, 1, 2, } },
+ },
+ {
+ { "A", "0", "2" },
+ { "B", "2", "0" },
+ }
+ },
+ };
+ VERIFY_MOVES(kConfigs);
+}
+
+// Configurations that are unbalanced both table- and cluster-wise, yet simple
+// enough to become balanced after moving just a few replicas.
+TEST(RebalanceAlgoUnitTest, FewMoves) {
+ const TestClusterConfig kConfigs[] = {
+ {
+ { "0", "1", },
+ {
+ { "A", { 2, 0, } },
+ },
+ { { "A", "0", "1" }, }
+ },
+ {
+ { "0", "1", },
+ {
+ { "A", { 3, 0, } },
+ },
+ { { "A", "0", "1" }, }
+ },
+ {
+ { "0", "1", },
+ {
+ { "A", { 4, 0, } },
+ },
+ {
+ { "A", "0", "1" },
+ { "A", "0", "1" },
+ }
+ },
+ {
+ // Total replica counts per server: 5, 3.
+ { "0", "1", },
+ {
+ { "A", { 1, 2, } },
+ { "B", { 2, 0, } },
+ { "C", { 2, 1, } },
+ },
+ {
+ { "B", "0", "1" },
+ }
+ },
+ {
+ // Total replica counts per server: 5, 3.
+ { "0", "1", },
+ {
+ { "A", { 4, 0, } },
+ { "B", { 1, 3, } },
+ },
+ {
+ { "A", "0", "1" },
+ { "B", "1", "0" },
+ { "A", "0", "1" },
+ }
+ },
+ {
+ // Total replica counts per server: 7, 4, 1.
+ { "0", "1", "2", },
+ {
+ { "A", { 4, 2, 0, } },
+ { "B", { 2, 1, 0, } },
+ { "C", { 1, 1, 1, } },
+ },
+ {
+ { "A", "0", "2" },
+ { "B", "0", "2" },
+ { "A", "0", "2" },
+ }
+ },
+ {
+ // Total replica counts per server: 7, 6, 6.
+ { "0", "1", "2", },
+ {
+ { "A", { 2, 1, 0, } },
+ { "B", { 3, 2, 1, } },
+ { "C", { 2, 3, 5, } },
+ },
+ {
+ { "C", "2", "0" },
+ { "A", "0", "2" },
+ { "B", "0", "2" },
+ }
+ },
+ {
+ { "0", "1", "2", },
+ {
+ { "A", { 5, 1, 0, } },
+ },
+ {
+ { "A", "0", "2" },
+ { "A", "0", "1" },
+ { "A", "0", "2" },
+ }
+ },
+ {
+ // Total replica counts per server: 5, 2, 5.
+ { "0", "1", "2", },
+ {
+ { "A", { 5, 1, 0, } },
+ { "B", { 0, 1, 5, } },
+ },
+ {
+ { "A", "0", "2" },
+ { "B", "2", "0" },
+ { "A", "0", "1" },
+ { "B", "2", "1" },
+ { "A", "0", "2" },
+ { "B", "2", "0" },
+ }
+ },
+ };
+ VERIFY_MOVES(kConfigs);
+}
+
+// A configuration unbalanced both table- and cluster-wise that needs many
+// replica moves: all the excess replicas of the only table sit at server "1".
+TEST(RebalanceAlgoUnitTest, ManyMoves) {
+  const TestClusterConfig kConfig = {
+    { "0", "1", "2", },
+    {
+      { "A", { 100, 400, 100, } },
+    },
+  };
+
+  // Moving 200 replicas away from "1", alternating between "0" and "2",
+  // leaves exactly 200 replicas at every server.
+  constexpr size_t kExpectedMovesNum = 200;
+  vector<TableReplicaMove> ref_moves;
+  ref_moves.reserve(kExpectedMovesNum);
+  for (size_t i = 0; i < kExpectedMovesNum; ++i) {
+    ref_moves.push_back({ "A", "1", (i % 2 == 0) ? "0" : "2" });
+  }
+
+  ClusterBalanceInfo cbi;
+  ClusterConfigToClusterBalanceInfo(kConfig, &cbi);
+  TwoDimensionalGreedyAlgo algo(
+      TwoDimensionalGreedyAlgo::EqualSkewOption::PICK_FIRST);
+  vector<TableReplicaMove> moves;
+  ASSERT_OK(algo.GetNextMoves(cbi, 0, &moves));
+  EXPECT_EQ(ref_moves, moves);
+}
+
+} // namespace tools
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdcf6ce/src/kudu/tools/rebalance_algo.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo.cc b/src/kudu/tools/rebalance_algo.cc
new file mode 100644
index 0000000..ec099be
--- /dev/null
+++ b/src/kudu/tools/rebalance_algo.cc
@@ -0,0 +1,393 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/rebalance_algo.h"
+
+#include <algorithm>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <memory>
+#include <random>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+
+using std::back_inserter;
+using std::cout;
+using std::endl;
+using std::make_pair;
+using std::multimap;
+using std::ostringstream;
+using std::set_intersection;
+using std::shared_ptr;
+using std::shuffle;
+using std::sort;
+using std::string;
+using std::unordered_map;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tools {
+
+namespace {
+
+// Applies to 'm' a move of a replica from the tablet server with id 'src' to
+// the tablet server with id 'dst' by decrementing the count of 'src' and
+// incrementing the count of 'dst'. Each server is expected to appear at most
+// once in 'm'.
+//
+// Returns:
+//   * Status::InvalidArgument if 'src' and 'dst' are the same server or if
+//     'src' has no replicas to move;
+//   * Status::NotFound if either 'src' or 'dst' is not present in 'm'.
+// On error, 'm' is left unchanged.
+Status MoveOneReplica(const string& src,
+                      const string& dst,
+                      ServersByCountMap* m) {
+  DCHECK(m);
+  if (PREDICT_FALSE(src == dst)) {
+    return Status::InvalidArgument(
+        "source and destination servers are the same", src);
+  }
+  // The map is keyed by replica count, so servers are looked up by value.
+  const auto find_server = [m](const string& uuid) {
+    return std::find_if(m->begin(), m->end(),
+                        [&uuid](const ServersByCountMap::value_type& elem) {
+                          return elem.second == uuid;
+                        });
+  };
+  auto src_it = find_server(src);
+  if (src_it == m->end()) {
+    return Status::NotFound("no per-server replica count for server", src);
+  }
+  auto dst_it = find_server(dst);
+  if (dst_it == m->end()) {
+    return Status::NotFound("no per-server replica count for server", dst);
+  }
+  const int32_t count_src = src_it->first;
+  const int32_t count_dst = dst_it->first;
+  if (PREDICT_FALSE(count_src <= 0)) {
+    return Status::InvalidArgument("no replicas to move from server", src);
+  }
+
+  // Counts are the keys, so the entries are re-inserted rather than updated
+  // in place. Erasing one multimap element does not invalidate iterators to
+  // other elements, so 'dst_it' stays valid after erasing 'src_it'.
+  m->erase(src_it);
+  m->erase(dst_it);
+  m->emplace(count_src - 1, src);
+  m->emplace(count_dst + 1, dst);
+  return Status::OK();
+}
+
+} // anonymous namespace
+
+// Repeatedly asks GetNextMove() for a move and applies it to a private copy
+// of 'cluster_info', until no more moves are suggested or 'max_moves_num'
+// moves are collected ('0' means no limit). The suggested moves are stored
+// into 'moves', which is cleared first.
+Status RebalancingAlgo::GetNextMoves(const ClusterBalanceInfo& cluster_info,
+                                     int max_moves_num,
+                                     vector<TableReplicaMove>* moves) {
+  DCHECK_LE(0, max_moves_num);
+  DCHECK(moves);
+
+  const int moves_limit = (max_moves_num == 0)
+      ? std::numeric_limits<int>::max() : max_moves_num;
+
+  // Every suggested move is applied to this copy, so each next suggestion
+  // accounts for the moves suggested before it.
+  ClusterBalanceInfo state(cluster_info);
+  moves->clear();
+  for (int num_moves = 0; num_moves < moves_limit; ++num_moves) {
+    boost::optional<TableReplicaMove> next_move;
+    RETURN_NOT_OK(GetNextMove(state, &next_move));
+    if (!next_move) {
+      break;  // Nothing else to move.
+    }
+    RETURN_NOT_OK(ApplyMove(*next_move, &state));
+    moves->push_back(std::move(*next_move));
+  }
+  return Status::OK();
+}
+
+// Applies 'move' to '*cluster_info': updates the total per-server replica
+// counts, the per-server counts of the moved table, and re-files the table
+// under its new skew. The update is made on a copy which replaces
+// '*cluster_info' only if every step succeeds, so on error '*cluster_info'
+// is left unchanged.
+Status RebalancingAlgo::ApplyMove(const TableReplicaMove& move,
+                                  ClusterBalanceInfo* cluster_info) {
+  ClusterBalanceInfo info(*DCHECK_NOTNULL(cluster_info));
+
+  // Update the total counts.
+  RETURN_NOT_OK_PREPEND(
+      MoveOneReplica(move.from, move.to, &info.servers_by_total_replica_count),
+      Substitute("missing total replica count information while moving "
+                 "a replica of table $0", move.table_id));
+
+  // Take the balance info for the table out of the container: it is
+  // re-inserted below under its (possibly changed) skew.
+  auto& table_info_by_skew = info.table_info_by_skew;
+  TableBalanceInfo table_info;
+  bool found_table_info = false;
+  for (auto it = table_info_by_skew.begin(); it != table_info_by_skew.end();
+       ++it) {
+    if (it->second.table_id == move.table_id) {
+      table_info = std::move(it->second);
+      table_info_by_skew.erase(it);
+      found_table_info = true;
+      break;
+    }
+  }
+  if (!found_table_info) {
+    return Status::NotFound(Substitute(
+        "missing table info for table $0", move.table_id));
+  }
+
+  // Update the table counts.
+  RETURN_NOT_OK_PREPEND(
+      MoveOneReplica(move.from, move.to, &table_info.servers_by_replica_count),
+      Substitute("missing information on table $0", move.table_id));
+
+  // Recompute the skew of the table and put its info back.
+  const auto max_count = table_info.servers_by_replica_count.rbegin()->first;
+  const auto min_count = table_info.servers_by_replica_count.begin()->first;
+  DCHECK_GE(max_count, min_count);
+  const int32_t skew = max_count - min_count;
+  table_info_by_skew.emplace(skew, std::move(table_info));
+
+  // Publish the updated state.
+  *cluster_info = std::move(info);
+  return Status::OK();
+}
+
+// Creates the algorithm with the policy 'opt' for choosing among equally
+// loaded candidate servers. 'generator_' is seeded from 'random_device_';
+// within this file it is used only to shuffle candidates when 'opt' is
+// EqualSkewOption::PICK_RANDOM.
+// NOTE(review): seeding relies on 'random_device_' being declared before
+// 'generator_' in the class definition (header) -- confirm.
+TwoDimensionalGreedyAlgo::TwoDimensionalGreedyAlgo(EqualSkewOption opt)
+ : equal_skew_opt_(opt),
+ random_device_(),
+ generator_(random_device_()) {
+}
+
+namespace {
+
+// Formats server UUIDs as "[ uuid0 uuid1 ... ]" for verbose logging.
+string UuidsToLogString(const vector<string>& uuids) {
+  ostringstream out;
+  out << "[ ";
+  for (const auto& uuid : uuids) {
+    out << uuid << " ";
+  }
+  out << "]";
+  return out.str();
+}
+
+} // anonymous namespace
+
+// Suggests the next replica move for the cluster described by 'cluster_info',
+// or sets '*move' to boost::none if there is nothing worth moving.
+Status TwoDimensionalGreedyAlgo::GetNextMove(
+    const ClusterBalanceInfo& cluster_info,
+    boost::optional<TableReplicaMove>* move) {
+  DCHECK(move);
+  // 'No move' is the answer both for the error paths and for the case when
+  // there is nothing to move.
+  *move = boost::none;
+
+  // The container is ordered by skew: its last element belongs to one of the
+  // most skewed tables.
+  const auto& tables_by_skew = cluster_info.table_info_by_skew;
+  if (tables_by_skew.empty()) {
+    return Status::InvalidArgument("no table balance information");
+  }
+  const auto max_table_skew = tables_by_skew.rbegin()->first;
+
+  const auto& servers_by_total = cluster_info.servers_by_total_replica_count;
+  if (servers_by_total.empty()) {
+    return Status::InvalidArgument("no per-server replica count information");
+  }
+  const auto max_server_skew =
+      servers_by_total.rbegin()->first - servers_by_total.begin()->first;
+
+  // Nothing to do if every table has zero skew (any move would unbalance a
+  // table), or if both every table and the cluster as a whole are balanced.
+  if (max_table_skew == 0 || (max_table_skew <= 1 && max_server_skew <= 1)) {
+    return Status::OK();
+  }
+
+  // Among the most skewed tables, prefer a table with a move that improves
+  // both the table skew and the cluster skew; otherwise one whose own skew can
+  // be improved. Replicas of a balanced table are moved only if that keeps the
+  // table balanced and improves the cluster skew.
+  const auto most_skewed = tables_by_skew.equal_range(max_table_skew);
+  for (auto table_it = most_skewed.first; table_it != most_skewed.second;
+       ++table_it) {
+    const TableBalanceInfo& tbi = table_it->second;
+    const auto& servers_by_table = tbi.servers_by_replica_count;
+    if (servers_by_table.empty()) {
+      return Status::InvalidArgument(Substitute(
+          "no information on replicas of table $0", tbi.table_id));
+    }
+
+    VLOG(1) << Substitute(
+        "balancing table $0 with replica count skew $1 "
+        "(min_replica_count: $2, max_replica_count: $3)",
+        tbi.table_id, max_table_skew,
+        servers_by_table.begin()->first, servers_by_table.rbegin()->first);
+
+    // Servers that are the most loaded both for this table and overall are
+    // the best sources of a move; servers that are the least loaded in both
+    // respects are the best destinations.
+    int32_t max_count_table;
+    int32_t max_count_total;
+    vector<string> max_loaded;
+    vector<string> max_loaded_intersection;
+    RETURN_NOT_OK(GetIntersection(
+        ExtremumType::MAX, servers_by_table, servers_by_total,
+        &max_count_table, &max_count_total,
+        &max_loaded, &max_loaded_intersection));
+    int32_t min_count_table;
+    int32_t min_count_total;
+    vector<string> min_loaded;
+    vector<string> min_loaded_intersection;
+    RETURN_NOT_OK(GetIntersection(
+        ExtremumType::MIN, servers_by_table, servers_by_total,
+        &min_count_table, &min_count_total,
+        &min_loaded, &min_loaded_intersection));
+
+    VLOG(1) << Substitute("table-wise : min_count: $0, max_count: $1",
+                          min_count_table, max_count_table);
+    VLOG(1) << Substitute("cluster-wise: min_count: $0, max_count: $1",
+                          min_count_total, max_count_total);
+    if (PREDICT_FALSE(VLOG_IS_ON(1))) {
+      VLOG(1) << "max_loaded_intersection: "
+              << UuidsToLogString(max_loaded_intersection);
+      VLOG(1) << "min_loaded_intersection: "
+              << UuidsToLogString(min_loaded_intersection);
+    }
+
+    // Leave a balanced table alone if either intersection is empty: moving
+    // one of its replicas might keep the cluster skew the same or make it
+    // worse, without improving the table's own balance.
+    const bool table_is_balanced = max_count_table <= min_count_table + 1;
+    if (table_is_balanced &&
+        (min_loaded_intersection.empty() || max_loaded_intersection.empty())) {
+      continue;
+    }
+    if (equal_skew_opt_ == EqualSkewOption::PICK_RANDOM) {
+      shuffle(min_loaded.begin(), min_loaded.end(), generator_);
+      shuffle(min_loaded_intersection.begin(), min_loaded_intersection.end(),
+              generator_);
+      shuffle(max_loaded.begin(), max_loaded.end(), generator_);
+      shuffle(max_loaded_intersection.begin(), max_loaded_intersection.end(),
+              generator_);
+    }
+    // Prefer servers from the intersections, falling back to the per-table
+    // extremes.
+    const string& dst_uuid = min_loaded_intersection.empty()
+        ? min_loaded.front() : min_loaded_intersection.front();
+    const string& src_uuid = max_loaded_intersection.empty()
+        ? max_loaded.back() : max_loaded_intersection.back();
+    VLOG(1) << Substitute("min_loaded_uuid: $0, max_loaded_uuid: $1",
+                          dst_uuid, src_uuid);
+    if (dst_uuid == src_uuid) {
+      continue;  // Nothing to move.
+    }
+
+    // Move a replica of the table from a most loaded server to a least
+    // loaded one.
+    *move = { tbi.table_id, src_uuid, dst_uuid };
+    break;
+  }
+
+  return Status::OK();
+}
+
+// Finds the servers at the 'extremum' (MIN or MAX) replica count of the table
+// and those at the 'extremum' total replica count. On return:
+//   * '*replica_count_table' / '*replica_count_total' hold those counts;
+//   * '*server_uuids' holds the table's extreme servers, sorted;
+//   * '*intersection' holds the servers that are extreme in both respects.
+Status TwoDimensionalGreedyAlgo::GetIntersection(
+    ExtremumType extremum,
+    const ServersByCountMap& servers_by_table_replica_count,
+    const ServersByCountMap& servers_by_total_replica_count,
+    int32_t* replica_count_table,
+    int32_t* replica_count_total,
+    vector<string>* server_uuids,
+    vector<string>* intersection) {
+  DCHECK(extremum == ExtremumType::MIN || extremum == ExtremumType::MAX);
+  DCHECK(replica_count_table);
+  DCHECK(replica_count_total);
+  DCHECK(server_uuids);
+  DCHECK(intersection);
+  if (servers_by_table_replica_count.empty()) {
+    return Status::InvalidArgument("no information on table replica count");
+  }
+  if (servers_by_total_replica_count.empty()) {
+    return Status::InvalidArgument("no information on total replica count");
+  }
+
+  // Both lists are sorted because set_intersection() requires sorted input.
+  vector<string> table_extreme_uuids;
+  RETURN_NOT_OK(GetMinMaxLoadedServers(
+      servers_by_table_replica_count, extremum, replica_count_table,
+      &table_extreme_uuids));
+  sort(table_extreme_uuids.begin(), table_extreme_uuids.end());
+
+  vector<string> total_extreme_uuids;
+  RETURN_NOT_OK(GetMinMaxLoadedServers(
+      servers_by_total_replica_count, extremum, replica_count_total,
+      &total_extreme_uuids));
+  sort(total_extreme_uuids.begin(), total_extreme_uuids.end());
+
+  intersection->clear();
+  set_intersection(table_extreme_uuids.begin(), table_extreme_uuids.end(),
+                   total_extreme_uuids.begin(), total_extreme_uuids.end(),
+                   back_inserter(*intersection));
+  *server_uuids = std::move(table_extreme_uuids);
+  return Status::OK();
+}
+
+// Appends to '*server_uuids' every server whose replica count in
+// 'servers_by_replica_count' equals the minimum (MIN) or the maximum (MAX)
+// count, in map order, and stores that count into '*replica_count'.
+// Existing elements of '*server_uuids' are kept.
+Status TwoDimensionalGreedyAlgo::GetMinMaxLoadedServers(
+    const ServersByCountMap& servers_by_replica_count,
+    ExtremumType extremum,
+    int32_t* replica_count,
+    vector<string>* server_uuids) {
+  DCHECK(extremum == ExtremumType::MIN || extremum == ExtremumType::MAX);
+  DCHECK(replica_count);
+  DCHECK(server_uuids);
+
+  if (servers_by_replica_count.empty()) {
+    return Status::InvalidArgument("no balance information");
+  }
+  // The map is ordered by count: the minimum is first, the maximum is last.
+  int32_t extreme_count;
+  if (extremum == ExtremumType::MIN) {
+    extreme_count = servers_by_replica_count.begin()->first;
+  } else {
+    extreme_count = servers_by_replica_count.rbegin()->first;
+  }
+  const auto same_count = servers_by_replica_count.equal_range(extreme_count);
+  for (auto it = same_count.first; it != same_count.second; ++it) {
+    server_uuids->push_back(it->second);
+  }
+  *replica_count = extreme_count;
+  return Status::OK();
+}
+
+} // namespace tools
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdcf6ce/src/kudu/tools/rebalance_algo.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo.h b/src/kudu/tools/rebalance_algo.h
new file mode 100644
index 0000000..c3543fd
--- /dev/null
+++ b/src/kudu/tools/rebalance_algo.h
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "kudu/util/status.h"
+
+namespace boost {
+template <class T> class optional;
+} // namespace boost
+
+namespace kudu {
+namespace tools {
+
+// Multimap from a replica count to the id of a tablet server hosting that many
+// replicas; several servers may share one count. Keying by the count (the
+// "reversed" relationship) keeps entries ordered by load, so the least and
+// most loaded servers sit at the front and the back of the map.
+using ServersByCountMap = std::multimap<int32_t, std::string>;
+
+// Balance information for a single table.
+struct TableBalanceInfo {
+ std::string table_id; // Identifier of the table described by this entry.
+
+ // Mapping table replica count -> tablet server; several tablet servers may
+ // share the same count.
+ // The table replica count of a tablet server is defined as the number of
+ // replicas of this table hosted on that tablet server.
+ ServersByCountMap servers_by_replica_count;
+};
+
+// Balance information for a cluster: per-table skew and per-server totals.
+struct ClusterBalanceInfo {
+ // Mapping table skew -> table balance information. Keying by skew (the
+ // "reversed" relationship) makes the least and most skewed tables the
+ // first and last entries of the multimap, respectively.
+ // The skew of a table is defined as the difference between the table
+ // replica counts of its most and least occupied tablet servers. Skew is
+ // considered to be improved either when the number of pairs of tablet
+ // servers exhibiting max skew between them decreases, or when the skew decreases.
+ std::multimap<int32_t, TableBalanceInfo> table_info_by_skew;
+
+ // Mapping total replica count -> tablet server.
+ //
+ // The total replica count of a tablet server is defined as the total number
+ // of replicas (across all tables) hosted on the tablet server.
+ ServersByCountMap servers_by_total_replica_count;
+};
+
+// A directive to move some replica of a table between two tablet servers.
+struct TableReplicaMove {
+ std::string table_id; // Table whose replica is moved; no tablet is named here.
+ std::string from; // Unique identifier of the source tablet server.
+ std::string to; // Unique identifier of the target tablet server.
+};
+
+// A rebalancing algorithm, which orders replica moves aiming to balance a
+// cluster. The definition of "balance" depends on the concrete algorithm.
+class RebalancingAlgo {
+ public:
+ virtual ~RebalancingAlgo() = default; // Allows deletion via a base pointer.
+
+ // The top-level method of the algorithm. Using information on the current
+ // balance state of the cluster in 'cluster_info', the algorithm populates
+ // the output parameter 'moves' with no more than 'max_moves_num' replica
+ // moves that aim to balance the cluster. Due to code conventions, 'int' is
+ // used instead of 'size_t' as the type of 'max_moves_num'; it must be a
+ // non-negative value, where a value of '0' is a shortcut for
+ // 'the possible maximum'.
+ //
+ // Once this method returns Status::OK() and leaves 'moves' empty, the cluster
+ // is considered balanced.
+ //
+ // 'moves' must be non-NULL.
+ virtual Status GetNextMoves(const ClusterBalanceInfo& cluster_info,
+ int max_moves_num,
+ std::vector<TableReplicaMove>* moves);
+ protected:
+ // Get the next rebalancing move from the algorithm. If there is no such move,
+ // the 'move' output parameter is set to 'boost::none'.
+ //
+ // 'move' must be non-NULL. This is pure virtual: subclasses must implement it.
+ virtual Status GetNextMove(const ClusterBalanceInfo& cluster_info,
+ boost::optional<TableReplicaMove>* move) = 0;
+
+ // Update the balance state in 'cluster_info' with the outcome of the move
+ // 'move'. 'cluster_info' is an in-out parameter.
+ //
+ // 'cluster_info' must be non-NULL. Being static, it uses no instance state.
+ static Status ApplyMove(const TableReplicaMove& move,
+ ClusterBalanceInfo* cluster_info);
+};
+
+// A two-dimensional greedy rebalancing algorithm. From among moves that
+// decrease the skew of a most skewed table, it prefers ones that reduce the
+// skew of the cluster. A cluster is considered balanced when the skew of every
+// table is <= 1 and the skew of the cluster is <= 1.
+//
+// The skew of the cluster is defined as the difference between the maximum
+// total replica count over all tablet servers and the minimum total replica
+// count over all tablet servers.
+class TwoDimensionalGreedyAlgo : public RebalancingAlgo {
+ public:
+  // Policies for picking one element from equal-skew choices.
+  enum class EqualSkewOption {
+    PICK_FIRST,
+    PICK_RANDOM,
+  };
+
+  // 'opt' selects the policy for choosing among equal-skew candidates; it
+  // defaults to EqualSkewOption::PICK_RANDOM.
+  explicit TwoDimensionalGreedyAlgo(
+      EqualSkewOption opt = EqualSkewOption::PICK_RANDOM);
+
+  // Override of RebalancingAlgo::GetNextMove(). Unlike the base class, which
+  // declares it protected, this class exposes it publicly.
+  Status GetNextMove(const ClusterBalanceInfo& cluster_info,
+                     boost::optional<TableReplicaMove>* move) override;
+
+ private:
+  enum class ExtremumType { MAX, MIN, };
+
+  // Compute the intersection of the least or most loaded tablet servers for a
+  // table with the least or most loaded tablet servers in the cluster:
+  // 'intersection' is populated with the ids of the tablet servers in the
+  // intersection of those two sets. The minimum or maximum number of replicas
+  // per tablet server for the table and for the cluster are output into
+  // 'replica_count_table' and 'replica_count_total', respectively. Identifiers
+  // of the tablet servers with '*replica_count_table' replicas of the table are
+  // output into the 'server_uuids' parameter. Whether to consider most or least
+  // loaded servers is controlled by 'extremum'. An empty 'intersection' on
+  // return means no intersection was found for the mentioned sets of the
+  // extremally loaded servers: in that case optimizing the load by table would
+  // not affect the extreme load by server. On success, both 'server_uuids' and
+  // 'intersection' are sorted.
+  //
+  // None of the output parameters may be NULL.
+  Status GetIntersection(
+      ExtremumType extremum,
+      const ServersByCountMap& servers_by_table_replica_count,
+      const ServersByCountMap& servers_by_total_replica_count,
+      int32_t* replica_count_table,
+      int32_t* replica_count_total,
+      std::vector<std::string>* server_uuids,
+      std::vector<std::string>* intersection);
+
+  // As determined by 'extremum', find the tablet servers with the minimum or
+  // maximum count in 'servers_by_replica_count'. The extremal count will be
+  // set in 'replica_count', while 'server_uuids' will be populated with the
+  // servers that host the extremal replica count. Returns an error if
+  // 'servers_by_replica_count' is empty.
+  //
+  // The lookup depends only on its arguments, hence the method is static.
+  //
+  // Neither of the output parameters may be NULL.
+  static Status GetMinMaxLoadedServers(
+      const ServersByCountMap& servers_by_replica_count,
+      ExtremumType extremum,
+      int32_t* replica_count,
+      std::vector<std::string>* server_uuids);
+
+  const EqualSkewOption equal_skew_opt_;
+
+  // Members are initialized in declaration order, so 'random_device_' is
+  // constructed before 'generator_'. NOTE(review): presumably the constructor
+  // (defined in the .cc file) seeds 'generator_' from 'random_device_'; if so,
+  // keep this declaration order.
+  std::random_device random_device_;
+  std::mt19937 generator_;
+};
+
+} // namespace tools
+} // namespace kudu
|