impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject incubator-impala git commit: IMPALA-5130: fix race in MemTracker::EnableReservationReporting()
Date Thu, 30 Mar 2017 00:20:54 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 23100102c -> d0152d424


IMPALA-5130: fix race in MemTracker::EnableReservationReporting()

MemTracker::LogUsage() and MemTracker::EnableReservationReporting()
could race, with LogUsage() seeing a partially-constructed
'reservation_counters_' value and crashing.

This patch fixes that issue by atomically swapping in
'reservation_counters_' so that no threads see a
partially-constructed value.

While we're here, swap boost::mutex for the higher-performance
SpinLock for 'child_trackers_lock_'.

Change-Id: I2434c952d97c46040e29fca2327c244dd30599d2
Reviewed-on: http://gerrit.cloudera.org:8080/6502
Reviewed-by: Dan Hecht <dhecht@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d0152d42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d0152d42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d0152d42

Branch: refs/heads/master
Commit: d0152d424ad1aa21a91122ca874a81793d497720
Parents: 2310010
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Tue Mar 28 13:54:34 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Thu Mar 30 00:17:24 2017 +0000

----------------------------------------------------------------------
 be/src/common/atomic.h        | 17 +++++++++++++++++
 be/src/runtime/mem-tracker.cc | 23 +++++++++++++----------
 be/src/runtime/mem-tracker.h  | 14 ++++++++------
 3 files changed, 38 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d0152d42/be/src/common/atomic.h
