kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject kudu git commit: periodic: add one-shot timers
Date Wed, 04 Oct 2017 22:35:34 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 3b0ef599b -> e51211997


periodic: add one-shot timers

One-shot timers will Stop() themselves after running a user task once.

Note: it's worth pointing out that if Callback() races with Snooze(), the
outcome is somewhat undefined. That is, there's no telling whether the
currently running callback will run the task or snooze it. The same holds
for one-shot timers, albeit more profoundly, as Callback() can also race
with Start(). To avoid this, Start() the one-shot timer from within the
task, or right after the task.

I also refactored one-shot and jitter percentage into a new options struct.

Change-Id: Ia4d9376172d66c92958071d5abbac63d751e41f3
Reviewed-on: http://gerrit.cloudera.org:8080/8130
Tested-by: Adar Dembo <adar@cloudera.com>
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e5121199
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e5121199
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e5121199

Branch: refs/heads/master
Commit: e51211997e47d88ea0ed5b6476c0655f84eebfb7
Parents: 3b0ef59
Author: Adar Dembo <adar@cloudera.com>
Authored: Wed Sep 20 19:31:13 2017 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Wed Oct 4 22:28:40 2017 +0000

----------------------------------------------------------------------
 src/kudu/rpc/periodic-test.cc | 77 +++++++++++++++++++++++++++++++++++++-
 src/kudu/rpc/periodic.cc      | 39 +++++++++++++++----
 src/kudu/rpc/periodic.h       | 40 ++++++++++++++++----
 3 files changed, 138 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e5121199/src/kudu/rpc/periodic-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/periodic-test.cc b/src/kudu/rpc/periodic-test.cc
index d0d2b99..08f379f 100644
--- a/src/kudu/rpc/periodic-test.cc
+++ b/src/kudu/rpc/periodic-test.cc
@@ -19,6 +19,7 @@
 #include <cstdint>
 #include <memory>
 #include <string>
+#include <utility>
 
 #include <gtest/gtest.h>
 
@@ -66,7 +67,7 @@ class JitteredPeriodicTimerTest : public PeriodicTimerTest,
     timer_ = PeriodicTimer::Create(messenger_,
                                    [&] { counter_++; },
                                    MonoDelta::FromMilliseconds(period_ms_),
-                                   GetParam());
+                                   GetOptions());
   }
 
   virtual void TearDown() override {
@@ -78,6 +79,13 @@ class JitteredPeriodicTimerTest : public PeriodicTimerTest,
   }
 
  protected:
+
+  virtual PeriodicTimer::Options GetOptions() {
+    PeriodicTimer::Options opts;
+    opts.jitter_pct = GetParam();
+    return opts;
+  }
+
   atomic<int64_t> counter_;
   shared_ptr<Messenger> messenger_;
   shared_ptr<PeriodicTimer> timer_;
