kudu-commits mailing list archives

From ale...@apache.org
Subject kudu git commit: [location_awareness] replica selection honors placement policy
Date Wed, 12 Sep 2018 23:14:37 GMT
Repository: kudu
Updated Branches:
  refs/heads/master a7590d49c -> ebb2852d9


[location_awareness] replica selection honors placement policy

This patch introduces placement policy into the catalog manager's
replica selection process. The replica selection logic is factored out
into the new PlacementPolicy class.

In essence (for details see [1]), the placement policy is about:
  * in case of N locations, N > 2, not placing the majority of replicas
    in one location
  * spreading replicas evenly among available locations
  * within a location, spreading replicas evenly among tablet servers

This patch also adds a few test scenarios for the new functionality.

[1] https://s.apache.org/location-awareness-design
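
As an illustration only (this sketch is not part of the commit), the two-level idea above can be condensed into a few dozen lines of standalone C++. The cluster model, the LocationLoad() helper, and the greedy loop are simplified stand-ins for what PlacementPolicy::SelectReplicaLocations() and SelectLocation() do in the patch; the real code additionally keeps an 'overflow' group so that, with only one or two locations, it can still place replicas even though some location inevitably ends up with a majority.

#include <iostream>
#include <map>
#include <string>
#include <vector>

// Hypothetical cluster model: location name -> replica count per tablet server.
using Cluster = std::map<std::string, std::vector<int>>;

// Average replica load of a location, counting replicas already slated there.
double LocationLoad(const std::vector<int>& ts_loads, int slated) {
  int total = slated;
  for (int load : ts_loads) total += load;
  return static_cast<double>(total) / ts_loads.size();
}

int main() {
  const Cluster cluster = {
    { "/rack0", { 2, 1 } },  // two tablet servers hosting 2 and 1 replicas
    { "/rack1", { 1, 3 } },
    { "/rack2", { 10 } },
  };
  const int nreplicas = 3;
  std::map<std::string, int> slated;  // location -> replicas placed so far

  for (int placed = 0; placed < nreplicas; ++placed) {
    std::string best;
    double best_load = 0;
    for (const auto& [loc, ts_loads] : cluster) {
      const auto it = slated.find(loc);
      const int n = (it == slated.end()) ? 0 : it->second;
      // At most one replica of the same tablet per tablet server.
      if (n >= static_cast<int>(ts_loads.size())) continue;
      // Never let a single location hold the majority of the replicas.
      if (n + 1 > nreplicas / 2) continue;
      const double load = LocationLoad(ts_loads, n);
      if (best.empty() || load < best_load) {
        best_load = load;
        best = loc;
      }
    }
    if (best.empty()) break;  // the real code falls back to an overflow group
    ++slated[best];
  }
  for (const auto& [loc, n] : slated) {
    std::cout << loc << ": " << n << " replica(s)" << std::endl;
  }
  return 0;
}

With a replication factor of 3, each of the three locations ends up with exactly one replica, matching the policy's no-majority and even-spreading goals.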

Change-Id: I4169098abf17d5591d4c1675561afc15b5477fcd
Reviewed-on: http://gerrit.cloudera.org:8080/11207
Reviewed-by: Adar Dembo <adar@cloudera.com>
Reviewed-by: Will Berkeley <wdberkeley@gmail.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ebb2852d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ebb2852d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ebb2852d

Branch: refs/heads/master
Commit: ebb2852d99ed27c26e65c3569d5cb515754b2937
Parents: a7590d4
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Thu Aug 2 21:53:59 2018 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Wed Sep 12 23:13:30 2018 +0000

----------------------------------------------------------------------
 src/kudu/gutil/map-util.h                |   3 +-
 src/kudu/master/CMakeLists.txt           |   2 +
 src/kudu/master/catalog_manager.cc       | 267 ++++--------
 src/kudu/master/catalog_manager.h        |  29 +-
 src/kudu/master/placement_policy-test.cc | 578 ++++++++++++++++++++++++++
 src/kudu/master/placement_policy.cc      | 362 ++++++++++++++++
 src/kudu/master/placement_policy.h       | 169 ++++++++
 src/kudu/master/ts_descriptor.cc         |  16 +-
 src/kudu/master/ts_descriptor.h          |  10 +-
 src/kudu/master/ts_manager.cc            |   4 +-
 src/kudu/master/ts_manager.h             |  10 +-
 11 files changed, 1227 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ebb2852d/src/kudu/gutil/map-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/gutil/map-util.h b/src/kudu/gutil/map-util.h
index ef6baae..212a078 100644
--- a/src/kudu/gutil/map-util.h
+++ b/src/kudu/gutil/map-util.h
@@ -292,8 +292,7 @@ bool FindCopy(const Collection& collection,
 // Returns true iff the given collection contains the given key.
 template <class Collection, class Key>
 bool ContainsKey(const Collection& collection, const Key& key) {
-  auto it = collection.find(key);
-  return it != collection.end();
+  return collection.find(key) != collection.end();
 }
 
 // Returns true iff the given collection contains the given key-value pair.

http://git-wip-us.apache.org/repos/asf/kudu/blob/ebb2852d/src/kudu/master/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index bb6ccce..2947a12 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -41,6 +41,7 @@ set(MASTER_SRCS
   master_path_handlers.cc
   master_service.cc
   mini_master.cc
+  placement_policy.cc
   sys_catalog.cc
   ts_descriptor.cc
   ts_manager.cc)
@@ -75,6 +76,7 @@ ADD_KUDU_TEST(catalog_manager-test)
 ADD_KUDU_TEST(hms_notification_log_listener-test)
 ADD_KUDU_TEST(master-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(mini_master-test RESOURCE_LOCK "master-web-port")
+ADD_KUDU_TEST(placement_policy-test)
 ADD_KUDU_TEST(sys_catalog-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(ts_descriptor-test DATA_FILES testdata/first_argument.sh)
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ebb2852d/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 6a90906..be42e47 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -46,8 +46,10 @@
 #include <cstdlib>
 #include <functional>
 #include <iterator>
+#include <map>
 #include <memory>
 #include <mutex>
+#include <numeric>
 #include <ostream>
 #include <set>
 #include <string>
@@ -99,6 +101,7 @@
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master_cert_authority.h"
+#include "kudu/master/placement_policy.h"
 #include "kudu/master/sys_catalog.h"
 #include "kudu/master/ts_descriptor.h"
 #include "kudu/master/ts_manager.h"
@@ -281,6 +284,9 @@ using kudu::tablet::TabletDataState;
 using kudu::tablet::TabletReplica;
 using kudu::tablet::TabletStatePB;
 using kudu::tserver::TabletServerErrorPB;
+using std::accumulate;
+using std::inserter;
+using std::map;
 using std::pair;
 using std::set;
 using std::shared_ptr;
@@ -673,12 +679,12 @@ Status ProcessColumnPBDefaults(ColumnSchemaPB* col) {
 } // anonymous namespace
 
 CatalogManager::CatalogManager(Master* master)
-  : master_(master),
-    rng_(GetRandomSeed32()),
-    state_(kConstructed),
-    leader_ready_term_(-1),
-    hms_notification_log_event_id_(-1),
-    leader_lock_(RWMutex::Priority::PREFER_WRITING) {
+    : master_(master),
+      rng_(GetRandomSeed32()),
+      state_(kConstructed),
+      leader_ready_term_(-1),
+      hms_notification_log_event_id_(-1),
+      leader_lock_(RWMutex::Priority::PREFER_WRITING) {
   CHECK_OK(ThreadPoolBuilder("leader-initialization")
            // Presently, this thread pool must contain only a single thread
            // (to correctly serialize invocations of ElectedAsLeaderCb upon
@@ -3166,92 +3172,6 @@ class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
   tserver::DeleteTabletResponsePB resp_;
 };
 
-namespace {
-
-// Given exactly two choices in 'two_choices', pick the better tablet server on
-// which to place a tablet replica. Ties are broken using 'rng'.
-shared_ptr<TSDescriptor> PickBetterReplicaLocation(const TSDescriptorVector& two_choices,
-                                                   ThreadSafeRandom* rng) {
-  DCHECK_EQ(two_choices.size(), 2);
-
-  const auto& a = two_choices[0];
-  const auto& b = two_choices[1];
-
-  // When creating replicas, we consider two aspects of load:
-  //   (1) how many tablet replicas are already on the server, and
-  //   (2) how often we've chosen this server recently.
-  //
-  // The first factor will attempt to put more replicas on servers that
-  // are under-loaded (eg because they have newly joined an existing cluster, or have
-  // been reformatted and re-joined).
-  //
-  // The second factor will ensure that we take into account the recent selection
-  // decisions even if those replicas are still in the process of being created (and thus
-  // not yet reported by the server). This is important because, while creating a table,
-  // we batch the selection process before sending any creation commands to the
-  // servers themselves.
-  //
-  // TODO(wdberkeley): in the future we may want to factor in other items such
-  // as available disk space, actual request load, etc.
-  double load_a = a->RecentReplicaCreations() + a->num_live_replicas();
-  double load_b = b->RecentReplicaCreations() + b->num_live_replicas();
-  if (load_a < load_b) {
-    return a;
-  }
-  if (load_b < load_a) {
-    return b;
-  }
-  // If the load is the same, we can just pick randomly.
-  return two_choices[rng->Uniform(2)];
-}
-
-// Given the tablet servers in 'ts_descs', use 'rng' to pick a tablet server to
-// host a tablet replica, excluding tablet servers in 'excluded'.
-// If there are no servers in 'ts_descs' that are not in 'excluded, return nullptr.
-shared_ptr<TSDescriptor> SelectReplica(const TSDescriptorVector& ts_descs,
-                                       const set<shared_ptr<TSDescriptor>>& excluded,
-                                       ThreadSafeRandom* rng) {
-  // The replica selection algorithm follows the idea from
-  // "Power of Two Choices in Randomized Load Balancing"[1]. For each replica,
-  // we randomly select two tablet servers, and then assign the replica to the
-  // less-loaded one of the two. This has some nice properties:
-  //
-  // 1) because the initial selection of two servers is random, we get good
-  //    spreading of replicas across the cluster. In contrast if we sorted by
-  //    load and always picked under-loaded servers first, we'd end up causing
-  //    all tablets of a new table to be placed on an empty server. This wouldn't
-  //    give good load balancing of that table.
-  //
-  // 2) because we pick the less-loaded of two random choices, we do end up with a
-  //    weighting towards filling up the underloaded one over time, without
-  //    the extreme scenario above.
-  //
-  // 3) because we don't follow any sequential pattern, every server is equally
-  //    likely to replicate its tablets to every other server. In contrast, a
-  //    round-robin design would enforce that each server only replicates to its
-  //    adjacent nodes in the TS sort order, limiting recovery bandwidth (see
-  //    KUDU-1317).
-  //
-  // [1] http://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
-
-  // Pick two random servers, excluding those we've already picked.
-  // If we've only got one server left, 'two_choices' will actually
-  // just contain one element.
-  vector<shared_ptr<TSDescriptor>> two_choices;
-  rng->ReservoirSample(ts_descs, 2, excluded, &two_choices);
-
-  if (two_choices.size() == 2) {
-    // Pick the better of the two.
-    return PickBetterReplicaLocation(two_choices, rng);
-  }
-  if (two_choices.size() == 1) {
-    return two_choices[0];
-  }
-  return nullptr;
-}
-
-} // anonymous namespace
-
 // Send the "Alter Table" with the latest table schema to the leader replica
 // for the tablet.
 // Keeps retrying until we get an "ok" response.
@@ -3466,33 +3386,39 @@ bool AsyncAddReplicaTask::SendRequest(int attempt) {
   LOG(INFO) << Substitute("Sending $0 on tablet $1 (attempt $2)",
                           type_name(), tablet_->id(), attempt);
 
-  // Select the replica we wish to add to the config.
-  // Do not include current members of the config.
-  const auto& config = cstate_.committed_config();
-  set<shared_ptr<TSDescriptor>> excluded;
-  for (auto i = 0; i < config.peers_size(); ++i) {
-    shared_ptr<TSDescriptor> desc;
-    if (master_->ts_manager()->LookupTSByUUID(config.peers(i).permanent_uuid(),
-                                              &desc)) {
-      EmplaceOrDie(&excluded, std::move(desc));
+  Status s;
+  shared_ptr<TSDescriptor> extra_replica;
+  {
+    // Select the replica we wish to add to the config.
+    // Do not include current members of the config.
+    const auto& config = cstate_.committed_config();
+    TSDescriptorVector existing;
+    for (auto i = 0; i < config.peers_size(); ++i) {
+      shared_ptr<TSDescriptor> desc;
+      if (master_->ts_manager()->LookupTSByUUID(config.peers(i).permanent_uuid(),
+                                                &desc)) {
+        existing.emplace_back(std::move(desc));
+      }
     }
-  }
 
-  TSDescriptorVector ts_descs;
-  master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
+    TSDescriptorVector ts_descs;
+    master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
 
-  // Some of the tablet servers hosting the current members of the config
-  // (see the 'excluded' populated above) might be presumably dead.
-  // Inclusion of a presumably dead tablet server into 'excluded' is OK:
-  // SelectReplica() does not require elements of 'excluded' to be a subset of
-  // 'ts_descs', and 'ts_descs' contains only alive tablet servers. Essentially,
-  // the list of candidate tablet servers to host the extra replica
-  // is 'ts_descs' after blacklisting all elements common with 'excluded'.
-  auto extra_replica = SelectReplica(ts_descs, excluded, rng_);
-  if (PREDICT_FALSE(!extra_replica)) {
-    auto msg = Substitute("no extra replica candidate found for tablet $0",
-                          tablet_->ToString());
-    // Check whether it's a situation when an extra replica cannot be found
+    // Some of the tablet servers hosting the current members of the config
+    // (see the 'existing' populated above) might be presumably dead.
+    // Inclusion of a presumably dead tablet server into 'existing' is OK:
+    // PlacementPolicy::PlaceExtraTabletReplica() does not require elements of
+    // 'existing' to be a subset of 'ts_descs', and 'ts_descs' contains only
+    // alive tablet servers. Essentially, the list of candidate tablet servers
+    // to host the extra replica is 'ts_descs' after blacklisting all elements
+    // common with 'existing'.
+    PlacementPolicy policy(std::move(ts_descs), rng_);
+    s = policy.PlaceExtraTabletReplica(std::move(existing), &extra_replica);
+  }
+  if (PREDICT_FALSE(!s.ok())) {
+    auto msg = Substitute("no extra replica candidate found for tablet $0: $1",
+                          tablet_->ToString(), s.ToString());
+    // Check whether it's a situation when a replacement replica cannot be found
     // due to an inconsistency in cluster configuration. If the tablet has the
     // replication factor of N, and the cluster is using the N->(N+1)->N
     // replica management scheme (see --raft_prepare_replacement_before_eviction
@@ -3520,6 +3446,7 @@ bool AsyncAddReplicaTask::SendRequest(int attempt) {
     return false;
   }
 
+  DCHECK(extra_replica);
   consensus::ChangeConfigRequestPB req;
   req.set_dest_uuid(target_ts_desc_->permanent_uuid());
   req.set_tablet_id(tablet_->id());
@@ -4276,7 +4203,7 @@ void CatalogManager::HandleTabletSchemaVersionReport(
 }
 
 Status CatalogManager::ProcessPendingAssignments(
-    const vector<scoped_refptr<TabletInfo> >& tablets) {
+    const vector<scoped_refptr<TabletInfo>>& tablets) {
   VLOG(1) << "Processing pending assignments";
 
   // Take write locks on all tablets to be processed, and ensure that they are
@@ -4330,15 +4257,18 @@ Status CatalogManager::ProcessPendingAssignments(
   }
 
   // For those tablets which need to be created in this round, assign replicas.
-  TSDescriptorVector ts_descs;
-  master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
-
-  for (const auto& tablet : deferred.needs_create_rpc) {
-    // NOTE: if we fail to select replicas on the first pass (due to
-    // insufficient Tablet Servers being online), we will still try
-    // again unless the tablet/table creation is cancelled.
-    RETURN_NOT_OK_PREPEND(SelectReplicasForTablet(ts_descs, tablet),
-                          Substitute("error selecting replicas for tablet $0", tablet->id()));
+  {
+    TSDescriptorVector ts_descs;
+    master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
+    PlacementPolicy policy(std::move(ts_descs), &rng_);
+    for (auto& tablet : deferred.needs_create_rpc) {
+      // NOTE: if we fail to select replicas on the first pass (due to
+      // insufficient Tablet Servers being online), we will still try
+      // again unless the tablet/table creation is cancelled.
+      RETURN_NOT_OK_PREPEND(SelectReplicasForTablet(policy, tablet.get()),
+                            Substitute("error selecting replicas for tablet $0",
+                                       tablet->id()));
+    }
   }
 
   // Update the sys catalog with the new set of tablets/metadata.
@@ -4384,8 +4314,9 @@ Status CatalogManager::ProcessPendingAssignments(
   return Status::OK();
 }
 
-Status CatalogManager::SelectReplicasForTablet(const TSDescriptorVector& ts_descs,
-                                               const scoped_refptr<TabletInfo>& tablet) {
+Status CatalogManager::SelectReplicasForTablet(const PlacementPolicy& policy,
+                                               TabletInfo* tablet) {
+  DCHECK(tablet);
   TableMetadataLock table_guard(tablet->table().get(), LockMode::READ);
 
   if (!table_guard.data().pb.IsInitialized()) {
@@ -4394,30 +4325,44 @@ Status CatalogManager::SelectReplicasForTablet(const TSDescriptorVector& ts_desc
                    tablet->id()));
   }
 
-  int nreplicas = table_guard.data().pb.num_replicas();
-
-  if (ts_descs.size() < nreplicas) {
+  const auto nreplicas = table_guard.data().pb.num_replicas();
+  if (policy.ts_num() < nreplicas) {
     return Status::InvalidArgument(
         Substitute("Not enough tablet servers are online for table '$0'. Need at least $1 "
                    "replicas, but only $2 tablet servers are available",
-                   table_guard.data().name(), nreplicas, ts_descs.size()));
+                   table_guard.data().name(), nreplicas, policy.ts_num()));
   }
 
-  // Select the set of replicas for the tablet.
-  ConsensusStatePB* cstate = tablet->mutable_metadata()->mutable_dirty()
-          ->pb.mutable_consensus_state();
+  ConsensusStatePB* cstate = tablet->mutable_metadata()->
+      mutable_dirty()->pb.mutable_consensus_state();
   cstate->set_current_term(kMinimumTerm);
-  RaftConfigPB *config = cstate->mutable_committed_config();
-
+  RaftConfigPB* config = cstate->mutable_committed_config();
+  DCHECK_EQ(0, config->peers_size()) << "RaftConfig not empty: "
+                                     << SecureShortDebugString(*config);
+  config->clear_peers();
   // Maintain ability to downgrade Kudu to a version with LocalConsensus.
-  if (nreplicas == 1) {
-    config->set_obsolete_local(true);
-  } else {
-    config->set_obsolete_local(false);
+  config->set_obsolete_local(nreplicas == 1);
+  config->set_opid_index(consensus::kInvalidOpIdIndex);
+
+  // Select the set of replicas for the tablet.
+  TSDescriptorVector descriptors;
+  RETURN_NOT_OK_PREPEND(policy.PlaceTabletReplicas(nreplicas, &descriptors),
+                        Substitute("failed to place replicas for tablet $0 "
+                                   "(table '$1')",
+                                   tablet->id(), table_guard.data().name()));
+  for (const auto& desc : descriptors) {
+    ServerRegistrationPB reg;
+    desc->GetRegistration(&reg);
+
+    RaftPeerPB* peer = config->add_peers();
+    peer->set_member_type(RaftPeerPB::VOTER);
+    peer->set_permanent_uuid(desc->permanent_uuid());
+
+    for (const HostPortPB& addr : reg.rpc_addresses()) {
+      peer->mutable_last_known_addr()->CopyFrom(addr);
+    }
   }
 
-  config->set_opid_index(consensus::kInvalidOpIdIndex);
-  SelectReplicas(ts_descs, nreplicas, config);
   return Status::OK();
 }
 
@@ -4435,42 +4380,6 @@ void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& ta
   }
 }
 
-void CatalogManager::SelectReplicas(const TSDescriptorVector& ts_descs,
-                                    int nreplicas,
-                                    RaftConfigPB *config) {
-  DCHECK_EQ(0, config->peers_size()) << "RaftConfig not empty: " << SecureShortDebugString(*config);
-  DCHECK_LE(nreplicas, ts_descs.size());
-
-  // Keep track of servers we've already selected, so that we don't attempt to
-  // put two replicas on the same host.
-  set<shared_ptr<TSDescriptor> > already_selected;
-  for (int i = 0; i < nreplicas; ++i) {
-    shared_ptr<TSDescriptor> ts = SelectReplica(ts_descs, already_selected, &rng_);
-    // We must be able to find a tablet server for the replica because of
-    // checks before this function is called.
-    DCHECK(ts) << "ts_descs: " << ts_descs.size()
-               << " already_sel: " << already_selected.size();
-    InsertOrDie(&already_selected, ts);
-
-    // Increment the number of pending replicas so that we take this selection into
-    // account when assigning replicas for other tablets of the same table. This
-    // value decays back to 0 over time.
-    ts->IncrementRecentReplicaCreations();
-
-    ServerRegistrationPB reg;
-    ts->GetRegistration(&reg);
-
-    RaftPeerPB *peer = config->add_peers();
-    peer->set_member_type(RaftPeerPB::VOTER);
-    peer->set_permanent_uuid(ts->permanent_uuid());
-
-    // TODO: This is temporary, we will use only UUIDs
-    for (const HostPortPB& addr : reg.rpc_addresses()) {
-      peer->mutable_last_known_addr()->CopyFrom(addr);
-    }
-  }
-}
-
 Status CatalogManager::BuildLocationsForTablet(
     const scoped_refptr<TabletInfo>& tablet,
     master::ReplicaTypeFilter filter,
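
For a condensed view of the call pattern introduced above, here is a sketch against the interfaces shown in this patch, with error handling, locking, and the catalog-manager plumbing elided; the PlaceReplicasExample() helper itself is hypothetical, and in the real code the descriptors come from TSManager::GetAllLiveDescriptors().

#include <memory>
#include <utility>

#include "kudu/master/placement_policy.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/status.h"

namespace kudu {
namespace master {

Status PlaceReplicasExample(TSDescriptorVector live_ts_descs, int nreplicas) {
  ThreadSafeRandom rng(GetRandomSeed32());
  // The policy snapshots the live tablet servers; one instance serves all
  // placement decisions made within the same assignment round.
  PlacementPolicy policy(std::move(live_ts_descs), &rng);

  // Initial placement: pick 'nreplicas' tablet servers for a new tablet.
  TSDescriptorVector chosen;
  RETURN_NOT_OK(policy.PlaceTabletReplicas(nreplicas, &chosen));

  // Re-replication: pick one extra server given the existing replicas.
  std::shared_ptr<TSDescriptor> extra;
  RETURN_NOT_OK(policy.PlaceExtraTabletReplica(chosen, &extra));
  return Status::OK();
}

} // namespace master
} // namespace kudu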

http://git-wip-us.apache.org/repos/asf/kudu/blob/ebb2852d/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 525a736..bed52ce 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -14,8 +14,8 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_MASTER_CATALOG_MANAGER_H
-#define KUDU_MASTER_CATALOG_MANAGER_H
+
+#pragma once
 
 #include <atomic>
 #include <cstdint>
@@ -41,7 +41,6 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/master/master.pb.h"
-#include "kudu/master/ts_manager.h"
 #include "kudu/tserver/tablet_replica_lookup.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/cow_object.h"
@@ -96,6 +95,7 @@ namespace master {
 class CatalogManagerBgTasks;
 class HmsNotificationLogListenerTask;
 class Master;
+class PlacementPolicy;
 class SysCatalogTable;
 class TSDescriptor;
 class TableInfo;
@@ -870,22 +870,12 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // Loops through the "not created" tablets and sends a CreateTablet() request.
   Status ProcessPendingAssignments(const std::vector<scoped_refptr<TabletInfo> >& tablets);
 
-  // Select N Replicas from online tablet servers (as specified by
-  // 'ts_descs') for the specified tablet and populate the consensus configuration
-  // object. If 'ts_descs' does not specify enough online tablet
-  // servers to select the N replicas, return Status::InvalidArgument.
-  //
-  // This method is called by "ProcessPendingAssignments()".
-  Status SelectReplicasForTablet(const TSDescriptorVector& ts_descs,
-                                 const scoped_refptr<TabletInfo>& tablet);
-
-  // Select N Replicas from the online tablet servers
-  // and populate the consensus configuration object.
-  //
-  // This method is called by "SelectReplicasForTablet".
-  void SelectReplicas(const TSDescriptorVector& ts_descs,
-                      int nreplicas,
-                      consensus::RaftConfigPB *config);
+  // Selects the tservers where the newly-created tablet's replicas will be
+  // placed, populating its consensus configuration in the process.
+  // Returns InvalidArgument if there are not enough live tservers to host
+  // the required number of replicas. 'tablet' must not be null.
+  Status SelectReplicasForTablet(const PlacementPolicy& policy,
+                                 TabletInfo* tablet);
 
   // Handles 'tablet' currently in the PREPARING state.
   //
@@ -1045,4 +1035,3 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
 
 } // namespace master
 } // namespace kudu
-#endif /* KUDU_MASTER_CATALOG_MANAGER_H */

http://git-wip-us.apache.org/repos/asf/kudu/blob/ebb2852d/src/kudu/master/placement_policy-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/placement_policy-test.cc b/src/kudu/master/placement_policy-test.cc
new file mode 100644
index 0000000..136727f
--- /dev/null
+++ b/src/kudu/master/placement_policy-test.cc
@@ -0,0 +1,578 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/placement_policy.h"
+
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/ts_descriptor.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::make_shared;
+using std::map;
+using std::multiset;
+using std::set;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace master {
+
+// Fixture to run placement policy-related scenarios.
+class PlacementPolicyTest : public ::testing::Test {
+ public:
+  struct TSInfo {
+    const string id;          // TS identifier
+    const size_t replica_num; // number of tablet replicas hosted by TS
+  };
+
+  struct LocationInfo {
+    const string id;                // location identifier
+    vector<TSInfo> tablet_servers;  // tablet servers in the location
+  };
+
+  typedef map<string, shared_ptr<TSDescriptor>> TSDescriptorsMap;
+
+  PlacementPolicyTest()
+      : rng_(GetRandomSeed32()) {
+  }
+
+  const TSDescriptorVector& descriptors() const { return descriptors_; }
+
+  ThreadSafeRandom* rng() { return &rng_; }
+
+  // Get tablet server descriptors for the specified tablet server UUIDs.
+  TSDescriptorVector GetDescriptors(const vector<string>& uuids) const {
+    TSDescriptorVector result;
+    // O(n^2) is not the best way to do this, but it's OK for test purposes.
+    for (const auto& uuid : uuids) {
+      for (const auto& desc : descriptors_) {
+        if (uuid == desc->permanent_uuid()) {
+          result.push_back(desc);
+          break;
+        }
+      }
+    }
+    CHECK_EQ(uuids.size(), result.size());
+    return result;
+  }
+
+ protected:
+  // Convert the information on the cluster into TSDescriptorVector.
+  static Status PopulateCluster(const vector<LocationInfo>& cluster_info,
+                                TSDescriptorVector* descs) {
+    TSDescriptorVector ts_descriptors;
+    for (const auto& location_info : cluster_info) {
+      const auto& ts_infos = location_info.tablet_servers;
+      for (const auto& ts : ts_infos) {
+        shared_ptr<TSDescriptor> tsd(new TSDescriptor(ts.id));
+        tsd->set_num_live_replicas(ts.replica_num);
+        tsd->location_.emplace(location_info.id);
+        ts_descriptors.emplace_back(std::move(tsd));
+      }
+    }
+
+    *descs = std::move(ts_descriptors);
+    return Status::OK();
+  }
+
+  Status Prepare(const vector<LocationInfo>& cluster_info) {
+    return PopulateCluster(cluster_info, &descriptors_);
+  }
+
+  // In tests, working with multiset instead of ReplicaLocationsInfo proves
+  // to be a bit handier.
+  Status SelectLocations(int nreplicas,
+                         multiset<string>* locations) {
+    PlacementPolicy policy(descriptors_, &rng_);
+    PlacementPolicy::ReplicaLocationsInfo info;
+    RETURN_NOT_OK(policy.SelectReplicaLocations(nreplicas, &info));
+    for (const auto& elem : info) {
+      for (auto i = 0; i < elem.second; ++i) {
+        locations->emplace(elem.first);
+      }
+    }
+    return Status::OK();
+  }
+
+  static Status TSDescriptorVectorToMap(const TSDescriptorVector& v_descs,
+                                        TSDescriptorsMap* m_descs) {
+    for (const auto& desc : v_descs) {
+      const auto& uuid = desc->permanent_uuid();
+      if (!m_descs->emplace(uuid, desc).second) {
+        return Status::IllegalState(
+            Substitute("$0: TS descriptors with duplicate UUID", uuid));
+      }
+    }
+    return Status::OK();
+  }
+
+ private:
+  ThreadSafeRandom rng_;
+  TSDescriptorVector descriptors_;
+};
+
+TEST_F(PlacementPolicyTest, SelectLocationsEdgeCases) {
+  // 'No location case': expecting backward-compatible behavior with
+  // non-location-aware logic.
+  const vector<LocationInfo> cluster_info = {
+    { "", { { "ts0", 0 }, { "ts1", 10 }, { "ts2", 1 }, } },
+  };
+  ASSERT_OK(Prepare(cluster_info));
+
+  {
+    multiset<string> locations;
+    auto s = SelectLocations(3, &locations);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    ASSERT_EQ(3, locations.size());
+    EXPECT_EQ(3, locations.count(""));
+  }
+
+  {
+    multiset<string> locations;
+    auto s = SelectLocations(5, &locations);
+    ASSERT_TRUE(locations.empty());
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+        "could not find next location after placing 3 out of 5 tablet replicas");
+  }
+}
+
+TEST_F(PlacementPolicyTest, SelectLocations0) {
+  const vector<LocationInfo> cluster_info = {
+    { "A", { { "A_ts0", 0 }, { "A_ts1", 10 }, } },
+    { "B", { { "B_ts0", 0 }, { "B_ts1", 1 }, } },
+    { "C", {} },
+  };
+  ASSERT_OK(Prepare(cluster_info));
+
+  {
+    // No replicas are slated to be hosted by an empty location.
+    multiset<string> locations;
+    auto s = SelectLocations(2, &locations);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    ASSERT_EQ(2, locations.size());
+    EXPECT_EQ(1, locations.count("A"));
+    EXPECT_EQ(1, locations.count("B"));
+  }
+
+  {
+    // Choosing location B over A because B has lower load.
+    multiset<string> locations;
+    auto s = SelectLocations(3, &locations);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    ASSERT_EQ(3, locations.size());
+    EXPECT_EQ(1, locations.count("A"));
+    EXPECT_EQ(2, locations.count("B"));
+  }
+
+  {
+    multiset<string> locations;
+    auto s = SelectLocations(4, &locations);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    ASSERT_EQ(4, locations.size());
+    EXPECT_EQ(2, locations.count("A"));
+    EXPECT_EQ(2, locations.count("B"));
+  }
+
+  {
+    multiset<string> locations;
+    auto s = SelectLocations(5, &locations);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+        "could not find next location after placing 4 out of 5 tablet replicas");
+  }
+}
+
+TEST_F(PlacementPolicyTest, SelectLocations1) {
+  const vector<LocationInfo> cluster_info = {
+    { "A", { { "A_ts0", 0 }, { "A_ts1", 10 }, { "A_ts2", 1 }, { "A_ts3", 3 }, } },
+    { "B", { { "B_ts0", 0 }, { "B_ts1", 1 }, { "B_ts2", 2 }, } },
+    { "C", { { "C_ts0", 0 }, { "C_ts1", 5}, } },
+    { "D", {} },
+  };
+  ASSERT_OK(Prepare(cluster_info));
+
+  {
+    multiset<string> locations;
+    auto s = SelectLocations(3, &locations);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    ASSERT_EQ(3, locations.size());
+    EXPECT_EQ(1, locations.count("A"));
+    EXPECT_EQ(1, locations.count("B"));
+    EXPECT_EQ(1, locations.count("C"));
+  }
+
+  for (auto loc_num : { 2, 3, 4, 5, 6, 7, 8, 9 }) {
+    multiset<string> locations;
+    auto s = SelectLocations(loc_num, &locations);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    ASSERT_EQ(loc_num, locations.size());
+    for (const auto& loc : locations) {
+      EXPECT_LT(locations.count(loc), loc_num / 2 + 1)
+          << loc << ": location is slated to contain the majority of replicas";
+    }
+  }
+}
+
+TEST_F(PlacementPolicyTest, PlaceExtraTabletReplicaNoLoc) {
+  // 'No location case': expecting backward compatible behavior with
+  // non-location-aware logic.
+  const vector<LocationInfo> cluster_info = {
+    { "", { { "ts0", 0 }, { "ts1", 10 }, { "ts2", 1 }, } },
+  };
+  ASSERT_OK(Prepare(cluster_info));
+
+  const auto& all = descriptors();
+  PlacementPolicy policy(all, rng());
+
+  {
+    TSDescriptorVector existing(all.begin(), all.end());
+    existing.pop_back();
+    shared_ptr<TSDescriptor> extra_ts;
+    ASSERT_OK(policy.PlaceExtraTabletReplica(existing, &extra_ts));
+    ASSERT_TRUE(extra_ts);
+    ASSERT_EQ("ts2", extra_ts->permanent_uuid());
+  }
+
+  {
+    TSDescriptorVector existing(all.begin(), all.end());
+    existing.pop_back();
+    existing.pop_back();
+    shared_ptr<TSDescriptor> extra_ts;
+    ASSERT_OK(policy.PlaceExtraTabletReplica(existing, &extra_ts));
+    ASSERT_TRUE(extra_ts);
+    ASSERT_EQ("ts2", extra_ts->permanent_uuid());
+  }
+
+  {
+    TSDescriptorVector existing(all.begin(), all.end());
+    shared_ptr<TSDescriptor> extra_ts;
+    const auto s = policy.PlaceExtraTabletReplica(existing, &extra_ts);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_FALSE(extra_ts);
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "could not select location for extra replica");
+  }
+}
+
+TEST_F(PlacementPolicyTest, PlaceTabletReplicasNoLoc) {
+  // 'No location case': expecting backward-compatible behavior with the
+  // legacy (i.e. non-location-aware) logic.
+  const vector<LocationInfo> cluster_info = {
+    { "", { { "ts0", 0 }, { "ts1", 10 }, { "ts2", 1 }, } },
+  };
+  ASSERT_OK(Prepare(cluster_info));
+
+  const auto& all = descriptors();
+  PlacementPolicy policy(all, rng());
+
+  // Ask just for a single replica.
+  {
+    TSDescriptorVector result;
+    ASSERT_OK(policy.PlaceTabletReplicas(1, &result));
+    ASSERT_EQ(1, result.size());
+    TSDescriptorsMap m;
+    ASSERT_OK(TSDescriptorVectorToMap(result, &m));
+    ASSERT_EQ(1, m.size());
+    // "Power of Two Choices" should select less loaded servers.
+    ASSERT_TRUE(m.count("ts0") == 1 || m.count("ts2") == 1);
+    ASSERT_EQ(0, m.count("ts1"));
+  }
+
+  // Ask for number of replicas equal to the number of available tablet servers.
+  {
+    TSDescriptorVector result;
+    ASSERT_OK(policy.PlaceTabletReplicas(3, &result));
+    ASSERT_EQ(3, result.size());
+    TSDescriptorsMap m;
+    ASSERT_OK(TSDescriptorVectorToMap(result, &m));
+    ASSERT_EQ(1, m.count("ts0"));
+    ASSERT_EQ(1, m.count("ts1"));
+    ASSERT_EQ(1, m.count("ts2"));
+  }
+
+  // Try to ask for too many replicas when too few tablet servers are available.
+  {
+    TSDescriptorVector result;
+    auto s = policy.PlaceTabletReplicas(4, &result);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "could not find next location after placing "
+                        "3 out of 4 tablet replicas");
+    ASSERT_TRUE(result.empty());
+  }
+}
+
+TEST_F(PlacementPolicyTest, PlaceTabletReplicas) {
+  const vector<LocationInfo> cluster_info = {
+    { "A", { { "A_ts0", 2 }, { "A_ts1", 1 }, { "A_ts2", 3 }, } },
+    { "B", { { "B_ts0", 1 }, { "B_ts1", 2 }, { "B_ts2", 3 }, } },
+    { "C", { { "C_ts0", 10 }, } },
+  };
+  ASSERT_OK(Prepare(cluster_info));
+
+  const auto& all = descriptors();
+  PlacementPolicy policy(all, rng());
+
+  // Ask for number of replicas equal to the number of available locations.
+  {
+    TSDescriptorVector result;
+    ASSERT_OK(policy.PlaceTabletReplicas(3, &result));
+    ASSERT_EQ(3, result.size());
+    TSDescriptorsMap m;
+    ASSERT_OK(TSDescriptorVectorToMap(result, &m));
+    ASSERT_EQ(1, m.count("A_ts0") + m.count("A_ts1"));
+    ASSERT_EQ(1, m.count("B_ts0") + m.count("B_ts1"));
+    ASSERT_EQ(0, m.count("A_ts2"));
+    ASSERT_EQ(0, m.count("B_ts2"));
+    ASSERT_EQ(1, m.count("C_ts0"));
+  }
+
+  // Make sure no location contains the majority of replicas when there are
+  // enough locations to spread the replicas across.
+  {
+    TSDescriptorVector result;
+    ASSERT_OK(policy.PlaceTabletReplicas(5, &result));
+    ASSERT_EQ(5, result.size());
+    TSDescriptorsMap m;
+    ASSERT_OK(TSDescriptorVectorToMap(result, &m));
+    ASSERT_EQ(1, m.count("A_ts0"));
+    ASSERT_EQ(1, m.count("A_ts1"));
+    ASSERT_EQ(1, m.count("B_ts0"));
+    ASSERT_EQ(1, m.count("B_ts1"));
+    ASSERT_EQ(1, m.count("C_ts0"));
+  }
+
+  // Ask for number of replicas greater than the number of tablet servers.
+  {
+    TSDescriptorVector result;
+    auto s = policy.PlaceTabletReplicas(8, &result);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "could not find next location after placing "
+                        "7 out of 8 tablet replicas");
+    ASSERT_TRUE(result.empty());
+  }
+}
+
+TEST_F(PlacementPolicyTest, PlaceTabletReplicasOneTSPerLocation) {
+  const vector<LocationInfo> cluster_info = {
+    { "A", { { "A_ts0", 1 }, } },
+    { "B", { { "B_ts0", 2 }, } },
+    { "C", { { "C_ts0", 0 }, } },
+    { "D", { { "D_ts0", 3 }, } },
+    { "E", { { "E_ts0", 4 }, } },
+  };
+  ASSERT_OK(Prepare(cluster_info));
+
+  const auto& all = descriptors();
+  PlacementPolicy policy(all, rng());
+
+  // Ask for fewer replicas than the number of available locations.
+  {
+    TSDescriptorVector result;
+    ASSERT_OK(policy.PlaceTabletReplicas(3, &result));
+    ASSERT_EQ(3, result.size());
+    TSDescriptorsMap m;
+    ASSERT_OK(TSDescriptorVectorToMap(result, &m));
+    ASSERT_EQ(1, m.count("A_ts0"));
+    ASSERT_EQ(1, m.count("B_ts0"));
+    ASSERT_EQ(1, m.count("C_ts0"));
+    ASSERT_EQ(0, m.count("D_ts0"));
+    ASSERT_EQ(0, m.count("E_ts0"));
+  }
+
+  // Ask for number of replicas equal to the number of available locations.
+  {
+    TSDescriptorVector result;
+    ASSERT_OK(policy.PlaceTabletReplicas(5, &result));
+    ASSERT_EQ(5, result.size());
+    TSDescriptorsMap m;
+    ASSERT_OK(TSDescriptorVectorToMap(result, &m));
+    ASSERT_EQ(1, m.count("A_ts0"));
+    ASSERT_EQ(1, m.count("B_ts0"));
+    ASSERT_EQ(1, m.count("C_ts0"));
+    ASSERT_EQ(1, m.count("D_ts0"));
+    ASSERT_EQ(1, m.count("E_ts0"));
+  }
+
+  // Ask for number of replicas greater than the number of tablet servers.
+  {
+    TSDescriptorVector result;
+    auto s = policy.PlaceTabletReplicas(6, &result);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "could not find next location after placing "
+                        "5 out of 6 tablet replicas");
+    ASSERT_TRUE(result.empty());
+  }
+}
+
+TEST_F(PlacementPolicyTest, PlaceTabletReplicasBalancingLocations) {
+  const vector<LocationInfo> cluster_info = {
+    { "A", { { "A_ts0", 0 }, { "A_ts1", 1 }, } },
+    { "B", { { "B_ts0", 1 }, { "B_ts1", 2 }, } },
+    { "C", { { "C_ts0", 2 }, } },
+    { "D", { { "D_ts0", 3 }, } },
+    { "E", { { "E_ts0", 10 }, } },
+  };
+  ASSERT_OK(Prepare(cluster_info));
+
+  const auto& all = descriptors();
+  PlacementPolicy policy(all, rng());
+
+  // Make sure no location contains the majority of replicas.
+  {
+    TSDescriptorVector result;
+    ASSERT_OK(policy.PlaceTabletReplicas(3, &result));
+    ASSERT_EQ(3, result.size());
+    TSDescriptorsMap m;
+    ASSERT_OK(TSDescriptorVectorToMap(result, &m));
+    ASSERT_EQ(1, m.count("A_ts0"));
+    ASSERT_EQ(0, m.count("A_ts1"));
+    ASSERT_EQ(1, m.count("B_ts0"));
+    ASSERT_EQ(0, m.count("B_ts1"));
+    ASSERT_EQ(1, m.count("C_ts0"));
+    ASSERT_EQ(0, m.count("D_ts0"));
+    ASSERT_EQ(0, m.count("E_ts0"));
+  }
+
+  // Current location selection algorithm loads the locations evenly.
+  {
+    TSDescriptorVector result;
+    ASSERT_OK(policy.PlaceTabletReplicas(5, &result));
+    ASSERT_EQ(5, result.size());
+    TSDescriptorsMap m;
+    ASSERT_OK(TSDescriptorVectorToMap(result, &m));
+    ASSERT_EQ(1, m.count("A_ts0"));
+    ASSERT_EQ(1, m.count("B_ts0"));
+    ASSERT_EQ(1, m.count("C_ts0"));
+    ASSERT_EQ(0, m.count("D_ts0"));
+    ASSERT_EQ(0, m.count("E_ts0"));
+  }
+
+  // Place as many tablet replicas as possible given the number of tablet
+  // servers in the cluster.
+  {
+    TSDescriptorVector result;
+    ASSERT_OK(policy.PlaceTabletReplicas(7, &result));
+    ASSERT_EQ(7, result.size());
+    TSDescriptorsMap m;
+    ASSERT_OK(TSDescriptorVectorToMap(result, &m));
+    for (const auto& uuid : { "A_ts0", "A_ts1", "B_ts0", "B_ts1", "C_ts0",
+                              "D_ts0", "E_ts0" }) {
+      ASSERT_EQ(1, m.count(uuid));
+    }
+  }
+}
+
+// A few test scenarios to verify the functionality of PlaceExtraTabletReplica()
+// in the presence of more than two locations. The use cases behind this are
+// automatic re-replication and replica movement.
+TEST_F(PlacementPolicyTest, PlaceExtraTabletReplicaViolatedPolicy) {
+  const vector<LocationInfo> cluster_info = {
+    { "A", { { "A_ts0", 10 }, { "A_ts1", 9 }, { "A_ts2", 8 }, { "A_ts3", 0 }, } },
+    { "B", { { "B_ts0", 25 }, { "B_ts1", 50 }, { "B_ts2", 100 }, } },
+    { "C", { { "C_ts0", 0 }, { "C_ts1", 1 }, { "C_ts2", 2 }, } },
+  };
+  ASSERT_OK(Prepare(cluster_info));
+
+  const auto& all = descriptors();
+  PlacementPolicy policy(all, rng());
+
+  {
+    // Start with some replica distribution that violates the very basic
+    // constraint of no-majority-in-single-location. In this scenario, all three
+    // replicas are in the same location. Let's make sure that an additional replica
+    // is placed elsewhere even if a spare tablet server is available
+    // in the same location.
+    const auto existing = GetDescriptors({ "A_ts0", "A_ts1", "A_ts2", });
+    shared_ptr<TSDescriptor> extra_ts;
+    ASSERT_OK(policy.PlaceExtraTabletReplica(existing, &extra_ts));
+    ASSERT_TRUE(extra_ts);
+    // Within a location, a replica is placed by the 'power of 2' algorithm.
+    ASSERT_TRUE(extra_ts->permanent_uuid() == "C_ts0" ||
+                extra_ts->permanent_uuid() == "C_ts1");
+
+  }
+
+  {
+    // Make sure the replica placement algorithm avoids placing an extra replica
+    // into the least loaded location if the no-majority-in-single-location
+    // constraint would be violated.
+    const auto existing = GetDescriptors({ "A_ts0", "A_ts1", "C_ts1", "C_ts2", });
+    shared_ptr<TSDescriptor> extra_ts;
+    ASSERT_OK(policy.PlaceExtraTabletReplica(existing, &extra_ts));
+    ASSERT_TRUE(extra_ts);
+    // Within a location, a replica is placed by the 'power of 2' algorithm.
+    ASSERT_TRUE(extra_ts->permanent_uuid() == "B_ts0" ||
+                extra_ts->permanent_uuid() == "B_ts1");
+  }
+}
+
+// Test for randomness while selecting among locations of the same load.
+TEST_F(PlacementPolicyTest, SelectLocationTest) {
+  const vector<LocationInfo> cluster_info = {
+    { "A", { { "A_ts0", 1 }, { "A_ts1", 1 }, { "A_ts2", 1 }, } },
+    { "B", { { "B_ts0", 1 }, { "B_ts1", 1 }, { "B_ts2", 1 }, } },
+    { "C", { { "C_ts0", 1 }, { "C_ts1", 1 }, { "C_ts2", 1 }, } },
+  };
+  const PlacementPolicy::ReplicaLocationsInfo info = {
+    { "A", 1 }, { "B", 1 }, { "C", 1 },
+  };
+
+  ASSERT_OK(Prepare(cluster_info));
+
+  const auto& all = descriptors();
+  PlacementPolicy policy(all, rng());
+
+  map<string, int> locations_stats;
+  // Test for uniform distribution of the selected locations if selecting among
+  // locations of the same load.
+  for (auto i = 0; i < 3000; ++i) {
+    string location;
+    ASSERT_OK(policy.SelectLocation(3, info, &location));
+    ++locations_stats[location];
+  }
+  ASSERT_EQ(3, locations_stats.size());
+  EXPECT_LT(500, locations_stats["A"]);
+  EXPECT_GT(1500, locations_stats["A"]);
+  EXPECT_LT(500, locations_stats["B"]);
+  EXPECT_GT(1500, locations_stats["B"]);
+  EXPECT_LT(500, locations_stats["C"]);
+  EXPECT_GT(1500, locations_stats["C"]);
+}
+
+} // namespace master
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/ebb2852d/src/kudu/master/placement_policy.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/placement_policy.cc b/src/kudu/master/placement_policy.cc
new file mode 100644
index 0000000..45f9ef7
--- /dev/null
+++ b/src/kudu/master/placement_policy.cc
@@ -0,0 +1,362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/placement_policy.h"
+
+#include <cstdint>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <memory>
+#include <numeric>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/ts_descriptor.h"
+#include "kudu/util/random.h"
+
+using std::multimap;
+using std::numeric_limits;
+using std::set;
+using std::shared_ptr;
+using std::string;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace master {
+
+namespace {
+
+double GetTSLoad(TSDescriptor* desc) {
+  return desc->RecentReplicaCreations() + desc->num_live_replicas();
+}
+
+// Given exactly two choices in 'two_choices', pick the better tablet server on
+// which to place a tablet replica. Ties are broken using 'rng'.
+shared_ptr<TSDescriptor> PickBetterReplica(
+    const TSDescriptorVector& two_choices,
+    ThreadSafeRandom* rng) {
+  CHECK_EQ(2, two_choices.size());
+
+  const auto& a = two_choices[0];
+  const auto& b = two_choices[1];
+
+  // When creating replicas, we consider two aspects of load:
+  //   (1) how many tablet replicas are already on the server, and
+  //   (2) how often we've chosen this server recently.
+  //
+  // The first factor will attempt to put more replicas on servers that
+  // are under-loaded (eg because they have newly joined an existing cluster, or have
+  // been reformatted and re-joined).
+  //
+  // The second factor will ensure that we take into account the recent selection
+  // decisions even if those replicas are still in the process of being created (and thus
+  // not yet reported by the server). This is important because, while creating a table,
+  // we batch the selection process before sending any creation commands to the
+  // servers themselves.
+  //
+  // TODO(wdberkeley): in the future we may want to factor in other items such
+  // as available disk space, actual request load, etc.
+  double load_a = GetTSLoad(a.get());
+  double load_b = GetTSLoad(b.get());
+  if (load_a < load_b) {
+    return a;
+  }
+  if (load_b < load_a) {
+    return b;
+  }
+  // If the load is the same, we can just pick randomly.
+  return two_choices[rng->Uniform(2)];
+}
+
+} // anonymous namespace
+
+PlacementPolicy::PlacementPolicy(TSDescriptorVector descs,
+                                 ThreadSafeRandom* rng)
+    : ts_num_(descs.size()),
+      rng_(rng) {
+  CHECK(rng_);
+  for (auto& desc : descs) {
+    EmplaceOrDie(&known_ts_ids_, desc->permanent_uuid());
+    string location = desc->location() ? *desc->location() : "";
+    LookupOrEmplace(&ltd_, std::move(location),
+                    TSDescriptorVector()).emplace_back(std::move(desc));
+  }
+}
+
+Status PlacementPolicy::PlaceTabletReplicas(int nreplicas,
+                                            TSDescriptorVector* ts_descs) const {
+  DCHECK(ts_descs);
+
+  // Two-level approach for placing replicas:
+  //   1) select locations to place tablet replicas
+  //   2) in each location, select tablet servers to place replicas
+  ReplicaLocationsInfo locations_info;
+  RETURN_NOT_OK(SelectReplicaLocations(nreplicas, &locations_info));
+  for (const auto& elem : locations_info) {
+    const auto& loc = elem.first;
+    const auto loc_nreplicas = elem.second;
+    const auto& ts_descriptors = FindOrDie(ltd_, loc);
+    RETURN_NOT_OK(SelectReplicas(ts_descriptors, loc_nreplicas, ts_descs));
+  }
+  return Status::OK();
+}
+
+Status PlacementPolicy::PlaceExtraTabletReplica(
+    TSDescriptorVector existing,
+    shared_ptr<TSDescriptor>* ts_desc) const {
+  DCHECK(ts_desc);
+
+  // Convert input vector into a set, filtering out replicas that are located
+  // at unknown (i.e. unavailable) tablet servers.
+  int total_replicas_num = 0;
+  int unavailable_replicas_num = 0;
+  set<shared_ptr<TSDescriptor>> existing_set;
+  for (auto& ts : existing) {
+    ++total_replicas_num;
+    if (!ContainsKey(known_ts_ids_, ts->permanent_uuid())) {
+      ++unavailable_replicas_num;
+      continue;
+    }
+    EmplaceOrDie(&existing_set, std::move(ts));
+  }
+  VLOG(1) << Substitute("$0 out of $1 existing replicas are unavailable",
+                        unavailable_replicas_num, total_replicas_num);
+
+  ReplicaLocationsInfo location_info;
+  for (const auto& desc : existing_set) {
+    // It's OK to use an empty string to mean 'no location'
+    // (instead of e.g. boost::none) because valid location strings begin with
+    // '/' and therefore are nonempty.
+    string location = desc->location() ? *desc->location() : "";
+    ++LookupOrEmplace(&location_info, std::move(location), 0);
+  }
+
+  string location;
+  RETURN_NOT_OK_PREPEND(
+      SelectLocation(total_replicas_num, location_info, &location),
+      "could not select location for extra replica");
+  const auto* location_ts_descs_ptr = FindOrNull(ltd_, location);
+  if (!location_ts_descs_ptr) {
+    return Status::IllegalState(
+        Substitute("'$0': no info on tablet servers at location", location));
+  }
+  auto replica = SelectReplica(*location_ts_descs_ptr, existing_set);
+  if (!replica) {
+    return Status::NotFound("could not find tablet server for extra replica");
+  }
+  *ts_desc = std::move(replica);
+
+  return Status::OK();
+}
+
+double PlacementPolicy::GetLocationLoad(
+    const string& location,
+    const ReplicaLocationsInfo& locations_info) const {
+  // Get information on the distribution of already existing tablet replicas
+  // among tablet servers in the specified location.
+  const auto& ts_descriptors = FindOrDie(ltd_, location);
+  CHECK(!ts_descriptors.empty());
+  // Count the number of already existing replicas at the specified location.
+  auto num_live_replicas = accumulate(
+        ts_descriptors.begin(), ts_descriptors.end(), 0,
+        [](int val, const shared_ptr<TSDescriptor>& desc) {
+          return val + desc->num_live_replicas();
+        });
+  // Add the number of replicas slated for placement at the specified
+  // location.
+  const auto* location_rep_num_ptr = FindOrNull(locations_info, location);
+  if (location_rep_num_ptr) {
+    num_live_replicas += *location_rep_num_ptr;
+  }
+  return static_cast<double>(num_live_replicas) / ts_descriptors.size();
+}
+
+Status PlacementPolicy::SelectReplicaLocations(
+    int nreplicas,
+    ReplicaLocationsInfo* locations_info) const {
+  DCHECK(locations_info);
+
+  ReplicaLocationsInfo result_info;
+  auto placed_replicas_num = 0;
+  while (placed_replicas_num < nreplicas) {
+    // Check whether any location is available. If none is, placing an
+    // additional replica in any of the existing locations would violate
+    // the 'one tablet replica per tablet server' policy.
+    string location;
+    RETURN_NOT_OK_PREPEND(
+        SelectLocation(nreplicas, result_info, &location),
+        Substitute("could not find next location after placing $0 out of $1 "
+                   "tablet replicas", placed_replicas_num, nreplicas));
+    ++result_info[location];
+    if (++placed_replicas_num == nreplicas) {
+      // Placed the required number of replicas.
+      break;
+    }
+  }
+
+  *locations_info = std::move(result_info);
+  return Status::OK();
+}
+
+Status PlacementPolicy::SelectReplicas(const TSDescriptorVector& source_ts_descs,
+                                       int nreplicas,
+                                       TSDescriptorVector* result_ts_descs) const {
+  if (nreplicas > source_ts_descs.size()) {
+    return Status::InvalidArgument(
+        Substitute("too few to choose from: $0 total, $1 requested",
+                   source_ts_descs.size(), nreplicas));
+  }
+
+  // Keep track of servers we've already selected, so that we don't attempt to
+  // put two replicas on the same host.
+  set<shared_ptr<TSDescriptor>> already_selected;
+  for (auto i = 0; i < nreplicas; ++i) {
+    auto ts = SelectReplica(source_ts_descs, already_selected);
+    CHECK(ts);
+
+    // Increment the number of pending replicas so that we take this selection
+    // into account when assigning replicas for other tablets of the same table.
+    // This value decays back to 0 over time.
+    ts->IncrementRecentReplicaCreations();
+    result_ts_descs->emplace_back(ts);
+    EmplaceOrDie(&already_selected, std::move(ts));
+  }
+  return Status::OK();
+}
+
+//
+// The replica selection algorithm follows the idea from
+// "Power of Two Choices in Randomized Load Balancing"[1]. For each replica,
+// we randomly select two tablet servers, and then assign the replica to the
+// less-loaded one of the two. This has some nice properties:
+//
+// 1) because the initial selection of two servers is random, we get good
+//    spreading of replicas across the cluster. In contrast if we sorted by
+//    load and always picked under-loaded servers first, we'd end up causing
+//    all tablets of a new table to be placed on an empty server. This wouldn't
+//    give good load balancing of that table.
+//
+// 2) because we pick the less-loaded of two random choices, we do end up with a
+//    weighting towards filling up the underloaded one over time, without
+//    the extreme scenario above.
+//
+// 3) because we don't follow any sequential pattern, every server is equally
+//    likely to replicate its tablets to every other server. In contrast, a
+//    round-robin design would enforce that each server only replicates to its
+//    adjacent nodes in the TS sort order, limiting recovery bandwidth (see
+//    KUDU-1317).
+//
+// [1] http://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
+//
+shared_ptr<TSDescriptor> PlacementPolicy::SelectReplica(
+    const TSDescriptorVector& ts_descs,
+    const set<shared_ptr<TSDescriptor>>& excluded) const {
+  // Pick two random servers, excluding those we've already picked.
+  // If we've only got one server left, 'two_choices' will actually
+  // just contain one element.
+  vector<shared_ptr<TSDescriptor>> two_choices;
+  rng_->ReservoirSample(ts_descs, 2, excluded, &two_choices);
+  DCHECK_LE(two_choices.size(), 2);
+
+  if (two_choices.size() == 2) {
+    // Pick the better of the two.
+    return PickBetterReplica(two_choices, rng_);
+  }
+  if (two_choices.size() == 1) {
+    return two_choices.front();
+  }
+  return nullptr;
+}
+
+Status PlacementPolicy::SelectLocation(
+    int num_replicas,
+    const ReplicaLocationsInfo& locations_info,
+    string* location) const {
+  DCHECK(location);
+
+  // A pair of load-to-location maps. The idea is to build a group of
+  // candidate locations from which to select the best one based on load,
+  // while not placing the majority of replicas in the same location, if
+  // possible. Using multimap (rather than unordered_multimap) in order to
+  // maintain the entries in load-sorted order.
+  multimap<double, string> location_per_load;
+  multimap<double, string> location_per_load_overflow;
+  for (const auto& elem : ltd_) {
+    // NOTE: named 'loc' to avoid shadowing the 'location' output parameter.
+    const auto& loc = elem.first;
+    const auto* location_replicas_num_ptr = FindOrNull(locations_info, loc);
+    if (location_replicas_num_ptr) {
+      const auto location_num_tservers = elem.second.size();
+      const auto location_replicas_num = *location_replicas_num_ptr;
+      CHECK_LE(location_replicas_num, location_num_tservers);
+      if (location_replicas_num == location_num_tservers) {
+        // It's not possible to place more than one replica of the same tablet
+        // per tablet server.
+        continue;
+      }
+      if (location_replicas_num + 1 > num_replicas / 2) {
+        // If possible, avoid placing the majority of the tablet's replicas
+        // in a single location even if the load-based criterion would favor
+        // that. So, if placing one more replica here would make up a
+        // majority, put this location into the overflow group instead.
+        location_per_load_overflow.emplace(
+            GetLocationLoad(loc, locations_info), loc);
+        continue;
+      }
+    }
+    location_per_load.emplace(
+        GetLocationLoad(loc, locations_info), loc);
+  }
+
+  if (location_per_load.empty()) {
+    // With a single location, or with two locations and an odd replication
+    // factor, it's not possible to make every location contain less than
+    // the majority of replicas. Another case is when there are not enough
+    // tablet servers to place the requested number of replicas.
+    location_per_load.swap(location_per_load_overflow);
+  }
+
+  if (location_per_load.empty()) {
+    // The cluster cannot accommodate any additional tablet replica, even
+    // when placing the majority of replicas into one location: there are
+    // not enough tablet servers.
+    return Status::NotFound("not enough tablet servers to satisfy replica "
+                            "placement policy");
+  }
+
+  auto it = location_per_load.begin();
+  const auto min_load = it->first;
+  const auto eq_load_range = location_per_load.equal_range(min_load);
+  const auto distance = std::distance(eq_load_range.first, eq_load_range.second);
+
+  // If there are multiple least-loaded candidate locations, select one
+  // of them at random.
+  std::advance(it, rng_->Uniform(distance));
+  *location = it->second;
+  return Status::OK();
+}
+
+} // namespace master
+} // namespace kudu
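
The comment above SelectReplica() carries the key load-balancing argument. As
a sanity check of that argument, here is a minimal, self-contained C++ sketch
of the same two-choices idea outside the patch; it substitutes std::mt19937
and a bare Server struct for Kudu's ThreadSafeRandom and TSDescriptor, and
all names and constants in it are illustrative, not the patch's API:

    // Standalone illustration of "power of two choices" placement: pick two
    // random servers and assign the replica to the less loaded one.
    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <random>
    #include <vector>

    struct Server {
      int64_t load = 0;  // stand-in for a tablet server's replica count
    };

    // Pick two random servers (distinct when possible) and return the less
    // loaded of the two.
    Server* SelectTwoChoices(std::vector<Server>& servers, std::mt19937& rng) {
      std::uniform_int_distribution<size_t> dist(0, servers.size() - 1);
      size_t a = dist(rng);
      size_t b = dist(rng);
      while (servers.size() > 1 && b == a) b = dist(rng);
      return servers[a].load <= servers[b].load ? &servers[a] : &servers[b];
    }

    int main() {
      std::mt19937 rng(42);
      std::vector<Server> servers(10);
      for (int i = 0; i < 10000; ++i) {
        SelectTwoChoices(servers, rng)->load++;
      }
      // With two choices the maximum load stays close to the mean (1000);
      // purely random single-choice placement spreads noticeably wider.
      const auto mm = std::minmax_element(
          servers.begin(), servers.end(),
          [](const Server& x, const Server& y) { return x.load < y.load; });
      std::cout << "min=" << mm.first->load
                << " max=" << mm.second->load << std::endl;
      return 0;
    }
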

http://git-wip-us.apache.org/repos/asf/kudu/blob/ebb2852d/src/kudu/master/placement_policy.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/placement_policy.h b/src/kudu/master/placement_policy.h
new file mode 100644
index 0000000..653516c
--- /dev/null
+++ b/src/kudu/master/placement_policy.h
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/master/ts_descriptor.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class ThreadSafeRandom;
+
+namespace master {
+
+// Utility class to help place tablet replicas on tablet servers according
+// to the location awareness policy. Currently, this class implements the logic
+// specific to location awareness as described in [1], but it could enforce
+// other placement policies in the future.
+//
+// In essence (for details see [1]), the location awareness placement policy
+// is about:
+//   * in case of N locations, N > 2, not placing the majority of replicas
+//     in one location
+//   * spreading replicas evenly among available locations
+//   * within a location, spreading load evenly among tablet servers
+//
+// [1] https://s.apache.org/location-awareness-design
+//
+// TODO(aserbin): add a link to the doc once it appears in the upstream repo.
+//
+// NOTE: in the implementation of this class, it's OK to use an empty string
+// in place of boost::none for a location because valid location strings begin
+// with '/' and therefore are nonempty.
+class PlacementPolicy {
+ public:
+
+  // The 'descs' vector contains information on all available tablet servers
+  // in the cluster; the 'rng' parameter points to the random number generator
+  // used by this object for its selections. The random generator instance
+  // must not be nullptr and must outlive the PlacementPolicy object.
+  PlacementPolicy(TSDescriptorVector descs,
+                  ThreadSafeRandom* rng);
+
+  virtual ~PlacementPolicy() = default;
+
+  size_t ts_num() const { return ts_num_; }
+
+  // Select tablet servers to host the given number of replicas for a tablet.
+  // The 'nreplicas' parameter specifies the desired replication factor; the
+  // resulting set of tablet server descriptors is output into the 'ts_descs'
+  // placeholder (must not be null).
+  Status PlaceTabletReplicas(int nreplicas, TSDescriptorVector* ts_descs) const;
+
+  // Select a tablet server to host an additional tablet replica. The
+  // 'existing' parameter lists the current members of the tablet's Raft
+  // configuration; the new member is output into the 'ts_desc' placeholder
+  // (must not be null).
+  Status PlaceExtraTabletReplica(TSDescriptorVector existing,
+                                 std::shared_ptr<TSDescriptor>* ts_desc) const;
+
+ private:
+  // Tablet server descriptors per location. This is the most comprehensive
+  // information on how tablet servers are placed among locations. Inherently,
+  // the locations have a sense of proximity and a hierarchy, so '/mega/turbo0'
+  // is closer to '/mega/turbo1' than to '/giga/awesome0', and both
+  // '/mega/turbo0' and '/mega/turbo1' are affected by conditions currently
+  // affecting '/mega'. The number of locations is not expected to be high:
+  // on the order of tens.
+  //
+  // NOTE: this dictionary container is made unordered since currently no code
+  //       is taking advantage of the order of the keys.
+  typedef std::unordered_map<std::string, TSDescriptorVector>
+      LocationToDescriptorsMap;
+
+  // Number of tablet replicas per location.
+  typedef std::unordered_map<std::string, int> ReplicaLocationsInfo;
+
+  friend class PlacementPolicyTest;
+  FRIEND_TEST(PlacementPolicyTest, SelectLocationTest);
+
+  // Get the load of the location: a location with N tablet servers and
+  // R replicas has load R/N.
+  //
+  // Parameters:
+  //   'location'       The location in question.
+  //   'locations_info' Information on tablet replicas slated for placement
+  //                    but not yet created: the projected replica
+  //                    distribution used when optimizing replica placement
+  //                    across the cluster.
+  double GetLocationLoad(const std::string& location,
+                         const ReplicaLocationsInfo& locations_info) const;
+
+  // Select locations to place the given number of replicas ('nreplicas') for
+  // a new tablet. The locations are chosen according to the placement
+  // policies.
+  //
+  // TODO (aserbin): add the reference to the document once it's in the repo.
+  Status SelectReplicaLocations(int nreplicas,
+                                ReplicaLocationsInfo* locations_info) const;
+
+  // Select 'nreplicas' tablet servers from the specified set to host
+  // tablet replicas.
+  Status SelectReplicas(const TSDescriptorVector& source_ts_descs,
+                        int nreplicas,
+                        TSDescriptorVector* result_ts_descs) const;
+
+  // Given the tablet servers in 'ts_descs', pick a tablet server to host
+  // a tablet replica, excluding tablet servers in 'excluded'. If there are no
+  // servers in 'ts_descs' that are not in 'excluded', return nullptr.
+  std::shared_ptr<TSDescriptor> SelectReplica(
+      const TSDescriptorVector& ts_descs,
+      const std::set<std::shared_ptr<TSDescriptor>>& excluded) const;
+
+  // Select a location for the next replica of a tablet with the specified
+  // replication factor. In essence, the algorithm picks the least loaded
+  // location, making sure no location contains the majority of the replicas.
+  //
+  // Parameters:
+  //   'num_replicas'   The total number of tablet replicas to place.
+  //   'locations_info' Information on tablet replicas slated for placement
+  //                    but not yet created: the projected replica
+  //                    distribution used when optimizing replica placement
+  //                    across the cluster.
+  //   'location'       The result location pointer, must not be null.
+  Status SelectLocation(int num_replicas,
+                        const ReplicaLocationsInfo& locations_info,
+                        std::string* location) const;
+
+  // Number of available tablet servers.
+  const size_t ts_num_;
+
+  // Random number generator used for selecting replica locations.
+  // The object that rng_ points to must outlive the PlacementPolicy object.
+  mutable ThreadSafeRandom* rng_;
+
+  // Location to TSDescriptorVector map: the distribution of all already
+  // existing tablet replicas among available tablet servers in the cluster,
+  // grouped by location.
+  LocationToDescriptorsMap ltd_;
+
+  // A set of known tablet server identifiers (derived from ltd_).
+  std::unordered_set<std::string> known_ts_ids_;
+};
+
+} // namespace master
+} // namespace kudu
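
To make the R/N load metric documented for GetLocationLoad() and the majority
constraint in SelectLocation() concrete, here is a hedged toy computation
(standalone C++, with made-up location names and numbers that are not part of
the patch):

    // Toy illustration of the location load metric: load = (replicas already
    // placed or pending) / (tablet servers in the location).
    #include <iostream>
    #include <map>
    #include <string>
    #include <utility>

    int main() {
      // location -> { tablet servers in the location, replicas placed/pending }
      const std::map<std::string, std::pair<int, int>> cluster = {
          {"/dc0", {4, 2}},  // load 0.50
          {"/dc1", {2, 2}},  // load 1.00
          {"/dc2", {3, 1}},  // load ~0.33: least loaded, preferred next
      };
      for (const auto& entry : cluster) {
        const double load =
            static_cast<double>(entry.second.second) / entry.second.first;
        std::cout << entry.first << " load=" << load << std::endl;
      }
      // For replication factor 3, a location already holding one replica is
      // moved to the overflow group: a second replica there (2 of 3) would be
      // a majority, so it is used only if no other location can take one.
      return 0;
    }
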

http://git-wip-us.apache.org/repos/asf/kudu/blob/ebb2852d/src/kudu/master/ts_descriptor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_descriptor.cc b/src/kudu/master/ts_descriptor.cc
index 409cc85..e3c7d76 100644
--- a/src/kudu/master/ts_descriptor.cc
+++ b/src/kudu/master/ts_descriptor.cc
@@ -147,9 +147,6 @@ TSDescriptor::TSDescriptor(std::string perm_id)
       num_live_replicas_(0) {
 }
 
-TSDescriptor::~TSDescriptor() {
-}
-
 // Compares two repeated HostPortPB fields. Returns true if equal, false otherwise.
 static bool HostPortPBsEqual(const google::protobuf::RepeatedPtrField<HostPortPB>& pb1,
                              const google::protobuf::RepeatedPtrField<HostPortPB>& pb2) {
@@ -179,7 +176,7 @@ Status TSDescriptor::RegisterUnlocked(const NodeInstancePB& instance,
   // host/port is stored persistently in each tablet's metadata.
   if (registration_ &&
       !HostPortPBsEqual(registration_->rpc_addresses(), registration.rpc_addresses())) {
-    string msg = strings::Substitute(
+    string msg = Substitute(
         "Tablet server $0 is attempting to re-register with a different host/port. "
         "This is not currently supported. Old: {$1} New: {$2}",
         instance.permanent_uuid(),
@@ -196,11 +193,10 @@ Status TSDescriptor::RegisterUnlocked(const NodeInstancePB& instance,
   }
 
   if (instance.instance_seqno() < latest_seqno_) {
-    return Status::AlreadyPresent(
-      strings::Substitute("Cannot register with sequence number $0:"
-                          " Already have a registration from sequence number $1",
-                          instance.instance_seqno(),
-                          latest_seqno_));
+    return Status::AlreadyPresent(Substitute(
+        "Cannot register with sequence number $0:"
+        " Already have a registration from sequence number $1",
+        instance.instance_seqno(), latest_seqno_));
   } else if (instance.instance_seqno() == latest_seqno_) {
     // It's possible that the TS registered, but our response back to it
     // got lost, so it's trying to register again with the same sequence
@@ -396,7 +392,7 @@ Status TSDescriptor::GetConsensusProxy(const shared_ptr<rpc::Messenger>& messeng
 string TSDescriptor::ToString() const {
   std::lock_guard<simple_spinlock> l(lock_);
   const auto& addr = registration_->rpc_addresses(0);
-  return strings::Substitute("$0 ($1:$2)", permanent_uuid_, addr.host(), addr.port());
+  return Substitute("$0 ($1:$2)", permanent_uuid_, addr.host(), addr.port());
 }
 
 } // namespace master
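
The placement code calls TSDescriptor::IncrementRecentReplicaCreations(),
whose counter decays back to zero over time (the decay behavior is exercised
by the TestReplicaCreationsDecay test referenced in the header diff below).
Below is a rough sketch of such a time-decaying counter, assuming exponential
decay with an illustrative halving period; the class name and constant are
stand-ins, not Kudu's actual implementation:

    // A counter whose value halves every kHalvingPeriodSecs: recent
    // increments weigh more than old ones, and the value decays toward 0.
    #include <chrono>
    #include <cmath>

    class DecayingCounter {
     public:
      void Increment() {
        Decay();
        value_ += 1.0;
      }
      double value() {
        Decay();
        return value_;
      }

     private:
      void Decay() {
        const auto now = std::chrono::steady_clock::now();
        const double elapsed_s =
            std::chrono::duration<double>(now - last_update_).count();
        value_ *= std::pow(0.5, elapsed_s / kHalvingPeriodSecs);
        last_update_ = now;
      }

      static constexpr double kHalvingPeriodSecs = 10.0;  // illustrative only
      double value_ = 0.0;
      std::chrono::steady_clock::time_point last_update_ =
          std::chrono::steady_clock::now();
    };
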

http://git-wip-us.apache.org/repos/asf/kudu/blob/ebb2852d/src/kudu/master/ts_descriptor.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_descriptor.h b/src/kudu/master/ts_descriptor.h
index ad85fc0..c7fdd60 100644
--- a/src/kudu/master/ts_descriptor.h
+++ b/src/kudu/master/ts_descriptor.h
@@ -21,11 +21,13 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <vector>
 
 #include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 #include <gtest/gtest_prod.h>
 
+#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/locks.h"
@@ -35,8 +37,6 @@
 
 namespace kudu {
 
-class NodeInstancePB;
-class ServerRegistrationPB;
 class Sockaddr;
 
 namespace consensus {
@@ -63,7 +63,7 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
                             const ServerRegistrationPB& registration,
                             std::shared_ptr<TSDescriptor>* desc);
 
-  virtual ~TSDescriptor();
+  virtual ~TSDescriptor() = default;
 
   // Set the last-heartbeat time to now.
   void UpdateHeartbeatTime();
@@ -137,6 +137,7 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
 
  private:
   FRIEND_TEST(TestTSDescriptor, TestReplicaCreationsDecay);
+  friend class PlacementPolicyTest;
 
   Status RegisterUnlocked(const NodeInstancePB& instance,
                           const ServerRegistrationPB& registration);
@@ -175,6 +176,9 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
   DISALLOW_COPY_AND_ASSIGN(TSDescriptor);
 };
 
+// Alias for a vector of tablet server descriptors.
+typedef std::vector<std::shared_ptr<TSDescriptor>> TSDescriptorVector;
+
 } // namespace master
 } // namespace kudu
 #endif /* KUDU_MASTER_TS_DESCRIPTOR_H */

http://git-wip-us.apache.org/repos/asf/kudu/blob/ebb2852d/src/kudu/master/ts_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_manager.cc b/src/kudu/master/ts_manager.cc
index 39e4df6..4e4bebf 100644
--- a/src/kudu/master/ts_manager.cc
+++ b/src/kudu/master/ts_manager.cc
@@ -109,13 +109,13 @@ Status TSManager::RegisterTS(const NodeInstancePB& instance,
   return Status::OK();
 }
 
-void TSManager::GetAllDescriptors(vector<shared_ptr<TSDescriptor>> *descs) const {
+void TSManager::GetAllDescriptors(TSDescriptorVector* descs) const {
   descs->clear();
   shared_lock<rw_spinlock> l(lock_);
   AppendValuesFromMap(servers_by_id_, descs);
 }
 
-void TSManager::GetAllLiveDescriptors(vector<shared_ptr<TSDescriptor>> *descs) const {
+void TSManager::GetAllLiveDescriptors(TSDescriptorVector* descs) const {
   descs->clear();
 
   shared_lock<rw_spinlock> l(lock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/ebb2852d/src/kudu/master/ts_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_manager.h b/src/kudu/master/ts_manager.h
index 1e716ff..b6dc306 100644
--- a/src/kudu/master/ts_manager.h
+++ b/src/kudu/master/ts_manager.h
@@ -20,10 +20,10 @@
 #include <memory>
 #include <string>
 #include <unordered_map>
-#include <vector>
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/master/ts_descriptor.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
@@ -35,10 +35,6 @@ class ServerRegistrationPB;
 
 namespace master {
 
-class TSDescriptor;
-
-typedef std::vector<std::shared_ptr<TSDescriptor>> TSDescriptorVector;
-
 // Tracks the servers that the master has heard from, along with their
 // last heartbeat, etc.
 //
@@ -76,11 +72,11 @@ class TSManager {
 
   // Return all of the currently registered TS descriptors into the provided
   // list.
-  void GetAllDescriptors(std::vector<std::shared_ptr<TSDescriptor>>* descs) const;
+  void GetAllDescriptors(TSDescriptorVector* descs) const;
 
   // Return all of the currently registered TS descriptors that have sent a
   // heartbeat recently, indicating that they're alive and well.
-  void GetAllLiveDescriptors(std::vector<std::shared_ptr<TSDescriptor>>* descs) const;
+  void GetAllLiveDescriptors(TSDescriptorVector* descs) const;
 
   // Get the TS count.
   int GetCount() const;

