impala-commits mailing list archives

From tarmstr...@apache.org
Subject [5/5] incubator-impala git commit: IMPALA-5113: fix dirty unpinned invariant
Date Mon, 27 Mar 2017 23:13:54 GMT
IMPALA-5113: fix dirty unpinned invariant

There were two bugs:
* The invariant was too strict and didn't take into account pages that are
  pinned multiple times (extra pins don't consume additional buffers and
  therefore shouldn't count).
* The invariant wasn't enforced when reclaiming a clean page.

Change the logic so that it's implemented in terms of pages/buffers in
various states (this avoids the reservation double-counting and more
directly expresses the intent). To aid in this, refactor the page lists
to use a wrapper that tracks the # of bytes of pages in each list.
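
For illustration, the per-client invariant now checked by DCheckConsistency()
is roughly (member names as in buffer-pool-internal.h):

  reservation >= buffers_allocated_bytes_         // BufferHandles held by client
                 + pinned_pages_.bytes()          // each pinned page counted once
                 + dirty_unpinned_pages_.bytes()  // dirty, write not yet started
                 + in_flight_write_pages_.bytes() // dirty, write in flight

A page pinned N times consumes N * len bytes of reservation but only one
len-byte buffer, so counting page bytes directly (rather than comparing dirty
bytes against unused reservation) avoids the double-counting noted above.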

Testing:
Added a unit test that reproduces the issue and added stricter DCHECKs
to detect the issue in future.

Change-Id: I07e08acb6cf6839bfccbd09258c093b1c8252b25
Reviewed-on: http://gerrit.cloudera.org:8080/6469
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/874d20d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/874d20d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/874d20d0

Branch: refs/heads/master
Commit: 874d20d0fabaf0572202a6e65dc55d334229ca6b
Parents: e98c88f
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Fri Mar 24 08:52:56 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Mon Mar 27 22:52:33 2017 +0000

----------------------------------------------------------------------
 .../runtime/bufferpool/buffer-pool-internal.h   | 242 ++++++++++++-------
 be/src/runtime/bufferpool/buffer-pool-test.cc   |  32 +++
 be/src/runtime/bufferpool/buffer-pool.cc        | 130 +++++-----
 be/src/runtime/bufferpool/buffer-pool.h         |   1 +
 be/src/util/internal-queue.h                    |  10 +-
 5 files changed, 263 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/874d20d0/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 04d76bb..f0dfa09 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -56,15 +56,14 @@
 /// reservations are not overcommitted (they shouldn't be), this global invariant can be
 /// maintained by enforcing a local invariant for every client:
 ///
-///   unused reservation >= dirty unpinned pages
+///   reservation >= BufferHandles returned to client
+///                  + pinned pages + dirty pages (dirty unpinned or write in flight)
 ///
 /// The local invariant is maintained by writing pages to disk as the first step of any
-/// operation that uses reservation. I.e. the R.H.S. of the invariant must be decreased
-/// before the L.H.S. can be decreased. These operations block waiting for enough writes
-/// to complete to satisfy the invariant.
-/// TODO: this invariant can be broken if a client calls DecreaseReservation() on the
-/// ReservationTracker. We should refactor so that DecreaseReservation() goes through
-/// the client before closing IMPALA-3202.
+/// operation that allocates a new buffer or reclaims buffers from clean pages. I.e.
+/// "dirty pages" must be decreased before one of the other values on the R.H.S. of the
+/// invariant can be increased. Operations block waiting for enough writes to complete
+/// to satisfy the invariant.
 
 #ifndef IMPALA_RUNTIME_BUFFER_POOL_INTERNAL_H
 #define IMPALA_RUNTIME_BUFFER_POOL_INTERNAL_H
@@ -79,8 +78,106 @@
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "util/condition-variable.h"
 
+// Ensure that DCheckConsistency() function calls get removed in release builds.
+#ifndef NDEBUG
+#define DCHECK_CONSISTENCY() DCheckConsistency()
+#else
+#define DCHECK_CONSISTENCY()
+#endif
+
 namespace impala {
 
+/// The internal representation of a page, which can be pinned or unpinned. See the
+/// class comment for explanation of the different page states.
+struct BufferPool::Page : public InternalList<Page>::Node {
+  Page(Client* client, int64_t len) : client(client), len(len), pin_count(0) {}
+
+  std::string DebugString();
+
+  // Helper for BufferPool::DebugString().
+  static bool DebugStringCallback(std::stringstream* ss, BufferPool::Page* page);
+
+  /// The client that the page belongs to.
+  Client* const client;
+
+  /// The length of the page in bytes.
+  const int64_t len;
+
+  /// The pin count of the page. Only accessed in contexts that are passed the associated
+  /// PageHandle, so it cannot be accessed by multiple threads concurrently.
+  int pin_count;
+
+  /// Non-null if there is a write in flight, the page is clean, or the page is evicted.
+  std::unique_ptr<TmpFileMgr::WriteHandle> write_handle;
+
+  /// Condition variable signalled when a write for this page completes. Protected by
+  /// client->lock_.
+  ConditionVariable write_complete_cv_;
+
+  /// This lock must be held when accessing 'buffer' if the page is unpinned and not
+  /// evicted (i.e. it is safe to access 'buffer' if the page is pinned or evicted).
+  SpinLock buffer_lock;
+
+  /// Buffer with the page's contents. Closed iff the page is evicted; open otherwise.
+  BufferHandle buffer;
+};
+
+/// Wrapper around InternalList<Page> that tracks the # of bytes in the list.
+class BufferPool::PageList {
+ public:
+  PageList() : bytes_(0) {}
+  ~PageList() {
+    // Clients always empty out their list before destruction.
+    DCHECK(list_.empty());
+    DCHECK_EQ(0, bytes_);
+  }
+
+  void Enqueue(Page* page) {
+    list_.Enqueue(page);
+    bytes_ += page->len;
+  }
+
+  bool Remove(Page* page) {
+    if (list_.Remove(page)) {
+      bytes_ -= page->len;
+      return true;
+    }
+    return false;
+  }
+
+  Page* Dequeue() {
+    Page* page = list_.Dequeue();
+    if (page != nullptr) {
+      bytes_ -= page->len;
+    }
+    return page;
+  }
+
+  Page* PopBack() {
+    Page* page = list_.PopBack();
+    if (page != nullptr) {
+      bytes_ -= page->len;
+    }
+    return page;
+  }
+
+  void Iterate(boost::function<bool(Page*)> fn) { list_.Iterate(fn); }
+  bool Contains(Page* page) { return list_.Contains(page); }
+  Page* tail() { return list_.tail(); }
+  bool empty() const { return list_.empty(); }
+  int size() const { return list_.size(); }
+  int64_t bytes() const { return bytes_; }
+
+  void DCheckConsistency() {
+    DCHECK_GE(bytes_, 0);
+    DCHECK_EQ(list_.empty(), bytes_ == 0);
+  }
+
+ private:
+  InternalList<Page> list_;
+  int64_t bytes_;
+};
+
 /// The internal state for the client.
 class BufferPool::Client {
  public:
@@ -90,18 +187,15 @@ class BufferPool::Client {
 
   ~Client() {
     DCHECK_EQ(0, num_pages_);
-    DCHECK_EQ(0, pinned_pages_.size());
-    DCHECK_EQ(0, dirty_unpinned_pages_.size());
-    DCHECK_EQ(0, in_flight_write_pages_.size());
+    DCHECK_EQ(0, buffers_allocated_bytes_);
   }
 
   /// Release reservation for this client.
   void Close() { reservation_.Close(); }
 
-  /// Add a new pinned page 'page' to the pinned pages list. 'page' must not be in any
-  /// other lists. Neither the client's lock nor page->buffer_lock should be held by the
-  /// caller.
-  void AddNewPinnedPage(Page* page);
+  /// Create a pinned page using 'buffer', which was allocated using AllocateBuffer().
+  /// No client or page locks should be held by the caller.
+  Page* CreatePinnedPage(BufferHandle&& buffer);
 
   /// Reset 'handle', clean up references to handle->page and release any resources
+  /// associated with handle->page. If the page is pinned, 'out_buffer' can be passed in
@@ -113,7 +207,7 @@ class BufferPool::Client {
   /// Updates client state to reflect that 'page' is now a dirty unpinned page. May
   /// initiate writes for this or other dirty unpinned pages.
   /// Neither the client's lock nor page->buffer_lock should be held by the caller.
-  void MoveToDirtyUnpinned(int64_t unused_reservation, Page* page);
+  void MoveToDirtyUnpinned(Page* page);
 
   /// Move an unpinned page to the pinned state, moving between data structures and
   /// reading from disk if necessary. Returns once the page's buffer is allocated
@@ -121,26 +215,24 @@ class BufferPool::Client {
   /// handle->page_->buffer_lock should be held by the caller.
   Status MoveToPinned(ClientHandle* client, PageHandle* handle);
 
-  /// Must be called before allocating a buffer to ensure that the client can allocate
-  /// 'allocation_len' bytes without pinned bytes plus dirty unpinned bytes exceeding the
-  /// client's reservation. No page or client locks should be held by the caller.
-  Status CleanPagesBeforeAllocation(
-      ReservationTracker* reservation, int64_t allocation_len);
-
-  /// Same as CleanPagesBeforeAllocation(), except 'lock_' must be held by 'client_lock'.
-  /// 'client_lock' may be released temporarily while waiting for writes to complete.
-  Status CleanPagesBeforeAllocationLocked(boost::unique_lock<boost::mutex>* client_lock,
-      ReservationTracker* reservation, int64_t allocation_len);
-
-  /// Initiates asynchronous writes of dirty unpinned pages to disk. Ensures that at
-  /// least 'min_bytes_to_write' bytes of writes will be written asynchronously. May
-  /// start writes more aggressively so that I/O and compute can be overlapped. If
-  /// any errors are encountered, 'write_status_' is set. 'write_status_' must therefore
-  /// be checked before reading back any pages. 'lock_' must be held by the caller.
-  void WriteDirtyPagesAsync(int64_t min_bytes_to_write = 0);
+  /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer()
+  /// API to deduct from the client's reservation and update internal accounting. Cleans
+  /// dirty pages if needed to satisfy the buffer pool's internal invariants. No page or
+  /// client locks should be held by the caller.
+  Status PrepareToAllocateBuffer(int64_t len);
+
+  /// Called after a buffer of 'len' is freed via the FreeBuffer() API to update
+  /// internal accounting and release the buffer to the client's reservation. No page or
+  /// client locks should be held by the caller.
+  void FreedBuffer(int64_t len) {
+    boost::lock_guard<boost::mutex> cl(lock_);
+    reservation_.ReleaseTo(len);
+    buffers_allocated_bytes_ -= len;
+    DCHECK_CONSISTENCY();
+  }
 
   /// Wait for the in-flight write for 'page' to complete.
-  /// 'lock_' must be held by the caller via 'client_lock'. page->bufffer_lock should
+  /// 'lock_' must be held by the caller via 'client_lock'. page->buffer_lock should
   /// not be held.
   void WaitForWrite(boost::unique_lock<boost::mutex>* client_lock, Page* page);
 
@@ -158,17 +250,32 @@ class BufferPool::Client {
  private:
   // Check consistency of client, DCHECK if inconsistent. 'lock_' must be held.
   void DCheckConsistency() {
-    DCHECK_GE(in_flight_write_bytes_, 0);
-    DCHECK_LE(in_flight_write_bytes_, dirty_unpinned_bytes_);
+    DCHECK_GE(buffers_allocated_bytes_, 0);
+    pinned_pages_.DCheckConsistency();
+    dirty_unpinned_pages_.DCheckConsistency();
+    in_flight_write_pages_.DCheckConsistency();
     DCHECK_LE(pinned_pages_.size() + dirty_unpinned_pages_.size()
             + in_flight_write_pages_.size(),
         num_pages_);
-    if (in_flight_write_pages_.empty()) DCHECK_EQ(0, in_flight_write_bytes_);
-    if (in_flight_write_pages_.empty() && dirty_unpinned_pages_.empty()) {
-      DCHECK_EQ(0, dirty_unpinned_bytes_);
-    }
+    // Check that we flushed enough pages to disk given our eviction policy.
+    DCHECK_GE(reservation_.GetReservation(), buffers_allocated_bytes_
+            + pinned_pages_.bytes() + dirty_unpinned_pages_.bytes()
+            + in_flight_write_pages_.bytes());
   }
 
+  /// Must be called once before allocating or reclaiming a buffer of 'len'. Ensures that
+  /// enough dirty pages are flushed to disk to satisfy the buffer pool's internal
+  /// invariants after the allocation. 'lock_' should be held by the caller via
+  /// 'client_lock'.
+  Status CleanPages(boost::unique_lock<boost::mutex>* client_lock, int64_t len);
+
+  /// Initiates asynchronous writes of dirty unpinned pages to disk. Ensures that at
+  /// least 'min_bytes_to_write' bytes of writes will be written asynchronously. May
+  /// start writes more aggressively so that I/O and compute can be overlapped. If
+  /// any errors are encountered, 'write_status_' is set. 'write_status_' must therefore
+  /// be checked before reading back any pages. 'lock_' must be held by the caller.
+  void WriteDirtyPagesAsync(int64_t min_bytes_to_write = 0);
+
   /// Called when a write for 'page' completes.
   void WriteCompleteCallback(Page* page, const Status& write_status);
 
@@ -217,61 +324,20 @@ class BufferPool::Client {
   /// pages are destroyed before the client.
   int64_t num_pages_;
 
-  /// All pinned pages for this client. Only used for debugging.
-  InternalList<Page> pinned_pages_;
+  /// Total bytes of buffers in BufferHandles returned to clients (i.e. obtained from
+  /// AllocateBuffer() or ExtractBuffer()).
+  int64_t buffers_allocated_bytes_;
+
+  /// All pinned pages for this client.
+  PageList pinned_pages_;
 
   /// Dirty unpinned pages for this client for which writes are not in flight. Page
   /// writes are started in LIFO order, because operators typically have sequential access
   /// patterns where the most recently evicted page will be last to be read.
-  InternalList<Page> dirty_unpinned_pages_;
+  PageList dirty_unpinned_pages_;
 
   /// Dirty unpinned pages for this client for which writes are in flight.
-  InternalList<Page> in_flight_write_pages_;
-
-  /// Total bytes of dirty unpinned pages for this client.
-  int64_t dirty_unpinned_bytes_;
-
-  /// Total bytes of in-flight writes for dirty unpinned pages. Bytes accounted here
-  /// are also accounted in 'dirty_unpinned_bytes_'.
-  int64_t in_flight_write_bytes_;
-};
-
-/// The internal representation of a page, which can be pinned or unpinned. See the
-/// class comment for explanation of the different page states.
-///
-/// Code manipulating the page is responsible for acquiring 'lock' when reading or
-/// modifying the page.
-struct BufferPool::Page : public InternalList<Page>::Node {
-  Page(Client* client, int64_t len) : client(client), len(len), pin_count(0) {}
-
-  std::string DebugString();
-
-  // Helper for BufferPool::DebugString().
-  static bool DebugStringCallback(std::stringstream* ss, BufferPool::Page* page);
-
-  /// The client that the page belongs to.
-  Client* const client;
-
-  /// The length of the page in bytes.
-  const int64_t len;
-
-  /// The pin count of the page. Only accessed in contexts that are passed the associated
-  /// PageHandle, so it cannot be accessed by multiple threads concurrently.
-  int pin_count;
-
-  /// Non-null if there is a write in flight, the page is clean, or the page is evicted.
-  std::unique_ptr<TmpFileMgr::WriteHandle> write_handle;
-
-  /// Condition variable signalled when a write for this page completes. Protected by
-  /// client->lock_.
-  ConditionVariable write_complete_cv_;
-
-  /// This lock must be held when accessing 'buffer' if the page is unpinned and not
-  /// evicted (i.e. it is safe to access 'buffer' if the page is pinned or evicted).
-  SpinLock buffer_lock;
-
-  /// Buffer with the page's contents. Closed only iff page is evicted. Open otherwise.
-  BufferHandle buffer;
+  PageList in_flight_write_pages_;
 };
 }
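
A note on the DCHECK_CONSISTENCY() macro above: the DCHECKs inside
DCheckConsistency() are already no-ops in release builds, but the function call
itself would remain, so the macro compiles the call out entirely when NDEBUG is
defined. A minimal sketch of the same pattern, with hypothetical names:

  #ifndef NDEBUG
  #define CHECK_STATE() CheckState()  // debug build: run the full check
  #else
  #define CHECK_STATE()               // release build: expands to nothing
  #endif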
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/874d20d0/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index d4640ac..c71ab60 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -636,6 +636,38 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
   pool.FreeBuffer(&clients[1], &buffer);
   for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client);
 }
+
+/// Regression test for IMPALA-5113 where the page flushing invariant didn't correctly
+/// take multiply pinned pages into account.
+TEST_F(BufferPoolTest, MultiplyPinnedPageAccounting) {
+  const int NUM_BUFFERS = 3;
+  const int64_t TOTAL_BYTES = NUM_BUFFERS * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
+
+  BufferPool::ClientHandle client;
+  RuntimeProfile* profile = NewProfile();
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      NULL, TOTAL_BYTES, profile, &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_BYTES));
+
+  BufferPool::PageHandle handle1, handle2;
+  BufferPool::BufferHandle buffer;
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+  pool.Unpin(&client, &handle1);
+  ASSERT_OK(pool.Pin(&client, &handle2));
+  ASSERT_OK(pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &buffer));
+
+  // We shouldn't need to flush anything to disk since we have only three pages/buffers in
+  // memory. Rely on DCHECKs to check invariants and check we didn't evict the page.
+  EXPECT_FALSE(IsEvicted(&handle1)) << handle1.DebugString();
+
+  pool.DestroyPage(&client, &handle1);
+  pool.DestroyPage(&client, &handle2);
+  pool.FreeBuffer(&client, &buffer);
+  pool.DeregisterClient(&client);
+}
 }
 
 int main(int argc, char** argv) {
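
Worked accounting for the regression test above (a reading of the test, with
L = TEST_BUFFER_LEN and total reservation 3L):

  after CreatePage x2:       pinned 2L, dirty 0, buffers 0
  after Unpin(handle1):      pinned  L, dirty L, buffers 0
  after Pin(handle2) again:  pinned  L, dirty L, buffers 0  (pin_count 2, one buffer)
  after AllocateBuffer(L):   pinned  L, dirty L, buffers L  -> L+L+L <= 3L

The new invariant is satisfied without any writes. The old check compared dirty
bytes (L) against unused reservation after the allocation (3L - 2L - L = 0,
since the second pin also consumes reservation), so it would have needlessly
flushed handle1's page to disk.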

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/874d20d0/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 2503beb..482db10 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -127,12 +127,8 @@ Status BufferPool::CreatePage(ClientHandle* client, int64_t len, PageHandle* han
   BufferHandle buffer;
   // No changes have been made to state yet, so we can cleanly return on error.
   RETURN_IF_ERROR(AllocateBuffer(client, len, &buffer));
-
-  Page* page = new Page(client->impl_, len);
-  page->buffer = std::move(buffer);
+  Page* page = client->impl_->CreatePinnedPage(move(buffer));
   handle->Open(page, client);
-  page->pin_count++;
-  client->impl_->AddNewPinnedPage(page);
   return Status::OK();
 }
 
@@ -178,7 +174,7 @@ void BufferPool::Unpin(ClientHandle* client, PageHandle* handle) {
   reservation->ReleaseTo(page->len);
 
   if (--page->pin_count > 0) return;
-  client->impl_->MoveToDirtyUnpinned(reservation->GetUnusedReservation(), page);
+  client->impl_->MoveToDirtyUnpinned(page);
   COUNTER_ADD(client->impl_->counters().total_unpinned_bytes, handle->len());
   COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, handle->len());
 }
@@ -199,10 +195,13 @@ void BufferPool::ExtractBuffer(
 
 Status BufferPool::AllocateBuffer(
     ClientHandle* client, int64_t len, BufferHandle* handle) {
-  ReservationTracker* reservation = client->impl_->reservation();
-  RETURN_IF_ERROR(client->impl_->CleanPagesBeforeAllocation(reservation, len));
-  reservation->AllocateFrom(len);
-  return AllocateBufferInternal(client, len, handle);
+  RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len));
+  Status status = AllocateBufferInternal(client, len, handle);
+  if (!status.ok()) {
+    // Allocation failed - update client's accounting to reflect the failure.
+    client->impl_->FreedBuffer(len);
+  }
+  return status;
 }
 
 Status BufferPool::AllocateBufferInternal(
@@ -232,8 +231,9 @@ Status BufferPool::AllocateBufferInternal(
 void BufferPool::FreeBuffer(ClientHandle* client, BufferHandle* handle) {
   if (!handle->is_open()) return; // Should be idempotent.
   DCHECK_EQ(client, handle->client_);
-  client->impl_->reservation()->ReleaseTo(handle->len_);
+  int64_t len = handle->len_;
   FreeBufferInternal(handle);
+  client->impl_->FreedBuffer(len);
 }
 
 void BufferPool::FreeBufferInternal(BufferHandle* handle) {
@@ -281,9 +281,7 @@ void BufferPool::AddCleanPage(const unique_lock<mutex>& client_lock, Page* page)
 bool BufferPool::RemoveCleanPage(const unique_lock<mutex>& client_lock, Page* page) {
   page->client->DCheckHoldsLock(client_lock);
   lock_guard<SpinLock> cpl(clean_pages_lock_);
-  bool found = clean_pages_.Contains(page);
-  if (found) clean_pages_.Remove(page);
-  return found;
+  return clean_pages_.Remove(page);
 }
 
 Status BufferPool::EvictCleanPages(int64_t bytes_to_evict) {
@@ -347,8 +345,7 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
     file_group_(file_group),
     name_(name),
     num_pages_(0),
-    dirty_unpinned_bytes_(0),
-    in_flight_write_bytes_(0) {
+    buffers_allocated_bytes_(0) {
   reservation_.InitChildTracker(
       profile, parent_reservation, mem_tracker, reservation_limit);
   counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime");
@@ -364,11 +361,19 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
       ADD_COUNTER(profile, "BufferPoolTotalUnpinnedBytes", TUnit::BYTES);
 }
 
-void BufferPool::Client::AddNewPinnedPage(Page* page) {
-  DCHECK_GT(page->pin_count, 0);
+BufferPool::Page* BufferPool::Client::CreatePinnedPage(BufferHandle&& buffer) {
+  Page* page = new Page(this, buffer.len());
+  page->buffer = move(buffer);
+  page->pin_count = 1;
+
   boost::lock_guard<boost::mutex> lock(lock_);
+  // The buffer is transferred to the page, so it will be accounted for in
+  // pinned_pages_.bytes() instead of buffers_allocated_bytes_.
+  buffers_allocated_bytes_ -= page->len;
   pinned_pages_.Enqueue(page);
   ++num_pages_;
+  DCHECK_CONSISTENCY();
+  return page;
 }
 
 void BufferPool::Client::DestroyPageInternal(
@@ -378,12 +383,8 @@ void BufferPool::Client::DestroyPageInternal(
   // Remove the page from the list that it is currently present in (if any).
   {
     unique_lock<mutex> cl(lock_);
-    if (pinned_pages_.Contains(page)) {
-      pinned_pages_.Remove(page);
-    } else if (dirty_unpinned_pages_.Contains(page)) {
-      dirty_unpinned_pages_.Remove(page);
-      dirty_unpinned_bytes_ -= page->len;
-    } else {
+    // First try to remove from the pinned or dirty unpinned lists.
+    if (!pinned_pages_.Remove(page) && !dirty_unpinned_pages_.Remove(page)) {
       // The page either has a write in flight, is clean, or is evicted.
       // Let the write complete, if in flight.
       WaitForWrite(&cl, page);
@@ -401,6 +402,7 @@ void BufferPool::Client::DestroyPageInternal(
   if (out_buffer != NULL) {
     DCHECK(page->buffer.is_open());
     *out_buffer = std::move(page->buffer);
+    buffers_allocated_bytes_ += out_buffer->len();
   } else if (page->buffer.is_open()) {
     pool_->FreeBufferInternal(&page->buffer);
   }
@@ -408,15 +410,15 @@ void BufferPool::Client::DestroyPageInternal(
   handle->Reset();
 }
 
-void BufferPool::Client::MoveToDirtyUnpinned(int64_t unused_reservation, Page* page) {
+void BufferPool::Client::MoveToDirtyUnpinned(Page* page) {
   // Only valid to unpin pages if spilling is enabled.
   DCHECK(spilling_enabled());
   DCHECK_EQ(0, page->pin_count);
   unique_lock<mutex> lock(lock_);
+  DCHECK_CONSISTENCY();
   DCHECK(pinned_pages_.Contains(page));
   pinned_pages_.Remove(page);
   dirty_unpinned_pages_.Enqueue(page);
-  dirty_unpinned_bytes_ += page->len;
 
   // Check if we should initiate writes for this (or another) dirty page.
   WriteDirtyPagesAsync();
@@ -425,6 +427,7 @@ void BufferPool::Client::MoveToDirtyUnpinned(int64_t unused_reservation, Page* p
 Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle) {
   Page* page = handle->page_;
   unique_lock<mutex> cl(lock_);
+  DCHECK_CONSISTENCY();
   // Propagate any write errors that occurred for this client.
   RETURN_IF_ERROR(write_status_);
 
@@ -436,13 +439,15 @@ Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle
     lock_guard<SpinLock> pl(page->buffer_lock);
     evicted = !page->buffer.is_open();
   }
-  if (evicted) return MoveEvictedToPinned(&cl, client, handle);
+  if (evicted) {
+    // We may need to clean some pages to allocate a buffer for the evicted page.
+    RETURN_IF_ERROR(CleanPages(&cl, page->len));
+    return MoveEvictedToPinned(&cl, client, handle);
+  }
 
-  if (dirty_unpinned_pages_.Contains(page)) {
+  if (dirty_unpinned_pages_.Remove(page)) {
     // No writes were initiated for the page - just move it back to the pinned state.
-    dirty_unpinned_pages_.Remove(page);
     pinned_pages_.Enqueue(page);
-    dirty_unpinned_bytes_ -= page->len;
     return Status::OK();
   }
   if (in_flight_write_pages_.Contains(page)) {
@@ -451,6 +456,10 @@ Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle
     WaitForWrite(&cl, page);
     RETURN_IF_ERROR(write_status_); // The write may have set 'write_status_'.
   }
+
+  // At this point we need to either reclaim a clean page or allocate a new buffer.
+  // We may need to clean some pages to do so.
+  RETURN_IF_ERROR(CleanPages(&cl, page->len));
   if (pool_->RemoveCleanPage(cl, page)) {
     // The clean page still has an associated buffer. Just clean up the write, restore
     // the data, and move the page back to the pinned state.
@@ -471,8 +480,6 @@ Status BufferPool::Client::MoveEvictedToPinned(
     unique_lock<mutex>* client_lock, ClientHandle* client, PageHandle* handle) {
   Page* page = handle->page_;
   DCHECK(!page->buffer.is_open());
-  RETURN_IF_ERROR(CleanPagesBeforeAllocationLocked(
-      client_lock, client->impl_->reservation(), page->len));
 
   // Don't hold any locks while allocating or reading back the data. It is safe to modify
   // the page's buffer handle without holding any locks because no concurrent operations
@@ -490,35 +497,43 @@ Status BufferPool::Client::MoveEvictedToPinned(
   file_group_->DestroyWriteHandle(move(page->write_handle));
   client_lock->lock();
   pinned_pages_.Enqueue(page);
+  DCHECK_CONSISTENCY();
   return Status::OK();
 }
 
-Status BufferPool::Client::CleanPagesBeforeAllocation(
-    ReservationTracker* reservation, int64_t allocation_len) {
+Status BufferPool::Client::PrepareToAllocateBuffer(int64_t len) {
   unique_lock<mutex> lock(lock_);
-  return CleanPagesBeforeAllocationLocked(&lock, reservation, allocation_len);
+  // Clean enough pages to allow allocation to proceed without violating our eviction
+  // policy. This can fail, so only update the accounting once success is ensured.
+  RETURN_IF_ERROR(CleanPages(&lock, len));
+  reservation_.AllocateFrom(len);
+  buffers_allocated_bytes_ += len;
+  DCHECK_CONSISTENCY();
+  return Status::OK();
 }
 
-Status BufferPool::Client::CleanPagesBeforeAllocationLocked(
-    unique_lock<mutex>* client_lock, ReservationTracker* reservation,
-    int64_t allocation_len) {
+Status BufferPool::Client::CleanPages(unique_lock<mutex>* client_lock, int64_t len) {
   DCheckHoldsLock(*client_lock);
-  int64_t unused_reservation = reservation->GetUnusedReservation();
-  DCHECK_LE(allocation_len, unused_reservation);
-  int64_t unused_reservation_after_alloc = unused_reservation - allocation_len;
+  DCHECK_CONSISTENCY();
+  // Work out the target that the bytes of dirty unpinned + in-flight pages must be
+  // brought down to in order to satisfy the eviction policy.
+  int64_t target_dirty_bytes = reservation_.GetReservation() - buffers_allocated_bytes_
+      - pinned_pages_.bytes() - len;
   // Start enough writes to ensure that the loop condition below will eventually become
   // false (or a write error will be encountered).
-  int64_t min_in_flight_bytes = dirty_unpinned_bytes_ - unused_reservation_after_alloc;
-  WriteDirtyPagesAsync(max<int64_t>(0, min_in_flight_bytes - in_flight_write_bytes_));
+  int64_t min_bytes_to_write =
+      max<int64_t>(0, dirty_unpinned_pages_.bytes() - target_dirty_bytes);
+  WriteDirtyPagesAsync(min_bytes_to_write);
 
   // One of the writes we initiated, or an earlier in-flight write may have hit an error.
   RETURN_IF_ERROR(write_status_);
 
-  // Wait until enough writes have finished that the allocation plus dirty pages won't
-  // exceed our reservation. I.e. so that other clients can immediately get the allocated
+  // Wait until enough writes have finished so that we can make the allocation without
+  // violating the eviction policy. I.e. so that other clients can immediately get the
   // memory they're entitled to without waiting for this client's write to complete.
-  DCHECK_GE(in_flight_write_bytes_, min_in_flight_bytes);
-  while (dirty_unpinned_bytes_ > unused_reservation_after_alloc) {
+  DCHECK_GE(in_flight_write_pages_.bytes(), min_bytes_to_write);
+  while (dirty_unpinned_pages_.bytes() + in_flight_write_pages_.bytes()
+      > target_dirty_bytes) {
     SCOPED_TIMER(counters().write_wait_time);
     write_complete_cv_.Wait(*client_lock);
     RETURN_IF_ERROR(write_status_); // Check if error occurred while waiting.
@@ -528,19 +543,17 @@ Status BufferPool::Client::CleanPagesBeforeAllocationLocked(
 
 void BufferPool::Client::WriteDirtyPagesAsync(int64_t min_bytes_to_write) {
   DCHECK_GE(min_bytes_to_write, 0);
-  DCheckConsistency();
+  DCHECK_LE(min_bytes_to_write, dirty_unpinned_pages_.bytes());
   if (file_group_ == NULL) {
     // Spilling disabled - there should be no unpinned pages to write.
     DCHECK_EQ(0, min_bytes_to_write);
-    DCHECK_EQ(0, dirty_unpinned_bytes_);
+    DCHECK_EQ(0, dirty_unpinned_pages_.bytes());
     return;
   }
   // No point in starting writes if an error occurred because future operations for the
   // client will fail regardless.
   if (!write_status_.ok()) return;
 
-  const int64_t writeable_bytes = dirty_unpinned_bytes_ - in_flight_write_bytes_;
-  DCHECK_LE(min_bytes_to_write, writeable_bytes);
   // Compute the ideal amount of writes to start. We use a simple heuristic based on the
   // total number of writes. The FileGroup's allocation should spread the writes across
   // disks somewhat, but doesn't guarantee we're fully using all available disks. In
@@ -549,7 +562,7 @@ void BufferPool::Client::WriteDirtyPagesAsync(int64_t min_bytes_to_write) {
       * file_group_->tmp_file_mgr()->NumActiveTmpDevices();
 
   int64_t bytes_written = 0;
-  while (bytes_written < writeable_bytes
+  while (!dirty_unpinned_pages_.empty()
       && (bytes_written < min_bytes_to_write
              || in_flight_write_pages_.size() < target_writes)) {
     Page* page = dirty_unpinned_pages_.tail(); // LIFO.
@@ -577,7 +590,6 @@ void BufferPool::Client::WriteDirtyPagesAsync(int64_t min_bytes_to_write) {
     DCHECK_EQ(tmp, page);
     in_flight_write_pages_.Enqueue(page);
     bytes_written += page->len;
-    in_flight_write_bytes_ += page->len;
   }
 }
 
@@ -593,8 +605,6 @@ void BufferPool::Client::WriteCompleteCallback(Page* page, const Status& write_s
     // repurposed by other clients and 'write_status_' must be checked by this client
     // before reading back the bad data.
     pool_->AddCleanPage(cl, page);
-    dirty_unpinned_bytes_ -= page->len;
-    in_flight_write_bytes_ -= page->len;
     WriteDirtyPagesAsync(); // Start another asynchronous write if needed.
 
     // Notify before releasing lock to avoid race with Page and Client destruction.
@@ -614,10 +624,12 @@ void BufferPool::Client::WaitForWrite(unique_lock<mutex>* client_lock, Page* pag
 string BufferPool::Client::DebugString() {
   lock_guard<mutex> lock(lock_);
   stringstream ss;
-  ss << Substitute("<BufferPool::Client> $0 name: $1 write_status: $2 num_pages: $3 "
-                   "dirty_unpinned_bytes: $4 in_flight_write_bytes: $5 reservation: {$6}",
-      this, name_, write_status_.GetDetail(), num_pages_, dirty_unpinned_bytes_,
-      in_flight_write_bytes_, reservation_.DebugString());
+  ss << Substitute("<BufferPool::Client> $0 name: $1 write_status: $2 "
+                   "buffers allocated $3 num_pages: $4 pinned_bytes: $5 "
+                   "dirty_unpinned_bytes: $6 in_flight_write_bytes: $7 reservation: {$8}",
+      this, name_, write_status_.GetDetail(), buffers_allocated_bytes_, num_pages_,
+      pinned_pages_.bytes(), dirty_unpinned_pages_.bytes(),
+      in_flight_write_pages_.bytes(), reservation_.DebugString());
   ss << "\n  " << pinned_pages_.size() << " pinned pages: ";
   pinned_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
   ss << "\n  " << dirty_unpinned_pages_.size() << " dirty unpinned pages: ";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/874d20d0/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index b199637..da8ef3c 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -248,6 +248,7 @@ class BufferPool : public CacheLineAligned {
 
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferPool);
+  class PageList;
   struct Page;
   /// Allocate a buffer of length 'len'. Assumes that the client's reservation has already
   /// been consumed for the buffer. Returns an error if the pool is unable to fulfill the

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/874d20d0/be/src/util/internal-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/internal-queue.h b/be/src/util/internal-queue.h
index 5e32116..b295b1f 100644
--- a/be/src/util/internal-queue.h
+++ b/be/src/util/internal-queue.h
@@ -154,11 +154,10 @@ class InternalQueueBase {
   }
 
   /// Removes 'node' from the queue. This is O(1). No-op if node is
-  /// not on the list.
-  void Remove(T* n) {
+  /// not on the list. Returns true if the node was removed.
+  bool Remove(T* n) {
     Node* node = (Node*)n;
-    if (node->parent_queue == NULL) return;
-    DCHECK(node->parent_queue == this);
+    if (node->parent_queue != this) return false;
     {
       boost::lock_guard<LockType> lock(lock_);
       if (node->next == NULL && node->prev == NULL) {
@@ -168,7 +167,7 @@ class InternalQueueBase {
         head_ = tail_ = NULL;
         --size_;
         node->parent_queue = NULL;
-        return;
+        return true;
       }
 
       if (head_ == node) {
@@ -189,6 +188,7 @@ class InternalQueueBase {
     }
     node->next = node->prev = NULL;
     node->parent_queue = NULL;
+    return true;
   }
 
   /// Clears all elements in the list.
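
The InternalQueue change above folds the previous contains-then-remove pattern
into a single call, which is what lets RemoveCleanPage() in buffer-pool.cc
shrink to a one-liner. A caller-side sketch (hypothetical 'list' and 'page'):

  // Before: two steps plus a branch.
  bool found = list.Contains(page);
  if (found) list.Remove(page);
  return found;

  // After: Remove() returns false if 'page' is not on this queue.
  return list.Remove(page);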