@@ -160,6 +168,8 @@ TEST_F(PeriodicTimerTest, TestCallbackRestartsTimer) {
   ASSERT_OK(MessengerBuilder("test").Build(&messenger));
 
   // Create a timer that restarts itself from within its functor.
+  PeriodicTimer::Options opts;
+  opts.jitter_pct = 0.0; // don't need jittering
   shared_ptr<PeriodicTimer> timer = PeriodicTimer::Create(
       messenger,
       [&] {
@@ -167,7 +177,7 @@ TEST_F(PeriodicTimerTest, TestCallbackRestartsTimer) {
         timer->Start();
       },
       MonoDelta::FromMilliseconds(period_ms_),
-      0.0); // jittering would just complicate the measurements below
+      std::move(opts));
 
   // Run the timer for a fixed amount of time.
   timer->Start();
@@ -182,5 +192,68 @@ TEST_F(PeriodicTimerTest, TestCallbackRestartsTimer) {
   ASSERT_LE(timer->NumCallbacksForTests(), kPeriods * 3);
 }
 
+class JitteredOneShotPeriodicTimerTest : public JitteredPeriodicTimerTest {
+ protected:
+  virtual PeriodicTimer::Options GetOptions() override {
+    PeriodicTimer::Options opts;
+    opts.jitter_pct = GetParam();
+    opts.one_shot = true;
+    return opts;
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(AllJitterModes,
+                        JitteredOneShotPeriodicTimerTest,
+                        ::testing::Values(0.0, 0.25));
+
+TEST_P(JitteredOneShotPeriodicTimerTest, TestBasics) {
+  // Kick off the one-shot timer a few times.
+  for (int i = 0; i < 3; i++) {
+    ASSERT_EQ(i, counter_);
+
+    // Eventually the task will run.
+    timer_->Start();
+    ASSERT_EVENTUALLY([&](){
+      ASSERT_EQ(i + 1, counter_);
+    });
+
+    // Even if we explicitly wait another few periods, the counter value
+    // shouldn't change.
+    SleepFor(MonoDelta::FromMilliseconds(period_ms_ * 2));
+    ASSERT_EQ(i + 1, counter_);
+  }
+}
+
+TEST_F(PeriodicTimerTest, TestCallbackRestartsOneShotTimer) {
+  atomic<int64_t> counter(0);
+  shared_ptr<Messenger> messenger;
+  ASSERT_OK(MessengerBuilder("test")
+            .Build(&messenger));
+
+  // Create a timer that restarts itself from within its functor.
+  PeriodicTimer::Options opts;
+  opts.jitter_pct = 0.0; // don't need jittering
+  opts.one_shot = true;
+  shared_ptr<PeriodicTimer> timer = PeriodicTimer::Create(
+      messenger,
+      [&] {
+        counter++;
+        timer->Start();
+      },
+      MonoDelta::FromMilliseconds(period_ms_),
+      std::move(opts));
+
+  // Because the timer restarts itself every time the functor runs, we
+  // should see the counter value increase with each period.
+  timer->Start();
+  ASSERT_EVENTUALLY([&](){
+    ASSERT_GE(counter, 5);
+  });
+
+  // Ensure that the reactor threads are fully quiesced (and thus no timer
+  // callbacks are running) by the time 'counter' is destroyed.
+  messenger->Shutdown();
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e5121199/src/kudu/rpc/periodic.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/periodic.cc b/src/kudu/rpc/periodic.cc
index f4b33db..07674de 100644
--- a/src/kudu/rpc/periodic.cc
+++ b/src/kudu/rpc/periodic.cc
@@ -36,30 +36,35 @@ using std::weak_ptr;
 namespace kudu {
 namespace rpc {
 
+PeriodicTimer::Options::Options()
+    : jitter_pct(0.25),
+      one_shot(false) {
+}
+
 shared_ptr<PeriodicTimer> PeriodicTimer::Create(
     shared_ptr<Messenger> messenger,
     RunTaskFunctor functor,
     MonoDelta period,
-    double jitter_pct) {
+    Options options) {
   return std::make_shared<PeriodicTimer>(
-      std::move(messenger), std::move(functor), period, jitter_pct);
+      std::move(messenger), std::move(functor), period, options);
 }
 
 PeriodicTimer::PeriodicTimer(
     shared_ptr<Messenger> messenger,
     RunTaskFunctor functor,
     MonoDelta period,
-    double jitter_pct)
+    Options options)
     : messenger_(std::move(messenger)),
       functor_(std::move(functor)),
       period_(period),
-      jitter_pct_(jitter_pct),
+      options_(std::move(options)),
       rng_(GetRandomSeed32()),
       current_callback_generation_(0),
       num_callbacks_for_tests_(0),
       started_(false) {
-  DCHECK_GE(jitter_pct_, 0);
-  DCHECK_LE(jitter_pct_, 1);
+  DCHECK_GE(options_.jitter_pct, 0);
+  DCHECK_LE(options_.jitter_pct, 1);
 }
 
 PeriodicTimer::~PeriodicTimer() {
@@ -81,6 +86,11 @@ void PeriodicTimer::Start(boost::optional<MonoDelta> next_task_delta)
{
 
 void PeriodicTimer::Stop() {
   std::lock_guard<simple_spinlock> l(lock_);
+  StopUnlocked();
+}
+
+void PeriodicTimer::StopUnlocked() {
+  DCHECK(lock_.is_locked());
   started_ = false;
 }
 
@@ -101,7 +111,7 @@ void PeriodicTimer::SnoozeUnlocked(boost::optional<MonoDelta> next_task_delta)
{
     next_task_delta = MonoDelta::FromMilliseconds(
         GetMinimumPeriod().ToMilliseconds() +
         rng_.NextDoubleFraction() *
-        jitter_pct_ *
+        options_.jitter_pct *
         (2 * period_.ToMilliseconds()));
   }
   next_task_time_ = MonoTime::Now() + *next_task_delta;
@@ -110,7 +120,7 @@ void PeriodicTimer::SnoozeUnlocked(boost::optional<MonoDelta> next_task_delta)
{
 MonoDelta PeriodicTimer::GetMinimumPeriod() {
   // Given jitter percentage J and period P, this returns (1-J)*P, which is
   // the lowest possible jittered value.
-  return MonoDelta::FromMilliseconds((1.0 - jitter_pct_) *
+  return MonoDelta::FromMilliseconds((1.0 - options_.jitter_pct) *
                                      period_.ToMilliseconds());
 }
 
@@ -166,10 +176,23 @@ void PeriodicTimer::Callback(int64_t my_callback_generation) {
       // It's time to run the task. Although the next task time is reset now,
       // it may be reset again by virtue of running the task itself.
       run_task = true;
+
+      if (options_.one_shot) {
+        // Stop the timer first, in case the task wants to restart it.
+        StopUnlocked();
+      }
     }
   }
+
   if (run_task) {
     functor_();
+
+    if (options_.one_shot) {
+      // The task was run; exit the loop. Even if the task restarted the timer,
+      // that will have started a new callback loop, so exiting here is always
+      // the correct thing to do.
+      return;
+    }
     Snooze();
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e5121199/src/kudu/rpc/periodic.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/periodic.h b/src/kudu/rpc/periodic.h
index f5b1e77..f65ed85 100644
--- a/src/kudu/rpc/periodic.h
+++ b/src/kudu/rpc/periodic.h
@@ -66,6 +66,28 @@ class PeriodicTimer : public std::enable_shared_from_this<PeriodicTimer>
{
  public:
   typedef std::function<void(void)> RunTaskFunctor;
 
+  struct Options {
+    Options();
+
+    // Defines the percentage of the period that will be jittered up or down
+    // randomly. Together with the period, the periodicity of the timer will
+    // vary between (1-J)*P and (1+J)*P.
+    //
+    // Must be between 0 and 1.
+    //
+    // If not set, defaults to 0.25.
+    double jitter_pct;
+
+    // The timer will automatically stop after running the user's task.
+    //
+    // Just as with a normal timer, Snooze() will postpone the running of the
+    // task, and Stop() will cancel the task outright. Unlike a normal timer,
+    // both operations will no-op if the timer has already fired.
+    //
+    // If not set, defaults to false.
+    bool one_shot;
+  };
+
   // Creates a new PeriodicTimer.
   //
   // A ref is taken on 'messenger', which is used for scheduling callbacks.
@@ -74,15 +96,14 @@ class PeriodicTimer : public std::enable_shared_from_this<PeriodicTimer>
{
   // PeriodicTimer. The task will run on the messenger's reactor threads so it
   // should do very little work (i.e. no I/O).
   //
-  // 'period' defines the period between tasks while 'jitter_pct' (which must
-  // be between 0 and 1) defines the percentage of the period that will be
-  // jittered up or down randomly. Taken together, the periodicity of the
-  // timer varies between (1-J)*P and (1+J)*P.
+  // 'period' defines the period between tasks.
+  //
+  // 'options' allows additional (optional) customization of the timer.
   static std::shared_ptr<PeriodicTimer> Create(
       std::shared_ptr<Messenger> messenger,
       RunTaskFunctor functor,
       MonoDelta period,
-      double jitter_pct = 0.25);
+      Options options = {});
 
   ~PeriodicTimer();
 
@@ -126,7 +147,7 @@ class PeriodicTimer : public std::enable_shared_from_this<PeriodicTimer>
{
   PeriodicTimer(std::shared_ptr<Messenger> messenger,
                 RunTaskFunctor functor,
                 MonoDelta period,
-                double jitter_pct);
+                Options options);
 
   // Calculate the minimum period for the timer, which varies depending on
   // 'jitter_pct_' and the output of the PRNG.
@@ -137,6 +158,9 @@ class PeriodicTimer : public std::enable_shared_from_this<PeriodicTimer>
{
   // when it was constructed.
   void Callback(int64_t my_callback_generation);
 
+  // Like Stop() but must be called with 'lock_' held.
+  void StopUnlocked();
+
   // Like Snooze() but must be called with 'lock_' held.
   void SnoozeUnlocked(boost::optional<MonoDelta> next_task_delta = boost::none);
 
@@ -154,8 +178,8 @@ class PeriodicTimer : public std::enable_shared_from_this<PeriodicTimer>
{
   // User-specified task period.
   const MonoDelta period_;
 
-  // User-specified jitter percentage.
-  const double jitter_pct_;
+  // User-specified options.
+  const Options options_;
 
   // Protects all mutable state below.
   mutable simple_spinlock lock_;


Mime
View raw message