kudu-commits mailing list archives

From t...@apache.org
Subject incubator-kudu git commit: KUDU-1317. Spread creation of new tablets more randomly
Date Fri, 29 Jan 2016 23:52:26 GMT
Repository: incubator-kudu
Updated Branches:
  refs/heads/master 18dbc453a -> 30773866c


KUDU-1317. Spread creation of new tablets more randomly

This switches the assignment of new tablets on the cluster from being a pure
round-robin to instead use the "power-of-two-choices" algorithm. In this
algorithm, each time we need to select a replica location, we randomly sample
two nodes in the cluster and choose whichever of the two has fewer tablets
already on it. Per a simple Python script simulation (attached to the JIRA),
this does a much better job of even distribution than pure random selection,
but also avoids completely overloading a reformatted node with all replicas
of a new table.
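
To make the selection step concrete, here is a minimal standalone sketch of
power-of-two-choices placement (illustrative only, not the code in this patch;
the helper name and the per-server load vector are hypothetical):

  #include <random>
  #include <vector>

  // Pick two random servers and place the new replica on whichever of the
  // pair currently hosts fewer replicas. Ties go to the first pick.
  int PickLocation(const std::vector<int>& replicas_per_server, std::mt19937* rng) {
    std::uniform_int_distribution<int> pick(0, static_cast<int>(replicas_per_server.size()) - 1);
    int a = pick(*rng);
    int b = pick(*rng);
    return replicas_per_server[a] <= replicas_per_server[b] ? a : b;
  }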

This patch aims to solve a problem we're seeing with the round-robin
strategy where recovery is not well parallelized. Because of round-robin,
each tablet server only shares replicas with its "adjacent" nodes in the
round-robin ring. So, if a server crashes, at most 4 other servers can
participate in recovery.
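
For instance, with 3-way replication and round-robin placement, the replicas
of any tablet land on consecutive servers s, s+1, s+2 (mod N), so server s
only ever co-hosts data with s-2, s-1, s+1, and s+2. A toy check of that count
(illustrative only, not part of this patch):

  #include <iostream>
  #include <set>

  int main() {
    const int kServers = 10, kTablets = 100, kReplicas = 3;
    std::set<int> peers_of_zero;  // servers sharing at least one tablet with server 0
    for (int t = 0; t < kTablets; t++) {
      std::set<int> hosts;
      for (int r = 0; r < kReplicas; r++) {
        hosts.insert((t + r) % kServers);  // round-robin placement of replica r
      }
      if (hosts.count(0)) {
        peers_of_zero.insert(hosts.begin(), hosts.end());
      }
    }
    peers_of_zero.erase(0);
    std::cout << peers_of_zero.size() << std::endl;  // prints 4
    return 0;
  }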

To check that the new test isn't flaky, I looped it 1000 times:
http://dist-test.cloudera.org/job?job_id=todd.1453857519.32553

Change-Id: I8a27c2ed52b49baeffb309ebecd3d58192eaeec6
Reviewed-on: http://gerrit.cloudera.org:8080/1654
Reviewed-by: Jean-Daniel Cryans
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/30773866
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/30773866
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/30773866

Branch: refs/heads/master
Commit: 30773866cfec5c112e1b3f385808d88eeaa1d586
Parents: 18dbc45
Author: Todd Lipcon <todd@cloudera.com>
Authored: Wed Dec 16 20:51:51 2015 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Fri Jan 29 23:52:10 2016 +0000

----------------------------------------------------------------------
 .../integration-tests/create-table-itest.cc     |  74 ++++++++++++
 src/kudu/master/catalog_manager.cc              | 117 +++++++++++++++++--
 src/kudu/master/catalog_manager.h               |  16 +++
 src/kudu/master/master.proto                    |   4 +
 src/kudu/master/master_service.cc               |   1 +
 src/kudu/master/ts_descriptor.cc                |  11 +-
 src/kudu/master/ts_descriptor.h                 |  37 ++++++
 src/kudu/tserver/heartbeater.cc                 |   1 +
 src/kudu/tserver/ts_tablet_manager.cc           |  15 ++-
 src/kudu/tserver/ts_tablet_manager.h            |   3 +
 src/kudu/util/random-test.cc                    |  66 +++++++++++
 src/kudu/util/random.h                          |  43 +++++++
 12 files changed, 376 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/integration-tests/create-table-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/create-table-itest.cc b/src/kudu/integration-tests/create-table-itest.cc
index 055fa44..3583329 100644
--- a/src/kudu/integration-tests/create-table-itest.cc
+++ b/src/kudu/integration-tests/create-table-itest.cc
@@ -17,7 +17,9 @@
 
 #include <gflags/gflags.h>
 #include <gtest/gtest.h>
+#include <map>
 #include <memory>
+#include <set>
 #include <string>
 
 #include "kudu/client/client-test-util.h"
@@ -25,6 +27,8 @@
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
 #include "kudu/util/metrics.h"
 
+using std::multimap;
+using std::set;
 using std::string;
 using std::vector;
 
@@ -113,4 +117,74 @@ TEST_F(CreateTableITest, TestCreateWhenMajorityOfReplicasFailCreation) {
   }
 }
 
+// Regression test for KUDU-1317. Ensure that, when a table is created,
+// the tablets are well spread out across the machines in the cluster and
+// that recovery from failures will be well parallelized.
+TEST_F(CreateTableITest, TestSpreadReplicasEvenly) {
+  const int kNumServers = 10;
+  const int kNumTablets = 20;
+  vector<string> ts_flags;
+  vector<string> master_flags;
+  ts_flags.push_back("--never_fsync"); // run faster on slow disks
+  NO_FATALS(StartCluster(ts_flags, master_flags, kNumServers));
+
+  gscoped_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
+  client::KuduSchema client_schema(client::KuduSchemaFromSchema(GetSimpleTestSchema()));
+  ASSERT_OK(table_creator->table_name(kTableName)
+            .schema(&client_schema)
+            .num_replicas(3)
+            .add_hash_partitions({ "key" }, kNumTablets)
+            .Create());
+
+  // Check that the replicas are fairly well spread by computing the standard
+  // deviation of the number of replicas per server.
+  const double kMeanPerServer = kNumTablets * 3.0 / kNumServers;
+  double sum_squared_deviation = 0;
+  vector<int> tablet_counts;
+  for (int ts_idx = 0; ts_idx < kNumServers; ts_idx++) {
+    int num_replicas = inspect_->ListTabletsOnTS(ts_idx).size();
+    LOG(INFO) << "TS " << ts_idx << " has " << num_replicas <<
" tablets";
+    double deviation = static_cast<double>(num_replicas) - kMeanPerServer;
+    sum_squared_deviation += deviation * deviation;
+  }
+  double stddev = sqrt(sum_squared_deviation / (kMeanPerServer - 1));
+  LOG(INFO) << "stddev = " << stddev;
+  // In 1000 runs of the test, only one run had stddev above 2.0. So, 3.0 should
+  // be a safe non-flaky choice.
+  ASSERT_LE(stddev, 3.0);
+
+  // Construct a map from tablet ID to the set of servers that each tablet is hosted on.
+  multimap<string, int> tablet_to_servers;
+  for (int ts_idx = 0; ts_idx < kNumServers; ts_idx++) {
+    vector<string> tablets = inspect_->ListTabletsOnTS(ts_idx);
+    for (const string& tablet_id : tablets) {
+      tablet_to_servers.insert(std::make_pair(tablet_id, ts_idx));
+    }
+  }
+
+  // For each server, count how many other servers it shares tablets with.
+  // This is highly correlated to how well parallelized recovery will be
+  // in the case the server crashes.
+  int sum_num_peers = 0;
+  for (int ts_idx = 0; ts_idx < kNumServers; ts_idx++) {
+    vector<string> tablets = inspect_->ListTabletsOnTS(ts_idx);
+    set<int> peer_servers;
+    for (const string& tablet_id : tablets) {
+      auto peer_indexes = tablet_to_servers.equal_range(tablet_id);
+      for (auto it = peer_indexes.first; it != peer_indexes.second; ++it) {
+        peer_servers.insert(it->second);
+      }
+    }
+
+    peer_servers.erase(ts_idx);
+    LOG(INFO) << "Server " << ts_idx << " has " << peer_servers.size()
<< " peers";
+    sum_num_peers += peer_servers.size();
+  }
+
+  // On average, servers should have at least half the other servers as peers.
+  double avg_num_peers = static_cast<double>(sum_num_peers) / kNumServers;
+  LOG(INFO) << "avg_num_peers = " << avg_num_peers;
+  ASSERT_GE(avg_num_peers, kNumServers / 2);
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index e328a8f..3d7867d 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -43,6 +43,7 @@
 #include <glog/logging.h>
 
 #include <algorithm>
+#include <set>
 #include <string>
 #include <vector>
 
@@ -76,6 +77,7 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/threadpool.h"
@@ -344,7 +346,20 @@ void CatalogManagerBgTasks::Shutdown() {
 }
 
 void CatalogManagerBgTasks::Run() {
+  MonoTime last_process_time = MonoTime::Now(MonoTime::FINE);
   while (!NoBarrier_Load(&closing_)) {
+    MonoTime now = MonoTime::Now(MonoTime::FINE);
+    double since_last_process = now.GetDeltaSince(last_process_time).ToSeconds();
+    last_process_time = now;
+
+    // Decay load estimates on tablet servers.
+    TSDescriptorVector ts_descs;
+    catalog_manager_->master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
+    for (const auto& ts : ts_descs) {
+      ts->DecayRecentReplicaCreations(since_last_process);
+    }
+
+    // Perform assignment processing.
     if (!catalog_manager_->IsInitialized()) {
       LOG(WARNING) << "Catalog manager is not initialized!";
     } else if (catalog_manager_->CheckIsLeaderAndReady().ok()) {
@@ -422,6 +437,7 @@ void CheckIfNoLongerLeaderAndSetupError(Status s, RespClass* resp) {
 
 CatalogManager::CatalogManager(Master *master)
   : master_(master),
+    rng_(GetRandomSeed32()),
     state_(kConstructed),
     leader_ready_term_(-1) {
   CHECK_OK(ThreadPoolBuilder("leader-initialization")
@@ -2813,22 +2829,103 @@ void CatalogManager::SendCreateTabletRequests(const vector<TabletInfo*>& tablets
   }
 }
 
+shared_ptr<TSDescriptor> CatalogManager::PickBetterReplicaLocation(
+    const TSDescriptorVector& two_choices) {
+  DCHECK_EQ(two_choices.size(), 2);
+
+  const auto& a = two_choices[0];
+  const auto& b = two_choices[1];
+
+  // When creating replicas, we consider two aspects of load:
+  //   (1) how many tablet replicas are already on the server, and
+  //   (2) how often we've chosen this server recently.
+  //
+  // The first factor will attempt to put more replicas on servers that
+  // are under-loaded (e.g. because they have newly joined an existing cluster, or have
+  // been reformatted and re-joined).
+  //
+  // The second factor will ensure that we take into account the recent selection
+  // decisions even if those replicas are still in the process of being created (and thus
+  // not yet reported by the server). This is important because, while creating a table,
+  // we batch the selection process before sending any creation commands to the
+  // servers themselves.
+  //
+  // TODO: in the future we may want to factor in other items such as available disk space,
+  // actual request load, etc.
+  double load_a = a->recent_replica_creations() + a->num_live_replicas();
+  double load_b = b->recent_replica_creations() + b->num_live_replicas();
+  if (load_a < load_b) {
+    return a;
+  } else if (load_b < load_a) {
+    return b;
+  } else {
+    // If the load is the same, we can just pick randomly.
+    return two_choices[rng_.Uniform(2)];
+  }
+}
+
+shared_ptr<TSDescriptor> CatalogManager::SelectReplica(
+    const TSDescriptorVector& ts_descs,
+    const set<shared_ptr<TSDescriptor>>& excluded) {
+  // The replica selection algorithm follows the idea from
+  // "Power of Two Choices in Randomized Load Balancing"[1]. For each replica,
+  // we randomly select two tablet servers, and then assign the replica to the
+  // less-loaded one of the two. This has some nice properties:
+  //
+  // 1) because the initial selection of two servers is random, we get good
+  //    spreading of replicas across the cluster. In contrast if we sorted by
+  //    load and always picked under-loaded servers first, we'd end up causing
+  //    all tablets of a new table to be placed on an empty server. This wouldn't
+  //    give good load balancing of that table.
+  //
+  // 2) because we pick the less-loaded of two random choices, we do end up with a
+  //    weighting towards filling up the underloaded one over time, without
+  //    the extreme scenario above.
+  //
+  // 3) because we don't follow any sequential pattern, every server is equally
+  //    likely to replicate its tablets to every other server. In contrast, a
+  //    round-robin design would enforce that each server only replicates to its
+  //    adjacent nodes in the TS sort order, limiting recovery bandwidth (see
+  //    KUDU-1317).
+  //
+  // [1] http://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
+
+  // Pick two random servers, excluding those we've already picked.
+  // If we've only got one server left, 'two_choices' will actually
+  // just contain one element.
+  vector<shared_ptr<TSDescriptor> > two_choices;
+  rng_.ReservoirSample(ts_descs, 2, excluded, &two_choices);
+
+  if (two_choices.size() == 2) {
+    // Pick the better of the two.
+    return PickBetterReplicaLocation(two_choices);
+  }
+
+  // If we couldn't randomly sample two servers, it's because we only had one
+  // more non-excluded choice left.
+  CHECK_EQ(1, ts_descs.size() - excluded.size())
+      << "ts_descs: " << ts_descs.size() << " already_sel: " << excluded.size();
+  return two_choices[0];
+}
+
 void CatalogManager::SelectReplicas(const TSDescriptorVector& ts_descs,
                                     int nreplicas,
                                     consensus::RaftConfigPB *config) {
-  // TODO: Select N Replicas
-  // at the moment we have to scan all the tablets to build a map TS -> tablets
-  // to know how many tablets a TS has... so, let's do a dumb assignment for now.
-  //
-  // Using a static variable here ensures that we round-robin our assignments.
-  // TODO: In the future we should do something smarter based on number of tablets currently
-  // running on each server, since round-robin may get unbalanced after moves/deletes.
-
   DCHECK_EQ(0, config->peers_size()) << "RaftConfig not empty: " << config->ShortDebugString();
+  DCHECK_LE(nreplicas, ts_descs.size());
 
-  static int index = rand();
+  // Keep track of servers we've already selected, so that we don't attempt to
+  // put two replicas on the same host.
+  set<shared_ptr<TSDescriptor> > already_selected;
   for (int i = 0; i < nreplicas; ++i) {
-    const TSDescriptor *ts = ts_descs[index++ % ts_descs.size()].get();
+    shared_ptr<TSDescriptor> ts = SelectReplica(ts_descs, already_selected);
+    InsertOrDie(&already_selected, ts);
+
+    // Increment the number of pending replicas so that we take this selection into
+    // account when assigning replicas for other tablets of the same table.
+    //
+    // This value gets decayed by the catalog manager background task.
+    ts->increment_recent_replica_creations();
 
     TSRegistrationPB reg;
     ts->GetRegistration(&reg);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index b74cf39..2365361 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -20,6 +20,7 @@
 #include <boost/optional/optional_fwd.hpp>
 #include <boost/thread/mutex.hpp>
 #include <map>
+#include <set>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -37,6 +38,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/oid_generator.h"
 #include "kudu/util/promise.h"
+#include "kudu/util/random.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -512,6 +514,17 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // Loops through the "not created" tablets and sends a CreateTablet() request.
   Status ProcessPendingAssignments(const std::vector<scoped_refptr<TabletInfo> >& tablets);
 
+  // Given 'two_choices', which should be a vector of exactly two elements, select which
+  // one is the better choice for a new replica.
+  std::shared_ptr<TSDescriptor> PickBetterReplicaLocation(const TSDescriptorVector& two_choices);
+
+  // Select a tablet server from 'ts_descs' on which to place a new replica.
+  // Any tablet servers in 'excluded' are not considered.
+  // REQUIRES: 'ts_descs' must include at least one non-excluded server.
+  std::shared_ptr<TSDescriptor> SelectReplica(
+      const TSDescriptorVector& ts_descs,
+      const std::set<std::shared_ptr<TSDescriptor>>& excluded);
+
   // Select N Replicas from online tablet servers (as specified by
   // 'ts_descs') for the specified tablet and populate the consensus configuration
   // object. If 'ts_descs' does not specify enough online tablet
@@ -614,6 +627,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   Atomic32 closing_;
   ObjectIdGenerator oid_generator_;
 
+  // Random number generator used for selecting replica locations.
+  ThreadSafeRandom rng_;
+
   gscoped_ptr<SysCatalogTable> sys_catalog_;
 
   // Background thread, used to execute the catalog manager tasks

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 87bab4d..29f1e4d 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -247,6 +247,10 @@ message TSHeartbeatRequestPB {
 
   // TODO: perhaps add some very basic metrics reporting here, like
   // free space, reqs/sec, etc?
+
+  // The number of tablets that are BOOTSTRAPPING or RUNNING.
+  // Used by the master to determine load when creating new tablet replicas.
+  optional int32 num_live_tablets = 4;
 }
 
 message TSHeartbeatResponsePB {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 329a786..4ea77d5 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -186,6 +186,7 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
   }
 
   ts_desc->UpdateHeartbeatTime();
+  ts_desc->set_num_live_replicas(req->num_live_tablets());
 
   if (req->has_tablet_report()) {
     s = server_->catalog_manager()->ProcessTabletReport(

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/master/ts_descriptor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_descriptor.cc b/src/kudu/master/ts_descriptor.cc
index c29912c..8fa477b 100644
--- a/src/kudu/master/ts_descriptor.cc
+++ b/src/kudu/master/ts_descriptor.cc
@@ -26,6 +26,7 @@
 #include <boost/thread/locks.hpp>
 #include <boost/thread/mutex.hpp>
 
+#include <math.h>
 #include <vector>
 
 using std::shared_ptr;
@@ -46,7 +47,10 @@ TSDescriptor::TSDescriptor(std::string perm_id)
     : permanent_uuid_(std::move(perm_id)),
       latest_seqno_(-1),
       last_heartbeat_(MonoTime::Now(MonoTime::FINE)),
-      has_tablet_report_(false) {}
+      has_tablet_report_(false),
+      recent_replica_creations_(0),
+      num_live_replicas_(0) {
+}
 
 TSDescriptor::~TSDescriptor() {
 }
@@ -106,6 +110,11 @@ void TSDescriptor::set_has_tablet_report(bool has_report) {
   has_tablet_report_ = has_report;
 }
 
+void TSDescriptor::DecayRecentReplicaCreations(double secs_since_last_decay) {
+  const double kHalflifeSecs = 60;
+  recent_replica_creations_ *= pow(0.5, secs_since_last_decay / kHalflifeSecs);
+}
+
 void TSDescriptor::GetRegistration(TSRegistrationPB* reg) const {
   boost::lock_guard<simple_spinlock> l(lock_);
   CHECK(registration_) << "No registration";

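The decay in TSDescriptor::DecayRecentReplicaCreations above is a plain
exponential half-life: after t seconds the recent-creation count is multiplied
by 0.5^(t / 60), so a burst of placements stops influencing selection within a
few minutes. A quick numeric check (illustrative only, not part of this patch):

  #include <cmath>
  #include <cstdio>

  int main() {
    double recent = 8.0;  // e.g. eight replicas were just placed on this server
    for (int i = 0; i < 3; i++) {
      recent *= std::pow(0.5, 60 / 60.0);  // decay over one 60-second half-life
      std::printf("%.2f\n", recent);       // prints 4.00, 2.00, 1.00
    }
    return 0;
  }
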
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/master/ts_descriptor.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_descriptor.h b/src/kudu/master/ts_descriptor.h
index 792fe72..404e8f4 100644
--- a/src/kudu/master/ts_descriptor.h
+++ b/src/kudu/master/ts_descriptor.h
@@ -90,6 +90,36 @@ class TSDescriptor {
   Status GetConsensusProxy(const std::shared_ptr<rpc::Messenger>& messenger,
                            std::shared_ptr<consensus::ConsensusServiceProxy>* proxy);
 
+  void increment_recent_replica_creations() {
+    lock_guard<simple_spinlock> l(&lock_);
+    recent_replica_creations_ += 1;
+  }
+
+  // Decay the accounting of how many replicas have been recently
+  // created on this host.
+  void DecayRecentReplicaCreations(double secs_since_last_decay);
+
+  // Return the number of replicas which have recently been created on this
+  // TS. This number is incremented when replicas are placed on the TS, and
+  // then decayed over time.
+  double recent_replica_creations() const {
+    lock_guard<simple_spinlock> l(&lock_);
+    return recent_replica_creations_;
+  }
+
+  // Set the number of live replicas (i.e. running or bootstrapping).
+  void set_num_live_replicas(int n) {
+    DCHECK_GE(n, 0);
+    lock_guard<simple_spinlock> l(&lock_);
+    num_live_replicas_ = n;
+  }
+
+  // Return the number of live replicas (i.e. running or bootstrapping).
+  int num_live_replicas() const {
+    lock_guard<simple_spinlock> l(&lock_);
+    return num_live_replicas_;
+  }
+
  private:
   explicit TSDescriptor(std::string perm_id);
 
@@ -107,6 +137,13 @@ class TSDescriptor {
   // Set to true once this instance has reported all of its tablets.
   bool has_tablet_report_;
 
+  // The number of times this tablet server has recently been selected to create a
+  // tablet replica. This value decays back to 0 over time.
+  double recent_replica_creations_;
+
+  // The number of live replicas on this host, from the last heartbeat.
+  int num_live_replicas_;
+
   gscoped_ptr<TSRegistrationPB> registration_;
 
   std::shared_ptr<tserver::TabletServerAdminServiceProxy> ts_admin_proxy_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/tserver/heartbeater.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index 13ba018..e70c31a 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -339,6 +339,7 @@ Status Heartbeater::Thread::DoHeartbeat() {
     server_->tablet_manager()->GenerateIncrementalTabletReport(
       req.mutable_tablet_report());
   }
+  req.set_num_live_tablets(server_->tablet_manager()->GetNumLiveTablets());
 
   RpcController rpc;
   rpc.set_timeout(MonoDelta::FromSeconds(10));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 52fb56a..2e89e89 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -783,10 +783,23 @@ void TSTabletManager::MarkTabletDirty(const std::string& tablet_id, const std::s
 }
 
 int TSTabletManager::GetNumDirtyTabletsForTests() const {
-  boost::lock_guard<rw_spinlock> lock(lock_);
+  boost::shared_lock<rw_spinlock> lock(lock_);
   return dirty_tablets_.size();
 }
 
+int TSTabletManager::GetNumLiveTablets() const {
+  int count = 0;
+  boost::shared_lock<rw_spinlock> lock(lock_);
+  for (const auto& entry : tablet_map_) {
+    tablet::TabletStatePB state = entry.second->state();
+    if (state == tablet::BOOTSTRAPPING ||
+        state == tablet::RUNNING) {
+      count++;
+    }
+  }
+  return count;
+}
+
 void TSTabletManager::MarkDirtyUnlocked(const std::string& tablet_id, const std::string& reason) {
   TabletReportState* state = FindOrNull(dirty_tablets_, tablet_id);
   if (state != nullptr) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index e8c5cb0..2d63142 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -178,6 +178,9 @@ class TSTabletManager : public tserver::TabletPeerLookupIf {
   // Returns the number of tablets in the "dirty" map, for use by unit tests.
   int GetNumDirtyTabletsForTests() const;
 
+  // Return the number of tablets in RUNNING or BOOTSTRAPPING state.
+  int GetNumLiveTablets() const;
+
   Status RunAllLogGC();
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/util/random-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/random-test.cc b/src/kudu/util/random-test.cc
index 8e37b12..54a9c66 100644
--- a/src/kudu/util/random-test.cc
+++ b/src/kudu/util/random-test.cc
@@ -16,6 +16,9 @@
 // under the License.
 
 #include <limits>
+#include <unordered_set>
+
+#include <glog/stl_logging.h>
 
 #include "kudu/util/random.h"
 #include "kudu/util/test_util.h"
@@ -94,4 +97,67 @@ TEST_F(RandomTest, TestResetSeed) {
   ASSERT_EQ(first, second);
 }
 
+TEST_F(RandomTest, TestReservoirSample) {
+  // Use a constant seed to avoid flakiness.
+  rng_.Reset(12345);
+
+  vector<int> population;
+  for (int i = 0; i < 100; i++) {
+    population.push_back(i);
+  }
+
+  // Run 1000 trials selecting 5 elements.
+  vector<int> results;
+  vector<int> counts(population.size());
+  std::unordered_set<int> avoid;
+  for (int trial = 0; trial < 1000; trial++) {
+    rng_.ReservoirSample(population, 5, avoid, &results);
+    for (int result : results) {
+      counts[result]++;
+    }
+  }
+
+  // We expect each element to be selected
+  // 50 times on average, but since it's random, it won't be exact.
+  // However, since we use a constant seed, this test won't be flaky.
+  for (int count : counts) {
+    ASSERT_GE(count, 25);
+    ASSERT_LE(count, 75);
+  }
+
+  // Run again, but avoid some particular entries.
+  avoid.insert(3);
+  avoid.insert(10);
+  avoid.insert(20);
+  counts.assign(100, 0);
+  for (int trial = 0; trial < 1000; trial++) {
+    rng_.ReservoirSample(population, 5, avoid, &results);
+    for (int result : results) {
+      counts[result]++;
+    }
+  }
+
+  // Ensure that we didn't ever pick the avoided elements.
+  ASSERT_EQ(0, counts[3]);
+  ASSERT_EQ(0, counts[10]);
+  ASSERT_EQ(0, counts[20]);
+}
+
+TEST_F(RandomTest, TestReservoirSamplePopulationTooSmall) {
+  vector<int> population;
+  for (int i = 0; i < 10; i++) {
+    population.push_back(i);
+  }
+
+  vector<int> results;
+  std::unordered_set<int> avoid;
+  rng_.ReservoirSample(population, 20, avoid, &results);
+  ASSERT_EQ(population.size(), results.size());
+  ASSERT_EQ(population, results);
+
+  rng_.ReservoirSample(population, 10, avoid, &results);
+  ASSERT_EQ(population.size(), results.size());
+  ASSERT_EQ(population, results);
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/30773866/src/kudu/util/random.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/random.h b/src/kudu/util/random.h
index 7931f54..b4f0a46 100644
--- a/src/kudu/util/random.h
+++ b/src/kudu/util/random.h
@@ -8,7 +8,9 @@
 #include <stdint.h>
 
 #include <cmath>
+#include <vector>
 
+#include "kudu/gutil/map-util.h"
 #include "kudu/util/locks.h"
 
 namespace kudu {
@@ -118,6 +120,40 @@ class Random {
   double NextDoubleFraction() {
     return Next() / static_cast<double>(random_internal::M + 1.0);
   }
+
+  // Sample 'k' random elements from the collection 'c' into 'result', taking care not to sample any
+  // elements that are already present in 'avoid'.
+  //
+  // If 'c' has fewer than 'k' elements, then all elements in 'c' will be selected.
+  //
+  // 'c' should be an iterable STL collection such as a vector, set, or list.
+  // 'avoid' should be an STL-compatible set.
+  //
+  // The results are not stored in a randomized order: the order of results will
+  // match their order in the input collection.
+  template<class Collection, class Set, class T>
+  void ReservoirSample(const Collection& c, int k, const Set& avoid,
+                       std::vector<T>* result) {
+    result->clear();
+    result->reserve(k);
+    int i = 0;
+    for (const T& elem : c) {
+      if (ContainsKey(avoid, elem)) {
+        continue;
+      }
+      i++;
+      // Fill the reservoir if there is available space.
+      if (result->size() < k) {
+        result->push_back(elem);
+        continue;
+      }
+      // Otherwise replace existing elements with decreasing probability.
+      int j = Uniform(i);
+      if (j < k) {
+        (*result)[j] = elem;
+      }
+    }
+  }
 };
 
 // Thread-safe wrapper around Random.
@@ -177,6 +213,13 @@ class ThreadSafeRandom {
     return random_.Normal(mean, std_dev);
   }
 
+  template<class Collection, class Set, class T>
+  void ReservoirSample(const Collection& c, int k, const Set& avoid,
+                       std::vector<T>* result) {
+    lock_guard<simple_spinlock> l(&lock_);
+    random_.ReservoirSample(c, k, avoid, result);
+  }
+
  private:
   simple_spinlock lock_;
   Random random_;

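Usage of the new sampler outside the master code might look like the following
sketch (illustrative only; the values are made up and only the header added in
this patch is assumed):

  #include <set>
  #include <vector>

  #include "kudu/util/random.h"

  void ExampleSample() {
    kudu::Random rng(12345);               // fixed seed, as in the unit test above
    std::vector<int> servers = {0, 1, 2, 3, 4};
    std::set<int> excluded = {2};          // never pick server 2
    std::vector<int> picked;
    // Picks two of the non-excluded servers; results keep the input order.
    rng.ReservoirSample(servers, 2, excluded, &picked);
  }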
