kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [2/2] kudu git commit: [tools] rebalancer in the kudu CLI tool
Date Thu, 07 Jun 2018 21:46:39 GMT
[tools] rebalancer in the kudu CLI tool

Introduced a rebalancing tool as part of the Kudu CLI tool.  To run
the rebalancer against a Kudu cluster:

  sudo -u kudu kudu cluster rebalance <master_addresses>

This changelist also contains unit tests covering the conversion of
KsckResults into ClusterBalanceInfo, and integration tests verifying
the functionality of the rebalancer as part of the Kudu CLI tool.

The code was tested against a 12-node cluster running the recent
Kudu 1.7 (3-4-3 replica management scheme) with about 12TB of data total,
and against a cluster running Kudu 1.4 (3-2-3 replica management scheme).

This tool works against Kudu clusters of version 1.4 and above.
It does not work against Kudu version 1.3 and earlier.

Change-Id: I269ea1dcb0b528ad9f03308bac6b8769e2141238
Reviewed-on: http://gerrit.cloudera.org:8080/10399
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wdberkeley@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/220cd66f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/220cd66f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/220cd66f

Branch: refs/heads/master
Commit: 220cd66f22aab2c9b0696fb2333943ba7889eb6d
Parents: a0a61bd
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Fri May 11 11:13:47 2018 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Thu Jun 7 21:45:17 2018 +0000

