kudu-commits mailing list archives

From t...@apache.org
Subject [1/2] kudu git commit: log: improve mt-log-test benchmark
Date Thu, 09 Mar 2017 22:45:42 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 937064f91 -> fc50b98aa


log: improve mt-log-test benchmark

- Build the entries to be appended without holding any lock, which is
  more realistic. The entries get their IDs assigned once inside the
  lock, similar to what the real code does (see the sketch after this
  list).
- Use the same higher-level APIs that the real append paths use.
- Make the log segment size overridable from the command line.
- Allow the "verification" step to be disabled, so the test serves as a
  better benchmark.
- Allow the concurrent "reader" thread to be disabled.
- A couple of bug fixes here and there.
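
The first change boils down to the following pattern. This is an
illustrative sketch only (Entry, BuildBatch, AssignIndexesAndAppend and
index_lock are made-up names); the real implementation is
CreateRandomBatch()/AssignIndexes() in the diff below, which uses
ReplicateRefPtr, simple_spinlock and Log::AsyncAppendReplicates():

  #include <cstdint>
  #include <mutex>
  #include <string>
  #include <vector>

  struct Entry {
    int64_t index = -1;   // assigned later, under the lock
    std::string payload;  // built up front, no lock held
  };

  std::mutex index_lock;
  int64_t next_index = 1;

  // Expensive work (allocation, message construction) runs with no lock held.
  std::vector<Entry> BuildBatch(int num_ops) {
    std::vector<Entry> batch(num_ops);
    for (auto& e : batch) {
      e.payload = "this is a test insert";
    }
    return batch;
  }

  // Only the cheap index assignment and the hand-off to the log hold the
  // lock, so index order matches append order without serializing the
  // expensive batch construction.
  void AssignIndexesAndAppend(std::vector<Entry>* batch) {
    std::lock_guard<std::mutex> l(index_lock);
    for (auto& e : *batch) {
      e.index = next_index++;
    }
    // ... hand *batch to the log's async append path here ...
  }

With the new flags, the test can also be run as a pure write benchmark,
for example (the binary path depends on the build tree; the flag names
are the ones defined in the diff below):

  ./bin/mt-log-test --gtest_filter='*TestAppends*' \
      --num_writer_threads=8 --num_batches_per_thread=10000 \
      --num_reader_threads=0 --verify_log=false --log_segment_size_mb=8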

Change-Id: I870dc26e2937e7c92e3f0530e2c2880178507f12
Reviewed-on: http://gerrit.cloudera.org:8080/6283
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dralves@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/57a26b19
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/57a26b19
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/57a26b19

Branch: refs/heads/master
Commit: 57a26b199571d0bb39ede709e8ee569370340417
Parents: 937064f
Author: Todd Lipcon <todd@apache.org>
Authored: Mon Mar 6 21:48:59 2017 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu Mar 9 19:13:57 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/mt-log-test.cc | 144 ++++++++++++++++++---------------
 1 file changed, 78 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/57a26b19/src/kudu/consensus/mt-log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/mt-log-test.cc b/src/kudu/consensus/mt-log-test.cc
index 160dfb9..09cd819 100644
--- a/src/kudu/consensus/mt-log-test.cc
+++ b/src/kudu/consensus/mt-log-test.cc
@@ -32,8 +32,10 @@
 #include "kudu/util/thread.h"
 
 DEFINE_int32(num_writer_threads, 4, "Number of threads writing to the log");
+DEFINE_int32(num_reader_threads, 1, "Number of threads accessing the log while writes are ongoing");
 DEFINE_int32(num_batches_per_thread, 2000, "Number of batches per thread");
 DEFINE_int32(num_ops_per_batch_avg, 5, "Target average number of ops per batch");
+DEFINE_bool(verify_log, true, "Whether to verify the log by reading it after the writes complete");
 
 namespace kudu {
 namespace log {
@@ -80,54 +82,54 @@ class MultiThreadedLogTest : public LogTestBase {
     LogTestBase::SetUp();
   }
 
+  vector<consensus::ReplicateRefPtr> CreateRandomBatch() {
+    int num_ops = static_cast<int>(random_.Normal(
+        static_cast<double>(FLAGS_num_ops_per_batch_avg), 1.0));
+    DVLOG(1) << num_ops << " ops in this batch";
+    num_ops = std::max(num_ops, 1);
+    vector<consensus::ReplicateRefPtr> ret;
+    for (int j = 0; j < num_ops; j++) {
+      ReplicateRefPtr replicate = make_scoped_refptr_replicate(new ReplicateMsg);
+      replicate->get()->set_op_type(WRITE_OP);
+      replicate->get()->set_timestamp(clock_->Now().ToUint64());
+      tserver::WriteRequestPB* request = replicate->get()->mutable_write_request();
+      AddTestRowToPB(RowOperationsPB::INSERT, schema_, 12345, 0,
+                     "this is a test insert",
+                     request->mutable_row_operations());
+      request->set_tablet_id(kTestTablet);
+      ret.push_back(replicate);
+    }
+    return ret;
+  }
+
+  void AssignIndexes(vector<consensus::ReplicateRefPtr>* batch) {
+    for (auto& rep : *batch) {
+      OpId* op_id = rep->get()->mutable_id();
+      op_id->set_term(0);
+      op_id->set_index(current_index_++);
+    }
+  }
+
   void LogWriterThread(int thread_id) {
     CountDownLatch latch(FLAGS_num_batches_per_thread);
     vector<Status> errors;
     for (int i = 0; i < FLAGS_num_batches_per_thread; i++) {
-      LogEntryBatch* entry_batch;
-      vector<consensus::ReplicateRefPtr> batch_replicates;
-      int num_ops = static_cast<int>(random_.Normal(
-          static_cast<double>(FLAGS_num_ops_per_batch_avg), 1.0));
-      DVLOG(1) << num_ops << " ops in this batch";
-      num_ops =  std::max(num_ops, 1);
-      {
-        std::lock_guard<simple_spinlock> lock_guard(lock_);
-        for (int j = 0; j < num_ops; j++) {
-          ReplicateRefPtr replicate = make_scoped_refptr_replicate(new ReplicateMsg);
-          int32_t index = current_index_++;
-          OpId* op_id = replicate->get()->mutable_id();
-          op_id->set_term(0);
-          op_id->set_index(index);
-
-          replicate->get()->set_op_type(WRITE_OP);
-          replicate->get()->set_timestamp(clock_->Now().ToUint64());
-
-          tserver::WriteRequestPB* request = replicate->get()->mutable_write_request();
-          AddTestRowToPB(RowOperationsPB::INSERT, schema_, index, 0,
-                         "this is a test insert",
-                         request->mutable_row_operations());
-          request->set_tablet_id(kTestTablet);
-          batch_replicates.push_back(replicate);
-        }
-
-        gscoped_ptr<log::LogEntryBatchPB> entry_batch_pb;
-        CreateBatchFromAllocatedOperations(batch_replicates,
-                                           &entry_batch_pb);
-
-        ASSERT_OK(log_->Reserve(REPLICATE, std::move(entry_batch_pb), &entry_batch));
-      } // lock_guard scope
+      // Do the expensive allocation outside the lock.
+      vector<consensus::ReplicateRefPtr> batch_replicates = CreateRandomBatch();
       auto cb = new CustomLatchCallback(&latch, &errors);
-      entry_batch->SetReplicates(batch_replicates);
-      log_->AsyncAppend(entry_batch, cb->AsStatusCallback());
-    }
-    LOG_TIMING(INFO, strings::Substitute("thread $0 waiting to append and sync $1 batches",
-                                        thread_id, FLAGS_num_batches_per_thread)) {
-      latch.Wait();
+      // Assign indexes and append inside the lock, so that the index order and
+      // log order match up.
+      {
+        std::lock_guard<simple_spinlock> l(lock_);
+        AssignIndexes(&batch_replicates);
+        ASSERT_OK(log_->AsyncAppendReplicates(batch_replicates, cb->AsStatusCallback()));
+      }
     }
+    latch.Wait();
     for (const Status& status : errors) {
       WARN_NOT_OK(status, "Unexpected failure during AsyncAppend");
     }
-    ASSERT_EQ(0, errors.size());
+    CHECK_EQ(0, errors.size());
   }
 
   void Run() {
@@ -141,15 +143,18 @@ class MultiThreadedLogTest : public LogTestBase {
     // Start a thread which calls some read-only methods on the log
     // to check for races against writers.
     std::atomic<bool> stop_reader(false);
-    std::thread reader_thread([&]() {
-        std::map<int64_t, int64_t> map;
-        OpId opid;
-        while (!stop_reader) {
-          log_->GetLatestEntryOpId(&opid);
-          log_->GetReplaySizeMap(&map);
-          IgnoreResult(log_->GetGCableDataSize(RetentionIndexes(FLAGS_num_batches_per_thread)));
-        }
-      });
+    vector<std::thread> reader_threads;
+    for (int i = 0; i < FLAGS_num_reader_threads; i++) {
+      reader_threads.emplace_back([&]() {
+          std::map<int64_t, int64_t> map;
+          OpId opid;
+          while (!stop_reader) {
+            log_->GetLatestEntryOpId(&opid);
+            log_->GetReplaySizeMap(&map);
+            IgnoreResult(log_->GetGCableDataSize(RetentionIndexes(FLAGS_num_batches_per_thread)));
+          }
+        });
+    }
 
     // Wait for the writers to finish.
     for (scoped_refptr<kudu::Thread>& thread : threads_) {
@@ -158,7 +163,9 @@ class MultiThreadedLogTest : public LogTestBase {
 
     // Then stop the reader and join on it as well.
     stop_reader = true;
-    reader_thread.join();
+    for (auto& t : reader_threads) {
+      t.join();
+    }
   }
  private:
   ThreadSafeRandom random_;
@@ -167,31 +174,36 @@ class MultiThreadedLogTest : public LogTestBase {
 };
 
 TEST_F(MultiThreadedLogTest, TestAppends) {
-  // Roll frequently to stress related code paths.
-  options_.segment_size_mb = 1;
+  // Roll frequently to stress related code paths, unless overridden
+  // on the command line.
+  if (google::GetCommandLineFlagInfoOrDie("log_segment_size_mb").is_default) {
+    options_.segment_size_mb = 1;
+  }
 
   ASSERT_OK(BuildLog());
   int start_current_id = current_index_;
   LOG_TIMING(INFO, strings::Substitute("inserting $0 batches($1 threads, $2 per-thread)",
-                                      FLAGS_num_writer_threads * FLAGS_num_batches_per_thread,
-                                      FLAGS_num_batches_per_thread, FLAGS_num_writer_threads)) {
+                                       FLAGS_num_writer_threads * FLAGS_num_batches_per_thread,
+                                       FLAGS_num_writer_threads,
+                                       FLAGS_num_batches_per_thread)) {
     ASSERT_NO_FATAL_FAILURE(Run());
   }
   ASSERT_OK(log_->Close());
-
-  shared_ptr<LogReader> reader;
-  ASSERT_OK(LogReader::Open(fs_manager_.get(), nullptr, kTestTablet, nullptr, &reader));
-  SegmentSequence segments;
-  ASSERT_OK(reader->GetSegmentsSnapshot(&segments));
-
-  for (const SegmentSequence::value_type& entry : segments) {
-    ASSERT_OK(entry->ReadEntries(&entries_));
+  if (FLAGS_verify_log) {
+    shared_ptr<LogReader> reader;
+    ASSERT_OK(LogReader::Open(fs_manager_.get(), nullptr, kTestTablet, nullptr, &reader));
+    SegmentSequence segments;
+    ASSERT_OK(reader->GetSegmentsSnapshot(&segments));
+
+    for (const SegmentSequence::value_type& entry : segments) {
+      ASSERT_OK(entry->ReadEntries(&entries_));
+    }
+    vector<uint32_t> ids;
+    EntriesToIdList(&ids);
+    DVLOG(1) << "Wrote total of " << current_index_ - start_current_id << " ops";
+    ASSERT_EQ(current_index_ - start_current_id, ids.size());
+    ASSERT_TRUE(std::is_sorted(ids.begin(), ids.end()));
   }
-  vector<uint32_t> ids;
-  EntriesToIdList(&ids);
-  DVLOG(1) << "Wrote total of " << current_index_ - start_current_id << " ops";
-  ASSERT_EQ(current_index_ - start_current_id, ids.size());
-  ASSERT_TRUE(std::is_sorted(ids.begin(), ids.end()));
 }
 
 } // namespace log