----------------------------------------------------------------------
diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h
index 784be11..791f3b2 100644
--- a/be/src/common/atomic.h
+++ b/be/src/common/atomic.h
@@ -116,6 +116,23 @@ class AtomicInt {
 typedef internal::AtomicInt<int32_t> AtomicInt32;
 typedef internal::AtomicInt<int64_t> AtomicInt64;
 
+/// Atomic pointer. Operations have the same semantics as AtomicInt.
+template<typename T>
+class AtomicPtr {
+ public:
+  AtomicPtr(T* initial = nullptr) : ptr_(reinterpret_cast<intptr_t>(initial)) {}
+
+  /// Atomic load with "acquire" memory-ordering semantic.
+  ALWAYS_INLINE T* Load() const { return reinterpret_cast<T*>(ptr_.Load()); }
+
+  /// Atomic store with "release" memory-ordering semantic.
+  ALWAYS_INLINE void Store(T* val) { ptr_.Store(reinterpret_cast<intptr_t>(val)); }
+
+ private:
+  internal::AtomicInt<intptr_t> ptr_;
+};
+
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d0152d42/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index f4073b5..a48ede9 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -108,20 +108,20 @@ void MemTracker::Init() {
 }
 
 void MemTracker::AddChildTracker(MemTracker* tracker) {
-  lock_guard<mutex> l(child_trackers_lock_);
+  lock_guard<SpinLock> l(child_trackers_lock_);
   tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker);
 }
 
 void MemTracker::UnregisterFromParent() {
   DCHECK(parent_ != NULL);
-  lock_guard<mutex> l(parent_->child_trackers_lock_);
+  lock_guard<SpinLock> l(parent_->child_trackers_lock_);
   parent_->child_trackers_.erase(child_tracker_it_);
   child_tracker_it_ = parent_->child_trackers_.end();
 }
 
 void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& counters) {
-  reservation_counters_.reset(new ReservationTrackerCounters);
-  *reservation_counters_ = counters;
+  ReservationTrackerCounters* new_counters = new ReservationTrackerCounters(counters);
+  reservation_counters_.Store(new_counters);
 }
 
 int64_t MemTracker::GetPoolMemReserved() const {
@@ -130,7 +130,7 @@ int64_t MemTracker::GetPoolMemReserved() const {
   DCHECK_EQ(limit_, -1) << LogUsage("");
 
   int64_t mem_reserved = 0L;
-  lock_guard<mutex> l(child_trackers_lock_);
+  lock_guard<SpinLock> l(child_trackers_lock_);
   for (list<MemTracker*>::const_iterator it = child_trackers_.begin();
        it != child_trackers_.end(); ++it) {
     int64_t child_limit = (*it)->limit();
@@ -194,6 +194,7 @@ MemTracker::~MemTracker() {
   DCHECK_EQ(consumption_->current_value(), 0) << label_ << "\n"
                                               << GetStackTrace() << "\n"
                                               << LogUsage("");
+  delete reservation_counters_.Load();
 }
 
 void MemTracker::RegisterMetrics(MetricGroup* metrics, const string& prefix) {
@@ -246,11 +247,13 @@ string MemTracker::LogUsage(const string& prefix) const {
 
   int64_t total = consumption();
   int64_t peak = consumption_->value();
-  if (reservation_counters_ != NULL) {
-    int64_t reservation = reservation_counters_->peak_reservation->current_value();
+
+  ReservationTrackerCounters* reservation_counters = reservation_counters_.Load();
+  if (reservation_counters != nullptr) {
+    int64_t reservation = reservation_counters->peak_reservation->current_value();
     int64_t used_reservation =
-        reservation_counters_->peak_used_reservation->current_value();
-    int64_t reservation_limit = reservation_counters_->reservation_limit->value();
+        reservation_counters->peak_used_reservation->current_value();
+    int64_t reservation_limit = reservation_counters->reservation_limit->value();
     ss << " BufferPoolUsed/Reservation="
        << PrettyPrinter::Print(used_reservation, TUnit::BYTES) << "/"
        << PrettyPrinter::Print(reservation, TUnit::BYTES);
@@ -265,7 +268,7 @@ string MemTracker::LogUsage(const string& prefix) const {
   stringstream prefix_ss;
   prefix_ss << prefix << "  ";
   string new_prefix = prefix_ss.str();
-  lock_guard<mutex> l(child_trackers_lock_);
+  lock_guard<SpinLock> l(child_trackers_lock_);
   string child_trackers_usage = LogUsage(new_prefix, child_trackers_);
   if (!child_trackers_usage.empty()) ss << "\n" << child_trackers_usage;
   return ss.str();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d0152d42/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 8fd40b5..acf3b46 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -65,6 +65,8 @@ class TQueryOptions;
 /// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so
 /// this will be called before the process limit is reported as exceeded. GcFunctions are
 /// called in the order they are added, so expensive functions should be added last.
+/// GcFunctions are called with a global lock held, so should be non-blocking and not
+/// call back into MemTrackers, except to release memory.
 //
 /// This class is thread-safe.
 class MemTracker {
@@ -293,6 +295,7 @@ class MemTracker {
   MemTracker* parent() const { return parent_; }
 
   /// Signature for function that can be called to free some memory after limit is reached.
+  /// See the class header for further details on what these functions should do.
   typedef boost::function<void ()> GcFunction;
 
   /// Add a function 'f' to be called if the limit is reached.
@@ -324,8 +327,7 @@ class MemTracker {
   bool CheckLimitExceeded() const { return limit_ >= 0 && limit_ < consumption(); }
 
   /// If consumption is higher than max_consumption, attempts to free memory by calling
-  /// any
-  /// added GC functions.  Returns true if max_consumption is still exceeded. Takes
+  /// any added GC functions.  Returns true if max_consumption is still exceeded. Takes
   /// gc_lock. Updates metrics if initialized.
   bool GcMemory(int64_t max_consumption);
 
@@ -384,9 +386,9 @@ class MemTracker {
   UIntGauge* consumption_metric_;
 
   /// If non-NULL, counters from a corresponding ReservationTracker that should be
-  /// reported in logs and other diagnostics. The counters are owned by the fragment's
-  /// RuntimeProfile.
-  boost::scoped_ptr<ReservationTrackerCounters> reservation_counters_;
+  /// reported in logs and other diagnostics. Owned by this MemTracker. The counters
+  /// are owned by the fragment's RuntimeProfile.
+  AtomicPtr<ReservationTrackerCounters> reservation_counters_;
 
   std::vector<MemTracker*> all_trackers_;  // this tracker plus all of its ancestors
   std::vector<MemTracker*> limit_trackers_;  // all_trackers_ with valid limits
@@ -394,7 +396,7 @@ class MemTracker {
   /// All the child trackers of this tracker. Used only for computing resource pool mem
   /// reserved and error reporting, i.e., updating a parent tracker does not update its
   /// children.
-  mutable boost::mutex child_trackers_lock_;
+  mutable SpinLock child_trackers_lock_;
   std::list<MemTracker*> child_trackers_;
 
   /// Iterator into parent_->child_trackers_ for this object. Stored to have O(1)


Mime
View raw message