kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] branch master updated: KUDU-2826: Add tail for mutation list to accelerate update in memrowset
Date Wed, 29 May 2019 21:41:08 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 2db92e3  KUDU-2826: Add tail for mutation list to accelerate update in memrowset
2db92e3 is described below

commit 2db92e322c42145be04a55cb4e8cf64606c96610
Author: triplesheep <triplesheep0419@gmail.com>
AuthorDate: Thu May 23 06:43:33 2019 +0000

    KUDU-2826: Add tail for mutation list to accelerate update in memrowset
    
    We found that Kudu needs to traverse the whole mutation list to find
    its tail before appending a new mutation on UPDATE or REINSERT in the
    MemRowSet. This costs considerable time when the same key is updated
    tens of thousands of times (we encountered this case in a production
    environment).
    
    Below are YCSB results for a Zipfian update workload before and after the change.
    Workload:
        recordcount=1000
        operationcount=1000000
        workload=com.yahoo.ycsb.workloads.CoreWorkload
        readallfields=true
        readproportion=0.2
        updateproportion=0.8
        scanproportion=0
        insertproportion=0
        requestdistribution=zipfian
    
    The performance before:
        Update success: 800304,
        AverageLatency(us): 93.81846898178692,
        MinLatency(us): 1,
        MaxLatency(us): 837631,
        95thPercentileLatency(us): 6
        99thPercentileLatency(us): 12
    
    The performance after:
        Update success: 800153,
        AverageLatency(us): 4.785403541572674,
        MinLatency(us): 1,
        MaxLatency(us): 92287,
        95thPercentileLatency(us): 6
        99thPercentileLatency(us): 11
    
    Change-Id: I945f332f8241ecb3cc6ab66e67ee3ee2b4e49be8
    Reviewed-on: http://gerrit.cloudera.org:8080/13412
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
---
 src/kudu/tablet/memrowset-test.cc | 45 +++++++++++++++++++++++++++++++++++++++
 src/kudu/tablet/memrowset.cc      | 34 ++++++++++++-----------------
 src/kudu/tablet/memrowset.h       | 23 ++++++++------------
 src/kudu/tablet/mutation.cc       | 17 +++++++--------
 src/kudu/tablet/mutation.h        | 14 +++++++-----
 5 files changed, 85 insertions(+), 48 deletions(-)

diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index ff8ba4b..1ddc8c9 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -22,6 +22,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <unordered_set>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -50,6 +51,7 @@
 #include "kudu/util/faststring.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/random.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
@@ -60,6 +62,10 @@ DEFINE_int32(roundtrip_num_rows, 10000,
              "Number of rows to use for the round-trip test");
 DEFINE_int32(num_scan_passes, 1,
              "Number of passes to run the scan portion of the round-trip test");
