impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [1/2] incubator-impala git commit: IMPALA-6121: remove I/O mgr request context cache
Date Wed, 15 Nov 2017 22:06:41 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master e98d2f1c0 -> 87fc463e0


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 8d36ea6..eb8d3c7 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -20,13 +20,14 @@
 #include <boost/thread/thread.hpp>
 #include <sys/stat.h>
 
-#include "testutil/gtest-util.h"
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
-#include "runtime/disk-io-mgr.h"
+#include "runtime/disk-io-mgr-reader-context.h"
 #include "runtime/disk-io-mgr-stress.h"
+#include "runtime/disk-io-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/thread-resource-mgr.h"
+#include "testutil/gtest-util.h"
 #include "util/condition-variable.h"
 #include "util/cpu-info.h"
 #include "util/disk-info.h"
@@ -62,7 +63,7 @@ class DiskIoMgrTest : public testing::Test {
     }
     if (status.ok()) {
       DiskIoMgr::ScanRange* scan_range = pool_.Add(new DiskIoMgr::ScanRange());
-      scan_range->Reset(NULL, (*written_range)->file(), (*written_range)->len(),
+      scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
           (*written_range)->offset(), 0, false, DiskIoMgr::BufferOpts::Uncached());
       ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data),
           sizeof(int32_t));
@@ -87,14 +88,14 @@ class DiskIoMgrTest : public testing::Test {
  protected:
   void CreateTempFile(const char* filename, const char* data) {
     FILE* file = fopen(filename, "w");
-    EXPECT_TRUE(file != NULL);
+    EXPECT_TRUE(file != nullptr);
     fwrite(data, 1, strlen(data), file);
     fclose(file);
   }
 
   int CreateTempFile(const char* filename, int file_size) {
     FILE* file = fopen(filename, "w");
-    EXPECT_TRUE(file != NULL);
+    EXPECT_TRUE(file != nullptr);
     int success = fclose(file);
     if (success != 0) {
       LOG(ERROR) << "Error closing file " << filename;
@@ -116,7 +117,7 @@ class DiskIoMgrTest : public testing::Test {
       DiskIoMgr::ScanRange* range, const char* expected, int expected_len = -1) {
     unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
     ASSERT_OK(io_mgr->Read(reader, range, &buffer));
-    ASSERT_TRUE(buffer != NULL);
+    ASSERT_TRUE(buffer != nullptr);
     EXPECT_EQ(buffer->len(), range->len());
     if (expected_len < 0) expected_len = strlen(expected);
     int cmp = memcmp(buffer->buffer(), expected, expected_len);
@@ -133,8 +134,8 @@ class DiskIoMgrTest : public testing::Test {
       unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
       Status status = range->GetNext(&buffer);
       ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
-      if (buffer == NULL || !status.ok()) {
-        if (buffer != NULL) io_mgr->ReturnBuffer(move(buffer));
+      if (buffer == nullptr || !status.ok()) {
+        if (buffer != nullptr) io_mgr->ReturnBuffer(move(buffer));
         break;
       }
       ASSERT_LE(buffer->len(), expected_len);
@@ -155,7 +156,7 @@ class DiskIoMgrTest : public testing::Test {
       DiskIoMgr::ScanRange* range;
       Status status = io_mgr->GetNextRange(reader, &range);
       ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
-      if (range == NULL) break;
+      if (range == nullptr) break;
       ValidateScanRange(io_mgr, range, expected_result, expected_len, expected_status);
       num_ranges_processed->Add(1);
       ++num_ranges;
@@ -167,9 +168,9 @@ class DiskIoMgrTest : public testing::Test {
   }
 
   DiskIoMgr::ScanRange* InitRange(const char* file_path, int offset, int len,
-      int disk_id, int64_t mtime, void* meta_data = NULL, bool is_cached = false) {
+      int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false) {
     DiskIoMgr::ScanRange* range = AllocateRange();
-    range->Reset(NULL, file_path, len, offset, disk_id, true,
+    range->Reset(nullptr, file_path, len, offset, disk_id, true,
         DiskIoMgr::BufferOpts(is_cached, mtime), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
@@ -203,26 +204,25 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  DiskIoRequestContext* reader;
-  read_io_mgr->RegisterContext(&reader, &reader_mem_tracker);
+  unique_ptr<DiskIoRequestContext> reader =
+      read_io_mgr->RegisterContext(&reader_mem_tracker);
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.Clear(); // Destroy scan ranges from previous iterations.
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
-      DiskIoRequestContext* writer;
-      io_mgr.RegisterContext(&writer, &mem_tracker);
+      unique_ptr<DiskIoRequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
       for (int i = 0; i < num_ranges; ++i) {
         int32_t* data = pool_.Add(new int32_t);
         *data = rand();
         DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*);
         DiskIoMgr::WriteRange::WriteDoneCallback callback =
             bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges,
-                new_range, read_io_mgr.get(), reader, data, Status::OK(), _1);
-        *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
-            num_ranges % num_disks, callback));
+                new_range, read_io_mgr.get(), reader.get(), data, Status::OK(), _1);
+        *new_range = pool_.Add(new DiskIoMgr::WriteRange(
+            tmp_file, cur_offset, num_ranges % num_disks, callback));
         (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
-        EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range));
+        EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range));
         cur_offset += sizeof(int32_t);
       }
 
@@ -231,11 +231,11 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
         while (num_ranges_written_ < num_ranges) writes_done_.Wait(lock);
       }
       num_ranges_written_ = 0;
-      io_mgr.UnregisterContext(writer);
+      io_mgr.UnregisterContext(writer.get());
     }
   }
 
-  read_io_mgr->UnregisterContext(reader);
+  read_io_mgr->UnregisterContext(reader.get());
   read_io_mgr.reset();
 }
 
