kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [1/2] kudu git commit: [rebalancer] location-aware rebalancer (part 7/n)
Date Thu, 01 Nov 2018 03:33:42 GMT
Repository: kudu
Updated Branches:
  refs/heads/master fec218bf6 -> 4ec2598a3


[rebalancer] location-aware rebalancer (part 7/n)

Added PolicyFixer and integrated the cross-location balancing
algorithm.  Added one basic integration test as well.

More integration tests will be added in a follow-up commit.

Change-Id: I9ace790aad1c1a4605ef90f6df2104f4a228a5b5
Reviewed-on: http://gerrit.cloudera.org:8080/11748
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wdberkeley@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/81bba247
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/81bba247
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/81bba247

Branch: refs/heads/master
Commit: 81bba2472b469996c3a0d8ed9f6412fe29cd4771
Parents: fec218b
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Fri Oct 19 23:38:16 2018 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Thu Nov 1 03:32:45 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/CMakeLists.txt          |   3 +-
 src/kudu/tools/rebalancer.cc           | 492 ++++++++++++++++++++++++++--
 src/kudu/tools/rebalancer.h            | 158 +++++++--
 src/kudu/tools/rebalancer_tool-test.cc | 128 +++++++-
 src/kudu/tools/tool_action_cluster.cc  |  27 +-
 5 files changed, 740 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/81bba247/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index d4017d6..1be02ca 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -185,7 +185,8 @@ ADD_KUDU_TEST(placement_policy_util-test)
 ADD_KUDU_TEST(rebalance-test)
 ADD_KUDU_TEST(rebalance_algo-test)
 ADD_KUDU_TEST(rebalancer_tool-test
-  NUM_SHARDS 8 PROCESSORS 3)
+  NUM_SHARDS 8 PROCESSORS 3
+  DATA_FILES ../scripts/assign-location.py)
 ADD_KUDU_TEST_DEPENDENCIES(rebalancer_tool-test
   kudu)
 ADD_KUDU_TEST(tool_action-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/81bba247/src/kudu/tools/rebalancer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
index abf28f0..4d2d769 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer.cc
@@ -38,6 +38,7 @@
 #include <glog/logging.h>
 
 #include "kudu/client/client.h"
+#include "kudu/consensus/quorum_util.h"
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
@@ -45,6 +46,7 @@
 #include "kudu/tools/ksck.h"
 #include "kudu/tools/ksck_remote.h"
 #include "kudu/tools/ksck_results.h"
+#include "kudu/tools/placement_policy_util.h"
 #include "kudu/tools/rebalance_algo.h"
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/tools/tool_replica_util.h"
@@ -82,14 +84,20 @@ Rebalancer::Config::Config(
     size_t max_staleness_interval_sec,
     int64_t max_run_time_sec,
     bool move_rf1_replicas,
-    bool output_replica_distribution_details)
-        : master_addresses(std::move(master_addresses)),
-          table_filters(std::move(table_filters)),
-          max_moves_per_server(max_moves_per_server),
-          max_staleness_interval_sec(max_staleness_interval_sec),
-          max_run_time_sec(max_run_time_sec),
-          move_rf1_replicas(move_rf1_replicas),
-          output_replica_distribution_details(output_replica_distribution_details) {
+    bool output_replica_distribution_details,
+    bool run_policy_fixer,
+    bool run_cross_location_rebalancing,
+    bool run_intra_location_rebalancing)
+    : master_addresses(std::move(master_addresses)),
+      table_filters(std::move(table_filters)),
+      max_moves_per_server(max_moves_per_server),
+      max_staleness_interval_sec(max_staleness_interval_sec),
+      max_run_time_sec(max_run_time_sec),
+      move_rf1_replicas(move_rf1_replicas),
+      output_replica_distribution_details(output_replica_distribution_details),
+      run_policy_fixer(run_policy_fixer),
+      run_cross_location_rebalancing(run_cross_location_rebalancing),
+      run_intra_location_rebalancing(run_intra_location_rebalancing) {
   DCHECK_GE(max_moves_per_server, 0);
 }
 
@@ -103,7 +111,7 @@ Status Rebalancer::PrintStats(std::ostream& out) {
   const KsckResults& results = ksck_->results();
 
   ClusterRawInfo raw_info;
-  RETURN_NOT_OK(KsckResultsToClusterRawInfo(results, &raw_info));
+  RETURN_NOT_OK(KsckResultsToClusterRawInfo(boost::none, results, &raw_info));
 
   ClusterInfo ci;
   RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
@@ -218,29 +226,143 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
   }
 
   ClusterRawInfo raw_info;
-  RETURN_NOT_OK(KsckResultsToClusterRawInfo(ksck_->results(), &raw_info));
+  RETURN_NOT_OK(
+      KsckResultsToClusterRawInfo(boost::none, ksck_->results(), &raw_info));
 
   ClusterInfo ci;
   RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
 
-  TwoDimensionalGreedyRunner runner(this, config_.max_moves_per_server, deadline);
-  RETURN_NOT_OK(runner.Init(config_.master_addresses));
-  RETURN_NOT_OK(RunWith(&runner, result_status));
+  const auto& ts_id_by_location = ci.locality.servers_by_location;
+  if (ts_id_by_location.empty()) {
+    // Empty cluster: no tablet servers reported.
+    if (moves_count != nullptr) {
+      *moves_count = 0;
+    }
+    *result_status = RunStatus::CLUSTER_IS_BALANCED;
+    LOG(INFO) << "no tablet servers are reported: nothing to balance";
+    return Status::OK();
+  }
+
+  size_t moves_count_total = 0;
+  if (ts_id_by_location.size() == 1) {
+    const auto& location = ts_id_by_location.cbegin()->first;
+    LOG(INFO) << "running whole-cluster rebalancing";
+    IntraLocationRunner runner(
+        this, config_.max_moves_per_server, deadline, location);
+    RETURN_NOT_OK(runner.Init(config_.master_addresses));
+    RETURN_NOT_OK(RunWith(&runner, result_status));
+    moves_count_total += runner.moves_count();
+  } else {
+    // The essence of location-aware balancing:
+    //   1. Find tablets whose replicas are placed in such a way that their
+    //      distribution violates the main constraint of the placement policy.
+    //      For each non-conforming tablet, move its replicas to restore
+    //      the placement policy restrictions. In other words, if a location has
+    //      more than the majority of replicas for some tablet,
+    //      move the replicas of the tablet to other locations.
+    //   2. For every tablet whose replica placement does not violate the
+    //      placement policy constraints, balance the load among locations.
+    //   3. Balance replica distribution within every location. This is a.k.a.
+    //      intra-location balancing. The intra-location balancing involves
+    //      moving replicas only within location, no replicas are moved between
+    //      locations.
+    if (config_.run_policy_fixer) {
+      // Fix placement policy violations, if any.
+      LOG(INFO) << "fixing placement policy violations";
+      PolicyFixer runner(this, config_.max_moves_per_server, deadline);
+      RETURN_NOT_OK(runner.Init(config_.master_addresses));
+      RETURN_NOT_OK(RunWith(&runner, result_status));
+      moves_count_total += runner.moves_count();
+    }
+    if (config_.run_cross_location_rebalancing) {
+      // Run the rebalancing across locations (inter-location rebalancing).
+      LOG(INFO) << "running cross-location rebalancing";
+      CrossLocationRunner runner(this, config_.max_moves_per_server, deadline);
+      RETURN_NOT_OK(runner.Init(config_.master_addresses));
+      RETURN_NOT_OK(RunWith(&runner, result_status));
+      moves_count_total += runner.moves_count();
+    }
+    if (config_.run_intra_location_rebalancing) {
+      // Run the rebalancing within every location (intra-location rebalancing).
+      for (const auto& elem : ts_id_by_location) {
+        const auto& location = elem.first;
+        // TODO(aserbin): it would be nice to run these rebalancers in parallel
+        LOG(INFO) << "running rebalancer within location '" << location << "'";
+        IntraLocationRunner runner(
+            this, config_.max_moves_per_server, deadline, location);
+        RETURN_NOT_OK(runner.Init(config_.master_addresses));
+        RETURN_NOT_OK(RunWith(&runner, result_status));
+        moves_count_total += runner.moves_count();
+      }
+    }
+  }
   if (moves_count != nullptr) {
-    *moves_count = runner.moves_count();
+    *moves_count = moves_count_total;
   }
 
   return Status::OK();
 }
 
 Status Rebalancer::KsckResultsToClusterRawInfo(
+    const boost::optional<string>& location,
     const KsckResults& ksck_info,
     ClusterRawInfo* raw_info) {
   DCHECK(raw_info);
 
-  raw_info->tserver_summaries = ksck_info.tserver_summaries;
-  raw_info->table_summaries = ksck_info.table_summaries;
-  raw_info->tablet_summaries = ksck_info.tablet_summaries;
+  // Filter out entities that are not relevant to the specified location.
+  vector<KsckServerHealthSummary> tserver_summaries;
+  tserver_summaries.reserve(ksck_info.tserver_summaries.size());
+
+  vector<KsckTabletSummary> tablet_summaries;
+  tablet_summaries.reserve(ksck_info.tablet_summaries.size());
+
+  vector<KsckTableSummary> table_summaries;
+  table_summaries.reserve(table_summaries.size());
+
+  if (!location) {
+    // Information on the whole cluster.
+    tserver_summaries = ksck_info.tserver_summaries;
+    tablet_summaries = ksck_info.tablet_summaries;
+    table_summaries = ksck_info.table_summaries;
+  } else {
+    // Information on the specified location only: filter out non-relevant info.
+    const auto& location_str =  *location;
+
+    unordered_set<string> ts_ids_at_location;
+    for (const auto& summary : ksck_info.tserver_summaries) {
+      if (summary.ts_location == location_str) {
+        tserver_summaries.push_back(summary);
+        InsertOrDie(&ts_ids_at_location, summary.uuid);
+      }
+    }
+
+    unordered_set<string> table_ids_at_location;
+    for (const auto& summary : ksck_info.tablet_summaries) {
+      const auto& replicas = summary.replicas;
+      decltype(summary.replicas) replicas_at_location;
+      replicas_at_location.reserve(replicas.size());
+      for (const auto& replica : replicas) {
+        if (ContainsKey(ts_ids_at_location, replica.ts_uuid)) {
+          replicas_at_location.push_back(replica);
+        }
+      }
+      if (!replicas_at_location.empty()) {
+        table_ids_at_location.insert(summary.table_id);
+      }
+      tablet_summaries.push_back(summary);
+      tablet_summaries.back().replicas = std::move(replicas_at_location);
+    }
+
+    for (const auto& summary : ksck_info.table_summaries) {
+      if (ContainsKey(table_ids_at_location, summary.id)) {
+        table_summaries.push_back(summary);
+      }
+    }
+  }
+
+  raw_info->tserver_summaries = std::move(tserver_summaries);
+  raw_info->table_summaries = std::move(table_summaries);
+  raw_info->tablet_summaries = std::move(tablet_summaries);
 
   return Status::OK();
 }
