impala-commits mailing list archives

From k...@apache.org
Subject [1/2] incubator-impala git commit: IMPALA-4026: Implement double-buffering for BlockingQueue
Date Fri, 30 Sep 2016 18:32:53 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 2a31fbdbf -> 757c68b29


IMPALA-4026: Implement double-buffering for BlockingQueue

With recent changes to improve the parquet scanner's efficiency,
row batches are produced more quickly, leading to higher contention
on the blocking queue shared between the scanner threads and the
scan node. The contention happens between different producers (i.e.
the scanner threads) and, to a lesser extent, between the scanner
threads and the scan node.

This change addresses the contention between the scanner threads
and the scan node by splitting the queue into a 'get_list_' and
a 'put_list_'. Consumers consume from 'get_list_' until it is
exhausted, while producers enqueue into 'put_list_' until it is
full. When 'get_list_' is exhausted, the consumer atomically swaps
'get_list_' with 'put_list_'. This reduces the contention:
'get_list_' and 'put_list_' are protected by two different locks,
so callers of BlockingGet() only contend for 'put_lock_' when
'get_list_' is empty.
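
For illustration, here is a minimal sketch of the double-buffering
idea. It uses std:: primitives rather than the boost/pthread wrappers
the patch itself uses, and it omits shutdown, timeouts and the
wait-time counters, so it is a sketch of the locking scheme, not the
actual Impala code:

    #include <condition_variable>
    #include <cstddef>
    #include <deque>
    #include <mutex>

    // Simplified double-buffered queue: producers touch only the put side;
    // consumers touch the get side and grab 'put_lock_' only to swap.
    template <typename T>
    class DoubleBufferedQueue {
     public:
      explicit DoubleBufferedQueue(std::size_t max_elements)
        : max_elements_(max_elements) {}

      void Put(const T& val) {
        std::unique_lock<std::mutex> write_lock(put_lock_);
        // The real patch bounds get_list_.size() + put_list_.size(); this
        // sketch bounds the put side only, to stay brief.
        put_cv_.wait(write_lock,
            [&] { return put_list_.size() < max_elements_; });
        put_list_.push_back(val);
        write_lock.unlock();
        get_cv_.notify_one();
      }

      T Get() {
        std::unique_lock<std::mutex> read_lock(get_lock_);
        if (get_list_.empty()) {
          // Swap with both locks held; 'get_lock_' is always taken first.
          std::unique_lock<std::mutex> write_lock(put_lock_);
          get_cv_.wait(write_lock, [&] { return !put_list_.empty(); });
          get_list_.swap(put_list_);
        }
        T val = get_list_.front();
        get_list_.pop_front();
        read_lock.unlock();
        put_cv_.notify_one();
        return val;
      }

     private:
      const std::size_t max_elements_;
      std::mutex put_lock_;             // guards put_list_
      std::deque<T> put_list_;          // producers append here
      std::condition_variable put_cv_;  // producers wait here when full
      std::mutex get_lock_;             // guards get_list_
      std::deque<T> get_list_;          // consumers pop from here
      std::condition_variable get_cv_;  // consumers wait here when empty
    };

Note that a consumer sleeps with 'get_lock_' held, so other consumers
queue up on 'get_lock_' instead of piling onto 'put_lock_', which the
producers need to make progress.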

With this change, primitive_filter_bigint_non_selective improves
by 33.9%, going from 1.60s to 1.06s.

Change-Id: Ib9f4cf351455efefb0f3bb791cf9bc82d1421d54
Reviewed-on: http://gerrit.cloudera.org:8080/4350
Reviewed-by: Michael Ho <kwho@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/70504502
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/70504502
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/70504502

Branch: refs/heads/master
Commit: 705045021ed1f53cdfeabf0dcc01b76144e54855
Parents: 2a31fbd
Author: Michael Ho <kwho@cloudera.com>
Authored: Thu Sep 1 17:15:12 2016 -0700
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Fri Sep 30 06:16:07 2016 +0000

----------------------------------------------------------------------
 be/src/common/compiler-util.h    |   2 +
 be/src/exec/hdfs-scan-node.cc    |   6 +-
 be/src/exec/hdfs-scan-node.h     |   6 +
 be/src/util/blocking-queue.h     | 203 +++++++++++++++++++++++-----------
 be/src/util/condition-variable.h |  65 +++++++++++
 be/src/util/thread-pool.h        |   6 +-
 6 files changed, 221 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70504502/be/src/common/compiler-util.h
----------------------------------------------------------------------
diff --git a/be/src/common/compiler-util.h b/be/src/common/compiler-util.h
index fb4697b..d57767b 100644
--- a/be/src/common/compiler-util.h
+++ b/be/src/common/compiler-util.h
@@ -43,5 +43,7 @@
 /// decision, e.g. not inlining a small function on a hot path.
 #define ALWAYS_INLINE __attribute__((always_inline))
 
+#define CACHE_ALIGNED __attribute__ ((aligned(64)))
+
 #endif
 

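As a hypothetical illustration (not part of the patch), the new macro
keeps independently-updated state off a shared 64-byte cache line, so
writes by one thread do not keep invalidating a line that another
thread is reading (false sharing):

    #include <cstdint>

    // Hypothetical per-thread stats; objects of these types always start
    // on their own 64-byte cache line.
    struct CACHE_ALIGNED ProducerStats { int64_t items_put = 0; };
    struct CACHE_ALIGNED ConsumerStats { int64_t items_got = 0; };

    static_assert(alignof(ProducerStats) == 64, "aligned to a cache line");

The patch applies the attribute to BlockingQueue itself so the 'pad_'
member later in this diff can divide the class into a producer half
and a consumer half at a cache-line boundary.
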
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70504502/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 03f81ed..d1092d1 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -101,6 +101,8 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
     unique_lock<mutex> l(lock_);
     lock_guard<SpinLock> l2(file_type_counts_);
     StopAndFinalizeCounters();
+    row_batches_put_timer_->Set(materialized_row_batches_->total_put_wait_time());
+    row_batches_get_timer_->Set(materialized_row_batches_->total_get_wait_time());
   }
   return status;
 }
@@ -182,6 +184,8 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
     }
   }
   scanner_thread_bytes_required_ += scanner_thread_mem_usage;
+  row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime");
+  row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime");
   return Status::OK();
 }
 
@@ -330,7 +334,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
 
     // Cases 5 and 6.
     if (active_scanner_thread_counter_.value() > 0 &&
-        (materialized_row_batches_->GetSize() >= max_materialized_row_batches_ ||
+        (materialized_row_batches_->Size() >= max_materialized_row_batches_ ||
          !EnoughMemoryForScannerThread(true))) {
       break;
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70504502/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 2efb24e..dc85c08 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -164,6 +164,12 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// the number of cores.
   int max_num_scanner_threads_;
 
+  /// The wait time for fetching a row batch from the row batch queue.
+  RuntimeProfile::Counter* row_batches_get_timer_;
+
+  /// The wait time for enqueuing a row batch into the row batch queue.
+  RuntimeProfile::Counter* row_batches_put_timer_;
+
   /// Tries to spin up as many scanner threads as the quota allows. Called explicitly
   /// (e.g., when adding new ranges) or when threads are available for this scan node.
   void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70504502/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index bb9d27e..e12793e 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -21,142 +21,219 @@
 
 #include <boost/thread/condition_variable.hpp>
 #include <boost/thread/mutex.hpp>
+#include <boost/scoped_ptr.hpp>
 #include <deque>
 #include <unistd.h>
 
+#include "common/atomic.h"
+#include "common/compiler-util.h"
+#include "util/condition-variable.h"
 #include "util/stopwatch.h"
 #include "util/time.h"
 
 namespace impala {
 
-/// Fixed capacity FIFO queue, where both BlockingGet and BlockingPut operations block
+/// Fixed capacity FIFO queue, where both BlockingGet() and BlockingPut() operations block
 /// if the queue is empty or full, respectively.
-
-/// TODO: Add some double-buffering so that readers do not block writers and vice versa.
-/// Or, implement a mostly lock-free blocking queue.
+///
+/// FIFO is made up of a 'get_list_' that BlockingGet() consumes from and a 'put_list_'
+/// that BlockingPut() enqueues into. They are protected by 'get_lock_' and 'put_lock_'
+/// respectively. If both locks need to be held at the same time, 'get_lock_' must be
+/// held before 'put_lock_'. When the 'get_list_' is empty, the caller of BlockingGet()
+/// will atomically swap the 'put_list_' with 'get_list_'. The swapping happens with both
+/// the 'get_lock_' and 'put_lock_' held.
 template <typename T>
 class BlockingQueue {
-
  public:
   BlockingQueue(size_t max_elements)
     : shutdown_(false),
       max_elements_(max_elements),
-      total_get_wait_time_(0),
-      total_put_wait_time_(0) {
+      total_put_wait_time_(0),
+      get_list_size_(0),
+      total_get_wait_time_(0) {
+    DCHECK_GT(max_elements_, 0);
+    // Make sure class members commonly used in BlockingPut() don't alias with class
+    // members used in BlockingGet(). 'pad_' is the point of division.
+    DCHECK_NE(offsetof(BlockingQueue, pad_) / 64,
+        offsetof(BlockingQueue, get_lock_) / 64);
   }
 
-  /// Get an element from the queue, waiting indefinitely for one to become available.
+  /// Gets an element from the queue, waiting indefinitely for one to become available.
   /// Returns false if we were shut down prior to getting the element, and there
   /// are no more elements available.
   bool BlockingGet(T* out) {
-    MonotonicStopWatch timer;
-    boost::unique_lock<boost::mutex> unique_lock(lock_);
-
-    while (true) {
-      if (!list_.empty()) {
-        *out = list_.front();
-        list_.pop_front();
-        total_get_wait_time_ += timer.ElapsedTime();
-        unique_lock.unlock();
-        put_cv_.notify_one();
-        return true;
+    boost::unique_lock<boost::mutex> read_lock(get_lock_);
+
+    if (UNLIKELY(get_list_.empty())) {
+      MonotonicStopWatch timer;
+      // Block off writers while swapping 'get_list_' with 'put_list_'.
+      boost::unique_lock<boost::mutex> write_lock(put_lock_);
+      while (put_list_.empty()) {
+        DCHECK(get_list_.empty());
+        if (UNLIKELY(shutdown_)) return false;
+        // Note that it's intentional to signal the writer while holding 'put_lock_' to
+        // avoid the race in which the writer may be signalled between when it checks
+        // the queue size and when it calls Wait() in BlockingGet(). NotifyAll() is not
+        // used here to avoid thundering herd which leads to contention (e.g. InitTuple()
+        // in scanner).
+        put_cv_.NotifyOne();
+        // Sleep with 'get_lock_' held to block off other readers which cannot
+        // make progress anyway.
+        timer.Start();
+        get_cv_.Wait(write_lock);
+        timer.Stop();
       }
-      if (shutdown_) return false;
-
-      timer.Start();
-      get_cv_.wait(unique_lock);
-      timer.Stop();
+      DCHECK(!put_list_.empty());
+      put_list_.swap(get_list_);
+      get_list_size_.Store(get_list_.size());
+      write_lock.unlock();
+      total_get_wait_time_ += timer.ElapsedTime();
     }
+
+    DCHECK(!get_list_.empty());
+    *out = get_list_.front();
+    get_list_.pop_front();
+    get_list_size_.Store(get_list_.size());
+    read_lock.unlock();
+    // Note that there is a race with any writer if NotifyOne() is called between when
+    // a writer checks the queue size and when it calls put_cv_.Wait(). If this race
+    // occurs, a writer can stay blocked even if the queue is not full until the next
+    // BlockingGet(). The race is benign correctness wise as BlockingGet() will always
+    // notify a writer with 'put_lock_' held when both lists are empty.
+    put_cv_.NotifyOne();
+    return true;
   }
 
   /// Puts an element into the queue, waiting indefinitely until there is space.
   /// If the queue is shut down, returns false.
   bool BlockingPut(const T& val) {
     MonotonicStopWatch timer;
-    boost::unique_lock<boost::mutex> unique_lock(lock_);
+    boost::unique_lock<boost::mutex> write_lock(put_lock_);
 
-    while (list_.size() >= max_elements_ && !shutdown_) {
+    while (SizeLocked(write_lock) >= max_elements_ && !shutdown_) {
       timer.Start();
-      put_cv_.wait(unique_lock);
+      put_cv_.Wait(write_lock);
       timer.Stop();
     }
     total_put_wait_time_ += timer.ElapsedTime();
-    if (shutdown_) return false;
+    if (UNLIKELY(shutdown_)) return false;
 
-    DCHECK_LT(list_.size(), max_elements_);
-    list_.push_back(val);
-    unique_lock.unlock();
-    get_cv_.notify_one();
+    DCHECK_LT(put_list_.size(), max_elements_);
+    put_list_.push_back(val);
+    write_lock.unlock();
+    get_cv_.NotifyOne();
     return true;
   }
 
   /// Puts an element into the queue, waiting until 'timeout_micros' elapses, if there is
-  /// no space. If the queue is shut down, or if the timeout elapsed without being able to put the
-  /// element, returns false.
+  /// no space. If the queue is shut down, or if the timeout elapsed without being able to
+  /// put the element, returns false.
   bool BlockingPutWithTimeout(const T& val, int64_t timeout_micros) {
     MonotonicStopWatch timer;
-    boost::unique_lock<boost::mutex> unique_lock(lock_);
+    boost::unique_lock<boost::mutex> write_lock(put_lock_);
     boost::system_time wtime = boost::get_system_time() +
         boost::posix_time::microseconds(timeout_micros);
+    const struct timespec timeout = boost::detail::to_timespec(wtime);
     bool notified = true;
-    while (list_.size() >= max_elements_ && !shutdown_ && notified) {
+    while (SizeLocked(write_lock) >= max_elements_ && !shutdown_ && notified) {
       timer.Start();
       // Wait until we're notified or until the timeout expires.
-      notified = put_cv_.timed_wait(unique_lock, wtime);
+      notified = put_cv_.TimedWait(write_lock, &timeout);
       timer.Stop();
     }
     total_put_wait_time_ += timer.ElapsedTime();
-    // If the list is still full or if the the queue has been shutdown, return false.
+    // If the list is still full or if the queue has been shut down, return false.
     // NOTE: We don't check 'notified' here as it appears that pthread condition variables
     // have a weird behavior in which they can return ETIMEDOUT from timed_wait even if
     // another thread did in fact signal
-    if (list_.size() >= max_elements_ || shutdown_) return false;
-    DCHECK_LT(list_.size(), max_elements_);
-    list_.push_back(val);
-    unique_lock.unlock();
-    get_cv_.notify_one();
+    if (SizeLocked(write_lock) >= max_elements_ || shutdown_) return false;
+    DCHECK_LT(put_list_.size(), max_elements_);
+    put_list_.push_back(val);
+    write_lock.unlock();
+    get_cv_.NotifyOne();
     return true;
   }
 
   /// Shut down the queue. Wakes up all threads waiting on BlockingGet or BlockingPut.
   void Shutdown() {
     {
-      boost::lock_guard<boost::mutex> guard(lock_);
+      // No need to hold 'get_lock_' here. BlockingGet() may sleep with 'get_lock_' so
+      // it may delay the caller here if the lock is acquired.
+      boost::lock_guard<boost::mutex> write_lock(put_lock_);
       shutdown_ = true;
     }
 
-    get_cv_.notify_all();
-    put_cv_.notify_all();
+    get_cv_.NotifyAll();
+    put_cv_.NotifyAll();
   }
 
-  uint32_t GetSize() const {
-    boost::unique_lock<boost::mutex> l(lock_);
-    return list_.size();
+  uint32_t Size() const {
+    boost::unique_lock<boost::mutex> write_lock(put_lock_);
+    return SizeLocked(write_lock);
   }
 
-  /// Returns the total amount of time threads have blocked in BlockingGet.
-  uint64_t total_get_wait_time() const {
-    boost::lock_guard<boost::mutex> guard(lock_);
+  int64_t total_get_wait_time() const {
+    // Hold lock to make sure the value read is consistent (i.e. no torn read).
+    boost::lock_guard<boost::mutex> read_lock(get_lock_);
     return total_get_wait_time_;
   }
 
-  /// Returns the total amount of time threads have blocked in BlockingPut.
-  uint64_t total_put_wait_time() const {
-    boost::lock_guard<boost::mutex> guard(lock_);
+  int64_t total_put_wait_time() const {
+    // Hold lock to make sure the value read is consistent (i.e. no torn read).
+    boost::lock_guard<boost::mutex> write_lock(put_lock_);
     return total_put_wait_time_;
   }
 
  private:
+
+  uint32_t ALWAYS_INLINE SizeLocked(const boost::unique_lock<boost::mutex>& lock) const {
+    // The size of 'get_list_' is read racily to avoid getting 'get_lock_' in write path.
+    DCHECK(lock.mutex() == &put_lock_ && lock.owns_lock());
+    return get_list_size_.Load() + put_list_.size();
+  }
+
+  /// True if the BlockingQueue is being shut down. Guarded by 'put_lock_'.
   bool shutdown_;
+
+  /// Maximum total number of elements in 'get_list_' + 'put_list_'.
   const int max_elements_;
-  boost::condition_variable get_cv_;   // 'get' callers wait on this
-  boost::condition_variable put_cv_;   // 'put' callers wait on this
-  /// lock_ guards access to list_, total_get_wait_time, and total_put_wait_time
-  mutable boost::mutex lock_;
-  std::deque<T> list_;
-  uint64_t total_get_wait_time_;
-  uint64_t total_put_wait_time_;
-};
+
+  /// Guards against concurrent access to 'put_list_'.
+  /// Please see comments at the beginning of the file for lock ordering.
+  mutable boost::mutex put_lock_;
+
+  /// The queue for items enqueued by BlockingPut(). Guarded by 'put_lock_'.
+  std::deque<T> put_list_;
+
+  /// BlockingPut()/BlockingPutWithTimeout() wait on this.
+  ConditionVariable put_cv_;
+
+  /// Total amount of time threads blocked in BlockingPut(). Guarded by 'put_lock_'.
+  int64_t total_put_wait_time_;
+
+  /// Padding to prevent data structures used in BlockingGet() from sharing
+  /// cache lines with data structures used in BlockingPut().
+  int64_t pad_;
+
+  /// Guards against concurrent access to 'get_list_'.
+  mutable boost::mutex get_lock_;
+
+  /// The queue of items to be consumed by BlockingGet(). Guarded by 'get_lock_'.
+  std::deque<T> get_list_;
+
+  /// The size of 'get_list_'. Read without lock held so explicitly use an AtomicInt32
+  /// to make sure readers will read a consistent value on all CPU architectures.
+  AtomicInt32 get_list_size_;
+
+  /// BlockingGet() waits on this.
+  ConditionVariable get_cv_;
+
+  /// Total amount of time a thread blocked in BlockingGet(). Guarded by 'get_lock_'.
+  /// Note that a caller of BlockingGet() may sleep with 'get_lock_' held and this
+  /// variable doesn't include the time which other threads block waiting for 'get_lock_'.
+  int64_t total_get_wait_time_;
+
+} CACHE_ALIGNED;
 
 }
 

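A stand-alone toy (an assumed simplified layout, not the real class)
of the invariant the constructor DCHECK above enforces: everything
before 'pad_' and everything after it must land on different 64-byte
cache lines. The real class relies on the natural size of the
put-side members plus the runtime DCHECK; this toy aligns the get
side explicitly just to make the invariant checkable at compile time:

    #include <cstddef>

    struct Layout {
      long put_side;                               // stands in for the put-side members
      long pad_;                                   // the division point, as in the patch
      long get_side __attribute__((aligned(64)));  // stands in for get_lock_ and friends
    };

    // Same comparison as DCHECK_NE(offsetof(..., pad_) / 64,
    // offsetof(..., get_lock_) / 64) in the constructor.
    static_assert(offsetof(Layout, pad_) / 64 != offsetof(Layout, get_side) / 64,
        "put side and get side must not share a cache line");
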
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70504502/be/src/util/condition-variable.h
----------------------------------------------------------------------
diff --git a/be/src/util/condition-variable.h b/be/src/util/condition-variable.h
new file mode 100644
index 0000000..b0b1090
--- /dev/null
+++ b/be/src/util/condition-variable.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_CONDITION_VARIABLE_H
+#define IMPALA_UTIL_CONDITION_VARIABLE_H
+
+#include <boost/thread/pthread/timespec.hpp>
+#include <boost/thread/mutex.hpp>
+#include <pthread.h>
+#include <unistd.h>
+
+namespace impala {
+
+/// Simple wrapper around POSIX pthread condition variable. This has lower overhead than
+/// boost's implementation as it doesn't implement boost thread interruption.
+class ConditionVariable {
+ public:
+  ConditionVariable() { pthread_cond_init(&cv_, NULL); }
+
+  ~ConditionVariable() { pthread_cond_destroy(&cv_); }
+
+  /// Wait indefinitely on the condition variable until it's notified.
+  inline void Wait(boost::unique_lock<boost::mutex>& lock) {
+    DCHECK(lock.owns_lock());
+    pthread_mutex_t* mutex = lock.mutex()->native_handle();
+    pthread_cond_wait(&cv_, mutex);
+  }
+
+  /// Wait until the condition variable is notified or 'timeout' has passed.
+  /// Returns true if the condition variable is notified before the absolute timeout
+  /// specified in 'timeout' has passed. Returns false otherwise.
+  inline bool TimedWait(boost::unique_lock<boost::mutex>& lock,
+      const struct timespec* timeout) {
+    DCHECK(lock.owns_lock());
+    pthread_mutex_t* mutex = lock.mutex()->native_handle();
+    return pthread_cond_timedwait(&cv_, mutex, timeout) == 0;
+  }
+
+  /// Notify a single waiter on this condition variable.
+  inline void NotifyOne() { pthread_cond_signal(&cv_); }
+
+  /// Notify all waiters on this condition variable.
+  inline void NotifyAll() { pthread_cond_broadcast(&cv_); }
+
+ private:
+  pthread_cond_t cv_;
+
+};
+
+}
+#endif
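
A hypothetical usage sketch of this wrapper (the names 'lock_',
'queue_' and WaitForItem() are illustrative, not part of the patch),
showing the usual predicate loop, which also guards against spurious
wakeups:

    #include <deque>

    #include <boost/thread/mutex.hpp>

    #include "util/condition-variable.h"

    boost::mutex lock_;             // guards queue_
    impala::ConditionVariable cv_;  // signalled when queue_ gains an item
    std::deque<int> queue_;

    int WaitForItem() {
      boost::unique_lock<boost::mutex> l(lock_);
      // Re-check the predicate after every wakeup: a wait can return spuriously.
      while (queue_.empty()) cv_.Wait(l);
      int item = queue_.front();
      queue_.pop_front();
      return item;
    }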

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70504502/be/src/util/thread-pool.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h
index ccc49c9..d7be6dd 100644
--- a/be/src/util/thread-pool.h
+++ b/be/src/util/thread-pool.h
@@ -97,7 +97,7 @@ class ThreadPool {
   }
 
   uint32_t GetQueueSize() const {
-    return work_queue_.GetSize();
+    return work_queue_.Size();
   }
 
   /// Blocks until the work queue is empty, and then calls Shutdown to stop the worker
@@ -106,7 +106,7 @@ class ThreadPool {
   void DrainAndShutdown() {
     {
       boost::unique_lock<boost::mutex> l(lock_);
-      while (work_queue_.GetSize() != 0) {
+      while (work_queue_.Size() != 0) {
         empty_cv_.wait(l);
       }
     }
@@ -123,7 +123,7 @@ class ThreadPool {
       if (work_queue_.BlockingGet(&workitem)) {
         work_function_(thread_id, workitem);
       }
-      if (work_queue_.GetSize() == 0) {
+      if (work_queue_.Size() == 0) {
         /// Take lock to ensure that DrainAndShutdown() cannot be between checking
         /// GetSize() and wait()'ing when the condition variable is notified.
         /// (It will hang if we notify right before calling wait().)

