impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mjac...@apache.org
Subject [1/3] incubator-impala git commit: IMPALA-5240: Allow config of number of disk I/O threads per disk type
Date Wed, 12 Jul 2017 15:17:07 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 39e8cf313 -> 1d8274a89


IMPALA-5240: Allow config of number of disk I/O threads per disk type

Currently Impala defaults to 8 threads per flash disk and 1 thread per
rotational disk. This can be overridden with --num_threads_per_disk,
but that sets the thread count independent of disk type.

This change allows the number of disk I/O threads to be configured
independently for solid state disks, using the startup parameter
"--num_io_threads_per_solid_state_disk", and for rotational disks,
using the startup parameter "--num_io_threads_per_rotational_disk".

Testing:
Added backend tests that verify if "num_threads_per_disk",
"num_io_threads_per_solid_state_disk" and
"num_io_threads_per_rotational_disk" are used to spawn threads
appropriately. Existing tests have been updated to use the new
DiskIoMgr test constructor, but retain the same behavior as before.

Additionally fixed a bug where Impala would crash if num_disks was set
to more than the number of logical disks on the system and
num_threads_per_disk was not set. The bug also let the user create more
disk queues than the number of logical disks; the extra queues would
never be used by the actual code. After this fix, if num_disks is set to
more than the number of logical disks, it defaults to the number of
available disks and a warning is logged.

Change-Id: I094aff3747104104fe0465d73dcdbef5d9892f7c
Reviewed-on: http://gerrit.cloudera.org:8080/7232
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4655c45e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4655c45e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4655c45e

Branch: refs/heads/master
Commit: 4655c45e9a797ab48008c92f6be1fd4abc7c24a9
Parents: 39e8cf3
Author: Bikramjeet Vig <bikramjeet.vig@cloudera.com>
Authored: Tue Jun 20 11:39:44 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Wed Jul 12 04:29:02 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-stress.cc            |  4 +-
 be/src/runtime/disk-io-mgr-test.cc              | 60 ++++++++++++-----
 be/src/runtime/disk-io-mgr.cc                   | 69 +++++++++++++++-----
 be/src/runtime/disk-io-mgr.h                    | 21 ++++--
 be/src/util/disk-info.h                         | 10 ++-
 be/src/util/thread.cc                           |  4 ++
 be/src/util/thread.h                            |  5 +-
 .../impala/testutil/SentryServicePinger.java    |  2 +-
 8 files changed, 125 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc
index af0c841..658b747 100644
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ b/be/src/runtime/disk-io-mgr-stress.cc
@@ -71,8 +71,8 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
   LOG(INFO) << "Running with rand seed: " << rand_seed;
   srand(rand_seed);
 
-  io_mgr_.reset(new DiskIoMgr(
-      num_disks, num_threads_per_disk, MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
+  io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk,
+      MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
   Status status = io_mgr_->Init(&mem_tracker_);
   CHECK(status.ok());
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index c66fbf1..05c99e7 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -33,6 +33,10 @@
 
 #include "common/names.h"
 
+DECLARE_int32(num_remote_hdfs_io_threads);
+DECLARE_int32(num_s3_io_threads);
+DECLARE_int32(num_adls_io_threads);
+
 using boost::condition_variable;
 
 const int MIN_BUFFER_SIZE = 512;
@@ -200,7 +204,7 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
     EXPECT_TRUE(false);
   }
 
-  scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
+  scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
   DiskIoRequestContext* reader;
@@ -208,7 +212,7 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.reset(new ObjectPool);
-      DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
+      DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
       DiskIoRequestContext* writer;
       io_mgr.RegisterContext(&writer, &mem_tracker);
@@ -245,7 +249,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   MemTracker mem_tracker(LARGE_MEM_LIMIT);
   num_ranges_written_ = 0;
   string tmp_file = "/non-existent/file.txt";
-  DiskIoMgr io_mgr(1, 1, 1, 10);
+  DiskIoMgr io_mgr(1, 1, 1, 1, 10);
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   DiskIoRequestContext* writer;
   io_mgr.RegisterContext(&writer, NULL);
@@ -306,7 +310,7 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
     EXPECT_TRUE(false);
   }
 
-  scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
+  scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
   DiskIoRequestContext* reader;
