kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [kudu] branch master updated: KUDU-2780: create thread for auto-rebalancing
Date Fri, 20 Mar 2020 17:55:38 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 664d7fe  KUDU-2780: create thread for auto-rebalancing
664d7fe is described below

commit 664d7fec8e603d930e7ca2c8302130103f4fbcac
Author: hannahvnguyen <hannah.nguyen@cloudera.com>
AuthorDate: Mon Feb 3 09:07:59 2020 -0800

    KUDU-2780: create thread for auto-rebalancing
    
    The auto-rebalancer thread is a background task of
    the master's catalog manager. Each loop iteration, the
    thread collects information on tablet servers, tables,
    and tablets, in order to determine the best rebalancing
    replica moves for the current state of the cluster.
    The maximum number of moves per tserver, per iteration,
    is controlled by a flag. Currently, each batch of moves
    scheduled per iteration is executed synchronously. The
    thread waits for the moves to complete, then sleeps for a
    time interval, which is controlled by a flag, before resuming.
    
    If the cluster has placement policy violations, the thread
    will prioritize scheduling and executing replica moves to
    reinstate the policy. Otherwise, the thread will perform
    inter-location (cross-location), then intra-location (by
    table, then by tserver) rebalancing.
    
    By default, auto-rebalancing is disabled. This can be
    changed by a flag in the catalog manager. The rebalancer
    tool should be run first, to fully rebalance the cluster,
    before enabling auto-rebalancing.
    
    Change-Id: Ifca25d1063c07047cf2123e6792b3c7395be20e4
    Reviewed-on: http://gerrit.cloudera.org:8080/14177
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
    Reviewed-by: Andrew Wong <awong@cloudera.com>
---
 src/kudu/master/CMakeLists.txt          |   5 +
 src/kudu/master/auto_rebalancer-test.cc | 549 ++++++++++++++++++++++
 src/kudu/master/auto_rebalancer.cc      | 793 ++++++++++++++++++++++++++++++++
 src/kudu/master/auto_rebalancer.h       | 195 ++++++++
 src/kudu/master/catalog_manager.cc      |  41 +-
 src/kudu/master/catalog_manager.h       |  14 +-
 src/kudu/master/master_runner.cc        |   3 +
 src/kudu/master/ts_descriptor.h         |   5 +
 src/kudu/master/ts_manager.cc           |  12 +
 src/kudu/master/ts_manager.h            |   3 +
 src/kudu/rebalance/rebalancer.cc        |  67 +++
 src/kudu/rebalance/rebalancer.h         | 100 ++--
 src/kudu/tools/rebalancer_tool.cc       |  70 +--
 src/kudu/tools/tool_replica_util.cc     |   1 -
 src/kudu/tools/tool_replica_util.h      |   4 +-
 15 files changed, 1755 insertions(+), 107 deletions(-)

diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 033429c..0dc4540 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -34,6 +34,7 @@ ADD_EXPORTABLE_LIBRARY(master_proto
 
 set(MASTER_SRCS
   authz_provider.cc
+  auto_rebalancer.cc
   catalog_manager.cc
   hms_notification_log_listener.cc
   location_cache.cc
@@ -69,6 +70,7 @@ target_link_libraries(master
   kudu_thrift
   kudu_util
   master_proto
+  rebalance
   rpc_header_proto
   security
   server_process
@@ -80,12 +82,15 @@ target_link_libraries(master
 SET_KUDU_TEST_LINK_LIBS(
   kudu_client
   kudu_curl_util
+  itest_util
   master
   master_proto
+  mini_cluster
   mini_hms
   mini_kdc
   mini_sentry)
 
+ADD_KUDU_TEST(auto_rebalancer-test)
 ADD_KUDU_TEST(catalog_manager-test)
 ADD_KUDU_TEST(hms_notification_log_listener-test)
 ADD_KUDU_TEST(location_cache-test DATA_FILES ../scripts/first_argument.sh)
diff --git a/src/kudu/master/auto_rebalancer-test.cc b/src/kudu/master/auto_rebalancer-test.cc
new file mode 100644
index 0000000..722ba55
--- /dev/null
+++ b/src/kudu/master/auto_rebalancer-test.cc
@@ -0,0 +1,549 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/master/auto_rebalancer.h"
+
+#include <atomic>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/master/ts_descriptor.h"
+#include "kudu/master/ts_manager.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/util/logging_test_util.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
+using std::set;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+DECLARE_bool(auto_rebalancing_enabled);
+DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(follower_unavailable_considered_failed_sec);
+DECLARE_int32(raft_heartbeat_interval_ms);
+DECLARE_int32(tablet_copy_download_file_inject_latency_ms);
+DECLARE_int32(tserver_unresponsive_timeout_ms);
+DECLARE_uint32(auto_rebalancing_interval_seconds);
+DECLARE_uint32(auto_rebalancing_max_moves_per_server);
+DECLARE_uint32(auto_rebalancing_wait_for_replica_moves_seconds);
+
+METRIC_DECLARE_gauge_int32(tablet_copy_open_client_sessions);
+
+namespace {
+
+// Some of these tests will set a low Raft timeout to move election traffic
+// along more quickly. When built with TSAN, this can lead to timeouts, so ease
+// up a bit.
+#ifdef THREAD_SANITIZER
+  constexpr int kLowRaftTimeout = 300;
+#else
+  constexpr int kLowRaftTimeout = 100;
+#endif
+
+} // anonymous namespace
+
+
+namespace kudu {
+namespace master {
+
+class AutoRebalancerTest : public KuduTest {
+  public:
+
+  Status CreateAndStartCluster() {
+    FLAGS_auto_rebalancing_interval_seconds = 1; // Shorten for testing.
+    FLAGS_auto_rebalancing_wait_for_replica_moves_seconds = 0; // Shorten for testing.
+    FLAGS_auto_rebalancing_enabled = true; // Enable for testing.
+    cluster_.reset(new InternalMiniCluster(env_, cluster_opts_));
+    return cluster_->Start();
+  }
+
+  // This function will assign tservers to available locations as
+  // evenly as possible.
+  void AssignLocationsEvenly(int num_locations) {
+    const int num_tservers = cluster_->num_tablet_servers();
+    ASSERT_GE(num_tservers, num_locations);
+
+    int master_idx;
+    ASSERT_OK(cluster_->GetLeaderMasterIndex(&master_idx));
+
+    TSDescriptorVector descs;
+
+    ASSERT_EVENTUALLY([&] {
+      cluster_->mini_master(master_idx)->master()->ts_manager()->
+        GetAllDescriptors(&descs);
+      ASSERT_EQ(num_tservers, descs.size());
+    });
+
+    // Assign num_locations unique locations to the first num_locations tservers.
+    for (int i = 0; i < num_tservers; ++i) {
+      descs[i]->AssignLocationForTesting(Substitute("L$0", i % num_locations));
+    }
+  }
+
+  // This function expects there to be more tservers than available
+  // locations. As many tservers as possible will be assigned unique
+  // locations, then any additional tservers will all be assigned
+  // to the first location.
+  void AssignLocationsWithSkew(int num_locations) {
+    const int num_tservers = cluster_->num_tablet_servers();
+    ASSERT_GT(num_tservers, num_locations);
+
+    int master_idx;
+    ASSERT_OK(cluster_->GetLeaderMasterIndex(&master_idx));
+
+    TSDescriptorVector descs;
+
+    ASSERT_EVENTUALLY([&] {
+      cluster_->mini_master(master_idx)->master()->ts_manager()->
+        GetAllDescriptors(&descs);
+      ASSERT_EQ(num_tservers, descs.size());
+    });
+
+    // Assign num_locations unique locations to the first num_locations tservers.
+    for (int i = 0; i < num_locations; ++i) {
+      descs[i]->AssignLocationForTesting(Substitute("L$0", i));
+    }
+
+    for (int i = num_locations; i < num_tservers; ++i) {
+      descs[i]->AssignLocationForTesting(Substitute("L$0", 0));
+    }
+  }
+
+  // Make sure the leader master has begun the auto-rebalancing thread.
+  void CheckAutoRebalancerStarted() {
+    ASSERT_EVENTUALLY([&] {
+      int leader_idx;
+      ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+      ASSERT_LT(0, NumLoopIterations(leader_idx));
+    });
+  }
+
+  // Make sure the auto-rebalancing loop has iterated a few times,
+  // and no moves were scheduled.
+  void CheckNoMovesScheduled() {
+    int leader_idx;
+    ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    auto initial_loop_iterations = NumLoopIterations(leader_idx);
+
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_LT(initial_loop_iterations + 3, NumLoopIterations(leader_idx));
+      ASSERT_EQ(0, NumMovesScheduled(leader_idx));
+    });
+  }
+
+  void CheckSomeMovesScheduled() {
+    int leader_idx;
+    ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    auto initial_loop_iterations = NumLoopIterations(leader_idx);
+
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_LT(initial_loop_iterations, NumLoopIterations(leader_idx));
+      ASSERT_LT(0, NumMovesScheduled(leader_idx));
+    });
+  }
+
+  int NumLoopIterations(int master_idx) {
+    DCHECK(cluster_ != nullptr);
+    return cluster_->mini_master(master_idx)->master()->catalog_manager()->
+        auto_rebalancer()->number_of_loop_iterations_for_test_;
+  }
+
+  int NumMovesScheduled(int master_idx) {
+    DCHECK(cluster_ != nullptr);
+    return cluster_->mini_master(master_idx)->master()->catalog_manager()->
+        auto_rebalancer()->moves_scheduled_this_round_for_test_;
+  }
+
+  void SetupWorkLoad(int num_tablets, int num_replicas) {
+    workload_.reset(new TestWorkload(cluster_.get()));
+    workload_->set_num_tablets(num_tablets);
+    workload_->set_num_replicas(num_replicas);
+    workload_->Setup();
+  }
+
+  void TearDown() override {
+    if (cluster_) {
+      cluster_->Shutdown();
+    }
+    KuduTest::TearDown();
+  }
+
+  protected:
+    unique_ptr<InternalMiniCluster> cluster_;
+    InternalMiniClusterOptions cluster_opts_;
+    unique_ptr<TestWorkload> workload_;
+  };
+
+// Make sure that only the leader master is doing auto-rebalancing.
+TEST_F(AutoRebalancerTest, OnlyLeaderDoesAutoRebalancing) {
+  const int kNumMasters = 3;
+  const int kNumTservers = 3;
+  const int kNumTablets = 4;
+  cluster_opts_.num_masters = kNumMasters;
+  cluster_opts_.num_tablet_servers = kNumTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+
+  SetupWorkLoad(kNumTablets, /*num_replicas*/1);
+
+  int leader_idx;
+  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+  for (int i = 0; i < kNumMasters; i++) {
+    if (i == leader_idx) {
+      ASSERT_EVENTUALLY([&] {
+        ASSERT_LT(0, NumLoopIterations(i));
+      });
+    } else {
+      ASSERT_EVENTUALLY([&] {
+        ASSERT_EQ(0, NumMovesScheduled(i));
+      });
+    }
+  }
+}
+
+// If the leader master goes down, the next elected master should perform
+// auto-rebalancing.
+TEST_F(AutoRebalancerTest, NextLeaderResumesAutoRebalancing) {
+  const int kNumMasters = 3;
+  const int kNumTservers = 3;
+  const int kNumTablets = 3;
+  cluster_opts_.num_masters = kNumMasters;
+  cluster_opts_.num_tablet_servers = kNumTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+
+  SetupWorkLoad(kNumTablets, /*num_replicas*/1);
+
+  // Verify that non-leaders are not performing rebalancing,
+  // then take down the leader master.
+  int leader_idx;
+  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+  for (int i = 0; i < kNumMasters; i++) {
+    if (i != leader_idx) {
+      ASSERT_EQ(0, NumLoopIterations(i));
+    }
+  }
+  cluster_->mini_master(leader_idx)->Shutdown();
+
+  // Let another master become leader and resume auto-rebalancing.
+  // Number of auto-rebalancing iterations should increase again.
+  int new_leader_idx;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(cluster_->GetLeaderMasterIndex(&new_leader_idx));
+    ASSERT_NE(leader_idx, new_leader_idx);
+    auto iterations = NumLoopIterations(new_leader_idx);
+    ASSERT_LT(0, iterations);
+  });
+}
+
+// Create a cluster that is initially balanced.
+// Bring up another tserver, and verify that moves are scheduled,
+// since the cluster is no longer balanced.
+TEST_F(AutoRebalancerTest, MovesScheduledIfAddTserver) {
+  const int kNumTservers = 3;
+  const int kNumTablets = 2;
+  cluster_opts_.num_tablet_servers = kNumTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+
+  SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+
+  NO_FATALS(CheckNoMovesScheduled());
+
+  ASSERT_OK(cluster_->AddTabletServer());
+  NO_FATALS(CheckSomeMovesScheduled());
+}
+
+// A cluster with no tservers is balanced.
+TEST_F(AutoRebalancerTest, NoReplicaMovesIfNoTservers) {
+  const int kNumTservers = 0;
+  cluster_opts_.num_tablet_servers = kNumTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+  NO_FATALS(CheckNoMovesScheduled());
+}
+
+// A cluster with no tablets is balanced.
+TEST_F(AutoRebalancerTest, NoReplicaMovesIfNoTablets) {
+  const int kNumTservers = 3;
+  cluster_opts_.num_tablet_servers = kNumTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+  NO_FATALS(CheckNoMovesScheduled());
+}
+
+// Assign each tserver to its own location.
+// A cluster with location load skew = 1 is balanced.
+// In this test, 3 tservers should have load = 1, 1 tserver should have load = 0.
+TEST_F(AutoRebalancerTest, NoReplicaMovesIfLocationLoadSkewedByOne) {
+  const int kNumTservers = 4;
+  const int kNumTablets = 1;
+  const int kNumLocations = kNumTservers;
+  cluster_opts_.num_tablet_servers = kNumTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+
+  NO_FATALS(AssignLocationsEvenly(kNumLocations));
+
+  const auto timeout = MonoDelta::FromSeconds(30);
+  vector<master::ListTabletServersResponsePB_Entry> tservers;
+  ASSERT_OK(itest::ListTabletServers(cluster_->master_proxy(),
+                                     timeout,
+                                     &tservers));
+  set<string> locations;
+  for (const auto& tserver : tservers) {
+    const auto& ts_location = tserver.location();
+    locations.insert(ts_location);
+    ASSERT_FALSE(ts_location.empty());
+  }
+  ASSERT_EQ(kNumTservers, locations.size());
+
+  SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+
+  NO_FATALS(CheckNoMovesScheduled());
+}
+
+// Assign tservers to one of two locations.
+// If placement policy can never be satisfied, the auto-rebalancer should
+// not attempt to schedule any replica movements.
+// In this test, the tablet should have the majority (2/3) of its replicas
+// in the same location. This violation cannot be fixed.
+TEST_F(AutoRebalancerTest, NoReplicaMovesIfCannotFixPlacementPolicy) {
+  const int kNumTservers = 3;
+  const int kNumTablets = 1;
+  const int kNumLocations = 2;
+  cluster_opts_.num_tablet_servers = kNumTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+
+  NO_FATALS(AssignLocationsWithSkew(kNumLocations));
+
+  const auto timeout = MonoDelta::FromSeconds(30);
+  vector<master::ListTabletServersResponsePB_Entry> tservers;
+  ASSERT_OK(itest::ListTabletServers(cluster_->master_proxy(),
+                                     timeout,
+                                     &tservers));
+  set<string> locations;
+  for (const auto& tserver : tservers) {
+    const auto& ts_location = tserver.location();
+    locations.insert(ts_location);
+    ASSERT_FALSE(ts_location.empty());
+  }
+  ASSERT_EQ(kNumLocations, locations.size());
+
+  SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+
+  NO_FATALS(CheckNoMovesScheduled());
+}
+
+// Verify that each server's number of tablet copying sessions
+// doesn't exceed the value in the flag auto_rebalancing_max_moves_per_server.
+TEST_F(AutoRebalancerTest, TestMaxMovesPerServer) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Make tablet replica copying slow, to make it easier to spot violations.
+  FLAGS_tablet_copy_download_file_inject_latency_ms = 2000;
+
+  const int kNumOrigTservers = 3;
+  const int kNumAdditionalTservers = 3;
+  const int kNumTablets = 12;
+
+  cluster_opts_.num_tablet_servers = kNumOrigTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+
+  SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+  workload_->Start();
+  ASSERT_EVENTUALLY([&]() {
+      ASSERT_LE(1000, workload_->rows_inserted());
+  });
+  workload_->StopAndJoin();
+
+  // Bring up additional tservers, to trigger tablet replica copying.
+  for (int i = 0; i < kNumAdditionalTservers; ++i) {
+    ASSERT_OK(cluster_->AddTabletServer());
+  }
+  NO_FATALS(CheckSomeMovesScheduled());
+
+  // Check metric 'tablet_copy_open_client_sessions', which must be
+  // less than the auto_rebalancing_max_moves_per_server, for each tserver.
+  for (int i = 0; i < kNumOrigTservers + kNumAdditionalTservers; ++i) {
+    auto metric = cluster_->mini_tablet_server(i)->server()->metric_entity();
+    int open_client_sessions = METRIC_tablet_copy_open_client_sessions.
+        Instantiate(metric, 0)->value();
+
+    // Check that the total number of sessions does not exceed the flag.
+    ASSERT_GE(FLAGS_auto_rebalancing_max_moves_per_server, open_client_sessions);
+  }
+}
+
+// Attempt rebalancing a cluster with unstable Raft configurations.
+TEST_F(AutoRebalancerTest, AutoRebalancingUnstableCluster) {
+  // Set a low Raft heartbeat.
+  FLAGS_raft_heartbeat_interval_ms = kLowRaftTimeout;
+
+  const int kNumTservers = 3;
+  const int kNumTablets = 3;
+
+  cluster_opts_.num_tablet_servers = kNumTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+
+  SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+
+  // Inject latency to make tablets' Raft configurations unstable due to
+  // frequent leader re-elections.
+  FLAGS_consensus_inject_latency_ms_in_notifications = kLowRaftTimeout;
+
+  // Bring up an additional tserver, to trigger rebalancing.
+  // Moves should be scheduled, even if they cannot be completed because of
+  // the frequent leadership changes.
+  ASSERT_OK(cluster_->AddTabletServer());
+  NO_FATALS(CheckSomeMovesScheduled());
+}
+
+// A cluster that cannot become healthy and meet the replication factor
+// will not attempt rebalancing.
+TEST_F(AutoRebalancerTest, NoReplicaMovesIfCannotMeetReplicationFactor) {
+  const int kNumTservers = 3;
+  const int kNumTablets = 1;
+
+  cluster_opts_.num_tablet_servers = kNumTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+
+  SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+
+  // Take down a tserver, to make it impossible to meet RF=3 with 2 tservers.
+  NO_FATALS(cluster_->mini_tablet_server(0)->Shutdown());
+  NO_FATALS(CheckNoMovesScheduled());
+}
+
+// Verify that movement of replicas to meet the replication factor
+// does not count towards rebalancing, i.e. the auto-rebalancer will
+// not consider recovering replicas as candidates for replica movement.
+TEST_F(AutoRebalancerTest, NoRebalancingIfReplicasRecovering) {
+  // Set a low timeout for an unresponsive tserver to be presumed dead by the master.
+  FLAGS_tserver_unresponsive_timeout_ms = 1000;
+
+  // Shorten the interval for recovery re-replication to begin.
+  FLAGS_follower_unavailable_considered_failed_sec = 1;
+
+  const int kNumTservers = 3;
+  const int kNumTablets = 4;
+
+  cluster_opts_.num_tablet_servers = kNumTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+
+  SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+
+  // Add another tserver; immediately take down one of the original tservers.
+  ASSERT_OK(cluster_->AddTabletServer());
+  NO_FATALS(cluster_->mini_tablet_server(0)->Shutdown());
+  // Wait for recovery re-replication to start.
+  SleepFor(MonoDelta::FromSeconds(
+      FLAGS_follower_unavailable_considered_failed_sec));
+
+  // No rebalancing should occur while there are under-replicated replicas.
+  NO_FATALS(CheckNoMovesScheduled());
+}
+
+// Make sure the auto-rebalancer reports the failure of scheduled replica movements,
+// in the case that tablet server failure is not yet accounted for by the TSManager.
+TEST_F(AutoRebalancerTest, TestHandlingFailedTservers) {
+  // Set a high timeout for an unresponsive tserver to be presumed dead,
+  // so the TSManager believes it is still available.
+  FLAGS_tserver_unresponsive_timeout_ms = 120 * 1000;
+
+  const int kNumTservers = 3;
+  const int kNumTablets = 4;
+
+  cluster_opts_.num_tablet_servers = kNumTservers;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+
+  SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+
+  // Take down all the original tservers.
+  for (int i = 0; i < kNumTservers; ++i) {
+    NO_FATALS(cluster_->mini_tablet_server(i)->Shutdown());
+  }
+
+  // Capture the glog output to ensure failed replica movements occur
+  // and the warnings are logged.
+  StringVectorSink pre_capture_logs;
+  {
+    ScopedRegisterSink reg(&pre_capture_logs);
+    // Bring up a new tserver.
+    ASSERT_OK(cluster_->AddTabletServer());
+
+    // The TSManager should still believe the original tservers are available,
+    // so the auto-rebalancer should attempt to schedule replica moves from those
+    // tservers to the new one.
+    NO_FATALS(CheckSomeMovesScheduled());
+  }
+  ASSERT_STRINGS_ANY_MATCH(pre_capture_logs.logged_msgs(),
+                           "scheduled replica move failed to complete: Network error");
+
+  // Wait for the TSManager to realize that the original tservers are unavailable.
+  FLAGS_tserver_unresponsive_timeout_ms = 5 * 1000;
+  SleepFor(MonoDelta::FromMilliseconds(FLAGS_tserver_unresponsive_timeout_ms));
+
+  // Bring back the tservers.
+  for (int i = 0; i < kNumTservers; ++i) {
+    NO_FATALS(cluster_->mini_tablet_server(i)->Restart());
+  }
+
+  // Make sure the auto-rebalancer thread schedules replica moves because of the
+  // re-available tservers.
+  StringVectorSink post_capture_logs;
+  {
+    ScopedRegisterSink reg(&post_capture_logs);
+    NO_FATALS(CheckSomeMovesScheduled());
+  }
+
+  // These replica moves should not fail because the tservers are unavailable.
+  for (const auto& str : post_capture_logs.logged_msgs()) {
+    ASSERT_STR_NOT_CONTAINS(str, "scheduled replica move failed to complete: Network error");
+  }
+}
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/auto_rebalancer.cc b/src/kudu/master/auto_rebalancer.cc
new file mode 100644
index 0000000..3faca43
--- /dev/null
+++ b/src/kudu/master/auto_rebalancer.cc
@@ -0,0 +1,793 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/auto_rebalancer.h"
+
+#include <atomic>
+#include <memory>
+#include <ostream>
+#include <random>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/ts_descriptor.h"
+#include "kudu/master/ts_manager.h"
+#include "kudu/rebalance/cluster_status.h"
+#include "kudu/rebalance/placement_policy_util.h"
+#include "kudu/rebalance/rebalance_algo.h"
+#include "kudu/rebalance/rebalancer.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/cow_object.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+using kudu::cluster_summary::HealthCheckResult;
+using kudu::cluster_summary::ReplicaSummary;
+using kudu::cluster_summary::ServerHealth;
+using kudu::cluster_summary::ServerHealthSummary;
+using kudu::cluster_summary::TableSummary;
+using kudu::cluster_summary::TabletSummary;
+using kudu::consensus::ADD_PEER;
+using kudu::consensus::BulkChangeConfigRequestPB;
+using kudu::consensus::ChangeConfigResponsePB;
+using kudu::consensus::ConsensusServiceProxy;
+using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::GetConsensusStateRequestPB;
+using kudu::consensus::GetConsensusStateResponsePB;
+using kudu::consensus::LeaderStepDownMode;
+using kudu::consensus::LeaderStepDownRequestPB;
+using kudu::consensus::LeaderStepDownResponsePB;
+using kudu::consensus::MODIFY_PEER;
+using kudu::consensus::RaftPeerPB;
+using kudu::master::TSManager;
+using kudu::rebalance::BuildTabletExtraInfoMap;
+using kudu::rebalance::ClusterInfo;
+using kudu::rebalance::ClusterLocalityInfo;
+using kudu::rebalance::ClusterRawInfo;
+using kudu::rebalance::PlacementPolicyViolationInfo;
+using kudu::rebalance::Rebalancer;
+using kudu::rebalance::SelectReplicaToMove;
+using kudu::rebalance::TableReplicaMove;
+using kudu::rebalance::TabletExtraInfo;
+using kudu::rebalance::TabletsPlacementInfo;
+using kudu::rpc::MessengerBuilder;
+using kudu::rpc::RpcController;
+using strings::Substitute;
+
+using std::shared_ptr;
+using std::string;
+using std::unordered_map;
+using std::unordered_set;
+using std::vector;
+
+DEFINE_double(auto_rebalancing_load_imbalance_threshold,
+              kudu::rebalance::Rebalancer::Config::kLoadImbalanceThreshold,
+              "The threshold for the per-table location load imbalance. "
+              "The threshold is used during the cross-location rebalancing "
+              "phase. If the measured cross-location load imbalance for a "
+              "table is greater than the specified threshold, the rebalancer "
+              "tries to move table's replicas to reduce the imbalance. "
+              "The recommended range for the threshold is [0.5, ...) with the "
+              "default value of 1.0. The threshold represents a policy "
+              "wrt what to prefer: either ideal balance of the cross-location "
+              "load on per-table basis (lower threshold value) or minimum "
+              "number of replica movements between locations "
+              "(greater threshold value). The default value is empirically "
+              "proven to be a good choice between 'ideal' and 'good enough' "
+              "replica distributions.");
+
+DEFINE_uint32(auto_rebalancing_interval_seconds, 30,
+              "How long to sleep in between rebalancing cycles, before checking "
+              "the cluster again to see if there is skew and rebalancing to be done.");
+
+DEFINE_uint32(auto_rebalancing_max_moves_per_server, 1,
+              "Maximum number of replica moves to perform concurrently on one "
+              "tablet server: 'move from' and 'move to' are counted "
+              "as separate move operations.");
+
+DEFINE_uint32(auto_rebalancing_rpc_timeout_seconds, 60,
+              "RPC timeout in seconds when making RPCs to request moving tablet replicas "
+              "or to check if the replica movement has completed.");
+
+DEFINE_uint32(auto_rebalancing_wait_for_replica_moves_seconds, 1,
+              "How long to wait before checking to see if the scheduled replica movement "
+              "in this iteration of auto-rebalancing has completed.");
+
+namespace kudu {
+
+namespace master {
+
+AutoRebalancerTask::AutoRebalancerTask(CatalogManager* catalog_manager,
+                                       TSManager* ts_manager)
+    : catalog_manager_(catalog_manager),
+      ts_manager_(ts_manager),
+      shutdown_(1),
+      rebalancer_(Rebalancer(Rebalancer::Config(
+      /*ignored_tservers*/{},
+      /*master_addresses*/{},
+      /*table_filters*/{},
+      FLAGS_auto_rebalancing_max_moves_per_server,
+      /*max_staleness_interval_sec*/300,
+      /*max_run_time_sec*/0,
+      /*move_replicas_from_ignored_tservers*/false,
+      /*move_rf1_replicas*/false,
+      /*output_replica_distribution_details*/false,
+      /*run_policy_fixer*/true,
+      /*run_cross_location_rebalancing*/true,
+      /*run_intra_location_rebalancing*/true,
+      FLAGS_auto_rebalancing_load_imbalance_threshold))),
+      random_generator_(random_device_()),
+      number_of_loop_iterations_for_test_(0),
+      moves_scheduled_this_round_for_test_(0) {
+}
+
+AutoRebalancerTask::~AutoRebalancerTask() {
+  if (thread_) {
+    Shutdown();
+  }
+}
+
+Status AutoRebalancerTask::Init() {
+  DCHECK(!thread_) << "AutoRebalancerTask is already initialized";
+  RETURN_NOT_OK(MessengerBuilder("auto-rebalancer").Build(&messenger_));
+  return kudu::Thread::Create("catalog manager", "auto-rebalancer",
+                              [this]() { this->RunLoop(); }, &thread_);
+}
+
+void AutoRebalancerTask::Shutdown() {
+  CHECK(thread_) << "AutoRebalancerTask is not initialized";
+  if (!shutdown_.CountDown()) {
+    return;
+  }
+  CHECK_OK(ThreadJoiner(thread_.get()).Join());
+  thread_.reset();
+}
+
+// Body of the auto-rebalancing background thread. Each iteration (paced by
+// --auto_rebalancing_interval_seconds) collects a snapshot of the cluster,
+// computes a batch of replica moves, schedules them, and synchronously waits
+// for all of them to complete before sleeping again. Returns once 'shutdown_'
+// has been counted down.
+void AutoRebalancerTask::RunLoop() {
+
+  vector<Rebalancer::ReplicaMove> replica_moves;
+
+  while (!shutdown_.WaitFor(
+      MonoDelta::FromSeconds(FLAGS_auto_rebalancing_interval_seconds))) {
+
+    // If catalog manager isn't initialized or isn't the leader, don't do rebalancing.
+    // Putting the auto-rebalancer to sleep shouldn't affect the master's ability
+    // to become the leader. When the thread wakes up and discovers it is now
+    // the leader, then it can begin auto-rebalancing.
+    {
+      CatalogManager::ScopedLeaderSharedLock l(catalog_manager_);
+      if (!l.first_failed_status().ok()) {
+        moves_scheduled_this_round_for_test_ = 0;
+        continue;
+      }
+    }
+
+    number_of_loop_iterations_for_test_++;
+
+    // Structs to hold information about the cluster's status.
+    ClusterRawInfo raw_info;
+    ClusterInfo cluster_info;
+    TabletsPlacementInfo placement_info;
+
+    // Snapshot collection can fail transiently (e.g. not all tservers are
+    // available); just log and retry on the next wakeup.
+    Status s = BuildClusterRawInfo(/*location*/boost::none, &raw_info);
+    if (!s.ok()) {
+      LOG(WARNING) << Substitute("Could not retrieve cluster info: $0", s.ToString());
+      continue;
+    }
+
+    // There should be no in-flight moves in progress, because this loop waits
+    // for scheduled moves to complete before continuing to the next iteration.
+    s = rebalancer_.BuildClusterInfo(raw_info, Rebalancer::MovesInProgress(), &cluster_info);
+    if (!s.ok()) {
+      LOG(WARNING) << Substitute("Could not build cluster info: $0", s.ToString());
+      continue;
+    }
+
+    // Placement info is only consumed by the policy-fixer path, so only build
+    // it when that feature is enabled.
+    if (config_.run_policy_fixer) {
+      s = BuildTabletsPlacementInfo(raw_info, Rebalancer::MovesInProgress(), &placement_info);
+      if (!s.ok()) {
+        LOG(WARNING) << Substitute("Could not build tablet placement info: $0", s.ToString());
+        continue;
+      }
+    }
+
+    // With the current synchronous implementation, verify that any moves
+    // scheduled in the previous iteration completed.
+    // The container 'replica_moves' should be empty.
+    DCHECK_EQ(0, replica_moves.size());
+
+    // For simplicity, if there are policy violations, only move replicas
+    // to satisfy placement policy in this loop iteration.
+    s = GetMoves(raw_info, cluster_info.locality, placement_info, &replica_moves);
+    if (!s.ok()) {
+      LOG(WARNING) << Substitute("could not retrieve auto-rebalancing replica moves: $0",
+                                 s.ToString());
+      continue;
+    }
+
+    // Scheduling failures are logged only; the loop still waits below for
+    // whatever moves were successfully scheduled.
+    WARN_NOT_OK(ExecuteMoves(replica_moves),
+                "failed to send replica move request");
+
+    moves_scheduled_this_round_for_test_ = replica_moves.size();
+
+    // Block until every scheduled move completes (completed moves are removed
+    // from 'replica_moves' by CheckReplicaMovesCompleted()), or until
+    // shutdown is requested.
+    do {
+      if (shutdown_.WaitFor(MonoDelta::FromSeconds(
+            FLAGS_auto_rebalancing_wait_for_replica_moves_seconds))) {
+        return;
+      }
+      WARN_NOT_OK(CheckReplicaMovesCompleted(&replica_moves),
+                  "scheduled replica move failed to complete");
+    } while (!replica_moves.empty());
+  } // end while
+}
+
+// Computes the next batch of replica moves. Priority order: fix placement
+// policy violations; otherwise cross-location rebalancing, then per-location
+// intra-location rebalancing. See the header for the full contract.
+Status AutoRebalancerTask::GetMoves(
+    const ClusterRawInfo& raw_info,
+    const ClusterLocalityInfo& locality,
+    const TabletsPlacementInfo& placement_info,
+    vector<Rebalancer::ReplicaMove>* replica_moves) {
+
+  const auto& ts_id_by_location = locality.servers_by_location;
+  vector<Rebalancer::ReplicaMove> rep_moves;
+
+  // No tservers: no moves to make.
+  if (ts_id_by_location.empty()) {
+    return Status::OK();
+  }
+
+  // One location: use the greedy rebalancing algorithm to find moves; no
+  // cross-location work is needed.
+  if (ts_id_by_location.size() == 1) {
+    rebalance::TwoDimensionalGreedyAlgo algo;
+    RETURN_NOT_OK(GetMovesUsingRebalancingAlgo(raw_info, &algo, CrossLocations::NO, &rep_moves));
+    *replica_moves = std::move(rep_moves);
+    return Status::OK();
+  }
+
+  // If there are placement policy violations, only find moves to fix them
+  // in this round of rebalancing.
+  if (config_.run_policy_fixer) {
+    vector<PlacementPolicyViolationInfo> ppvi;
+    RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi));
+    // Filter out all reported violations which are already taken care of.
+    RETURN_NOT_OK(FindMovesToReimposePlacementPolicy(
+        placement_info, locality, ppvi, &rep_moves));
+    if (!rep_moves.empty()) {
+      *replica_moves = std::move(rep_moves);
+      return Status::OK();
+    }
+  }
+
+  // If no placement policy violations were found, perform load rebalancing.
+  // Perform cross-location rebalancing.
+  if (config_.run_cross_location_rebalancing) {
+    rebalance::LocationBalancingAlgo algo(FLAGS_auto_rebalancing_load_imbalance_threshold);
+    RETURN_NOT_OK(GetMovesUsingRebalancingAlgo(
+        raw_info, &algo, CrossLocations::YES, &rep_moves));
+  }
+
+  // Perform intra-location rebalancing, one location at a time.
+  if (config_.run_intra_location_rebalancing) {
+    rebalance::TwoDimensionalGreedyAlgo algo;
+    for (const auto& elem : ts_id_by_location) {
+      const auto& location = elem.first;
+      ClusterRawInfo location_raw_info;
+      // BUGFIX: the result of BuildClusterRawInfo() was previously ignored;
+      // propagate failures instead of feeding a default-constructed
+      // ClusterRawInfo to the rebalancing algorithm.
+      RETURN_NOT_OK(BuildClusterRawInfo(location, &location_raw_info));
+      RETURN_NOT_OK(GetMovesUsingRebalancingAlgo(
+          location_raw_info, &algo, CrossLocations::NO, &rep_moves));
+    }
+  }
+  *replica_moves = std::move(rep_moves);
+  return Status::OK();
+}
+
+// Runs the given rebalancing algorithm over 'raw_info' and appends the
+// resulting replica moves to 'replica_moves', respecting the per-iteration
+// move budget derived from --auto_rebalancing_max_moves_per_server.
+Status AutoRebalancerTask::GetMovesUsingRebalancingAlgo(
+  const ClusterRawInfo& raw_info,
+  rebalance::RebalancingAlgo* algo,
+  CrossLocations cross_location,
+  vector<Rebalancer::ReplicaMove>* replica_moves) {
+
+  // BUGFIX: compute the remaining move budget in signed arithmetic. The
+  // original code subtracted replica_moves->size() from an unsigned value;
+  // if the already-accumulated moves exceeded the budget, the subtraction
+  // wrapped around and the '<= 0' early-exit below could never fire.
+  const auto num_tservers = raw_info.tserver_summaries.size();
+  const int64_t max_moves =
+      static_cast<int64_t>(FLAGS_auto_rebalancing_max_moves_per_server) *
+      static_cast<int64_t>(num_tservers) -
+      static_cast<int64_t>(replica_moves->size());
+  if (max_moves <= 0) {
+    return Status::OK();
+  }
+
+  // Placement info is only needed to filter cross-location candidates.
+  TabletsPlacementInfo tpi;
+  if (cross_location == CrossLocations::YES) {
+    RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, Rebalancer::MovesInProgress(), &tpi));
+  }
+
+  unordered_map<string, TabletExtraInfo> extra_info_by_tablet_id;
+  BuildTabletExtraInfoMap(raw_info, &extra_info_by_tablet_id);
+
+  vector<TableReplicaMove> moves;
+  ClusterInfo cluster_info;
+  RETURN_NOT_OK(rebalancer_.BuildClusterInfo(
+      raw_info, Rebalancer::MovesInProgress(), &cluster_info));
+  RETURN_NOT_OK(algo->GetNextMoves(cluster_info, max_moves, &moves));
+
+  unordered_set<string> tablets_in_move;
+  vector<Rebalancer::ReplicaMove> rep_moves;
+  for (const auto& move : moves) {
+    vector<string> tablet_ids;
+    Rebalancer::FindReplicas(move, raw_info, &tablet_ids);
+    if (cross_location == CrossLocations::YES) {
+      // In case of cross-location (a.k.a. inter-location) rebalancing it is
+      // necessary to make sure the majority of replicas would not end up
+      // at the same location after the move. If so, remove those tablets
+      // from the list of candidates.
+      RETURN_NOT_OK(rebalancer_.FilterCrossLocationTabletCandidates(
+          cluster_info.locality.location_by_ts_id, tpi, move, &tablet_ids));
+    }
+
+    RETURN_NOT_OK(SelectReplicaToMove(move, extra_info_by_tablet_id,
+                                      &random_generator_, std::move(tablet_ids),
+                                      &tablets_in_move, &rep_moves));
+  }
+
+  // BUGFIX: append to the output instead of overwriting it. The header
+  // contract says this function *adds* to 'replica_moves'; overwriting
+  // discarded moves accumulated by earlier calls (e.g. the per-location
+  // loop in GetMoves()).
+  for (auto& rep_move : rep_moves) {
+    replica_moves->emplace_back(std::move(rep_move));
+  }
+  return Status::OK();
+}
+
+// Looks up the current leader replica of 'tablet_id' and fills in its UUID
+// and RPC host/port. Returns NotFound if no replica currently holds the
+// LEADER role; fails if this master is not the catalog manager leader.
+Status AutoRebalancerTask::GetTabletLeader(
+    const string& tablet_id,
+    string* leader_uuid,
+    HostPort* leader_hp) const {
+  TabletLocationsPB locs_pb;
+  CatalogManager::TSInfosDict ts_infos_dict;
+  // GetTabletLocations() will fail if the catalog manager is not the leader.
+  {
+    CatalogManager::ScopedLeaderSharedLock l(catalog_manager_);
+    RETURN_NOT_OK(l.first_failed_status());
+    RETURN_NOT_OK(catalog_manager_->GetTabletLocations(
+        tablet_id,
+        ReplicaTypeFilter::VOTER_REPLICA,
+        &locs_pb,
+        &ts_infos_dict,
+        boost::none));
+  }
+  for (const auto& r : locs_pb.interned_replicas()) {
+    if (r.role() == RaftPeerPB::LEADER) {
+      int index = r.ts_info_idx();
+      // Bind a reference instead of copying the whole TSInfoPB message.
+      const TSInfoPB& ts_info = *(ts_infos_dict.ts_info_pbs[index]);
+      *leader_uuid = ts_info.permanent_uuid();
+      const auto& addr = ts_info.rpc_addresses(0);
+      leader_hp->set_host(addr.host());
+      leader_hp->set_port(addr.port());
+      return Status::OK();
+    }
+  }
+  return Status::NotFound(Substitute("Couldn't find leader for tablet $0", tablet_id));
+}
+
+// TODO(hannah.nguyen): remove moves that fail to be scheduled from 'replica_moves'.
+// Sends a BulkChangeConfig request to the leader replica of each tablet in
+// 'replica_moves': the source replica is marked with the REPLACE attribute
+// and, for load-rebalancing moves, the destination replica is added as a
+// NON_VOTER slated for promotion.
+Status AutoRebalancerTask::ExecuteMoves(
+    const vector<Rebalancer::ReplicaMove>& replica_moves) {
+
+  for (const auto& move_info : replica_moves) {
+    const auto& tablet_id = move_info.tablet_uuid;
+    const auto& src_ts_uuid = move_info.ts_uuid_from;
+    const auto& dst_ts_uuid = move_info.ts_uuid_to;
+
+    string leader_uuid;
+    HostPort leader_hp;
+    RETURN_NOT_OK(GetTabletLeader(tablet_id, &leader_uuid, &leader_hp));
+
+    // Mark the replica to be replaced.
+    BulkChangeConfigRequestPB req;
+    auto* replace = req.add_config_changes();
+    replace->set_type(MODIFY_PEER);
+    *replace->mutable_peer()->mutable_permanent_uuid() = src_ts_uuid;
+    replace->mutable_peer()->mutable_attrs()->set_replace(true);
+
+    shared_ptr<TSDescriptor> desc;
+    if (!ts_manager_->LookupTSByUUID(leader_uuid, &desc)) {
+      return Status::NotFound(
+          Substitute("Couldn't find leader replica's tserver $0", leader_uuid));
+    }
+
+    // Set up the proxy to communicate with the leader replica's tserver.
+    // BUGFIX: parse the leader's advertised host, not its UUID -- a UUID is
+    // not a resolvable hostname, so ResolveAddresses() below would always
+    // fail with the original 'hp.ParseString(leader_uuid, ...)' call.
+    shared_ptr<ConsensusServiceProxy> proxy;
+    HostPort hp;
+    RETURN_NOT_OK(hp.ParseString(leader_hp.host(), leader_hp.port()));
+    vector<Sockaddr> resolved;
+    RETURN_NOT_OK(hp.ResolveAddresses(&resolved));
+    proxy.reset(new ConsensusServiceProxy(messenger_, resolved[0], hp.host()));
+
+    // Find the specified destination tserver, if load rebalancing.
+    // Otherwise, replica moves to fix placement policy violations do not have
+    // destination tservers specified, i.e. dst_ts_uuid will be empty if
+    // there are placement policy violations.
+    if (!dst_ts_uuid.empty()) {
+      // Verify that the destination tserver exists.
+      shared_ptr<TSDescriptor> dest_desc;
+      if (!ts_manager_->LookupTSByUUID(dst_ts_uuid, &dest_desc)) {
+        return Status::NotFound("Could not find destination tserver");
+      }
+
+      auto* change = req.add_config_changes();
+      change->set_type(ADD_PEER);
+      *change->mutable_peer()->mutable_permanent_uuid() = dst_ts_uuid;
+      change->mutable_peer()->set_member_type(RaftPeerPB::NON_VOTER);
+      change->mutable_peer()->mutable_attrs()->set_promote(true);
+      // NOTE(review): 'hp' is the *leader's* address, so last_known_addr for
+      // the newly added destination peer is set to the leader's host/port --
+      // confirm whether it should instead carry the destination tserver's
+      // RPC address (available via 'dest_desc').
+      RETURN_NOT_OK(
+          HostPortToPB(hp, change->mutable_peer()->mutable_last_known_addr()));
+    }
+
+    // Request movement or replacement of the replica.
+    ChangeConfigResponsePB resp;
+    RpcController rpc;
+    rpc.set_timeout(MonoDelta::FromSeconds(FLAGS_auto_rebalancing_rpc_timeout_seconds));
+    req.set_dest_uuid(leader_uuid);
+    req.set_tablet_id(tablet_id);
+
+    RETURN_NOT_OK(proxy->BulkChangeConfig(req, &resp, &rpc));
+    if (resp.has_error()) {
+      return StatusFromPB(resp.error().status());
+    }
+  }
+
+  return Status::OK();
+}
+
+// Builds a ClusterRawInfo snapshot (tserver, table, and tablet summaries),
+// either for the whole cluster ('location' == boost::none) or filtered down
+// to the tservers/tablets/tables present at the given location.
+Status AutoRebalancerTask::BuildClusterRawInfo(
+    const boost::optional<string>& location,
+    ClusterRawInfo* raw_info) const {
+
+  vector<ServerHealthSummary> tserver_summaries;
+  unordered_set<string> tserver_uuids;
+  vector<TableSummary> table_summaries;
+  vector<TabletSummary> tablet_summaries;
+
+  // Avoid making any moves if not all tservers are up, to prevent the possibility
+  // of moving tablets, then having to move them again later, when a tserver that
+  // was not available before, is available for tablet placement again.
+  TSDescriptorVector descriptors;
+  ts_manager_->GetDescriptorsAvailableForPlacement(&descriptors);
+  if (descriptors.size() != ts_manager_->GetLiveCount()) {
+    return Status::IllegalState(Substitute("not all tservers available for tablet placement"));
+  }
+  tserver_uuids.reserve(descriptors.size());
+  tserver_summaries.reserve(descriptors.size());
+
+  // All the tservers are healthy and available for placement.
+  // For rebalancing, only need to fill the uuid and location fields.
+  for (const auto& ts : descriptors) {
+    ServerHealthSummary summary;
+    summary.uuid = ts->permanent_uuid();
+    if (ts->location()) {
+      summary.ts_location = *(ts->location());
+    }
+    summary.health = ServerHealth::HEALTHY;
+    tserver_uuids.insert(summary.uuid);
+    tserver_summaries.push_back(std::move(summary));
+  }
+
+  vector<scoped_refptr<TableInfo>> table_infos;
+
+  // Grab the table list under the leader lock; fails if this master is not
+  // the catalog manager leader.
+  {
+    CatalogManager::ScopedLeaderSharedLock leader_lock(catalog_manager_);
+    RETURN_NOT_OK(leader_lock.first_failed_status());
+    RETURN_NOT_OK(catalog_manager_->GetAllTables(&table_infos));
+  }
+
+  table_summaries.reserve(table_infos.size());
+
+  for (const auto& table : table_infos) {
+    TableMetadataLock table_l(table.get(), LockMode::READ);
+
+    TableSummary table_summary;
+    table_summary.id = table->id();
+    const SysTablesEntryPB& table_data = table->metadata().state().pb;
+    table_summary.name = table_data.name();
+    table_summary.replication_factor = table_data.num_replicas();
+
+    vector<scoped_refptr<TabletInfo>> tablet_infos;
+    table->GetAllTablets(&tablet_infos);
+    tablet_summaries.reserve(tablet_summaries.size() + tablet_infos.size());
+
+    for (const auto& tablet : tablet_infos) {
+      TabletMetadataLock tablet_l(tablet.get(), LockMode::READ);
+
+      TabletSummary tablet_summary;
+      tablet_summary.id = tablet->id();
+      tablet_summary.table_id = table_summary.id;
+      tablet_summary.table_name = table_summary.name;
+
+      // Retrieve all replicas of the tablet.
+      vector<ReplicaSummary> replicas;
+      TabletLocationsPB locs_pb;
+      CatalogManager::TSInfosDict ts_infos_dict;
+      // GetTabletLocations() will fail if the catalog manager is not the leader.
+      // NOTE(review): the leader lock is re-acquired once per tablet; if this
+      // shows up in profiles, consider hoisting it out of the loop.
+      {
+        CatalogManager::ScopedLeaderSharedLock leaderlock(catalog_manager_);
+        RETURN_NOT_OK(leaderlock.first_failed_status());
+        // This will only return tablet replicas in the RUNNING state, and filter
+        // to only retrieve voter replicas.
+        RETURN_NOT_OK(catalog_manager_->GetTabletLocations(
+            tablet_summary.id,
+            ReplicaTypeFilter::VOTER_REPLICA,
+            &locs_pb,
+            &ts_infos_dict,
+            boost::none));
+      }
+
+      // Consensus state information is the same for all replicas of this tablet.
+      const ConsensusStatePB& cstatepb = tablet_l.data().pb.consensus_state();
+      vector<string> voters;
+      vector<string> non_voters;
+      for (const auto& peer : cstatepb.committed_config().peers()) {
+        if (peer.member_type() == RaftPeerPB::VOTER) {
+          voters.push_back(peer.permanent_uuid());
+        } else if (peer.member_type() == RaftPeerPB::NON_VOTER) {
+          non_voters.push_back(peer.permanent_uuid());
+        }
+      }
+
+      int leaders_count = 0;
+
+      // Build a summary for each replica of the tablet.
+      // Make sure that the tserver the tablet is on is registered with the master
+      // and is available for replica placement.
+      // If not, return an error.
+      for (const auto& r : locs_pb.interned_replicas()) {
+        int index = r.ts_info_idx();
+        const TSInfoPB& ts_info = *(ts_infos_dict.ts_info_pbs[index]);
+        ReplicaSummary rep;
+        rep.ts_uuid = ts_info.permanent_uuid();
+        if (!ContainsKey(tserver_uuids, rep.ts_uuid)) {
+          return Status::NotFound(Substitute("tserver $0 not available for placement",
+                                             rep.ts_uuid));
+        }
+        const auto& addr = ts_info.rpc_addresses(0);
+        rep.ts_address = Substitute("$0:$1", addr.host(), addr.port());
+        rep.is_leader = r.role() == RaftPeerPB::LEADER;
+        if (rep.is_leader) {
+          leaders_count++;
+        }
+        // Only voter replicas were requested above, and every hosting tserver
+        // already passed the availability check, hence the constants here.
+        rep.is_voter = true;
+        rep.ts_healthy = true;
+        replicas.push_back(rep);
+      }
+
+      tablet_summary.replicas.swap(replicas);
+
+      // Determine if tablet is healthy enough for rebalancing.
+      // NOTE(review): 'voters.size()' is unsigned while 'replication_factor'
+      // is presumably signed -- fine as long as the factor is non-negative.
+      if (voters.size() < table_summary.replication_factor) {
+        tablet_summary.result = HealthCheckResult::UNDER_REPLICATED;
+      } else if (leaders_count != 1) {
+        tablet_summary.result = HealthCheckResult::UNAVAILABLE;
+      } else {
+        tablet_summary.result = HealthCheckResult::HEALTHY;
+      }
+
+      tablet_summaries.push_back(std::move(tablet_summary));
+    }
+
+    table_summaries.push_back(std::move(table_summary));
+  }
+
+  if (!location) {
+    // Information on the whole cluster.
+    raw_info->tserver_summaries = std::move(tserver_summaries);
+    raw_info->tablet_summaries = std::move(tablet_summaries);
+    raw_info->table_summaries = std::move(table_summaries);
+
+    return Status::OK();
+  }
+
+  // Information on the specified location only: filter out non-relevant info.
+  const auto& location_str = *location;
+
+  unordered_set<string> ts_ids_at_location;
+  for (const auto& summary : tserver_summaries) {
+    if (summary.ts_location == location_str) {
+      raw_info->tserver_summaries.push_back(summary);
+      InsertOrDie(&ts_ids_at_location, summary.uuid);
+    }
+  }
+
+  // Keep only tablets with at least one replica at the location, and within
+  // those tablets, only the replicas hosted at the location.
+  unordered_set<string> table_ids_at_location;
+  for (const auto& summary : tablet_summaries) {
+    const auto& replicas = summary.replicas;
+    vector<ReplicaSummary> replicas_at_location;
+    replicas_at_location.reserve(replicas.size());
+    for (const auto& replica : replicas) {
+      if (ContainsKey(ts_ids_at_location, replica.ts_uuid)) {
+        replicas_at_location.push_back(replica);
+      }
+    }
+    if (!replicas_at_location.empty()) {
+      table_ids_at_location.insert(summary.table_id);
+      raw_info->tablet_summaries.push_back(summary);
+      raw_info->tablet_summaries.back().replicas = std::move(replicas_at_location);
+    }
+  }
+
+  for (const auto& summary : table_summaries) {
+    if (ContainsKey(table_ids_at_location, summary.id)) {
+      raw_info->table_summaries.push_back(summary);
+    }
+  }
+
+  return Status::OK();
+}
+
+// Checks completion status of all moves in 'replica_moves'. Completed moves
+// are removed; on error, only the problematic move is removed and the error
+// is returned (the remaining moves will be re-checked by the caller).
+Status AutoRebalancerTask::CheckReplicaMovesCompleted(
+    vector<rebalance::Rebalancer::ReplicaMove>* replica_moves) {
+
+  // Initialized defensively; CheckMoveCompleted() also resets it on entry.
+  bool move_is_complete = false;
+  vector<int> indexes_to_remove;
+
+  const int num_moves = static_cast<int>(replica_moves->size());
+  for (int i = 0; i < num_moves; ++i) {
+    const rebalance::Rebalancer::ReplicaMove& move = (*replica_moves)[i];
+
+    // If checking move completion fails, erase only the problematic move
+    // from 'replica_moves' and return the error.
+    // (The original comment here claimed the opposite of what the code does.)
+    Status s = CheckMoveCompleted(move, &move_is_complete);
+    if (!s.ok()) {
+      replica_moves->erase(replica_moves->begin() + i);
+      LOG(WARNING) << Substitute("Could not move replica: $0", s.ToString());
+      return s;
+    }
+
+    // If move was completed, remove it from 'replica_moves'.
+    if (move_is_complete) {
+      indexes_to_remove.push_back(i);
+    }
+  }
+
+  // Erase from the back so the earlier recorded indexes stay valid.
+  for (int j = static_cast<int>(indexes_to_remove.size()) - 1; j >= 0; --j) {
+    replica_moves->erase(replica_moves->begin() + indexes_to_remove[j]);
+  }
+
+  return Status::OK();
+}
+
+// TODO(hannah.nguyen): Retrieve consensus state information from the CatalogManager instead.
+// Currently, this implementation mirrors CheckCompleteMove() in tools/tool_replica_util.
+// Determines whether 'replica_move' has finished by querying the tablet's
+// leader for its committed Raft config. As a side effect, asks the source
+// replica to step down if it is still the leader. See the header for the
+// full contract.
+Status AutoRebalancerTask::CheckMoveCompleted(
+    const rebalance::Rebalancer::ReplicaMove& replica_move,
+    bool* is_complete) {
+
+  DCHECK(is_complete);
+  *is_complete = false;
+
+  const auto& tablet_uuid = replica_move.tablet_uuid;
+  const auto& from_ts_uuid = replica_move.ts_uuid_from;
+  const auto& to_ts_uuid = replica_move.ts_uuid_to;
+
+  // Get the latest leader info. This may change later.
+  string orig_leader_uuid;
+  HostPort orig_leader_hp;
+  RETURN_NOT_OK(GetTabletLeader(tablet_uuid, &orig_leader_uuid, &orig_leader_hp));
+  shared_ptr<TSDescriptor> desc;
+  if (!ts_manager_->LookupTSByUUID(orig_leader_uuid, &desc)) {
+    return Status::NotFound("Could not find leader replica's tserver");
+  }
+  shared_ptr<ConsensusServiceProxy> proxy;
+  RETURN_NOT_OK(desc->GetConsensusProxy(messenger_, &proxy));
+
+  // Check if replica at 'to_ts_uuid' is in the config, and if it has been
+  // promoted to voter.
+  ConsensusStatePB cstate;
+  GetConsensusStateRequestPB req;
+  GetConsensusStateResponsePB resp;
+  RpcController rpc;
+  rpc.set_timeout(MonoDelta::FromSeconds(FLAGS_auto_rebalancing_rpc_timeout_seconds));
+  req.set_dest_uuid(orig_leader_uuid);
+  req.add_tablet_ids(tablet_uuid);
+  RETURN_NOT_OK(proxy->GetConsensusState(req, &resp, &rpc));
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  if (resp.tablets_size() == 0) {
+    return Status::NotFound("tablet not found:", tablet_uuid);
+  }
+  // Exactly one tablet was requested, so at most one should come back.
+  DCHECK_EQ(1, resp.tablets_size());
+  cstate = resp.tablets(0).cstate();
+
+  bool to_ts_uuid_in_config = false;
+  bool to_ts_uuid_is_a_voter = false;
+  for (const auto& peer : cstate.committed_config().peers()) {
+    if (peer.permanent_uuid() == to_ts_uuid) {
+      to_ts_uuid_in_config = true;
+      if (peer.member_type() == RaftPeerPB::VOTER) {
+        to_ts_uuid_is_a_voter = true;
+      }
+      break;
+    }
+  }
+
+  // Failure case: newly added replica is no longer in the config.
+  if (!to_ts_uuid.empty() && !to_ts_uuid_in_config) {
+    return Status::Incomplete(Substitute(
+        "tablet $0, TS $1 -> TS $2 move failed, target replica disappeared",
+        tablet_uuid, from_ts_uuid, to_ts_uuid));
+  }
+
+  // Check if replica slated for removal is still in the config.
+  bool from_ts_uuid_in_config = false;
+  for (const auto& peer : cstate.committed_config().peers()) {
+    if (peer.permanent_uuid() == from_ts_uuid) {
+      // Source replica must have the REPLACE attribute set.
+      if (!peer.attrs().replace()) {
+        return Status::IllegalState(Substitute(
+            "$0: source replica $1 does not have REPLACE attribute set",
+            tablet_uuid, from_ts_uuid));
+      }
+      // Replica to be removed is the leader.
+      // - It's possible that leadership changed and 'orig_leader_uuid' is not
+      //   the leader's UUID by the time 'cstate' was collected. Let's
+      //   cross-reference the two sources and only act if they agree.
+      // - It doesn't make sense to have the leader step down if the newly-added
+      //   replica hasn't been promoted to a voter yet, since changing
+      //   leadership can only delay that process and the stepped-down leader
+      //   replica will not be evicted until the newly added replica is promoted
+      //   to voter.
+      if (orig_leader_uuid == from_ts_uuid && orig_leader_uuid == cstate.leader_uuid()) {
+        // A fresh RpcController is required for each separate RPC call.
+        LeaderStepDownRequestPB req;
+        LeaderStepDownResponsePB resp;
+        RpcController rpc;
+        req.set_dest_uuid(orig_leader_uuid);
+        req.set_tablet_id(tablet_uuid);
+        req.set_mode(LeaderStepDownMode::GRACEFUL);
+        rpc.set_timeout(MonoDelta::FromSeconds(FLAGS_auto_rebalancing_rpc_timeout_seconds));
+        RETURN_NOT_OK(proxy->LeaderStepDown(req, &resp, &rpc));
+        if (resp.has_error()) {
+          return StatusFromPB(resp.error().status());
+        }
+      }
+
+      from_ts_uuid_in_config = true;
+      break;
+    }
+  }
+
+  // The move is complete once the source replica has been evicted from the
+  // config and (for load-rebalancing moves) the destination replica has been
+  // promoted to voter. Policy-fix moves have an empty 'to_ts_uuid'.
+  if (!from_ts_uuid_in_config &&
+      (to_ts_uuid_is_a_voter || to_ts_uuid.empty())) {
+    *is_complete = true;
+  }
+
+  return Status::OK();
+}
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/auto_rebalancer.h b/src/kudu/master/auto_rebalancer.h
new file mode 100644
index 0000000..0861751
--- /dev/null
+++ b/src/kudu/master/auto_rebalancer.h
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <random>
+#include <string>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rebalance/rebalancer.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class HostPort;
+class Thread;
+
+namespace rebalance {
+class RebalancingAlgo;
+struct ClusterLocalityInfo;
+struct TabletsPlacementInfo;
+} // namespace rebalance
+
+namespace rpc {
+class Messenger;
+} // namespace rpc
+
+namespace master {
+
+class CatalogManager;
+class TSManager;
+
+// Whether the rebalancing algorithm should compute replica moves across
+// locations (inter-location) or only within a single location.
+// NOTE(review): converted to 'enum class' for type safety; all existing
+// uses already qualify the enumerators (CrossLocations::YES / ::NO).
+enum class CrossLocations {
+  YES,
+  NO
+};
+
+// A CatalogManager background task which auto-rebalances tablet replica distribution.
+//
+// As a background task, the lifetime of an instance of this class must be less
+// than the catalog manager it belongs to.
+//
+// The auto-rebalancing task continuously wakes up according to its
+// configured poll period. It performs no work when the master is a follower.
+class AutoRebalancerTask {
+ public:
+
+  AutoRebalancerTask(CatalogManager* catalog_manager, TSManager* ts_manager);
+  ~AutoRebalancerTask();
+
+  // Initializes the auto-rebalancer.
+  Status Init() WARN_UNUSED_RESULT;
+
+  // Shuts down the auto-rebalancer. This must be called
+  // before shutting down the catalog manager.
+  void Shutdown();
+
+ private:
+
+  // Tests poke at the internal counters and helpers below.
+  friend class AutoRebalancerTest;
+
+  // Runs the main loop of the auto-rebalancing thread.
+  void RunLoop();
+
+  // Collects information about the cluster at the location specified by the
+  // 'location' parameter. If there is no location specified (and the parameter
+  // is set to 'boost::none'), information is being collected about the cluster
+  // to do cross-location rebalancing.
+  Status BuildClusterRawInfo(
+      const boost::optional<std::string>& location,
+      rebalance::ClusterRawInfo* raw_info) const;
+
+  // Gets a set of replica moves using the specified rebalancing algorithm and
+  // the information in 'raw_info.' The 'cross_location' parameter specifies
+  // whether or not the replica moves should be for cross-location rebalancing.
+  // The function adds to the 'replica_moves' parameter, which may or may not
+  // be empty.
+  Status GetMovesUsingRebalancingAlgo(
+      const rebalance::ClusterRawInfo& raw_info,
+      rebalance::RebalancingAlgo* algo,
+      CrossLocations cross_location,
+      std::vector<rebalance::Rebalancer::ReplicaMove>* replica_moves);
+
+  // Gets next set of replica moves for the auto-rebalancer task. The number of
+  // moves that will be put into the parameter 'replica_moves' is limited by a
+  // gflag that sets the maximum number of replica moves per server.
+  // The function first checks for violations of the placement policy. If so,
+  // 'replica_moves' will only be populated with moves to reinstate the policy.
+  // Otherwise, the function will find and add as many replica moves as possible
+  // to perform load rebalancing.
+  // If there are multiple locations, the moves will be first to perform cross-location
+  // rebalancing, then intra-location rebalancing.
+  // Returns a non-OK status if information about the cluster or its locations
+  // cannot be constructed, or there is no way to reimpose the placement policy.
+  // No changes are made to 'replica_moves' if a non-OK status is returned.
+  // If no moves are necessary to rebalance the cluster, the function returns
+  // Status::OK() and 'replica_moves' remains empty.
+  Status GetMoves(
+      const rebalance::ClusterRawInfo& raw_info,
+      const rebalance::ClusterLocalityInfo& locality,
+      const rebalance::TabletsPlacementInfo& placement_info,
+      std::vector<rebalance::Rebalancer::ReplicaMove>* replica_moves);
+
+  // Gets information on the current leader replica for the specified tablet and
+  // populates the 'leader_uuid' and 'leader_hp' output parameters. The
+  // function returns Status::NotFound() if no replica is a leader for the tablet.
+  Status GetTabletLeader(
+      const std::string& tablet_id,
+      std::string* leader_uuid,
+      HostPort* leader_hp) const;
+
+  // Finds replicas that are specified in 'replica_moves' and make requests
+  // to have them moved in order to rebalance the cluster.
+  // Returns a non-OK status if the replica or the replica's tserver
+  // cannot be found, or the request to move the replica cannot be completed.
+  Status ExecuteMoves(
+      const std::vector<rebalance::Rebalancer::ReplicaMove>& replica_moves);
+
+  // Given a set of replica moves, return Status::OK() if checking completion
+  // progress does not encounter an error. Otherwise, return the first error
+  // encountered when checking the status of the moves.
+  //
+  // If no error was encountered, moves that have been completed are
+  // removed from 'replica_moves'. Otherwise, only the move that caused an error
+  // is removed from 'replica_moves'.
+  Status CheckReplicaMovesCompleted(
+      std::vector<rebalance::Rebalancer::ReplicaMove>* replica_moves);
+
+  // Given a replica move, determine whether the operation has completed.
+  // The 'is_complete' output parameter cannot be null.
+  // If the replica move is completed and no errors were encountered, 'is_complete'
+  // is set to true and the function returns Status::OK(). If the replica move
+  // is not complete yet, but no errors were encountered, 'is_complete' is set
+  // to false and the function returns Status::OK().
+  //
+  // Otherwise, it returns the first non-OK status encountered while trying to
+  // get the status of the replica movement.
+  //
+  // If the source replica is a leader, this function asks it to step down.
+  Status CheckMoveCompleted(
+      const rebalance::Rebalancer::ReplicaMove& replica_move,
+      bool* is_complete);
+
+  // The associated catalog manager.
+  CatalogManager* catalog_manager_;
+
+  // The associated TS manager.
+  TSManager* ts_manager_;
+
+  // The auto-rebalancing thread.
+  scoped_refptr<kudu::Thread> thread_;
+
+  // Latch used to indicate that the thread is shutting down.
+  CountDownLatch shutdown_;
+
+  // The internal Rebalancer object.
+  rebalance::Rebalancer rebalancer_;
+
+  // The Config struct to initialize the Rebalancer object.
+  rebalance::Rebalancer::Config config_;
+
+  // Messenger used by this task's RPC proxies to tablet servers.
+  std::shared_ptr<rpc::Messenger> messenger_;
+
+  // Random device and generator for selecting among multiple choices.
+  std::random_device random_device_;
+  std::mt19937 random_generator_;
+
+  // Variables for testing.
+  // Number of loop iterations performed while this master was the leader.
+  std::atomic<int> number_of_loop_iterations_for_test_;
+  // Number of replica moves scheduled in the most recent loop iteration.
+  std::atomic<int> moves_scheduled_this_round_for_test_;
+};
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 193d69c..53e7919 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -98,6 +98,7 @@
 #include "kudu/gutil/walltime.h"
 #include "kudu/hms/hms_catalog.h"
 #include "kudu/master/authz_provider.h"
+#include "kudu/master/auto_rebalancer.h"
 #include "kudu/master/default_authz_provider.h"
 #include "kudu/master/hms_notification_log_listener.h"
 #include "kudu/master/master.h"
@@ -307,6 +308,11 @@ DEFINE_int64(live_row_count_for_testing, 0,
 TAG_FLAG(live_row_count_for_testing, hidden);
 TAG_FLAG(live_row_count_for_testing, runtime);
 
+DEFINE_bool(auto_rebalancing_enabled, false,
+            "Whether auto-rebalancing is enabled.");
+TAG_FLAG(auto_rebalancing_enabled, advanced);
+TAG_FLAG(auto_rebalancing_enabled, experimental);
+
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_int64(tsk_rotation_seconds);
 
@@ -326,6 +332,20 @@ static bool ValidateRangerSentryFlags() {
 }
 GROUP_FLAG_VALIDATOR(ranger_sentry_flags, ValidateRangerSentryFlags);
 
+// Validates that if auto-rebalancing is enabled, the cluster uses 3-4-3 replication
+// (the --raft_prepare_replacement_before_eviction flag must be set to true).
+static bool Validate343SchemeEnabledForAutoRebalancing() {
+  if (FLAGS_auto_rebalancing_enabled &&
+      !FLAGS_raft_prepare_replacement_before_eviction) {
+    LOG(ERROR) << "If enabling auto-rebalancing, Kudu must be configured"
+                  " with --raft_prepare_replacement_before_eviction.";
+    return false;
+  }
+  return true;
+}
+GROUP_FLAG_VALIDATOR(auto_rebalancing_flags,
+                     Validate343SchemeEnabledForAutoRebalancing);
+
 using base::subtle::NoBarrier_CompareAndSwap;
 using base::subtle::NoBarrier_Load;
 using boost::make_optional;
@@ -801,16 +821,23 @@ Status CatalogManager::Init(bool is_first_run) {
   RETURN_NOT_OK_PREPEND(sys_catalog_->WaitUntilRunning(),
                         "Failed waiting for the catalog tablet to run");
 
+  if (FLAGS_auto_rebalancing_enabled) {
+    unique_ptr<AutoRebalancerTask> task(
+        new AutoRebalancerTask(this, master_->ts_manager()));
+    RETURN_NOT_OK_PREPEND(task->Init(), "failed to initialize auto-rebalancing task");
+    auto_rebalancer_ = std::move(task);
+  }
+
   if (hms::HmsCatalog::IsEnabled()) {
     vector<HostPortPB> master_addrs_pb;
     RETURN_NOT_OK(master_->GetMasterHostPorts(&master_addrs_pb));
 
     string master_addresses = JoinMapped(
-        master_addrs_pb,
-        [] (const HostPortPB& hostport) {
-          return Substitute("$0:$1", hostport.host(), hostport.port());
-        },
-        ",");
+      master_addrs_pb,
+      [] (const HostPortPB& hostport) {
+        return Substitute("$0:$1", hostport.host(), hostport.port());
+      },
+      ",");
 
     // The leader_lock_ isn't really intended for this (it's for serializing
     // new leadership initialization against regular catalog manager operations)
@@ -1327,6 +1354,10 @@ void CatalogManager::Shutdown() {
     hms_catalog_->Stop();
   }
 
+  if (auto_rebalancer_) {
+    auto_rebalancer_->Shutdown();
+  }
+
   // Mark all outstanding table tasks as aborted and wait for them to fail.
   //
   // There may be an outstanding table visitor thread modifying the table map,
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 0d9d9b3..d6d5ff6 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -97,7 +97,7 @@ class TokenSigningPublicKeyPB;
 
 namespace tserver {
 class TsTabletManagerITest_TestTableStats_Test;
-}
+} // namespace tserver
 
 namespace tablet {
 class TabletReplica;
@@ -106,6 +106,7 @@ class TabletReplica;
 namespace master {
 
 class AuthzProvider;
+class AutoRebalancerTask;
 class CatalogManagerBgTasks;
 class HmsNotificationLogListenerTask;
 class Master;
@@ -145,8 +146,8 @@ struct PersistentTabletInfo {
 // of data are in PersistentTabletInfo above, and typically accessed using
 // TabletMetadataLock. For example:
 //
-//   TabletInfo* table = ...;
-//   TabletMetadataLock l(tablet, TableMetadataLock::READ);
+//   TabletInfo* tablet = ...;
+//   TabletMetadataLock l(tablet, TabletMetadataLock::READ);
 //   if (l.data().is_running()) { ... }
 //
 // The non-persistent information about the tablet is protected by an internal
@@ -768,6 +769,10 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
     return authz_provider_.get();
   }
 
+  master::AutoRebalancerTask* auto_rebalancer() const {
+    return auto_rebalancer_.get();
+  }
+
   // Returns the normalized form of the provided table name.
   //
   // If the HMS integration is configured and the table name is a valid HMS
@@ -790,6 +795,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // This test exclusively acquires the leader_lock_ directly.
   FRIEND_TEST(kudu::client::ServiceUnavailableRetryClientTest, CreateTable);
 
+  friend class AutoRebalancerTest;
   friend class TableLoader;
   friend class TabletLoader;
 
@@ -1108,6 +1114,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
 
   std::unique_ptr<master::AuthzProvider> authz_provider_;
 
+  std::unique_ptr<AutoRebalancerTask> auto_rebalancer_;
+
   enum State {
     kConstructed,
     kStarting,
diff --git a/src/kudu/master/master_runner.cc b/src/kudu/master/master_runner.cc
index 3c5d8ef..eac8097 100644
--- a/src/kudu/master/master_runner.cc
+++ b/src/kudu/master/master_runner.cc
@@ -37,6 +37,9 @@ DECLARE_bool(evict_failed_followers);
 DECLARE_bool(hive_metastore_sasl_enabled);
 DECLARE_string(keytab_file);
 
+DECLARE_bool(auto_rebalancing_enabled);
+DECLARE_bool(raft_prepare_replacement_before_eviction);
+
 namespace kudu {
 namespace master {
 
diff --git a/src/kudu/master/ts_descriptor.h b/src/kudu/master/ts_descriptor.h
index 5b8c8ee..de90c8e 100644
--- a/src/kudu/master/ts_descriptor.h
+++ b/src/kudu/master/ts_descriptor.h
@@ -168,6 +168,7 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
 
  private:
   FRIEND_TEST(TestTSDescriptor, TestReplicaCreationsDecay);
+  friend class AutoRebalancerTest;
   friend class PlacementPolicyTest;
 
   // Uses DNS to resolve registered hosts to a single Sockaddr.
@@ -177,6 +178,10 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
 
   void DecayRecentReplicaCreationsUnlocked();
 
+  void AssignLocationForTesting(std::string loc) {
+    location_ = std::move(loc);
+  }
+
   mutable rw_spinlock lock_;
 
   const std::string permanent_uuid_;
diff --git a/src/kudu/master/ts_manager.cc b/src/kudu/master/ts_manager.cc
index cc1b89d..6a80b22 100644
--- a/src/kudu/master/ts_manager.cc
+++ b/src/kudu/master/ts_manager.cc
@@ -205,6 +205,18 @@ int TSManager::GetCount() const {
   return servers_by_id_.size();
 }
 
+int TSManager::GetLiveCount() const {
+  shared_lock<rw_spinlock> l(lock_);
+  int live_count = 0;
+  for (const auto& entry : servers_by_id_) {
+    const shared_ptr<TSDescriptor>& ts = entry.second;
+    if (!ts->PresumedDead()) {
+      live_count++;
+    }
+  }
+  return live_count;
+}
+
 unordered_set<string> TSManager::GetUuidsToIgnoreForUnderreplication() const {
   unordered_set<string> uuids;
   shared_lock<RWMutex> tsl(ts_state_lock_);
diff --git a/src/kudu/master/ts_manager.h b/src/kudu/master/ts_manager.h
index 831320d..464dfa6 100644
--- a/src/kudu/master/ts_manager.h
+++ b/src/kudu/master/ts_manager.h
@@ -106,6 +106,9 @@ class TSManager {
   // Get the TS count.
   int GetCount() const;
 
+  // Get the live TS count.
+  int GetLiveCount() const;
+
   // Sets the tserver state for the given tserver, persisting it to disk.
   //
   // If removing a tserver from maintenance mode, this also sets that all
diff --git a/src/kudu/rebalance/rebalancer.cc b/src/kudu/rebalance/rebalancer.cc
index 84ee672..ccc131b 100644
--- a/src/kudu/rebalance/rebalancer.cc
+++ b/src/kudu/rebalance/rebalancer.cc
@@ -23,6 +23,7 @@
 #include <iterator>
 #include <limits>
 #include <map>
+#include <random>
 #include <set>
 #include <string>
 #include <unordered_map>
@@ -462,6 +463,72 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
   return Status::OK();
 }
 
+void BuildTabletExtraInfoMap(
+    const ClusterRawInfo& raw_info,
+    std::unordered_map<std::string, TabletExtraInfo>* extra_info_by_tablet_id) {
+  unordered_map<string, int> replication_factors_by_table;
+  for (const auto& s : raw_info.table_summaries) {
+    EmplaceOrDie(&replication_factors_by_table, s.id, s.replication_factor);
+  }
+  for (const auto& s : raw_info.tablet_summaries) {
+    int num_voters = 0;
+    for (const auto& rs : s.replicas) {
+      if (rs.is_voter) {
+        ++num_voters;
+      }
+    }
+    const auto rf = FindOrDie(replication_factors_by_table, s.table_id);
+    EmplaceOrDie(extra_info_by_tablet_id,
+                 s.id, TabletExtraInfo{rf, num_voters});
+  }
+}
+
+Status SelectReplicaToMove(
+    const TableReplicaMove& move,
+    const unordered_map<string, TabletExtraInfo>& extra_info_by_tablet_id,
+    std::mt19937* random_generator,
+    vector<string> tablet_ids,
+    unordered_set<string>* tablets_in_move,
+    vector<Rebalancer::ReplicaMove>* replica_moves) {
+
+  // Shuffle the set of the tablet identifiers: that's to achieve even spread
+  // of moves across tables with the same skew.
+  std::shuffle(tablet_ids.begin(), tablet_ids.end(), *random_generator);
+  string move_tablet_id;
+  for (const auto& tablet_id : tablet_ids) {
+    if (!ContainsKey(*tablets_in_move, tablet_id)) {
+      // For now, choose the very first tablet that does not have replicas
+      // in move. Later on, additional logic might be added to find
+      // the best candidate.
+      move_tablet_id = tablet_id;
+      break;
+    }
+  }
+  if (move_tablet_id.empty()) {
+    return Status::NotFound(Substitute(
+        "table $0: could not find any suitable replica to move "
+        "from server $1 to server $2", move.table_id, move.from, move.to));
+  }
+  Rebalancer::ReplicaMove move_info;
+  move_info.tablet_uuid = move_tablet_id;
+  move_info.ts_uuid_from = move.from;
+  const auto& extra_info = FindOrDie(extra_info_by_tablet_id, move_tablet_id);
+  if (extra_info.replication_factor < extra_info.num_voters) {
+    // The number of voter replicas is greater than the target replication
+    // factor. It might happen the replica distribution would be better
+    // if just removing the source replica. Anyway, once a replica is removed,
+    // the system will automatically add a new one, if needed, where the new
+    // replica will be placed to have balanced replica distribution.
+    move_info.ts_uuid_to = "";
+  } else {
+    move_info.ts_uuid_to = move.to;
+  }
+  replica_moves->emplace_back(std::move(move_info));
+  // Mark the tablet as 'has a replica in move'.
+  tablets_in_move->emplace(std::move(move_tablet_id));
+
+  return Status::OK();
+}
 
 } // namespace rebalance
 } // namespace kudu
diff --git a/src/kudu/rebalance/rebalancer.h b/src/kudu/rebalance/rebalancer.h
index d184cb2..5b2d0e8 100644
--- a/src/kudu/rebalance/rebalancer.h
+++ b/src/kudu/rebalance/rebalancer.h
@@ -18,9 +18,11 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <random>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -135,7 +137,6 @@ class Rebalancer {
 
   // Represents a concrete move of a replica from one tablet server to another.
   // Formed logically from a TableReplicaMove by specifying a tablet for the move.
-  // Originally from "tools/rebalancer.h"
   struct ReplicaMove {
     std::string tablet_uuid;
     std::string ts_uuid_from;
@@ -150,11 +151,45 @@ class Rebalancer {
   };
 
   // A helper type: key is tablet UUID which corresponds to value.tablet_uuid.
-  // Originally from "tools/rebalancer.h"
   typedef std::unordered_map<std::string, ReplicaMove> MovesInProgress;
 
   explicit Rebalancer(Config config);
 
+  // Filter the list of candidate tablets to make sure the location
+  // of the destination server would not contain the majority of replicas
+  // after the move. The 'tablet_ids' is an in-out parameter.
+  static Status FilterCrossLocationTabletCandidates(
+      const std::unordered_map<std::string, std::string>& location_by_ts_id,
+      const TabletsPlacementInfo& placement_info,
+      const TableReplicaMove& move,
+      std::vector<std::string>* tablet_ids);
+
+  // Given high-level move-some-tablet-replica-for-a-table information from the
+  // rebalancing algorithm, find appropriate tablet replicas to move between the
+  // specified tablet servers. The set of result tablet UUIDs is output
+  // into the 'tablet_ids' container (note: the container is first cleared).
+  // The source and destination replicas can then be determined by the elements
+  // of the 'tablet_ids' container and tablet server UUIDs TableReplicaMove::from
+  // and TableReplicaMove::to correspondingly. If no suitable tablet replicas are found,
+  // 'tablet_ids' will be empty.
+  static void FindReplicas(const TableReplicaMove& move,
+                           const ClusterRawInfo& raw_info,
+                           std::vector<std::string>* tablet_ids);
+
+  // Convert the 'raw' information about the cluster into information suitable
+  // for the input of the high-level rebalancing algorithm.
+  // The 'moves_in_progress' parameter contains information on the replica moves
+  // which have been scheduled by a caller and still in progress: those are
+  // considered as successfully completed and applied to the 'raw_info' when
+  // building ClusterBalanceInfo for the specified 'raw_info' input. The idea
+  // is to prevent the algorithm outputting the same moves again while some
+  // of the moves recommended at prior steps are still in progress.
+  // The result cluster balance information is output into the 'info' parameter.
+  // The 'info' output parameter cannot be null.
+  Status BuildClusterInfo(const ClusterRawInfo& raw_info,
+                          const MovesInProgress& moves_in_progress,
+                          ClusterInfo* info) const;
+
  protected:
   // Helper class to find and schedule next available rebalancing move operation
   // and track already scheduled ones.
@@ -200,50 +235,41 @@ class Rebalancer {
 
   friend class KsckResultsToClusterBalanceInfoTest;
 
-  // Given high-level move-some-tablet-replica-for-a-table information from the
-  // rebalancing algorithm, find appropriate tablet replicas to move between the
-  // specified tablet servers. The set of result tablet UUIDs is output
-  // into the 'tablet_ids' container (note: the container is first cleared).
-  // The source and destination replicas are determined by the elements of the
-  // 'tablet_ids' container and tablet server UUIDs TableReplicaMove::from and
-  // TableReplica::to correspondingly. If no suitable tablet replicas are found,
-  // 'tablet_ids' will be empty.
-  static void FindReplicas(const TableReplicaMove& move,
-                           const ClusterRawInfo& raw_info,
-                           std::vector<std::string>* tablet_ids);
-
   // Filter move operations in 'replica_moves': remove all operations that would
   // involve moving replicas of tablets which are in 'scheduled_moves'. The
   // 'replica_moves' cannot be null.
   static void FilterMoves(const MovesInProgress& scheduled_moves,
                           std::vector<ReplicaMove>* replica_moves);
 
-  // Filter the list of candidate tablets to make sure the location
-  // of the destination server would not contain the majority of replicas
-  // after the move. The 'tablet_ids' is an in-out parameter.
-  static Status FilterCrossLocationTabletCandidates(
-      const std::unordered_map<std::string, std::string>& location_by_ts_id,
-      const TabletsPlacementInfo& placement_info,
-      const TableReplicaMove& move,
-      std::vector<std::string>* tablet_ids);
-
-  // Convert the 'raw' information about the cluster into information suitable
-  // for the input of the high-level rebalancing algorithm.
-  // The 'moves_in_progress' parameter contains information on the replica moves
-  // which have been scheduled by a caller and still in progress: those are
-  // considered as successfully completed and applied to the 'raw_info' when
-  // building ClusterBalanceInfo for the specified 'raw_info' input. The idea
-  // is to prevent the algorithm outputting the same moves again while some
-  // of the moves recommended at prior steps are still in progress.
-  // The result cluster balance information is output into the 'info' parameter.
-  // The 'info' output parameter cannot be null.
-  Status BuildClusterInfo(const ClusterRawInfo& raw_info,
-                          const MovesInProgress& moves_in_progress,
-                          ClusterInfo* info) const;
-
   // Configuration for the rebalancer.
   const Config config_;
 };
 
+// Hold information about a tablet's replication factor and replicas.
+struct TabletExtraInfo {
+  int replication_factor;
+  int num_voters;
+};
+
+// Populate a 'tablet_id' --> 'tablet extra information' map, recording each
+// tablet's target replication factor and its current number of voter replicas.
+void BuildTabletExtraInfoMap(
+    const ClusterRawInfo& raw_info,
+    std::unordered_map<std::string, TabletExtraInfo>* extra_info_by_tablet_id);
+
+// For a given table, find a tablet replica on a specified tserver to move
+// to another. Given the requested 'move', shuffle the table's tablet_ids
+// and select the first tablet that doesn't currently have any replicas in
+// 'tablets_in_move'. Add the tablet's id to 'tablets_in_move' and add
+// information about this move to 'replica_moves'. If the chosen tablet is
+// overreplicated, no destination tserver is specified, since it may be better
+// to simply remove the source replica from the replica distribution entirely.
+Status SelectReplicaToMove(
+    const TableReplicaMove& move,
+    const std::unordered_map<std::string, TabletExtraInfo>& extra_info_by_tablet_id,
+    std::mt19937* random_generator,
+    std::vector<std::string> tablet_ids,
+    std::unordered_set<std::string>* tablets_in_move,
+    std::vector<Rebalancer::ReplicaMove>* replica_moves);
+
 } // namespace rebalance
 } // namespace kudu
diff --git a/src/kudu/tools/rebalancer_tool.cc b/src/kudu/tools/rebalancer_tool.cc
index 142038c..b0a9525 100644
--- a/src/kudu/tools/rebalancer_tool.cc
+++ b/src/kudu/tools/rebalancer_tool.cc
@@ -65,13 +65,16 @@ using kudu::cluster_summary::TabletSummary;
 using kudu::master::ListTabletServersRequestPB;
 using kudu::master::ListTabletServersResponsePB;
 using kudu::master::MasterServiceProxy;
+using kudu::rebalance::BuildTabletExtraInfoMap;
 using kudu::rebalance::ClusterInfo;
 using kudu::rebalance::ClusterRawInfo;
 using kudu::rebalance::PlacementPolicyViolationInfo;
 using kudu::rebalance::Rebalancer;
+using kudu::rebalance::SelectReplicaToMove;
 using kudu::rebalance::ServersByCountMap;
 using kudu::rebalance::TableBalanceInfo;
 using kudu::rebalance::TableReplicaMove;
+using kudu::rebalance::TabletExtraInfo;
 using kudu::rebalance::TabletsPlacementInfo;
 
 using std::accumulate;
@@ -938,29 +941,8 @@ Status RebalancerTool::AlgoBasedRunner::GetNextMovesImpl(
     RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, scheduled_moves_, &tpi));
   }
 
-  // Build 'tablet_id' --> 'target tablet replication factor' map.
-  struct TabletExtraInfo {
-    int replication_factor;
-    int num_voters;
-  };
   unordered_map<string, TabletExtraInfo> extra_info_by_tablet_id;
-  {
-    unordered_map<string, int> replication_factors_by_table;
-    for (const auto& s : raw_info.table_summaries) {
-      EmplaceOrDie(&replication_factors_by_table, s.id, s.replication_factor);
-    }
-    for (const auto& s : raw_info.tablet_summaries) {
-      int num_voters = 0;
-      for (const auto& rs : s.replicas) {
-        if (rs.is_voter) {
-          ++num_voters;
-        }
-      }
-      const auto rf = FindOrDie(replication_factors_by_table, s.table_id);
-      EmplaceOrDie(&extra_info_by_tablet_id,
-                   s.id, TabletExtraInfo{rf, num_voters});
-    }
-  }
+  BuildTabletExtraInfoMap(raw_info, &extra_info_by_tablet_id);
 
   // The number of operations to output by the algorithm. Those will be
   // translated into concrete tablet replica movement operations, the output of
@@ -996,42 +978,12 @@ Status RebalancerTool::AlgoBasedRunner::GetNextMovesImpl(
       RETURN_NOT_OK(FilterCrossLocationTabletCandidates(
           cluster_info.locality.location_by_ts_id, tpi, move, &tablet_ids));
     }
-    // Shuffle the set of the tablet identifiers: that's to achieve even spread
-    // of moves across tables with the same skew.
-    std::shuffle(tablet_ids.begin(), tablet_ids.end(), random_generator_);
-    string move_tablet_id;
-    for (const auto& tablet_id : tablet_ids) {
-      if (!ContainsKey(tablets_in_move, tablet_id)) {
-        // For now, choose the very first tablet that does not have replicas
-        // in move. Later on, additional logic might be added to find
-        // the best candidate.
-        move_tablet_id = tablet_id;
-        break;
-      }
-    }
-    if (move_tablet_id.empty()) {
-      LOG(WARNING) << Substitute(
-          "table $0: could not find any suitable replica to move "
-          "from server $1 to server $2", move.table_id, move.from, move.to);
-      continue;
-    }
-    Rebalancer::ReplicaMove move_info;
-    move_info.tablet_uuid = move_tablet_id;
-    move_info.ts_uuid_from = move.from;
-    const auto& extra_info = FindOrDie(extra_info_by_tablet_id, move_tablet_id);
-    if (extra_info.replication_factor < extra_info.num_voters) {
-      // The number of voter replicas is greater than the target replication
-      // factor. It might happen the replica distribution would be better
-      // if just removing the source replica. Anyway, once a replica is removed,
-      // the system will automatically add a new one, if needed, where the new
-      // replica will be placed to have balanced replica distribution.
-      move_info.ts_uuid_to = "";
-    } else {
-      move_info.ts_uuid_to = move.to;
-    }
-    replica_moves->emplace_back(std::move(move_info));
-    // Mark the tablet as 'has a replica in move'.
-    tablets_in_move.emplace(move_tablet_id);
+    // This will return Status::NotFound if no replica can be moved.
+    // In that case, we just continue through the loop.
+    WARN_NOT_OK(SelectReplicaToMove(move, extra_info_by_tablet_id,
+                                    &random_generator_, std::move(tablet_ids),
+                                    &tablets_in_move, replica_moves),
+                "No replica could be moved this iteration");
   }
 
   return Status::OK();
@@ -1211,7 +1163,7 @@ bool RebalancerTool::ReplaceBasedRunner::ScheduleNextMove(bool* has_errors,
     return false;
   }
 
-  // Find a move that's doesn't have its tserver UUID in scheduled_moves_.
+  // Find a move that doesn't have its tserver UUID in scheduled_moves_.
   const auto s = SetReplace(client_,
                             move_info.tablet_uuid,
                             move_info.ts_uuid_from,
diff --git a/src/kudu/tools/tool_replica_util.cc b/src/kudu/tools/tool_replica_util.cc
index d270195..895c739 100644
--- a/src/kudu/tools/tool_replica_util.cc
+++ b/src/kudu/tools/tool_replica_util.cc
@@ -764,4 +764,3 @@ Status Is343SchemeCluster(const vector<string>& master_addresses,
 
 } // namespace tools
 } // namespace kudu
-
diff --git a/src/kudu/tools/tool_replica_util.h b/src/kudu/tools/tool_replica_util.h
index dbdb543..3c8f40c 100644
--- a/src/kudu/tools/tool_replica_util.h
+++ b/src/kudu/tools/tool_replica_util.h
@@ -94,11 +94,11 @@ Status GetTabletLeader(
     bool* is_no_leader = nullptr);
 
 // Whether the replica move operation from 'from_ts_uuid' to 'to_ts_uuid'
-// server is complete (i.e. succeeded of failed) for the tablet identified by
+// server is complete (i.e. succeeded or failed) for the tablet identified by
 // 'tablet_id'. Neither 'is_complete' nor 'completion_status' output parameter
 // can be null. If the function returns Status::OK() and the replica move is
 // complete, the 'is_complete' parameter is set to 'true' and the
-// 'completion_status' contains correspoding move status: Status::OK()
+// 'completion_status' contains corresponding move status: Status::OK()
 // if the move succeeded or non-OK status if it failed.
 //
 // The function returns Status::OK() if the 'is_complete' and


Mime
View raw message