impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [9/9] incubator-impala git commit: IMPALA-5891: fix PeriodicCounterUpdater initialization
Date Tue, 05 Sep 2017 22:26:54 GMT
IMPALA-5891: fix PeriodicCounterUpdater initialization

Create the singleton explicitly during process startup instead of relying on
static constructors and destructors, which removes the potential for startup
and teardown races and hard-to-understand behaviour.

Testing:
Ran core tests.

Change-Id: Ieede9fa194605fb53033033959110f3ef12f18c3
Reviewed-on: http://gerrit.cloudera.org:8080/7942
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/796db0fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/796db0fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/796db0fc

Branch: refs/heads/master
Commit: 796db0fce374c9aff0ec6e1e82862db012b8854a
Parents: ac8a72f
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Fri Sep 1 14:36:23 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Tue Sep 5 21:35:20 2017 +0000

----------------------------------------------------------------------
 be/src/common/init.cc                   |  3 ++
 be/src/util/periodic-counter-updater.cc | 58 ++++++++++++++--------------
 be/src/util/periodic-counter-updater.h  | 10 ++---
 3 files changed, 38 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/796db0fc/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 6b48bb8..f2df173 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -45,6 +45,7 @@
 #include "util/network-util.h"
 #include "util/openssl-util.h"
 #include "util/os-info.h"
+#include "util/periodic-counter-updater.h"
 #include "util/pretty-printer.h"
 #include "util/redactor.h"
 #include "util/test-info.h"
@@ -209,6 +210,8 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
 
   pause_monitor.reset(new Thread("common", "pause-monitor", &PauseMonitorLoop));
 