@@ -247,8 +247,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   string tmp_file = "/non-existent/file.txt";
   DiskIoMgr io_mgr(1, 1, 1, 1, 10);
   ASSERT_OK(io_mgr.Init(&mem_tracker));
-  DiskIoRequestContext* writer;
-  io_mgr.RegisterContext(&writer, NULL);
+  unique_ptr<DiskIoRequestContext> writer = io_mgr.RegisterContext(nullptr);
   int32_t* data = pool_.Add(new int32_t);
   *data = rand();
 
@@ -256,12 +255,12 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*);
   DiskIoMgr::WriteRange::WriteDoneCallback callback =
       bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
-          (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, data,
+          (DiskIoMgr*)nullptr, (DiskIoRequestContext*)nullptr, data,
           Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
   *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
 
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
-  EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range));
+  EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range));
 
   // Write to a bad location in a file that exists.
   tmp_file = "/tmp/disk_io_mgr_test.txt";
@@ -273,19 +272,19 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
 
   new_range = pool_.Add(new DiskIoMgr::WriteRange*);
   callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
-      new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
+      new_range, (DiskIoMgr*)nullptr, (DiskIoRequestContext*)nullptr,
       data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
 
   *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
-  EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range));
+  EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range));
 
   {
     unique_lock<mutex> lock(written_mutex_);
     while (num_ranges_written_ < 2) writes_done_.Wait(lock);
   }
   num_ranges_written_ = 0;
-  io_mgr.UnregisterContext(writer);
+  io_mgr.UnregisterContext(writer.get());
 }
 
 // Issue a number of writes, cancel the writer context and issue more writes.
@@ -309,33 +308,31 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  DiskIoRequestContext* reader;
-  read_io_mgr->RegisterContext(&reader, &reader_mem_tracker);
+  unique_ptr<DiskIoRequestContext> reader =
+      read_io_mgr->RegisterContext(&reader_mem_tracker);
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.Clear(); // Destroy scan ranges from previous iterations.
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
-      DiskIoRequestContext* writer;
-      io_mgr.RegisterContext(&writer, &mem_tracker);
+      unique_ptr<DiskIoRequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
       Status validate_status = Status::OK();
       for (int i = 0; i < num_ranges; ++i) {
         if (i == num_ranges_before_cancel) {
-          io_mgr.CancelContext(writer);
+          io_mgr.CancelContext(writer.get());
           validate_status = Status::CANCELLED;
         }
         int32_t* data = pool_.Add(new int32_t);
         *data = rand();
         DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*);
-        DiskIoMgr::WriteRange::WriteDoneCallback callback =
-            bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this,
-                num_ranges_before_cancel, new_range, read_io_mgr.get(), reader, data,
-                Status::CANCELLED, _1);
-        *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
-            num_ranges % num_disks, callback));
+        DiskIoMgr::WriteRange::WriteDoneCallback callback = bind(
+            mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges_before_cancel,
+            new_range, read_io_mgr.get(), reader.get(), data, Status::CANCELLED, _1);
+        *new_range = pool_.Add(new DiskIoMgr::WriteRange(
+            tmp_file, cur_offset, num_ranges % num_disks, callback));
         (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
         cur_offset += sizeof(int32_t);
-        Status add_status = io_mgr.AddWriteRange(writer, *new_range);
+        Status add_status = io_mgr.AddWriteRange(writer.get(), *new_range);
         EXPECT_TRUE(add_status.code() == validate_status.code());
       }
 
@@ -344,11 +341,11 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
         while (num_ranges_written_ < num_ranges_before_cancel) writes_done_.Wait(lock);
       }
       num_ranges_written_ = 0;
-      io_mgr.UnregisterContext(writer);
+      io_mgr.UnregisterContext(writer.get());
     }
   }
 
-  read_io_mgr->UnregisterContext(reader);
+  read_io_mgr->UnregisterContext(reader.get());
   read_io_mgr.reset();
 }
 
@@ -379,27 +376,26 @@ TEST_F(DiskIoMgrTest, SingleReader) {
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
-        DiskIoRequestContext* reader;
-        io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+        unique_ptr<DiskIoRequestContext> reader =
+            io_mgr.RegisterContext(&reader_mem_tracker);
 
         vector<DiskIoMgr::ScanRange*> ranges;
         for (int i = 0; i < len; ++i) {
           int disk_id = i % num_disks;
-          ranges.push_back(
-              InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+          ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
         }
-        ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
+        ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
 
         AtomicInt32 num_ranges_processed;
         thread_group threads;
         for (int i = 0; i < num_read_threads; ++i) {
-          threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
-              len, Status::OK(), 0, &num_ranges_processed));
+          threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, len,
+              Status::OK(), 0, &num_ranges_processed));
         }
         threads.join_all();
 
         EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
-        io_mgr.UnregisterContext(reader);
+        io_mgr.UnregisterContext(reader.get());
         EXPECT_EQ(reader_mem_tracker.consumption(), 0);
       }
     }
@@ -431,8 +427,8 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
 
       ASSERT_OK(io_mgr.Init(&mem_tracker));
       MemTracker reader_mem_tracker;
-      DiskIoRequestContext* reader;
-      io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+      unique_ptr<DiskIoRequestContext> reader =
+          io_mgr.RegisterContext(&reader_mem_tracker);
 
       vector<DiskIoMgr::ScanRange*> ranges_first_half;
       vector<DiskIoMgr::ScanRange*> ranges_second_half;
@@ -449,25 +445,25 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
       AtomicInt32 num_ranges_processed;
 
       // Issue first half the scan ranges.
-      ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_first_half));
+      ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_first_half));
 
       // Read a couple of them
-      ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 2,
+      ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 2,
           &num_ranges_processed);
 
       // Issue second half
-      ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_second_half));
+      ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_second_half));
 
       // Start up some threads and then cancel
       thread_group threads;
       for (int i = 0; i < 3; ++i) {
-        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
             strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
       }
 
       threads.join_all();
       EXPECT_EQ(num_ranges_processed.Load(), len);