----------------------------------------------------------------------
 src/kudu/client/client.cc             |   3 +
 src/kudu/tools/CMakeLists.txt         |   4 +
 src/kudu/tools/kudu-admin-test.cc     | 203 +++++-
 src/kudu/tools/rebalance-test.cc      | 290 +++++++++
 src/kudu/tools/rebalance_algo-test.cc |  16 +-
 src/kudu/tools/rebalance_algo.cc      |   7 +-
 src/kudu/tools/rebalance_algo.h       |   5 +-
 src/kudu/tools/rebalancer.cc          | 963 +++++++++++++++++++++++++++++
 src/kudu/tools/rebalancer.h           | 295 +++++++++
 src/kudu/tools/tool_action_cluster.cc | 197 ++++--
 src/kudu/tools/tool_action_common.cc  |   2 +
 src/kudu/tools/tool_action_common.h   |   2 +-
 src/kudu/tools/tool_action_tablet.cc  | 408 ++----------
 src/kudu/tools/tool_replica_util.cc   | 504 +++++++++++++++
 src/kudu/tools/tool_replica_util.h    | 131 ++++
 15 files changed, 2595 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/220cd66f/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index b58d127..87851e0 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -545,6 +545,9 @@ Status KuduClient::GetTablet(const string& tablet_id, KuduTablet** tablet) {
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
+  if (resp.tablet_locations_size() == 0) {
+    return Status::NotFound(Substitute("$0: tablet not found", tablet_id));
+  }
   if (resp.tablet_locations_size() != 1) {
     return Status::IllegalState(Substitute(
         "Expected only one tablet, but received $0",

http://git-wip-us.apache.org/repos/asf/kudu/blob/220cd66f/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index ea38500..97d8bdb 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -85,9 +85,12 @@ target_link_libraries(ksck
 #######################################
 
 add_library(kudu_tools_rebalance
+  rebalancer.cc
   rebalance_algo.cc
+  tool_replica_util.cc
 )
 target_link_libraries(kudu_tools_rebalance
+  ksck
   kudu_common
   ${KUDU_BASE_LIBS}
 )
@@ -172,5 +175,6 @@ ADD_KUDU_TEST_DEPENDENCIES(kudu-tool-test
 ADD_KUDU_TEST(kudu-ts-cli-test)
 ADD_KUDU_TEST_DEPENDENCIES(kudu-ts-cli-test
   kudu)
+ADD_KUDU_TEST(rebalance-test)
 ADD_KUDU_TEST(rebalance_algo-test)
 ADD_KUDU_TEST(tool_action-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/220cd66f/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index 4c0a5fd..42a266e 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -20,6 +20,7 @@
 #include <cstdio>
 #include <deque>
 #include <iterator>
+#include <memory>
 #include <ostream>
 #include <string>
 #include <unordered_map>
@@ -31,16 +32,13 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/common/common.pb.h"
-#include "kudu/common/wire_protocol.pb.h"
-#include "kudu/consensus/consensus.pb.h"
-#include "kudu/master/master.pb.h"
-#include "kudu/tablet/metadata.pb.h"
-#include "kudu/util/pb_util.h"
 #include "kudu/client/client-test-util.h"
 #include "kudu/client/client.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h"
+#include "kudu/common/common.pb.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/quorum_util.h"
@@ -52,11 +50,15 @@
 #include "kudu/integration-tests/cluster_verifier.h"
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/master/master.pb.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/tablet/metadata.pb.h"
 #include "kudu/tools/tool_test_util.h"
 #include "kudu/tserver/tablet_server-test-base.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -90,6 +92,7 @@ using std::back_inserter;
 using std::copy;
 using std::deque;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 using strings::Split;
 using strings::Substitute;
@@ -235,6 +238,7 @@ class MoveTabletParamTest :
 };
 
 TEST_P(MoveTabletParamTest, Test) {
+  const MonoDelta timeout = MonoDelta::FromSeconds(30);
   const auto& param = GetParam();
   const auto enable_kudu_1097 = std::get<0>(param);
   const auto downTS = std::get<1>(param);
@@ -243,8 +247,9 @@ TEST_P(MoveTabletParamTest, Test) {
   FLAGS_num_replicas = 3;
 
   vector<string> ts_flags, master_flags;
-  ts_flags = master_flags = { Substitute("--raft_prepare_replacement_before_eviction=$0",
-                                         enable_kudu_1097 == Kudu1097::Enable) };
+  ts_flags = master_flags = {
+      Substitute("--raft_prepare_replacement_before_eviction=$0",
+                 enable_kudu_1097 == Kudu1097::Enable) };
   NO_FATALS(BuildAndStart(ts_flags, master_flags));
 
   vector<string> tservers;
@@ -272,6 +277,7 @@ TEST_P(MoveTabletParamTest, Test) {
   workload.set_num_replicas(FLAGS_num_replicas);
   workload.set_num_write_threads(1);
   workload.set_write_batch_size(1);
+  workload.set_write_timeout_millis(timeout.ToMilliseconds());
   workload.Setup();
   workload.Start();
 
@@ -322,10 +328,10 @@ TEST_P(MoveTabletParamTest, Test) {
     }
     ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(/*num_voters=*/ FLAGS_num_replicas,
                                                   active_tservers_map[add],
-                                                  tablet_id_, MonoDelta::FromSeconds(30)));
+                                                  tablet_id_, timeout));
     NO_FATALS(WaitUntilCommittedConfigNumMembersIs(/*num_members=*/ FLAGS_num_replicas,
                                                    active_tservers_map[add],
-                                                   tablet_id_, MonoDelta::FromSeconds(30)));
+                                                   tablet_id_, timeout));
 
   }
   workload.StopAndJoin();
@@ -1310,5 +1316,182 @@ TEST_F(AdminCliTest, TestListTablesDetail) {
   }
 }
 
+// Make sure the rebalancer doesn't start if a tablet server is down.
+class RebalanceStartCriteriaTest :
+    public AdminCliTest,
+    public ::testing::WithParamInterface<Kudu1097> {
+};
+INSTANTIATE_TEST_CASE_P(, RebalanceStartCriteriaTest,
+                        ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
+  const bool is_343_scheme = (GetParam() == Kudu1097::Enable);
+  const vector<string> kMasterFlags = {
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+  };
+  const vector<string> kTserverFlags = {
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+  };
+
+  FLAGS_num_tablet_servers = 5;
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+
+  // Shutdown one of the tablet servers.
+  HostPort ts_host_port;
+  {
+    auto* ts = cluster_->tablet_server(0);
+    ASSERT_NE(nullptr, ts);
+    ts_host_port = ts->bound_rpc_hostport();
+    ts->Shutdown();
+  }
+
+  string err;
+  Status s = RunKuduTool({
+    "cluster",
+    "rebalance",
+    cluster_->master()->bound_rpc_addr().ToString()
+  }, nullptr, &err);
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+  const auto err_msg_pattern = Substitute(
+      "Illegal state: tablet server .* \\($0\\): "
+      "unacceptable health status UNAVAILABLE",
+      ts_host_port.ToString());
+  ASSERT_STR_MATCHES(err, err_msg_pattern);
+}
+
+// A test to verify that rebalancing works for both 3-4-3 and 3-2-3 replica
+// management schemes. During replica movement, a light workload is run against
+// every table being rebalanced. This test covers different replication factors.
+class RebalanceParamTest :
+    public AdminCliTest,
+    public ::testing::WithParamInterface<std::tuple<int, Kudu1097>> {
+};
+INSTANTIATE_TEST_CASE_P(, RebalanceParamTest,
+    ::testing::Combine(::testing::Values(1, 2, 3, 5),
+                       ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)));
+TEST_P(RebalanceParamTest, Rebalance) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  const auto& param = GetParam();
+  const auto kRepFactor = std::get<0>(param);
+  const auto is_343_scheme = (std::get<1>(param) == Kudu1097::Enable);
+  constexpr auto kNumTservers = 7;
+  constexpr auto kNumTables = 5;
+  const string table_name_pattern = "rebalance_test_table_$0";
+  constexpr auto kTserverUnresponsiveMs = 3000;
+  const auto timeout = MonoDelta::FromSeconds(30);
+  const vector<string> kMasterFlags = {
+    "--allow_unsafe_replication_factor",
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+    Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs),
+  };
+  const vector<string> kTserverFlags = {
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+  };
+
+  FLAGS_num_tablet_servers = kNumTservers;
+  FLAGS_num_replicas = kRepFactor;
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+
+  // Keep running only (kRepFactor + 1) tablet servers and shut down the rest.
+  for (auto i = kRepFactor + 1; i < kNumTservers; ++i) {
+    cluster_->tablet_server(i)->Shutdown();
+  }
+
+  // Wait for the catalog manager to understand that only (kRepFactor + 1)
+  // tablet servers are available.
+  SleepFor(MonoDelta::FromMilliseconds(5 * kTserverUnresponsiveMs / 4));
+
+  // Create few tables with their tablet replicas landing only on those
+  // (kRepFactor + 1) running tablet servers.
+  KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
+  for (auto i = 0; i < kNumTables; ++i) {
+    const string table_name = Substitute(table_name_pattern, i);
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    ASSERT_OK(table_creator->table_name(table_name)
+              .schema(&client_schema)
+              .add_hash_partitions({ "key" }, 3)
+              .num_replicas(kRepFactor)
+              .Create());
+    ASSERT_OK(RunKuduTool({
+      "perf",
+      "loadgen",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      Substitute("--table_name=$0", table_name),
+    }));
+  }
+
+  // Workloads aren't run for 3-2-3 replica movement with RF = 1 because
+  // the tablet is unavailable during the move until the target voter replica
+  // is up and running. That might take some time, and to avoid flakiness or
+  // setting longer timeouts, RF=1 replicas are moved with no concurrent
+  // workload running.
+  //
+  // TODO(aserbin): clarify why even with 3-4-3 it's a bit flaky now.
+  vector<unique_ptr<TestWorkload>> workloads;
+  //if (kRepFactor > 1 || is_343_scheme) {
+  if (kRepFactor > 1) {
+    for (auto i = 0; i < kNumTables; ++i) {
+      const string table_name = Substitute(table_name_pattern, i);
+      // The workload is light (1 thread, 1 op batches) so that new replicas
+      // bootstrap and converge quickly.
+      unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get()));
+      workload->set_table_name(table_name);
+      workload->set_num_replicas(kRepFactor);
+      workload->set_num_write_threads(1);
+      workload->set_write_batch_size(1);
+      workload->set_write_timeout_millis(timeout.ToMilliseconds());
+      workload->set_already_present_allowed(true);
+      workload->Setup();
+      workload->Start();
+      workloads.emplace_back(std::move(workload));
+    }
+  }
+
+  for (auto i = (kRepFactor + 1); i < kNumTservers; ++i) {
+    ASSERT_OK(cluster_->tablet_server(i)->Restart());
+  }
+
+  {
+    string out;
+    string err;
+    const Status s = RunKuduTool({
+      "cluster",
+      "rebalance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      "--move_single_replicas",
+    }, &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err;
+    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced");
+  }
+
+  // Next run should report the cluster as balanced and no replica movement
+  // should be attempted.
+  {
+    string out;
+    string err;
+    const Status s = RunKuduTool({
+      "cluster",
+      "rebalance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      "--move_single_replicas",
+    }, &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err;
+    ASSERT_STR_CONTAINS(out,
+        "rebalancing is complete: cluster is balanced (moved 0 replicas)");
+  }
+
+  for (auto& workload : workloads) {
+    workload->StopAndJoin();
+  }
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+}
+
 } // namespace tools
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/220cd66f/src/kudu/tools/rebalance-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance-test.cc b/src/kudu/tools/rebalance-test.cc
new file mode 100644
index 0000000..271dab1
--- /dev/null
+++ b/src/kudu/tools/rebalance-test.cc
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/rebalancer.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <iostream>
+#include <limits>
+#include <map>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tools/ksck_results.h"
+#include "kudu/tools/rebalance_algo.h"
+#include "kudu/util/test_macros.h"
+
+using std::numeric_limits;
+using std::ostream;
+using std::set;
+using std::sort;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tools {
+
+namespace {
+
+struct KsckReplicaSummaryInput {
+  std::string ts_uuid;
+  bool is_voter;
+};
+
+struct KsckServerHealthSummaryInput {
+  std::string uuid;
+};
+
+struct KsckTabletSummaryInput {
+  std::string id;
+  std::string table_id;
+  std::vector<KsckReplicaSummaryInput> replicas;
+};
+
+struct KsckTableSummaryInput {
+  std::string id;
+};
+
+// The input to build KsckResults data. Contains relevant sub-fields of the
+// KsckResults to use in the test.
+struct KsckResultsInput {
+  vector<KsckServerHealthSummaryInput> tserver_summaries;
+  vector<KsckTabletSummaryInput> tablet_summaries;
+  vector<KsckTableSummaryInput> table_summaries;
+};
+
+// The configuration for the test.
+struct KsckResultsTestConfig {
+  // The input for the test: ksck results.
+  KsckResultsInput input;
+
+  // The reference result of transformation of the 'input' field.
+  ClusterBalanceInfo ref_balance_info;
+};
+
+KsckResults GenerateKsckResults(KsckResultsInput input) {
+  KsckResults results;
+  {
+    vector<KsckServerHealthSummary>& summaries = results.tserver_summaries;
+    for (const auto& summary_input : input.tserver_summaries) {
+      KsckServerHealthSummary summary;
+      summary.uuid = summary_input.uuid;
+      summaries.emplace_back(std::move(summary));
+    }
+  }
+  {
+    vector<KsckTabletSummary>& summaries = results.tablet_summaries;
+    for (const auto& summary_input : input.tablet_summaries) {
+      KsckTabletSummary summary;
+      summary.id = summary_input.id;
+      summary.table_id = summary_input.table_id;
+      auto& replicas = summary.replicas;
+      for (const auto& replica_input : summary_input.replicas) {
+        KsckReplicaSummary replica;
+        replica.ts_uuid = replica_input.ts_uuid;
+        replica.is_voter = replica_input.is_voter;
+        replicas.emplace_back(std::move(replica));
+      }
+      summaries.emplace_back(std::move(summary));
+    }
+  }
+  {
+    vector<KsckTableSummary>& summaries = results.table_summaries;
+    for (const auto& summary_input : input.table_summaries) {
+      KsckTableSummary summary;
+      summary.id = summary_input.id;
+      summaries.emplace_back(summary);
+    }
+  }
+  return results;
+}
+
+} // anonymous namespace
+
+bool operator==(const TableBalanceInfo& lhs, const TableBalanceInfo& rhs) {
+  return lhs.servers_by_replica_count == rhs.servers_by_replica_count;
+}
+
+bool operator==(const ClusterBalanceInfo& lhs, const ClusterBalanceInfo& rhs) {
+  return
+      lhs.table_info_by_skew == rhs.table_info_by_skew &&
+      lhs.servers_by_total_replica_count == rhs.servers_by_total_replica_count;
+}
+
+ostream& operator<<(ostream& s, const ClusterBalanceInfo& info) {
+  s << "[";
+  for (const auto& elem : info.servers_by_total_replica_count) {
+    s << " " << elem.first << ":" << elem.second;
+  }
+  s << " ]; [";
+  for (const auto& elem : info.table_info_by_skew) {
+    s << " " << elem.first << ":{ " << elem.second.table_id
+      << " [";
+    for (const auto& e : elem.second.servers_by_replica_count) {
+      s << " " << e.first << ":" << e.second;
+    }
+    s << " ] }";
+  }
+  s << " ]";
+  return s;
+}
+
+// Test converting KsckResults result into ClusterBalanceInfo.
+TEST(KuduKsckRebalanceTest, KsckResultsToClusterBalanceInfo) {
+  const KsckResultsTestConfig kConfigs[] = {
+    // Empty
+    {
+      {},
+      {}
+    },
+    // Simple one tserver, one table, one tablet, RF=1.
+    {
+      {
+        { { "ts_0" }, },
+        { { "tablet_0", "table_a", { { "ts_0", true }, }, }, },
+        { { "table_a" }, },
+      },
+      {
+        { { 0, { "table_a", { { 1, "ts_0" }, } } }, },
+        { { 1, "ts_0" }, },
+      }
+    },
+    // Balanced configuration: three tservers, one table, one tablet, RF=3.
+    {
+      {
+        { { "ts_0" }, { "ts_1" }, { "ts_2" }, },
+        {
+          { "tablet_a0", "table_a", { { "ts_0", true }, }, },
+          { "tablet_a0", "table_a", { { "ts_1", true }, }, },
+          { "tablet_a0", "table_a", { { "ts_2", true }, }, },
+        },
+        { { "table_a", } },
+      },
+      {
+        {
+          { 0, { "table_a", {
+                { 1, "ts_2" },
+                { 1, "ts_1" },
+                { 1, "ts_0" },
+              }
+            }
+          },
+        },
+        {
+          { 1, "ts_2" }, { 1, "ts_1" }, { 1, "ts_0" },
+        },
+      }
+    },
+    // Simple unbalanced configuration.
+    {
+      {
+        { { "ts_0" }, { "ts_1" }, { "ts_2" }, },
+        {
+          { "tablet_a_0", "table_a", { { "ts_0", true }, }, },
+          { "tablet_b_0", "table_b", { { "ts_0", true }, }, },
+          { "tablet_c_0", "table_c", { { "ts_0", true }, }, },
+        },
+        { { { "table_a" }, { "table_b" }, { "table_c" }, } },
+      },
+      {
+        {
+          { 1, { "table_c", {
+                { 0, "ts_1" }, { 0, "ts_2" }, { 1, "ts_0" },
+              }
+            }
+          },
+          { 1, { "table_b", {
+                { 0, "ts_1" }, { 0, "ts_2" }, { 1, "ts_0" },
+              }
+            }
+          },
+          { 1, { "table_a", {
+                { 0, "ts_1" }, { 0, "ts_2" }, { 1, "ts_0" },
+              }
+            }
+          },
+        },
+        {
+          { 0, "ts_2" }, { 0, "ts_1" }, { 3, "ts_0" },
+        },
+      }
+    },
+    // table_a: 1 tablet with RF=3
+    // table_b: 3 tablets with RF=1
+    // table_c: 2 tablets with RF=1
+    {
+      {
+        { { "ts_0" }, { "ts_1" }, { "ts_2" }, },
+        {
+          { "tablet_a_0", "table_a", { { "ts_0", true }, }, },
+          { "tablet_a_0", "table_a", { { "ts_1", true }, }, },
+          { "tablet_a_0", "table_a", { { "ts_2", true }, }, },
+          { "tablet_b_0", "table_b", { { "ts_0", true }, }, },
+          { "tablet_b_1", "table_b", { { "ts_0", true }, }, },
+          { "tablet_b_2", "table_b", { { "ts_0", true }, }, },
+          { "tablet_c_0", "table_c", { { "ts_1", true }, }, },
+          { "tablet_c_1", "table_c", { { "ts_1", true }, }, },
+        },
+        { { { "table_a" }, { "table_b" }, { "table_c" }, } },
+      },
+      {
+        {
+          { 2, { "table_c", {
+                { 0, "ts_0" }, { 0, "ts_2" }, { 2, "ts_1" },
+              }
+            }
+          },
+          { 3, { "table_b", {
+                { 0, "ts_1" }, { 0, "ts_2" }, { 3, "ts_0" },
+              }
+            }
+          },
+          { 0, { "table_a", {
+                { 1, "ts_2" }, { 1, "ts_1" }, { 1, "ts_0" },
+              }
+            }
+          },
+        },
+        {
+          { 1, "ts_2" }, { 3, "ts_1" }, { 4, "ts_0" },
+        },
+      }
+    },
+  };
+
+  for (auto idx = 0; idx < arraysize(kConfigs); ++idx) {
+    SCOPED_TRACE(Substitute("test config index: $0", idx)); \
+    const auto& cfg = kConfigs[idx];
+    auto ksck_results = GenerateKsckResults(cfg.input);
+    ClusterBalanceInfo cbi;
+    Rebalancer rebalancer({});
+    ASSERT_OK(rebalancer.KsckResultsToClusterBalanceInfo(
+        ksck_results, Rebalancer::MovesInProgress(), &cbi));
+    ASSERT_EQ(cfg.ref_balance_info, cbi);
+  }
+}
+
+} // namespace tools
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/220cd66f/src/kudu/tools/rebalance_algo-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo-test.cc b/src/kudu/tools/rebalance_algo-test.cc
index a074387..42b1665 100644
--- a/src/kudu/tools/rebalance_algo-test.cc
+++ b/src/kudu/tools/rebalance_algo-test.cc
@@ -189,15 +189,23 @@ string TestClusterConfigToDebugString(const TestClusterConfig& cfg) {
 }
 
 // Test the behavior of the algorithm when no input information is given.
-TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfo) {
-  TwoDimensionalGreedyAlgo algo;
+TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfoGetNextMoves) {
   vector<TableReplicaMove> moves;
   ClusterBalanceInfo info;
-  Status s = algo.GetNextMoves(info, 1, &moves);
-  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_OK(TwoDimensionalGreedyAlgo().GetNextMoves(info, 0, &moves));
   EXPECT_TRUE(moves.empty());
 }
 
+// Test the behavior of the internal (non-public) algorithm's method
+// GetNextMove() when no input information is given.
+TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfoGetNextMove) {
+  boost::optional<TableReplicaMove> move;
+  ClusterBalanceInfo info;
+  const auto s = TwoDimensionalGreedyAlgo().GetNextMove(info, &move);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  EXPECT_EQ(boost::none, move);
+}
+
 // Various scenarios of balanced configurations where no moves are expected
 // to happen.
 TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/220cd66f/src/kudu/tools/rebalance_algo.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo.cc b/src/kudu/tools/rebalance_algo.cc
index ec099be..7f3cba6 100644
--- a/src/kudu/tools/rebalance_algo.cc
+++ b/src/kudu/tools/rebalance_algo.cc
@@ -117,10 +117,15 @@ Status RebalancingAlgo::GetNextMoves(const ClusterBalanceInfo& cluster_info,
   if (max_moves_num == 0) {
     max_moves_num = std::numeric_limits<decltype(max_moves_num)>::max();
   }
+  moves->clear();
+  if (cluster_info.table_info_by_skew.empty() &&
+      cluster_info.servers_by_total_replica_count.empty()) {
+    // Nothing to balance: cluster is empty. Leave 'moves' empty and return.
+    return Status::OK();
+  }
 
   // Copy cluster_info so we can apply moves to the copy.
   ClusterBalanceInfo info(cluster_info);
-  moves->clear();
   for (decltype(max_moves_num) i = 0; i < max_moves_num; ++i) {
     boost::optional<TableReplicaMove> move;
     RETURN_NOT_OK(GetNextMove(info, &move));

http://git-wip-us.apache.org/repos/asf/kudu/blob/220cd66f/src/kudu/tools/rebalance_algo.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo.h b/src/kudu/tools/rebalance_algo.h
index a6faac6..1db0d07 100644
--- a/src/kudu/tools/rebalance_algo.h
+++ b/src/kudu/tools/rebalance_algo.h
@@ -134,6 +134,9 @@ class TwoDimensionalGreedyAlgo : public RebalancingAlgo {
  private:
   enum class ExtremumType { MAX, MIN, };
 
+  FRIEND_TEST(RebalanceAlgoUnitTest, RandomizedTest);
+  FRIEND_TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfoGetNextMove);
+
   // Compute the intersection of the least or most loaded tablet servers for a
   // table with the least or most loaded tablet servers in the cluster:
   // 'intersection' is populated with the ids of the tablet servers in the
@@ -172,8 +175,6 @@ class TwoDimensionalGreedyAlgo : public RebalancingAlgo {
   const EqualSkewOption equal_skew_opt_;
   std::random_device random_device_;
   std::mt19937 generator_;
-
-  FRIEND_TEST(RebalanceAlgoUnitTest, RandomizedTest);
 };
 
 } // namespace tools

http://git-wip-us.apache.org/repos/asf/kudu/blob/220cd66f/src/kudu/tools/rebalancer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
new file mode 100644
index 0000000..9dae39c
--- /dev/null
+++ b/src/kudu/tools/rebalancer.cc
@@ -0,0 +1,963 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/rebalancer.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <memory>
+#include <numeric>
+#include <random>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+
+#include "kudu/client/client.h"
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tools/ksck.h"
+#include "kudu/tools/ksck_remote.h"
+#include "kudu/tools/ksck_results.h"
+#include "kudu/tools/rebalance_algo.h"
+#include "kudu/tools/tool_action_common.h"
+#include "kudu/tools/tool_replica_util.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using std::accumulate;
+using std::endl;
+using std::inserter;
+using std::ostream;
+using std::map;
+using std::multimap;
+using std::numeric_limits;
+using std::pair;
+using std::set_difference;
+using std::set;
+using std::shared_ptr;
+using std::sort;
+using std::string;
+using std::to_string;
+using std::unordered_map;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tools {
+
+Rebalancer::Config::Config(
+    std::vector<std::string> master_addresses,
+    std::vector<std::string> table_filters,
+    size_t max_moves_per_server,
+    size_t max_staleness_interval_sec,
+    int64_t max_run_time_sec,
+    bool move_rf1_replicas,
+    bool output_replica_distribution_details)
+        : master_addresses(std::move(master_addresses)),
+          table_filters(std::move(table_filters)),
+          max_moves_per_server(max_moves_per_server),
+          max_staleness_interval_sec(max_staleness_interval_sec),
+          max_run_time_sec(max_run_time_sec),
+          move_rf1_replicas(move_rf1_replicas),
+          output_replica_distribution_details(output_replica_distribution_details) {
+  DCHECK_GE(max_moves_per_server, 0);
+}
+
+Rebalancer::Rebalancer(const Config& config)
+    : config_(config),
+      random_device_(),
+      random_generator_(random_device_()) {
+}
+
+Status Rebalancer::PrintStats(std::ostream& out) {
+  // First, report on the current balance state of the cluster.
+  RETURN_NOT_OK(ResetKsck());
+  ignore_result(ksck_->Run());
+  const KsckResults& results = ksck_->results();
+
+  ClusterBalanceInfo cbi;
+  RETURN_NOT_OK(KsckResultsToClusterBalanceInfo(results, MovesInProgress(), &cbi));
+
+  // Per-server replica distribution stats.
+  {
+    out << "Per-server replica distribution summary:" << endl;
+    DataTable summary({"Statistic", "Value"});
+
+    const auto& servers_load_info = cbi.servers_by_total_replica_count;
+    if (servers_load_info.empty()) {
+      summary.AddRow({ "N/A", "N/A" });
+    } else {
+      const int64_t total_replica_count = accumulate(
+          servers_load_info.begin(), servers_load_info.end(), 0L,
+          [](int64_t sum, const pair<int32_t, string>& elem) {
+            return sum + elem.first;
+          });
+
+      const auto min_replica_count = servers_load_info.begin()->first;
+      const auto max_replica_count = servers_load_info.rbegin()->first;
+      const double avg_replica_count =
+          1.0 * total_replica_count / servers_load_info.size();
+
+      summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) });
+      summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) });
+      summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) });
+    }
+    RETURN_NOT_OK(summary.PrintTo(out));
+    out << endl;
+
+    if (config_.output_replica_distribution_details) {
+      const auto& tserver_summaries = results.tserver_summaries;
+      unordered_map<string, string> tserver_endpoints;
+      for (const auto& summary : tserver_summaries) {
+        tserver_endpoints.emplace(summary.uuid, summary.address);
+      }
+
+      out << "Per-server replica distribution details:" << endl;
+      DataTable servers_info({ "UUID", "Address", "Replica Count" });
+      for (const auto& elem : servers_load_info) {
+        const auto& id = elem.second;
+        servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) });
+      }
+      RETURN_NOT_OK(servers_info.PrintTo(out));
+      out << endl;
+    }
+  }
+
+  // Per-table replica distribution stats.
+  {
+    out << "Per-table replica distribution summary:" << endl;
+    DataTable summary({ "Replica Skew", "Value" });
+    const auto& table_skew_info = cbi.table_info_by_skew;
+    if (table_skew_info.empty()) {
+      summary.AddRow({ "N/A", "N/A" });
+    } else {
+      const auto min_table_skew = table_skew_info.begin()->first;
+      const auto max_table_skew = table_skew_info.rbegin()->first;
+      const int64_t sum_table_skew = accumulate(
+          table_skew_info.begin(), table_skew_info.end(), 0L,
+          [](int64_t sum, const pair<int32_t, TableBalanceInfo>& elem) {
+            return sum + elem.first;
+          });
+      double avg_table_skew = 1.0 * sum_table_skew / table_skew_info.size();
+
+      summary.AddRow({ "Minimum", to_string(min_table_skew) });
+      summary.AddRow({ "Maximum", to_string(max_table_skew) });
+      summary.AddRow({ "Average", to_string(avg_table_skew) });
+    }
+    RETURN_NOT_OK(summary.PrintTo(out));
+    out << endl;
+
+    if (config_.output_replica_distribution_details) {
+      const auto& table_summaries = results.table_summaries;
+      unordered_map<string, const KsckTableSummary*> table_info;
+      for (const auto& summary : table_summaries) {
+        table_info.emplace(summary.id, &summary);
+      }
+      out << "Per-table replica distribution details:" << endl;
+      DataTable skew(
+          { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
+      for (const auto& elem : table_skew_info) {
+        const auto& table_id = elem.second.table_id;
+        const auto it = table_info.find(table_id);
+        const auto* table_summary =
+            (it == table_info.end()) ? nullptr : it->second;
+        const auto& table_name = table_summary ? table_summary->name : "";
+        const auto total_replica_count = table_summary
+            ? table_summary->replication_factor * table_summary->TotalTablets()
+            : 0;
+        skew.AddRow({ table_id,
+                      to_string(total_replica_count),
+                      to_string(elem.first),
+                      table_name });
+      }
+      RETURN_NOT_OK(skew.PrintTo(out));
+      out << endl;
+    }
+  }
+
+  return Status::OK();
+}
+
+Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
+  DCHECK(result_status);
+  *result_status = RunStatus::UNKNOWN;
+
+  boost::optional<MonoTime> deadline;
+  if (config_.max_run_time_sec != 0) {
+    deadline = MonoTime::Now() + MonoDelta::FromSeconds(config_.max_run_time_sec);
+  }
+
+  Runner runner(config_.max_moves_per_server, deadline);
+  RETURN_NOT_OK(runner.Init(config_.master_addresses));
+
+  const MonoDelta max_staleness_delta =
+      MonoDelta::FromSeconds(config_.max_staleness_interval_sec);
+  MonoTime staleness_start = MonoTime::Now();
+  bool is_timed_out = false;
+  bool resync_state = false;
+  while (!is_timed_out) {
+    if (resync_state) {
+      resync_state = false;
+      MonoDelta staleness_delta = MonoTime::Now() - staleness_start;
+      if (staleness_delta > max_staleness_delta) {
+        LOG(INFO) << Substitute("detected a staleness period of $0", staleness_delta.ToString());
+        return Status::Incomplete(Substitute(
+            "stalled with no progress for more than $0 seconds, aborting",
+            max_staleness_delta.ToString()));
+      }
+      // The actual re-synchronization happens during GetNextMoves() below:
+      // updated info is collected from the cluster and fed into the algorithm.
+      LOG(INFO) << "re-synchronizing cluster state";
+    }
+
+    {
+      vector<Rebalancer::ReplicaMove> replica_moves;
+      RETURN_NOT_OK(GetNextMoves(runner.scheduled_moves(), &replica_moves));
+      if (replica_moves.empty() && runner.scheduled_moves().empty()) {
+        // No moves are left: done!
+        break;
+      }
+
+      // Filter out moves for tablets which already have operations in progress.
+      FilterMoves(runner.scheduled_moves(), &replica_moves);
+      runner.LoadMoves(std::move(replica_moves));
+    }
+
+    auto has_errors = false;
+    while (!is_timed_out) {
+      auto is_scheduled = runner.ScheduleNextMove(&has_errors, &is_timed_out);
+      resync_state |= has_errors;
+      if (resync_state || is_timed_out) {
+        break;
+      }
+      if (is_scheduled) {
+        // Reset the start of the staleness interval: there was some progress
+        // in scheduling new move operations.
+        staleness_start = MonoTime::Now();
+
+        // Continue scheduling available move operations while there is enough
+        // capacity, i.e. until number of pending move operations on every
+        // involved tablet server reaches max_moves_per_server. Once no more
+        // operations can be scheduled, it's time to check for their status.
+        continue;
+      }
+
+      // Poll for the status of pending operations. If some of the in-flight
+      // operations are complete, it might be possible to schedule new ones
+      // by calling Runner::ScheduleNextMove().
+      auto has_updates = runner.UpdateMovesInProgressStatus(&has_errors,
+                                                            &is_timed_out);
+      if (has_updates) {
+        // Reset the start of the staleness interval: there were some updates
+        // on the status of scheduled move operations.
+        staleness_start = MonoTime::Now();
+      }
+      resync_state |= has_errors;
+      if (resync_state || is_timed_out || !has_updates) {
+        // If there were errors while trying to get the statuses of pending
+        // operations it's necessary to re-synchronize the state of the cluster:
+        // most likely something has changed, so it's better to get a new set
+        // of planned moves.
+        break;
+      }
+
+      // Sleep a bit before going to the next cycle of status polling.
+      SleepFor(MonoDelta::FromMilliseconds(200));
+    }
+  }
+
+  *result_status = is_timed_out ? RunStatus::TIMED_OUT
+                                : RunStatus::CLUSTER_IS_BALANCED;
+  if (moves_count) {
+    *moves_count = runner.moves_count();
+  }
+
+  return Status::OK();
+}
+
+// Transform the information on the cluster returned by ksck into
+// ClusterBalanceInfo that could be consumed by the rebalancing algorithm,
+// taking into account pending replica movement operations. The pending
+// operations are evaluated against the state of the cluster in accordance with
+// the ksck results, and if the replica movement operations are still in
+// progress, then they are interpreted as successfully completed. The idea is to
+// prevent the algorithm outputting the same moves again while some of the
+// moves recommended at prior steps are still in progress.
+Status Rebalancer::KsckResultsToClusterBalanceInfo(
+    const KsckResults& ksck_info,
+    const MovesInProgress& pending_moves,
+    ClusterBalanceInfo* cbi) const {
+  DCHECK(cbi);
+
+  // tserver UUID --> total replica count of all table's tablets at the server
+  typedef unordered_map<string, int32_t> TableReplicasAtServer;
+
+  // The result table balance information to build.
+  ClusterBalanceInfo balance_info;
+
+  unordered_map<string, int32_t> tserver_replicas_count;
+  unordered_map<string, TableReplicasAtServer> table_replicas_info;
+
+  // Build a set of tables with RF=1 (single replica tables).
+  unordered_set<string> rf1_tables;
+  if (!config_.move_rf1_replicas) {
+    for (const auto& s : ksck_info.table_summaries) {
+      if (s.replication_factor == 1) {
+        rf1_tables.emplace(s.id);
+      }
+    }
+  }
+
+  for (const auto& s : ksck_info.tserver_summaries) {
+    if (s.health != KsckServerHealth::HEALTHY) {
+      LOG(INFO) << Substitute("skipping tablet server $0 ($1) because of its "
+                              "non-HEALTHY status ($2)",
+                              s.uuid, s.address,
+                              ServerHealthToString(s.health));
+      continue;
+    }
+    tserver_replicas_count.emplace(s.uuid, 0);
+  }
+
+  for (const auto& tablet : ksck_info.tablet_summaries) {
+    if (!config_.move_rf1_replicas) {
+      if (rf1_tables.find(tablet.table_id) != rf1_tables.end()) {
+        LOG(INFO) << Substitute("tablet $0 of table '$1' ($2) has single replica, skipping",
+                                tablet.id, tablet.table_name, tablet.table_id);
+        continue;
+      }
+    }
+
+    // Check if it's one of the tablets which are currently being rebalanced.
+    // If so, interpret the move as successfully completed, updating the
+    // replica counts correspondingly.
+    const auto it_pending_moves = pending_moves.find(tablet.id);
+
+    for (const auto& ri : tablet.replicas) {
+      // Increment total count of replicas at the tablet server.
+      auto it = tserver_replicas_count.find(ri.ts_uuid);
+      if (it == tserver_replicas_count.end()) {
+        string msg = Substitute("skipping replica at tserver $0", ri.ts_uuid);
+        if (ri.ts_address) {
+          msg += " (" + *ri.ts_address + ")";
+        }
+        msg += " since it's not reported among known tservers";
+        LOG(INFO) << msg;
+        continue;
+      }
+      bool do_count_replica = true;
+      if (it_pending_moves != pending_moves.end() &&
+          tablet.result == KsckCheckResult::RECOVERING) {
+        const auto& move_info = it_pending_moves->second;
+        bool is_target_replica_present = false;
+        // Verify that the target replica is present in the config.
+        for (const auto& tr : tablet.replicas) {
+          if (tr.ts_uuid == move_info.ts_uuid_to) {
+            is_target_replica_present = true;
+            break;
+          }
+        }
+        if (move_info.ts_uuid_from == ri.ts_uuid && is_target_replica_present) {
+          // It seems both the source and the destination replicas of the
+          // scheduled replica movement operation are still in the config.
+          // That's a sign that the move operation hasn't yet completed.
+          // As explained above, let's interpret the move as successfully
+          // completed, so the source replica should not be counted in.
+          do_count_replica = false;
+        }
+      }
+      if (do_count_replica) {
+        it->second++;
+      }
+
+      auto table_ins = table_replicas_info.emplace(
+          tablet.table_id, TableReplicasAtServer());
+      TableReplicasAtServer& replicas_at_server = table_ins.first->second;
+
+      auto replicas_ins = replicas_at_server.emplace(ri.ts_uuid, 0);
+      if (do_count_replica) {
+        replicas_ins.first->second++;
+      }
+    }
+  }
+
+  // Check for the consistency of information derived from the ksck report.
+  for (const auto& elem : tserver_replicas_count) {
+    const auto& ts_uuid = elem.first;
+    int32_t count_by_table_info = 0;
+    for (auto& e : table_replicas_info) {
+      count_by_table_info += e.second[ts_uuid];
+    }
+    if (elem.second != count_by_table_info) {
+      return Status::Corruption("inconsistent cluster state returned by ksck");
+    }
+  }
+
+  // Populate ClusterBalanceInfo::servers_by_total_replica_count
+  auto& servers_by_count = balance_info.servers_by_total_replica_count;
+  for (const auto& elem : tserver_replicas_count) {
+    servers_by_count.emplace(elem.second, elem.first);
+  }
+
+  // Populate ClusterBalanceInfo::table_info_by_skew
+  auto& table_info_by_skew = balance_info.table_info_by_skew;
+  for (const auto& elem : table_replicas_info) {
+    const auto& table_id = elem.first;
+    int32_t max_count = numeric_limits<int32_t>::min();
+    int32_t min_count = numeric_limits<int32_t>::max();
+    TableBalanceInfo tbi;
+    tbi.table_id = table_id;
+    for (const auto& e : elem.second) {
+      const auto& ts_uuid = e.first;
+      const auto replica_count = e.second;
+      tbi.servers_by_replica_count.emplace(replica_count, ts_uuid);
+      max_count = std::max(replica_count, max_count);
+      min_count = std::min(replica_count, min_count);
+    }
+    table_info_by_skew.emplace(max_count - min_count, std::move(tbi));
+  }
+  *cbi = std::move(balance_info);
+
+  return Status::OK();
+}
+
+// Run one step of the rebalancer. Due to the inherent restrictions of the
+// rebalancing engine, no more than one replica per tablet is moved during
+// one step of the rebalancing.
+Status Rebalancer::GetNextMoves(const MovesInProgress& pending_moves,
+                                vector<ReplicaMove>* replica_moves) {
+  RETURN_NOT_OK(ResetKsck());
+  ignore_result(ksck_->Run());
+  const auto& ksck_info = ksck_->results();
+
+  // For simplicity, allow to run the rebalancing only when all tablet servers
+  // are in good shape. Otherwise, the rebalancing might interfere with the
+  // automatic re-replication or get unexpected errors while moving replicas.
+  for (const auto& s : ksck_info.tserver_summaries) {
+    if (s.health != KsckServerHealth::HEALTHY) {
+      return Status::IllegalState(
+          Substitute("tablet server $0 ($1): unacceptable health status $2",
+                     s.uuid, s.address, ServerHealthToString(s.health)));
+    }
+  }
+
+  // The number of operations to output by the algorithm. Those will be
+  // translated into concrete tablet replica movement operations, the output of
+  // this method.
+  const size_t max_moves = config_.max_moves_per_server *
+      ksck_info.tserver_summaries.size() * 5;
+
+  replica_moves->clear();
+  vector<TableReplicaMove> moves;
+  {
+    ClusterBalanceInfo cbi;
+    RETURN_NOT_OK(KsckResultsToClusterBalanceInfo(ksck_info, pending_moves, &cbi));
+    RETURN_NOT_OK(algo_.GetNextMoves(std::move(cbi), max_moves, &moves));
+  }
+  if (moves.empty()) {
+    // No suitable moves were found: the cluster described by 'cbi' is balanced,
+    // assuming the pending moves, if any, will succeed.
+    return Status::OK();
+  }
+  unordered_set<string> tablets_in_move;
+  std::transform(pending_moves.begin(), pending_moves.end(),
+                 inserter(tablets_in_move, tablets_in_move.begin()),
+                 [](const MovesInProgress::value_type& elem) {
+                   return elem.first;
+                 });
+  for (const auto& move : moves) {
+    vector<string> tablet_ids;
+    RETURN_NOT_OK(FindReplicas(move, ksck_info, &tablet_ids));
+    // Shuffle the set of the tablet identifiers: that's to achieve even spread
+    // of moves across tables with the same skew.
+    std::shuffle(tablet_ids.begin(), tablet_ids.end(), random_generator_);
+    string move_tablet_id;
+    for (const auto& tablet_id : tablet_ids) {
+      if (tablets_in_move.find(tablet_id) == tablets_in_move.end()) {
+        // For now, choose the very first tablet that does not have replicas
+        // in move. Later on, additional logic might be added to find
+        // the best candidate.
+        move_tablet_id = tablet_id;
+        break;
+      }
+    }
+    if (move_tablet_id.empty()) {
+      LOG(WARNING) << Substitute(
+          "table $0: could not find any suitable replica to move "
+          "from server $1 to server $2", move.table_id, move.from, move.to);
+      continue;
+    }
+    ReplicaMove info;
+    info.tablet_uuid = move_tablet_id;
+    info.ts_uuid_from = move.from;
+    info.ts_uuid_to = move.to;
+    replica_moves->emplace_back(std::move(info));
+    // Mark the tablet as 'has a replica in move'.
+    tablets_in_move.emplace(move_tablet_id);
+  }
+
+  return Status::OK();
+}
+
+// Given high-level description of moves, find tablets with replicas at the
+// corresponding tablet servers to satisfy those high-level descriptions.
+// The idea is to find all tablets of the specified table that would have a
+// replica at the source server, but would not have a replica at the destination
+// server. That is to satisfy the restriction of having no more than one replica
+// of the same tablet per server.
+//
+// An additional constraint: it's better not to move leader replicas, if
+// possible. If a client has a write operation in progress, moving leader
+// replicas of affected tablets would make the client re-resolve new leaders
+// and retry the operations. Moving leader replicas is used as last resort
+// when no other candidates are left.
+Status Rebalancer::FindReplicas(const TableReplicaMove& move,
+                                const KsckResults& ksck_info,
+                                vector<string>* tablet_ids) const {
+  const auto& table_id = move.table_id;
+
+  // Tablet ids of replicas on the source tserver that are non-leaders.
+  vector<string> tablet_uuids_src;
+  // Tablet ids of replicas on the source tserver that are leaders.
+  vector<string> tablet_uuids_src_leaders;
+  // UUIDs of tablets of the selected table at the destination tserver.
+  vector<string> tablet_uuids_dst;
+
+  for (const auto& tablet_summary : ksck_info.tablet_summaries) {
+    if (tablet_summary.table_id != table_id) {
+      continue;
+    }
+    if (tablet_summary.result != KsckCheckResult::HEALTHY) {
+      VLOG(1) << Substitute("table $0: not considering replicas of tablet $1 "
+                            "as candidates for movement since the tablet's "
+                            "status is '$2'",
+                            table_id, tablet_summary.id,
+                            KsckCheckResultToString(tablet_summary.result));
+      continue;
+    }
+    for (const auto& replica_summary : tablet_summary.replicas) {
+      if (replica_summary.ts_uuid != move.from &&
+          replica_summary.ts_uuid != move.to) {
+        continue;
+      }
+      if (!replica_summary.ts_healthy) {
+        VLOG(1) << Substitute("table $0: not considering replica movement "
+                              "from $1 to $2 since server $3 is not healthy",
+                              table_id,
+                              move.from, move.to, replica_summary.ts_uuid);
+        continue;
+      }
+      if (replica_summary.ts_uuid == move.from) {
+        if (replica_summary.is_leader) {
+          tablet_uuids_src_leaders.emplace_back(tablet_summary.id);
+        } else {
+          tablet_uuids_src.emplace_back(tablet_summary.id);
+        }
+      } else {
+        DCHECK_EQ(move.to, replica_summary.ts_uuid);
+        tablet_uuids_dst.emplace_back(tablet_summary.id);
+      }
+    }
+  }
+  sort(tablet_uuids_src.begin(), tablet_uuids_src.end());
+  sort(tablet_uuids_dst.begin(), tablet_uuids_dst.end());
+
+  vector<string> tablet_uuids;
+  set_difference(
+      tablet_uuids_src.begin(), tablet_uuids_src.end(),
+      tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
+      inserter(tablet_uuids, tablet_uuids.begin()));
+
+  if (!tablet_uuids.empty()) {
+    // If there are tablets with non-leader replicas at the source server,
+    // those are the best candidates for movement.
+    tablet_ids->swap(tablet_uuids);
+    return Status::OK();
+  }
+
+  // If no tablets with non-leader replicas were found, resort to tablets with
+  // leader replicas at the source server.
+  DCHECK(tablet_uuids.empty());
+  sort(tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end());
+  set_difference(
+      tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end(),
+      tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
+      inserter(tablet_uuids, tablet_uuids.begin()));
+
+  tablet_ids->swap(tablet_uuids);
+
+  return Status::OK();
+}
+
+Status Rebalancer::ResetKsck() {
+  shared_ptr<KsckCluster> cluster;
+  RETURN_NOT_OK_PREPEND(
+      RemoteKsckCluster::Build(config_.master_addresses, &cluster),
+      "unable to build KsckCluster");
+  ksck_.reset(new Ksck(cluster));
+  ksck_->set_table_filters(config_.table_filters);
+  return Status::OK();
+}
+
+void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves,
+                             vector<ReplicaMove>* replica_moves) {
+  unordered_set<string> tablet_uuids;
+  vector<ReplicaMove> filtered_replica_moves;
+  for (auto&& move_op : *replica_moves) {
+    const auto& tablet_uuid = move_op.tablet_uuid;
+    if (scheduled_moves.find(tablet_uuid) != scheduled_moves.end()) {
+      // There is a move operation in progress for the tablet, don't schedule
+      // another one.
+      continue;
+    }
+    if (PREDICT_TRUE(tablet_uuids.emplace(tablet_uuid).second)) {
+      filtered_replica_moves.push_back(std::move(move_op));
+    } else {
+      // Rationale behind the unique tablet constraint: the implementation of
+      // the Run() method is designed to re-order operations suggested by the
+      // high-level algorithm to use the op-count-per-tablet-server capacity
+      // as much as possible. Right now, the GetNextMoves() method outputs only
+      // one move operation per tablet in every batch. The code below is to
+      // enforce the contract between Run() and GetNextMoves() methods.
+      LOG(DFATAL) << "detected multiple replica move operations for the same "
+                     "tablet " << tablet_uuid;
+    }
+  }
+  replica_moves->swap(filtered_replica_moves);
+}
+
+Rebalancer::Runner::Runner(size_t max_moves_per_server,
+                           const boost::optional<MonoTime>& deadline)
+    : max_moves_per_server_(max_moves_per_server),
+      deadline_(deadline),
+      moves_count_(0) {
+}
+
+Status Rebalancer::Runner::Init(vector<string> master_addresses) {
+  DCHECK_EQ(0, moves_count_);
+  DCHECK(src_op_indices_.empty());
+  DCHECK(dst_op_indices_.empty());
+  DCHECK(op_count_per_ts_.empty());
+  DCHECK(ts_per_op_count_.empty());
+  DCHECK(scheduled_moves_.empty());
+  DCHECK(master_addresses_.empty());
+  DCHECK(client_.get() == nullptr);
+  master_addresses_ = std::move(master_addresses);
+  return KuduClientBuilder()
+      .master_server_addrs(master_addresses_)
+      .Build(&client_);
+}
+
+void Rebalancer::Runner::LoadMoves(vector<ReplicaMove> replica_moves) {
+  // The moves to schedule (used by subsequent calls to ScheduleNextMove()).
+  replica_moves_.swap(replica_moves);
+
+  // Prepare helper containers.
+  src_op_indices_.clear();
+  dst_op_indices_.clear();
+  op_count_per_ts_.clear();
+  ts_per_op_count_.clear();
+
+  // If there are any scheduled moves, it's necessary to count them in
+  // to properly handle the 'maximum moves per server' constraint.
+  unordered_map<string, int32_t> ts_pending_op_count;
+  for (auto it = scheduled_moves_.begin(); it != scheduled_moves_.end(); ++it) {
+    ++ts_pending_op_count[it->second.ts_uuid_from];
+    ++ts_pending_op_count[it->second.ts_uuid_to];
+  }
+
+  // These two references are to make the compiler happy with the lambda below.
+  auto& op_count_per_ts = op_count_per_ts_;
+  auto& ts_per_op_count = ts_per_op_count_;
+  const auto set_op_count = [&ts_pending_op_count,
+      &op_count_per_ts, &ts_per_op_count](const string& ts_uuid) {
+    auto it = ts_pending_op_count.find(ts_uuid);
+    if (it == ts_pending_op_count.end()) {
+      // No operations for tablet server ts_uuid yet.
+      if (op_count_per_ts.emplace(ts_uuid, 0).second) {
+        ts_per_op_count.emplace(0, ts_uuid);
+      }
+    } else {
+      // There are pending operations for tablet server ts_uuid: set the number
+      // of operations at the tablet server ts_uuid as calculated above with
+      // ts_pending_op_count.
+      if (op_count_per_ts.emplace(ts_uuid, it->second).second) {
+        ts_per_op_count.emplace(it->second, ts_uuid);
+      }
+      // Once set into op_count_per_ts and ts_per_op_count, this information
+      // is no longer needed. In addition, these elements are removed to leave
+      // only pending operations that do not intersect with the batch of newly
+      // loaded operations.
+      ts_pending_op_count.erase(it);
+    }
+  };
+
+  // Process move operations from the batch of newly loaded ones.
+  for (size_t i = 0; i < replica_moves_.size(); ++i) {
+    const auto& elem = replica_moves_[i];
+    src_op_indices_.emplace(elem.ts_uuid_from, set<size_t>()).first->
+        second.emplace(i);
+    set_op_count(elem.ts_uuid_from);
+
+    dst_op_indices_.emplace(elem.ts_uuid_to, set<size_t>()).first->
+        second.emplace(i);
+    set_op_count(elem.ts_uuid_to);
+  }
+
+  // Process pending/scheduled move operations which do not intersect
+  // with the batch of newly loaded ones.
+  for (const auto& elem : ts_pending_op_count) {
+    auto op_inserted = op_count_per_ts.emplace(elem.first, elem.second).second;
+    DCHECK(op_inserted);
+    ts_per_op_count.emplace(elem.second, elem.first);
+  }
+}
+
+// Return true if replica move operation has been scheduled successfully.
+bool Rebalancer::Runner::ScheduleNextMove(bool* has_errors, bool* timed_out) {
+  DCHECK(has_errors);
+  DCHECK(timed_out);
+  *has_errors = false;
+  *timed_out = false;
+
+  if (deadline_ && MonoTime::Now() >= *deadline_) {
+    *timed_out = true;
+    return false;
+  }
+
+  // Only one move operation per step: it's necessary to update information
+  // in the ts_per_op_count_ right after scheduling a single operation
+  // to avoid oversubscribing of the tablet servers.
+  size_t op_idx;
+  if (!FindNextMove(&op_idx)) {
+    // Nothing to schedule: unfruitful outcome. Need to wait until
+    // a slot at a tablet server is available.
+    return false;
+  }
+
+  // Try to schedule next move operation.
+  DCHECK_LT(op_idx, replica_moves_.size());
+  const auto& info = replica_moves_[op_idx];
+  const auto& tablet_id = info.tablet_uuid;
+  const auto& src_ts_uuid = info.ts_uuid_from;
+  const auto& dst_ts_uuid = info.ts_uuid_to;
+
+  Status s = ScheduleReplicaMove(master_addresses_, client_,
+                                 tablet_id, src_ts_uuid, dst_ts_uuid);
+  if (s.ok()) {
+    UpdateOnMoveScheduled(op_idx, info.tablet_uuid,
+                          info.ts_uuid_from, info.ts_uuid_to, true);
+    LOG(INFO) << Substitute("tablet $0: $1 -> $2 move scheduled",
+                            tablet_id, src_ts_uuid, dst_ts_uuid);
+    // Successfully scheduled move operation.
+    return true;
+  }
+
+  DCHECK(!s.ok());
+  // The source replica is not found in the tablet's consensus config
+  // or the tablet does not exist anymore. The replica might have already
+  // moved because of some other concurrent activity, e.g.
+  // re-replication, another rebalancing session in progress, etc.
+  LOG(INFO) << Substitute("tablet $0: $1 -> $2 move ignored: $3",
+                          tablet_id, src_ts_uuid, dst_ts_uuid, s.ToString());
+  UpdateOnMoveScheduled(op_idx, info.tablet_uuid,
+                        info.ts_uuid_from, info.ts_uuid_to, false);
+  // Failed to schedule move operation due to an error.
+  *has_errors = true;
+  return false;
+}
+
+bool Rebalancer::Runner::UpdateMovesInProgressStatus(
+    bool* has_errors, bool* timed_out) {
+  DCHECK(has_errors);
+  DCHECK(timed_out);
+  *has_errors = false;
+  *timed_out = false;
+
+  // Update the statuses of the in-progress move operations.
+  auto has_updates = false;
+  auto error_count = 0;
+  for (auto it = scheduled_moves_.begin(); it != scheduled_moves_.end(); ) {
+    if (deadline_ && MonoTime::Now() >= *deadline_) {
+      *timed_out = true;
+      break;
+    }
+    const auto& tablet_id = it->first;
+    DCHECK_EQ(tablet_id, it->second.tablet_uuid);
+    const auto& src_ts_uuid = it->second.ts_uuid_from;
+    const auto& dst_ts_uuid = it->second.ts_uuid_to;
+    auto is_complete = false;
+    Status move_status;
+    const Status s = CheckCompleteMove(master_addresses_, client_,
+                                       tablet_id, src_ts_uuid, dst_ts_uuid,
+                                       &is_complete, &move_status);
+    has_updates |= s.ok();
+    if (!s.ok()) {
+      // There was an error while fetching the status of this move operation.
+      // Since the actual status of the move is not known, don't update the
+      // stats on pending operations per server. The higher-level code should
+      // handle this situation after returning from this method by
+      // re-synchronizing the state of the cluster.
+      ++error_count;
+      LOG(INFO) << Substitute("tablet $0: $1 -> $2 move is abandoned: $3",
+                              tablet_id, src_ts_uuid, dst_ts_uuid, s.ToString());
+      // Erase the element and advance the iterator.
+      it = scheduled_moves_.erase(it);
+      continue;
+    } else if (is_complete) {
+      // The move has completed (success or failure): update the stats on the
+      // pending operations per server.
+      ++moves_count_;
+      UpdateOnMoveCompleted(it->second.ts_uuid_from);
+      UpdateOnMoveCompleted(it->second.ts_uuid_to);
+      LOG(INFO) << Substitute("tablet $0: $1 -> $2 move completed: $3",
+                              tablet_id, src_ts_uuid, dst_ts_uuid,
+                              s.ok() ? move_status.ToString() : s.ToString());
+      // Erase the element and advance the iterator.
+      it = scheduled_moves_.erase(it);
+      continue;
+    }
+    // There was an update on the status of the move operation and it hasn't
+    // completed yet. Let's poll for the status of the rest.
+    ++it;
+  }
+  *has_errors = (error_count != 0);
+  return has_updates;
+}
+
+bool Rebalancer::Runner::FindNextMove(size_t* op_idx) {
+  vector<size_t> op_indices;
+  for (auto it = ts_per_op_count_.begin(); op_indices.empty() &&
+       it != ts_per_op_count_.end() && it->first < max_moves_per_server_; ++it) {
+    const auto& uuid_0 = it->second;
+
+    auto it_1 = it;
+    ++it_1;
+    for (; op_indices.empty() && it_1 != ts_per_op_count_.end() &&
+         it_1->first < max_moves_per_server_; ++it_1) {
+      const auto& uuid_1 = it_1->second;
+
+      // Check for available operations where uuid_0 and uuid_1 would be
+      // the source and the destination servers, respectively.
+      {
+        const auto it_src = src_op_indices_.find(uuid_0);
+        const auto it_dst = dst_op_indices_.find(uuid_1);
+        if (it_src != src_op_indices_.end() &&
+            it_dst != dst_op_indices_.end()) {
+          set_intersection(it_src->second.begin(), it_src->second.end(),
+                           it_dst->second.begin(), it_dst->second.end(),
+                           back_inserter(op_indices));
+        }
+      }
+      // It's enough to find just one move.
+      if (!op_indices.empty()) {
+        break;
+      }
+      {
+        const auto it_src = src_op_indices_.find(uuid_1);
+        const auto it_dst = dst_op_indices_.find(uuid_0);
+        if (it_src != src_op_indices_.end() &&
+            it_dst != dst_op_indices_.end()) {
+          set_intersection(it_src->second.begin(), it_src->second.end(),
+                           it_dst->second.begin(), it_dst->second.end(),
+                           back_inserter(op_indices));
+        }
+      }
+    }
+  }
+  if (!op_indices.empty() && op_idx) {
+    *op_idx = op_indices.front();
+  }
+  return !op_indices.empty();
+}
+
+void Rebalancer::Runner::UpdateOnMoveScheduled(
+    size_t idx,
+    const string& tablet_uuid,
+    const string& src_ts_uuid,
+    const string& dst_ts_uuid,
+    bool is_success) {
+  if (is_success) {
+    Rebalancer::ReplicaMove move_info = { tablet_uuid, src_ts_uuid, dst_ts_uuid };
+    auto ins = scheduled_moves_.emplace(tablet_uuid, std::move(move_info));
+    // Only one replica of a tablet can be moved at a time.
+    DCHECK(ins.second);
+  }
+  UpdateOnMoveScheduledImpl(idx, src_ts_uuid, is_success, &src_op_indices_);
+  UpdateOnMoveScheduledImpl(idx, dst_ts_uuid, is_success, &dst_op_indices_);
+}
+
+void Rebalancer::Runner::UpdateOnMoveScheduledImpl(
+    size_t idx,
+    const string& ts_uuid,
+    bool is_success,
+    std::unordered_map<std::string, std::set<size_t>>* op_indices) {
+  DCHECK(op_indices);
+  auto& indices = (*op_indices)[ts_uuid];
+  auto erased = indices.erase(idx);
+  DCHECK_EQ(1, erased);
+  if (indices.empty()) {
+    op_indices->erase(ts_uuid);
+  }
+  if (is_success) {
+    const auto op_count = op_count_per_ts_[ts_uuid]++;
+    const auto op_range = ts_per_op_count_.equal_range(op_count);
+    bool ts_op_count_updated = false;
+    for (auto it = op_range.first; it != op_range.second; ++it) {
+      if (it->second == ts_uuid) {
+        ts_per_op_count_.erase(it);
+        ts_per_op_count_.emplace(op_count + 1, ts_uuid);
+        ts_op_count_updated = true;
+        break;
+      }
+    }
+    DCHECK(ts_op_count_updated);
+  }
+}
+
+void Rebalancer::Runner::UpdateOnMoveCompleted(const string& ts_uuid) {
+  const auto op_count = op_count_per_ts_[ts_uuid]--;
+  const auto op_range = ts_per_op_count_.equal_range(op_count);
+  bool ts_per_op_count_updated = false;
+  for (auto it = op_range.first; it != op_range.second; ++it) {
+    if (it->second == ts_uuid) {
+      ts_per_op_count_.erase(it);
+      ts_per_op_count_.emplace(op_count - 1, ts_uuid);
+      ts_per_op_count_updated = true;
+      break;
+    }
+  }
+  DCHECK(ts_per_op_count_updated);
+}
+
+} // namespace tools
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/220cd66f/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
new file mode 100644
index 0000000..e18c84a
--- /dev/null
+++ b/src/kudu/tools/rebalancer.h
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <iosfwd>
+#include <map>
+#include <memory>
+#include <random>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/client/shared_ptr.h"
+#include "kudu/tools/rebalance_algo.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace client {
+class KuduClient;
+}
+
+namespace tools {
+
+class Ksck;
+struct KsckResults;
+
+// A class implementing logic for Kudu cluster rebalancing.
+class Rebalancer {
+ public:
+  // Configuration parameters for the rebalancer aggregated into a struct.
+  struct Config {
+    Config(std::vector<std::string> master_addresses = {},
+           std::vector<std::string> table_filters = {},
+           size_t max_moves_per_server = 5,
+           size_t max_staleness_interval_sec = 300,
+           int64_t max_run_time_sec = 0,
+           bool move_rf1_replicas = false,
+           bool output_replica_distribution_details = false);
+
+    // Kudu masters' RPC endpoints.
+    std::vector<std::string> master_addresses;
+
+    // Names of tables to balance. If empty, every table and the whole cluster
+    // will be balanced.
+    std::vector<std::string> table_filters;
+
+    // Maximum number of move operations to run concurrently on one server.
+    // An 'operation on a server' means a move operation where either source or
+    // destination replica is located on the specified server.
+    size_t max_moves_per_server;
+
+    // Maximum duration of the 'staleness' interval, when the rebalancer cannot
+    // make any progress in scheduling new moves and no prior scheduled moves
+    // are left, even if re-synchronizing against the cluster's state again and
+    // again. Such a staleness usually happens in case of a persistent problem
+    // with the cluster or when some unexpected concurrent activity is present
+    // (such as automatic recovery of failed replicas, etc.).
+    size_t max_staleness_interval_sec;
+
+    // Maximum run time, in seconds.
+    int64_t max_run_time_sec;
+
+    // Whether to move replicas of tablets with replication factor of one.
+    bool move_rf1_replicas;
+
+    // Whether Rebalancer::PrintStats() should output per-table and per-server
+    // replica distribution details.
+    bool output_replica_distribution_details;
+  };
+
+  // Represents a concrete move of a replica from one tablet server to another.
+  // Formed logically from a TableReplicaMove by specifying a tablet for the move.
+  struct ReplicaMove {
+    std::string tablet_uuid;
+    std::string ts_uuid_from;
+    std::string ts_uuid_to;
+  };
+
+  enum class RunStatus {
+    UNKNOWN,
+    CLUSTER_IS_BALANCED,
+    TIMED_OUT,
+  };
+
+  // A helper type: key is tablet UUID which corresponds to value.tablet_uuid.
+  typedef std::unordered_map<std::string, ReplicaMove> MovesInProgress;
+
+  // Create Rebalancer object with the specified configuration.
+  explicit Rebalancer(const Config& config);
+
+  // Print the stats on the cluster balance information into the 'out' stream.
+  Status PrintStats(std::ostream& out);
+
+  // Run the rebalancing: start the process and return once the balancing
+  // criteria are satisfied or if an error occurs. The number of attempted
+  // moves is output into the 'moves_count' parameter (if the parameter is
+  // not null). The 'result_status' output parameter cannot be null.
+  Status Run(RunStatus* result_status, size_t* moves_count = nullptr);
+
+ private:
+  // Helper class to find and schedule next available rebalancing move operation
+  // and track already scheduled ones.
+  class Runner {
+   public:
+    // The 'max_moves_per_server' specifies the maximum number of operations
+    // per tablet server (both the source and the destination are counted in).
+    // The 'deadline' specifies the deadline for the run, 'boost::none'
+    // if no timeout is set.
+    Runner(size_t max_moves_per_server,
+           const boost::optional<MonoTime>& deadline);
+
+    // Initialize instance of Runner so it can run against Kudu cluster with
+    // the 'master_addresses' RPC endpoints.
+    Status Init(std::vector<std::string> master_addresses);
+
+    // Load information on prescribed replica movement operations. Also,
+    // populate helper containers and other auxiliary run-time structures
+    // used by ScheduleNextMove(). This method is called with every batch
+    // of move operations output by the rebalancing algorithm once previously
+    // loaded moves have been scheduled.
+    void LoadMoves(std::vector<ReplicaMove> replica_moves);
+
+    // Schedule next replica move.
+    bool ScheduleNextMove(bool* has_errors, bool* timed_out);
+
+    // Update statuses and auxiliary information on in-progress replica move
+    // operations. The 'timed_out' parameter is set to 'true' if not all
+    // in-progress operations were processed by the deadline specified by
+    // the 'deadline_' member field. The method returns 'true' if the status
+    // of at least one in-progress operation was successfully retrieved;
+    // 'has_errors' is set to 'true' if retrieving any status failed.
+    bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out);
+
+    uint32_t moves_count() const {
+      return moves_count_;
+    }
+
+    const MovesInProgress& scheduled_moves() const {
+      return scheduled_moves_;
+    }
+
+   private:
+    // Given the data in the helper containers, find the index describing
+    // the next replica move and output it into the 'op_idx' parameter.
+    bool FindNextMove(size_t* op_idx);
+
+    // Update the helper containers once a move operation has been scheduled.
+    void UpdateOnMoveScheduled(size_t idx,
+                               const std::string& tablet_uuid,
+                               const std::string& src_ts_uuid,
+                               const std::string& dst_ts_uuid,
+                               bool is_success);
+
+    // Auxiliary method used by UpdateOnMoveScheduled() implementation.
+    void UpdateOnMoveScheduledImpl(
+        size_t idx,
+        const std::string& ts_uuid,
+        bool is_success,
+        std::unordered_map<std::string, std::set<size_t>>* op_indices);
+
+    // Update the helper containers once a scheduled operation is complete
+    // (i.e. succeeded or failed).
+    void UpdateOnMoveCompleted(const std::string& ts_uuid);
+
+    // Maximum allowed number of move operations per server. For a move
+    // operation, a source replica adds +1 at the source server and the target
+    // replica adds +1 at the destination server.
+    const size_t max_moves_per_server_;
+
+    // Deadline for the activity performed by the Runner class in
+    // ScheduleNextMove() and UpdateMovesInProgressStatus() methods.
+    const boost::optional<MonoTime> deadline_;
+
+    // Number of completed (succeeded or failed) replica move operations.
+    uint32_t moves_count_;
+
+    // Kudu cluster RPC end-points.
+    std::vector<std::string> master_addresses_;
+
+    // The moves to schedule.
+    std::vector<ReplicaMove> replica_moves_;
+
+    // Mapping 'tserver UUID' --> 'indices of move operations having the
+    // tserver UUID (i.e. the key) as the source of the move operation'.
+    std::unordered_map<std::string, std::set<size_t>> src_op_indices_;
+
+    // Mapping 'tserver UUID' --> 'indices of move operations having the
+    // tserver UUID (i.e. the key) as the destination of the move operation'.
+    std::unordered_map<std::string, std::set<size_t>> dst_op_indices_;
+
+    // Mapping 'tserver UUID' --> 'scheduled move operations count'.
+    std::unordered_map<std::string, int32_t> op_count_per_ts_;
+
+    // Mapping 'scheduled move operations count' --> 'tserver UUID'. That's
+    // just reversed 'op_count_per_ts_'. Having count as key helps with finding
+    // servers with minimum number of scheduled operations while scheduling
+    // replica movement operations (it's necessary to preserve the
+    // 'maximum-moves-per-server' constraint while doing so).
+    std::multimap<int32_t, std::string> ts_per_op_count_;
+
+    // Information on scheduled replica movement operations; keys are
+    // tablet UUIDs, values are ReplicaMove structures.
+    MovesInProgress scheduled_moves_;
+
+    // Client object to make queries to Kudu masters for various auxiliary info
+    // while scheduling move operations and monitoring their status.
+    client::sp::shared_ptr<client::KuduClient> client_;
+  };
+
+  FRIEND_TEST(KuduKsckRebalanceTest, KsckResultsToClusterBalanceInfo);
+
+  // Convert ksck results into cluster balance information suitable for the
+  // input of the high-level rebalancing algorithm. The 'moves_in_progress'
+  // parameter contains information on the replica moves which have been
+  // scheduled by a caller and are still in progress: those are considered
+  // as successfully completed and applied to the 'ksck_info' when building
+  // ClusterBalanceInfo for the specified 'ksck_info' input. The result
+  // cluster balance information is output into the 'cbi' parameter. The 'cbi'
+  // output parameter cannot be null.
+  Status KsckResultsToClusterBalanceInfo(
+      const KsckResults& ksck_info,
+      const MovesInProgress& moves_in_progress,
+      ClusterBalanceInfo* cbi) const;
+
+  // Get next batch of replica moves from the rebalancing algorithm.
+  // Essentially, it runs ksck against the cluster and feeds the data into the
+  // rebalancing algorithm along with the information on currently pending
+  // replica movement operations. The information returned by the high-level
+  // rebalancing algorithm is translated into particular replica movement
+  // instructions, which are used to populate the 'replica_moves' parameter
+  // (the container is cleared first).
+  //
+  // The 'moves_in_progress' parameter contains information on pending moves.
+  // The results are output into 'replica_moves', which will be empty
+  // if no next steps are needed to make the cluster balanced.
+  Status GetNextMoves(const MovesInProgress& moves_in_progress,
+                      std::vector<ReplicaMove>* replica_moves);
+
+  // Given information from the high-level rebalancing algorithm, find
+  // appropriate tablet replicas to move on the specified tablet servers.
+  // The set of result UUIDs is output into the 'tablet_ids' container (note:
+  // the output container is first cleared). If no suitable replicas are found,
+  // 'tablet_ids' will be empty with the result status of Status::OK().
+  Status FindReplicas(const TableReplicaMove& move,
+                      const KsckResults& ksck_info,
+                      std::vector<std::string>* tablet_ids) const;
+
+  // Reset ksck-related fields, preparing for a fresh ksck run.
+  Status ResetKsck();
+
+  // Filter out move operations at the tablets which already have operations
+  // in progress. The 'replica_moves' cannot be null.
+  void FilterMoves(const MovesInProgress& scheduled_moves,
+                   std::vector<ReplicaMove>* replica_moves);
+
+  // Configuration for the rebalancer.
+  const Config config_;
+
+  // Random device and generator for selecting among multiple choices, when
+  // appropriate.
+  std::random_device random_device_;
+  std::mt19937 random_generator_;
+
+  // An instance of the balancing algorithm.
+  TwoDimensionalGreedyAlgo algo_;
+
+  // Auxiliary Ksck object to get information on the cluster.
+  std::shared_ptr<Ksck> ksck_;
+
+};
+
+} // namespace tools
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/220cd66f/src/kudu/tools/tool_action_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index e225254..010cdcd 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -15,24 +15,42 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cstddef>
 #include <iostream>
