kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [2/2] incubator-kudu git commit: service_queue: fixes based on Adar's post-commit review
Date Wed, 11 May 2016 18:12:01 GMT
service_queue: fixes based on Adar's post-commit review

Adar left some notes on the LifoServiceQueue patch after it was
committed here: http://gerrit.cloudera.org:8080/#/c/2938/7

This addresses those issues (mostly nits).

Change-Id: Ib7f8d5205a319c1e0ddd1b01d63dac5d4430d5de
Reviewed-on: http://gerrit.cloudera.org:8080/3026
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/659cfc42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/659cfc42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/659cfc42

Branch: refs/heads/master
Commit: 659cfc42dd58fc15c7a67dceb3506ac5d40caf14
Parents: c991020
Author: Todd Lipcon <todd@apache.org>
Authored: Wed May 11 10:37:05 2016 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Wed May 11 18:11:38 2016 +0000

----------------------------------------------------------------------
 build-support/tsan-suppressions.txt |  5 ++--
 src/kudu/rpc/service_queue-test.cc  | 39 +++++++++++++++-----------------
 src/kudu/rpc/service_queue.cc       |  9 +++++---
 src/kudu/rpc/service_queue.h        |  8 +++----
 4 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/659cfc42/build-support/tsan-suppressions.txt
----------------------------------------------------------------------
diff --git a/build-support/tsan-suppressions.txt b/build-support/tsan-suppressions.txt
index 1baa32f..da89ecd 100644
--- a/build-support/tsan-suppressions.txt
+++ b/build-support/tsan-suppressions.txt
@@ -22,8 +22,9 @@ race:kudu::tablet::ScopedRowLock::Release
 #
 # With TSAN in clang 3.5, it's the init() function that's flagged as a data
 # race (not local_addr_space_init()), due to the former calling sigfillset()
-# on an unprotected global variable. Given that init() invokes
-# local_addr_space_init(), suppressing init() suppresses both races.
+# on an unprotected global variable. Although init() calls local_addr_space_init(),
+# it can sometimes be eliminated from the call stack by inlining or a tail-call
+# optimization, so adding the suppression on both is necessary.
 race:_ULx86_64_init
 race:_ULx86_64_local_addr_space_init
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/659cfc42/src/kudu/rpc/service_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_queue-test.cc b/src/kudu/rpc/service_queue-test.cc
index 272c06f..0bcbd12 100644
--- a/src/kudu/rpc/service_queue-test.cc
+++ b/src/kudu/rpc/service_queue-test.cc
@@ -17,23 +17,22 @@
 
 
 #include <atomic>
-#include <boost/bind.hpp>
-#include <boost/thread/thread.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 #include <memory>
 #include <string>
+#include <thread>
 #include <vector>
 
 #include "kudu/rpc/service_queue.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_util.h"
 
-using std::string;
-using std::vector;
 using std::shared_ptr;
+using std::string;
 using std::unique_ptr;
+using std::vector;
 
 DEFINE_int32(num_producers, 4,
              "Number of producer threads");
@@ -59,17 +58,17 @@ void ProducerThread(Queue* queue) {
       base::subtle::PauseCPU();
     }
     inprogress++;
-    InboundCall * call = new InboundCall(nullptr);
+    InboundCall* call = new InboundCall(nullptr);
     boost::optional<InboundCall*> evicted;
     auto status = queue->Put(call, &evicted);
     if (status == QUEUE_FULL) {
-      LOG(INFO) << "queue, exit";
+      LOG(INFO) << "queue full: producer exiting";
       delete call;
       break;
     }
 
     if (PREDICT_FALSE(evicted != boost::none)) {
-      LOG(INFO) << "call evicted, exit";
+      LOG(INFO) << "call evicted: producer exiting";
       delete evicted.get();
       break;
     }