-      io_mgr.UnregisterContext(reader);
+      io_mgr.UnregisterContext(reader.get());
       EXPECT_EQ(reader_mem_tracker.consumption(), 0);
     }
   }
@@ -501,44 +497,43 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
 
       ASSERT_OK(io_mgr.Init(&mem_tracker));
       MemTracker reader_mem_tracker;
-      DiskIoRequestContext* reader;
-      io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+      unique_ptr<DiskIoRequestContext> reader =
+          io_mgr.RegisterContext(&reader_mem_tracker);
 
-      DiskIoMgr::ScanRange* complete_range = InitRange(tmp_file, 0, strlen(data), 0,
-          stat_val.st_mtime);
+      DiskIoMgr::ScanRange* complete_range =
+          InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime);
 
       // Issue some reads before the async ones are issued
-      ValidateSyncRead(&io_mgr, reader, complete_range, data);
-      ValidateSyncRead(&io_mgr, reader, complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
 
       vector<DiskIoMgr::ScanRange*> ranges;
       for (int i = 0; i < len; ++i) {
         int disk_id = i % num_disks;
-        ranges.push_back(InitRange(tmp_file, 0, len, disk_id,
-            stat_val.st_mtime));
+        ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
       }
-      ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
+      ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
 
       AtomicInt32 num_ranges_processed;
       thread_group threads;
       for (int i = 0; i < 5; ++i) {
-        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
             strlen(data), Status::OK(), 0, &num_ranges_processed));
       }
 
       // Issue some more sync ranges
       for (int i = 0; i < 5; ++i) {
         sched_yield();
-        ValidateSyncRead(&io_mgr, reader, complete_range, data);
+        ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
       }
 
       threads.join_all();
 
-      ValidateSyncRead(&io_mgr, reader, complete_range, data);
-      ValidateSyncRead(&io_mgr, reader, complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
 
       EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
-      io_mgr.UnregisterContext(reader);
+      io_mgr.UnregisterContext(reader.get());
       EXPECT_EQ(reader_mem_tracker.consumption(), 0);
     }
   }
@@ -569,21 +564,21 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
 
       ASSERT_OK(io_mgr.Init(&mem_tracker));
       MemTracker reader_mem_tracker;
-      DiskIoRequestContext* reader;
-      io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+      unique_ptr<DiskIoRequestContext> reader =
+          io_mgr.RegisterContext(&reader_mem_tracker);
 
       vector<DiskIoMgr::ScanRange*> ranges;
       for (int i = 0; i < len; ++i) {
         int disk_id = i % num_disks;
         ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
       }
-      ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
+      ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
 
       AtomicInt32 num_ranges_processed;
       int num_succesful_ranges = ranges.size() / 2;
       // Read half the ranges
       for (int i = 0; i < num_succesful_ranges; ++i) {
-        ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 1,
+        ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 1,
             &num_ranges_processed);
       }
       EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges);
@@ -591,16 +586,16 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
       // Start up some threads and then cancel
       thread_group threads;
       for (int i = 0; i < 3; ++i) {
-        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
             strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
       }
 
-      io_mgr.CancelContext(reader);
+      io_mgr.CancelContext(reader.get());
       sched_yield();
 
       threads.join_all();
-      EXPECT_TRUE(io_mgr.context_status(reader).IsCancelled());
-      io_mgr.UnregisterContext(reader);
+      EXPECT_TRUE(io_mgr.context_status(reader.get()).IsCancelled());
+      io_mgr.UnregisterContext(reader.get());
       EXPECT_EQ(reader_mem_tracker.consumption(), 0);
     }
   }
@@ -627,20 +622,19 @@ TEST_F(DiskIoMgrTest, MemLimits) {
 
     ASSERT_OK(io_mgr.Init(&root_mem_tracker));
     MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
-    DiskIoRequestContext* reader;
-    io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+    unique_ptr<DiskIoRequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
 
     vector<DiskIoMgr::ScanRange*> ranges;
     for (int i = 0; i < num_ranges; ++i) {
       ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime));
     }
-    ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
+    ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
 
     // Don't return buffers to force memory pressure
     vector<unique_ptr<DiskIoMgr::BufferDescriptor>> buffers;
 
     AtomicInt32 num_ranges_processed;
-    ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::MemLimitExceeded(),
+    ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::MemLimitExceeded(),
         1, &num_ranges_processed);
 
     char result[strlen(data) + 1];
@@ -648,16 +642,16 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     // to go over the limit eventually.
     while (true) {
       memset(result, 0, strlen(data) + 1);
-      DiskIoMgr::ScanRange* range = NULL;
-      Status status = io_mgr.GetNextRange(reader, &range);
+      DiskIoMgr::ScanRange* range = nullptr;
+      Status status = io_mgr.GetNextRange(reader.get(), &range);
       ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
-      if (range == NULL) break;
+      if (range == nullptr) break;
 
       while (true) {
         unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
         Status status = range->GetNext(&buffer);
         ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
-        if (buffer == NULL) break;
+        if (buffer == nullptr) break;
         memcpy(result + range->offset() + buffer->scan_range_offset(),
             buffer->buffer(), buffer->len());
         buffers.push_back(move(buffer));
@@ -669,8 +663,8 @@ TEST_F(DiskIoMgrTest, MemLimits) {
       io_mgr.ReturnBuffer(move(buffers[i]));
     }
 
-    EXPECT_TRUE(io_mgr.context_status(reader).IsMemLimitExceeded());
-    io_mgr.UnregisterContext(reader);
+    EXPECT_TRUE(io_mgr.context_status(reader.get()).IsMemLimitExceeded());
+    io_mgr.UnregisterContext(reader.get());
     EXPECT_EQ(reader_mem_tracker.consumption(), 0);
   }
 }