@@ -362,6 +484,60 @@ void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves,
   *replica_moves = std::move(filtered_replica_moves);
 }
 
+Status Rebalancer::FilterCrossLocationTabletCandidates(
+    const unordered_map<string, string>& location_by_ts_id,
+    const TabletsPlacementInfo& placement_info,
+    const TableReplicaMove& move,
+    vector<string>* tablet_ids) {
+  DCHECK(tablet_ids);
+
+  if (tablet_ids->empty()) {
+    // Nothing to filter.
+    return Status::OK();
+  }
+
+  const auto& dst_location = FindOrDie(location_by_ts_id, move.to);
+  const auto& src_location = FindOrDie(location_by_ts_id, move.from);
+
+  // Sanity check: the source and the destination tablet servers should be
+  // in different locations.
+  if (src_location == dst_location) {
+    return Status::InvalidArgument(Substitute(
+        "moving replicas of table $0: the same location '$1' for both "
+        "the source ($2) and the destination ($3) tablet servers",
+         move.table_id, src_location, move.from, move.to));
+  }
+
+  vector<string> tablet_ids_filtered;
+  for (auto& tablet_id : *tablet_ids) {
+    const auto& replica_count_info = FindOrDie(
+        placement_info.tablet_location_info, tablet_id);
+    const auto* count_ptr = FindOrNull(replica_count_info, dst_location);
+    if (count_ptr == nullptr) {
+      // Nothing else to clarify: not a single replica in the destination
+      // location for this candidate tablet.
+      tablet_ids_filtered.emplace_back(std::move(tablet_id));
+      continue;
+    }
+    const auto location_replica_num = *count_ptr;
+    const auto& table_id = FindOrDie(placement_info.tablet_to_table_id, tablet_id);
+    const auto& table_info = FindOrDie(placement_info.tables_info, table_id);
+    const auto rf = table_info.replication_factor;
+    if (location_replica_num + 1 >= consensus::MajoritySize(rf)) {
+      VLOG(1) << Substitute("destination location '$0' for candidate tablet $1 "
+                            "already contains $2 of $3 replicas",
+                            dst_location, tablet_id, location_replica_num, rf);
+      continue;
+    }
+    // No majority of replicas in the destination location: it's OK candidate.
+    tablet_ids_filtered.emplace_back(std::move(tablet_id));
+  }
+
+  *tablet_ids = std::move(tablet_ids_filtered);
+
+  return Status::OK();
+}
+
 Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
                                     const MovesInProgress& moves_in_progress,
                                     ClusterInfo* info) const {
@@ -386,6 +562,18 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
     }
   }
 
