Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BC9D8200C6C for ; Thu, 20 Apr 2017 17:35:39 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BB430160B91; Thu, 20 Apr 2017 15:35:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D7F29160BB1 for ; Thu, 20 Apr 2017 17:35:38 +0200 (CEST) Received: (qmail 72087 invoked by uid 500); 20 Apr 2017 15:35:38 -0000 Mailing-List: contact commits-help@impala.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@impala.incubator.apache.org Delivered-To: mailing list commits@impala.incubator.apache.org Received: (qmail 71988 invoked by uid 99); 20 Apr 2017 15:35:37 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Apr 2017 15:35:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 935C1C18C3 for ; Thu, 20 Apr 2017 15:35:37 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id k5W_SnsPSKdh for ; Thu, 20 Apr 2017 15:35:34 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 7B8DA5F297 for ; Thu, 20 Apr 2017 15:35:33 +0000 (UTC) Received: (qmail 71884 invoked by uid 99); 20 Apr 2017 15:35:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Apr 2017 15:35:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4433DF4A25; Thu, 20 Apr 2017 15:35:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tarmstrong@apache.org To: commits@impala.incubator.apache.org Date: Thu, 20 Apr 2017 15:35:33 -0000 Message-Id: <59a34a6ff1074750b1e0a95774b1a86a@git.apache.org> In-Reply-To: <3838c827d4884b4faa4d8bae30c00b9d@git.apache.org> References: <3838c827d4884b4faa4d8bae30c00b9d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/5] incubator-impala git commit: Allow BlockingQueue and ThreadPool to accept rvalue args archived-at: Thu, 20 Apr 2017 15:35:39 -0000 Allow BlockingQueue and ThreadPool to accept rvalue args Previously the BlockingQueue and ThreadPool APIs only accepted const lvalue references, so the argument was always copied into the queue. Very often we create a thin wrapper for each work item we submit to a thread pool, and will not want to use that object again, so moving it into the pool rather than copying makes the most sense. Note the introduction of an extra template parameter into Offer() and BlockingPut*(). To enable perfect-forwarding (i.e. allow the methods to accept rvalue or lvalues and pass them through), we need to use a) rvalue references (V&&) and b) do so in a 'type-deducing context' [1]. Having the enclosing class be template-parameterized does not count as type-deducing, so we add the dummy V parameter and the compiler will ensure that V is properly compatible with the original T type. [1] http://eli.thegreenplace.net/2014/perfect-forwarding-and-universal-references-in-c/ Change-Id: I1791870576cb269e86495034f92555de48f92f10 Reviewed-on: http://gerrit.cloudera.org:8080/6442 Reviewed-by: Henry Robinson Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c7fa4dce Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c7fa4dce Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c7fa4dce Branch: refs/heads/master Commit: c7fa4dceb667e7279dd04a42fcd2fba94ca5214f Parents: 830889a Author: Henry Robinson Authored: Mon Mar 20 14:25:19 2017 -0700 Committer: Impala Public Jenkins Committed: Thu Apr 20 02:56:43 2017 +0000 ---------------------------------------------------------------------- be/src/rpc/TAcceptQueueServer.cpp | 2 +- be/src/util/blocking-queue.h | 25 ++++++++++++++++++------- be/src/util/thread-pool.h | 5 +++-- 3 files changed, 22 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fa4dce/be/src/rpc/TAcceptQueueServer.cpp ---------------------------------------------------------------------- diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp index 58d90f1..65fdc46 100644 --- a/be/src/rpc/TAcceptQueueServer.cpp +++ b/be/src/rpc/TAcceptQueueServer.cpp @@ -224,7 +224,7 @@ void TAcceptQueueServer::serve() { shared_ptr client = serverTransport_->accept(); // New - the work done to setup the connection has been moved to SetupConnection. - if (!connection_setup_pool.Offer(client)) { + if (!connection_setup_pool.Offer(std::move(client))) { string errStr = string("TAcceptQueueServer: thread pool unexpectedly shut down."); GlobalOutput(errStr.c_str()); stop_ = true; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fa4dce/be/src/util/blocking-queue.h ---------------------------------------------------------------------- diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h index a762d5d..5d88397 100644 --- a/be/src/util/blocking-queue.h +++ b/be/src/util/blocking-queue.h @@ -105,9 +105,12 @@ class BlockingQueue : public CacheLineAligned { return true; } - /// Puts an element into the queue, waiting indefinitely until there is space. - /// If the queue is shut down, returns false. - bool BlockingPut(const T& val) { + /// Puts an element into the queue, waiting indefinitely until there is space. Rvalues + /// are moved into the queue, lvalues are copied. If the queue is shut down, returns + /// false. V is a type that is compatible with T; that is, objects of type V can be + /// inserted into the queue. + template + bool BlockingPut(V&& val) { MonotonicStopWatch timer; boost::unique_lock write_lock(put_lock_); @@ -120,7 +123,7 @@ class BlockingQueue : public CacheLineAligned { if (UNLIKELY(shutdown_)) return false; DCHECK_LT(put_list_.size(), max_elements_); - put_list_.push_back(val); + Put(std::forward(val)); write_lock.unlock(); get_cv_.NotifyOne(); return true; @@ -128,8 +131,11 @@ class BlockingQueue : public CacheLineAligned { /// Puts an element into the queue, waiting until 'timeout_micros' elapses, if there is /// no space. If the queue is shut down, or if the timeout elapsed without being able to - /// put the element, returns false. - bool BlockingPutWithTimeout(const T& val, int64_t timeout_micros) { + /// put the element, returns false. Rvalues are moved into the queue, lvalues are + /// copied. V is a type that is compatible with T; that is, objects of type V can be + /// inserted into the queue. + template + bool BlockingPutWithTimeout(V&& val, int64_t timeout_micros) { MonotonicStopWatch timer; boost::unique_lock write_lock(put_lock_); boost::system_time wtime = boost::get_system_time() + @@ -149,7 +155,7 @@ class BlockingQueue : public CacheLineAligned { // another thread did in fact signal if (SizeLocked(write_lock) >= max_elements_ || shutdown_) return false; DCHECK_LT(put_list_.size(), max_elements_); - put_list_.push_back(val); + Put(std::forward(val)); write_lock.unlock(); get_cv_.NotifyOne(); return true; @@ -193,6 +199,11 @@ class BlockingQueue : public CacheLineAligned { return get_list_size_.Load() + put_list_.size(); } + /// Overloads for inserting an item into the list, depending on whether it should be + /// moved or copied. + void Put(const T& val) { put_list_.push_back(val); } + void Put(T&& val) { put_list_.emplace_back(std::move(val)); } + /// True if the BlockingQueue is being shut down. Guarded by 'put_lock_'. bool shutdown_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fa4dce/be/src/util/thread-pool.h ---------------------------------------------------------------------- diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h index c5da7bd..cbc0031 100644 --- a/be/src/util/thread-pool.h +++ b/be/src/util/thread-pool.h @@ -75,8 +75,9 @@ class ThreadPool : public CacheLineAligned { // /// Returns true if the work item was successfully added to the queue, false otherwise /// (which typically means that the thread pool has already been shut down). - bool Offer(const T& work) { - return work_queue_.BlockingPut(work); + template + bool Offer(V&& work) { + return work_queue_.BlockingPut(std::forward(work)); } /// Shuts the thread pool down, causing the work queue to cease accepting offered work