@@ -696,44 +690,43 @@ TEST_F(DiskIoMgrTest, CachedReads) {
 
     ASSERT_OK(io_mgr.Init(&mem_tracker));
     MemTracker reader_mem_tracker;
-    DiskIoRequestContext* reader;
-    io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+    unique_ptr<DiskIoRequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
 
     DiskIoMgr::ScanRange* complete_range =
-        InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, NULL, true);
+        InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true);
 
     // Issue some reads before the async ones are issued
-    ValidateSyncRead(&io_mgr, reader, complete_range, data);
-    ValidateSyncRead(&io_mgr, reader, complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
 
     vector<DiskIoMgr::ScanRange*> ranges;
     for (int i = 0; i < len; ++i) {
       int disk_id = i % num_disks;
       ranges.push_back(
-          InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, NULL, true));
+          InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true));
     }
-    ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
+    ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
 
     AtomicInt32 num_ranges_processed;
     thread_group threads;
     for (int i = 0; i < 5; ++i) {
-      threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+      threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
           strlen(data), Status::OK(), 0, &num_ranges_processed));
     }
 
     // Issue some more sync ranges
     for (int i = 0; i < 5; ++i) {
       sched_yield();
-      ValidateSyncRead(&io_mgr, reader, complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
     }
 
     threads.join_all();
 
-    ValidateSyncRead(&io_mgr, reader, complete_range, data);
-    ValidateSyncRead(&io_mgr, reader, complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
 
     EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
-    io_mgr.UnregisterContext(reader);
+    io_mgr.UnregisterContext(reader.get());
     EXPECT_EQ(reader_mem_tracker.consumption(), 0);
   }
   EXPECT_EQ(mem_tracker.consumption(), 0);
@@ -761,7 +754,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
   stat(file_name.c_str(), &stat_val);
 
   int64_t iters = 0;
-  vector<DiskIoRequestContext*> contexts(num_contexts);
+  vector<unique_ptr<DiskIoRequestContext>> contexts(num_contexts);
   Status status;
   for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
     for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
@@ -770,7 +763,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
             MAX_BUFFER_SIZE);
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
-          io_mgr.RegisterContext(&contexts[file_index], &mem_tracker);
+          contexts[file_index] = io_mgr.RegisterContext(&mem_tracker);
         }
         pool_.Clear();
         int read_offset = 0;
@@ -783,12 +776,12 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
             vector<DiskIoMgr::ScanRange*> ranges;
             int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset);
             for (int i = 0; i < num_scan_ranges; ++i) {
-              ranges.push_back(InitRange(file_name.c_str(), read_offset, 1,
-                  i % num_disks, stat_val.st_mtime));
-              threads.add_thread(new thread(ScanRangeThread, &io_mgr,
-                  contexts[context_index],
-                  reinterpret_cast<const char*>(data + (read_offset % strlen(data))), 1,
-                  Status::OK(), num_scan_ranges, &num_ranges_processed));
+              ranges.push_back(InitRange(
+                  file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime));
+              threads.add_thread(
+                  new thread(ScanRangeThread, &io_mgr, contexts[context_index].get(),
+                      reinterpret_cast<const char*>(data + (read_offset % strlen(data))),
+                      1, Status::OK(), num_scan_ranges, &num_ranges_processed));
               ++read_offset;
             }
 
@@ -798,12 +791,12 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
               DiskIoMgr::WriteRange::WriteDoneCallback callback =
                   bind(mem_fn(&DiskIoMgrTest::WriteCompleteCallback),
                       this, num_write_ranges, _1);
-              DiskIoMgr::WriteRange* new_range = pool_.Add(
-                  new DiskIoMgr::WriteRange(file_name,
-                      write_offset, i % num_disks, callback));
-              new_range->SetData(reinterpret_cast<const uint8_t*>
-                  (data + (write_offset % strlen(data))), 1);
-              status = io_mgr.AddWriteRange(contexts[context_index], new_range);
+              DiskIoMgr::WriteRange* new_range = pool_.Add(new DiskIoMgr::WriteRange(
+                  file_name, write_offset, i % num_disks, callback));
+              new_range->SetData(
+                  reinterpret_cast<const uint8_t*>(data + (write_offset % strlen(data))),
+                  1);
+              status = io_mgr.AddWriteRange(contexts[context_index].get(), new_range);
               ++write_offset;
             }
 
@@ -816,9 +809,8 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
           } // for (int context_index
         } // while (read_offset < file_size)
 
-
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
-          io_mgr.UnregisterContext(contexts[file_index]);
+          io_mgr.UnregisterContext(contexts[file_index].get());
         }
       } // for (int num_disks
     } // for (int threads_per_disk
@@ -836,7 +828,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
   vector<string> file_names;
   vector<int64_t> mtimes;
   vector<string> data;
-  vector<DiskIoRequestContext*> readers;
+  vector<unique_ptr<DiskIoRequestContext>> readers;
   vector<char*> results;
 
   file_names.resize(NUM_READERS);
@@ -884,30 +876,28 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
         EXPECT_OK(io_mgr.Init(&mem_tracker));
 
         for (int i = 0; i < NUM_READERS; ++i) {
-          io_mgr.RegisterContext(&readers[i], &mem_tracker);
+          readers[i] = io_mgr.RegisterContext(&mem_tracker);
 
           vector<DiskIoMgr::ScanRange*> ranges;
           for (int j = 0; j < DATA_LEN; ++j) {
             int disk_id = j % num_disks;
-            ranges.push_back(
-                InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i]));
+            ranges.push_back(InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i]));
           }
