From: todd@apache.org
To: commits@kudu.incubator.apache.org
Date: Wed, 11 May 2016 18:12:01 -0000
Message-Id: <34563fcd81fd4e24ac64469225a40450@git.apache.org>
Subject: [2/2] incubator-kudu git commit: service_queue: fixes based on Adar's post-commit review

service_queue: fixes based on Adar's post-commit review

Adar left some notes on the LifoServiceQueue patch after it was
committed here: http://gerrit.cloudera.org:8080/#/c/2938/7

This addresses those issues (mostly nits).
Change-Id: Ib7f8d5205a319c1e0ddd1b01d63dac5d4430d5de
Reviewed-on: http://gerrit.cloudera.org:8080/3026
Reviewed-by: Adar Dembo
Tested-by: Kudu Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/659cfc42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/659cfc42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/659cfc42

Branch: refs/heads/master
Commit: 659cfc42dd58fc15c7a67dceb3506ac5d40caf14
Parents: c991020
Author: Todd Lipcon
Authored: Wed May 11 10:37:05 2016 -0700
Committer: Todd Lipcon
Committed: Wed May 11 18:11:38 2016 +0000

----------------------------------------------------------------------
 build-support/tsan-suppressions.txt |  5 ++--
 src/kudu/rpc/service_queue-test.cc  | 39 +++++++++++++++-----------------
 src/kudu/rpc/service_queue.cc       |  9 +++++---
 src/kudu/rpc/service_queue.h        |  8 +++---
 4 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/659cfc42/build-support/tsan-suppressions.txt
----------------------------------------------------------------------
diff --git a/build-support/tsan-suppressions.txt b/build-support/tsan-suppressions.txt
index 1baa32f..da89ecd 100644
--- a/build-support/tsan-suppressions.txt
+++ b/build-support/tsan-suppressions.txt
@@ -22,8 +22,9 @@ race:kudu::tablet::ScopedRowLock::Release
 #
 # With TSAN in clang 3.5, it's the init() function that's flagged as a data
 # race (not local_addr_space_init()), due to the former calling sigfillset()
-# on an unprotected global variable. Given that init() invokes
-# local_addr_space_init(), suppressing init() suppresses both races.
+# on an unprotected global variable. Although init() calls local_addr_space_init(),
+# it can sometimes be eliminated from the call stack by inlining or a tail-call
+# optimization, so adding the suppression on both is necessary.
 race:_ULx86_64_init
 race:_ULx86_64_local_addr_space_init

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/659cfc42/src/kudu/rpc/service_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_queue-test.cc b/src/kudu/rpc/service_queue-test.cc
index 272c06f..0bcbd12 100644
--- a/src/kudu/rpc/service_queue-test.cc
+++ b/src/kudu/rpc/service_queue-test.cc
@@ -17,23 +17,22 @@
 #include
-#include
-#include
 #include
 #include
 #include
 #include
 #include
+#include
 #include

 #include "kudu/rpc/service_queue.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_util.h"

-using std::string;
-using std::vector;
 using std::shared_ptr;
+using std::string;
 using std::unique_ptr;
+using std::vector;

 DEFINE_int32(num_producers, 4, "Number of producer threads");
@@ -59,17 +58,17 @@ void ProducerThread(Queue* queue) {
       base::subtle::PauseCPU();
     }
     inprogress++;
-    InboundCall * call = new InboundCall(nullptr);
+    InboundCall* call = new InboundCall(nullptr);
     boost::optional evicted;
     auto status = queue->Put(call, &evicted);
     if (status == QUEUE_FULL) {
-      LOG(INFO) << "queue, exit";
+      LOG(INFO) << "queue full: producer exiting";
       delete call;
       break;
     }
     if (PREDICT_FALSE(evicted != boost::none)) {
-      LOG(INFO) << "call evicted, exit";
+      LOG(INFO) << "call evicted: producer exiting";
       delete evicted.get();
       break;
     }
@@ -87,23 +86,21 @@ void ConsumerThread(Queue* queue) {
   while (queue->BlockingGet(&call)) {
     inprogress--;
     total++;
-    call.reset(nullptr);
+    call.reset();
   }
 }

 TEST(TestServiceQueue, LifoServiceQueuePerf) {
   LifoServiceQueue queue(FLAGS_max_queue_size);
-  vector> producers;
-  vector> consumers;
+  vector producers;
+  vector consumers;

-  for (int i = 0; i< FLAGS_num_producers; i++) {
-    producers.push_back(shared_ptr(
-        new boost::thread(&ProducerThread, &queue)));
+  for (int i = 0; i < FLAGS_num_producers; i++) {
+    producers.emplace_back(&ProducerThread, &queue);
   }

   for (int i = 0; i < FLAGS_num_consumers; i++) {
-    consumers.push_back(shared_ptr(
-        new boost::thread(&ConsumerThread, &queue)));
+    consumers.emplace_back(&ConsumerThread, &queue);
   }

   int seconds = AllowSlowTests() ? 10 : 1;
@@ -117,19 +114,19 @@ TEST(TestServiceQueue, LifoServiceQueuePerf) {
   for (int i = 0; i < seconds * 50; i++) {
     SleepFor(MonoDelta::FromMilliseconds(20));
     total_sample++;
-    total_queue_len = queue.estimated_queue_length();
-    total_idle_workers = queue.estimated_idle_worker_count();
+    total_queue_len += queue.estimated_queue_length();
+    total_idle_workers += queue.estimated_idle_worker_count();
   }
   sw.stop();
   int32_t delta = total - before;

   queue.Shutdown();

-  for (int i = 0; i< FLAGS_num_producers; i++) {
-    producers[i]->join();
+  for (int i = 0; i < FLAGS_num_producers; i++) {
+    producers[i].join();
   }
-  for (int i = 0; i< FLAGS_num_consumers; i++) {
-    consumers[i]->join();
+  for (int i = 0; i < FLAGS_num_consumers; i++) {
+    consumers[i].join();
   }

   float reqs_per_second = static_cast(delta / sw.elapsed().wall_seconds());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/659cfc42/src/kudu/rpc/service_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_queue.cc b/src/kudu/rpc/service_queue.cc
index 87810a3..e620683 100644
--- a/src/kudu/rpc/service_queue.cc
+++ b/src/kudu/rpc/service_queue.cc
@@ -26,6 +26,7 @@ __thread LifoServiceQueue::ConsumerState* LifoServiceQueue::tl_consumer_ = nullp
 LifoServiceQueue::LifoServiceQueue(int max_size)
     : shutdown_(false),
       max_queue_size_(max_size) {
+  CHECK_GT(max_queue_size_, 0);
 }

 LifoServiceQueue::~LifoServiceQueue() {
@@ -33,11 +34,11 @@ LifoServiceQueue::~LifoServiceQueue() {
       << "ServiceQueue holds bare pointers at destruction time";
 }

-bool LifoServiceQueue::BlockingGet(std::unique_ptr *out) {
+bool LifoServiceQueue::BlockingGet(std::unique_ptr* out) {
   auto consumer = tl_consumer_;
   if (PREDICT_FALSE(!consumer)) {
-    lock_guard l(&lock_);
     consumer = tl_consumer_ = new ConsumerState(this);
+    lock_guard l(&lock_);
     consumers_.emplace_back(consumer);
   }
@@ -50,7 +51,7 @@ bool LifoServiceQueue::BlockingGet(std::unique_ptr *out) {
       queue_.erase(it);
       return true;
     }
-    if (shutdown_) {
+    if (PREDICT_FALSE(shutdown_)) {
       return false;
     }
     consumer->DCheckBoundInstance(this);
@@ -61,6 +62,8 @@ bool LifoServiceQueue::BlockingGet(std::unique_ptr *out) {
       out->reset(call);
       return true;
     }
+    // if call == nullptr, this means we are shutting down the queue.
+    // Loop back around and re-check 'shutdown_'.
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/659cfc42/src/kudu/rpc/service_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_queue.h b/src/kudu/rpc/service_queue.h
index a4b502d..57401b9 100644
--- a/src/kudu/rpc/service_queue.h
+++ b/src/kudu/rpc/service_queue.h
@@ -48,14 +48,14 @@ enum QueueStatus {
 // provide accurate deadlines for their calls.
 //
 // In order to improve concurrent throughput, this class uses a LIFO design:
-// Each consumer thread will has its own lock and condition variable. If a
+// Each consumer thread has its own lock and condition variable. If a
 // consumer arrives and there is no work available in the queue, it will not
 // wait on the queue lock, but rather push its own 'ConsumerState' object
 // to the 'waiting_consumers_' stack. When work arrives, if there are waiting
 // consumers, the top consumer is popped from the stack and woken up.
 //
 // This design has a few advantages over the basic BlockingQueue:
-// - the worker who was most recently busy is the one which we be selected for
+// - the worker who was most recently busy is the one which will be selected for
 //   new work. This gives an opportunity for the worker to be scheduled again
 //   without going to sleep, and also keeps CPU cache and allocator caches hot.
 // - in the common case that there are enough workers to fully service the incoming
@@ -73,7 +73,7 @@ class LifoServiceQueue {

   // Get an element from the queue. Returns false if we were shut down prior to
   // getting the element.
-  bool BlockingGet(std::unique_ptr *out);
+  bool BlockingGet(std::unique_ptr* out);

   // Add a new call to the queue.
   // Returns:
@@ -204,7 +204,7 @@ class LifoServiceQueue {
   std::multiset queue_;

   // The total set of consumers who have ever accessed this queue.
-  std::vector > consumers_;
+  std::vector> consumers_;

   DISALLOW_COPY_AND_ASSIGN(LifoServiceQueue);
 };
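The service_queue.h header comment above describes the LIFO dispatch idea: idle consumers park on their own lock/condvar and push a per-thread state object onto a stack, and producers wake the most recently idle consumer. A minimal sketch of that idea follows. All names here are invented for illustration, it uses plain C++11 primitives rather than Kudu's lock_guard/ConditionVariable, carries an int instead of an InboundCall, and omits shutdown, eviction, and the max-size check; it is not the Kudu implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <vector>

// Per-consumer parking spot, analogous in spirit to Kudu's ConsumerState:
// each consumer blocks on its *own* mutex/condvar rather than on the
// shared queue lock.
struct ToyConsumerState {
  std::mutex m;
  std::condition_variable cv;
  int item = 0;
  bool has_item = false;

  // Producer side: hand a value directly to this parked consumer.
  void Post(int v) {
    {
      std::lock_guard<std::mutex> l(m);
      item = v;
      has_item = true;
    }
    cv.notify_one();
  }

  // Consumer side: sleep until a producer posts a value.
  int Wait() {
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [&] { return has_item; });
    has_item = false;
    return item;
  }
};

class ToyLifoQueue {
 public:
  // If a consumer is parked, hand the value to the most recently parked
  // one (LIFO keeps its CPU and allocator caches hot); otherwise enqueue.
  void Put(int v) {
    ToyConsumerState* c = nullptr;
    {
      std::lock_guard<std::mutex> l(lock_);
      if (!waiting_consumers_.empty()) {
        c = waiting_consumers_.back();  // top of the stack: hottest consumer
        waiting_consumers_.pop_back();
      } else {
        queue_.push_back(v);
      }
    }
    if (c) c->Post(v);  // wake outside the shared lock
  }

  // Consumer passes its own (conceptually thread-local) state in. If the
  // queue is empty, it parks itself instead of waiting on 'lock_'.
  int BlockingGet(ToyConsumerState* me) {
    {
      std::lock_guard<std::mutex> l(lock_);
      if (!queue_.empty()) {
        int v = queue_.front();
        queue_.pop_front();
        return v;
      }
      waiting_consumers_.push_back(me);
    }
    return me->Wait();
  }

 private:
  std::mutex lock_;
  std::deque<int> queue_;
  std::vector<ToyConsumerState*> waiting_consumers_;
};
```

The point of the shape is that the shared `lock_` is held only for the stack/queue manipulation; the actual blocking happens on the consumer's private condvar, so a wakeup touches one consumer rather than contending all of them on one queue condvar.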