+DEFINE_double(update_ratio, 0.2,
+              "Percent of rows to be updated for the update performance test");
+DEFINE_int32(times_to_update, 5000,
+             "Number of updates for each row for the update performance test");
 
 namespace kudu {
 namespace tablet {
@@ -69,6 +75,7 @@ using log::LogAnchorRegistry;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
+using std::unordered_set;
 using std::vector;
 
 class TestMemRowSet : public KuduTest {
@@ -772,5 +779,43 @@ TEST_F(TestMemRowSet, TestScanVirtualColumnIsDeleted) {
   }
 }
 
+// Test for update performance.
+// Can simulates zipfian distribution of updates by setting --update_ratio to a small value
+// and --times_to_update to a high value.
+TEST_F(TestMemRowSet, TestMemRowSetUpdatePerformance) {
+  shared_ptr<MemRowSet> mrs;
+  ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
+                              MemTracker::GetRootTracker(), &mrs));
+  int num_rows = 1000;
+  LOG_TIMING(INFO, "Inserting rows") {
+    ASSERT_OK(InsertRows(mrs.get(), num_rows));
+  }
+
+  LOG_TIMING(INFO, "Counting rows") {
+    int count = mrs->entry_count();
+    ASSERT_EQ(num_rows, count);
+  }
+
+  int nums_to_update = FLAGS_update_ratio * num_rows;
+  unordered_set<int> rows_to_update;
+  Random rand(SeedRandom());
+  while (rows_to_update.size() < nums_to_update) {
+    int next = rand.Uniform(num_rows);
+    rows_to_update.insert(next);
+  }
+
+  LOG_TIMING(INFO, "Updating rows") {
+    for (int i = 0; i < FLAGS_times_to_update; ++i) {
+      for (auto row_idx : rows_to_update) {
+        OperationResultPB result;
+        string key = "hello " + std::to_string(row_idx);
+        ASSERT_OK(UpdateRow(mrs.get(), key, i, &result));
+        ASSERT_EQ(1, result.mutated_stores_size());
+        ASSERT_EQ(0L, result.mutated_stores(0).mrs_id());
+      }
+    }
+  }
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 4bb7294..71904cd 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -66,25 +66,18 @@ using strings::Substitute;
 static const int kInitialArenaSize = 16;
 
 bool MRSRow::IsGhost() const {
-  bool is_ghost = false;
-  for (const Mutation *mut = header_->redo_head;
-       mut != nullptr;
-       mut = mut->next()) {
-    RowChangeListDecoder decoder(mut->changelist());
-    Status s = decoder.Init();
-    if (!PREDICT_TRUE(s.ok())) {
-      LOG(FATAL) << "Failed to decode: " << mut->changelist().ToString(*schema())
-                  << " (" << s.ToString() << ")";
-    }
-    if (decoder.is_delete()) {
-      DCHECK(!is_ghost);
-      is_ghost = true;
-    } else if (decoder.is_reinsert()) {
-      DCHECK(is_ghost);
-      is_ghost = false;
-    }
+  const Mutation *mut_tail = header_->redo_tail;
+  if (mut_tail == nullptr) {
+    return false;
+  }
+  RowChangeListDecoder decoder(mut_tail->changelist());
+  Status s = decoder.Init();
+  if (!PREDICT_TRUE(s.ok())) {
+    LOG(FATAL) << Substitute("Failed to decode: $0 ($1)",
+                             mut_tail->changelist().ToString(*schema()),
+                             s.ToString());
   }
-  return is_ghost;
+  return decoder.is_delete();
 }
 
 namespace {
@@ -185,6 +178,7 @@ Status MemRowSet::Insert(Timestamp timestamp,
     DEFINE_MRSROW_ON_STACK(this, mrsrow, mrsrow_slice);
     mrsrow.header_->insertion_timestamp = timestamp;
     mrsrow.header_->redo_head = nullptr;
+    mrsrow.header_->redo_tail = nullptr;
     RETURN_NOT_OK(mrsrow.CopyRow(row, arena_.get()));
 
     CHECK(mutation.Insert(mrsrow_slice))
@@ -213,7 +207,7 @@ Status MemRowSet::Reinsert(Timestamp timestamp, const ConstContiguousRow& row, M
   // This function has "release" semantics which ensures that the memory writes
   // for the mutation are fully published before any concurrent reader sees
   // the appended mutation.
-  mut->AppendToListAtomic(&ms_row->header_->redo_head);
+  mut->AppendToListAtomic(&ms_row->header_->redo_head, &ms_row->header_->redo_tail);
   return Status::OK();
 }
 
@@ -247,7 +241,7 @@ Status MemRowSet::MutateRow(Timestamp timestamp,
     // This function has "release" semantics which ensures that the memory writes
     // for the mutation are fully published before any concurrent reader sees
     // the appended mutation.
-    mut->AppendToListAtomic(&row.header_->redo_head);
+    mut->AppendToListAtomic(&row.header_->redo_head, &row.header_->redo_tail);
 
     MemStoreTargetPB* target = result->add_mutated_stores();
     target->set_mrs_id(id_);
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index c32f1c5..bd57a14 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -146,12 +146,6 @@ class MRSRow {
 
   // Return true if this row is a "ghost" -- i.e its most recent mutation is
   // a deletion.
-  //
-  // NOTE: this call is O(n) in the number of mutations, since it has to walk
-  // the linked list all the way to the end, checking if each mutation is a
-  // DELETE or REINSERT. We expect the list is usually short (low-update use
-  // cases) but if this becomes a bottleneck, we could cache the 'ghost' status
-  // as a bit inside the row header.
   bool IsGhost() const;
 
  private:
@@ -167,14 +161,15 @@ class MRSRow {
   }
 
   struct Header {
-    // Timestamp for the transaction which inserted this row. If a scanner with an
-    // older snapshot sees this row, it will be ignored.
-    Timestamp insertion_timestamp;
-
-    // Pointer to the first mutation which has been applied to this row. Each
-    // mutation is an instance of the Mutation class, making up a singly-linked
-    // list for any mutations applied to the row.
-    Mutation* redo_head;
+      // Timestamp for the transaction which inserted this row. If a scanner with an
+      // older snapshot sees this row, it will be ignored.
+      Timestamp insertion_timestamp;
+
+      // Pointers to the first and last mutations that have been applied to this row.
+      // Together they comprise a singly-linked list of all of the row's mutations,
+      // with the head and tail used for efficient iteration and insertion respectively.
+      Mutation* redo_head;
+      Mutation* redo_tail;
   };
 
   Header *header_;
diff --git a/src/kudu/tablet/mutation.cc b/src/kudu/tablet/mutation.cc
index 7e02f17..1771aa8 100644
--- a/src/kudu/tablet/mutation.cc
+++ b/src/kudu/tablet/mutation.cc
@@ -51,19 +51,18 @@ std::string Mutation::StringifyMutationList(const Schema &schema, const Mutation
   return ret;
 }
 
-void Mutation::AppendToListAtomic(Mutation **list) {
+void Mutation::AppendToListAtomic(Mutation** redo_head, Mutation** redo_tail) {
   next_ = nullptr;
-  if (*list == nullptr) {
-    Release_Store(reinterpret_cast<AtomicWord*>(list),
+  if (*redo_tail == nullptr) {
+    Release_Store(reinterpret_cast<AtomicWord*>(redo_head),
+                  reinterpret_cast<AtomicWord>(this));
+    Release_Store(reinterpret_cast<AtomicWord*>(redo_tail),
                   reinterpret_cast<AtomicWord>(this));
   } else {
-    // Find tail and append.
-    Mutation *tail = *list;
-    while (tail->next_ != nullptr) {
-      tail = tail->next_;
-    }
-    Release_Store(reinterpret_cast<AtomicWord*>(&tail->next_),
+    Release_Store(reinterpret_cast<AtomicWord*>(&(*redo_tail)->next_),
                   reinterpret_cast<AtomicWord>(this));
+    Release_Store(reinterpret_cast<AtomicWord*>(redo_tail),
+                  reinterpret_cast<AtomicWord>((*redo_tail)->next_));
   }
 }
 
diff --git a/src/kudu/tablet/mutation.h b/src/kudu/tablet/mutation.h
index fd9d92c..9071665 100644
--- a/src/kudu/tablet/mutation.h
+++ b/src/kudu/tablet/mutation.h
@@ -79,11 +79,15 @@ class Mutation {
   // This should only be used for debugging/logging.
   static std::string StringifyMutationList(const Schema &schema, const Mutation *head);
 
-  // Append this mutation to the list at the given pointer.
-  // This operation uses "Release" memory semantics
-  // (see atomicops.h). The pointer as well as all of the mutations in the list
-  // must be word-aligned.
-  void AppendToListAtomic(Mutation **list);
+  // Appends this mutation to the list given by 'redo_head' and 'redo_tail'.
+  //
+  // This function is atomic provided that callers are externally synchronized
+  // on a per mutation list (i.e. per row) basis. Without this synchronization,
+  // the non-atomicity between 'redo_head' and 'redo_tail' writes may cause errors.
+  //
+  // This operation uses "Release" memory semantics (see atomicops.h). The
+  // pointers as well as all of the mutations in the list must be word-aligned.
+  void AppendToListAtomic(Mutation** redo_head, Mutation** redo_tail);
 
   void PrependToList(Mutation** list) {
     this->next_ = *list;


Mime
View raw message