-          ASSERT_OK(io_mgr.AddScanRanges(readers[i], ranges));
+          ASSERT_OK(io_mgr.AddScanRanges(readers[i].get(), ranges));
         }
 
         AtomicInt32 num_ranges_processed;
         thread_group threads;
         for (int i = 0; i < NUM_READERS; ++i) {
           for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
-            threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i],
-                data[i].c_str(), data[i].size(), Status::OK(), 0,
-                &num_ranges_processed));
+            threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i].get(),
+                data[i].c_str(), data[i].size(), Status::OK(), 0, &num_ranges_processed));
           }
         }
         threads.join_all();
         EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS);
         for (int i = 0; i < NUM_READERS; ++i) {
-          io_mgr.UnregisterContext(readers[i]);
+          io_mgr.UnregisterContext(readers[i].get());
         }
       }
     }
@@ -935,16 +925,16 @@ TEST_F(DiskIoMgrTest, Buffers) {
   ASSERT_EQ(root_mem_tracker.consumption(), 0);
 
   MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
-  DiskIoRequestContext* reader;
-  io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+  unique_ptr<DiskIoRequestContext> reader;
+  reader = io_mgr.RegisterContext(&reader_mem_tracker);
 
   DiskIoMgr::ScanRange* dummy_range = InitRange("dummy", 0, 0, 0, 0);
 
   // buffer length should be rounded up to min buffer size
   int64_t buffer_len = 1;
   unique_ptr<DiskIoMgr::BufferDescriptor> buffer_desc;
-  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
   EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
   EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
   io_mgr.FreeBufferMemory(buffer_desc.get());
@@ -953,8 +943,8 @@ TEST_F(DiskIoMgrTest, Buffers) {
 
   // reuse buffer
   buffer_len = min_buffer_size;
-  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
   EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
   EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
   io_mgr.FreeBufferMemory(buffer_desc.get());
@@ -963,8 +953,8 @@ TEST_F(DiskIoMgrTest, Buffers) {
 
   // bump up to next buffer size
   buffer_len = min_buffer_size + 1;
-  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
   EXPECT_EQ(min_buffer_size * 2, buffer_desc->buffer_len());
   EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
   EXPECT_EQ(min_buffer_size * 3, root_mem_tracker.consumption());
@@ -979,8 +969,8 @@ TEST_F(DiskIoMgrTest, Buffers) {
 
   // max buffer size
   buffer_len = max_buffer_size;
-  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
   EXPECT_EQ(max_buffer_size, buffer_desc->buffer_len());
   EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
   io_mgr.FreeBufferMemory(buffer_desc.get());
@@ -991,7 +981,7 @@ TEST_F(DiskIoMgrTest, Buffers) {
   io_mgr.GcIoBuffers();
   EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 0);
   EXPECT_EQ(root_mem_tracker.consumption(), 0);
-  io_mgr.UnregisterContext(reader);
+  io_mgr.UnregisterContext(reader.get());
 }
 
 // IMPALA-2366: handle partial read where range goes past end of file.
@@ -1011,19 +1001,19 @@ TEST_F(DiskIoMgrTest, PartialRead) {
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   MemTracker reader_mem_tracker;
-  DiskIoRequestContext* reader;
-  io_mgr->RegisterContext(&reader, &reader_mem_tracker);
+  unique_ptr<DiskIoRequestContext> reader;
+  reader = io_mgr->RegisterContext(&reader_mem_tracker);
 
   // We should not read past the end of file.
   DiskIoMgr::ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime);
   unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
-  ASSERT_OK(io_mgr->Read(reader, range, &buffer));
+  ASSERT_OK(io_mgr->Read(reader.get(), range, &buffer));
   ASSERT_TRUE(buffer->eosr());
   ASSERT_EQ(len, buffer->len());
   ASSERT_TRUE(memcmp(buffer->buffer(), data, len) == 0);
   io_mgr->ReturnBuffer(move(buffer));
 
-  io_mgr->UnregisterContext(reader);
+  io_mgr->UnregisterContext(reader.get());
   pool_.Clear();
   io_mgr.reset();
   EXPECT_EQ(reader_mem_tracker.consumption(), 0);
@@ -1043,17 +1033,17 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   // Reader doesn't need to provide mem tracker if it's providing buffers.
-  MemTracker* reader_mem_tracker = NULL;
-  DiskIoRequestContext* reader;
-  io_mgr->RegisterContext(&reader, reader_mem_tracker);
+  MemTracker* reader_mem_tracker = nullptr;
+  unique_ptr<DiskIoRequestContext> reader;
+  reader = io_mgr->RegisterContext(reader_mem_tracker);
 
   for (int buffer_len : vector<int>({len - 1, len, len + 1})) {
     vector<uint8_t> client_buffer(buffer_len);
     int scan_len = min(len, buffer_len);
     DiskIoMgr::ScanRange* range = AllocateRange();
-    range->Reset(NULL, tmp_file, scan_len, 0, 0, true,
+    range->Reset(nullptr, tmp_file, scan_len, 0, 0, true,
         DiskIoMgr::BufferOpts::ReadInto(client_buffer.data(), buffer_len));
-    ASSERT_OK(io_mgr->AddScanRange(reader, range, true));
+    ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true));
 
     unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer;
     ASSERT_OK(range->GetNext(&io_buffer));
@@ -1067,7 +1057,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
     io_mgr->ReturnBuffer(move(io_buffer));
   }
 
-  io_mgr->UnregisterContext(reader);
+  io_mgr->UnregisterContext(reader.get());
   pool_.Clear();
   io_mgr.reset();
   EXPECT_EQ(mem_tracker.consumption(), 0);
@@ -1083,19 +1073,19 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   // Reader doesn't need to provide mem tracker if it's providing buffers.
-  MemTracker* reader_mem_tracker = NULL;
-  DiskIoRequestContext* reader;
+  MemTracker* reader_mem_tracker = nullptr;
+  unique_ptr<DiskIoRequestContext> reader;
   vector<uint8_t> client_buffer(SCAN_LEN);
   for (int i = 0; i < 1000; ++i) {
-    io_mgr->RegisterContext(&reader, reader_mem_tracker);
+    reader = io_mgr->RegisterContext(reader_mem_tracker);
     DiskIoMgr::ScanRange* range = AllocateRange();
-    range->Reset(NULL, tmp_file, SCAN_LEN, 0, 0, true,
+    range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true,
         DiskIoMgr::BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN));
-    ASSERT_OK(io_mgr->AddScanRange(reader, range, true));
+    ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true));
 
     /// Also test the cancellation path. Run multiple iterations since it is racy whether
     /// the read fails before the cancellation.