@@ -314,7 +318,7 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.reset(new ObjectPool);
-      DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
+      DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
       DiskIoRequestContext* writer;
       io_mgr.RegisterContext(&writer, &mem_tracker);
@@ -376,7 +380,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
                     << " num_read_threads=" << num_read_threads;
 
           if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-          DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
+          DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
           ASSERT_OK(io_mgr.Init(&mem_tracker));
           MemTracker reader_mem_tracker;
@@ -430,7 +434,7 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
                   << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
 
         if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
+        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
@@ -502,8 +506,8 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
                   << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
 
         if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-        DiskIoMgr io_mgr(
-            num_disks, num_threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk,
+            MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
@@ -573,7 +577,7 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
                   << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
 
         if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
+        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
@@ -638,7 +642,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     if (++iters % 1000 == 0) LOG(ERROR) << "Starting iteration " << iters;
 
     MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
-    DiskIoMgr io_mgr(1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+    DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
     ASSERT_OK(io_mgr.Init(&root_mem_tracker));
     MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
@@ -713,7 +717,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
   {
     pool_.reset(new ObjectPool);
     if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-    DiskIoMgr io_mgr(num_disks, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+    DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
     ASSERT_OK(io_mgr.Init(&mem_tracker));
     MemTracker reader_mem_tracker;
@@ -787,7 +791,8 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
   for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
     for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
       for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-        DiskIoMgr io_mgr(num_disks, threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+        DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
+            MAX_BUFFER_SIZE);
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
           io_mgr.RegisterContext(&contexts[file_index], &mem_tracker);
@@ -900,7 +905,8 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
                     << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
           if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
 
-          DiskIoMgr io_mgr(num_disks, threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+          DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
+              MAX_BUFFER_SIZE);
           EXPECT_OK(io_mgr.Init(&mem_tracker));
 
           for (int i = 0; i < NUM_READERS; ++i) {
@@ -952,7 +958,7 @@ TEST_F(DiskIoMgrTest, Buffers) {
   int max_buffer_size = 8 * 1024 * 1024; // 8 MB
   MemTracker root_mem_tracker(max_buffer_size * 2);
 
-  DiskIoMgr io_mgr(1, 1, min_buffer_size, max_buffer_size);
+  DiskIoMgr io_mgr(1, 1, 1, min_buffer_size, max_buffer_size);
   ASSERT_OK(io_mgr.Init(&root_mem_tracker));
   ASSERT_EQ(root_mem_tracker.consumption(), 0);
 
@@ -1029,7 +1035,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
   struct stat stat_val;
   stat(tmp_file, &stat_val);
 
-  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, read_len, read_len));
+  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   MemTracker reader_mem_tracker;
@@ -1061,7 +1067,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
   int read_len = 4; // Make buffer size smaller than client-provided buffer.
   CreateTempFile(tmp_file, data);
 
-  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, read_len, read_len));
+  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   // Reader doesn't need to provide mem tracker if it's providing buffers.
@@ -1101,7 +1107,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
   const char* tmp_file = "/file/that/does/not/exist";
   const int SCAN_LEN = 128;
 
-  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, SCAN_LEN, SCAN_LEN));
+  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, SCAN_LEN, SCAN_LEN));
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   // Reader doesn't need to provide mem tracker if it's providing buffers.
@@ -1132,6 +1138,24 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
   io_mgr.reset();
   EXPECT_EQ(mem_tracker.consumption(), 0);
 }
