kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [1/4] kudu git commit: Memory tracking for result tracker
Date Tue, 16 Aug 2016 00:26:32 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 74210b254 -> 2c3fc7c27


Memory tracking for result tracker

This adds memory tracking to ResultTracker, making sure we account for
the memory as we cache responses for clients' requests.

Testing-wise, this adds memory consumption checks to rpc-stress-test.cc.

Original patch by David Alves.

Change-Id: I3b81dda41c8bc7f70380ce426142c34afe6f1625
Reviewed-on: http://gerrit.cloudera.org:8080/3627
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0fb44093
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0fb44093
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0fb44093

Branch: refs/heads/master
Commit: 0fb44093fd64893d8333488fad763edc997393a7
Parents: 74210b2
Author: Todd Lipcon <todd@apache.org>
Authored: Fri Aug 12 14:57:24 2016 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Mon Aug 15 19:39:25 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/result_tracker.cc  | 66 ++++++++++++++++++++++++++++++--
 src/kudu/rpc/result_tracker.h   | 49 +++++++++++++++++++++---
 src/kudu/rpc/rpc-stress-test.cc | 73 +++++++++++++++++++++++++++++++++---
 src/kudu/rpc/rpc-test-base.h    |  5 ++-
 src/kudu/server/server_base.cc  |  3 +-
 src/kudu/util/test_util.cc      | 41 ++++++++++++++++++++
 src/kudu/util/test_util.h       | 14 +++++++
 7 files changed, 234 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0fb44093/src/kudu/rpc/result_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.cc b/src/kudu/rpc/result_tracker.cc
index 61e5738..649ad15 100644
--- a/src/kudu/rpc/result_tracker.cc
+++ b/src/kudu/rpc/result_tracker.cc
@@ -22,6 +22,7 @@
 #include "kudu/rpc/inbound_call.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/util/debug/trace_event.h"
+#include "kudu/util/mem_tracker.h"
 #include "kudu/util/trace.h"
 #include "kudu/util/pb_util.h"
 
@@ -29,14 +30,59 @@ namespace kudu {
 namespace rpc {
 
 using google::protobuf::Message;
+using kudu::MemTracker;
 using rpc::InboundCall;
 using std::move;
 using std::lock_guard;
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using strings::Substitute;
 using strings::SubstituteAndAppend;
 
+// This tracks the size changes of anything that has a memory_footprint() method.
+// It must be instantiated before the updates, and it makes sure that the MemTracker
+// is updated on scope exit.
+template <class T>
+struct ScopedMemTrackerUpdater {
+  ScopedMemTrackerUpdater(MemTracker* tracker_, const T* tracked_)
+      : tracker(tracker_),
+        tracked(tracked_),
+        memory_before(tracked->memory_footprint()),
+        cancelled(false) {
+  }
+
+  ~ScopedMemTrackerUpdater() {
+    if (cancelled) return;
+    tracker->Release(memory_before - tracked->memory_footprint());
+  }
+
+  void Cancel() {
+    cancelled = true;
+  }
+
+  MemTracker* tracker;
+  const T* tracked;
+  int64_t memory_before;
+  bool cancelled;
+};
+
+ResultTracker::ResultTracker(shared_ptr<MemTracker> mem_tracker)
+    : mem_tracker_(std::move(mem_tracker)),
+      clients_(ClientStateMap::key_compare(),
+               ClientStateMapAllocator(mem_tracker_)) {}
+
+ResultTracker::~ResultTracker() {
+  lock_guard<simple_spinlock> l(lock_);
+  // Release all the memory for the stuff we'll delete on destruction.
+  for (auto& client_state : clients_) {
+    for (auto& completion_record : client_state.second->completion_records) {
+      mem_tracker_->Release(completion_record.second->memory_footprint());
+    }
+    mem_tracker_->Release(client_state.second->memory_footprint());
+  }
+}
+
 ResultTracker::RpcState ResultTracker::TrackRpc(const RequestIdPB& request_id,
                                                 Message* response,
                                                 RpcContext* context) {
@@ -47,20 +93,28 @@ ResultTracker::RpcState ResultTracker::TrackRpc(const RequestIdPB&
request_id,
 ResultTracker::RpcState ResultTracker::TrackRpcUnlocked(const RequestIdPB& request_id,
                                                         Message* response,
                                                         RpcContext* context) {
-
   ClientState* client_state = ComputeIfAbsent(
       &clients_,
       request_id.client_id(),
-      []{ return unique_ptr<ClientState>(new ClientState()); })->get();
+      [&]{
+        unique_ptr<ClientState> client_state(new ClientState(mem_tracker_));
+        mem_tracker_->Consume(client_state->memory_footprint());
+        return client_state;
+      })->get();
 
   client_state->last_heard_from = MonoTime::Now();
 
   auto result = ComputeIfAbsentReturnAbsense(
       &client_state->completion_records,
       request_id.seq_no(),
-      []{ return unique_ptr<CompletionRecord>(new CompletionRecord()); });
+      [&]{
+        unique_ptr<CompletionRecord> completion_record(new CompletionRecord());
+        mem_tracker_->Consume(completion_record->memory_footprint());
+        return completion_record;
+      });
 
   CompletionRecord* completion_record = result.first->get();
+  ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record);
 
   if (PREDICT_TRUE(result.second)) {
     completion_record->state = RpcState::IN_PROGRESS;
@@ -111,6 +165,7 @@ ResultTracker::RpcState ResultTracker::TrackRpcOrChangeDriver(const RequestIdPB&
   if (state != RpcState::IN_PROGRESS) return state;
 
   CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
+  ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), completion_record);
 
   // ... if we did find a CompletionRecord change the driver and return true.
   completion_record->driver_attempt_no = request_id.attempt_no();
@@ -194,6 +249,7 @@ void ResultTracker::RecordCompletionAndRespond(const RequestIdPB&
request_id,
   lock_guard<simple_spinlock> l(lock_);
 
   CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
+  ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), completion_record);
 
   CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no())
     << "Called RecordCompletionAndRespond() from an executor identified with an attempt