-    if (i >= 1) io_mgr->CancelContext(reader);
+    if (i >= 1) io_mgr->CancelContext(reader.get());
 
     unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer;
     ASSERT_FALSE(range->GetNext(&io_buffer).ok());
@@ -1103,7 +1093,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
     // DiskIoMgr should not have allocated memory.
     EXPECT_EQ(mem_tracker.consumption(), 0);
 
-    io_mgr->UnregisterContext(reader);
+    io_mgr->UnregisterContext(reader.get());
   }
 
   pool_.Clear();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 8af70f5..d614ac7 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -123,90 +123,8 @@ static inline bool is_file_handle_caching_enabled() {
 }
 }
 
-// This class provides a cache of DiskIoRequestContext objects.  DiskIoRequestContexts
-// are recycled. This is good for locality as well as lock contention.  The cache has
-// the property that regardless of how many clients get added/removed, the memory
-// locations for existing clients do not change (not the case with std::vector)
-// minimizing the locks we have to take across all readers.
-// All functions on this object are thread safe
-class DiskIoMgr::RequestContextCache {
- public:
-  RequestContextCache(DiskIoMgr* io_mgr) : io_mgr_(io_mgr) {}
-
-  // Returns a context to the cache.  This object can now be reused.
-  void ReturnContext(DiskIoRequestContext* reader) {
-    DCHECK(reader->state_ != DiskIoRequestContext::Inactive);
-    reader->state_ = DiskIoRequestContext::Inactive;
-    lock_guard<mutex> l(lock_);
-    inactive_contexts_.push_back(reader);
-  }
-
-  // Returns a new DiskIoRequestContext object.  Allocates a new object if necessary.
-  DiskIoRequestContext* GetNewContext() {
-    lock_guard<mutex> l(lock_);
-    if (!inactive_contexts_.empty()) {
-      DiskIoRequestContext* reader = inactive_contexts_.front();
-      inactive_contexts_.pop_front();
-      return reader;
-    } else {
-      DiskIoRequestContext* reader =
-          new DiskIoRequestContext(io_mgr_, io_mgr_->num_total_disks());
-      all_contexts_.push_back(reader);
-      return reader;
-    }
-  }
-
-  // This object has the same lifetime as the disk IoMgr.
-  ~RequestContextCache() {
-    for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
-        it != all_contexts_.end(); ++it) {
-      delete *it;
-    }
-  }
-
-  // Validates that all readers are cleaned up and in the inactive state.  No locks
-  // are taken since this is only called from the disk IoMgr destructor.
-  bool ValidateAllInactive() {
-    for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
-        it != all_contexts_.end(); ++it) {
-      if ((*it)->state_ != DiskIoRequestContext::Inactive) {
-        return false;
-      }
-    }
-    DCHECK_EQ(all_contexts_.size(), inactive_contexts_.size());
-    return all_contexts_.size() == inactive_contexts_.size();
-  }
-
-  string DebugString();
-
- private:
-  DiskIoMgr* io_mgr_;
-
-  // lock to protect all members below
-  mutex lock_;
-
-  // List of all request contexts created.  Used for debugging
-  list<DiskIoRequestContext*> all_contexts_;
-
-  // List of inactive readers.  These objects can be used for a new reader.
-  list<DiskIoRequestContext*> inactive_contexts_;
-};
-
-string DiskIoMgr::RequestContextCache::DebugString() {
-  lock_guard<mutex> l(lock_);
-  stringstream ss;
-  for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
-      it != all_contexts_.end(); ++it) {
-    unique_lock<mutex> lock((*it)->lock_);
-    ss << (*it)->DebugString() << endl;
-  }
-  return ss.str();
-}
-
 string DiskIoMgr::DebugString() {
   stringstream ss;
-  ss << "RequestContexts: " << endl << request_context_cache_->DebugString() << endl;
-
   ss << "Disks: " << endl;
   for (int i = 0; i < disk_queues_.size(); ++i) {
     unique_lock<mutex> lock(disk_queues_[i]->lock);
@@ -358,9 +276,6 @@ DiskIoMgr::~DiskIoMgr() {
     }
   }
 
-  DCHECK(request_context_cache_.get() == nullptr ||
-      request_context_cache_->ValidateAllInactive())
-      << endl << DebugString();
   DCHECK_EQ(num_buffers_in_readers_.Load(), 0);
 
   // Delete all allocated buffers
@@ -407,7 +322,6 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
       disk_thread_group_.AddThread(move(t));
     }
   }
-  request_context_cache_.reset(new RequestContextCache(this));
   RETURN_IF_ERROR(file_handle_cache_.Init());
 
   cached_read_options_ = hadoopRzOptionsAlloc();
@@ -422,24 +336,13 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   return Status::OK();
 }
 
