impala-commits mailing list archives

From tarmstr...@apache.org
Subject [3/4] incubator-impala git commit: IMPALA-4114: Port BufferedBlockMgr tests to buffer pool
Date Tue, 18 Apr 2017 06:42:35 GMT
IMPALA-4114: Port BufferedBlockMgr tests to buffer pool

BufferedBlockMgr had a number of interesting backend tests
that are still relevant to BufferPool. This commit copies
them across and adapts them to BufferPool. This should bring
the backend test coverage for BufferPool up to par with
BufferedBlockMgr.

Many tests weren't ported because they are not relevant or would
duplicate other tests:
* GetNewBlock* -> covered by PageCreation/BufferAllocation
* Pin -> covered by Pin
* Deletion/DeleteSingleBlocks -> all BufferPool tests cover deletion
* Close -> BufferPool doesn't have "cancellation"
* TransferBufferDuringWrite -> the API being tested is not present. Some
   of the deletion tests are the closest analogue.
* WriteCompleteWithCancelledRuntimeState -> not relevant, BufferPool
  doesn't reference RuntimeState.
* MultipleClients* -> we have many tests for the (very different)
  reservation mechanism
* ClientOversubscription -> oversubscription is not supported
* CreateDestroyMulti -> we don't support creation/destruction of
  buffer pools like this
* AllocationErrorHandling -> redundant with WriteErrorBlacklist

Change-Id: Ifb0221e8bea6f3b23b62d5094634d97562295ea3
Reviewed-on: http://gerrit.cloudera.org:8080/6498
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cb900df4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cb900df4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cb900df4

Branch: refs/heads/master
Commit: cb900df435c643bc1046b35e15f68d0c4b983b06
Parents: 6c162df
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Wed Mar 22 07:49:30 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Tue Apr 18 06:33:44 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/bufferpool/buffer-allocator.h    |   5 +
 .../runtime/bufferpool/buffer-pool-internal.h   |   8 +
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 827 ++++++++++++++++++-
 be/src/runtime/bufferpool/buffer-pool.cc        |  12 +
 be/src/runtime/bufferpool/buffer-pool.h         |   1 +
 be/src/runtime/tmp-file-mgr.cc                  |   6 +-
 6 files changed, 847 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.h b/be/src/runtime/bufferpool/buffer-allocator.h
index fc79970..68efbc1 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.h
+++ b/be/src/runtime/bufferpool/buffer-allocator.h
@@ -122,6 +122,11 @@ class BufferPool::BufferAllocator {
   /// Try to release at least 'bytes_to_free' bytes of memory to the system allocator.
   void ReleaseMemory(int64_t bytes_to_free);
 
+  /// Return the amount of memory currently allocated from the system.
+  int64_t GetSystemBytesAllocated() const {
+    return system_bytes_limit_ - system_bytes_remaining_.Load();
+  }
+
   std::string DebugString();
 
  protected:
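
The new accessor inverts the allocator's bookkeeping: 'system_bytes_limit_'
minus 'system_bytes_remaining_' is the number of bytes currently drawn from
the system allocator. A hedged sketch of how the ported eviction test below
leans on it (pool, client, pages, page_size, total_mem, and the UnpinAll/
AllocateBuffers fixture helpers are set up as in TestEvictionPolicy, on a
single NUMA node):

  // Unpinning starts writes but leaves buffers attached to the now-clean pages.
  UnpinAll(&pool, &client, &pages);
  // Allocating more buffers evicts clean pages and recycles their buffers, so
  // the total drawn from the system should stay flat rather than grow.
  vector<BufferHandle> extra_buffers;
  AllocateBuffers(&pool, &client, page_size, page_size * 2, &extra_buffers);
  EXPECT_EQ(total_mem, pool.allocator()->GetSystemBytesAllocated());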

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 3428087..619288b 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -237,6 +237,10 @@ class BufferPool::Client {
   /// not be held.
   void WaitForWrite(boost::unique_lock<boost::mutex>* client_lock, Page* page);
 
+  /// Test helper: wait for all in-flight writes to complete.
+  /// 'lock_' must not be held by the caller.
+  void WaitForAllWrites();
+
   /// Asserts that 'client_lock' is holding 'lock_'.
   void DCheckHoldsLock(const boost::unique_lock<boost::mutex>& client_lock) {
     DCHECK(client_lock.mutex() == &lock_ && client_lock.owns_lock());
@@ -245,6 +249,7 @@ class BufferPool::Client {
   ReservationTracker* reservation() { return &reservation_; }
   const BufferPoolClientCounters& counters() const { return counters_; }
   bool spilling_enabled() const { return file_group_ != NULL; }
+  void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; }
 
   std::string DebugString();
 
@@ -305,6 +310,9 @@ class BufferPool::Client {
   /// All non-NULL.
   BufferPoolClientCounters counters_;
 
+  /// Debug option to delay write completion.
+  int debug_write_delay_ms_;
+
   /// Lock to protect the member variables below.
   boost::mutex lock_;
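
Together with the sleep added to WriteCompleteCallback() in buffer-pool.cc
further down, set_debug_write_delay_ms() gives tests a debug-build hook for
holding writes in flight. A sketch of the intended fault-injection flow,
assuming the client/page handles and the WaitForAllWrites() test helper from
buffer-pool-test.cc:

  client.impl_->set_debug_write_delay_ms(100);  // each write completion now
                                                // sleeps 100ms (debug only)
  pool.Unpin(&client, &pages[0]);               // the write stays in flight
                                                // longer, widening the
                                                // IMPALA-4842 race window
  WaitForAllWrites(&client);                    // block until the delayed
                                                // completions drain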
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 42344f6..a40feac 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -20,11 +20,13 @@
 #include <string>
 #include <vector>
 #include <boost/bind.hpp>
+#include <boost/filesystem.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/thread.hpp>
 #include <boost/unordered_map.hpp>
 
 #include "codegen/llvm-codegen.h"
+#include "common/atomic.h"
 #include "common/init.h"
 #include "common/object-pool.h"
 #include "runtime/bufferpool/buffer-allocator.h"
@@ -37,18 +39,36 @@
 #include "testutil/death-test-util.h"
 #include "testutil/gtest-util.h"
 #include "testutil/rand-util.h"
+#include "util/filesystem-util.h"
 #include "util/metrics.h"
 
 #include "common/names.h"
 
+using boost::filesystem::directory_iterator;
+using std::mt19937;
+using std::uniform_int_distribution;
+using std::uniform_real_distribution;
+
 DECLARE_bool(disk_spill_encryption);
 
+// Note: This is the default scratch dir created by impala.
+// FLAGS_scratch_dirs + TmpFileMgr::TMP_SUB_DIR_NAME.
+const string SCRATCH_DIR = "/tmp/impala-scratch";
+
+// This suffix is appended to a tmp dir.
+const string SCRATCH_SUFFIX = "/impala-scratch";
+
 namespace impala {
 
+using BufferHandle = BufferPool::BufferHandle;
+using ClientHandle = BufferPool::ClientHandle;
+using FileGroup = TmpFileMgr::FileGroup;
+using PageHandle = BufferPool::PageHandle;
+
 class BufferPoolTest : public ::testing::Test {
  public:
   virtual void SetUp() {
-    test_env_ = obj_pool_.Add(new TestEnv);
+    test_env_.reset(new TestEnv);
     ASSERT_OK(test_env_->Init());
     RandTestUtil::SeedRng("BUFFER_POOL_TEST_SEED", &rng_);
   }
@@ -63,6 +83,13 @@ class BufferPoolTest : public ::testing::Test {
     }
     global_reservations_.Close();
     obj_pool_.Clear();
+
+    // Tests modify permissions, so make sure we can delete if they didn't clean up.
+    for (string created_tmp_dir : created_tmp_dirs_) {
+      chmod((created_tmp_dir + SCRATCH_SUFFIX).c_str(), S_IRWXU);
+    }
+    FileSystemUtil::RemovePaths(created_tmp_dirs_);
+    created_tmp_dirs_.clear();
     CpuTestUtil::ResetAffinity(); // Some tests modify affinity.
   }
 
@@ -79,6 +106,24 @@ class BufferPoolTest : public ::testing::Test {
       ReservationTracker* parent_tracker, int num_ops);
 
  protected:
+  /// Reinitialize test_env_ to have multiple temporary directories.
+  vector<string> InitMultipleTmpDirs(int num_dirs) {
+    vector<string> tmp_dirs;
+    for (int i = 0; i < num_dirs; ++i) {
+      const string& dir = Substitute("/tmp/buffer-pool-test.$0", i);
+      // Fix permissions in case old directories were left from previous runs of test.
+      chmod((dir + SCRATCH_SUFFIX).c_str(), S_IRWXU);
+      EXPECT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir));
+      tmp_dirs.push_back(dir);
+      created_tmp_dirs_.push_back(dir);
+    }
+    test_env_.reset(new TestEnv);
+    test_env_->SetTmpFileMgrArgs(tmp_dirs, false);
+    EXPECT_OK(test_env_->Init());
+    EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->NumActiveTmpDevices());
+    return tmp_dirs;
+  }
+
   static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; }
 
   /// Helper function to create one reservation tracker per query.
@@ -128,6 +173,14 @@ class BufferPoolTest : public ::testing::Test {
     }
   }
 
+  /// Do a temporary test allocation. Return the status of AllocateBuffer().
+  Status AllocateAndFree(BufferPool* pool, ClientHandle* client, int64_t len) {
+    BufferHandle tmp;
+    RETURN_IF_ERROR(pool->AllocateBuffer(client, len, &tmp));
+    pool->FreeBuffer(client, &tmp);
+    return Status::OK();
+  }
+
   /// Create pages of varying sizes at most 'max_page_size' that add up to
   /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size.
   /// If 'randomize_core' is true, will switch thread between cores randomly before
@@ -135,6 +188,7 @@ class BufferPoolTest : public ::testing::Test {
   void CreatePages(BufferPool* pool, BufferPool::ClientHandle* client,
       int64_t max_page_size, int64_t total_bytes, vector<BufferPool::PageHandle>* pages,
       bool randomize_core = false) {
+    ASSERT_GE(client->GetUnusedReservation(), total_bytes);
     int64_t curr_page_size = max_page_size;
     int64_t bytes_remaining = total_bytes;
     while (bytes_remaining > 0) {
@@ -158,26 +212,148 @@ class BufferPoolTest : public ::testing::Test {
     buffers->clear();
   }
 
+  Status PinAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages) {
+    for (auto& page : *pages) RETURN_IF_ERROR(pool->Pin(client, &page));
+    return Status::OK();
+  }
+
+  /// Unpin all of 'pages'. If 'delay_between_unpins_ms' > 0, sleep between unpins.
+  void UnpinAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages,
+      int delay_between_unpins_ms = 0) {
+    for (auto& page : *pages) {
+      pool->Unpin(client, &page);
+      if (delay_between_unpins_ms > 0) SleepForMs(delay_between_unpins_ms);
+    }
+  }
+
+  void DestroyAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages) {
+    for (auto& page : *pages) pool->DestroyPage(client, &page);
+  }
+
+  /// Write some deterministically-generated sentinel values to pages or buffers. The same
+  /// data is written each time for objects[i], based on start_num + i.
+  template <typename T>
+  void WriteData(const vector<T>& objects, int start_num) {
+    WriteOrVerifyData(objects, start_num, true);
+  }
+
+  template <typename T>
+  void WriteData(const T& object, int val) {
+    return WriteOrVerifyData(object, val, true);
+  }
+
+  /// Verify data written by WriteData().
+  template <typename T>
+  void VerifyData(const vector<T>& objects, int start_num) {
+    WriteOrVerifyData(objects, start_num, false);
+  }
+
+  template <typename T>
+  void VerifyData(const T& object, int val) {
+    return WriteOrVerifyData(object, val, false);
+  }
+
+  /// Implementation of WriteData() and VerifyData().
+  template <typename T>
+  void WriteOrVerifyData(const vector<T>& objects, int start_num, bool write) {
+    for (int i = 0; i < objects.size(); ++i) {
+      WriteOrVerifyData(objects[i], i + start_num, write);
+    }
+  }
+
+  template <typename T>
+  void WriteOrVerifyData(const T& object, int val, bool write) {
+    // Only write sentinel values to start and end of buffer to make writing and
+    // verification cheap.
+    MemRange mem = object.mem_range();
+    uint64_t* start_word = reinterpret_cast<uint64_t*>(mem.data());
+    uint64_t* end_word =
+        reinterpret_cast<uint64_t*>(&mem.data()[mem.len() - sizeof(uint64_t)]);
+    if (write) {
+      *start_word = val;
+      *end_word = ~val;
+    } else {
+      EXPECT_EQ(*start_word, val);
+      EXPECT_EQ(*end_word, ~val);
+    }
+  }
+
+  /// Return the total number of bytes allocated from the system currently.
+  int64_t SystemBytesAllocated(BufferPool* pool) {
+    return pool->allocator()->GetSystemBytesAllocated();
+  }
+
   /// Set the maximum number of scavenge attempts that the pool's allocator will do.
   void SetMaxScavengeAttempts(BufferPool* pool, int max_attempts) {
     pool->allocator()->set_max_scavenge_attempts(max_attempts);
   }
 
+  void WaitForAllWrites(ClientHandle* client) { client->impl_->WaitForAllWrites(); }
+
+  // Remove write permissions on scratch files. Return # of scratch files.
+  static int RemoveScratchPerms() {
+    int num_files = 0;
+    directory_iterator dir_it(SCRATCH_DIR);
+    for (; dir_it != directory_iterator(); ++dir_it) {
+      ++num_files;
+      chmod(dir_it->path().c_str(), 0);
+    }
+    return num_files;
+  }
+
+  // Remove permissions for the temporary file at 'path' - all subsequent writes
+  // to the file should fail. Expects backing file has already been allocated.
+  static void DisableBackingFile(const string& path) {
+    EXPECT_GT(path.size(), 0);
+    EXPECT_EQ(0, chmod(path.c_str(), 0));
+    LOG(INFO) << "Injected fault by removing file permissions " << path;
+  }
+
+  // Return the path of the temporary file backing the page.
+  static string TmpFilePath(PageHandle* page) {
+    return page->page_->write_handle->TmpFilePath();
+  }
+  // Check that the file backing the page has dir as a prefix of its path.
+  static bool PageInDir(PageHandle* page, const string& dir) {
+    return TmpFilePath(page).find(dir) == 0;
+  }
+
+  // Find a page in the list that is backed by a file with the given directory as prefix
+  // of its path.
+  static PageHandle* FindPageInDir(vector<PageHandle>& pages, const string& dir) {
+    for (PageHandle& page : pages) {
+      if (PageInDir(&page, dir)) return &page;
+    }
+    return NULL;
+  }
+
+  /// Parameterised test implementations.
   void TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core);
+  void TestEvictionPolicy(int64_t page_size);
+  void TestQueryTeardown(bool write_error);
+  void TestWriteError(int write_delay_ms);
+  void TestRandomInternalSingle(int64_t buffer_len, bool multiple_pins);
+  void TestRandomInternalMulti(int num_threads, int64_t buffer_len, bool multiple_pins);
+  static const int SINGLE_THREADED_TID = -1;
+  void TestRandomInternalImpl(BufferPool* pool, FileGroup* file_group,
+      MemTracker* parent_mem_tracker, mt19937* rng, int tid, bool multiple_pins);
 
   ObjectPool obj_pool_;
   ReservationTracker global_reservations_;
 
-  TestEnv* test_env_; // Owned by 'obj_pool_'.
+  boost::scoped_ptr<TestEnv> test_env_;
 
   /// Per-test random number generator. Seeded before every test.
-  std::mt19937 rng_;
+  mt19937 rng_;
 
-  // The file groups created - closed at end of each test.
+  /// The file groups created - closed at end of each test.
   vector<TmpFileMgr::FileGroup*> file_groups_;
 
-  // Map from query_id to the reservation tracker for that query. Reads and modifications
-  // of the map are protected by query_reservations_lock_.
+  /// Paths of temporary directories created during tests - deleted at end of test.
+  vector<string> created_tmp_dirs_;
+
+  /// Map from query_id to the reservation tracker for that query. Reads and modifications
+  /// of the map are protected by query_reservations_lock_.
   unordered_map<int64_t, ReservationTracker*> query_reservations_;
   SpinLock query_reservations_lock_;
 };
@@ -212,7 +388,7 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi
           initial_query_reservation / clients_per_query + j
           < initial_query_reservation % clients_per_query;
       // Reservation limit can be anything greater or equal to the initial reservation.
-      int64_t client_reservation_limit = initial_client_reservation + rand() % 100000;
+      int64_t client_reservation_limit = initial_client_reservation + rng_() % 100000;
       string name = Substitute("Client $0 for query $1", j, query_id);
       EXPECT_OK(pool->RegisterClient(name, NULL, query_reservation, NULL,
           client_reservation_limit, NewProfile(), &clients[i][j]));
@@ -828,6 +1004,643 @@ void BufferPoolTest::TestMemoryReclamation(BufferPool* pool, int src_core, int d
   }
   for (BufferPool::ClientHandle& client : clients) pool->DeregisterClient(&client);
 }
+
+// Test the eviction policy of the buffer pool. Writes are issued eagerly as pages
+// are unpinned, but pages are only evicted from memory when another buffer is
+// allocated.
+TEST_F(BufferPoolTest, EvictionPolicy) {
+  TestEvictionPolicy(TEST_BUFFER_LEN);
+  TestEvictionPolicy(2 * 1024 * 1024);
+}
+
+void BufferPoolTest::TestEvictionPolicy(int64_t page_size) {
+  // The eviction policy changes if there are multiple NUMA nodes, because buffers from
+  // clean pages on the local node are claimed in preference to free buffers on the
+  // non-local node. The rest of the test assumes that it executes on a single NUMA node.
+  if (CpuInfo::GetMaxNumNumaNodes() > 1) CpuTestUtil::PinToCore(0);
+  const int MAX_NUM_BUFFERS = 5;
+  int64_t total_mem = MAX_NUM_BUFFERS * page_size;
+  global_reservations_.InitRootTracker(NewProfile(), total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+
+  ClientHandle client;
+  RuntimeProfile* profile = NewProfile();
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, total_mem, profile, &client));
+  ASSERT_TRUE(client.IncreaseReservation(total_mem));
+
+  RuntimeProfile::Counter* bytes_alloced =
+      profile->GetCounter("BufferPoolAllocationBytes");
+  RuntimeProfile::Counter* write_ios = profile->GetCounter("BufferPoolWriteIoOps");
+  RuntimeProfile::Counter* read_ios = profile->GetCounter("BufferPoolReadIoOps");
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, page_size, total_mem, &pages);
+  WriteData(pages, 0);
+
+  // Unpin pages. Writes should be started and memory should not be deallocated.
+  EXPECT_EQ(total_mem, bytes_alloced->value());
+  EXPECT_EQ(total_mem, SystemBytesAllocated(&pool));
+  UnpinAll(&pool, &client, &pages);
+  ASSERT_GT(write_ios->value(), 0);
+
+  // Re-pin all the pages and validate their data. This should not require reading the
+  // pages back from disk.
+  ASSERT_OK(PinAll(&pool, &client, &pages));
+  ASSERT_EQ(0, read_ios->value());
+  VerifyData(pages, 0);
+
+  // Unpin all pages. Writes should be started again.
+  int64_t prev_write_ios = write_ios->value();
+  UnpinAll(&pool, &client, &pages);
+  ASSERT_GT(write_ios->value(), prev_write_ios);
+
+  // Allocate two more buffers. Two unpinned pages must be evicted to make room.
+  const int NUM_EXTRA_BUFFERS = 2;
+  vector<BufferHandle> extra_buffers;
+  AllocateBuffers(
+      &pool, &client, page_size, page_size * NUM_EXTRA_BUFFERS, &extra_buffers);
+  // At least two unpinned pages should have been written out.
+  ASSERT_GE(write_ios->value(), prev_write_ios + NUM_EXTRA_BUFFERS);
+  // No additional memory should have been allocated - it should have been recycled.
+  EXPECT_EQ(total_mem, SystemBytesAllocated(&pool));
+  // Check that two pages were evicted.
+  int num_evicted = 0;
+  for (PageHandle& page : pages) {
+    if (IsEvicted(&page)) ++num_evicted;
+  }
+  EXPECT_EQ(NUM_EXTRA_BUFFERS, num_evicted);
+
+  // Free up memory required to pin the original pages again.
+  FreeBuffers(&pool, &client, &extra_buffers);
+  ASSERT_OK(PinAll(&pool, &client, &pages));
+  // We only needed to read back the two evicted pages. Make sure we didn't do extra I/O.
+  ASSERT_EQ(NUM_EXTRA_BUFFERS, read_ios->value());
+  VerifyData(pages, 0);
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+  global_reservations_.Close();
+}
+
+/// Test that we can destroy pages while a disk write is in flight for those pages.
+TEST_F(BufferPoolTest, DestroyDuringWrite) {
+  const int TRIALS = 20;
+  const int MAX_NUM_BUFFERS = 5;
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  ClientHandle client;
+  for (int trial = 0; trial < TRIALS; ++trial) {
+    ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+        nullptr, TOTAL_MEM, NewProfile(), &client));
+    ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+    vector<PageHandle> pages;
+    CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+
+    // Unpin will initiate writes.
+    UnpinAll(&pool, &client, &pages);
+
+    // Writes should still be in flight when pages are deleted.
+    DestroyAll(&pool, &client, &pages);
+    pool.DeregisterClient(&client);
+  }
+}
+
+/// Test teardown of a query while writes are in flight. This was based on a
+/// BufferedBlockMgr regression test for IMPALA-2252 where tear-down of the
+/// query's RuntimeStates raced with scratch writes. If write_error is true,
+/// force writes to hit errors.
+void BufferPoolTest::TestQueryTeardown(bool write_error) {
+  const int64_t TOTAL_BUFFERS = 20;
+  const int CLIENT_BUFFERS = 10;
+  const int64_t TOTAL_MEM = TOTAL_BUFFERS * TEST_BUFFER_LEN;
+  const int64_t CLIENT_MEM = CLIENT_BUFFERS * TEST_BUFFER_LEN;
+
+  // Set up a BufferPool in the TestEnv.
+  test_env_.reset(new TestEnv());
+  test_env_->SetBufferPoolArgs(TEST_BUFFER_LEN, TOTAL_MEM);
+  ASSERT_OK(test_env_->Init());
+
+  BufferPool* pool = test_env_->exec_env()->buffer_pool();
+  RuntimeState* state;
+  ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &state));
+
+  ClientHandle client;
+  ASSERT_OK(pool->RegisterClient("test client", state->query_state()->file_group(),
+      state->instance_buffer_reservation(),
+      obj_pool_.Add(new MemTracker(-1, "", state->instance_mem_tracker())), CLIENT_MEM,
+      NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(CLIENT_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(pool, &client, TEST_BUFFER_LEN, CLIENT_MEM, &pages);
+
+  if (write_error) {
+    UnpinAll(pool, &client, &pages);
+    // Allocate more buffers to create memory pressure and force eviction of all the
+    // unpinned pages.
+    vector<BufferHandle> tmp_buffers;
+    AllocateBuffers(pool, &client, TEST_BUFFER_LEN, CLIENT_MEM, &tmp_buffers);
+    string tmp_file_path = TmpFilePath(&pages[0]);
+    FreeBuffers(pool, &client, &tmp_buffers);
+
+    ASSERT_OK(PinAll(pool, &client, &pages));
+    // Remove temporary file to force future writes to that file to fail.
+    DisableBackingFile(tmp_file_path);
+  }
+
+  // Unpin will initiate writes. If we triggered a write error earlier, some writes may
+  // go down the error path.
+  UnpinAll(pool, &client, &pages);
+
+  // Tear down the pages, client, and query in the correct order while writes are in
+  // flight.
+  DestroyAll(pool, &client, &pages);
+  pool->DeregisterClient(&client);
+  test_env_->TearDownQueries();
+
+  // All memory should be released from the query.
+  EXPECT_EQ(0, test_env_->TotalQueryMemoryConsumption());
+  EXPECT_EQ(0, test_env_->exec_env()->buffer_reservation()->GetChildReservations());
+}
+
+TEST_F(BufferPoolTest, QueryTeardown) {
+  TestQueryTeardown(false);
+}
+
+TEST_F(BufferPoolTest, QueryTeardownWriteError) {
+  TestQueryTeardown(true);
+}
+
+// Test that the buffer pool handles a write error correctly.  Delete the scratch
+// directory before an operation that would cause a write and test that subsequent API
+// calls return errors as expected.
+void BufferPoolTest::TestWriteError(int write_delay_ms) {
+  int MAX_NUM_BUFFERS = 2;
+  int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+  client.impl_->set_debug_write_delay_ms(write_delay_ms);
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+  // Unpin two pages here to ensure that backing storage is allocated in the tmp files.
+  UnpinAll(&pool, &client, &pages);
+  WaitForAllWrites(&client);
+  // Repin the pages.
+  ASSERT_OK(PinAll(&pool, &client, &pages));
+  // Remove the backing storage so that future writes will fail.
+  ASSERT_GT(RemoveScratchPerms(), 0);
+  // Give the first write a chance to fail before the second write starts.
+  const int INTERVAL_MS = 10;
+  UnpinAll(&pool, &client, &pages, INTERVAL_MS);
+  WaitForAllWrites(&client);
+
+  // Subsequent calls to APIs that require allocating memory should fail: the write error
+  // is picked up asynchronously.
+  BufferHandle tmp_buffer;
+  PageHandle tmp_page;
+  Status error = pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &tmp_buffer);
+  EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
+  EXPECT_FALSE(tmp_buffer.is_open());
+  error = pool.CreatePage(&client, TEST_BUFFER_LEN, &tmp_page);
+  EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
+  EXPECT_FALSE(tmp_page.is_open());
+  error = pool.Pin(&client, &pages[0]);
+  EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
+  EXPECT_FALSE(pages[0].is_pinned());
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+  global_reservations_.Close();
+}
+
+TEST_F(BufferPoolTest, WriteError) {
+  TestWriteError(0);
+}
+
+// Regression test for IMPALA-4842 - inject a delay in the write to
+// reproduce the issue.
+TEST_F(BufferPoolTest, WriteErrorWriteDelay) {
+  TestWriteError(100);
+}
+
+// Test error handling when temporary file space cannot be allocated to back an unpinned
+// page.
+TEST_F(BufferPoolTest, TmpFileAllocateError) {
+  const int MAX_NUM_BUFFERS = 2;
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+  // Unpin a page, which will trigger a write.
+  pool.Unpin(&client, &pages[0]);
+  WaitForAllWrites(&client);
+  // Remove temporary files - subsequent operations will fail.
+  ASSERT_GT(RemoveScratchPerms(), 0);
+  // The write error will happen asynchronously.
+  pool.Unpin(&client, &pages[1]);
+
+  // Write failure causes future operations like Pin() to fail.
+  WaitForAllWrites(&client);
+  Status error = pool.Pin(&client, &pages[0]);
+  EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
+  EXPECT_FALSE(pages[0].is_pinned());
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+}
+
+// Test that scratch devices are blacklisted after a write error. The query that
+// encountered the write error should not allocate more pages on that device, but
+// existing pages on the device will remain in use and future queries will use the device.
+TEST_F(BufferPoolTest, WriteErrorBlacklist) {
+  // Set up two file groups with two temporary dirs.
+  vector<string> tmp_dirs = InitMultipleTmpDirs(2);
+  // Simulate two concurrent queries.
+  const int TOTAL_QUERIES = 3;
+  const int INITIAL_QUERIES = 2;
+  const int MAX_NUM_PAGES = 6;
+  const int PAGES_PER_QUERY = MAX_NUM_PAGES / TOTAL_QUERIES;
+  const int64_t TOTAL_MEM = MAX_NUM_PAGES * TEST_BUFFER_LEN;
+  const int64_t MEM_PER_QUERY = PAGES_PER_QUERY * TEST_BUFFER_LEN;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  vector<FileGroup*> file_groups;
+  vector<ClientHandle> clients(TOTAL_QUERIES);
+  for (int i = 0; i < INITIAL_QUERIES; ++i) {
+    file_groups.push_back(NewFileGroup());
+    ASSERT_OK(pool.RegisterClient("test client", file_groups[i], &global_reservations_,
+        nullptr, MEM_PER_QUERY, NewProfile(), &clients[i]));
+    ASSERT_TRUE(clients[i].IncreaseReservation(MEM_PER_QUERY));
+  }
+
+  // Allocate files for all 2x2 combinations by unpinning pages.
+  vector<vector<PageHandle>> pages(TOTAL_QUERIES);
+  for (int i = 0; i < INITIAL_QUERIES; ++i) {
+    CreatePages(&pool, &clients[i], TEST_BUFFER_LEN, MEM_PER_QUERY, &pages[i]);
+    WriteData(pages[i], 0);
+    UnpinAll(&pool, &clients[i], &pages[i]);
+    for (int j = 0; j < PAGES_PER_QUERY; ++j) {
+      LOG(INFO) << "Manager " << i << " Block " << j << " backed by file "
+                << TmpFilePath(&pages[i][j]);
+    }
+  }
+  for (int i = 0; i < INITIAL_QUERIES; ++i) WaitForAllWrites(&clients[i]);
+  const int ERROR_QUERY = 0;
+  const int NO_ERROR_QUERY = 1;
+  const string& error_dir = tmp_dirs[0];
+  const string& good_dir = tmp_dirs[1];
+  // Delete one file from first scratch dir for first query to trigger an error.
+  PageHandle* error_page = FindPageInDir(pages[ERROR_QUERY], error_dir);
+  ASSERT_TRUE(error_page != NULL) << "Expected a tmp file in dir " << error_dir;
+  const string& error_file_path = TmpFilePath(error_page);
+  for (int i = 0; i < INITIAL_QUERIES; ++i) ASSERT_OK(PinAll(&pool, &clients[i], &pages[i]));
+  DisableBackingFile(error_file_path);
+  for (int i = 0; i < INITIAL_QUERIES; ++i) UnpinAll(&pool, &clients[i], &pages[i]);
+
+  // At least one write should hit an error, but it should be recoverable.
+  for (int i = 0; i < INITIAL_QUERIES; ++i) WaitForAllWrites(&clients[i]);
+
+  // Both clients should still be usable - test the API.
+  for (int i = 0; i < INITIAL_QUERIES; ++i) {
+    ASSERT_OK(PinAll(&pool, &clients[i], &pages[i]));
+    VerifyData(pages[i], 0);
+    UnpinAll(&pool, &clients[i], &pages[i]);
+    ASSERT_OK(AllocateAndFree(&pool, &clients[i], TEST_BUFFER_LEN));
+  }
+
+  // Temporary device with error should still be active.
+  vector<TmpFileMgr::DeviceId> active_tmp_devices =
+      test_env_->tmp_file_mgr()->ActiveTmpDevices();
+  ASSERT_EQ(tmp_dirs.size(), active_tmp_devices.size());
+  for (int i = 0; i < active_tmp_devices.size(); ++i) {
+    const string& device_path =
+        test_env_->tmp_file_mgr()->GetTmpDirPath(active_tmp_devices[i]);
+    ASSERT_EQ(string::npos, error_dir.find(device_path));
+  }
+
+  // The query that hit the error should only allocate from the device that had no error.
+  // The other one should continue using both devices, since it didn't encounter a write
+  // error itself.
+  vector<PageHandle> error_new_pages;
+  CreatePages(
+      &pool, &clients[ERROR_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY, &error_new_pages);
+  UnpinAll(&pool, &clients[ERROR_QUERY], &error_new_pages);
+  WaitForAllWrites(&clients[ERROR_QUERY]);
+  EXPECT_TRUE(FindPageInDir(error_new_pages, good_dir) != NULL);
+  EXPECT_TRUE(FindPageInDir(error_new_pages, error_dir) == NULL);
+  for (PageHandle& error_new_page : error_new_pages) {
+    LOG(INFO) << "Newly created page backed by file " << TmpFilePath(&error_new_page);
+    EXPECT_TRUE(PageInDir(&error_new_page, good_dir));
+  }
+  DestroyAll(&pool, &clients[ERROR_QUERY], &error_new_pages);
+
+  ASSERT_OK(PinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]));
+  UnpinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]);
+  WaitForAllWrites(&clients[NO_ERROR_QUERY]);
+  EXPECT_TRUE(FindPageInDir(pages[NO_ERROR_QUERY], good_dir) != NULL);
+  EXPECT_TRUE(FindPageInDir(pages[NO_ERROR_QUERY], error_dir) != NULL);
+
+  // The second client should use the bad directory for new pages since
+  // blacklisting is per-query, not global.
+  vector<PageHandle> no_error_new_pages;
+  CreatePages(&pool, &clients[NO_ERROR_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY,
+      &no_error_new_pages);
+  UnpinAll(&pool, &clients[NO_ERROR_QUERY], &no_error_new_pages);
+  WaitForAllWrites(&clients[NO_ERROR_QUERY]);
+  EXPECT_TRUE(FindPageInDir(no_error_new_pages, good_dir) != NULL);
+  EXPECT_TRUE(FindPageInDir(no_error_new_pages, error_dir) != NULL);
+  DestroyAll(&pool, &clients[NO_ERROR_QUERY], &no_error_new_pages);
+
+  // A new query should use both dirs for backing storage.
+  const int NEW_QUERY = 2;
+  ASSERT_OK(pool.RegisterClient("new test client", NewFileGroup(), &global_reservations_,
+      nullptr, MEM_PER_QUERY, NewProfile(), &clients[NEW_QUERY]));
+  ASSERT_TRUE(clients[NEW_QUERY].IncreaseReservation(MEM_PER_QUERY));
+  CreatePages(
+      &pool, &clients[NEW_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY, &pages[NEW_QUERY]);
+  UnpinAll(&pool, &clients[NEW_QUERY], &pages[NEW_QUERY]);
+  WaitForAllWrites(&clients[NEW_QUERY]);
+  EXPECT_TRUE(FindPageInDir(pages[NEW_QUERY], good_dir) != NULL);
+  EXPECT_TRUE(FindPageInDir(pages[NEW_QUERY], error_dir) != NULL);
+
+  for (int i = 0; i < TOTAL_QUERIES; ++i) {
+    DestroyAll(&pool, &clients[i], &pages[i]);
+    pool.DeregisterClient(&clients[i]);
+  }
+}
+
+/// Test that the buffer pool fails cleanly when all scratch directories are inaccessible
+/// at runtime.
+TEST_F(BufferPoolTest, NoDirsAllocationError) {
+  vector<string> tmp_dirs = InitMultipleTmpDirs(2);
+  int MAX_NUM_BUFFERS = 2;
+  int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+  for (int i = 0; i < tmp_dirs.size(); ++i) {
+    const string& tmp_scratch_subdir = tmp_dirs[i] + SCRATCH_SUFFIX;
+    chmod(tmp_scratch_subdir.c_str(), 0);
+  }
+
+  // The error will happen asynchronously.
+  UnpinAll(&pool, &client, &pages);
+  WaitForAllWrites(&client);
+
+  // Write failure should result in an error getting propagated back to Pin().
+  for (PageHandle& page : pages) {
+    Status status = pool.Pin(&client, &page);
+    EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, status.code());
+  }
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+}
+
+// Test that the buffer pool can still create pages when no scratch is present.
+TEST_F(BufferPoolTest, NoTmpDirs) {
+  InitMultipleTmpDirs(0);
+  const int MAX_NUM_BUFFERS = 3;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+
+  // Unpinning is allowed by the BufferPool interface but we won't start any writes to
+  // disk because the flushing heuristic does not eagerly start writes when there are no
+  // active scratch devices.
+  UnpinAll(&pool, &client, &pages);
+  WaitForAllWrites(&client);
+  ASSERT_OK(pool.Pin(&client, &pages[0]));
+
+  // Allocating another buffer will force a write, which will fail.
+  BufferHandle tmp_buffer;
+  Status status = pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &tmp_buffer);
+  ASSERT_FALSE(status.ok());
+  ASSERT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, status.code()) << status.msg().msg();
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+}
+
+// Test that the buffer pool can still create pages when spilling is disabled by
+// setting scratch_limit = 0.
+TEST_F(BufferPoolTest, ScratchLimitZero) {
+  const int QUERY_BUFFERS = 3;
+  const int64_t TOTAL_MEM = 100 * TEST_BUFFER_LEN;
+  const int64_t QUERY_MEM = QUERY_BUFFERS * TEST_BUFFER_LEN;
+
+  // Set up a query state with the scratch_limit option in the TestEnv.
+  test_env_.reset(new TestEnv());
+  test_env_->SetBufferPoolArgs(TEST_BUFFER_LEN, TOTAL_MEM);
+  ASSERT_OK(test_env_->Init());
+
+  BufferPool* pool = test_env_->exec_env()->buffer_pool();
+  RuntimeState* state;
+  TQueryOptions query_options;
+  query_options.scratch_limit = 0;
+  ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &state));
+
+  ClientHandle client;
+  ASSERT_OK(pool->RegisterClient("test client", state->query_state()->file_group(),
+      state->instance_buffer_reservation(),
+      obj_pool_.Add(new MemTracker(-1, "", state->instance_mem_tracker())), QUERY_MEM,
+      NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(QUERY_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(pool, &client, TEST_BUFFER_LEN, QUERY_MEM, &pages);
+
+  // Spilling is disabled by the QueryState when scratch_limit is 0, so trying to unpin
+  // will cause a DCHECK.
+  IMPALA_ASSERT_DEBUG_DEATH(pool->Unpin(&client, &pages[0]), "");
+
+  DestroyAll(pool, &client, &pages);
+  pool->DeregisterClient(&client);
+}
+
+TEST_F(BufferPoolTest, SingleRandom) {
+  TestRandomInternalSingle(8 * 1024, true);
+  TestRandomInternalSingle(8 * 1024, false);
+}
+
+TEST_F(BufferPoolTest, Multi2Random) {
+  TestRandomInternalMulti(2, 8 * 1024, true);
+  TestRandomInternalMulti(2, 8 * 1024, false);
+}
+
+TEST_F(BufferPoolTest, Multi4Random) {
+  TestRandomInternalMulti(4, 8 * 1024, true);
+  TestRandomInternalMulti(4, 8 * 1024, false);
+}
+
+TEST_F(BufferPoolTest, Multi8Random) {
+  TestRandomInternalMulti(8, 8 * 1024, true);
+  TestRandomInternalMulti(8, 8 * 1024, false);
+}
+
+// Single-threaded execution of the TestRandomInternalImpl.
+void BufferPoolTest::TestRandomInternalSingle(
+    int64_t min_buffer_len, bool multiple_pins) {
+  const int MAX_NUM_BUFFERS = 200;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * min_buffer_len;
+  BufferPool pool(min_buffer_len, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  MemTracker global_tracker(TOTAL_MEM);
+  TestRandomInternalImpl(
+      &pool, NewFileGroup(), &global_tracker, &rng_, SINGLE_THREADED_TID, multiple_pins);
+  global_reservations_.Close();
+}
+
+// Multi-threaded execution of the TestRandomInternalImpl.
+void BufferPoolTest::TestRandomInternalMulti(
+    int num_threads, int64_t min_buffer_len, bool multiple_pins) {
+  const int MAX_NUM_BUFFERS_PER_THREAD = 200;
+  const int64_t TOTAL_MEM = num_threads * MAX_NUM_BUFFERS_PER_THREAD * min_buffer_len;
+  BufferPool pool(min_buffer_len, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  MemTracker global_tracker(TOTAL_MEM);
+  FileGroup* shared_file_group = NewFileGroup();
+  thread_group workers;
+  vector<mt19937> rngs(num_threads);
+  for (int i = 0; i < num_threads; ++i) {
+    rngs[i].seed(rng_()); // Seed the thread-local rngs.
+    workers.add_thread(new thread(
+        [this, &pool, shared_file_group, &global_tracker, &rngs, i, multiple_pins]() {
+          TestRandomInternalImpl(
+              &pool, shared_file_group, &global_tracker, &rngs[i], i, multiple_pins);
+        }));
+  }
+
+  AtomicInt32 stop_maintenance(0);
+  thread* maintenance_thread = new thread([this, &pool, &stop_maintenance]() {
+    while (stop_maintenance.Load() == 0) {
+      pool.Maintenance();
+      SleepForMs(50);
+    }
+  });
+  workers.join_all();
+  stop_maintenance.Add(1);
+  maintenance_thread->join();
+  global_reservations_.Close();
+}
+
+/// Randomly issue AllocateBuffer(), FreeBuffer(), CreatePage(), Pin(), Unpin(), and
+/// DestroyPage() calls. All calls made are legal - error conditions are not expected.
+/// When executed in single-threaded mode 'tid' should be SINGLE_THREADED_TID. If
+/// 'multiple_pins' is true, pages can be pinned multiple times (useful to test this
+/// functionality). Otherwise they are only pinned once (useful to test the case when
+/// memory is more committed).
+void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_group,
+    MemTracker* parent_mem_tracker, mt19937* rng, int tid, bool multiple_pins) {
+  // Encrypting and decrypting is expensive - reduce iterations when encryption is on.
+  int num_iterations = FLAGS_disk_spill_encryption ? 5000 : 50000;
+  // All the existing pages and buffers along with the sentinel values written to them.
+  vector<pair<PageHandle, int>> pages;
+  vector<pair<BufferHandle, int>> buffers;
+
+  /// Pick power-of-two buffer sizes that are up to 2^4 times the minimum buffer length.
+  uniform_int_distribution<int> buffer_exponent_dist(0, 4);
+
+  ClientHandle client;
+  ASSERT_OK(pool->RegisterClient(Substitute("$0", tid), file_group, &global_reservations_,
+      obj_pool_.Add(new MemTracker(-1, "", parent_mem_tracker)), 1L << 48, NewProfile(),
+      &client));
+
+  for (int i = 0; i < num_iterations; ++i) {
+    if ((i % 10000) == 0) LOG(ERROR) << " Iteration " << i << endl;
+    // Pick an operation.
+    // New page: 15%
+    // Pin a page: 30%
+    // Unpin a pinned page: 25% (< Pin prob. so that memory consumption increases).
+    // Destroy page: 10% (< New page prob. so that number of pages grows over time).
+    // Allocate buffer: 10%
+    // Free buffer: 9.9%
+    // Switch core that the thread is executing on: 0.1%
+    double p = uniform_real_distribution<double>(0.0, 1.0)(*rng);
+    if (p < 0.15) {
+      // Create a new page.
+      int64_t page_len = pool->min_buffer_len() << (buffer_exponent_dist)(*rng);
+      if (!client.IncreaseReservationToFit(page_len)) continue;
+      PageHandle new_page;
+      ASSERT_OK(pool->CreatePage(&client, page_len, &new_page));
+      int data = (*rng)();
+      WriteData(new_page, data);
+      pages.emplace_back(move(new_page), data);
+    } else if (p < 0.45) {
+      // Pin a page.
+      if (pages.empty()) continue;
+      int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
+      PageHandle* page = &pages[rand_pick].first;
+      if (!client.IncreaseReservationToFit(page->len())) continue;
+      if (!page->is_pinned() || multiple_pins) ASSERT_OK(pool->Pin(&client, page));
+      VerifyData(*page, pages[rand_pick].second);
+    } else if (p < 0.70) {
+      // Unpin a pinned page.
+      if (pages.empty()) continue;
+      int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
+      PageHandle* page = &pages[rand_pick].first;
+      if (page->is_pinned()) pool->Unpin(&client, page);
+    } else if (p < 0.80) {
+      // Destroy a page.
+      if (pages.empty()) continue;
+      int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
+      auto page_data = move(pages[rand_pick]);
+      pages[rand_pick] = move(pages.back());
+      pages.pop_back();
+      pool->DestroyPage(&client, &page_data.first);
+    } else if (p < 0.90) {
+      // Allocate a buffer. Pick a random power-of-two size that is up to 2^4
+      // times the minimum buffer length.
+      int64_t buffer_len = pool->min_buffer_len() << (buffer_exponent_dist)(*rng);
+      if (!client.IncreaseReservationToFit(buffer_len)) continue;
+      BufferHandle new_buffer;
+      ASSERT_OK(pool->AllocateBuffer(&client, buffer_len, &new_buffer));
+      int data = (*rng)();
+      WriteData(new_buffer, data);
+      buffers.emplace_back(move(new_buffer), data);
+    } else if (p < 0.999) {
+      // Free a buffer.
+      if (buffers.empty()) continue;
+      int rand_pick = uniform_int_distribution<int>(0, buffers.size() - 1)(*rng);
+      auto buffer_data = move(buffers[rand_pick]);
+      buffers[rand_pick] = move(buffers.back());
+      buffers.pop_back();
+      pool->FreeBuffer(&client, &buffer_data.first);
+    } else {
+      CpuTestUtil::PinToRandomCore(rng);
+    }
+  }
+
+  // The client needs to delete all its pages.
+  for (auto& page : pages) pool->DestroyPage(&client, &page.first);
+  for (auto& buffer : buffers) pool->FreeBuffer(&client, &buffer.first);
+  pool->DeregisterClient(&client);
+}
 }
 
 int main(int argc, char** argv) {
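
A note on the sentinel scheme in the WriteData()/VerifyData() helpers above:
only the first and last word of each buffer are touched, which keeps
verification cheap across tens of thousands of pin/unpin iterations while
still catching pages that were swapped, truncated, or restored from the wrong
scratch location. The scheme in isolation, as a self-contained sketch (the
test's MemRange plumbing is omitted):

  #include <cstdint>

  // Write 'val' to the first word and its bitwise complement to the last.
  void WriteSentinels(uint8_t* data, int64_t len, uint64_t val) {
    *reinterpret_cast<uint64_t*>(data) = val;
    *reinterpret_cast<uint64_t*>(data + len - sizeof(uint64_t)) = ~val;
  }

  // Check both sentinels; a mismatch means the page's data was corrupted,
  // lost, or read back from the wrong scratch range.
  bool VerifySentinels(const uint8_t* data, int64_t len, uint64_t val) {
    return *reinterpret_cast<const uint64_t*>(data) == val
        && *reinterpret_cast<const uint64_t*>(data + len - sizeof(uint64_t)) == ~val;
  }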

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index b4b2420..05176a5 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -28,6 +28,7 @@
 #include "util/bit-util.h"
 #include "util/cpu-info.h"
 #include "util/runtime-profile-counters.h"
+#include "util/time.h"
 #include "util/uid-util.h"
 
 DEFINE_int32(concurrent_scratch_ios_per_device, 2,
@@ -265,6 +266,7 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
   : pool_(pool),
     file_group_(file_group),
     name_(name),
+    debug_write_delay_ms_(0),
     num_pages_(0),
     buffers_allocated_bytes_(0) {
   reservation_.InitChildTracker(
@@ -502,6 +504,9 @@ void BufferPool::Client::WriteDirtyPagesAsync(int64_t min_bytes_to_write) {
 }
 
 void BufferPool::Client::WriteCompleteCallback(Page* page, const Status& write_status) {
+#ifndef NDEBUG
+  if (debug_write_delay_ms_ > 0) SleepForMs(debug_write_delay_ms_);
+#endif
   {
     unique_lock<mutex> cl(lock_);
     DCHECK(in_flight_write_pages_.Contains(page));
@@ -529,6 +534,13 @@ void BufferPool::Client::WaitForWrite(unique_lock<mutex>* client_lock, Page* pag
   }
 }
 
+void BufferPool::Client::WaitForAllWrites() {
+  unique_lock<mutex> cl(lock_);
+  while (in_flight_write_pages_.size() > 0) {
+    write_complete_cv_.Wait(cl);
+  }
+}
+
 string BufferPool::Client::DebugString() {
   lock_guard<mutex> lock(lock_);
   stringstream ss;
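
WaitForAllWrites() above is the classic monitor pattern: acquire 'lock_', loop
on the predicate, and wait on the condition variable that
WriteCompleteCallback() signals. The same shape with standard-library types
(the commit itself uses boost::mutex and Impala's ConditionVariable wrapper):

  #include <condition_variable>
  #include <mutex>

  std::mutex m;
  std::condition_variable write_complete_cv;
  int num_in_flight_writes = 0;  // incremented when a write is issued

  void OnWriteComplete() {
    { std::lock_guard<std::mutex> g(m); --num_in_flight_writes; }
    write_complete_cv.notify_all();
  }

  void WaitForAllWrites() {
    std::unique_lock<std::mutex> l(m);
    // The predicate loop handles spurious wakeups and multiple waiters.
    write_complete_cv.wait(l, [] { return num_in_flight_writes == 0; });
  }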

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 7770fff..ad9ab00 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -313,6 +313,7 @@ class BufferPool::ClientHandle {
 
  private:
   friend class BufferPool;
+  friend class BufferPoolTest;
   DISALLOW_COPY_AND_ASSIGN(ClientHandle);
 
   /// Internal state for the client. NULL means the client isn't registered.
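
The friend declaration exposes ClientHandle::impl_ to the test fixture without
widening the public API. The only use in this commit is the test helper shown
earlier in buffer-pool-test.cc:

  void WaitForAllWrites(ClientHandle* client) { client->impl_->WaitForAllWrites(); }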

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index abba192..4650e69 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -271,11 +271,7 @@ Status TmpFileMgr::FileGroup::CreateFiles() {
   }
   DCHECK_EQ(tmp_files_.size(), files_allocated);
   if (tmp_files_.size() == 0) {
-    // TODO: IMPALA-4697: the merged errors do not show up in the query error log,
-    // so we must point users to the impalad error log.
-    Status err_status(
-        "Could not create files in any configured scratch directories (--scratch_dirs). "
-        "See logs for previous errors that may have caused this.");
+    Status err_status(TErrorCode::SCRATCH_ALLOCATION_FAILED);
     for (Status& err : scratch_errors_) err_status.MergeStatus(err);
     return err_status;
   }
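
Replacing the hardcoded message with TErrorCode::SCRATCH_ALLOCATION_FAILED is
what lets the ported tests match on the error code rather than on message
text, as in the WriteError test above:

  Status error = pool.Pin(&client, &pages[0]);
  EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
  EXPECT_FALSE(pages[0].is_pinned());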


