impala-commits mailing list archives

From tarmstr...@apache.org
Subject [2/5] incubator-impala git commit: Allow BlockingQueue and ThreadPool to accept rvalue args
Date Thu, 20 Apr 2017 15:35:33 GMT
Allow BlockingQueue and ThreadPool to accept rvalue args

Previously the BlockingQueue and ThreadPool APIs accepted only const
lvalue references, so the argument was always copied into the
queue. We often create a thin wrapper for each work item submitted
to a thread pool and never use that object again, so moving it into
the pool rather than copying it makes the most sense.
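
As a rough illustration of the difference (a minimal sketch, not Impala
code: WorkItem and both helper functions are hypothetical names, and a
plain std::deque stands in for the queue), copying a shared_ptr-based work
item leaves two owners, while moving it leaves one:

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <utility>

// Hypothetical stand-in for a thin work-item wrapper; not an Impala type.
using WorkItem = std::shared_ptr<int>;

// Copies 'item' into the queue and returns the resulting reference count.
long UseCountAfterCopy() {
  std::deque<WorkItem> queue;
  WorkItem item = std::make_shared<int>(42);
  queue.push_back(item);  // lvalue: copied, so 'item' and the queue both own the int
  return queue.back().use_count();
}

// Moves 'item' into the queue and returns the resulting reference count.
long UseCountAfterMove() {
  std::deque<WorkItem> queue;
  WorkItem item = std::make_shared<int>(42);
  queue.push_back(std::move(item));  // rvalue: ownership is transferred
  assert(item == nullptr);           // a moved-from shared_ptr is empty
  return queue.back().use_count();
}
```

With a shared_ptr the copy costs an atomic reference-count increment and a
later decrement; for heavier wrappers the saving from moving is larger.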

Note the extra template parameter introduced into Offer() and
BlockingPut*(). To enable perfect forwarding (i.e. allow the methods to
accept rvalues or lvalues and pass them through unchanged), the
parameter must a) be an rvalue reference (V&&) and b) appear in a
'type-deducing context' [1]. The enclosing class being
template-parameterized does not count as type-deducing, because T is
already fixed by the time the method is called, so we add the dummy V
parameter and the compiler will reject any V that is not compatible
with the original T type.

[1] http://eli.thegreenplace.net/2014/perfect-forwarding-and-universal-references-in-c/
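
The pattern described above can be sketched in isolation as follows. This
is a simplified, single-threaded illustration under stated assumptions:
MiniQueue, Tracked, PutImpl and the two helper functions are hypothetical
names, not part of Impala, and all locking is omitted.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>

// Hypothetical element type that records how it was constructed.
struct Tracked {
  std::string how;
  Tracked() : how("default") {}
  Tracked(const Tracked&) : how("copied") {}
  Tracked(Tracked&&) noexcept : how("moved") {}
};

// Simplified, single-threaded stand-in for a queue templated on T.
template <typename T>
class MiniQueue {
 public:
  // T is fixed once MiniQueue<T> is instantiated, so a 'T&&' parameter here
  // would be a plain rvalue reference. V is deduced at each call site, which
  // makes 'V&&' a forwarding reference that binds to lvalues and rvalues.
  template <typename V>
  void Put(V&& val) { PutImpl(std::forward<V>(val)); }

  const T& Back() const { return items_.back(); }

 private:
  // Overload resolution picks the copying or the moving path.
  void PutImpl(const T& val) { items_.push_back(val); }
  void PutImpl(T&& val) { items_.emplace_back(std::move(val)); }

  std::deque<T> items_;
};

// Passes an lvalue and reports how the stored element was constructed.
std::string HowLvalueArrives() {
  MiniQueue<Tracked> q;
  Tracked t;
  q.Put(t);
  return q.Back().how;
}

// Passes an rvalue and reports how the stored element was constructed.
std::string HowRvalueArrives() {
  MiniQueue<Tracked> q;
  Tracked t;
  q.Put(std::move(t));
  return q.Back().how;
}
```

In this sketch an lvalue argument ends up copy-constructed in the queue and
an rvalue argument ends up move-constructed. Passing a type that cannot be
converted to T fails to compile because neither PutImpl overload accepts it.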

Change-Id: I1791870576cb269e86495034f92555de48f92f10
Reviewed-on: http://gerrit.cloudera.org:8080/6442
Reviewed-by: Henry Robinson <henry@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c7fa4dce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c7fa4dce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c7fa4dce

Branch: refs/heads/master
Commit: c7fa4dceb667e7279dd04a42fcd2fba94ca5214f
Parents: 830889a
Author: Henry Robinson <henry@cloudera.com>
Authored: Mon Mar 20 14:25:19 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Thu Apr 20 02:56:43 2017 +0000

----------------------------------------------------------------------
 be/src/rpc/TAcceptQueueServer.cpp |  2 +-
 be/src/util/blocking-queue.h      | 25 ++++++++++++++++++-------
 be/src/util/thread-pool.h         |  5 +++--
 3 files changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fa4dce/be/src/rpc/TAcceptQueueServer.cpp
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 58d90f1..65fdc46 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -224,7 +224,7 @@ void TAcceptQueueServer::serve() {
       shared_ptr<TTransport> client = serverTransport_->accept();
 
       // New - the work done to setup the connection has been moved to SetupConnection.
-      if (!connection_setup_pool.Offer(client)) {
+      if (!connection_setup_pool.Offer(std::move(client))) {
         string errStr = string("TAcceptQueueServer: thread pool unexpectedly shut down.");
         GlobalOutput(errStr.c_str());
         stop_ = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fa4dce/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index a762d5d..5d88397 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -105,9 +105,12 @@ class BlockingQueue : public CacheLineAligned {
     return true;
   }
 
-  /// Puts an element into the queue, waiting indefinitely until there is space.
-  /// If the queue is shut down, returns false.
-  bool BlockingPut(const T& val) {
+  /// Puts an element into the queue, waiting indefinitely until there is space. Rvalues
+  /// are moved into the queue, lvalues are copied. If the queue is shut down, returns
+  /// false. V is a type that is compatible with T; that is, objects of type V can be
+  /// inserted into the queue.
+  template <typename V>
+  bool BlockingPut(V&& val) {
     MonotonicStopWatch timer;
     boost::unique_lock<boost::mutex> write_lock(put_lock_);
 
@@ -120,7 +123,7 @@ class BlockingQueue : public CacheLineAligned {
     if (UNLIKELY(shutdown_)) return false;
 
     DCHECK_LT(put_list_.size(), max_elements_);
-    put_list_.push_back(val);
+    Put(std::forward<V>(val));
     write_lock.unlock();
     get_cv_.NotifyOne();
     return true;
@@ -128,8 +131,11 @@ class BlockingQueue : public CacheLineAligned {
 
   /// Puts an element into the queue, waiting until 'timeout_micros' elapses, if there is
   /// no space. If the queue is shut down, or if the timeout elapsed without being able to
-  /// put the element, returns false.
-  bool BlockingPutWithTimeout(const T& val, int64_t timeout_micros) {
+  /// put the element, returns false. Rvalues are moved into the queue, lvalues are
+  /// copied. V is a type that is compatible with T; that is, objects of type V can be
+  /// inserted into the queue.
+  template <typename V>
+  bool BlockingPutWithTimeout(V&& val, int64_t timeout_micros) {
     MonotonicStopWatch timer;
     boost::unique_lock<boost::mutex> write_lock(put_lock_);
     boost::system_time wtime = boost::get_system_time() +
@@ -149,7 +155,7 @@ class BlockingQueue : public CacheLineAligned {
     // another thread did in fact signal
     if (SizeLocked(write_lock) >= max_elements_ || shutdown_) return false;
     DCHECK_LT(put_list_.size(), max_elements_);
-    put_list_.push_back(val);
+    Put(std::forward<V>(val));
     write_lock.unlock();
     get_cv_.NotifyOne();
     return true;
@@ -193,6 +199,11 @@ class BlockingQueue : public CacheLineAligned {
     return get_list_size_.Load() + put_list_.size();
   }
 
+  /// Overloads for inserting an item into the list, depending on whether it should be
+  /// moved or copied.
+  void Put(const T& val) { put_list_.push_back(val); }
+  void Put(T&& val) { put_list_.emplace_back(std::move(val)); }
+
   /// True if the BlockingQueue is being shut down. Guarded by 'put_lock_'.
   bool shutdown_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fa4dce/be/src/util/thread-pool.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h
index c5da7bd..cbc0031 100644
--- a/be/src/util/thread-pool.h
+++ b/be/src/util/thread-pool.h
@@ -75,8 +75,9 @@ class ThreadPool : public CacheLineAligned {
   //
   /// Returns true if the work item was successfully added to the queue, false otherwise
   /// (which typically means that the thread pool has already been shut down).
-  bool Offer(const T& work) {
-    return work_queue_.BlockingPut(work);
+  template <typename V>
+  bool Offer(V&& work) {
+    return work_queue_.BlockingPut(std::forward<V>(work));
   }
 
   /// Shuts the thread pool down, causing the work queue to cease accepting offered work