-void DiskIoMgr::RegisterContext(DiskIoRequestContext** request_context,
-    MemTracker* mem_tracker) {
-  DCHECK(request_context_cache_.get() != nullptr) << "Must call Init() first.";
-  *request_context = request_context_cache_->GetNewContext();
-  (*request_context)->Reset(mem_tracker);
+unique_ptr<DiskIoRequestContext> DiskIoMgr::RegisterContext(MemTracker* mem_tracker) {
+  return unique_ptr<DiskIoRequestContext>(
+      new DiskIoRequestContext(this, num_total_disks(), mem_tracker));
 }
 
 void DiskIoMgr::UnregisterContext(DiskIoRequestContext* reader) {
-  // Blocking cancel (waiting for disks completion).
-  CancelContext(reader, true);
-
-  // All the disks are done with clean, validate nothing is leaking.
-  unique_lock<mutex> reader_lock(reader->lock_);
-  DCHECK_EQ(reader->num_buffers_in_reader_.Load(), 0) << endl << reader->DebugString();
-  DCHECK_EQ(reader->num_used_buffers_.Load(), 0) << endl << reader->DebugString();
-
-  DCHECK(reader->Validate()) << endl << reader->DebugString();
-  request_context_cache_->ReturnContext(reader);
+  reader->CancelAndMarkInactive();
 }
 
 // Cancellation requires coordination from multiple threads.  Each thread that currently
@@ -461,17 +364,8 @@ void DiskIoMgr::UnregisterContext(DiskIoRequestContext* reader) {
 // state, removes the context from the disk queue.  The last thread per disk with an
 // outstanding reference to the context decrements the number of disk queues the context
 // is on.
-// If wait_for_disks_completion is true, wait for the number of active disks to become 0.
-void DiskIoMgr::CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion) {
+void DiskIoMgr::CancelContext(DiskIoRequestContext* context) {
   context->Cancel(Status::CANCELLED);
-
-  if (wait_for_disks_completion) {
-    unique_lock<mutex> lock(context->lock_);
-    DCHECK(context->Validate()) << endl << context->DebugString();
-    while (context->num_disks_with_ranges_ > 0) {
-      context->disks_complete_cond_var_.Wait(lock);
-    }
-  }
 }
 
 void DiskIoMgr::set_read_timer(DiskIoRequestContext* r, RuntimeProfile::Counter* c) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index e7a7122..49de0ff 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -645,33 +645,27 @@ class DiskIoMgr : public CacheLineAligned {
   Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT;
 
   /// Allocates tracking structure for a request context.
-  /// Register a new request context which is returned in *request_context.
-  /// The IoMgr owns the allocated DiskIoRequestContext object. The caller must call
+  /// Registers a new request context and returns it; the caller owns it and must call
   /// UnregisterContext() for each context.
   /// reader_mem_tracker: Is non-null only for readers. IO buffers
   ///    used for this reader will be tracked by this. If the limit is exceeded
   ///    the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via
   ///    GetNext().
-  void RegisterContext(DiskIoRequestContext** request_context,
-      MemTracker* reader_mem_tracker);
-
-  /// Unregisters context from the disk IoMgr. This must be called for every
-  /// RegisterContext() regardless of cancellation and must be called in the
-  /// same thread as GetNext()
-  /// The 'context' cannot be used after this call.
-  /// This call blocks until all the disk threads have finished cleaning up.
-  /// UnregisterContext also cancels the reader/writer from the disk IoMgr.
+  std::unique_ptr<DiskIoRequestContext> RegisterContext(MemTracker* reader_mem_tracker);
+
+  /// Unregisters 'context' from the disk IoMgr: first cancels it, then blocks until all
+  /// disk threads have finished cleaning up and every reference to the context has been
+  /// removed from the I/O manager's internal data structures. This must be called for
+  /// every context returned by RegisterContext() so that the context object can be
+  /// safely destroyed afterwards. It is invalid to add more request ranges to 'context'
+  /// after this call.
   void UnregisterContext(DiskIoRequestContext* context);
 
   /// This function cancels the context asychronously. All outstanding requests
   /// are aborted and tracking structures cleaned up. This does not need to be
   /// called if the context finishes normally.
   /// This will also fail any outstanding GetNext()/Read requests.
-  /// If wait_for_disks_completion is true, wait for the number of active disks for this
-  /// context to reach 0. After calling with wait_for_disks_completion = true, the only
-  /// valid API is returning IO buffers that have already been returned.
-  /// Takes context->lock_ if wait_for_disks_completion is true.
-  void CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion = false);
+  void CancelContext(DiskIoRequestContext* context);
 
   /// Adds the scan ranges to the queues. This call is non-blocking. The caller must
   /// not deallocate the scan range pointers before UnregisterContext().
@@ -825,7 +819,6 @@ class DiskIoMgr : public CacheLineAligned {
   friend class BufferDescriptor;
   friend class DiskIoRequestContext;
   struct DiskQueue;
-  class RequestContextCache;
 
   friend class DiskIoMgrTest_Buffers_Test;
   friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
@@ -868,12 +861,6 @@ class DiskIoMgr : public CacheLineAligned {
   /// Total time spent in hdfs reading
   RuntimeProfile::Counter read_timer_;
 
-  /// Contains all contexts that the IoMgr is tracking. This includes contexts that are
-  /// active as well as those in the process of being cancelled. This is a cache
-  /// of context objects that get recycled to minimize object allocations and lock
-  /// contention.
-  boost::scoped_ptr<RequestContextCache> request_context_cache_;
-
   /// Protects free_buffers_
   boost::mutex free_buffers_lock_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 3415b0d..308b2c4 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -223,22 +223,8 @@ Status RuntimeState::CheckQueryState() {
   return GetQueryStatus();
 }
 
-void RuntimeState::AcquireReaderContext(DiskIoRequestContext* reader_context) {
-  boost::lock_guard<SpinLock> l(reader_contexts_lock_);
-  reader_contexts_.push_back(reader_context);
-}
-
-void RuntimeState::UnregisterReaderContexts() {
-  boost::lock_guard<SpinLock> l(reader_contexts_lock_);
-  for (DiskIoRequestContext* context : reader_contexts_) {
-    io_mgr()->UnregisterContext(context);
-  }
-  reader_contexts_.clear();
-}
-
 void RuntimeState::ReleaseResources() {
   DCHECK(!released_resources_);
-  UnregisterReaderContexts();
   if (filter_bank_ != nullptr) filter_bank_->Close();
   if (resource_pool_ != nullptr) {
     exec_env_->thread_mgr()->UnregisterPool(resource_pool_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index c8ae147..74c27e5 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -36,7 +36,6 @@ class BufferPool;
 class DataStreamRecvr;
 class DescriptorTbl;
 class DiskIoMgr;
-class DiskIoRequestContext;
 class Expr;
 class LlvmCodeGen;
 class MemTracker;
@@ -195,14 +194,6 @@ class RuntimeState {
     return !CodegenDisabledByQueryOption() && !CodegenDisabledByHint();
   }
 
-  /// Takes ownership of a scan node's reader context and plan fragment executor will call
-  /// UnregisterReaderContexts() to unregister it when the fragment is closed. The IO
-  /// buffers may still be in use and thus the deferred unregistration.
-  void AcquireReaderContext(DiskIoRequestContext* reader_context);
-
-  /// Unregisters all reader contexts acquired through AcquireReaderContext().
-  void UnregisterReaderContexts();
-
   inline Status GetQueryStatus() {
     // Do a racy check for query_status_ to avoid unnecessary spinlock acquisition.
     if (UNLIKELY(!query_status_.ok())) {
@@ -389,12 +380,6 @@ class RuntimeState {
   SpinLock query_status_lock_;
   Status query_status_;
 
-  /// Reader contexts that need to be closed when the fragment is closed.
-  /// Synchronization is needed if there are multiple scan nodes in a plan fragment and
-  /// Close() may be called on them concurrently (see IMPALA-4180).
-  SpinLock reader_contexts_lock_;
-  std::vector<DiskIoRequestContext*> reader_contexts_;
-
   /// This is the node id of the root node for this plan fragment. This is used as the
   /// hash seed and has two useful properties:
   /// 1) It is the same for all exec nodes in a fragment, so the resulting hash values

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 5f482ba..dde6348 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -132,7 +132,7 @@ class TmpFileMgrTest : public ::testing::Test {
 
   /// Helper to cancel the FileGroup DiskIoRequestContext.
   static void CancelIoContext(TmpFileMgr::FileGroup* group) {
-    group->io_mgr_->CancelContext(group->io_ctx_);
+    group->io_mgr_->CancelContext(group->io_ctx_.get());
   }
 
   /// Helper to get the # of bytes allocated by the group. Validates that the sum across

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 8916d4f..f1e243c 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -26,6 +26,7 @@
 #include <gutil/strings/join.h>
 #include <gutil/strings/substitute.h>
 
+#include "runtime/disk-io-mgr-reader-context.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tmp-file-mgr-internal.h"
 #include "util/bit-util.h"
@@ -240,7 +241,7 @@ TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
     next_allocation_index_(0),
     free_ranges_(64) {
   DCHECK(tmp_file_mgr != nullptr);
-  io_mgr_->RegisterContext(&io_ctx_, nullptr);
+  io_ctx_ = io_mgr_->RegisterContext(nullptr);
 }
 
 TmpFileMgr::FileGroup::~FileGroup() {
@@ -282,8 +283,7 @@ Status TmpFileMgr::FileGroup::CreateFiles() {
 void TmpFileMgr::FileGroup::Close() {
   // Cancel writes before deleting the files, since in-flight writes could re-create
   // deleted files.
-  if (io_ctx_ != nullptr) io_mgr_->UnregisterContext(io_ctx_);
-  io_ctx_ = nullptr;
+  if (io_ctx_ != nullptr) io_mgr_->UnregisterContext(io_ctx_.get());
   for (std::unique_ptr<TmpFileMgr::File>& file : tmp_files_) {
     Status status = file->Remove();
     if (!status.ok()) {
@@ -361,7 +361,7 @@ Status TmpFileMgr::FileGroup::Write(
   DiskIoMgr::WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr](
       const Status& write_status) { WriteComplete(tmp_handle_ptr, write_status); };
   RETURN_IF_ERROR(
-      tmp_handle->Write(io_mgr_, io_ctx_, tmp_file, file_offset, buffer, callback));
+      tmp_handle->Write(io_mgr_, io_ctx_.get(), tmp_file, file_offset, buffer, callback));
   write_counter_->Add(1);
   bytes_written_counter_->Add(buffer.len());
   *handle = move(tmp_handle);
@@ -394,7 +394,7 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
       DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len()));
   read_counter_->Add(1);
   bytes_read_counter_->Add(buffer.len());
-  RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_, handle->read_range_, true));
+  RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, true));
   return Status::OK();
 }
 
@@ -489,7 +489,7 @@ Status TmpFileMgr::FileGroup::RecoverWriteError(
   // Choose another file to try. Blacklisting ensures we don't retry the same file.
   // If this fails, the status will include all the errors in 'scratch_errors_'.
   RETURN_IF_ERROR(AllocateSpace(handle->len(), &tmp_file, &file_offset));
-  return handle->RetryWrite(io_mgr_, io_ctx_, tmp_file, file_offset);
+  return handle->RetryWrite(io_mgr_, io_ctx_.get(), tmp_file, file_offset);
 }
 
 string TmpFileMgr::FileGroup::DebugString() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index ba7210d..f550af2 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -19,6 +19,7 @@
 #define IMPALA_RUNTIME_TMP_FILE_MGR_H
 
 #include <functional>
+#include <memory>
 #include <utility>
 
 #include <boost/scoped_ptr.hpp>
@@ -200,7 +201,7 @@ class TmpFileMgr {
     DiskIoMgr* const io_mgr_;
 
     /// I/O context used for all reads and writes. Registered in constructor.
-    DiskIoRequestContext* io_ctx_;
+    std::unique_ptr<DiskIoRequestContext> io_ctx_;
 
     /// Stores scan ranges allocated in Read(). Needed because ScanRange objects may be
     /// touched by DiskIoMgr even after the scan is finished.


Mime
View raw message