kudu-commits mailing list archives

From jdcry...@apache.org
Subject incubator-kudu git commit: KUDU-749 (part 2): avoid O(n^2) behavior when compacting deltas
Date Thu, 26 May 2016 17:32:57 GMT
Repository: incubator-kudu
Updated Branches:
  refs/heads/branch-0.9.x 1bd7d3c3d -> e7dad227a


KUDU-749 (part 2): avoid O(n^2) behavior when compacting deltas

When handling a zipfian update workload, we have a lot of delta records applied
to a single row. In the compaction code path, we were previously using
Mutation::AppendToList() to append each such delta to the end of a linked list.
Each append walked the list from its head to find the tail, so constructing the
linked list was O(n^2) in the number of deltas corresponding to a row. Each step
of that walk was also most likely a CPU cache miss, making this quite slow.

In fact, in a YCSB workload I'm currently testing, a single compaction has been
running for over an hour and a half, spending all of its time in the
AppendToList function.

This patch changes the flow so that the deltas are collected by prepending them
to the linked list (an O(1) operation). The resulting list is then reversed (an
O(n) operation) before feeding into the rest of the unchanged compaction code
path.
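
The prepend-then-reverse flow described above can be sketched in isolation. This
is a minimal illustration, not the Kudu code: `Node`, `CollectAscending`, and
`Timestamps` are hypothetical names, and a `std::vector` stands in for the Arena
that owns the real Mutation objects. Only the bodies of `PrependToList` and
`ReverseList` mirror the functions added to mutation.h in the diff.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a Mutation: a timestamp plus a next pointer.
struct Node {
  uint64_t timestamp;
  Node* next;
};

// O(1): make 'node' the new head of the list.
inline void PrependToList(Node* node, Node** list) {
  node->next = *list;
  *list = node;
}

// O(n): reverse the list in place and update the head pointer.
inline void ReverseList(Node** list) {
  Node* prev = nullptr;
  Node* cur = *list;
  while (cur != nullptr) {
    Node* next = cur->next;  // remember the rest of the list
    cur->next = prev;        // point this node backwards
    prev = cur;
    cur = next;
  }
  *list = prev;
}

// Collects n nodes that arrive in ascending timestamp order. Each one is
// prepended (leaving the list in descending order), then a single reversal
// restores ascending order. Total work is O(n) rather than O(n^2).
// 'storage' owns the nodes (assumption: the real code allocates from an Arena).
inline Node* CollectAscending(std::vector<Node>* storage, uint64_t n) {
  storage->assign(n, Node{0, nullptr});
  Node* head = nullptr;
  for (uint64_t i = 0; i < n; ++i) {
    (*storage)[i].timestamp = i;
    PrependToList(&(*storage)[i], &head);
  }
  ReverseList(&head);
  return head;
}

// Helper for inspection: copies the timestamps out in list order.
inline std::vector<uint64_t> Timestamps(const Node* head) {
  std::vector<uint64_t> out;
  for (const Node* cur = head; cur != nullptr; cur = cur->next) {
    out.push_back(cur->timestamp);
  }
  return out;
}
```

Calling `CollectAscending(&storage, n)` and walking the result with
`Timestamps` yields the timestamps in the same ascending order that repeated
tail appends would have produced, which is why the downstream compaction code
can stay unchanged.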

This patch is also notable for being one of the few times in real life where a
"reverse the linked list" algorithm was actually necessary. Due to the
notoriously tricky nature of this foundational computer science problem (I once
botched it on a job interview), and the fact that this was a so-called "real
life situation", I googled the algorithm. Thus, I am confident of its
correctness. Aside from such assurances, this code path is also very well
covered by existing test cases which perform multiple updates against the same
row (in particular fuzz-itest).

Change-Id: If6bfa3fc6f41998b0f1ff58c5c8ea39881022de5
Reviewed-on: http://gerrit.cloudera.org:8080/3221
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>
(cherry picked from commit c7178e97e842f42e9ed9d5e9e2a4f521fbe70b6b)
Reviewed-on: http://gerrit.cloudera.org:8080/3224
Reviewed-by: Jean-Daniel Cryans


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/e7dad227
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/e7dad227
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/e7dad227

Branch: refs/heads/branch-0.9.x
Commit: e7dad227accafc9f51849d8e792b0c29fb022a28
Parents: 1bd7d3c
Author: Todd Lipcon <todd@apache.org>
Authored: Wed May 25 15:54:57 2016 -0700
Committer: Jean-Daniel Cryans <jdcryans@gerrit.cloudera.org>
Committed: Thu May 26 17:25:24 2016 +0000

----------------------------------------------------------------------
 src/kudu/tablet/compaction.cc       |  2 ++
 src/kudu/tablet/compaction.h        |  4 ++--
 src/kudu/tablet/delta_compaction.cc |  1 +
 src/kudu/tablet/delta_store.h       |  4 ++--
 src/kudu/tablet/deltafile.cc        |  7 +++----
 src/kudu/tablet/deltamemstore.cc    |  2 +-
 src/kudu/tablet/memrowset.cc        |  4 ++--
 src/kudu/tablet/memrowset.h         |  2 +-
 src/kudu/tablet/mutation.cc         | 32 ++++----------------------------
 src/kudu/tablet/mutation.h          | 26 ++++++++++++++++++++++----
 10 files changed, 40 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e7dad227/src/kudu/tablet/compaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 0af47ca..b2204bf 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -179,7 +179,9 @@ class DiskRowSetCompactionInput : public CompactionInput {
       CompactionInputRow &input_row = block->at(i);
       input_row.row.Reset(&block_, i);
       input_row.redo_head = redo_mutation_block_[i];
+      Mutation::ReverseMutationList(&input_row.redo_head);
       input_row.undo_head = undo_mutation_block_[i];
+      Mutation::ReverseMutationList(&input_row.undo_head);
     }
 
     first_rowid_in_block_ += block_.nrows();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e7dad227/src/kudu/tablet/compaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 99a6dda..04b55b0 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -115,9 +115,9 @@ struct CompactionInputRow {
   // The compaction input base row.
   RowBlockRow row;
   // The current redo head for this row, may be null if the base row has no mutations.
-  const Mutation* redo_head;
+  Mutation* redo_head;
   // The current undo head for this row, may be null if all undos were garbage collected.
-  const Mutation* undo_head;
+  Mutation* undo_head;
 };
 
 // Function shared by flushes, compactions and major delta compactions. Applies all the REDO

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e7dad227/src/kudu/tablet/delta_compaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index df1e95a..b79b414 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -135,6 +135,7 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas() {
       CompactionInputRow &input_row = input_rows.at(i);
       input_row.row.Reset(&block, i);
       input_row.redo_head = redo_mutation_block[i];
+      Mutation::ReverseMutationList(&input_row.redo_head);
       input_row.undo_head = nullptr;
 
       RowBlockRow dst_row = block.row(i);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e7dad227/src/kudu/tablet/delta_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index e4dafe6..b0ab4b2 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -153,8 +153,8 @@ class DeltaIterator {
   //
   // Each entry in the vector will be treated as a singly linked list of Mutation
   // objects. If there are no mutations for that row, the entry will be unmodified.
-  // If there are mutations, they will be appended at the tail of the linked list
-  // (i.e in ascending timestamp order)
+  // If there are mutations, they will be prepended at the head of the linked list
+  // (i.e the resulting list will be in descending timestamp order)
   //
   // The Mutation objects will be allocated out of the provided Arena, which must be non-NULL.
   // Must have called PrepareBatch() with flag = PREPARE_FOR_COLLECT.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e7dad227/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index d767954..168b25b 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -756,9 +756,8 @@ Status DeltaFileIterator::ApplyDeletes(SelectionVector *sel_vec) {
   }
 }
 
-// Visitor which, for each mutation, appends it into a ColumnBlock of
-// Mutation *s. See CollectMutations()
-// Each mutation is projected into the iterator schema, if required.
+// Visitor which, for each mutation, adds it into a ColumnBlock of
+// Mutation *s, prepending to each linked list. See CollectMutations().
 template<DeltaType Type>
 struct CollectingVisitor {
 
@@ -770,7 +769,7 @@ struct CollectingVisitor {
 
     RowChangeList changelist(deltas);
     Mutation *mutation = Mutation::CreateInArena(dst_arena, key.timestamp(), changelist);
-    mutation->AppendToList(&dst->at(rel_idx));
+    mutation->PrependToList(&dst->at(rel_idx));
 
     return Status::OK();
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e7dad227/src/kudu/tablet/deltamemstore.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index 6e09669..3db3f26 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -344,7 +344,7 @@ Status DMSIterator::CollectMutations(vector<Mutation *> *dst, Arena *arena) {
     uint32_t rel_idx = key.row_idx() - prepared_idx_;
 
     Mutation *mutation = Mutation::CreateInArena(arena, key.timestamp(), changelist);
-    mutation->AppendToList(&dst->at(rel_idx));
+    mutation->PrependToList(&dst->at(rel_idx));
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e7dad227/src/kudu/tablet/memrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index e03bea2..04a4467 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -578,14 +578,14 @@ Status MemRowSet::Iterator::ApplyMutationsToProjectedRow(
 // Copy the current MRSRow to the 'dst_row' provided using the iterator projection schema.
 Status MemRowSet::Iterator::GetCurrentRow(RowBlockRow* dst_row,
                                           Arena* row_arena,
-                                          const Mutation** redo_head,
+                                          Mutation** redo_head,
                                           Arena* mutation_arena,
                                           Timestamp* insertion_timestamp) {
 
   DCHECK(redo_head != nullptr);
 
   // Get the row from the MemRowSet. It may have a different schema from the iterator projection.
-  const MRSRow src_row = GetCurrentRow();
+  MRSRow src_row = GetCurrentRow();
 
   *insertion_timestamp = src_row.insertion_timestamp();
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e7dad227/src/kudu/tablet/memrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index d494c1c..962a0e1 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -423,7 +423,7 @@ class MemRowSet::Iterator : public RowwiseIterator {
   // Copy the current MRSRow to the 'dst_row' provided using the iterator projection schema.
   Status GetCurrentRow(RowBlockRow* dst_row,
                        Arena* row_arena,
-                       const Mutation** redo_head,
+                       Mutation** redo_head,
                        Arena* mutation_arena,
                        Timestamp* insertion_timestamp);
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e7dad227/src/kudu/tablet/mutation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mutation.cc b/src/kudu/tablet/mutation.cc
index 96b970c..a6d2237 100644
--- a/src/kudu/tablet/mutation.cc
+++ b/src/kudu/tablet/mutation.cc
@@ -46,43 +46,19 @@ string Mutation::StringifyMutationList(const Schema &schema, const Mutation *hea
   return ret;
 }
 
-
 void Mutation::AppendToListAtomic(Mutation **list) {
-  DoAppendToList<true>(list);
-}
-
-void Mutation::AppendToList(Mutation **list) {
-  DoAppendToList<false>(list);
-}
-
-namespace {
-template<bool ATOMIC>
-inline void Store(Mutation** pointer, Mutation* val);
-
-template<>
-inline void Store<true>(Mutation** pointer, Mutation* val) {
-  Release_Store(reinterpret_cast<AtomicWord*>(pointer),
-                reinterpret_cast<AtomicWord>(val));
-}
-
-template<>
-inline void Store<false>(Mutation** pointer, Mutation* val) {
-  *pointer = val;
-}
-} // anonymous namespace
-
-template<bool ATOMIC>
-inline void Mutation::DoAppendToList(Mutation **list) {
   next_ = nullptr;
   if (*list == nullptr) {
-    Store<ATOMIC>(list, this);
+    Release_Store(reinterpret_cast<AtomicWord*>(list),
+                  reinterpret_cast<AtomicWord>(this));
   } else {
     // Find tail and append.
     Mutation *tail = *list;
     while (tail->next_ != nullptr) {
       tail = tail->next_;
     }
-    Store<ATOMIC>(&tail->next_, this);
+    Release_Store(reinterpret_cast<AtomicWord*>(&tail->next_),
+                  reinterpret_cast<AtomicWord>(this));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e7dad227/src/kudu/tablet/mutation.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mutation.h b/src/kudu/tablet/mutation.h
index 6e29586..7bdd959 100644
--- a/src/kudu/tablet/mutation.h
+++ b/src/kudu/tablet/mutation.h
@@ -61,12 +61,19 @@ class Mutation {
   static string StringifyMutationList(const Schema &schema, const Mutation *head);
 
   // Append this mutation to the list at the given pointer.
-  void AppendToListAtomic(Mutation **list);
-
-  // Same as above, except that this version implies "Release" memory semantics
+  // This operation uses "Release" memory semantics
   // (see atomicops.h). The pointer as well as all of the mutations in the list
   // must be word-aligned.
-  void AppendToList(Mutation **list);
+  void AppendToListAtomic(Mutation **list);
+
+  void PrependToList(Mutation** list) {
+    this->next_ = *list;
+    *list = this;
+  }
+
+  // O(n) algorithm to reverse the order of a linked list of
+  // mutations.
+  static void ReverseMutationList(Mutation** list);
 
  private:
   friend class MSRow;
@@ -106,6 +113,17 @@ inline Mutation *Mutation::CreateInArena(
   return ret;
 }
 
+inline void Mutation::ReverseMutationList(Mutation** list) {
+  Mutation* prev = nullptr;
+  Mutation* cur = *list;
+  while (cur != nullptr) {
+    Mutation* next = cur->next_;
+    cur->next_ = prev;
+    prev = cur;
+    cur = next;
+  }
+  *list = prev;
+}
 
 } // namespace tablet
 } // namespace kudu

