kudu-commits mailing list archives

From: t...@apache.org
Subject: [2/5] kudu git commit: KUDU-237 (part 2) - Add support for REINSERT in delta files
Date: Wed, 30 Nov 2016 17:07:42 GMT
KUDU-237 (part 2) - Add support for REINSERT in delta files

This patch goes the final mile in adding support for REINSERTs in
delta files. It does various things to achieve this:

- Transforms REDO DELETE/REINSERT mutation pairs into UNDO
REINSERT/DELETE mutation pairs, leaving at most a single REDO
DELETE (see the first sketch after this list).

- Merges ghost rows on compaction: when duplicated rows are
found, a new algorithm determines which one is the most recent
and links the other one as its 'previous_ghost'. This works for
an arbitrary number of ghost rows (the second sketch after this
list illustrates how the undo histories are spliced together).
Note that setting previous versions requires a copy: the two
rows are in different RowBlocks (for row data) and Arenas (for
mutations), and it is not guaranteed that the previous ghost
will survive by the time the row that points to it is processed.

- Adds a new test to tablet-test and changes a test in
compaction-test to prove that this works.

- Adds a new test to compaction-test that creates several layers
of overlapping rowsets, where each layer has one fewer rowset
than the previous one. This creates a mix of duplicated (up to 10
different ghosts) and unique rows. The test then compacts the
rowsets a few at a time and makes sure the histories are correct.
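
As an illustration of the first point, here is a minimal standalone
sketch of the REDO-to-UNDO flip. The Mutation/Type structs below are
hypothetical stand-ins, not Kudu's actual classes: each REDO is
reversed into an UNDO carrying the timestamp of the mutation it
reverses, so a REDO DELETE/REINSERT pair becomes an UNDO
REINSERT/DELETE pair and the row history is kept instead of being
discarded.

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

enum class Type { kSet, kDelete, kReinsert };

struct Mutation {
  Type type;
  int64_t ts;   // logical timestamp of the mutation
  int32_t val;  // payload for kSet / kReinsert
};

std::string ToString(const Mutation& m) {
  switch (m.type) {
    case Type::kSet:      return "@" + std::to_string(m.ts) + "(SET val=" + std::to_string(m.val) + ")";
    case Type::kDelete:   return "@" + std::to_string(m.ts) + "(DELETE)";
    case Type::kReinsert: return "@" + std::to_string(m.ts) + "(REINSERT val=" + std::to_string(m.val) + ")";
  }
  return "";
}

int main() {
  // REDO history for one row, oldest first: the row was updated, deleted,
  // reinserted with a new value, and updated again.
  std::vector<Mutation> redos = {
      {Type::kSet, 10, 1},
      {Type::kDelete, 20, 0},
      {Type::kReinsert, 30, 2},
      {Type::kSet, 40, 3},
  };

  int32_t base = 0;             // value written by the original INSERT
  std::vector<Mutation> undos;  // built oldest-first, printed newest-first

  for (const Mutation& redo : redos) {
    switch (redo.type) {
      case Type::kSet:
        // The undo of an update restores the previous value.
        undos.push_back({Type::kSet, redo.ts, base});
        base = redo.val;
        break;
      case Type::kDelete:
        // The undo of a delete reinserts the pre-delete value.
        undos.push_back({Type::kReinsert, redo.ts, base});
        break;
      case Type::kReinsert:
        // The undo of a reinsert is a delete: the REDO DELETE/REINSERT
        // pair has become an UNDO REINSERT/DELETE pair.
        undos.push_back({Type::kDelete, redo.ts, 0});
        base = redo.val;
        break;
    }
  }

  // Prints: base val=3; undos: @40(SET val=2) @30(DELETE)
  //         @20(REINSERT val=1) @10(SET val=0)
  std::cout << "base val=" << base << "; undos:";
  for (auto it = undos.rbegin(); it != undos.rend(); ++it) {
    std::cout << " " << ToString(*it);
  }
  std::cout << std::endl;
  return 0;
}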
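
The ghost merge from the second point splices the newer row's UNDO
history together with each previous ghost's history in decreasing
timestamp order. Below is a standalone sketch of that splice, again
with hypothetical types, mirroring the shape of MergeUndoHistories()
in compaction.cc further down in this patch:

#include <cstdint>
#include <iostream>

struct Mut {
  int64_t ts;        // timestamp; chains are kept newest-first
  const char* what;  // human-readable label for the mutation
  Mut* next;
};

// Advances *head while its successor exists and has a timestamp >= 'ts'.
void AdvanceWhileNextGE(Mut** head, int64_t ts) {
  while ((*head)->next != nullptr && (*head)->next->ts >= ts) {
    *head = (*head)->next;
  }
}

// Splices two newest-first chains into one, preferring 'left' on ties.
Mut* MergeHistories(Mut* left, Mut* right) {
  if (left == nullptr) return right;
  if (right == nullptr) return left;
  Mut* head = left->ts >= right->ts ? left : right;
  while (left != nullptr && right != nullptr) {
    if (left->ts >= right->ts) {
      // Walk 'left' down to the last mutation whose timestamp is still
      // >= the head of 'right', then hook 'right' in after it.
      AdvanceWhileNextGE(&left, right->ts);
      Mut* next = left->next;
      left->next = right;
      left = next;
    } else {
      AdvanceWhileNextGE(&right, left->ts);
      Mut* next = right->next;
      right->next = left;
      right = next;
    }
  }
  return head;
}

int main() {
  // Newest-first UNDO chain of the live row and of the ghost it replaced.
  Mut live1{30, "DELETE", nullptr};
  Mut live2{40, "SET", &live1};
  Mut ghost1{10, "DELETE", nullptr};
  Mut ghost2{20, "REINSERT", &ghost1};

  // Prints: @40 SET  @30 DELETE  @20 REINSERT  @10 DELETE
  for (Mut* m = MergeHistories(&live2, &ghost2); m != nullptr; m = m->next) {
    std::cout << "@" << m->ts << " " << m->what << "  ";
  }
  std::cout << std::endl;
  return 0;
}

Preferring the left chain on equal timestamps matches the patch's
handling of non-disjoint histories, which can arise when a row is
deleted and reinserted in the same operation.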

Follow-up patches will add new itests that exercise this functionality
even more broadly.

Ran 500 loops of mt-tablet-test and compaction-test
in dist-test with "KUDU_ALLOW_SLOW_TESTS=1" and
"--stress_cpu_threads=4". All tests passed. Results:

compaction-test: http://dist-test.cloudera.org//job?job_id=david.alves.1480028902.31041
mt-tablet-test:  http://dist-test.cloudera.org//job?job_id=david.alves.1480028908.31125

Change-Id: Ie1173b2bea721b376f2b6049be20f57307582c47
Reviewed-on: http://gerrit.cloudera.org:8080/4995
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dralves@apache.org>
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/abab2ced
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/abab2ced
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/abab2ced

Branch: refs/heads/master
Commit: abab2ced14c2166ca057573b78b738b0ebc113b2
Parents: ecff49a
Author: David Alves <dralves@apache.org>
Authored: Mon Nov 7 22:39:36 2016 -0800
Committer: David Ribeiro Alves <dralves@apache.org>
Committed: Wed Nov 30 10:14:29 2016 +0000

----------------------------------------------------------------------
 src/kudu/common/row_changelist.h   |  18 +-
 src/kudu/tablet/compaction-test.cc | 199 +++++++++++++++-
 src/kudu/tablet/compaction.cc      | 410 +++++++++++++++++++++++++-------
 src/kudu/tablet/compaction.h       |  12 +-
 src/kudu/tablet/delta_store.cc     |   1 -
 src/kudu/tablet/deltafile.cc       |  53 ++---
 src/kudu/tablet/deltafile.h        |   6 +-
 src/kudu/tablet/tablet-test.cc     |  63 +++++
 8 files changed, 631 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/abab2ced/src/kudu/common/row_changelist.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/row_changelist.h b/src/kudu/common/row_changelist.h
index ed5f536..5c1987d 100644
--- a/src/kudu/common/row_changelist.h
+++ b/src/kudu/common/row_changelist.h
@@ -199,6 +199,13 @@ class RowChangeListEncoder {
         (type_ == RowChangeList::kReinsert || type_ == RowChangeList::kUpdate);
   }
 
+  // Internal version of AddColumnUpdate which does not set type, allowing
+  // it to work for both REINSERT and UPDATE.
+  // Exposed for tests.
+  void EncodeColumnMutation(const ColumnSchema& col_schema,
+                            int col_id,
+                            const void* cell_ptr);
+
  private:
   FRIEND_TEST(TestRowChangeList, TestInvalid_SetNullForNonNullableColumn);
   FRIEND_TEST(TestRowChangeList, TestInvalid_SetWrongSizeForIntColumn);
@@ -217,12 +224,6 @@ class RowChangeListEncoder {
     DCHECK_EQ(type_, type);
   }
 
-  // Internal version of AddColumnUpdate which does not set type, allowing
-  // it to work for both REINSERT and UPDATE.
-  void EncodeColumnMutation(const ColumnSchema& col_schema,
-                            int col_id,
-                            const void* cell_ptr);
-
   // Add a column mutation by a raw value. This allows copying RCLs
   // from one file to another without having any awareness of schema.
   //
@@ -306,12 +307,11 @@ class RowChangeListDecoder {
     return type_;
   }
 
-  // Append an entry to *column_ids for each column that is updated
-  // in this RCL.
+  // Append an entry to *column_ids for each column that is mutated in this RCL.
   // This 'consumes' the remainder of the encoded RowChangeList.
   Status GetIncludedColumnIds(std::vector<ColumnId>* column_ids) {
+    DCHECK(is_update() || is_reinsert());
     column_ids->clear();
-    DCHECK(is_update());
     while (HasNext()) {
       DecodedUpdate dec;
       RETURN_NOT_OK(DecodeNext(&dec));

http://git-wip-us.apache.org/repos/asf/kudu/blob/abab2ced/src/kudu/tablet/compaction-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 3d23c29..c90caa2 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -70,6 +70,7 @@ class TestCompaction : public KuduRowSetTest {
     : KuduRowSetTest(CreateSchema()),
       op_id_(consensus::MaximumOpId()),
       row_builder_(schema_),
+      arena_(32*1024, 128*1024),
       clock_(server::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
       mvcc_(clock_),
       log_anchor_registry_(new log::LogAnchorRegistry()) {
@@ -423,12 +424,19 @@ class TestCompaction : public KuduRowSetTest {
     }
   }
 
+  // Helpers for building an expected row history.
+  void AddExpectedDelete(Mutation** current_head, Timestamp ts = Timestamp::kInvalidTimestamp);
+  void AddExpectedUpdate(Mutation** current_head, int32_t val);
+  void AddExpectedReinsert(Mutation** current_head, int32_t val);
+  void AddUpdateAndDelete(RowSet* rs, CompactionInputRow* row, int row_id, int32_t val);
+
  protected:
   OpId op_id_;
 
   RowBuilder row_builder_;
   char key_buf_[256];
-  scoped_refptr<server::Clock> clock_;
+  Arena arena_;
+  scoped_refptr<server::LogicalClock> clock_;
   MvccManager mvcc_;
 
   scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
@@ -530,8 +538,9 @@ TEST_F(TestCompaction, TestRowSetInput) {
 }
 
 // Tests that the same rows, duplicated in three DRSs, ghost in two of them
-// appears only once on the compaction output
-TEST_F(TestCompaction, TestDuplicatedGhostRowsDontSurviveCompaction) {
+// appears only once on the compaction output but that the resulting row
+// includes reinserts for the ghost and all its mutations.
+TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
   shared_ptr<DiskRowSet> rs1;
   {
     shared_ptr<MemRowSet> mrs;
@@ -591,10 +600,190 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsDontSurviveCompaction) {
   ASSERT_EQ(out.size(), 10);
   EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=hello 00000000, int32 val=2, "
                 "int32 nullable_val=NULL); Undo Mutations: [@61(SET val=0, nullable_val=0), "
-                "@51(DELETE)]; Redo Mutations: [];", out[0]);
+                "@51(DELETE), @41(REINSERT val=1, nullable_val=1), @31(SET val=0, nullable_val=0), "
+                "@21(DELETE), @11(REINSERT val=0, nullable_val=0), @1(DELETE)]; "
+                "Redo Mutations: [];", out[0]);
   EXPECT_EQ("RowIdxInBlock: 9; Base: (string key=hello 00000090, int32 val=2, "
                 "int32 nullable_val=NULL); Undo Mutations: [@70(SET val=9, nullable_val=NULL), "
-                "@60(DELETE)]; Redo Mutations: [];", out[9]);
+                "@60(DELETE), @50(REINSERT val=1, nullable_val=1), @40(SET val=9, "
+                "nullable_val=NULL), @30(DELETE), @20(REINSERT val=9, nullable_val=NULL), "
+                "@10(DELETE)]; Redo Mutations: [];", out[9]);
+}
+
+void TestCompaction::AddExpectedDelete(Mutation** current_head, Timestamp ts) {
+  faststring buf;
+  RowChangeListEncoder enc(&buf);
+  enc.SetToDelete();
+  if (ts == Timestamp::kInvalidTimestamp) ts = Timestamp(clock_->GetCurrentTime());
+  Mutation* mutation = Mutation::CreateInArena(&arena_,
+                                               ts,
+                                               enc.as_changelist());
+  mutation->set_next(*current_head);
+  *current_head = mutation;
+}
+
+void TestCompaction::AddExpectedUpdate(Mutation** current_head, int32_t val) {
+  faststring buf;
+  RowChangeListEncoder enc(&buf);
+  enc.SetToUpdate();
+  enc.AddColumnUpdate(schema_.column(1), schema_.column_id(1), &val);
+  if (val % 2 == 0) {
+    enc.AddColumnUpdate(schema_.column(2), schema_.column_id(2), &val);
+  } else {
+    enc.AddColumnUpdate(schema_.column(2), schema_.column_id(2), nullptr);
+  }
+  Mutation* mutation = Mutation::CreateInArena(&arena_,
+                                               Timestamp(clock_->GetCurrentTime()),
+                                               enc.as_changelist());
+  mutation->set_next(*current_head);
+  *current_head = mutation;
+}
+
+void TestCompaction::AddExpectedReinsert(Mutation** current_head, int32_t val) {
+  faststring buf;
+  RowChangeListEncoder enc(&buf);
+  enc.SetToReinsert();
+  enc.EncodeColumnMutation(schema_.column(1), schema_.column_id(1), &val);
+  if (val % 2 == 1) {
+    enc.EncodeColumnMutation(schema_.column(2), schema_.column_id(2), &val);
+  } else {
+    enc.EncodeColumnMutation(schema_.column(2), schema_.column_id(2), nullptr);
+  }
+  Mutation* mutation = Mutation::CreateInArena(&arena_, Timestamp(clock_->GetCurrentTime()),
+                                               enc.as_changelist());
+  mutation->set_next(*current_head);
+  *current_head = mutation;
+}
+
+void TestCompaction::AddUpdateAndDelete(RowSet* rs, CompactionInputRow* row, int row_id,
+                                        int32_t val) {
+  UpdateRow(rs, row_id, val);
+  // Expect an UNDO update for the update.
+  AddExpectedUpdate(&row->undo_head, row_id);
+
+  DeleteRow(rs, row_id);
+  // Expect an UNDO reinsert for the delete.
+  AddExpectedReinsert(&row->undo_head, val);
+}
+
+// Build several layers of overlapping rowsets with many ghost rows.
+// Repeatedly merge all the generated RowSets until we are left with a single RowSet, then make
+// sure that its history matches our expected history.
+//
+// There are 'kBaseNumRowSets' layers of overlapping rowsets; each level has one fewer rowset and
+// thus fewer rows. This is meant to exercise a normal-ish path where there are both duplicated and
+// unique rows per merge while at the same time making sure that some of the rows are duplicated
+// many times.
+//
+// The verification is performed against a vector of expected CompactionInputRow that we build
+// as we insert/update/delete.
+TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
+  const int kBaseNumRowSets = 10;
+  const int kNumRowsPerRowSet = 10;
+
+  int total_num_rows = kBaseNumRowSets * kNumRowsPerRowSet;
+
+  MvccSnapshot all_snap = MvccSnapshot::CreateSnapshotIncludingAllTransactions();
+
+  vector<CompactionInputRow> expected_rows(total_num_rows);
+  vector<shared_ptr<DiskRowSet>> row_sets;
+
+  // Create a vector of ids for rows and fill it for the first layer.
+  vector<int> row_ids(total_num_rows);
+  std::iota(row_ids.begin(), row_ids.end(), 0);
+
+  SeedRandom();
+  int rs_id = 0;
+  for (int i = 0; i < kBaseNumRowSets; ++i) {
+    int num_rowsets_in_layer = kBaseNumRowSets - i;
+    size_t row_idx = 0;
+    for (int j = 0; j < num_rowsets_in_layer; ++j) {
+      shared_ptr<MemRowSet> mrs;
+      ASSERT_OK(MemRowSet::Create(rs_id, schema_, log_anchor_registry_.get(),
+                                  mem_trackers_.tablet_tracker, &mrs));
+
+      // Insert all rows into the MRS; for even rows, also update and delete them.
+      for (int k = 0; k < kNumRowsPerRowSet; ++k) {
+        int row_id = row_ids[row_idx + k];
+        CompactionInputRow* row = &expected_rows[row_id];
+        InsertRow(mrs.get(), row_id, row_id);
+        // Expect an UNDO delete for the insert.
+        AddExpectedDelete(&row->undo_head);
+        if (row_id % 2 == 0) AddUpdateAndDelete(mrs.get(), row, row_id, row_id + i + 1);
+      }
+      shared_ptr<DiskRowSet> drs;
+      FlushMRSAndReopenNoRoll(*mrs, schema_, &drs);
+      // For odd rows, update them and delete them in the drs.
+      for (int k = 0; k < kNumRowsPerRowSet; ++k) {
+        int row_id = row_ids[row_idx];
+        CompactionInputRow* row = &expected_rows[row_id];
+        if (row_id % 2 == 1) AddUpdateAndDelete(drs.get(), row, row_id, row_id + i + 1);
+        row_idx++;
+      }
+      row_sets.push_back(drs);
+      rs_id++;
+    }
+    // For the next layer, remove one rowset's worth of rows at random.
+    for (int j = 0; j < kNumRowsPerRowSet; ++j) {
+      int to_remove = rand() % row_ids.size();
+      row_ids.erase(row_ids.begin() + to_remove);
+    }
+
+  }
+
+  RowBlock block(schema_, kBaseNumRowSets * kNumRowsPerRowSet, &arena_);
+  // Go through the expected compaction input rows, flip the last undo into a redo and
+  // build the base. This will give us the final version that we'll expect the result
+  // of the real compaction to match.
+  for (int i = 0; i < expected_rows.size(); ++i) {
+    CompactionInputRow* row = &expected_rows[i];
+    Mutation* reinsert = row->undo_head;
+    row->undo_head = reinsert->next();
+    row->row = block.row(i);
+    BuildRow(i, i);
+    CopyRow(row_builder_.row(), &row->row, &arena_);
+    RowChangeListDecoder redo_decoder(reinsert->changelist());
+    CHECK_OK(redo_decoder.Init());
+    faststring buf;
+    RowChangeListEncoder dummy(&buf);
+    dummy.SetToUpdate();
+    redo_decoder.MutateRowAndCaptureChanges(&row->row, &arena_, &dummy);
+    AddExpectedDelete(&row->redo_head, reinsert->timestamp());
+  }
+
+  vector<shared_ptr<CompactionInput>> inputs;
+  for (auto& row_set : row_sets) {
+    gscoped_ptr<CompactionInput> ci;
+    CHECK_OK(row_set->NewCompactionInput(&schema_, all_snap, &ci));
+    inputs.push_back(shared_ptr<CompactionInput>(ci.release()));
+  }
+
+  // Compact the row sets by picking a few at random until we're left with just one.
+  while (row_sets.size() > 1) {
+    std::random_shuffle(row_sets.begin(), row_sets.end());
+    // Merge between 2 and 4 row sets.
+    int num_rowsets_to_merge = std::min(rand() % 3 + 2, static_cast<int>(row_sets.size()));
+    vector<shared_ptr<DiskRowSet>> to_merge;
+    for (int i = 0; i < num_rowsets_to_merge; ++i) {
+      to_merge.push_back(row_sets.back());
+      row_sets.pop_back();
+    }
+    shared_ptr<DiskRowSet> result;
+    CompactAndReopenNoRoll(to_merge, schema_, &result);
+    row_sets.push_back(result);
+  }
+
+  vector<string> out;
+  gscoped_ptr<CompactionInput> ci;
+  CHECK_OK(row_sets[0]->NewCompactionInput(&schema_, all_snap, &ci));
+  IterateInput(ci.get(), &out);
+
+  // Finally, go through the final compaction input and the expected one and make sure
+  // they match.
+  ASSERT_EQ(expected_rows.size(), out.size());
+  for (int i = 0; i < expected_rows.size(); ++i) {
+    EXPECT_EQ(CompactionInputRowToString(expected_rows[i]), out[i]);
+  }
 }
 
 // Test case that inserts and deletes a row in the same transaction and makes sure

http://git-wip-us.apache.org/repos/asf/kudu/blob/abab2ced/src/kudu/tablet/compaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index bec6615..45f5772 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -251,6 +251,87 @@ class DiskRowSetCompactionInput : public CompactionInput {
   };
 };
 
+// Compares two duplicate rows before compaction (and before the REDO->UNDO transformation).
+// Returns -1 if 'left' is less recent than 'right', 1 otherwise. Never returns 0.
+int CompareDuplicatedRows(const CompactionInputRow& left,
+                          const CompactionInputRow& right) {
+  const Mutation* left_last = left.redo_head;
+  const Mutation* right_last = right.redo_head;
+  AdvanceToLastInList(&left_last);
+  AdvanceToLastInList(&right_last);
+
+  if (left.redo_head == nullptr) {
+    // left must still be alive, meaning right must have at least a DELETE redo.
+    DCHECK(right_last != nullptr);
+    DCHECK(right_last->changelist().is_delete());
+    return 1;
+  }
+  if (right.redo_head == nullptr) {
+    // right must still be alive, meaning left must have at least a DELETE redo.
+    DCHECK(left_last != nullptr);
+    DCHECK(left_last->changelist().is_delete());
+    return -1;
+  }
+
+  // Duplicated rows usually have disjoint redo histories, so comparing the first
+  // mutations is typically enough to determine the most recent row.
+  int ret = left.redo_head->timestamp().CompareTo(right.redo_head->timestamp());
+
+  if (ret > 0) {
+    return ret;
+  }
+
+  if (PREDICT_TRUE(ret < 0)) {
+    return ret;
+  }
+
+  // In case the histories aren't disjoint, it must be because one row was deleted in
+  // the same operation that inserted the other one, which was then mutated.
+  // For instance this is a valid history with non-disjoint REDO histories:
+  // -- Row 'a' lives in DRS1
+  // Update row 'a' @ 10
+  // Delete row 'a' @ 10
+  // Insert row 'a' @ 10
+  // -- Row 'a' lives in the MRS
+  // Update row 'a' @ 10
+  // -- Flush the MRS into DRS2
+  // -- Compact DRS1 and DRS2
+  //
+  // We can't have the case here where both 'left' and 'right' have a DELETE as the last
+  // mutation at the same timestamp. This would be troublesome as we would possibly have
+  // no way to decide which version is the most up-to-date (one or both version's undos
+  // might have been garbage collected). See MemRowSetCompactionInput::PrepareBlock().
+
+  // At least one of the rows must have a DELETE REDO as its last redo.
+  CHECK(left_last->changelist().is_delete() || right_last->changelist().is_delete());
+  // We made sure that rows that are inserted and deleted in the same operation can never
+  // be part of a compaction input so they can't both be deletes.
+  CHECK(!(left_last->changelist().is_delete() && right_last->changelist().is_delete()));
+
+  // If 'left' doesn't have a delete then it's the latest version.
+  if (!left_last->changelist().is_delete() && right_last->changelist().is_delete()) {
+    return 1;
+  }
+
+  // ...otherwise it's 'right'.
+  return -1;
+}
+
+void CopyMutations(Mutation* from, Mutation** to, Arena* arena) {
+  Mutation* previous = nullptr;
+  for (const Mutation* cur = from; cur != nullptr; cur = cur->acquire_next()) {
+    Mutation* copy = Mutation::CreateInArena(arena,
+                                             cur->timestamp(),
+                                             cur->changelist());
+    if (previous != nullptr) {
+      previous->set_next(copy);
+    } else {
+      *to = copy;
+    }
+    previous = copy;
+  }
+}
+
 class MergeCompactionInput : public CompactionInput {
  private:
   // State kept for each of the inputs.
@@ -267,8 +348,12 @@ class MergeCompactionInput : public CompactionInput {
       return pending_idx >= pending.size();
     }
 
-    const CompactionInputRow &next() const {
-      return pending[pending_idx];
+    CompactionInputRow* next() {
+      return &pending[pending_idx];
+    }
+
+    const CompactionInputRow* next() const {
+      return &pending[pending_idx];
     }
 
     void pop_front() {
@@ -289,7 +374,7 @@ class MergeCompactionInput : public CompactionInput {
       DCHECK(!empty());
       DCHECK(!other.empty());
 
-      return schema.Compare(pending.back().row, other.next().row) < 0;
+      return schema.Compare(pending.back().row, (*other.next()).row) < 0;
     }
 
     shared_ptr<CompactionInput> input;
@@ -302,7 +387,8 @@ class MergeCompactionInput : public CompactionInput {
  public:
   MergeCompactionInput(const vector<shared_ptr<CompactionInput> > &inputs,
                        const Schema* schema)
-    : schema_(schema) {
+    : schema_(schema),
+      num_dup_rows_(0) {
     for (const shared_ptr<CompactionInput> &input : inputs) {
       gscoped_ptr<MergeState> state(new MergeState);
       state->input = input;
@@ -344,7 +430,7 @@ class MergeCompactionInput : public CompactionInput {
 
     while (true) {
       int smallest_idx = -1;
-      CompactionInputRow smallest;
+      CompactionInputRow* smallest;
 
       // Iterate over the inputs to find the one with the smallest next row.
       // It may seem like an O(n lg k) merge using a heap would be more efficient,
@@ -367,40 +453,38 @@ class MergeCompactionInput : public CompactionInput {
           smallest = state->next();
           continue;
         }
-        int row_comp = schema_->Compare(state->next().row, smallest.row);
+        int row_comp = schema_->Compare(state->next()->row, smallest->row);
         if (row_comp < 0) {
           smallest_idx = i;
           smallest = state->next();
           continue;
         }
-        // If we found two duplicated rows, we want the row with the highest
-        // live version. If they're equal, that can only be because they're both
-        // dead, in which case it doesn't matter.
-        // TODO: this is going to change with historical REINSERT handling.
+        // If we found two rows with the same key, we want to make the newer one point to the older
+        // one, which must be a ghost.
         if (PREDICT_FALSE(row_comp == 0)) {
-          int mutation_comp = CompareLatestLiveVersion(state->next(), smallest);
+          int mutation_comp = CompareDuplicatedRows(*state->next(), *smallest);
+          CHECK_NE(mutation_comp, 0);
           if (mutation_comp > 0) {
             // If the previous smallest row has a highest version that is lower
-            // than this one, discard it.
+            // than this one, clone it as the previous version and discard the original.
+            RETURN_NOT_OK(SetPreviousGhost(state->next(), smallest, true /* clone */,
+                                           state->input->PreparedBlockArena()));
             states_[smallest_idx]->pop_front();
             smallest_idx = i;
             smallest = state->next();
             continue;
-          } else {
-            // .. otherwise pop the other one.
-            //
-            // NOTE: If they're equal, then currently that means that both versions are
-            // ghosts. Once we handle REINSERTS, we'll have to figure out which one "comes
-            // first" and deal with this properly. For now, we can just pick arbitrarily.
-            states_[i]->pop_front();
-            continue;
           }
+          // .. otherwise copy and pop the other one.
+          RETURN_NOT_OK(SetPreviousGhost(smallest, state->next(), true /* clone */,
+                                         smallest->row.row_block()->arena()));
+          states_[i]->pop_front();
+          continue;
         }
       }
       DCHECK_GE(smallest_idx, 0);
 
       states_[smallest_idx]->pop_front();
-      block->push_back(smallest);
+      block->push_back(*smallest);
     }
 
     return Status::OK();
@@ -502,54 +586,194 @@ class MergeCompactionInput : public CompactionInput {
     }
   }
 
-  // Compare the mutations of two duplicated rows.
-  // Returns -1 if latest_version(left) < latest_version(right)
-  static int CompareLatestLiveVersion(const CompactionInputRow& left,
-                                      const CompactionInputRow& right) {
-    if (left.redo_head == nullptr) {
-      // left must still be alive
-      DCHECK(right.redo_head != nullptr);
-      return 1;
+  // (Deep) clones a compaction input row, copying the row data, all undo/redo
+  // mutations, and all previous ghosts to 'arena'.
+  Status CloneCompactionInputRow(const CompactionInputRow* src,
+                                 CompactionInputRow** dst,
+                                 Arena* arena) {
+    CompactionInputRow* copy = arena->NewObject<CompactionInputRow>();
+    copy->row = NewRow();
+    // Copy the row to the arena.
+    RETURN_NOT_OK(CopyRow(src->row, &copy->row, arena));
+    // ... along with the redos and undos.
+    CopyMutations(src->redo_head, &copy->redo_head, arena);
+    CopyMutations(src->undo_head, &copy->undo_head, arena);
+    // Copy previous versions recursively.
+    if (src->previous_ghost != nullptr) {
+      CompactionInputRow* child;
+      RETURN_NOT_OK(CloneCompactionInputRow(src->previous_ghost, &child, arena));
+      copy->previous_ghost = child;
     }
-    if (right.redo_head == nullptr) {
-      DCHECK(left.redo_head != nullptr);
-      return -1;
+    *dst = copy;
+    return Status::OK();
+  }
+
+  // Sets the previous ghost row for a CompactionInputRow.
+  // 'must_copy' indicates whether there must be a deep copy (using CloneCompactionInputRow()).
+  Status SetPreviousGhost(CompactionInputRow* older,
+                          CompactionInputRow* newer,
+                          bool must_copy,
+                          Arena* arena) {
+    CHECK(arena != nullptr) << "Arena can't be null";
+    // Check if we already had a previous version and, if yes, whether 'newer' is more or less
+    // recent.
+    if (older->previous_ghost != nullptr) {
+      if (CompareDuplicatedRows(*older->previous_ghost, *newer) > 0) {
+        // 'older' was more recent.
+        return SetPreviousGhost(older->previous_ghost, newer, must_copy /* clone */, arena);
+      }
+      // 'newer' was more recent.
+      if (must_copy) {
+        CompactionInputRow* newer_copy;
+        RETURN_NOT_OK(CloneCompactionInputRow(newer, &newer_copy, arena));
+        newer = newer_copy;
+      }
+      // 'older->previous_ghost' is already in 'arena' so avoid the copy.
+      RETURN_NOT_OK(SetPreviousGhost(newer,
+                                     older->previous_ghost,
+                                     false /* don't clone */,
+                                     arena));
+      older->previous_ghost = newer;
+      return Status::OK();
     }
 
-    // Duplicated rows have disjoint histories, we don't need to get the latest
-    // mutation, the first one should be enough for the sake of determining the most recent
-    // row, but in debug mode do get the latest to make sure one of the rows is a ghost.
-    const Mutation* left_latest = left.redo_head;
-    const Mutation* right_latest = right.redo_head;
-    int ret = left_latest->timestamp().CompareTo(right_latest->timestamp());
-#ifndef NDEBUG
-    AdvanceToLastInList(&left_latest);
-    AdvanceToLastInList(&right_latest);
-    int debug_ret = left_latest->timestamp().CompareTo(right_latest->timestamp());
-    if (debug_ret != 0) {
-      // If in fact both rows were deleted at the same time, this is OK -- we could
-      // have a case like TestRandomAccess.TestFuzz3, in which a single batch
-      // DELETED from the DRS, INSERTed into MRS, and DELETED from MRS. In that case,
-      // the timestamp of the last REDO will be the same and we can pick whichever
-      // we like.
-      CHECK_EQ(ret, debug_ret);
+    if (must_copy) {
+      CompactionInputRow* newer_copy;
+      RETURN_NOT_OK(CloneCompactionInputRow(newer, &newer_copy, arena));
+      newer = newer_copy;
     }
-#endif
-    return ret;
+    older->previous_ghost = newer;
+    return Status::OK();
   }
 
-  static void AdvanceToLastInList(const Mutation** m) {
-    const Mutation* next;
-    while ((next = (*m)->acquire_next()) != nullptr) {
-      *m = next;
+  // Duplicates are rare and allocating for the worst case (all the rows in all but one of
+  // the inputs are duplicates) is expensive, so we create RowBlocks on demand with just
+  // space for a few rows. If a row block is exhausted a new one is allocated.
+  RowBlockRow NewRow() {
+    rowid_t row_idx = num_dup_rows_ % kDuplicatedRowsPerBlock;
+    num_dup_rows_++;
+    if (row_idx == 0) {
+      duplicated_rows_.push_back(std::unique_ptr<RowBlock>(
+          new RowBlock(*schema_, kDuplicatedRowsPerBlock, static_cast<Arena*>(nullptr))));
     }
+    return duplicated_rows_.back()->row(row_idx);
   }
 
   const Schema* schema_;
   vector<MergeState *> states_;
   Arena* prepared_block_arena_;
+
+  // Vector to keep blocks that store duplicated row data.
+  // This needs to be stored internally, as the row data for ghosts might have been
+  // deleted by the time the most recent version of the row is processed.
+  vector<std::unique_ptr<RowBlock>> duplicated_rows_;
+  int num_dup_rows_;
+
+  enum {
+    kDuplicatedRowsPerBlock = 10
+  };
+
 };
 
+// Advances 'head' while 'next' is not null and the timestamp of the 'next' mutation
+// is greater than or equal to 'ts'.
+void AdvanceWhileNextEqualToOrBiggerThan(Mutation** head, Timestamp ts) {
+  while ((*head)->next() != nullptr && (*head)->next()->timestamp() >= ts) {
+    *head = (*head)->next();
+  }
+}
+
+// Merges two undo histories into one, in decreasing timestamp order, and returns the new head.
+Mutation* MergeUndoHistories(Mutation* left, Mutation* right) {
+
+  if (PREDICT_FALSE(left == nullptr)) {
+    return right;
+  }
+  if (PREDICT_FALSE(right == nullptr)) {
+    return left;
+  }
+
+  Mutation* head;
+  if (left->timestamp() >= right->timestamp()) {
+    head = left;
+  } else {
+    head = right;
+  }
+
+  while (left != nullptr && right != nullptr) {
+    if (left->timestamp() >= right->timestamp()) {
+      AdvanceWhileNextEqualToOrBiggerThan(&left, right->timestamp());
+      Mutation* next = left->next();
+      left->set_next(right);
+      left = next;
+      continue;
+    }
+    AdvanceWhileNextEqualToOrBiggerThan(&right, left->timestamp());
+    Mutation* next = right->next();
+    right->set_next(left);
+    right = next;
+  }
+  return head;
+}
+
+// If 'old_row' has previous versions, this transforms those prior versions into undos
+// and adds them to 'new_undo_head'.
+Status MergeDuplicatedRowHistory(CompactionInputRow* old_row,
+                                 Mutation** new_undo_head,
+                                 Arena* arena) {
+  if (PREDICT_TRUE(old_row->previous_ghost == nullptr)) return Status::OK();
+
+  // Use an all-inclusive snapshot, as all of the previous version's undos and redos
+  // are guaranteed to be committed; otherwise the compaction wouldn't be able to
+  // see the new row.
+  MvccSnapshot all_snap = MvccSnapshot::CreateSnapshotIncludingAllTransactions();
+
+  faststring dst;
+
+  CompactionInputRow* previous_ghost = old_row->previous_ghost;
+  while (previous_ghost != nullptr) {
+
+    // The first step is to transform the old row's REDOs into undos, if there are any.
+    // This simplifies things for several reasons:
+    // - we're left with the most up-to-date version of the old row
+    // - we only have one REDO left to deal with (the delete)
+    // - we can reuse ApplyMutationsAndGenerateUndos()
+    Mutation* pv_new_undos_head = nullptr;
+    Mutation* pv_delete_redo = nullptr;
+
+    RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(all_snap,
+                                                 *previous_ghost,
+                                                 &pv_new_undos_head,
+                                                 &pv_delete_redo,
+                                                 arena,
+                                                 &previous_ghost->row));
+
+    // We should be left with only one redo, the delete.
+    CHECK(pv_delete_redo != nullptr);
+    CHECK(pv_delete_redo->changelist().is_delete());
+    CHECK(pv_delete_redo->next() == nullptr);
+
+    // Now transform the redo delete into an undo (reinsert), which will contain the
+    // previous ghost's row state. The reinsert will have the timestamp of the delete.
+    dst.clear();
+    RowChangeListEncoder undo_encoder(&dst);
+    undo_encoder.SetToReinsert(previous_ghost->row);
+    Mutation* pv_reinsert = Mutation::CreateInArena(arena,
+                                                    pv_delete_redo->timestamp(),
+                                                    undo_encoder.as_changelist());
+
+    // Make the reinsert point to the rest of the undos.
+    pv_reinsert->set_next(pv_new_undos_head);
+
+    // Merge the UNDO lists.
+    *new_undo_head = MergeUndoHistories(*new_undo_head, pv_reinsert);
+
+    // ... handle a previous ghost if there is any.
+    previous_ghost = previous_ghost->previous_ghost;
+  }
+  return Status::OK();
+}
+
 // Makes the current head point to the 'new_head', if it's not null, and
 // makes 'new_head' the new head.
 void SetHead(Mutation** current_head, Mutation* new_head) {
@@ -569,7 +793,19 @@ string RowToString(const RowBlockRow& row, const Mutation* redo_head, const Muta
 }
 
 string CompactionInputRowToString(const CompactionInputRow& input_row) {
-  return RowToString(input_row.row, input_row.redo_head, input_row.undo_head);
+  if (input_row.previous_ghost == nullptr) {
+    return RowToString(input_row.row, input_row.redo_head, input_row.undo_head);
+  }
+  string ret = RowToString(input_row.row, input_row.redo_head, input_row.undo_head);
+  const CompactionInputRow* previous_ghost = input_row.previous_ghost;
+  while (previous_ghost != nullptr) {
+    ret.append(" Previous Ghost: ");
+    ret.append(RowToString(previous_ghost->row,
+                           previous_ghost->redo_head,
+                           previous_ghost->undo_head));
+    previous_ghost = previous_ghost->previous_ghost;
+  }
+  return ret;
 }
 
 ////////////////////////////////////////////////////////////
@@ -693,9 +929,6 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap,
                                       Mutation** new_redo_head,
                                       Arena* arena,
                                       RowBlockRow* dst_row) {
-
-  // At the time of writing, REINSERT is never encoded as an UNDO and the base
-  // data can never be in a deleted state (see KUDU-237).
   bool is_deleted = false;
 
   #define ERROR_LOG_CONTEXT \
@@ -764,40 +997,44 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap,
                                               undo_encoder.as_changelist());
         break;
       }
+      // When we see a reinsert REDO we do the following:
+      // 1 - Reset the REDO head, which contained a DELETE REDO.
+      // 2 - Apply the REINSERT to the row, passing an undo_encoder that encodes the state of
+      //     the row prior to the REINSERT.
+      // 3 - Create a mutation for the REINSERT above and add it to the UNDOs; this mutation
+      //     will have the timestamp of the DELETE REDO.
+      // 4 - Create a new delete UNDO. This mutation will have the timestamp of the REINSERT REDO.
       case RowChangeList::kReinsert: {
         DCHECK(is_deleted) << "Got REINSERT for a non-deleted row. " << ERROR_LOG_CONTEXT;
         CHECK(redo_delete != nullptr)  << "Got REINSERT without a redo DELETE. "
-                                        << ERROR_LOG_CONTEXT;
+                                       << ERROR_LOG_CONTEXT;
         redo_decoder.TwiddleDeleteStatus(&is_deleted);
+        Timestamp delete_ts = redo_delete->timestamp();
 
+        // 1 - Reset the delete REDO.
+        redo_delete = nullptr;
 
-        // Right now when a REINSERT mutation is found it is treated as a new insert and it
-        // clears the whole row history before it. We apply the reinsert to the new
-        // row but disregard the undo reinsert that is created.
-        //
-        // TODO(dralves) KUDU-237 Actually store the REINSERT
+        // 2 - Apply the changes of the reinsert to the latest version of the row
+        // capturing the old row while we're at it.
+        // TODO(dralves) Make Reinserts set defaults on the dest row. See KUDU-1760.
         undo_encoder.SetToReinsert();
         RETURN_NOT_OK_LOG(redo_decoder.MutateRowAndCaptureChanges(
             dst_row, static_cast<Arena*>(nullptr), &undo_encoder), ERROR,
-                          "Unable to apply reinsert undo. \n " + ERROR_LOG_CONTEXT);
+                          "Unable to apply reinsert undo. \n" + ERROR_LOG_CONTEXT);
 
-        // Discard the REINSERT and store a DELETE instead.
+        // 3 - Create a mutation for the REINSERT above and add it to the UNDOs.
+        current_undo = Mutation::CreateInArena(arena,
+                                               delete_ts,
+                                               undo_encoder.as_changelist());
+        SetHead(&undo_head, current_undo);
+
+        // 4 - Create a DELETE mutation and add it to the UNDOs.
         undo_encoder.Reset();
         undo_encoder.SetToDelete();
-
-        // Reset the UNDO head, losing all previous undos.
-        undo_head = Mutation::CreateInArena(arena,
-                                            redo_mut->timestamp(),
-                                            undo_encoder.as_changelist());
-
-        // Also reset the previous redo head since it stored the delete which was nullified
-        // by this reinsert
-        redo_delete = nullptr;
-
-        if (PREDICT_FALSE(VLOG_IS_ON(2))) {
-          VLOG(2) << "Found REINSERT REDO, cannot create UNDO for it, resetting row history "
-              "under snapshot: " << snap.ToString() << ERROR_LOG_CONTEXT;
-        }
+        current_undo = Mutation::CreateInArena(arena,
+                                               redo_mut->timestamp(),
+                                               undo_encoder.as_changelist());
+        SetHead(&undo_head, current_undo);
         break;
       }
       default: LOG(FATAL) << "Unknown mutation type!" << ERROR_LOG_CONTEXT;
@@ -851,12 +1088,17 @@ Status FlushCompactionInput(CompactionInput* input,
                                                    input->PreparedBlockArena(),
                                                    &dst_row));
 
+      // Merge the histories of 'input_row' with previous ghosts, if there are any.
+      RETURN_NOT_OK(MergeDuplicatedRowHistory(input_row,
+                                              &new_undos_head,
+                                              input->PreparedBlockArena()));
+
       // Remove ancient UNDOS and check whether the row should be garbage collected.
       bool is_garbage_collected;
       RemoveAncientUndos(history_gc_opts,
-                                &new_undos_head,
-                                new_redos_head,
-                                &is_garbage_collected);
+                         &new_undos_head,
+                         new_redos_head,
+                         &is_garbage_collected);
 
       DVLOG(4) << "Output Row: " << RowToString(dst_row, new_redos_head, new_undos_head) <<
           "; Was garbage collected? " << is_garbage_collected;

http://git-wip-us.apache.org/repos/asf/kudu/blob/abab2ced/src/kudu/tablet/compaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index a430688..bead82e 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -109,7 +109,8 @@ class CompactionInput {
   // Returns the arena for this compaction input corresponding to the last
   // prepared block. This must be called *after* PrepareBlock() as if this
   // is a MergeCompactionInput only then will the right arena be selected.
-  virtual Arena*  PreparedBlockArena() = 0;
+  virtual Arena* PreparedBlockArena() = 0;
+
   virtual Status FinishBlock() = 0;
 
   virtual bool HasMoreBlocks() = 0;
@@ -162,6 +163,15 @@ struct CompactionInputRow {
   Mutation* redo_head;
   // The current undo head for this row, may be null if all undos were garbage collected.
   Mutation* undo_head;
+
+  // When the same row is found in multiple rowsets because of ghost rows, this points
+  // to the one that is older in terms of row history.
+  CompactionInputRow* previous_ghost;
+
+  CompactionInputRow() :
+      redo_head(nullptr),
+      undo_head(nullptr),
+      previous_ghost(nullptr) {}
 };
 
 // Function shared by flushes and compactions. Removes UNDO Mutations

http://git-wip-us.apache.org/repos/asf/kudu/blob/abab2ced/src/kudu/tablet/delta_store.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index ba91f29..137bcda 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -38,7 +38,6 @@ string DeltaKeyAndUpdate::Stringify(DeltaType type, const Schema& schema, bool p
                                                    atoi(key.timestamp().ToString().c_str()))
                                     : Substitute("$0@tx$1", key.row_idx(),
                                                  key.timestamp().ToString()))));
-
 }
 
 Status DebugDumpDeltaIterator(DeltaType type,

http://git-wip-us.apache.org/repos/asf/kudu/blob/abab2ced/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index cd1f5be..1664133 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -100,12 +100,6 @@ Status DeltaFileWriter::FinishAndReleaseBlock(ScopedWritableBlockCloser* closer)
 Status DeltaFileWriter::DoAppendDelta(const DeltaKey &key,
                                       const RowChangeList &delta) {
   Slice delta_slice(delta.slice());
-
-  // See TODO in RowChangeListEncoder::SetToReinsert
-  CHECK(!delta.is_reinsert())
-    << "TODO: REINSERT deltas cannot currently be written to disk "
-    << "since they don't have a standalone encoded form.";
-
   tmp_buf_.clear();
 
   // Write the encoded form of the key to the file.
@@ -639,14 +633,12 @@ struct ApplyingVisitor {
     const Schema* schema = dfi->projection_;
     RowChangeListDecoder decoder((RowChangeList(deltas)));
     RETURN_NOT_OK(decoder.Init());
-    if (decoder.is_update()) {
+    if (decoder.is_update() || decoder.is_reinsert()) {
       return decoder.ApplyToOneColumn(rel_idx, dst, *schema, col_to_apply, dst->arena());
-    } else if (decoder.is_delete()) {
-      // If it's a DELETE, then it will be processed by DeletingVisitor.
-      return Status::OK();
-    } else {
-      dfi->FatalUnexpectedDelta(key, deltas, "Expect only UPDATE or DELETE deltas on disk");
     }
+
+    DCHECK(decoder.is_delete());
+    // If it's a DELETE, then it will be processed by LivenessVisitor.
     return Status::OK();
   }
 
@@ -686,16 +678,15 @@ Status DeltaFileIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) {
     DVLOG(3) << "Applying REDO mutations to " << col_to_apply;
     ApplyingVisitor<REDO> visitor = {this, col_to_apply, dst};
     return VisitMutations(&visitor);
-  } else {
-    DVLOG(3) << "Applying UNDO mutations to " << col_to_apply;
-    ApplyingVisitor<UNDO> visitor = {this, col_to_apply, dst};
-    return VisitMutations(&visitor);
   }
+  DVLOG(3) << "Applying UNDO mutations to " << col_to_apply;
+  ApplyingVisitor<UNDO> visitor = {this, col_to_apply, dst};
+  return VisitMutations(&visitor);
 }
 
-// Visitor which applies deletes to the selection vector.
+// Visitor which establishes the liveness of a row by applying deletes and reinserts.
 template<DeltaType Type>
-struct DeletingVisitor {
+struct LivenessVisitor {
 
   Status Visit(const DeltaKey &key, const Slice &deltas, bool* continue_visit);
 
@@ -710,12 +701,19 @@ struct DeletingVisitor {
       // If this is an update the row must be selected.
       DCHECK(sel_vec->IsRowSelected(rel_idx));
       return Status::OK();
-    } else if (decoder.is_delete()) {
+    }
+
+    if (decoder.is_delete()) {
       DVLOG(3) << "Row deleted";
       sel_vec->SetRowUnselected(rel_idx);
-    } else {
-      dfi->FatalUnexpectedDelta(key, deltas, "Expect only UPDATE or DELETE deltas on disk");
+      return Status::OK();
     }
+
+    DCHECK(decoder.is_reinsert());
+    DVLOG(3) << "Re-selected the row (reinsert)";
+    // If this is a reinsert the row must be unselected.
+    DCHECK(!sel_vec->IsRowSelected(rel_idx));
+    sel_vec->SetRowSelected(rel_idx);
     return Status::OK();
   }
 
@@ -724,7 +722,7 @@ struct DeletingVisitor {
 };
 
 template<>
-inline Status DeletingVisitor<REDO>::Visit(const DeltaKey& key,
+inline Status LivenessVisitor<REDO>::Visit(const DeltaKey& key,
                                            const Slice& deltas,
                                            bool* continue_visit) {
   if (IsRedoRelevant(dfi->mvcc_snap_, key.timestamp(), continue_visit)) {
@@ -734,7 +732,7 @@ inline Status DeletingVisitor<REDO>::Visit(const DeltaKey& key,
 }
 
 template<>
-inline Status DeletingVisitor<UNDO>::Visit(const DeltaKey& key,
+inline Status LivenessVisitor<UNDO>::Visit(const DeltaKey& key,
                                            const Slice& deltas, bool*
                                            continue_visit) {
   if (IsUndoRelevant(dfi->mvcc_snap_, key.timestamp(), continue_visit)) {
@@ -748,13 +746,12 @@ Status DeltaFileIterator::ApplyDeletes(SelectionVector *sel_vec) {
   DCHECK_LE(prepared_count_, sel_vec->nrows());
   if (delta_type_ == REDO) {
     DVLOG(3) << "Applying REDO deletes";
-    DeletingVisitor<REDO> visitor = { this, sel_vec};
-    return VisitMutations(&visitor);
-  } else {
-    DVLOG(3) << "Applying UNDO deletes";
-    DeletingVisitor<UNDO> visitor = { this, sel_vec};
+    LivenessVisitor<REDO> visitor = { this, sel_vec };
     return VisitMutations(&visitor);
   }
+  DVLOG(3) << "Applying UNDO deletes";
+  LivenessVisitor<UNDO> visitor = { this, sel_vec };
+  return VisitMutations(&visitor);
 }
 
 // Visitor which, for each mutation, adds it into a ColumnBlock of

http://git-wip-us.apache.org/repos/asf/kudu/blob/abab2ced/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index ea040d7..5706b08 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -53,7 +53,7 @@ struct ApplyingVisitor;
 template<DeltaType Type>
 struct CollectingVisitor;
 template<DeltaType Type>
-struct DeletingVisitor;
+struct LivenessVisitor;
 
 class DeltaFileWriter {
  public:
@@ -208,8 +208,8 @@ class DeltaFileIterator : public DeltaIterator {
   friend struct ApplyingVisitor<UNDO>;
   friend struct CollectingVisitor<REDO>;
   friend struct CollectingVisitor<UNDO>;
-  friend struct DeletingVisitor<REDO>;
-  friend struct DeletingVisitor<UNDO>;
+  friend struct LivenessVisitor<REDO>;
+  friend struct LivenessVisitor<UNDO>;
   friend struct FilterAndAppendVisitor;
 
   DISALLOW_COPY_AND_ASSIGN(DeltaFileIterator);

http://git-wip-us.apache.org/repos/asf/kudu/blob/abab2ced/src/kudu/tablet/tablet-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index 90bc5ef..1de12bd 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -219,6 +219,69 @@ TYPED_TEST(TestTablet, TestInsertDuplicateKey) {
   ASSERT_EQ(1, this->TabletCount());
 }
 
+// Tests that we are able to handle reinserts properly.
+//
+// Namely tests that:
+// - We're able to perform multiple reinserts in an MRS, flush them,
+//   and still have all versions of the row visible.
+// - After we've flushed the reinserts above, we can perform a
+//   new reinsert in a new MRS, flush that MRS, and compact the
+//   resulting DRSs together, all while preserving the full row history.
+TYPED_TEST(TestTablet, TestReinserts) {
+  LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
+
+  vector<MvccSnapshot> snaps;
+  // In the first snap there's no row.
+  snaps.push_back(MvccSnapshot(*this->tablet()->mvcc_manager()));
+
+  // Insert one row.
+  ASSERT_OK(this->InsertTestRow(&writer, 1, 0));
+
+  // In the second snap the row exists and has value 0.
+  snaps.push_back(MvccSnapshot(*this->tablet()->mvcc_manager()));
+
+  // Now delete the test row.
+  ASSERT_OK(this->DeleteTestRow(&writer, 1));
+
+  // In the third snap the row doesn't exist.
+  snaps.push_back(MvccSnapshot(*this->tablet()->mvcc_manager()));
+
+  // Reinsert the row.
+  ASSERT_OK(this->InsertTestRow(&writer, 1, 1));
+
+  // In the fourth snap the row exists again and has value 1.
+  snaps.push_back(MvccSnapshot(*this->tablet()->mvcc_manager()));
+
+  // .. and delete the row again.
+  ASSERT_OK(this->DeleteTestRow(&writer, 1));
+
+  // In the fifth snap the row has been deleted.
+  snaps.push_back(MvccSnapshot(*this->tablet()->mvcc_manager()));
+
+  // Now flush the MRS. All versions of the row should be visible,
+  // depending on the chosen snapshot.
+  ASSERT_OK(this->tablet()->Flush());
+
+  vector<vector<string>* > expected_rows;
+  CollectRowsForSnapshots(this->tablet().get(), this->client_schema_,
+                          snaps, &expected_rows);
+
+  ASSERT_EQ(expected_rows.size(), 5);
+  ASSERT_EQ(expected_rows[0]->size(), 0) << "Got the wrong result from snap: "
+                                         << snaps[0].ToString();
+  ASSERT_EQ(expected_rows[1]->size(), 1) << "Got the wrong result from snap: "
+                                         << snaps[1].ToString();
+  ASSERT_STR_CONTAINS((*expected_rows[1])[0], "int32 key_idx=1, int32 val=0)");
+  ASSERT_EQ(expected_rows[2]->size(), 0) << "Got the wrong result from snap: "
+                                         << snaps[2].ToString();
+  ASSERT_EQ(expected_rows[3]->size(), 1) << "Got the wrong result from snap: "
+                                         << snaps[3].ToString();
+  ASSERT_STR_CONTAINS((*expected_rows[3])[0], "int32 key_idx=1, int32 val=1)");
+  ASSERT_EQ(expected_rows[4]->size(), 0) << "Got the wrong result from snap: "
+                                         << snaps[4].ToString();
+
+  STLDeleteElements(&expected_rows);
+}
 
 // Test flushes and compactions dealing with deleted rows.
 TYPED_TEST(TestTablet, TestDeleteWithFlushAndCompact) {