+  auto& ts_uuids_by_location = result_info.locality.servers_by_location;
+  auto& location_by_ts_uuid = result_info.locality.location_by_ts_id;
+  for (const auto& summary : raw_info.tserver_summaries) {
+    const auto& ts_id = summary.uuid;
+    const auto& ts_location = summary.ts_location;
+    VLOG(1) << Substitute("found tserver $0 at location '$1'", ts_id, ts_location);
+    EmplaceOrDie(&location_by_ts_uuid, ts_id, ts_location);
+    auto& ts_ids = LookupOrEmplace(&ts_uuids_by_location,
+                                   ts_location, set<string>());
+    InsertOrDie(&ts_ids, ts_id);
+  }
+
   for (const auto& s : raw_info.tserver_summaries) {
     if (s.health != KsckServerHealth::HEALTHY) {
       LOG(INFO) << Substitute("skipping tablet server $0 ($1) because of its "
@@ -495,6 +683,7 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
     }
     table_info_by_skew.emplace(max_count - min_count, std::move(tbi));
   }
+  // TODO(aserbin): add sanity checks on the result.
   *info = std::move(result_info);
 
   return Status::OK();
@@ -577,9 +766,10 @@ Status Rebalancer::RunWith(Runner* runner, RunStatus* result_status) {
   return Status::OK();
 }
 
-Status Rebalancer::GetClusterRawInfo(ClusterRawInfo* raw_info) {
+Status Rebalancer::GetClusterRawInfo(const boost::optional<string>& location,
+                                     ClusterRawInfo* raw_info) {
   RETURN_NOT_OK(RefreshKsckResults());
-  return KsckResultsToClusterRawInfo(ksck_->results(), raw_info);
+  return KsckResultsToClusterRawInfo(location, ksck_->results(), raw_info);
 }
 
 Status Rebalancer::RefreshKsckResults() {
@@ -623,17 +813,21 @@ Status Rebalancer::BaseRunner::GetNextMoves(bool* has_moves) {
     return Status::OK();
   }
 
-  // Filter out moves for tablets which already have operations in progress.
-  // The idea is simple: no more than one move operation per tablet should
-  // ever be attempted.
+  // The GetNextMovesImpl() method prescribes replica movements using simplified
+  // logic that doesn't know about best practices of safe and robust Raft
+  // configuration changes. Here it's necessary to filter out moves for tablets
+  // which already have operations in progress. The idea is simple: don't start
+  // another operation for a tablet when there is still a pending operation
+  // for that tablet.
   Rebalancer::FilterMoves(scheduled_moves_, &replica_moves);
   LoadMoves(std::move(replica_moves));
 
-  // TODO(aserbin): now this method reports on presence of some moves even if
-  //                all of those are in progress and no fresh new are available.
-  //                Would it be more convenient for to report only on the
-  //                fresh new moves and check for the presence of the scheduled
-  //                moves at the upper level?
+  // TODO(aserbin): this method reports on availability of move operations
+  //                via the 'has_moves' parameter even if all of those were
+  //                actually filtered out by the FilterMoves() method.
+  //                Would it be more convenient to report only on the new,
+  //                not-yet-in-progress operations and check for the presence
+  //                of the scheduled moves at the upper level?
   *has_moves = true;
   return Status::OK();
 }
@@ -733,7 +927,6 @@ void Rebalancer::AlgoBasedRunner::LoadMoves(vector<ReplicaMove> replica_moves) {
   }
 }
 
-// Return true if replica move operation has been scheduled successfully.
 bool Rebalancer::AlgoBasedRunner::ScheduleNextMove(bool* has_errors,
                                                    bool* timed_out) {
   DCHECK(has_errors);
@@ -746,13 +939,13 @@ bool Rebalancer::AlgoBasedRunner::ScheduleNextMove(bool* has_errors,
     return false;
   }
 
-  // Only one move operation per step: it's necessary to update information
-  // in the ts_per_op_count_ right after scheduling a single operation
+  // Scheduling one operation per step. Once operation is scheduled, it's
+  // necessary to update the ts_per_op_count_ container right after scheduling
   // to avoid oversubscribing of the tablet servers.
   size_t op_idx;
   if (!FindNextMove(&op_idx)) {
-    // Nothing to schedule: unfruitful outcome. Need to wait until
-    // there is a slot at tablet server is available.
+    // Nothing to schedule yet: unfruitful outcome. Need to wait until there is
+    // an available slot at a tablet server.
     return false;
   }
 
@@ -853,8 +1046,9 @@ bool Rebalancer::AlgoBasedRunner::UpdateMovesInProgressStatus(
 // one step of the rebalancing.
 Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
     vector<ReplicaMove>* replica_moves) {
+  const auto& loc = location();
   ClusterRawInfo raw_info;
-  RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(&raw_info));
+  RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(loc, &raw_info));
 
   // For simplicity, allow to run the rebalancing only when all tablet servers
   // are in good shape. Otherwise, the rebalancing might interfere with the
@@ -867,6 +1061,11 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
     }
   }
 
+  TabletsPlacementInfo tpi;
+  if (!loc) {
+    RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &tpi));
+  }
+
   // The number of operations to output by the algorithm. Those will be
   // translated into concrete tablet replica movement operations, the output of
   // this method.