+#include <map>
 #include <memory>
+#include <set>
 #include <string>
+#include <type_traits>
 #include <unordered_map>
 #include <utility>
 #include <vector>
 
 #include <gflags/gflags.h>
+#include <glog/logging.h>
 
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/tools/ksck.h"
 #include "kudu/tools/ksck_remote.h"
+#include "kudu/tools/rebalancer.h"
 #include "kudu/tools/tool_action.h"
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/util/status.h"
 
+using std::cout;
+using std::endl;
+using std::multimap;
+using std::set;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Split;
+using strings::Substitute;
+
 #define PUSH_PREPEND_NOT_OK(s, statuses, msg) do { \
   ::kudu::Status _s = (s); \
   if (PREDICT_FALSE(!_s.ok())) { \
@@ -48,68 +66,159 @@ DEFINE_string(tablets, "",
               "Tablets to check (comma-separated list of IDs) "
               "If not specified, checks all tablets.");
 
+DEFINE_uint32(max_moves_per_server, 5,
+              "Maximum number of replica moves to perform concurrently on one "
+              "tablet server: 'move from' and 'move to' are counted "
+              "as separate move operations.");
+
+DEFINE_uint32(max_staleness_interval_sec, 300,
+              "Maximum duration of the 'staleness' interval, when the "
+              "rebalancer cannot make any progress in scheduling new moves and "
+              "no prior scheduled moves are left, even if re-synchronizing "
+              "against the cluster's state again and again. Such a staleness "
+              "usually happens in case of a persistent problem with the "
+              "cluster or when some unexpected concurrent activity is "
+              "present (such as automatic recovery of failed replicas, etc.)");
+
+DEFINE_int64(max_run_time_sec, 0,
+             "Maximum time to run the rebalancing, in seconds. Specifying 0 "
+             "means not imposing any limit on the rebalancing run time.");
+
+DEFINE_bool(move_single_replicas, false,
+            "Whether to move single replica tablets (i.e. replicas of tablets "
+            "of replication factor 1).");
+
+DEFINE_bool(output_replica_distribution_details, false,
+            "Whether to output details on per-table and per-server "
+            "replica distribution");
+
 namespace kudu {
 namespace tools {
 
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-
 namespace {
 
 Status RunKsck(const RunnerContext& context) {
-  const string& master_addresses_str = FindOrDie(context.required_args,
-                                                 kMasterAddressesArg);
-  vector<string> master_addresses = strings::Split(master_addresses_str, ",");
+  vector<string> master_addresses = Split(
+      FindOrDie(context.required_args, kMasterAddressesArg), ",");
   shared_ptr<KsckCluster> cluster;
   RETURN_NOT_OK_PREPEND(RemoteKsckCluster::Build(master_addresses, &cluster),
                         "unable to build KsckCluster");
   shared_ptr<Ksck> ksck(new Ksck(cluster));
 
-  ksck->set_table_filters(strings::Split(
-      FLAGS_tables, ",", strings::SkipEmpty()));
-  ksck->set_tablet_id_filters(strings::Split(
-      FLAGS_tablets, ",", strings::SkipEmpty()));
+  ksck->set_table_filters(Split(FLAGS_tables, ",", strings::SkipEmpty()));
+  ksck->set_tablet_id_filters(Split(FLAGS_tablets, ",", strings::SkipEmpty()));
 
   return ksck->RunAndPrintResults();
 }
 
+// Rebalance the cluster. The process is run step-by-step, where at each step
+// a new batch of move operations is output by the algorithm. As many as
+// possible replica movements from one batch are performed concurrently, while
+// running not more than the specified number of concurrent replica movements
+// at each tablet server. In other words, at every moment a single tablet server
+// can be the source and the destination of no more than the specified number of
+// move operations.
+Status RunRebalance(const RunnerContext& context) {
+  Rebalancer rebalancer(Rebalancer::Config(
+      std::move(Split(FindOrDie(context.required_args, kMasterAddressesArg), ",")),
+      std::move(Split(FLAGS_tables, ",", strings::SkipEmpty())),
+      FLAGS_max_moves_per_server,
+      FLAGS_max_staleness_interval_sec,
+      FLAGS_max_run_time_sec,
+      FLAGS_move_single_replicas,
+      FLAGS_output_replica_distribution_details));
+
+  // Print info on pre-rebalance distribution of replicas.
+  RETURN_NOT_OK(rebalancer.PrintStats(cout));
+
+  Rebalancer::RunStatus result_status;
+  size_t moves_count;
+  RETURN_NOT_OK(rebalancer.Run(&result_status, &moves_count));
+
+  const string msg_template = "rebalancing is complete: $0 (moved $1 replicas)";
+  string msg_result_status;
+  switch (result_status) {
+    case Rebalancer::RunStatus::CLUSTER_IS_BALANCED:
+      msg_result_status = "cluster is balanced";
+      break;
+    case Rebalancer::RunStatus::TIMED_OUT:
+      msg_result_status = "time is up";
+      break;
+    default:
+      msg_result_status = "unexpected rebalancer status";
+      DCHECK(false) << msg_result_status;
+      break;
+  }
+  cout << endl << Substitute(msg_template, msg_result_status, moves_count) << endl;
+
+  if (moves_count != 0) {
+    // Print info on post-rebalance distribution of replicas, if any moves
+    // were performed at all.
+    RETURN_NOT_OK(rebalancer.PrintStats(cout));
+  }
+
+  return Status::OK();
+}
+
 } // anonymous namespace
 
 unique_ptr<Mode> BuildClusterMode() {
-  string desc = "Check the health of a Kudu cluster";
-  string extra_desc = "By default, ksck checks that master and tablet server "
-      "processes are running, and that table metadata is consistent. Use the "
-      "'checksum' flag to check that tablet data is consistent (also see the "
-      "'tables' and 'tablets' flags). Use the 'checksum_snapshot' along with "
-      "'checksum' if the table or tablets are actively receiving inserts or "
-      "updates. Use the 'verbose' flag to output detailed information on "
-      "cluster status even if no inconsistency is found in metadata.";
-  unique_ptr<Action> ksck =
-      ActionBuilder("ksck", &RunKsck)
-      .Description(desc)
-      .ExtraDescription(extra_desc)
-      .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
-      .AddOptionalParameter("checksum_cache_blocks")
-      .AddOptionalParameter("checksum_scan")
-      .AddOptionalParameter("checksum_scan_concurrency")
-      .AddOptionalParameter("checksum_snapshot")
-      .AddOptionalParameter("checksum_timeout_sec")
-      .AddOptionalParameter("color")
-      .AddOptionalParameter("consensus")
-      .AddOptionalParameter("ksck_format")
-      .AddOptionalParameter("tables")
-      .AddOptionalParameter("tablets")
-      .Build();
-
-  return ModeBuilder("cluster")
-      .Description("Operate on a Kudu cluster")
-      .AddAction(std::move(ksck))
-      .Build();
+  ModeBuilder builder("cluster");
+  builder.Description("Operate on a Kudu cluster");
+
+  {
+    constexpr auto desc = "Check the health of a Kudu cluster";
+    constexpr auto extra_desc = "By default, ksck checks that master and "
+        "tablet server processes are running, and that table metadata is "
+        "consistent. Use the 'checksum' flag to check that tablet data is "
+        "consistent (also see the 'tables' and 'tablets' flags). Use the "
+        "'checksum_snapshot' along with 'checksum' if the table or tablets "
+        "are actively receiving inserts or updates. Use the 'verbose' flag to "
+        "output detailed information on cluster status even if no "
+        "inconsistency is found in metadata.";
+
+    unique_ptr<Action> ksck = ActionBuilder("ksck", &RunKsck)
+        .Description(desc)
+        .ExtraDescription(extra_desc)
+        .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+        .AddOptionalParameter("checksum_cache_blocks")
+        .AddOptionalParameter("checksum_scan")
+        .AddOptionalParameter("checksum_scan_concurrency")
+        .AddOptionalParameter("checksum_snapshot")
+        .AddOptionalParameter("checksum_timeout_sec")
+        .AddOptionalParameter("color")
+        .AddOptionalParameter("consensus")
+        .AddOptionalParameter("ksck_format")
+        .AddOptionalParameter("tables")
+        .AddOptionalParameter("tablets")
+        .Build();
+    builder.AddAction(std::move(ksck));
+  }
+
+  {
+    constexpr auto desc = "Move tablet replicas between tablet servers to "
+        "balance replica counts for each table and for the cluster as a whole.";
+    constexpr auto extra_desc = "The rebalancing tool moves tablet replicas "
+        "between tablet servers, in the same manner as the "
+        "'kudu tablet change_config move_replica' command, attempting to "
+        "balance the count of replicas per table on each tablet server, "
+        "and after that attempting to balance the total number of replicas "
+        "per tablet server.";
+    unique_ptr<Action> rebalance = ActionBuilder("rebalance", &RunRebalance)
+        .Description(desc)
+        .ExtraDescription(extra_desc)
+        .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+        .AddOptionalParameter("max_moves_per_server")
+        .AddOptionalParameter("max_run_time_sec")
+        .AddOptionalParameter("max_staleness_interval_sec")
+        .AddOptionalParameter("move_single_replicas")
+        .AddOptionalParameter("output_replica_distribution_details")
+        .AddOptionalParameter("tables")
+        .Build();
+    builder.AddAction(std::move(rebalance));
+  }
+
+  return builder.Build();
 }
 
 } // namespace tools

http://git-wip-us.apache.org/repos/asf/kudu/blob/220cd66f/src/kudu/tools/tool_action_common.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index d5604d9..09d0cf1 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -121,6 +121,8 @@ namespace tools {
 
 using client::KuduClient;
 using client::KuduClientBuilder;
+using client::KuduTablet;
+using client::KuduTabletServer;
 using consensus::ConsensusServiceProxy;
 using consensus::ReplicateMsg;
 using log::LogEntryPB;


Mime
View raw message