Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9E9ED200D23 for ; Thu, 5 Oct 2017 00:35:36 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9CC721609DD; Wed, 4 Oct 2017 22:35:36 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 95E1B160BD7 for ; Thu, 5 Oct 2017 00:35:35 +0200 (CEST) Received: (qmail 45146 invoked by uid 500); 4 Oct 2017 22:35:34 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 45137 invoked by uid 99); 4 Oct 2017 22:35:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Oct 2017 22:35:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AC558F3219; Wed, 4 Oct 2017 22:35:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: adar@apache.org To: commits@kudu.apache.org Message-Id: <0b863e8f51ea47559c546564d4312f4d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kudu git commit: periodic: add one-shot timers Date: Wed, 4 Oct 2017 22:35:34 +0000 (UTC) archived-at: Wed, 04 Oct 2017 22:35:36 -0000 Repository: kudu Updated Branches: refs/heads/master 3b0ef599b -> e51211997 periodic: add one-shot timers One-shot timers will Stop() themselves after running a user task once. Note: it's worth pointing out that if Callback() races with Snooze(), the outcome is somewhat undefined. That is, there's no telling whether the currently running callback will run the task or snooze it. The same holds for one-shot timers, albeit more profoundly, as Callback() can also race with Start(). To avoid this, Start() the one-shot timer from within the task, or right after the task. I also refactored one-shot and jitter percentage into a new options struct. Change-Id: Ia4d9376172d66c92958071d5abbac63d751e41f3 Reviewed-on: http://gerrit.cloudera.org:8080/8130 Tested-by: Adar Dembo Reviewed-by: Alexey Serbin Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e5121199 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e5121199 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e5121199 Branch: refs/heads/master Commit: e51211997e47d88ea0ed5b6476c0655f84eebfb7 Parents: 3b0ef59 Author: Adar Dembo Authored: Wed Sep 20 19:31:13 2017 -0700 Committer: Adar Dembo Committed: Wed Oct 4 22:28:40 2017 +0000 ---------------------------------------------------------------------- src/kudu/rpc/periodic-test.cc | 77 +++++++++++++++++++++++++++++++++++++- src/kudu/rpc/periodic.cc | 39 +++++++++++++++---- src/kudu/rpc/periodic.h | 40 ++++++++++++++++---- 3 files changed, 138 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/e5121199/src/kudu/rpc/periodic-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/periodic-test.cc b/src/kudu/rpc/periodic-test.cc index d0d2b99..08f379f 100644 --- a/src/kudu/rpc/periodic-test.cc +++ b/src/kudu/rpc/periodic-test.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include @@ -66,7 +67,7 @@ class JitteredPeriodicTimerTest : public PeriodicTimerTest, timer_ = PeriodicTimer::Create(messenger_, [&] { counter_++; }, MonoDelta::FromMilliseconds(period_ms_), - GetParam()); + GetOptions()); } virtual void TearDown() override { @@ -78,6 +79,13 @@ class JitteredPeriodicTimerTest : public PeriodicTimerTest, } protected: + + virtual PeriodicTimer::Options GetOptions() { + PeriodicTimer::Options opts; + opts.jitter_pct = GetParam(); + return opts; + } + atomic counter_; shared_ptr messenger_; shared_ptr timer_; @@ -160,6 +168,8 @@ TEST_F(PeriodicTimerTest, TestCallbackRestartsTimer) { ASSERT_OK(MessengerBuilder("test").Build(&messenger)); // Create a timer that restarts itself from within its functor. + PeriodicTimer::Options opts; + opts.jitter_pct = 0.0; // don't need jittering shared_ptr timer = PeriodicTimer::Create( messenger, [&] { @@ -167,7 +177,7 @@ TEST_F(PeriodicTimerTest, TestCallbackRestartsTimer) { timer->Start(); }, MonoDelta::FromMilliseconds(period_ms_), - 0.0); // jittering would just complicate the measurements below + std::move(opts)); // Run the timer for a fixed amount of time. timer->Start(); @@ -182,5 +192,68 @@ TEST_F(PeriodicTimerTest, TestCallbackRestartsTimer) { ASSERT_LE(timer->NumCallbacksForTests(), kPeriods * 3); } +class JitteredOneShotPeriodicTimerTest : public JitteredPeriodicTimerTest { + protected: + virtual PeriodicTimer::Options GetOptions() override { + PeriodicTimer::Options opts; + opts.jitter_pct = GetParam(); + opts.one_shot = true; + return opts; + } +}; + +INSTANTIATE_TEST_CASE_P(AllJitterModes, + JitteredOneShotPeriodicTimerTest, + ::testing::Values(0.0, 0.25)); + +TEST_P(JitteredOneShotPeriodicTimerTest, TestBasics) { + // Kick off the one-shot timer a few times. + for (int i = 0; i < 3; i++) { + ASSERT_EQ(i, counter_); + + // Eventually the task will run. + timer_->Start(); + ASSERT_EVENTUALLY([&](){ + ASSERT_EQ(i + 1, counter_); + }); + + // Even if we explicitly wait another few periods, the counter value + // shouldn't change. + SleepFor(MonoDelta::FromMilliseconds(period_ms_ * 2)); + ASSERT_EQ(i + 1, counter_); + } +} + +TEST_F(PeriodicTimerTest, TestCallbackRestartsOneShotTimer) { + atomic counter(0); + shared_ptr messenger; + ASSERT_OK(MessengerBuilder("test") + .Build(&messenger)); + + // Create a timer that restarts itself from within its functor. + PeriodicTimer::Options opts; + opts.jitter_pct = 0.0; // don't need jittering + opts.one_shot = true; + shared_ptr timer = PeriodicTimer::Create( + messenger, + [&] { + counter++; + timer->Start(); + }, + MonoDelta::FromMilliseconds(period_ms_), + std::move(opts)); + + // Because the timer restarts itself every time the functor runs, we + // should see the counter value increase with each period. + timer->Start(); + ASSERT_EVENTUALLY([&](){ + ASSERT_GE(counter, 5); + }); + + // Ensure that the reactor threads are fully quiesced (and thus no timer + // callbacks are running) by the time 'counter' is destroyed. + messenger->Shutdown(); +} + } // namespace rpc } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/e5121199/src/kudu/rpc/periodic.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/periodic.cc b/src/kudu/rpc/periodic.cc index f4b33db..07674de 100644 --- a/src/kudu/rpc/periodic.cc +++ b/src/kudu/rpc/periodic.cc @@ -36,30 +36,35 @@ using std::weak_ptr; namespace kudu { namespace rpc { +PeriodicTimer::Options::Options() + : jitter_pct(0.25), + one_shot(false) { +} + shared_ptr PeriodicTimer::Create( shared_ptr messenger, RunTaskFunctor functor, MonoDelta period, - double jitter_pct) { + Options options) { return std::make_shared( - std::move(messenger), std::move(functor), period, jitter_pct); + std::move(messenger), std::move(functor), period, options); } PeriodicTimer::PeriodicTimer( shared_ptr messenger, RunTaskFunctor functor, MonoDelta period, - double jitter_pct) + Options options) : messenger_(std::move(messenger)), functor_(std::move(functor)), period_(period), - jitter_pct_(jitter_pct), + options_(std::move(options)), rng_(GetRandomSeed32()), current_callback_generation_(0), num_callbacks_for_tests_(0), started_(false) { - DCHECK_GE(jitter_pct_, 0); - DCHECK_LE(jitter_pct_, 1); + DCHECK_GE(options_.jitter_pct, 0); + DCHECK_LE(options_.jitter_pct, 1); } PeriodicTimer::~PeriodicTimer() { @@ -81,6 +86,11 @@ void PeriodicTimer::Start(boost::optional next_task_delta) { void PeriodicTimer::Stop() { std::lock_guard l(lock_); + StopUnlocked(); +} + +void PeriodicTimer::StopUnlocked() { + DCHECK(lock_.is_locked()); started_ = false; } @@ -101,7 +111,7 @@ void PeriodicTimer::SnoozeUnlocked(boost::optional next_task_delta) { next_task_delta = MonoDelta::FromMilliseconds( GetMinimumPeriod().ToMilliseconds() + rng_.NextDoubleFraction() * - jitter_pct_ * + options_.jitter_pct * (2 * period_.ToMilliseconds())); } next_task_time_ = MonoTime::Now() + *next_task_delta; @@ -110,7 +120,7 @@ void PeriodicTimer::SnoozeUnlocked(boost::optional next_task_delta) { MonoDelta PeriodicTimer::GetMinimumPeriod() { // Given jitter percentage J and period P, this returns (1-J)*P, which is // the lowest possible jittered value. - return MonoDelta::FromMilliseconds((1.0 - jitter_pct_) * + return MonoDelta::FromMilliseconds((1.0 - options_.jitter_pct) * period_.ToMilliseconds()); } @@ -166,10 +176,23 @@ void PeriodicTimer::Callback(int64_t my_callback_generation) { // It's time to run the task. Although the next task time is reset now, // it may be reset again by virtue of running the task itself. run_task = true; + + if (options_.one_shot) { + // Stop the timer first, in case the task wants to restart it. + StopUnlocked(); + } } } + if (run_task) { functor_(); + + if (options_.one_shot) { + // The task was run; exit the loop. Even if the task restarted the timer, + // that will have started a new callback loop, so exiting here is always + // the correct thing to do. + return; + } Snooze(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/e5121199/src/kudu/rpc/periodic.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/periodic.h b/src/kudu/rpc/periodic.h index f5b1e77..f65ed85 100644 --- a/src/kudu/rpc/periodic.h +++ b/src/kudu/rpc/periodic.h @@ -66,6 +66,28 @@ class PeriodicTimer : public std::enable_shared_from_this { public: typedef std::function RunTaskFunctor; + struct Options { + Options(); + + // Defines the percentage of the period that will be jittered up or down + // randomly. Together with the period, the periodicity of the timer will + // vary between (1-J)*P and (1+J)*P. + // + // Must be between 0 and 1. + // + // If not set, defaults to 0.25. + double jitter_pct; + + // The timer will automatically stop after running the user's task. + // + // Just as with a normal timer, Snooze() will postpone the running of the + // task, and Stop() will cancel the task outright. Unlike a normal timer, + // both operations will no-op if the timer has already fired. + // + // If not set, defaults to false. + bool one_shot; + }; + // Creates a new PeriodicTimer. // // A ref is taken on 'messenger', which is used for scheduling callbacks. @@ -74,15 +96,14 @@ class PeriodicTimer : public std::enable_shared_from_this { // PeriodicTimer. The task will run on the messenger's reactor threads so it // should do very little work (i.e. no I/O). // - // 'period' defines the period between tasks while 'jitter_pct' (which must - // be between 0 and 1) defines the percentage of the period that will be - // jittered up or down randomly. Taken together, the periodicity of the - // timer varies between (1-J)*P and (1+J)*P. + // 'period' defines the period between tasks. + // + // 'options' allows additional (optional) customization of the timer. static std::shared_ptr Create( std::shared_ptr messenger, RunTaskFunctor functor, MonoDelta period, - double jitter_pct = 0.25); + Options options = {}); ~PeriodicTimer(); @@ -126,7 +147,7 @@ class PeriodicTimer : public std::enable_shared_from_this { PeriodicTimer(std::shared_ptr messenger, RunTaskFunctor functor, MonoDelta period, - double jitter_pct); + Options options); // Calculate the minimum period for the timer, which varies depending on // 'jitter_pct_' and the output of the PRNG. @@ -137,6 +158,9 @@ class PeriodicTimer : public std::enable_shared_from_this { // when it was constructed. void Callback(int64_t my_callback_generation); + // Like Stop() but must be called with 'lock_' held. + void StopUnlocked(); + // Like Snooze() but must be called with 'lock_' held. void SnoozeUnlocked(boost::optional next_task_delta = boost::none); @@ -154,8 +178,8 @@ class PeriodicTimer : public std::enable_shared_from_this { // User-specified task period. const MonoDelta period_; - // User-specified jitter percentage. - const double jitter_pct_; + // User-specified options. + const Options options_; // Protects all mutable state below. mutable simple_spinlock lock_;