number that"
@@ -233,13 +289,13 @@ void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id,
                                            HandleOngoingRpcFunc func) {
   lock_guard<simple_spinlock> l(lock_);
   auto state_and_record = FindClientStateAndCompletionRecordOrNullUnlocked(request_id);
-
   if (PREDICT_FALSE(state_and_record.first == nullptr)) {
     LOG(FATAL) << "Couldn't find ClientState for request: " << request_id.ShortDebugString()
         << ". \nTracker state:\n" << ToStringUnlocked();
   }
 
   CompletionRecord* completion_record = state_and_record.second;
+  ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record);
 
   if (completion_record == nullptr) {
     return;
@@ -272,8 +328,10 @@ void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id,
   // delete the completion record.
   if (completion_record->ongoing_rpcs.size() == 0
       && completion_record->state != RpcState::COMPLETED) {
+    cr_updater.Cancel();
     unique_ptr<CompletionRecord> completion_record =
         EraseKeyReturnValuePtr(&state_and_record.first->completion_records, seq_no);
+    mem_tracker_->Release(completion_record->memory_footprint());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0fb44093/src/kudu/rpc/result_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.h b/src/kudu/rpc/result_tracker.h
index a04cb07..86d87b2 100644
--- a/src/kudu/rpc/result_tracker.h
+++ b/src/kudu/rpc/result_tracker.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <functional>
 #include <map>
 #include <string>
 #include <utility>
@@ -26,6 +27,8 @@
 #include "kudu/rpc/request_tracker.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/mem_tracker.h"
 #include "kudu/util/monotime.h"
 
 namespace google {
@@ -139,7 +142,6 @@ class RpcContext;
 //
 // This class is thread safe.
 //
-// TODO Memory bookkeeping.
 // TODO Garbage collection.
 class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
  public:
@@ -160,8 +162,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker>
{
     STALE
   };
 
-  ResultTracker() {}
-  ~ResultTracker() {}
+  explicit ResultTracker(std::shared_ptr<kudu::MemTracker> mem_tracker);
+  ~ResultTracker();
 
   // Tracks the RPC and returns its current state.
   //
@@ -244,12 +246,39 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker>
{
     std::vector<OnGoingRpcInfo> ongoing_rpcs;
 
     std::string ToString() const;
+
+    // Calculates the memory footprint of this struct.
+    int64_t memory_footprint() const {
+      return kudu_malloc_usable_size(this)
+          + (ongoing_rpcs.capacity() > 0 ? kudu_malloc_usable_size(ongoing_rpcs.data())
: 0)
+          + (response.get() != nullptr ? response->SpaceUsed() : 0);
+    }
   };
   // The state corresponding to a single client.
   struct ClientState {
+    typedef MemTrackerAllocator<
+        std::pair<const SequenceNumber,
+                  std::unique_ptr<CompletionRecord>>> CompletionRecordMapAllocator;
+    typedef std::map<SequenceNumber,
+                     std::unique_ptr<CompletionRecord>,
+                     std::less<SequenceNumber>,
+                     CompletionRecordMapAllocator> CompletionRecordMap;
+
+    explicit ClientState(std::shared_ptr<MemTracker> mem_tracker)
+     : completion_records(CompletionRecordMap::key_compare(),
+                          CompletionRecordMapAllocator(std::move(mem_tracker))) {}
+
     MonoTime last_heard_from;
-    std::map<SequenceNumber, std::unique_ptr<CompletionRecord>> completion_records;
+    CompletionRecordMap completion_records;
+
     std::string ToString() const;
+
+    // Calculates the memory footprint of this struct.
+    // This calculation is shallow and doesn't account for the memory the nested data
+    // structures occupy.
+    int64_t memory_footprint() const {
+      return kudu_malloc_usable_size(this);
+    }
   };
 
   RpcState TrackRpcUnlocked(const RequestIdPB& request_id,
@@ -290,12 +319,22 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker>
{
 
   std::string ToStringUnlocked() const;
 
+  // The memory tracker that tracks this ResultTracker's memory consumption.
+  std::shared_ptr<kudu::MemTracker> mem_tracker_;
 
   // Lock that protects access to 'clients_' and to the state contained in each
   // ClientState.
   // TODO consider a per-ClientState lock if we find this too coarse grained.
   simple_spinlock lock_;
-  std::map<std::string, std::unique_ptr<ClientState>> clients_;
+
+  typedef MemTrackerAllocator<std::pair<const std::string,
+                                        std::unique_ptr<ClientState>>> ClientStateMapAllocator;
+  typedef std::map<std::string,
+                   std::unique_ptr<ClientState>,
+                   std::less<std::string>,
+                   ClientStateMapAllocator> ClientStateMap;
+
+  ClientStateMap clients_;
 
   DISALLOW_COPY_AND_ASSIGN(ResultTracker);
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/0fb44093/src/kudu/rpc/rpc-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-stress-test.cc b/src/kudu/rpc/rpc-stress-test.cc
index b8dade1..2d1215e 100644
--- a/src/kudu/rpc/rpc-stress-test.cc
+++ b/src/kudu/rpc/rpc-stress-test.cc
@@ -31,10 +31,11 @@ namespace {
 const char* kClientId = "test-client";
 
 void AddRequestId(RpcController* controller,
+                  const std::string& client_id,
                   ResultTracker::SequenceNumber sequence_number,
                   int64_t attempt_no) {
   unique_ptr<RequestIdPB> request_id(new RequestIdPB());
-  request_id->set_client_id(kClientId);
+  request_id->set_client_id(client_id);
   request_id->set_seq_no(sequence_number);
   request_id->set_attempt_no(attempt_no);
   request_id->set_first_incomplete_seq_no(sequence_number);
@@ -191,7 +192,7 @@ class RpcStressTest : public RpcTestBase {
        client_sleep_for_ms(client_sleep) {
       req.set_value_to_add(value);
       req.set_sleep_for_ms(server_sleep);
-      AddRequestId(&controller, sequence_number, attempt_no);
+      AddRequestId(&controller, kClientId, sequence_number, attempt_no);
     }
 
     void Start() {
@@ -224,7 +225,7 @@ class RpcStressTest : public RpcTestBase {
     ExactlyOnceResponsePB resp;
     RequestTracker::SequenceNumber seq_no;
     CHECK_OK(request_tracker_->NewSeqNo(&seq_no));
-    AddRequestId(&controller, seq_no, 0);
+    AddRequestId(&controller, kClientId, seq_no, 0);
     ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
     ASSERT_EQ(resp.current_val(), expected_value);
     request_tracker_->RpcCompleted(seq_no);
@@ -244,16 +245,30 @@ class RpcStressTest : public RpcTestBase {
 // same sequence number as previous request.
 TEST_F(RpcStressTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
   ExactlyOnceResponsePB original_resp;
+  int mem_consumption = mem_tracker_->consumption();
   {
     RpcController controller;
     ExactlyOnceRequestPB req;
     req.set_value_to_add(1);
 
     // Assign id 0.
-    AddRequestId(&controller, 0, 0);
+    AddRequestId(&controller, kClientId, 0, 0);
 
     // Send the request the first time.
     ASSERT_OK(proxy_->AddExactlyOnce(req, &original_resp, &controller));
+
+    // The incremental usage of a new client is the size of the response itself
+    // plus some fixed overhead for the client-tracking structure.
+    int expected_incremental_usage = original_resp.SpaceUsed() + 200;
+
+    // The consumption isn't immediately updated, since the MemTracker update
+    // happens after we call 'Respond' on the RPC.
+    int mem_consumption_after;
+    AssertEventually([&]() {
+        mem_consumption_after = mem_tracker_->consumption();
+        ASSERT_GT(mem_consumption_after - mem_consumption, expected_incremental_usage);
+      });
+    mem_consumption = mem_consumption_after;
   }
 
   // Now repeat the rpc 10 times, using the same sequence number, none of these should be
executed
@@ -264,10 +279,32 @@ TEST_F(RpcStressTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
     ExactlyOnceRequestPB req;
     req.set_value_to_add(1);
     ExactlyOnceResponsePB resp;
-    AddRequestId(&controller, 0, i + 1);
+    AddRequestId(&controller, kClientId, 0, i + 1);
     ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
     ASSERT_EQ(resp.current_val(), 1);
     ASSERT_EQ(resp.current_time_micros(), original_resp.current_time_micros());
+    // Sleep to give the MemTracker time to update -- we don't expect any update,
+    // but if we had a bug here, we'd only see it with this sleep.
+    SleepFor(MonoDelta::FromMilliseconds(100));
+    // We shouldn't have consumed any more memory since the responses were cached.
+    ASSERT_EQ(mem_consumption, mem_tracker_->consumption());
+  }
+
+  // Making a new request, from a new client, should double the memory consumption.
+  {
+    RpcController controller;
+    ExactlyOnceRequestPB req;
+    ExactlyOnceResponsePB resp;
+    req.set_value_to_add(1);
+
+    // Assign id 0.
+    AddRequestId(&controller, "test-client2", 0, 0);
+
+    // Send the first request for this new client.
+    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+    AssertEventually([&]() {
+        ASSERT_EQ(mem_tracker_->consumption(), mem_consumption * 2);
+      });
   }
 }
 
@@ -313,7 +350,22 @@ TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithConcurrentUpdaters)
{
     kNumThreads = 100;
   }
 
-  for (int i = 0; i < kNumIterations; i ++) {
+  ResultTracker::SequenceNumber sequence_number = 0;
+  int memory_consumption_initial = mem_tracker_->consumption();
+  int single_response_size = 0;
+
+  // Measure memory consumption for a single response from the same client.
+  {
+    RpcController controller;
+    ExactlyOnceRequestPB req;
+    ExactlyOnceResponsePB resp;
+    req.set_value_to_add(1);
+    AddRequestId(&controller, kClientId, sequence_number, 0);
+    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+    single_response_size = resp.SpaceUsed();
+  }
+
+  for (int i = 1; i <= kNumIterations; i ++) {
     vector<unique_ptr<SimultaneousExactlyOnceAdder>> adders;
     for (int j = 0; j < kNumThreads; j++) {
       unique_ptr<SimultaneousExactlyOnceAdder> adder(
@@ -334,6 +386,15 @@ TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithConcurrentUpdaters)
{
         ASSERT_EQ(adders[j]->resp.current_time_micros(), time_micros);
       }
     }
+
+    // Wait for the MemTracker to be updated.
+    // After all adders have finished, we should have consumed at least the size of one more response.
+    // The actual size depends of multiple factors, for instance, how many calls were "attached"
+    // (which is timing dependent) so we can't be more precise than this.
+    AssertEventually([&]() {
+        ASSERT_GT(mem_tracker_->consumption(),
+                  memory_consumption_initial + single_response_size * i);
+      });
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0fb44093/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 7b82b08..9e4f00f 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -38,6 +38,7 @@
 #include "kudu/rpc/service_if.h"
 #include "kudu/rpc/service_pool.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/mem_tracker.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
@@ -458,7 +459,8 @@ class RpcTestBase : public KuduTest {
     ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool));
     ASSERT_OK(pool->Start(2));
     *server_addr = pool->bind_address();
-    result_tracker_.reset(new ResultTracker());
+    mem_tracker_ = MemTracker::CreateTracker(-1, "result_tracker");
+    result_tracker_.reset(new ResultTracker(mem_tracker_));
 
     gscoped_ptr<ServiceIf> service(new ServiceClass(metric_entity_, result_tracker_));
     service_name_ = service->service_name();
@@ -472,6 +474,7 @@ class RpcTestBase : public KuduTest {
   string service_name_;
   std::shared_ptr<Messenger> server_messenger_;
   scoped_refptr<ServicePool> service_pool_;
+  std::shared_ptr<kudu::MemTracker> mem_tracker_;
   scoped_refptr<ResultTracker> result_tracker_;
   int n_worker_threads_;
   int service_queue_length_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/0fb44093/src/kudu/server/server_base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 19c9f6a..6010f23 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -93,7 +93,8 @@ ServerBase::ServerBase(string name, const ServerBaseOptions& options,
                                                       metric_namespace)),
       rpc_server_(new RpcServer(options.rpc_opts)),
       web_server_(new Webserver(options.webserver_opts)),
-      result_tracker_(new rpc::ResultTracker()),
+      result_tracker_(new rpc::ResultTracker(shared_ptr<MemTracker>(
+          MemTracker::CreateTracker(-1, "result-tracker", mem_tracker_)))),
       is_first_run_(false),
       options_(options),
       stop_metrics_logging_latch_(1) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/0fb44093/src/kudu/util/test_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/test_util.cc b/src/kudu/util/test_util.cc
index 58be8ff..b26302a 100644
--- a/src/kudu/util/test_util.cc
+++ b/src/kudu/util/test_util.cc
@@ -18,6 +18,7 @@
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <gtest/gtest-spi.h>
 
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -180,4 +181,44 @@ string GetTestDataDirectory() {
   return dir;
 }
 
+void AssertEventually(const std::function<void(void)>& f,
+                      const MonoDelta& timeout) {
+  MonoTime deadline = MonoTime::Now();
+  deadline.AddDelta(timeout);
+  int attempts = 0;
+
+  while (MonoTime::Now().ComesBefore(deadline)) {
+    // Capture any assertion failures within this scope (i.e. from the provided function)
+    // into 'results'.
+    testing::TestPartResultArray results;
+    testing::ScopedFakeTestPartResultReporter reporter(
+        testing::ScopedFakeTestPartResultReporter::INTERCEPT_ONLY_CURRENT_THREAD,
+        &results);
+    f();
+
+    // Determine whether their function produced any new test failure results.
+    bool has_failures = false;
+    for (int i = 0; i < results.size(); i++) {
+      has_failures |= results.GetTestPartResult(i).failed();
+    }
+    if (!has_failures) {
+      return;
+    }
+
+    // If they had failures, sleep and try again.
+    int sleep_ms = (attempts < 10) ? (1 << attempts) : 1000;
+    SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+  }
+
+  // If we ran out of time looping, run their function one more time
+  // without capturing its assertions. This way the assertions will
+  // propagate back out to the normal test reporter. Of course it's
+  // possible that it will pass on this last attempt, but that's OK
+  // too, since we aren't trying to be that strict about the deadline.
+  f();
+  if (testing::Test::HasFatalFailure()) {
+    ADD_FAILURE() << "Timed out waiting for assertion to pass.";
+  }
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/0fb44093/src/kudu/util/test_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/test_util.h b/src/kudu/util/test_util.h
index a545233..62dfe46 100644
--- a/src/kudu/util/test_util.h
+++ b/src/kudu/util/test_util.h
@@ -19,11 +19,13 @@
 #ifndef KUDU_UTIL_TEST_UTIL_H
 #define KUDU_UTIL_TEST_UTIL_H
 
+#include <functional>
 #include <gtest/gtest.h>
 #include <string>
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/test_macros.h"
 
 namespace kudu {
@@ -79,5 +81,17 @@ int SeedRandom();
 // May only be called from within a gtest unit test.
 std::string GetTestDataDirectory();
 
+// Wait until 'f()' succeeds without adding any GTest 'fatal failures'.
+// For example:
+//
+//   AssertEventually([]() {
+//     ASSERT_GT(ReadValueOfMetric(), 10);
+//   });
+//
+// The function is run in a loop with exponential backoff, capped at once
+// a second.
+void AssertEventually(const std::function<void(void)>& f,
+                      const MonoDelta& timeout = MonoDelta::FromSeconds(30));
+
 } // namespace kudu
 #endif


Mime
View raw message