@@ -893,6 +1092,14 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
   for (const auto& move : moves) {
     vector<string> tablet_ids;
     RETURN_NOT_OK(FindReplicas(move, raw_info, &tablet_ids));
+    if (!loc) {
+      // In case of cross-location (a.k.a. inter-location) rebalancing it is
+      // necessary to make sure the majority of replicas would not end up
+      // at the same location after the move. If so, remove those tablets
+      // from the list of candidates.
+      RETURN_NOT_OK(FilterCrossLocationTabletCandidates(
+          cluster_info.locality.location_by_ts_id, tpi, move, &tablet_ids));
+    }
     // Shuffle the set of the tablet identifiers: that's to achieve even spread
     // of moves across tables with the same skew.
     std::shuffle(tablet_ids.begin(), tablet_ids.end(), random_generator_);
@@ -978,7 +1185,7 @@ void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduled(
     bool is_success) {
   if (is_success) {
     Rebalancer::ReplicaMove move_info = { tablet_uuid, src_ts_uuid, dst_ts_uuid };
-    auto ins = scheduled_moves_.emplace(tablet_uuid, std::move(move_info));
+    scheduled_moves_.emplace(tablet_uuid, std::move(move_info));
     // Only one replica of a tablet can be moved at a time.
     // TODO(aserbin): clarify on duplicates
     //DCHECK(ins.second);
@@ -1015,12 +1222,227 @@ void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduledImpl(
   }
 }
 
-Rebalancer::TwoDimensionalGreedyRunner::TwoDimensionalGreedyRunner(
+Rebalancer::IntraLocationRunner::IntraLocationRunner(
+    Rebalancer* rebalancer,
+    size_t max_moves_per_server,
+    boost::optional<MonoTime> deadline,
+    std::string location)
+    : AlgoBasedRunner(rebalancer, max_moves_per_server, std::move(deadline)),
+      location_(std::move(location)) {
+}
+
+Rebalancer::CrossLocationRunner::CrossLocationRunner(
     Rebalancer* rebalancer,
     size_t max_moves_per_server,
     boost::optional<MonoTime> deadline)
     : AlgoBasedRunner(rebalancer, max_moves_per_server, std::move(deadline)) {
 }
 
+Rebalancer::PolicyFixer::PolicyFixer(
+    Rebalancer* rebalancer,
+    size_t max_moves_per_server,
+    boost::optional<MonoTime> deadline)
+    : BaseRunner(rebalancer, max_moves_per_server, std::move(deadline)) {
+}
+
+Status Rebalancer::PolicyFixer::Init(vector<string> master_addresses) {
+  DCHECK(moves_to_schedule_.empty());
+  return BaseRunner::Init(std::move(master_addresses));
+}
+
+void Rebalancer::PolicyFixer::LoadMoves(
+    vector<ReplicaMove> replica_moves) {
+  // Replace the list of moves operations to schedule. Even if it's not empty,
+  // some elements of it might be irrelevant anyway, so there is no need to
+  // keep any since the new information is the most up-to-date. The input list
+  // is already filtered and should not contain any operations which are
+  // tracked as already scheduled ones.
+  moves_to_schedule_.clear();
+
+  for (auto& move_info : replica_moves) {
+    auto ts_uuid = move_info.ts_uuid_from;
+    DCHECK(!ts_uuid.empty());
+    moves_to_schedule_.emplace(std::move(ts_uuid), std::move(move_info));
+  }
+
+  // Refresh the helper containers.
+  for (const auto& elem : moves_to_schedule_) {
+    const auto& ts_uuid = elem.first;
+    DCHECK(!ts_uuid.empty());
+    if (op_count_per_ts_.emplace(ts_uuid, 0).second) {
+      // No operations for tablet server ts_uuid: add ts_per_op_count_ entry.
+      ts_per_op_count_.emplace(0, ts_uuid);
+    }
+  }
+}
+
+bool Rebalancer::PolicyFixer::ScheduleNextMove(bool* has_errors,
+                                               bool* timed_out) {
+  DCHECK(has_errors);
+  DCHECK(timed_out);
+  *has_errors = false;
+  *timed_out = false;
+
+  if (deadline_ && MonoTime::Now() >= *deadline_) {
+    *timed_out = true;
+    return false;
+  }
+
+  ReplicaMove move_info;
+  if (!FindNextMove(&move_info)) {
+    return false;
+  }
+
+  // Find a move that doesn't have its tserver UUID in scheduled_moves_.
+  const auto s = SetReplace(client_,
+                            move_info.tablet_uuid,
+                            move_info.ts_uuid_from,
+                            move_info.config_opid_idx);
+  if (!s.ok()) {
+    *has_errors = true;
+    return false;
+  }
+
+  // Remove the element from moves_to_schedule_.
+  bool erased = false;
+  auto range = moves_to_schedule_.equal_range(move_info.ts_uuid_from);
+  for (auto it = range.first; it != range.second; ++it) {
+    if (move_info.tablet_uuid == it->second.tablet_uuid) {
+      moves_to_schedule_.erase(it);
+      erased = true;
+      break;
+    }
+  }
+  CHECK(erased) << Substitute("T $0 P $1: move information not found",
+                              move_info.tablet_uuid, move_info.ts_uuid_from);
+  LOG(INFO) << Substitute("tablet $0: $1 -> ? move scheduled",
+                          move_info.tablet_uuid, move_info.ts_uuid_from);
+  // Add information on scheduled move into the scheduled_moves_.
+  // Only one replica of a tablet can be moved at a time.
+  auto tablet_uuid = move_info.tablet_uuid;
+  EmplaceOrDie(&scheduled_moves_, std::move(tablet_uuid), std::move(move_info));
+  return true;
+}
+
+bool Rebalancer::PolicyFixer::UpdateMovesInProgressStatus(
+    bool* has_errors, bool* timed_out) {
+  DCHECK(has_errors);
+  DCHECK(timed_out);
+
+  auto has_updates = false;
+  auto error_count = 0;
+  auto out_of_time = false;
+  for (auto it = scheduled_moves_.begin(); it != scheduled_moves_.end(); ) {
+    if (deadline_ && MonoTime::Now() >= *deadline_) {
+      out_of_time = true;
+      break;
+    }
+    bool is_complete;
+    Status completion_status;
+    const auto& tablet_id = it->second.tablet_uuid;
+    const auto& ts_uuid = it->second.ts_uuid_from;
+    auto s = CheckCompleteReplace(client_, tablet_id, ts_uuid,
+                                  &is_complete, &completion_status);
+    if (!s.ok()) {
+      // Update on the movement status has failed: remove the move operation
+      // as if it didn't exist. Once the cluster status is re-synchronized,
+      // the corresponding operation will be scheduled again, if needed.
+      ++error_count;
+      LOG(INFO) << Substitute("tablet $0: $1 -> ? move is abandoned: $2",
+                              tablet_id, ts_uuid, s.ToString());
+      it = scheduled_moves_.erase(it);
+      continue;
+    }
+    DCHECK(s.ok());
+    if (is_complete) {
+      // The replacement has completed (success or failure): update the stats
+      // on the pending operations per server.
+      ++moves_count_;
+      has_updates = true;
+      LOG(INFO) << Substitute("tablet $0: $1 -> ? move completed: $2",
+                              tablet_id, ts_uuid, completion_status.ToString());
+      UpdateOnMoveCompleted(ts_uuid);
+      it = scheduled_moves_.erase(it);
+      continue;
+    }
+    ++it;
+  }
+
+  *timed_out = out_of_time;
+  *has_errors = (error_count > 0);
+  return has_updates;
+}
+
+Status Rebalancer::PolicyFixer::GetNextMovesImpl(
+    vector<ReplicaMove>* replica_moves) {
+  ClusterRawInfo raw_info;
+  RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(boost::none, &raw_info));
+
+  // For simplicity, allow to run the rebalancing only when all tablet servers
+  // are in good shape. Otherwise, the rebalancing might interfere with the
+  // automatic re-replication or get unexpected errors while moving replicas.
+  // TODO(aserbin): move it somewhere else?
+  for (const auto& s : raw_info.tserver_summaries) {
+    if (s.health != KsckServerHealth::HEALTHY) {
+      return Status::IllegalState(
+          Substitute("tablet server $0 ($1): unacceptable health status $2",
+                     s.uuid, s.address, ServerHealthToString(s.health)));
+    }
+  }
+  ClusterInfo ci;
+  RETURN_NOT_OK(rebalancer_->BuildClusterInfo(raw_info, MovesInProgress(), &ci));
+
+  TabletsPlacementInfo placement_info;
+  RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &placement_info));
+
+  vector<PlacementPolicyViolationInfo> ppvi;
+  RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi));
+
+  // Filter out all reported violations which are already taken care of.
+  // The idea is to have not more than one pending operation per tablet.
+  {
+    decltype(ppvi) ppvi_filtered;
+    for (auto& info : ppvi) {
+      if (ContainsKey(scheduled_moves_, info.tablet_id)) {
+        continue;
+      }
+      ppvi_filtered.emplace_back(std::move(info));
+    }
+    ppvi = std::move(ppvi_filtered);
+  }
+
+  RETURN_NOT_OK(FindMovesToReimposePlacementPolicy(
+      placement_info, ci.locality, ppvi, replica_moves));
+
+  if (PREDICT_FALSE(VLOG_IS_ON(1))) {
+    for (const auto& info : ppvi) {
+      VLOG(1) << Substitute("policy violation at location '$0': tablet $1",
+                            info.majority_location, info.tablet_id);
+    }
+    for (const auto& move : *replica_moves) {
+      VLOG(1) << Substitute("policy fix for tablet $0: replica to remove $1",
+                            move.tablet_uuid, move.ts_uuid_from);
+    }
+  }
+
+  return Status::OK();
+}
+
+bool Rebalancer::PolicyFixer::FindNextMove(ReplicaMove* move) {
+  DCHECK(move);
+  // TODO(aserbin): use pessimistic /2 limit for max_moves_per_servers_
+  // since the destination server for the move of the replica marked with
+  // the REPLACE attribute is not known.
+
+  // Load the least loaded (in terms of scheduled moves) tablet servers first.
+  for (const auto& elem : ts_per_op_count_) {
+    const auto& ts_uuid = elem.second;
+    if (FindCopy(moves_to_schedule_, ts_uuid, move)) {
+      return true;
+    }
+  }
+  return false;
+}
+
 } // namespace tools
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/81bba247/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index c4f5824..7bb0d73 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -36,6 +36,12 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+namespace tools {
+struct TabletsPlacementInfo;
+}  // namespace tools
+}  // namespace kudu
+
+namespace kudu {
 
 namespace client {
 class KuduClient;
@@ -63,7 +69,10 @@ class Rebalancer {
            size_t max_staleness_interval_sec = 300,
            int64_t max_run_time_sec = 0,
            bool move_rf1_replicas = false,
-           bool output_replica_distribution_details = false);
+           bool output_replica_distribution_details = false,
+           bool run_policy_fixer = true,
+           bool run_cross_location_rebalancing = true,
+           bool run_intra_location_rebalancing = true);
 
     // Kudu masters' RPC endpoints.
     std::vector<std::string> master_addresses;
@@ -94,6 +103,23 @@ class Rebalancer {
     // Whether Rebalancer::PrintStats() should output per-table and per-server
     // replica distribution details.
     bool output_replica_distribution_details;
+
+    // In case of multi-location cluster, whether to detect and fix placement
+    // policy violations. Fixing placement policy violations involves moving
+    // tablet replicas across different locations in the cluster.
+    // This setting is applicable to multi-location clusters only.
+    bool run_policy_fixer = true;
+
+    // In case of multi-location cluster, whether to move tablet replicas
+    // between locations in an attempt to spread tablet replicas among locations
+    // evenly (equalizing loads of locations throughout the cluster).
+    // This setting is applicable to multi-location clusters only.
+    bool run_cross_location_rebalancing = true;
+
+    // In case of multi-location cluster, whether to rebalance tablet replica
+    // distribution within each location.
+    // This setting is applicable to multi-location clusters only.
+    bool run_intra_location_rebalancing = true;
   };
 
   // Represents a concrete move of a replica from one tablet server to another.
@@ -137,14 +163,16 @@ class Rebalancer {
     // the 'master_addresses' RPC endpoints.
     virtual Status Init(std::vector<std::string> master_addresses) = 0;
 
-    // Load information on prescribed replica movement operations. Also,
+    // Load information on the prescribed replica movement operations. Also,
     // populate helper containers and other auxiliary run-time structures
     // used by ScheduleNextMove(). This method is called with every batch
     // of move operations output by the rebalancing algorithm once previously
     // loaded moves have been scheduled.
     virtual void LoadMoves(std::vector<ReplicaMove> replica_moves) = 0;
 
-    // Schedule next replica move.
+    // Schedule next replica move. Returns 'true' if replica move operation
+    // has been scheduled successfully; otherwise returns 'false' and sets
+    // the 'has_errors' and 'timed_out' parameters accordingly.
     virtual bool ScheduleNextMove(bool* has_errors, bool* timed_out) = 0;
 
     // Update statuses and auxiliary information on in-progress replica move
@@ -229,7 +257,8 @@ class Rebalancer {
     // The 'max_moves_per_server' specifies the maximum number of operations
     // per tablet server (both the source and the destination are counted in).
     // The 'deadline' specifies the deadline for the run, 'boost::none'
-    // if no timeout is set.
+    // if no timeout is set. If location() returns 'boost::none', rebalance
+    // across locations.
     AlgoBasedRunner(Rebalancer* rebalancer,
                     size_t max_moves_per_server,
                     boost::optional<MonoTime> deadline);
@@ -242,14 +271,19 @@ class Rebalancer {
 
     bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) override;
 
+    // Get the cluster location the runner is slated to run/running at.
+    // 'boost::none' means the whole cluster.
+    virtual const boost::optional<std::string>& location() const = 0;
+
     // Rebalancing algorithm that running uses to find replica moves.
     virtual RebalancingAlgo* algorithm() = 0;
 
    protected:
     Status GetNextMovesImpl(std::vector<ReplicaMove>* replica_moves) override;
 
-    // Given the data in the helper containers, find the index describing
-    // the next replica move and output it into the 'op_idx' parameter.
+    // Using the helper containers src_op_indices_ and dst_op_indices_,
+    // find the index of the most optimal replica movement operation
+    // and output the index into the 'op_idx' parameter.
     bool FindNextMove(size_t* op_idx);
 
     // Update the helper containers once a move operation has been scheduled.
@@ -277,38 +311,107 @@ class Rebalancer {
     // tserver UUID (i.e. the key) as the destination of the move operation'.
     std::unordered_map<std::string, std::set<size_t>> dst_op_indices_;
 
+    // Information on scheduled replica movement operations; keys are
+    // tablet UUIDs, values are ReplicaMove structures.
+    MovesInProgress scheduled_moves_;
+
     // Random device and generator for selecting among multiple choices, when
     // appropriate.
     std::random_device random_device_;
     std::mt19937 random_generator_;
   }; // class AlgoBasedRunner
 
-  class TwoDimensionalGreedyRunner : public AlgoBasedRunner {
+  class IntraLocationRunner : public AlgoBasedRunner {
    public:
     // The 'max_moves_per_server' specifies the maximum number of operations
     // per tablet server (both the source and the destination are counted in).
     // The 'deadline' specifies the deadline for the run, 'boost::none'
-    // if no timeout is set.
-    TwoDimensionalGreedyRunner(Rebalancer* rebalancer,
-                               size_t max_moves_per_server,
-                               boost::optional<MonoTime> deadline);
+    // if no timeout is set. In case of non-location aware cluster or if there
+    // is just one location defined in the whole cluster, the whole cluster will
+    // be rebalanced.
+    IntraLocationRunner(Rebalancer* rebalancer,
+                        size_t max_moves_per_server,
+                        boost::optional<MonoTime> deadline,
+                        std::string location);
 
     RebalancingAlgo* algorithm() override {
       return &algorithm_;
     }
 
+    const boost::optional<std::string>& location() const override {
+      return location_;
+    }
+
    private:
+    const boost::optional<std::string> location_;
+
     // An instance of the balancing algorithm.
     TwoDimensionalGreedyAlgo algorithm_;
   };
 
+  class CrossLocationRunner : public AlgoBasedRunner {
+   public:
+    // Runner driven by LocationBalancingAlgo. The 'max_moves_per_server' is
+    // the maximum number of operations per tablet server (both the source and
+    // the destination are counted in). The 'deadline' specifies the deadline
+    // for the run, 'boost::none' if no timeout is set.
+    CrossLocationRunner(Rebalancer* rebalancer,
+                        size_t max_moves_per_server,
+                        boost::optional<MonoTime> deadline);
+
+    RebalancingAlgo* algorithm() override {
+      return &algorithm_;
+    }
+
+    const boost::optional<std::string>& location() const override {
+      return location_;
+    }
+
+   private:
+    const boost::optional<std::string> location_ = boost::none;
+
+    // An instance of the balancing algorithm, exposed via algorithm().
+    LocationBalancingAlgo algorithm_;
+  };
+
+  class PolicyFixer : public BaseRunner {
+   public:
+    PolicyFixer(Rebalancer* rebalancer,
+                size_t max_moves_per_server,
+                boost::optional<MonoTime> deadline);
+
+    Status Init(std::vector<std::string> master_addresses) override;
+
+    void LoadMoves(std::vector<ReplicaMove> replica_moves) override;
+
+    bool ScheduleNextMove(bool* has_errors, bool* timed_out) override;
+
+    bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) override;
+
+   private:
+    // Key is the source tserver UUID, i.e. the move's 'ts_uuid_from' field.
+    typedef std::unordered_multimap<std::string, ReplicaMove> MovesToSchedule;
+
+    Status GetNextMovesImpl(std::vector<ReplicaMove>* replica_moves) override;
+
+    bool FindNextMove(ReplicaMove* move);
+
+    // An instance of the balancing algorithm.
+    LocationBalancingAlgo algorithm_;
+
+    // Moves yet to schedule, keyed as described for MovesToSchedule.
+    MovesToSchedule moves_to_schedule_;
+  };
+
   friend class KsckResultsToClusterBalanceInfoTest;
 
-  // Convert ksck results into information relevant to rebalancing the cluster.
-  // Basically, 'raw' information is just a sub-set of relevant fields of the
-  // KsckResults structure filtered to contain information only for the
-  // specified location.
+  // Convert ksck results into information relevant to rebalancing the cluster
+  // at the location specified by the 'location' parameter ('boost::none' for
+  // 'location' means cross-location rebalancing). Basically, the 'raw'
+  // information is just a subset of relevant fields of the KsckResults
+  // structure filtered to contain information only for the specified location.
   static Status KsckResultsToClusterRawInfo(
+      const boost::optional<std::string>& location,
       const KsckResults& ksck_info,
       ClusterRawInfo* raw_info);
 
@@ -330,12 +433,21 @@ class Rebalancer {
   static void FilterMoves(const MovesInProgress& scheduled_moves,
                           std::vector<ReplicaMove>* replica_moves);
 
+  // Filter the list of candidate tablets to make sure the location
+  // of the destination server would not contain the majority of replicas
+  // after the move. The 'tablet_ids' is an in-out parameter.
+  static Status FilterCrossLocationTabletCandidates(
+      const std::unordered_map<std::string, std::string>& location_by_ts_id,
+      const TabletsPlacementInfo& placement_info,
+      const TableReplicaMove& move,
+      std::vector<std::string>* tablet_ids);
+
   // Convert the 'raw' information about the cluster into information suitable
   // for the input of the high-level rebalancing algorithm.
   // The 'moves_in_progress' parameter contains information on the replica moves
   // which have been scheduled by a caller and still in progress: those are
   // considered as successfully completed and applied to the 'raw_info' when
-  // building ClusterInfo for the specified 'raw_info' input. The idea
+  // building ClusterBalanceInfo for the specified 'raw_info' input. The idea
   // is to prevent the algorithm outputting the same moves again while some
   // of the moves recommended at prior steps are still in progress.
   // The result cluster balance information is output into the 'info' parameter.
@@ -347,8 +459,10 @@ class Rebalancer {
   // Run rebalancing using the specified runner.
   Status RunWith(Runner* runner, RunStatus* result_status);
 
-  // Refresh the information on the cluster (involves running ksck).
-  Status GetClusterRawInfo(ClusterRawInfo* raw_info);
+  // Refresh the information on the cluster for the specified location
+  // (involves running ksck).
+  Status GetClusterRawInfo(const boost::optional<std::string>& location,
+                           ClusterRawInfo* raw_info);
 
   Status GetNextMoves(Runner* runner,
                       std::vector<ReplicaMove>* replica_moves);
@@ -359,14 +473,6 @@ class Rebalancer {
   // Configuration for the rebalancer.
   const Config config_;
 
-  // Random device and generator for selecting among multiple choices, when
-  // appropriate.
-  std::random_device random_device_;
-  std::mt19937 random_generator_;
-
-  // An instance of the balancing algorithm.
-  TwoDimensionalGreedyAlgo algo_;
-
   // Auxiliary Ksck object to get information on the cluster.
   std::shared_ptr<Ksck> ksck_;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/81bba247/src/kudu/tools/rebalancer_tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index 10fd9dd..60e9f45 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -24,6 +24,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -36,12 +37,16 @@
 #include "kudu/client/shared_ptr.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
+#include "kudu/consensus/quorum_util.h"
 #include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/cluster_verifier.h"
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/master/master.pb.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/tablet/tablet.pb.h"
@@ -70,6 +75,7 @@ using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaBuilder;
 using kudu::client::KuduTableAlterer;
 using kudu::client::KuduTableCreator;
+using kudu::cluster::LocationInfo;
 using kudu::itest::TabletServerMap;
 using kudu::tserver::ListTabletsResponsePB;
 using std::atomic;
@@ -81,6 +87,7 @@ using std::string;
 using std::thread;
 using std::tuple;
 using std::unique_ptr;
+using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
@@ -211,7 +218,8 @@ static Status CreateUnbalancedTables(
     int rep_factor,
     int tserver_idx_from,
     int tserver_num,
-    int tserver_unresponsive_ms) {
+    int tserver_unresponsive_ms,
+    vector<string>* table_names = nullptr) {
   // Keep running only some tablet servers and shut down the rest.
   for (auto i = tserver_idx_from; i < tserver_num; ++i) {
     cluster->tablet_server(i)->Shutdown();
@@ -225,7 +233,7 @@ static Status CreateUnbalancedTables(
   // which are up and running.
   auto client_schema = KuduSchema::FromSchema(table_schema);
   for (auto i = 0; i < num_tables; ++i) {
-    const string table_name = Substitute(table_name_pattern, i);
+    string table_name = Substitute(table_name_pattern, i);
     unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
     RETURN_NOT_OK(table_creator->table_name(table_name)
                   .schema(&client_schema)
@@ -240,6 +248,9 @@ static Status CreateUnbalancedTables(
       Substitute("--table_num_replicas=$0", rep_factor),
       "--string_fixed=unbalanced_tables_test",
     }));
+    if (table_names) {
+      table_names->emplace_back(std::move(table_name));
+    }
   }
 
   for (auto i = tserver_idx_from; i < tserver_num; ++i) {
@@ -394,7 +405,9 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
   static const char* const kTableNamePattern;
 
   void Prepare(const vector<string>& extra_tserver_flags = {},
-               const vector<string>& extra_master_flags = {}) {
+               const vector<string>& extra_master_flags = {},
+               const LocationInfo& location_info = {},
+               vector<string>* created_tables_names = nullptr) {
     const auto& scheme_flag = Substitute(
         "--raft_prepare_replacement_before_eviction=$0", is_343_scheme());
     master_flags_.push_back(scheme_flag);
@@ -407,12 +420,12 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
 
     FLAGS_num_tablet_servers = num_tservers_;
     FLAGS_num_replicas = rep_factor_;
-    NO_FATALS(BuildAndStart(tserver_flags_, master_flags_));
+    NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info));
 
     ASSERT_OK(CreateUnbalancedTables(
         cluster_.get(), client_.get(), schema_, kTableNamePattern,
         num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
-        tserver_unresponsive_ms_));
+        tserver_unresponsive_ms_, created_tables_names));
   }
 
   // When the rebalancer starts moving replicas, ksck detects corruption
@@ -1143,5 +1156,110 @@ TEST_P(RebalancerAndSingleReplicaTablets, SingleReplicasStayOrMove) {
   NO_FATALS(v.CheckCluster());
 }
 
+// Fixture for the location-aware rebalancer tests: 12 tables, RF=3, 6 tservers.
+class LocationAwareRebalancingBasicTest : public RebalancingTest {
+ public:
+  LocationAwareRebalancingBasicTest()
+      : RebalancingTest(/*num_tables=*/ 12,
+                        /*rep_factor=*/ 3,
+                        /*num_tservers=*/ 6) {
+  }
+
+  bool is_343_scheme() const override {
+    // These tests are for the 3-4-3 replica management scheme only.
+    return true;
+  }
+};
+
+// Verifying the very basic functionality of the location-aware rebalancer:
+// given the very simple cluster configuration of 6 tablet servers spread
+// among 3 locations (2+2+2) and 12 tables with RF=3, the initially
+// imbalanced distribution of the replicas should become more balanced
+// and the placement policy constraints should be reimposed after running
+// the rebalancer tool.
+TEST_F(LocationAwareRebalancingBasicTest, Basic) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  const LocationInfo location_info = { { "/A", 2 }, { "/B", 2 }, { "/C", 2 }, };
+  vector<string> table_names;
+  NO_FATALS(Prepare({}, {}, location_info, &table_names));
+
+  const vector<string> tool_args = {
+    "cluster",
+    "rebalance",
+    cluster_->master()->bound_rpc_addr().ToString(),
+  };
+
+  string out;
+  string err;
+  const auto s = RunKuduTool(tool_args, &out, &err);
+  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+  ASSERT_STR_NOT_CONTAINS(s.ToString(), kExitOnSignalStr);
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+
+  unordered_map<string, itest::TServerDetails*> ts_map;
+  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy(0),
+                                         cluster_->messenger(),
+                                         &ts_map));
+  ValueDeleter deleter(&ts_map);
+
+  // Build tablet server UUID --> location map.
+  unordered_map<string, string> location_by_ts_id;
+  for (const auto& elem : ts_map) {
+    EmplaceOrDie(&location_by_ts_id, elem.first, elem.second->location);
+  }
+
+  // Check per-tablet placement and per-location totals for every table.
+  for (const auto& table_name : table_names) {
+    master::GetTableLocationsResponsePB table_locations;
+    ASSERT_OK(itest::GetTableLocations(cluster_->master_proxy(),
+                                       table_name, MonoDelta::FromSeconds(30),
+                                       master::ANY_REPLICA,
+                                       &table_locations));
+    const auto tablet_num = table_locations.tablet_locations_size();
+    auto total_table_replica_count = 0;
+    unordered_map<string, int> total_count_per_location;
+    for (auto i = 0; i < tablet_num; ++i) {
+      const auto& tablet_locations = table_locations.tablet_locations(i);
+      const auto& tablet_id = tablet_locations.tablet_id();
+      unordered_map<string, int> count_per_location;
+      for (const auto& replica : tablet_locations.replicas()) {
+        const auto& ts_id = replica.ts_info().permanent_uuid();
+        const auto& ts_location = FindOrDie(location_by_ts_id, ts_id);
+        ++LookupOrEmplace(&count_per_location, ts_location, 0);
+        ++LookupOrEmplace(&total_count_per_location, ts_location, 0);
+        ++total_table_replica_count;
+      }
+
+      // Make sure no location has the majority of replicas for the tablet.
+      for (const auto& elem : count_per_location) {
+        const auto& location = elem.first;
+        const auto replica_count = elem.second;
+        ASSERT_GT(consensus::MajoritySize(rep_factor_), replica_count)
+            << Substitute("tablet $0 (table $1): $2 replicas out of $3 total "
+                          "are in location $4",
+                          tablet_id, table_name, replica_count, rep_factor_,
+                          location);
+      }
+    }
+
+    // Verify the overall replica distribution once all tablets are counted.
+    const double avg =
+        static_cast<double>(total_table_replica_count) / location_info.size();
+    for (const auto& elem : total_count_per_location) {
+      const auto& location = elem.first;
+      const auto replica_num = elem.second;
+      ASSERT_GT(avg + 2, replica_num) << "at location " << location;
+      ASSERT_LT(avg - 2, replica_num) << "at location " << location;
+    }
+  }
+}
+
 } // namespace tools
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/81bba247/src/kudu/tools/tool_action_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index b6a6981..d4989f3 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -101,6 +101,25 @@ DEFINE_bool(report_only, false,
             "Whether to report on table- and cluster-wide replica distribution "
             "skew and exit without doing any actual rebalancing");
 
+DEFINE_bool(disable_policy_fixer, false,
+            "In case of multi-location cluster, whether to skip detecting "
+            "and fixing placement policy violations. Fixing placement policy "
+            "violations involves moving tablet replicas across different "
+            "locations of the cluster. "
+            "This setting is applicable to multi-location clusters only.");
+
+DEFINE_bool(disable_cross_location_rebalancing, false,
+            "In case of multi-location cluster, whether to skip moving tablet "
+            "replicas between locations in an attempt to spread tablet "
+            "replicas among locations evenly (equalizing loads of locations "
+            "throughout the cluster). "
+            "This setting is applicable to multi-location clusters only.");
+
+DEFINE_bool(disable_intra_location_rebalancing, false,
+            "In case of multi-location cluster, whether to skip rebalancing "
+            "tablet replica distribution within each location. "
+            "This setting is applicable to multi-location clusters only.");
+
 static bool ValidateMoveSingleReplicas(const char* flag_name,
                                        const string& flag_value) {
   const vector<string> allowed_values = { "auto", "enabled", "disabled" };
@@ -260,7 +279,10 @@ Status RunRebalance(const RunnerContext& context) {
       FLAGS_max_staleness_interval_sec,
       FLAGS_max_run_time_sec,
       move_single_replicas,
-      FLAGS_output_replica_distribution_details));
+      FLAGS_output_replica_distribution_details,
+      !FLAGS_disable_policy_fixer,
+      !FLAGS_disable_cross_location_rebalancing,
+      !FLAGS_disable_intra_location_rebalancing));
 
   // Print info on pre-rebalance distribution of replicas.
   RETURN_NOT_OK(rebalancer.PrintStats(cout));
@@ -346,6 +368,9 @@ unique_ptr<Mode> BuildClusterMode() {
         .Description(desc)
         .ExtraDescription(extra_desc)
         .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+        .AddOptionalParameter("disable_policy_fixer")
+        .AddOptionalParameter("disable_cross_location_rebalancing")
+        .AddOptionalParameter("disable_intra_location_rebalancing")
         .AddOptionalParameter("max_moves_per_server")
         .AddOptionalParameter("max_run_time_sec")
         .AddOptionalParameter("max_staleness_interval_sec")


Mime
View raw message