impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject incubator-impala git commit: IMPALA-5238: transfer reservations between trackers
Date Fri, 12 May 2017 01:07:59 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master a16a0fa84 -> 5ec1984f5


IMPALA-5238: transfer reservations between trackers

This is a primitive needed to implement claiming and distribution
of initial reservations. It supports transferring reservation between
any two ReservationTrackers under the same query.

Also remove the public DecreaseReservation() method, which is now
mostly redundant and we have no plans to use.

Testing:
* Added a test that exercises transfer between trackers at different
  levels and with different relationships.

Change-Id: I21f008abaf1aa4fcd2d854769a603b97589af3b3
Reviewed-on: http://gerrit.cloudera.org:8080/6708
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5ec1984f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5ec1984f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5ec1984f

Branch: refs/heads/master
Commit: 5ec1984f56a2010fb9ca98bc58ca0c9ef83b2391
Parents: a16a0fa
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Wed Apr 19 16:49:11 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Fri May 12 00:46:55 2017 +0000

----------------------------------------------------------------------
 .../bufferpool/reservation-tracker-test.cc      | 141 +++++++++++++---
 .../runtime/bufferpool/reservation-tracker.cc   | 164 ++++++++++++++-----
 be/src/runtime/bufferpool/reservation-tracker.h |  66 ++++++--
 3 files changed, 296 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5ec1984f/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index 416a53e..a794a91 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -141,13 +141,7 @@ TEST_F(ReservationTrackerTest, BasicTwoLevel) {
   child.ReleaseTo(1);
   ASSERT_EQ(0, child.GetUsedReservation());
 
-  // Child reservation should be returned all the way up the tree.
-  child.DecreaseReservation(1);
-  ASSERT_EQ(child_reservation, root_.GetReservation());
-  ASSERT_EQ(child_reservation - 1, child.GetReservation());
-  ASSERT_EQ(child_reservation - 1, root_.GetChildReservations());
-
-  // Closing the child should release its reservation.
+  // Closing the child should release its reservation all the way up the tree.
   child.Close();
   ASSERT_EQ(1, root_.GetReservation());
   ASSERT_EQ(0, root_.GetChildReservations());
@@ -284,15 +278,12 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) {
   ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
 
   // Check that released memory is decremented from all trackers correctly.
-  child_reservations1.DecreaseReservation(MIN_BUFFER_LEN);
-  child_reservations2.DecreaseReservation(MIN_BUFFER_LEN);
-  ASSERT_EQ(0, child_reservations2.GetReservation());
+  child_reservations1.Close();
+  child_reservations2.Close();
+  ASSERT_EQ(0, child_mem_tracker1.consumption());
   ASSERT_EQ(0, child_mem_tracker2.consumption());
   ASSERT_EQ(0, root_mem_tracker.consumption());
   ASSERT_EQ(0, root_.GetUsedReservation());
-
-  child_reservations1.Close();
-  child_reservations2.Close();
   child_mem_tracker1.UnregisterFromParent();
   child_mem_tracker2.UnregisterFromParent();
 }
@@ -315,8 +306,6 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
   for (int i = 1; i < HIERARCHY_DEPTH; ++i) {
     mem_trackers[i].reset(new MemTracker(
         mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get()));
-    reservations[i].InitChildTracker(
-        NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500);
   }
 
   vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1});
@@ -326,6 +315,10 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
   for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
     int64_t lowest_limit = mem_trackers[level]->lowest_limit();
     for (int amount : interesting_amounts) {
+      // Initialize the tracker, increase reservation, then release reservation by closing
+      // the tracker.
+      reservations[level].InitChildTracker(
+          NewProfile(), &reservations[level - 1], mem_trackers[level].get(), 500);
       bool increased = reservations[level].IncreaseReservation(amount);
       if (lowest_limit == -1 || amount <= lowest_limit) {
         // The increase should go through.
@@ -338,12 +331,12 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
         }
 
         LOG(INFO) << "\n" << mem_trackers[0]->LogUsage();
-        reservations[level].DecreaseReservation(amount);
+        reservations[level].Close();
       } else {
         ASSERT_FALSE(increased);
       }