+  PeriodicCounterUpdater::Init();
+
   LOG(INFO) << impala::GetVersionString();
   LOG(INFO) << "Using hostname: " << FLAGS_hostname;
   impala::LogCommandLineFlags();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/796db0fc/be/src/util/periodic-counter-updater.cc
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index 0a8a8f7..b1c755a 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -32,12 +32,14 @@ namespace impala {
 DEFINE_int32(periodic_counter_update_period_ms, 500, "Period to update rate counters and"
     " sampling counters in ms");
 
-PeriodicCounterUpdater PeriodicCounterUpdater::state_;
-
-PeriodicCounterUpdater::PeriodicCounterUpdater() {
-  DCHECK_EQ(this, &state_);
-  state_.update_thread_.reset(
-      new thread(&PeriodicCounterUpdater::UpdateLoop, this));
+PeriodicCounterUpdater* PeriodicCounterUpdater::instance_ = nullptr;
+
+void PeriodicCounterUpdater::Init() {
+  DCHECK(instance_ == nullptr);
+  // Create the singleton, which will live until the process terminates.
+  instance_ = new PeriodicCounterUpdater;
+  instance_->update_thread_.reset(
+      new thread(&PeriodicCounterUpdater::UpdateLoop, instance_));
 }
 
 void PeriodicCounterUpdater::RegisterPeriodicCounter(
@@ -52,8 +54,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
       counter.src_counter = src_counter;
       counter.sample_fn = sample_fn;
       counter.elapsed_ms = 0;
-      lock_guard<SpinLock> ratelock(state_.rate_lock_);
-      state_.rate_counters_[dst_counter] = counter;
+      lock_guard<SpinLock> ratelock(instance_->rate_lock_);
+      instance_->rate_counters_[dst_counter] = counter;
       break;
     }
     case SAMPLING_COUNTER: {
@@ -62,8 +64,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
       counter.sample_fn = sample_fn;
       counter.num_sampled = 0;
       counter.total_sampled_value = 0;
-      lock_guard<SpinLock> samplinglock(state_.sampling_lock_);
-      state_.sampling_counters_[dst_counter] = counter;
+      lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
+      instance_->sampling_counters_[dst_counter] = counter;
       break;
     }
     default:
@@ -72,13 +74,13 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
 }
 
 void PeriodicCounterUpdater::StopRateCounter(RuntimeProfile::Counter* counter) {
-  lock_guard<SpinLock> ratelock(state_.rate_lock_);
-  state_.rate_counters_.erase(counter);
+  lock_guard<SpinLock> ratelock(instance_->rate_lock_);
+  instance_->rate_counters_.erase(counter);
 }
 
 void PeriodicCounterUpdater::StopSamplingCounter(RuntimeProfile::Counter* counter) {
-  lock_guard<SpinLock> samplinglock(state_.sampling_lock_);
-  state_.sampling_counters_.erase(counter);
+  lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
+  instance_->sampling_counters_.erase(counter);
 }
 
 void PeriodicCounterUpdater::RegisterBucketingCounters(
@@ -86,20 +88,20 @@ void PeriodicCounterUpdater::RegisterBucketingCounters(
   BucketCountersInfo info;
   info.src_counter = src_counter;
   info.num_sampled = 0;
-  lock_guard<SpinLock> bucketinglock(state_.bucketing_lock_);
-  state_.bucketing_counters_[buckets] = info;
+  lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
+  instance_->bucketing_counters_[buckets] = info;
 }
 
 void PeriodicCounterUpdater::StopBucketingCounters(
     vector<RuntimeProfile::Counter*>* buckets, bool convert) {
   int64_t num_sampled = 0;
   {
-    lock_guard<SpinLock> bucketinglock(state_.bucketing_lock_);
+    lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
     BucketCountersMap::iterator itr =
-        state_.bucketing_counters_.find(buckets);
-    if (itr != state_.bucketing_counters_.end()) {
+        instance_->bucketing_counters_.find(buckets);
+    if (itr != instance_->bucketing_counters_.end()) {
       num_sampled = itr->second.num_sampled;
-      state_.bucketing_counters_.erase(itr);
+      instance_->bucketing_counters_.erase(itr);
     }
   }
 
@@ -114,14 +116,14 @@ void PeriodicCounterUpdater::StopBucketingCounters(
 
 void PeriodicCounterUpdater::RegisterTimeSeriesCounter(
     RuntimeProfile::TimeSeriesCounter* counter) {
-  lock_guard<SpinLock> timeserieslock(state_.time_series_lock_);
-  state_.time_series_counters_.insert(counter);
+  lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
+  instance_->time_series_counters_.insert(counter);
 }
 
 void PeriodicCounterUpdater::StopTimeSeriesCounter(
     RuntimeProfile::TimeSeriesCounter* counter) {
-  lock_guard<SpinLock> timeserieslock(state_.time_series_lock_);
-  state_.time_series_counters_.erase(counter);
+  lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
+  instance_->time_series_counters_.erase(counter);
 }
 
 void PeriodicCounterUpdater::UpdateLoop() {
@@ -132,7 +134,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     int elapsed_ms = elapsed.total_milliseconds();
 
     {
-      lock_guard<SpinLock> ratelock(state_.rate_lock_);
+      lock_guard<SpinLock> ratelock(instance_->rate_lock_);
       for (RateCounterMap::iterator it = rate_counters_.begin();
            it != rate_counters_.end(); ++it) {
         it->second.elapsed_ms += elapsed_ms;
@@ -149,7 +151,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> samplinglock(state_.sampling_lock_);
+      lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
       for (SamplingCounterMap::iterator it = sampling_counters_.begin();
            it != sampling_counters_.end(); ++it) {
         ++it->second.num_sampled;
@@ -168,7 +170,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> bucketinglock(state_.bucketing_lock_);
+      lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
       for (BucketCountersMap::iterator it = bucketing_counters_.begin();
            it != bucketing_counters_.end(); ++it) {
         int64_t val = it->second.src_counter->value();
@@ -179,7 +181,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> timeserieslock(state_.time_series_lock_);
+      lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
       for (TimeSeriesCounters::iterator it = time_series_counters_.begin();
            it != time_series_counters_.end(); ++it) {
         (*it)->AddSample(elapsed_ms);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/796db0fc/be/src/util/periodic-counter-updater.h
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index 893b887..c603522 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -44,6 +44,10 @@ class PeriodicCounterUpdater {
     SAMPLING_COUNTER,
   };
 
+  // Sets up data structures and starts the counter update thread. Should only be called
+  // once during process startup and must be called before other methods.
+  static void Init();
+
   /// Registers a periodic counter to be updated by the update thread.
   /// Either sample_fn or dst_counter must be non-NULL.  When the periodic counter
   /// is updated, it either gets the value from the dst_counter or calls the sample
@@ -96,10 +100,6 @@ class PeriodicCounterUpdater {
     /// TODO: customize bucketing
   };
 
-  // Starts the counter update thread. We only have a single static object, so this
-  // is executed automatically when the process starts up.
-  PeriodicCounterUpdater();
-
   /// Loop for periodic counter update thread.  This thread wakes up once in a while
   /// and updates all the added rate counters and sampling counters.
   [[noreturn]] void UpdateLoop();
@@ -140,7 +140,7 @@ class PeriodicCounterUpdater {
 
   /// Singleton object that keeps track of all rate counters and the thread
   /// for updating them.
-  static PeriodicCounterUpdater state_;
+  static PeriodicCounterUpdater* instance_;
 };
 
 }


Mime
View raw message