@@ -87,23 +86,21 @@ void ConsumerThread(Queue* queue) {
   while (queue->BlockingGet(&call)) {
     inprogress--;
     total++;
-    call.reset(nullptr);
+    call.reset();
   }
 }
 
 TEST(TestServiceQueue, LifoServiceQueuePerf) {
   LifoServiceQueue queue(FLAGS_max_queue_size);
-  vector<shared_ptr<boost::thread>> producers;
-  vector<shared_ptr<boost::thread>> consumers;
+  vector<std::thread> producers;
+  vector<std::thread> consumers;
 
-  for (int i = 0; i< FLAGS_num_producers; i++) {
-    producers.push_back(shared_ptr<boost::thread>(
-        new boost::thread(&ProducerThread<LifoServiceQueue>, &queue)));
+  for (int i = 0; i < FLAGS_num_producers; i++) {
+    producers.emplace_back(&ProducerThread<LifoServiceQueue>, &queue);
   }
 
   for (int i = 0; i < FLAGS_num_consumers; i++) {
-    consumers.push_back(shared_ptr<boost::thread>(
-        new boost::thread(&ConsumerThread<LifoServiceQueue>, &queue)));
+    consumers.emplace_back(&ConsumerThread<LifoServiceQueue>, &queue);
   }
 
   int seconds = AllowSlowTests() ? 10 : 1;
@@ -117,19 +114,19 @@ TEST(TestServiceQueue, LifoServiceQueuePerf) {
   for (int i = 0; i < seconds * 50; i++) {
     SleepFor(MonoDelta::FromMilliseconds(20));
     total_sample++;
-    total_queue_len = queue.estimated_queue_length();
-    total_idle_workers = queue.estimated_idle_worker_count();
+    total_queue_len += queue.estimated_queue_length();
+    total_idle_workers += queue.estimated_idle_worker_count();
   }
 
   sw.stop();
   int32_t delta = total - before;
 
   queue.Shutdown();
-  for (int i = 0; i< FLAGS_num_producers; i++) {
-    producers[i]->join();
+  for (int i = 0; i < FLAGS_num_producers; i++) {
+    producers[i].join();
   }
-  for (int i = 0; i< FLAGS_num_consumers; i++) {
-    consumers[i]->join();
+  for (int i = 0; i < FLAGS_num_consumers; i++) {
+    consumers[i].join();
   }
 
   float reqs_per_second = static_cast<float>(delta / sw.elapsed().wall_seconds());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/659cfc42/src/kudu/rpc/service_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_queue.cc b/src/kudu/rpc/service_queue.cc
index 87810a3..e620683 100644
--- a/src/kudu/rpc/service_queue.cc
+++ b/src/kudu/rpc/service_queue.cc
@@ -26,6 +26,7 @@ __thread LifoServiceQueue::ConsumerState* LifoServiceQueue::tl_consumer_
= nullp
 LifoServiceQueue::LifoServiceQueue(int max_size)
    : shutdown_(false),
      max_queue_size_(max_size) {
+  CHECK_GT(max_queue_size_, 0);
 }
 
 LifoServiceQueue::~LifoServiceQueue() {
@@ -33,11 +34,11 @@ LifoServiceQueue::~LifoServiceQueue() {
       << "ServiceQueue holds bare pointers at destruction time";
 }
 
-bool LifoServiceQueue::BlockingGet(std::unique_ptr<InboundCall> *out) {
+bool LifoServiceQueue::BlockingGet(std::unique_ptr<InboundCall>* out) {
   auto consumer = tl_consumer_;
   if (PREDICT_FALSE(!consumer)) {
-    lock_guard<simple_spinlock> l(&lock_);
     consumer = tl_consumer_ = new ConsumerState(this);
+    lock_guard<simple_spinlock> l(&lock_);
     consumers_.emplace_back(consumer);
   }
 
@@ -50,7 +51,7 @@ bool LifoServiceQueue::BlockingGet(std::unique_ptr<InboundCall> *out)
{
         queue_.erase(it);
         return true;
       }
-      if (shutdown_) {
+      if (PREDICT_FALSE(shutdown_)) {
         return false;
       }
       consumer->DCheckBoundInstance(this);
@@ -61,6 +62,8 @@ bool LifoServiceQueue::BlockingGet(std::unique_ptr<InboundCall> *out)
{
       out->reset(call);
       return true;
     }
+    // if call == nullptr, this means we are shutting down the queue.
+    // Loop back around and re-check 'shutdown_'.
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/659cfc42/src/kudu/rpc/service_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_queue.h b/src/kudu/rpc/service_queue.h
index a4b502d..57401b9 100644
--- a/src/kudu/rpc/service_queue.h
+++ b/src/kudu/rpc/service_queue.h
@@ -48,14 +48,14 @@ enum QueueStatus {
 // provide accurate deadlines for their calls.
 //
 // In order to improve concurrent throughput, this class uses a LIFO design:
-// Each consumer thread will has its own lock and condition variable. If a
+// Each consumer thread has its own lock and condition variable. If a
 // consumer arrives and there is no work available in the queue, it will not
 // wait on the queue lock, but rather push its own 'ConsumerState' object
 // to the 'waiting_consumers_' stack. When work arrives, if there are waiting
 // consumers, the top consumer is popped from the stack and woken up.
 //
 // This design has a few advantages over the basic BlockingQueue:
-// - the worker who was most recently busy is the one which we be selected for
+// - the worker who was most recently busy is the one which will be selected for
 //   new work. This gives an opportunity for the worker to be scheduled again
 //   without going to sleep, and also keeps CPU cache and allocator caches hot.
 // - in the common case that there are enough workers to fully service the incoming
@@ -73,7 +73,7 @@ class LifoServiceQueue {
 
   // Get an element from the queue.  Returns false if we were shut down prior to
   // getting the element.
-  bool BlockingGet(std::unique_ptr<InboundCall> *out);
+  bool BlockingGet(std::unique_ptr<InboundCall>* out);
 
   // Add a new call to the queue.
   // Returns:
@@ -204,7 +204,7 @@ class LifoServiceQueue {
   std::multiset<InboundCall*, DeadlineLessStruct> queue_;
 
   // The total set of consumers who have ever accessed this queue.
-  std::vector<std::unique_ptr<ConsumerState> > consumers_;
+  std::vector<std::unique_ptr<ConsumerState>> consumers_;
 
   DISALLOW_COPY_AND_ASSIGN(LifoServiceQueue);
 };


Mime
View raw message