-      // We should be back in the original state.
-      for (int i = 0; i < HIERARCHY_DEPTH; ++i) {
+      // Reservations should be released on all ancestors.
+      for (int i = 0; i < level; ++i) {
         ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
                                                        << reservations[i].DebugString();
         ASSERT_EQ(0, reservations[i].GetChildReservations());
@@ -365,7 +358,8 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
       ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
       ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
     }
-    reservations[level].DecreaseReservation(amount);
+    // Return the reservation to the root before the next iteration.
+    reservations[level].TransferReservationTo(&reservations[0], amount);
   }
 
   for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) {
@@ -373,6 +367,115 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
     if (i != 0) mem_trackers[i]->UnregisterFromParent();
   }
 }
+
+// Test TransferReservation().
+TEST_F(ReservationTrackerTest, TransferReservation) {
+  Status status;
+  // Set up this hierarchy, to test transfers between different levels and
+  // different cases:
+  //    (root) limit = 4
+  //      ^
+  //      |
+  //  (grandparent) limit = 3
+  //   ^         ^
+  //   |         |
+  //  (parent) (aunt) limit =2
+  //   ^
+  //   |
+  //  (child)
+  const int64_t TOTAL_MEM = MIN_BUFFER_LEN * 4;
+  const int64_t GRANDPARENT_LIMIT = MIN_BUFFER_LEN * 3;
+  const int64_t AUNT_LIMIT = MIN_BUFFER_LEN * 2;
+
+  root_.InitRootTracker(nullptr, TOTAL_MEM);
+  MemTracker* root_mem_tracker = obj_pool_.Add(new MemTracker);
+  ReservationTracker* grandparent = obj_pool_.Add(new ReservationTracker());
+  MemTracker* grandparent_mem_tracker =
+      obj_pool_.Add(new MemTracker(TOTAL_MEM, "grandparent", root_mem_tracker));
+  ReservationTracker* parent = obj_pool_.Add(new ReservationTracker());
+  MemTracker* parent_mem_tracker =
+      obj_pool_.Add(new MemTracker(-1, "parent", grandparent_mem_tracker));
+  ReservationTracker* aunt = obj_pool_.Add(new ReservationTracker());
+  ReservationTracker* child = obj_pool_.Add(new ReservationTracker());
+  MemTracker* child_mem_tracker =
+      obj_pool_.Add(new MemTracker(-1, "child", parent_mem_tracker));
+  grandparent->InitChildTracker(nullptr, &root_, grandparent_mem_tracker, TOTAL_MEM);
+  parent->InitChildTracker(
+      nullptr, grandparent, parent_mem_tracker, numeric_limits<int64_t>::max());
+  aunt->InitChildTracker(nullptr, grandparent, nullptr, AUNT_LIMIT);
+  child->InitChildTracker(
+      nullptr, parent, child_mem_tracker, numeric_limits<int64_t>::max());
+
+  ASSERT_TRUE(child->IncreaseReservation(GRANDPARENT_LIMIT));
+  // Transfer from child to self (no-op).
+  ASSERT_TRUE(child->TransferReservationTo(child, GRANDPARENT_LIMIT));
+  EXPECT_EQ(GRANDPARENT_LIMIT, child->GetReservation());
+
+  // Transfer from child to parent.
+  ASSERT_TRUE(child->TransferReservationTo(parent, GRANDPARENT_LIMIT));
+  EXPECT_EQ(0, child->GetReservation());
+  EXPECT_EQ(0, child_mem_tracker->consumption());
+  EXPECT_EQ(0, parent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, parent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, parent_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, root_.GetReservation());
+
+  // Transfer from parent to aunt, up to aunt's limit.
+  ASSERT_TRUE(parent->TransferReservationTo(aunt, AUNT_LIMIT));
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, parent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, parent_mem_tracker->consumption());
+  EXPECT_EQ(AUNT_LIMIT, aunt->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, root_.GetReservation());
+  // Cannot exceed aunt's limit by transferring.
+  ASSERT_FALSE(parent->TransferReservationTo(aunt, parent->GetReservation()));
+
+  // Transfer from parent to child.
+  ASSERT_TRUE(parent->TransferReservationTo(child, parent->GetReservation()));
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, child->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, child_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, parent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, parent_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent_mem_tracker->consumption());
+
+  // Transfer from aunt to child.
+  ASSERT_TRUE(aunt->TransferReservationTo(child, AUNT_LIMIT));
+  EXPECT_EQ(GRANDPARENT_LIMIT, child->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, child_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, parent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, parent_mem_tracker->consumption());
+  EXPECT_EQ(0, aunt->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent_mem_tracker->consumption());
+
+  // Transfer from child to grandparent.
+  ASSERT_TRUE(child->TransferReservationTo(grandparent, GRANDPARENT_LIMIT));
+  EXPECT_EQ(0, child->GetReservation());
+  EXPECT_EQ(0, child_mem_tracker->consumption());
+  EXPECT_EQ(0, parent->GetReservation());
+  EXPECT_EQ(0, parent_mem_tracker->consumption());
+  EXPECT_EQ(0, aunt->GetReservation());
+  EXPECT_EQ(0, grandparent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, root_.GetReservation());
+
+  child->Close();
+  child_mem_tracker->UnregisterFromParent();
+  aunt->Close();
+  parent->Close();
+  parent_mem_tracker->UnregisterFromParent();
+  grandparent->Close();
+  grandparent_mem_tracker->UnregisterFromParent();
+}
 }
 
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5ec1984f/be/src/runtime/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc
index 7138d69..13cd67c 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -29,7 +29,7 @@
 
 namespace impala {
 
-ReservationTracker::ReservationTracker() : initialized_(false), mem_tracker_(NULL) {}
+ReservationTracker::ReservationTracker() : initialized_(false), mem_tracker_(nullptr) {}
 
 ReservationTracker::~ReservationTracker() {
   DCHECK(!initialized_);
@@ -39,8 +39,8 @@ void ReservationTracker::InitRootTracker(
     RuntimeProfile* profile, int64_t reservation_limit) {
   lock_guard<SpinLock> l(lock_);
   DCHECK(!initialized_);
-  parent_ = NULL;
-  mem_tracker_ = NULL;
+  parent_ = nullptr;
+  mem_tracker_ = nullptr;
   reservation_limit_ = reservation_limit;
   reservation_ = 0;
   used_reservation_ = 0;
@@ -55,7 +55,7 @@ void ReservationTracker::InitRootTracker(
 
 void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
     ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit) {
-  DCHECK(parent != NULL);
+  DCHECK(parent != nullptr);
   DCHECK_GE(reservation_limit, 0);
 
   lock_guard<SpinLock> l(lock_);
@@ -69,9 +69,9 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
   child_reservations_ = 0;
   initialized_ = true;
 
-  if (mem_tracker_ != NULL) {
+  if (mem_tracker_ != nullptr) {
     MemTracker* parent_mem_tracker = GetParentMemTracker();
-    if (parent_mem_tracker != NULL) {
+    if (parent_mem_tracker != nullptr) {
       // Make sure the parent links of the MemTrackers correspond to our parent links.
       DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
       // Make sure we don't have a lower limit than the ancestor, since we don't enforce
@@ -81,8 +81,8 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
       // Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent
       // shouldn't have a MemTracker.
       ReservationTracker* ancestor = parent_;
-      while (ancestor != NULL) {
-        DCHECK(ancestor->mem_tracker_ == NULL);
+      while (ancestor != nullptr) {
+        DCHECK(ancestor->mem_tracker_ == nullptr);
         ancestor = ancestor->parent_;
       }
     }
@@ -95,14 +95,13 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
 
 void ReservationTracker::InitCounters(
     RuntimeProfile* profile, int64_t reservation_limit) {
-  bool profile_provided = profile != NULL;
-  if (profile == NULL) {
+  if (profile == nullptr) {
     dummy_profile_.reset(new DummyProfile);
     profile = dummy_profile_->profile();
   }
 
   // Check that another tracker's counters aren't already registered in the profile.
-  DCHECK(profile->GetCounter("BufferPoolInitialReservation") == NULL);
+  DCHECK(profile->GetCounter("BufferPoolInitialReservation") == nullptr);
   counters_.reservation_limit =
       ADD_COUNTER(profile, "BufferPoolReservationLimit", TUnit::BYTES);
   counters_.peak_reservation =
@@ -112,9 +111,7 @@ void ReservationTracker::InitCounters(
 
   COUNTER_SET(counters_.reservation_limit, reservation_limit);
 
-  if (mem_tracker_ != NULL && profile_provided) {
-    mem_tracker_->EnableReservationReporting(counters_);
-  }
+  if (mem_tracker_ != nullptr) mem_tracker_->EnableReservationReporting(counters_);
 }
 
 void ReservationTracker::Close() {
@@ -124,9 +121,9 @@ void ReservationTracker::Close() {
   DCHECK_EQ(used_reservation_, 0);
   DCHECK_EQ(child_reservations_, 0);
   // Release any reservation to parent.
-  if (parent_ != NULL) DecreaseReservationInternalLocked(reservation_, false);
-  mem_tracker_ = NULL;
-  parent_ = NULL;
+  if (parent_ != nullptr) DecreaseReservationLocked(reservation_, false);
+  mem_tracker_ = nullptr;
+  parent_ = nullptr;
   initialized_ = false;
 }
 
@@ -154,17 +151,17 @@ bool ReservationTracker::IncreaseReservationInternalLocked(
   } else if (reservation_increase == 0) {
     granted = true;
   } else {
-    if (parent_ == NULL) {
+    if (parent_ == nullptr) {
       granted = true;
     } else {
       lock_guard<SpinLock> l(parent_->lock_);
       granted =
           parent_->IncreaseReservationInternalLocked(reservation_increase, true, true);
     }
-    if (granted && !TryUpdateMemTracker(reservation_increase)) {
+    if (granted && !TryConsumeFromMemTracker(reservation_increase)) {
       granted = false;
       // Roll back changes to ancestors if MemTracker update fails.
-      parent_->DecreaseReservationInternal(reservation_increase, true);
+      parent_->DecreaseReservation(reservation_increase, true);
     }
   }
 
@@ -179,9 +176,10 @@ bool ReservationTracker::IncreaseReservationInternalLocked(
   return granted;
 }
 
-bool ReservationTracker::TryUpdateMemTracker(int64_t reservation_increase) {
-  if (mem_tracker_ == NULL) return true;
-  if (GetParentMemTracker() == NULL) {
+bool ReservationTracker::TryConsumeFromMemTracker(int64_t reservation_increase) {
+  DCHECK_GE(reservation_increase, 0);
+  if (mem_tracker_ == nullptr) return true;
+  if (GetParentMemTracker() == nullptr) {
     // At the topmost link, which may be a MemTracker with a limit, we need to use
     // TryConsume() to check the limit.
     return mem_tracker_->TryConsume(reservation_increase);
@@ -194,35 +192,125 @@ bool ReservationTracker::TryUpdateMemTracker(int64_t reservation_increase)
{
   }
 }
 
-void ReservationTracker::DecreaseReservation(int64_t bytes) {
-  DecreaseReservationInternal(bytes, false);
+void ReservationTracker::ReleaseToMemTracker(int64_t reservation_decrease) {
+  DCHECK_GE(reservation_decrease, 0);
+  if (mem_tracker_ == nullptr) return;
+  if (GetParentMemTracker() == nullptr) {
+    mem_tracker_->Release(reservation_decrease);
+  } else {
+    mem_tracker_->ReleaseLocal(reservation_decrease, GetParentMemTracker());
+  }
 }
 
-void ReservationTracker::DecreaseReservationInternal(
-    int64_t bytes, bool is_child_reservation) {
+void ReservationTracker::DecreaseReservation(int64_t bytes, bool is_child_reservation) {
   lock_guard<SpinLock> l(lock_);
-  DecreaseReservationInternalLocked(bytes, is_child_reservation);
+  DecreaseReservationLocked(bytes, is_child_reservation);
 }
 
-void ReservationTracker::DecreaseReservationInternalLocked(
+void ReservationTracker::DecreaseReservationLocked(
     int64_t bytes, bool is_child_reservation) {
   DCHECK(initialized_);
   DCHECK_GE(reservation_, bytes);
   if (bytes == 0) return;
   if (is_child_reservation) child_reservations_ -= bytes;
   UpdateReservation(-bytes);
+  ReleaseToMemTracker(bytes);
   // The reservation should be returned up the tree.
-  if (mem_tracker_ != NULL) {
-    if (GetParentMemTracker() == NULL) {
-      mem_tracker_->Release(bytes);
-    } else {
-      mem_tracker_->ReleaseLocal(bytes, GetParentMemTracker());
-    }
-  }
-  if (parent_ != NULL) parent_->DecreaseReservationInternal(bytes, true);
+  if (parent_ != nullptr) parent_->DecreaseReservation(bytes, true);
   CheckConsistency();
 }
 
+bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_t bytes)
{
+  if (other == this) return true;
+  // Find the path to the root from both. The root is guaranteed to be a common ancestor.
+  vector<ReservationTracker*> path_to_common = FindPathToRoot();
+  vector<ReservationTracker*> other_path_to_common = other->FindPathToRoot();
+  DCHECK_EQ(path_to_common.back(), other_path_to_common.back());
+  ReservationTracker* common_ancestor = path_to_common.back();
+  // Remove any common ancestors - they do not need to be updated for this transfer.
+  while (!path_to_common.empty() && !other_path_to_common.empty()
+      && path_to_common.back() == other_path_to_common.back()) {
+    common_ancestor = path_to_common.back();
+    path_to_common.pop_back();
+    other_path_to_common.pop_back();
+  }
+
+  // At this point, we have three cases:
+  // 1. 'common_ancestor' == 'other'. 'other_path_to_common' is empty because 'other' is
+  //    the lowest common ancestor. To transfer, we decrease the reservation on the
+  //    trackers under 'other', down to 'this'.
+  // 2. 'common_ancestor' == 'this'. 'path_to_common' is empty because 'this' is the
+  //    lowest common ancestor. To transfer, we increase the reservation on the trackers
+  //    under 'this', down to 'other'.
+  // 3. Neither is an ancestor of the other. Both 'other_path_to_common' and
+  //    'path_to_common' are non-empty. We increase the reservation on trackers from
+  //    'other' up to one below the common ancestor (checking limits as needed) and if
+  //    successful, decrease reservations on trackers from 'this' up to one below the
+  //    common ancestor.
+
+  // Lock all of the trackers so we can do the update atomically. Need to be careful to
+  // lock subtrees in the correct order.
+  vector<unique_lock<SpinLock>> locks;
+  bool lock_first = path_to_common.empty() || other_path_to_common.empty()
+      || lock_sibling_subtree_first(path_to_common.back(), other_path_to_common.back());
+  if (lock_first) {
+    for (ReservationTracker* tracker : path_to_common) locks.emplace_back(tracker->lock_);
+  }
+  for (ReservationTracker* tracker : other_path_to_common) {
+    locks.emplace_back(tracker->lock_);
+  }
+  if (!lock_first) {
+    for (ReservationTracker* tracker : path_to_common) locks.emplace_back(tracker->lock_);
+  }
+
+  // Check reservation limits will not be violated before applying any updates.
+  for (ReservationTracker* tracker : other_path_to_common) {
+    if (tracker->reservation_ + bytes > tracker->reservation_limit_) return false;
+  }
+
+  // Do the updates now that we have checked the limits. We're holding all the locks
+  // so this is all atomic.
+  for (ReservationTracker* tracker : other_path_to_common) {
+    tracker->UpdateReservation(bytes);
+    // We don't handle MemTrackers with limit in this function - this should always
+    // succeed.
+    DCHECK(tracker->mem_tracker_ == nullptr || !tracker->mem_tracker_->has_limit());
+    bool success = tracker->TryConsumeFromMemTracker(bytes);
+    DCHECK(success);
+    if (tracker != other_path_to_common[0]) tracker->child_reservations_ += bytes;
+  }
+  for (ReservationTracker* tracker : path_to_common) {
+    if (tracker != path_to_common[0]) tracker->child_reservations_ -= bytes;
+    tracker->UpdateReservation(-bytes);
+    tracker->ReleaseToMemTracker(bytes);
+  }
+
+  // Update the 'child_reservations_' on the common ancestor if needed.
+  // Case 1: reservation was pushed up to 'other'.
+  if (common_ancestor == other) {
+    lock_guard<SpinLock> l(other->lock_);
+    other->child_reservations_ -= bytes;
+    other->CheckConsistency();
+  }
+  // Case 2: reservation was pushed down below 'this'.
+  if (common_ancestor == this) {
+    lock_guard<SpinLock> l(lock_);
+    child_reservations_ += bytes;
+    CheckConsistency();
+  }
+  return true;
+}
+
+vector<ReservationTracker*> ReservationTracker::FindPathToRoot() {
+  vector<ReservationTracker*> path_to_root;
+  ReservationTracker* curr = this;
+  do {
+    path_to_root.push_back(curr);
+    curr = curr->parent_;
+  } while (curr != nullptr);
+  return path_to_root;
+}
+
 void ReservationTracker::AllocateFrom(int64_t bytes) {
   lock_guard<SpinLock> l(lock_);
   DCHECK(initialized_);
@@ -296,7 +384,7 @@ string ReservationTracker::DebugString() {
   lock_guard<SpinLock> l(lock_);
   if (!initialized_) return "<ReservationTracker>: uninitialized";
 
-  string parent_debug_string = parent_ == NULL ? "NULL" : parent_->DebugString();
+  string parent_debug_string = parent_ == nullptr ? "NULL" : parent_->DebugString();
   return Substitute(
       "<ReservationTracker>: reservation_limit $0 reservation $1 used_reservation $2
"
       "child_reservations $3 parent:\n$4",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5ec1984f/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 3d40fcc..5be2c23 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -127,11 +127,14 @@ class ReservationTracker {
   /// Returns true if the reservation increase was successful or not necessary.
   bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
 
-  /// Decrease tracker's reservation by 'bytes'. This tracker's reservation must be at
-  /// least 'bytes' before calling this method.
-  /// TODO: decide on and implement policy for how far to release the reservation up
-  /// the tree. Currently the reservation is released all the way to the root.
-  void DecreaseReservation(int64_t bytes);
+  /// Transfer reservation from this tracker to 'other'. Both trackers must be in the
+  /// same query subtree of the hierarchy. One tracker can be the ancestor of the other,
+  /// or they can share a common ancestor. The subtree root must be at the query level
+  /// or below so that the transfer cannot cause a MemTracker limit to be exceeded
+  /// (because linked MemTrackers with limits below the query level are not supported).
+  /// Returns true on success or false if the transfer would have caused a reservation
+  /// limit to be exceeded.
+  bool TransferReservationTo(ReservationTracker* other, int64_t bytes);
 
   /// Allocate 'bytes' from the reservation. The tracker must have at least 'bytes'
   /// unused reservation before calling this method.
@@ -181,19 +184,38 @@ class ReservationTracker {
   bool IncreaseReservationInternalLocked(
       int64_t bytes, bool use_existing_reservation, bool is_child_reservation);
 
-  /// Update consumption on linked MemTracker. For the topmost link, return false if
-  /// this failed because it would exceed a memory limit. If there is no linked
-  /// MemTracker, just returns true.
+  /// Increase consumption on linked MemTracker to reflect an increase in reservation
+  /// of 'reservation_increase'. For the topmost link, return false if this failed
+  /// because it would exceed a memory limit. If there is no linked MemTracker, just
+  /// returns true.
   /// TODO: remove once we account all memory via ReservationTrackers.
-  bool TryUpdateMemTracker(int64_t reservation_increase);
-
-  /// Internal helper for DecreaseReservation(). This behaves the same as
-  /// DecreaseReservation(), except when 'is_child_reservation' is true it decreases
-  /// 'child_reservations_' by 'bytes'.
-  void DecreaseReservationInternal(int64_t bytes, bool is_child_reservation);
+  bool TryConsumeFromMemTracker(int64_t reservation_increase);
 
-  /// Same as DecreaseReservationInternal(), but 'lock_' must be held by caller.
-  void DecreaseReservationInternalLocked(int64_t bytes, bool is_child_reservation);
+  /// Decrease consumption on linked MemTracker to reflect a decrease in reservation of
+  /// 'reservation_decrease'. If there is no linked MemTracker, does nothing.
+  /// TODO: remove once we account all memory via ReservationTrackers.
+  void ReleaseToMemTracker(int64_t reservation_decrease);
+
+  /// Decrease reservation by 'bytes' on this tracker and all ancestors. This tracker's
+  /// reservation must be at least 'bytes' before calling this method. If
+  /// 'is_child_reservation' is true it decreases 'child_reservations_' by 'bytes'
+  void DecreaseReservation(int64_t bytes, bool is_child_reservation);
+
+  /// Same as DecreaseReservation(), but 'lock_' must be held by caller.
+  void DecreaseReservationLocked(int64_t bytes, bool is_child_reservation);
+
+  /// Return a vector containing the trackers on the path to the root tracker. Includes
+  /// the current tracker and the root tracker.
+  std::vector<ReservationTracker*> FindPathToRoot();
+
+  /// Return true if trackers in the subtree rooted at 'subtree1' precede trackers in
+  /// the subtree rooted at 'subtree2' in the lock order. 'subtree1' and 'subtree2'
+  /// must share the same parent.
+  static bool lock_sibling_subtree_first(
+      ReservationTracker* subtree1, ReservationTracker* subtree2) {
+    DCHECK_EQ(subtree1->parent_, subtree2->parent_);
+    return reinterpret_cast<uintptr_t>(subtree1) < reinterpret_cast<uintptr_t>(subtree2);
+  }
 
   /// Check the internal consistency of the ReservationTracker and DCHECKs if in an
   /// inconsistent state.
@@ -208,8 +230,16 @@ class ReservationTracker {
   /// 'lock_' must be held by caller.
   void UpdateReservation(int64_t delta);
 
-  /// lock_ protects all members. In a hierarchy of trackers, locks must be acquired
-  /// from the bottom-up.
+  /// lock_ protects all members. The lock order in a tree of ReservationTrackers is
+  /// based on a post-order traversal of the tree, with children visited in order of the
+  /// memory address of the ReservationTracker object. The following rules can be applied
+  /// to determine the relative positions of two trackers t1 and t2 in the lock order:
+  /// * If t1 is a descendent of t2, t1's lock must be acquired before t2's lock (i.e.
+  ///   locks are acquired bottom-up).
+  /// * If neither t1 or t2 is a descendant of the other, they must be in subtrees of
+  ///   under a common ancestor. If the memory address of t1's subtree's root is less
+  ///   than the memory address of t2's subtree's root, t1's lock must be acquired before
+  ///   t2's lock. This check is implemented in lock_sibling_subtree_first().
   SpinLock lock_;
 
   /// True if the tracker is initialized.


Mime
View raw message