+
+// Test to verify configuration parameters for number of I/O threads per disk.
+TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
+  const int num_io_threads_for_remote_disks = FLAGS_num_remote_hdfs_io_threads
+      + FLAGS_num_s3_io_threads + FLAGS_num_adls_io_threads;
+
+  // Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
+  // Since we do not have control over which disk is used, we check for either type
+  // (rotational/solid state)
+  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  const int num_io_threads_per_rotational_or_ssd = 2;
+  DiskIoMgr io_mgr(1, num_io_threads_per_rotational_or_ssd,
+      num_io_threads_per_rotational_or_ssd, 1, 10);
+  ASSERT_OK(io_mgr.Init(&mem_tracker));
+  const int num_io_threads = io_mgr.disk_thread_group_.Size();
+  ASSERT_TRUE(num_io_threads ==
+      num_io_threads_per_rotational_or_ssd + num_io_threads_for_remote_disks);
+}
 }
 
 int main(int argc, char** argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index d78bad3..3393ab3 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -41,22 +41,43 @@ using namespace strings;
 DEFINE_int32(num_disks, 0, "Number of disks on data node.");
 // Default IoMgr configs:
 // The maximum number of the threads per disk is also the max queue depth per disk.
-DEFINE_int32(num_threads_per_disk, 0, "number of threads per disk");
+DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk");
+
+// Rotational disks should have 1 thread per disk to minimize seeks.  Non-rotational
+// don't have this penalty and benefit from multiple concurrent IO requests.
+static const int THREADS_PER_ROTATIONAL_DISK = 1;
+static const int THREADS_PER_SOLID_STATE_DISK = 8;
+
+// The maximum number of the threads per rotational disk is also the max queue depth per
+// rotational disk.
+static const string num_io_threads_per_rotational_disk_help_msg = Substitute("Number of "
+    "I/O threads per rotational disk. Has priority over num_threads_per_disk. If neither"
+    " is set, defaults to $0 thread(s) per rotational disk", THREADS_PER_ROTATIONAL_DISK);
+DEFINE_int32(num_io_threads_per_rotational_disk, 0,
+    num_io_threads_per_rotational_disk_help_msg.c_str());
+// The maximum number of the threads per solid state disk is also the max queue depth per
+// solid state disk.
+static const string num_io_threads_per_solid_state_disk_help_msg = Substitute("Number of"
+    " I/O threads per solid state disk. Has priority over num_threads_per_disk. If "
+    "neither is set, defaults to $0 thread(s) per solid state disk",
+    THREADS_PER_SOLID_STATE_DISK);
+DEFINE_int32(num_io_threads_per_solid_state_disk, 0,
+    num_io_threads_per_solid_state_disk_help_msg.c_str());
 // The maximum number of remote HDFS I/O threads.  HDFS access that are expected to be
 // remote are placed on a separate remote disk queue.  This is the queue depth for that
 // queue.  If 0, then the remote queue is not used and instead ranges are round-robined
 // across the local disk queues.
-DEFINE_int32(num_remote_hdfs_io_threads, 8, "number of remote HDFS I/O threads");
+DEFINE_int32(num_remote_hdfs_io_threads, 8, "Number of remote HDFS I/O threads");
 // The maximum number of S3 I/O threads. The default value of 16 was chosen emperically
 // to maximize S3 throughput. Maximum throughput is achieved with multiple connections
 // open to S3 and use of multiple CPU cores since S3 reads are relatively compute
 // expensive (SSL and JNI buffer overheads).
-DEFINE_int32(num_s3_io_threads, 16, "number of S3 I/O threads");
+DEFINE_int32(num_s3_io_threads, 16, "Number of S3 I/O threads");
 // The maximum number of ADLS I/O threads. This number is a good default to have for
 // clusters that may vary widely in size, due to an undocumented concurrency limit
 // enforced by ADLS for a cluster, which spans between 500-700. For smaller clusters
 // (~10 nodes), 64 threads would be more ideal.
-DEFINE_int32(num_adls_io_threads, 16, "number of ADLS I/O threads");
+DEFINE_int32(num_adls_io_threads, 16, "Number of ADLS I/O threads");
 // The read size is the size of the reads sent to hdfs/os.
 // There is a trade off of latency and throughout, trying to keep disks busy but
 // not introduce seeks.  The literature seems to agree that with 8 MB reads, random
@@ -76,11 +97,6 @@ DEFINE_int32(max_free_io_buffers, 128,
 DEFINE_uint64(max_cached_file_handles, 20000, "Maximum number of HDFS file handles "
     "that will be cached. Disabled if set to 0.");
 
-// Rotational disks should have 1 thread per disk to minimize seeks.  Non-rotational
-// don't have this penalty and benefit from multiple concurrent IO requests.
-static const int THREADS_PER_ROTATIONAL_DISK = 1;
-static const int THREADS_PER_FLASH_DISK = 8;
-
 // The IoMgr is able to run with a wide range of memory usage. If a query has memory
 // remaining less than this value, the IoMgr will stop all buffering regardless of the
 // current queue size.
@@ -251,8 +267,19 @@ static void CheckSseSupport() {
   }
 }
 
+// Utility function to select flag that is set (has a positive value) based on precedence
+static inline int GetFirstPositiveVal(const int first_val, const int second_val,
+    const int default_val) {
+  return first_val > 0 ? first_val : (second_val > 0 ? second_val : default_val);
+}
+
 DiskIoMgr::DiskIoMgr() :
-    num_threads_per_disk_(FLAGS_num_threads_per_disk),
+    num_io_threads_per_rotational_disk_(GetFirstPositiveVal(
+        FLAGS_num_io_threads_per_rotational_disk, FLAGS_num_threads_per_disk,
+        THREADS_PER_ROTATIONAL_DISK)),
+    num_io_threads_per_solid_state_disk_(GetFirstPositiveVal(
+        FLAGS_num_io_threads_per_solid_state_disk, FLAGS_num_threads_per_disk,
+        THREADS_PER_SOLID_STATE_DISK)),
     max_buffer_size_(FLAGS_read_size),
     min_buffer_size_(FLAGS_min_buffer_size),
     shut_down_(false),
@@ -262,14 +289,22 @@ DiskIoMgr::DiskIoMgr() :
         FileSystemUtil::MaxNumFileHandles())) {
   int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
   free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
-  int num_local_disks = FLAGS_num_disks == 0 ? DiskInfo::num_disks() : FLAGS_num_disks;
+  int num_local_disks = DiskInfo::num_disks();
+  if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) {
+    LOG(WARNING) << "Number of disks specified should be between 0 and the number of "
+        "logical disks on the system. Defaulting to system setting of " <<
+        DiskInfo::num_disks() << " disks";
+  } else if (FLAGS_num_disks > 0) {
+    num_local_disks = FLAGS_num_disks;
+  }
   disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
   CheckSseSupport();
 }
 
-DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_disk, int min_buffer_size,
-                     int max_buffer_size) :
-    num_threads_per_disk_(threads_per_disk),
+DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
+    int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size) :
+    num_io_threads_per_rotational_disk_(threads_per_rotational_disk),
+    num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk),
     max_buffer_size_(max_buffer_size),
     min_buffer_size_(min_buffer_size),
     shut_down_(false),
@@ -346,12 +381,10 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
       num_threads_per_disk = FLAGS_num_s3_io_threads;
     } else if (i == RemoteAdlsDiskId()) {
       num_threads_per_disk = FLAGS_num_adls_io_threads;
-    } else if (num_threads_per_disk_ != 0) {
-      num_threads_per_disk = num_threads_per_disk_;
     } else if (DiskInfo::is_rotational(i)) {
-      num_threads_per_disk = THREADS_PER_ROTATIONAL_DISK;
+      num_threads_per_disk = num_io_threads_per_rotational_disk_;
     } else {
-      num_threads_per_disk = THREADS_PER_FLASH_DISK;
+      num_threads_per_disk = num_io_threads_per_solid_state_disk_;
     }
     for (int j = 0; j < num_threads_per_disk; ++j) {
       stringstream ss;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index 0212f8d..138f973 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -628,15 +628,17 @@ class DiskIoMgr : public CacheLineAligned {
     WriteDoneCallback callback_;
   };
 
-  /// Create a DiskIoMgr object.
+  /// Create a DiskIoMgr object. This constructor is only used for testing.
   ///  - num_disks: The number of disks the IoMgr should use. This is used for testing.
   ///    Specify 0, to have the disk IoMgr query the os for the number of disks.
-  ///  - threads_per_disk: number of read threads to create per disk. This is also
-  ///    the max queue depth.
+  ///  - threads_per_rotational_disk: number of read threads to create per rotational
+  ///    disk. This is also the max queue depth.
+  ///  - threads_per_solid_state_disk: number of read threads to create per solid state
+  ///    disk. This is also the max queue depth.
   ///  - min_buffer_size: minimum io buffer size (in bytes)
   ///  - max_buffer_size: maximum io buffer size (in bytes). Also the max read size.
-  DiskIoMgr(int num_disks, int threads_per_disk, int min_buffer_size,
-      int max_buffer_size);
+  DiskIoMgr(int num_disks, int threads_per_rotational_disk,
+      int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size);
 
   /// Create DiskIoMgr with default configs.
   DiskIoMgr();
@@ -817,6 +819,7 @@ class DiskIoMgr : public CacheLineAligned {
   class RequestContextCache;
 
   friend class DiskIoMgrTest_Buffers_Test;
+  friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
 
   /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
   boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
@@ -826,9 +829,13 @@ class DiskIoMgr : public CacheLineAligned {
   /// provide a MemTracker.
   boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
 
-  /// Number of worker(read) threads per disk. Also the max depth of queued
+  /// Number of worker(read) threads per rotational disk. Also the max depth of queued
   /// work to the disk.
-  const int num_threads_per_disk_;
+  const int num_io_threads_per_rotational_disk_;
+
+  /// Number of worker(read) threads per solid state disk. Also the max depth of queued
+  /// work to the disk.
+  const int num_io_threads_per_solid_state_disk_;
 
   /// Maximum read size. This is also the maximum size of each allocated buffer.
   const int max_buffer_size_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/util/disk-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/disk-info.h b/be/src/util/disk-info.h
index 323a265..edfea21 100644
--- a/be/src/util/disk-info.h
+++ b/be/src/util/disk-info.h
@@ -54,12 +54,16 @@ class DiskInfo {
     return disks_[disk_id].name;
   }
 
+  /// Returns true if the disk with disk_id exists and is a rotational disk, is false
+  /// otherwise
   static bool is_rotational(int disk_id) {
     DCHECK_GE(disk_id, 0);
-    DCHECK_LT(disk_id, disks_.size());
+    // TODO: temporarily removed DCHECK due to an issue tracked in IMPALA-5574, put it
+    // back after its resolved
+    if (disk_id >= disks_.size()) return false;
     return disks_[disk_id].is_rotational;
   }
-  
+
   static std::string DebugString();
 
  private:
@@ -75,7 +79,7 @@ class DiskInfo {
 
     bool is_rotational;
 
-    Disk(const std::string& name = "", int id = -1, bool is_rotational = true) 
+    Disk(const std::string& name = "", int id = -1, bool is_rotational = true)
       : name(name), id(id), is_rotational(is_rotational) {}
   };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index 39a344f..c84ef0b 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -340,6 +340,10 @@ void ThreadGroup::JoinAll() {
   for (const Thread& thread: threads_) thread.Join();
 }
 
+int ThreadGroup::Size() const {
+  return threads_.size();
+}
+
 namespace {
 
 void RegisterUrlCallbacks(bool include_jvm_threads, Webserver* webserver) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index 3a96233..e21be7c 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -164,7 +164,7 @@ class Thread {
 };
 
 /// Utility class to group together a set of threads. A replacement for
-/// boost::thread_group.
+/// boost::thread_group. Not thread safe.
 class ThreadGroup {
  public:
   ThreadGroup() {}
@@ -179,6 +179,9 @@ class ThreadGroup {
   /// deadlock will predictably ensue.
   void JoinAll();
 
+  /// Returns the number of threads in the group
+  int Size() const;
+
  private:
   /// All the threads grouped by this set.
   boost::ptr_vector<Thread> threads_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/fe/src/test/java/org/apache/impala/testutil/SentryServicePinger.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/SentryServicePinger.java b/fe/src/test/java/org/apache/impala/testutil/SentryServicePinger.java
index 9bb3a5d..96a849b 100644
--- a/fe/src/test/java/org/apache/impala/testutil/SentryServicePinger.java
+++ b/fe/src/test/java/org/apache/impala/testutil/SentryServicePinger.java
@@ -76,7 +76,7 @@ public class SentryServicePinger {
         LOG.info("Sentry Service ping succeeded.");
         System.exit(0);
       } catch (Exception e) {
-        LOG.error(String.format("Error issing RPC to Sentry Service (attempt %d/%d): ",
+        LOG.error(String.format("Error issuing RPC to Sentry Service (attempt %d/%d): ",
             maxPings - numPings + 1, maxPings), e);
         Thread.sleep(sleepSecs * 1000);
       }


Mime
View raw message