kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [2/4] kudu git commit: log: fix some incorrect assumptions on BlockingQueue shutdown
Date Thu, 11 May 2017 18:23:10 GMT
log: fix some incorrect assumptions on BlockingQueue shutdown

A comment in the Log::AppendThread body incorrectly claimed that
BlockingQueue::BlockingDrainTo could return false while still
returning elements at the same time.

This was in fact not the case. This patch adds unit tests to verify
the actual behavior, and simplifies the code accordingly.

Change-Id: I43e1304ce242f2def26a29504b76bb3e74a93ca3
Reviewed-on: http://gerrit.cloudera.org:8080/6855
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/43fbfdff
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/43fbfdff
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/43fbfdff

Branch: refs/heads/master
Commit: 43fbfdff9abc1516c8489e2f477e8b9fca22878f
Parents: e6141a0
Author: Todd Lipcon <todd@cloudera.com>
Authored: Wed May 10 21:11:34 2017 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu May 11 06:01:15 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log.cc            | 11 +++--------
 src/kudu/util/blocking_queue-test.cc | 25 +++++++++++++++++++++++++
 src/kudu/util/blocking_queue.h       |  9 +++++++--
 3 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/43fbfdff/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 4a6121d..50ce12b 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -193,18 +193,13 @@ Status Log::AppendThread::Init() {
 }
 
 void Log::AppendThread::RunThread() {
-  bool shutting_down = false;
-  while (PREDICT_TRUE(!shutting_down)) {
+  while (true) {
     vector<LogEntryBatch*> entry_batches;
     ElementDeleter d(&entry_batches);
 
-    // We shut down the entry_queue when it's time to shut down the append
-    // thread, which causes this call to return false, while still populating
-    // the entry_batches vector with the final set of log entry batches that
-    // were enqueued. We finish processing this last bunch of log entry batches
-    // before exiting the main RunThread() loop.
     if (PREDICT_FALSE(!log_->entry_queue()->BlockingDrainTo(&entry_batches))) {
-      shutting_down = true;
+      CHECK(entry_batches.empty());
+      break;
     }
 
     if (log_->metrics_) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/43fbfdff/src/kudu/util/blocking_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index cce21bf..917ada5 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -65,6 +65,31 @@ TEST(BlockingQueueTest, TestBlockingDrainTo) {
   ASSERT_EQ(3, out[2]);
 }
 
+// Test that, when the queue is shut down with elements still pending,
+// Drain still returns true until the elements are all gone.
+TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
+  // Put some elements into the queue and then shut it down.
+  BlockingQueue<int32_t> q(3);
+  ASSERT_EQ(q.Put(1), QUEUE_SUCCESS);
+  ASSERT_EQ(q.Put(2), QUEUE_SUCCESS);
+
+  q.Shutdown();
+
+  // Get() should still return an element.
+  int i;
+  ASSERT_TRUE(q.BlockingGet(&i));
+  ASSERT_EQ(1, i);
+
+  // Drain should still return true, since it yielded elements.
+  vector<int32_t> out;
+  ASSERT_TRUE(q.BlockingDrainTo(&out));
+  ASSERT_EQ(2, out[0]);
+
+  // Now that it's empty, it should return false.
+  ASSERT_FALSE(q.BlockingDrainTo(&out));
+  ASSERT_FALSE(q.BlockingGet(&i));
+}
+
 TEST(BlockingQueueTest, TestTooManyInsertions) {
   BlockingQueue<int32_t> test_queue(2);
   ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);

http://git-wip-us.apache.org/repos/asf/kudu/blob/43fbfdff/src/kudu/util/blocking_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h
index b44c129..86a4c48 100644
--- a/src/kudu/util/blocking_queue.h
+++ b/src/kudu/util/blocking_queue.h
@@ -99,8 +99,13 @@ class BlockingQueue {
     return true;
   }
 
-  // Get all elements from the queue and append them to a
-  // vector. Returns false if shutdown prior to getting the elements.
+  // Get all elements from the queue and append them to a vector.
+  //
+  // If the queue has been shut down, but there are still elements waiting,
+  // then it returns those elements as if the queue were not yet shut down.
+  //
+  // Returns false if the queue has been shut down and has no more remaining
+  // elements.
   bool BlockingDrainTo(std::vector<T>* out) {
     MutexLock l(lock_);
     while (true) {


Mime
View raw message