kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [3/4] kudu git commit: KUDU-236 (part 1). Implement tablet history GC
Date Fri, 09 Sep 2016 19:13:45 GMT
http://git-wip-us.apache.org/repos/asf/kudu/blob/be719edc/src/kudu/tablet/tablet_history_gc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc
new file mode 100644
index 0000000..02a7b3e
--- /dev/null
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -0,0 +1,511 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+
+#include "kudu/server/hybrid_clock.h"
+#include "kudu/tablet/compaction.h"
+#include "kudu/tablet/mvcc.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_metrics.h"
+#include "kudu/tablet/tablet-test-base.h"
+
+DECLARE_int32(tablet_history_max_age_sec);
+DECLARE_bool(use_mock_wall_clock);
+
+using kudu::server::HybridClock;
+
+// Asserts that every line of tablet()->DebugDump() either is one of the known
+// non-data (formatting) lines or matches the row regex 'pattern'.
+// An empty 'pattern' means no data row is allowed to appear in the dump at all.
+// Names are fully qualified so the macro does not depend on 'using'
+// declarations at the expansion site.
+#define ASSERT_DEBUG_DUMP_ROWS_MATCH(pattern) do { \
+  const std::string& _pat = (pattern); \
+  std::vector<std::string> _rows; \
+  ASSERT_OK(tablet()->DebugDump(&_rows)); \
+  /* Ignore the non-data (formatting) lines in the output. */ \
+  std::string _base_pat = R"(^Dumping|^-|^MRS|^RowSet)"; \
+  if (!_pat.empty()) _base_pat += "|"; \
+  ASSERT_STRINGS_ALL_MATCH(_rows, _base_pat + _pat); \
+} while (0)
+
+namespace kudu {
+namespace tablet {
+
+// Test fixture for tablet history GC. The tablet uses a hybrid clock driven by
+// the mock wall clock so tests can advance time explicitly (see
+// AddTimeToHybridClock()) and thereby move the ancient history mark (AHM).
+class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
+ public:
+  using Superclass = TabletTestBase<IntKeyTestSetup<INT64>>;
+
+  TabletHistoryGcTest()
+      : Superclass(TabletHarness::Options::HYBRID_CLOCK) {
+    FLAGS_use_mock_wall_clock = true;
+  }
+
+  virtual void SetUp() OVERRIDE {
+    NO_FATALS(Superclass::SetUp());
+    // The mock clock defaults to 0, which makes the AHM calculation end up
+    // negative, so start it at the real current wall time instead.
+    down_cast<HybridClock*>(clock())->SetMockClockWallTimeForTests(GetCurrentTimeMicros());
+  }
+
+ protected:
+  enum ToFlush {
+    FLUSH,
+    NO_FLUSH
+  };
+
+  // Inserts 'num_rowsets' batches of 'rows_per_rowset' rows with val=0,
+  // flushing after each batch so each batch lands in its own rowset.
+  void InsertOriginalRows(int num_rowsets, uint64_t rows_per_rowset);
+
+  // Moves the mock wall clock forward by 'delta' from the clock's current
+  // physical time.
+  void AddTimeToHybridClock(MonoDelta delta) {
+    uint64_t now = HybridClock::GetPhysicalValueMicros(clock()->Now());
+    uint64_t new_time = now + delta.ToMicroseconds();
+    down_cast<HybridClock*>(clock())->SetMockClockWallTimeForTests(new_time);
+  }
+
+  // Total number of rows described by 'num_rowsets_' and 'rows_per_rowset_'.
+  // Widen before multiplying so large configurations cannot overflow 'int'.
+  int64_t TotalNumRows() const {
+    return static_cast<int64_t>(num_rowsets_) * rows_per_rowset_;
+  }
+
+  // Returns a verifier that accepts a row iff its 'val' equals 'expected_val'.
+  TestRowVerifier GenRowsEqualVerifier(int32_t expected_val) {
+    return [=](int32_t /*key*/, int32_t val) -> bool { return val == expected_val; };
+  }
+
+  const TestRowVerifier kRowsEqual0 = GenRowsEqualVerifier(0);
+  const TestRowVerifier kRowsEqual1 = GenRowsEqualVerifier(1);
+  const TestRowVerifier kRowsEqual2 = GenRowsEqualVerifier(2);
+
+  // First row key used by the tests in this fixture.
+  const int kStartRow = 0;
+  // Defaults; individual tests may override these before inserting rows.
+  int num_rowsets_ = 3;
+  int rows_per_rowset_ = 300;
+};
+
+// Writes 'num_rowsets' consecutive batches of 'rows_per_rowset' rows (val=0),
+// flushing after every batch so that each batch becomes its own rowset, then
+// asserts that the tablet ended up with exactly 'num_rowsets' rowsets.
+// NOTE(review): any value returned by ClampRowCount() is discarded here; confirm
+// the call is only wanted for its side effects.
+void TabletHistoryGcTest::InsertOriginalRows(int num_rowsets, uint64_t rows_per_rowset) {
+  ClampRowCount(num_rowsets * rows_per_rowset);
+  for (int batch = 0; batch < num_rowsets; ++batch) {
+    const uint64_t first_key = batch * rows_per_rowset;
+    InsertTestRows(first_key, rows_per_rowset, 0);
+    ASSERT_OK(tablet()->Flush());
+  }
+  ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
+}
+
+// Test that we do not generate UNDOs for REDO operations that are older than
+// the ancient history mark (AHM) during major delta compaction.
+TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) {
+  FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second.
+
+  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
+  Timestamp time_after_insert = clock()->Now();
+
+  // Timestamps recorded after each round of updates.
+  Timestamp post_update_ts[2];
+
+  // Mutate all of the rows, setting val=1. Then again for val=2.
+  LocalTabletWriter writer(tablet().get(), &client_schema_);
+  for (int val = 1; val <= 2; val++) {
+    for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) {
+      ASSERT_OK(UpdateTestRow(&writer, row_idx, val));
+    }
+    // We must flush the DMS before major compaction can operate on these REDOs.
+    // FlushBiggestDMS() presumably flushes one DMS per call, hence one call per
+    // rowset. Check its status so a failed flush fails the test here.
+    for (int i = 0; i < num_rowsets_; i++) {
+      ASSERT_OK(tablet()->FlushBiggestDMS());
+    }
+    post_update_ts[val - 1] = clock()->Now();
+  }
+
+  // Move the AHM beyond our mutations, which are represented as REDOs.
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2)));
+
+  // Current-time reads should give us 2, but reads from the past should give
+  // us 0 or 1.
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
+                                                   time_after_insert, kRowsEqual0));
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
+                                                   post_update_ts[0], kRowsEqual1));
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
+                                                   post_update_ts[1], kRowsEqual2));
+  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual2));
+
+  // Run major delta compaction.
+  for (int i = 0; i < num_rowsets_; i++) {
+    ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
+  }
+
+  // Now, we should have base data = 2 with no other historical values.
+  // Major delta compaction will not remove UNDOs, so we expect a single UNDO DELETE as well.
+  // The regex is split across adjacent raw literals to stay on one logical line.
+  ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=2\) Undos: \[@[[:digit:]]+\(DELETE\)\] )"
+                               R"(Redos: \[\]$)");
+}
+
+// Test that major delta compaction works when run on a subset of columns:
+// 1. Insert rows and flush to DiskRowSets.
+// 2. Mutate two columns.
+// 3. Move time forward.
+// 4. Run major delta compaction on a single column.
+// 5. Make sure we don't lose anything unexpected.
+TEST_F(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns) {
+  FLAGS_tablet_history_max_age_sec = 100;
+
+  num_rowsets_ = 3;
+  rows_per_rowset_ = 20;
+
+  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
+
+  // Set column 1 (key_idx) to 1 and column 2 (val) to 2 in every row.
+  LocalTabletWriter writer(tablet().get(), &client_schema_);
+  for (int32_t row_key = 0; row_key < TotalNumRows(); row_key++) {
+    KuduPartialRow row(&client_schema_);
+    setup_.BuildRowKey(&row, row_key);
+    ASSERT_OK_FAST(row.SetInt32(1, 1));
+    ASSERT_OK_FAST(row.SetInt32(2, 2));
+    ASSERT_OK_FAST(writer.Update(row));
+  }
+  // Flush the DMSs so the REDOs are on disk, where major compaction can see them.
+  for (int i = 0; i < num_rowsets_; i++) {
+    ASSERT_OK(tablet()->FlushBiggestDMS());
+  }
+
+  // Move the AHM past the updates (200s > 100s of retained history).
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
+
+  vector<std::shared_ptr<RowSet>> rowsets;
+  tablet()->GetRowSetsForTests(&rowsets);
+  // Guard the indexing below: there should be exactly one rowset per batch.
+  ASSERT_EQ(num_rowsets_, rowsets.size());
+  for (int i = 0; i < num_rowsets_; i++) {
+    DiskRowSet* drs = down_cast<DiskRowSet*>(rowsets[i].get());
+    // Only compact column 2 ('val'); column 1's REDO must survive untouched.
+    vector<ColumnId> col_ids_to_compact = { schema_.column_id(2) };
+    ASSERT_OK(drs->MajorCompactDeltaStoresWithColumnIds(col_ids_to_compact,
+                                                        tablet()->GetHistoryGcOpts()));
+  }
+
+  ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=2\) Undos: \[@[[:digit:]]+\(DELETE\)\] )"
+                               R"(Redos: \[@[[:digit:]]+\(SET key_idx=1\)\]$)");
+
+  vector<string> rows;
+  ASSERT_OK(IterateToStringList(&rows));
+  ASSERT_EQ(TotalNumRows(), rows.size());
+}
+
+// Tests the following two MRS flush scenarios:
+// 1. Verify that no UNDO is generated after inserting a row into the MRS,
+//    waiting for the AHM to pass, then flushing the MRS.
+// 2. Same as #1 but delete the inserted row from the MRS before waiting for
+//    the AHM to pass; the flush should then leave no trace of the rows at all.
+// Note the body runs scenario #2 first (over TotalNumRows() rows) and then
+// scenario #1 (over a single row).
+TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMRSFlush) {
+  FLAGS_tablet_history_max_age_sec = 100;
+
+  // Scenario #2: insert then delete every row, all while still in the MRS.
+  Timestamp time_before_insert = clock()->Now();
+  LocalTabletWriter writer(tablet().get(), &client_schema_);
+  for (int32_t i = kStartRow; i < TotalNumRows(); i++) {
+    ASSERT_OK(InsertTestRow(&writer, i, 0));
+  }
+  Timestamp time_after_insert = clock()->Now();
+  for (int32_t i = kStartRow; i < TotalNumRows(); i++) {
+    ASSERT_OK(DeleteTestRow(&writer, i));
+  }
+  Timestamp time_after_delete = clock()->Now();
+
+  // Move the clock past the 100 second history window.
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
+
+  // Before the flush, snapshot reads still reflect the full insert/delete
+  // history held in the MRS.
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0,
+                                                   time_before_insert, boost::none));
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
+                                                   time_after_insert, kRowsEqual0));
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0,
+                                                   time_after_delete, boost::none));
+
+  // Now flush the MRS. No trace should remain after this.
+  ASSERT_OK(tablet()->Flush());
+  ASSERT_DEBUG_DUMP_ROWS_MATCH("");
+
+  for (const auto& rsmd : tablet()->metadata()->rowsets()) {
+    ASSERT_EQ(0, rsmd->undo_delta_blocks().size());
+  }
+  ASSERT_EQ(0, tablet()->EstimateOnDiskSize());
+
+  // Scenario #1: check the same thing (flush not generating an UNDO), but
+  // without the delete following the insert. We do it with a single row.
+
+  ASSERT_OK(InsertTestRow(&writer, kStartRow, 0));
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
+  ASSERT_OK(tablet()->Flush());
+  // There should be no undo blocks, despite flushing an insert.
+  for (const auto& rsmd : tablet()->metadata()->rowsets()) {
+    ASSERT_EQ(0, rsmd->undo_delta_blocks().size());
+  }
+  // Because no UNDO was written for the insert, even a read at Timestamp(0) is
+  // expected to see the row.
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 1, Timestamp(0), kRowsEqual0));
+  ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=0\) Undos: \[\] Redos: \[\]$)");
+}
+
+// Test that UNDOs get GCed on a merge compaction. The UNDO GCed here is the
+// one that reverses the original INSERT of each row.
+TEST_F(TabletHistoryGcTest, TestUndoGCOnMergeCompaction) {
+  FLAGS_tablet_history_max_age_sec = 1;  // Retain only one second of history.
+
+  Timestamp pre_insert_ts = clock()->Now();
+  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
+
+  // Before any GC, a snapshot read from before the insert sees an empty tablet.
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0, pre_insert_ts, boost::none));
+
+  // Push the insert behind the AHM, then merge-compact every rowset.
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2)));
+  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+
+  // With the INSERT UNDOs gone, even the old snapshot reads the base data.
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), pre_insert_ts,
+                                                   kRowsEqual0));
+  ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=0\) Undos: \[\] Redos: \[\]$)");
+}
+
+// Test that we GC the history and existence of entire deleted rows on a merge compaction.
+// The rows' INSERT is first pushed behind the AHM, then the rows are deleted; a
+// first compaction may drop only the INSERT UNDOs, and once the deletes are also
+// behind the AHM a second compaction removes the rows entirely.
+TEST_F(TabletHistoryGcTest, TestRowRemovalGCOnMergeCompaction) {
+  FLAGS_tablet_history_max_age_sec = 100; // Keep history for 100 seconds.
+
+  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
+
+  // A time at which every row is live with val=0; read again at the very end.
+  Timestamp prev_time = clock()->Now();
+
+  // Move the inserts behind the AHM. The deletes below happen after this move
+  // and so are still within the history window until the clock moves again.
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
+
+  // Delete all of the rows in the tablet.
+  LocalTabletWriter writer(tablet().get(), &client_schema_);
+  for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) {
+    ASSERT_OK(DeleteTestRow(&writer, row_idx));
+  }
+  ASSERT_OK(tablet()->Flush());
+  // Each row now shows its original INSERT UNDO and the new DELETE REDO.
+  ASSERT_DEBUG_DUMP_ROWS_MATCH(
+      R"(int32 val=0\) Undos: \[@[[:digit:]]+\(DELETE\)\] Redos: \[@[[:digit:]]+\(DELETE\)\]$)");
+
+  // Compaction at this time will only remove the initial UNDO records. The
+  // DELETE REDOs are too recent.
+  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+  ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=0\) Undos: \[\] Redos: \[@[[:digit:]]+\(DELETE\)\]$)");
+
+  // Move the AHM so that the delete is now prior to it.
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
+
+  // Now that even the deletion is prior to the AHM, all of the on-disk data
+  // will be GCed.
+  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+  ASSERT_DEBUG_DUMP_ROWS_MATCH("");
+  // Even a snapshot at 'prev_time', when every row was live, now sees nothing:
+  // the rows were removed together with their history.
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0, prev_time, boost::none));
+  ASSERT_EQ(0, tablet()->EstimateOnDiskSize());
+}
+
+// Test that we don't over-aggressively GC history prior to the AHM.
+// With 1000 seconds of retained history and only 2 seconds of clock movement,
+// nothing written here is older than the AHM, so every compaction below must
+// preserve the full history of each row.
+TEST_F(TabletHistoryGcTest, TestNoUndoGCUntilAncientHistoryMark) {
+  FLAGS_tablet_history_max_age_sec = 1000; // 1000 seconds before we GC history.
+
+  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+
+  // A time after the insert but before the updates below.
+  Timestamp prev_time = clock()->Now();
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2)));
+
+  // Mutate all of the rows.
+  LocalTabletWriter writer(tablet().get(), &client_schema_);
+  for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) {
+    SCOPED_TRACE(Substitute("Row index: $0", row_idx));
+    ASSERT_OK(UpdateTestRow(&writer, row_idx, 1));
+  }
+
+  // Current-time reads should give us 1, but reads from the past should give us 0.
+  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual1));
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), prev_time,
+                                                   kRowsEqual0));
+
+  // Run every kind of compaction; none of them may drop history yet.
+  // NOTE(review): the updates above are still in the DMSs. Elsewhere in this
+  // file the DMS is flushed first because major delta compaction only operates
+  // on flushed REDOs, so these delta compactions may have nothing on disk to
+  // work on. Confirm whether a FlushBiggestDMS() per rowset is intended here.
+  for (int i = 0; i < num_rowsets_; i++) {
+    ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION));
+    ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
+  }
+  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+
+  // Still read 0 from the past.
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), prev_time,
+                                                   kRowsEqual0));
+  // Base data holds val=1, and both UNDOs (back to val=0, then the INSERT's
+  // DELETE) are retained.
+  ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=1\) Undos: \[@[[:digit:]]+\(SET val=0\), )"
+                               R"(@[[:digit:]]+\(DELETE\)\] Redos: \[\]$)");
+}
+
+// Test that "ghost" rows (deleted on one rowset, reinserted on another) don't
+// get revived after history GC.
+TEST_F(TabletHistoryGcTest, TestGhostRowsNotRevived) {
+  FLAGS_tablet_history_max_age_sec = 100;
+
+  // Three times: insert key 0 (with val=0, 1, 2), delete it, and flush, so
+  // each flush produces a separate rowset holding a deleted copy of the row.
+  LocalTabletWriter writer(tablet().get(), &client_schema_);
+  for (int i = 0; i <= 2; i++) {
+    ASSERT_OK(InsertTestRow(&writer, 0, i));
+    ASSERT_OK(DeleteTestRow(&writer, 0));
+    ASSERT_OK(tablet()->Flush());
+  }
+
+  // Create one more rowset on disk which has just an INSERT (ie a non-ghost row).
+  ASSERT_OK(InsertTestRow(&writer, 0, 3));
+  ASSERT_OK(tablet()->Flush());
+
+  // Move the clock past the 100 second history window, then compact. This
+  // should result in a rowset with just one row in it.
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
+  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+
+  // We should end up with a single row as base data (the live val=3 version)
+  // and no remaining history. Anchor the regex like the other dump checks.
+  NO_FATALS(VerifyTestRows(0, 1));
+  ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=3\) Undos: \[\] Redos: \[\]$)");
+}
+
+// Test to ensure that nothing bad happens when we partially GC different rows
+// in a rowset. We delete alternating keys to end up with a mix of GCed and
+// non-GCed rows in each rowset.
+TEST_F(TabletHistoryGcTest, TestGcOnAlternatingRows) {
+  FLAGS_tablet_history_max_age_sec = 100;
+  num_rowsets_ = 3;
+  rows_per_rowset_ = 5;
+
+  LocalTabletWriter writer(tablet().get(), &client_schema_);
+  for (int rowset_id = 0; rowset_id < num_rowsets_; rowset_id++) {
+    for (int i = 0; i < rows_per_rowset_; i++) {
+      int32_t row_key = rowset_id * rows_per_rowset_ + i;
+      ASSERT_OK(InsertTestRow(&writer, row_key, 0));
+    }
+    ASSERT_OK(tablet()->Flush());
+  }
+
+  // Delete all the odd rows.
+  for (int32_t row_key = 1; row_key < TotalNumRows(); row_key += 2) {
+    ASSERT_OK(DeleteTestRow(&writer, row_key));
+  }
+
+  // Move the clock and compact. We should end up with only the even rows.
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
+  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+
+  vector<string> rows;
+  ASSERT_OK(IterateToStringList(&rows));
+  // The number of even keys in [0, N) is ceil(N / 2) == (N + 1) / 2, which holds
+  // for both odd and even N. This also bounds the rows[] indexing below.
+  const int64_t num_even_rows = (TotalNumRows() + 1) / 2;
+  ASSERT_EQ(num_even_rows, rows.size());
+
+  // Even row keys are stored as negative 'key' values by this test framework,
+  // so they iterate in descending 'key_idx' order; reverse to get ascending order.
+  std::reverse(rows.begin(), rows.end());
+
+  int i = 0;
+  for (int32_t row_key = 0; row_key < TotalNumRows(); row_key += 2) {
+    ASSERT_STR_CONTAINS(rows[i], Substitute("int64 key=$0, int32 key_idx=$1, int32 val=0",
+                                            -1 * row_key, row_key));
+    i++;
+  }
+}
+
+// Ensure that ReupdateMissedDeltas() doesn't reupdate the wrong row.
+// 1. Insert rows 0-9, delete and reinsert row 7, then flush.
+// 2. Delete the even rows.
+// 3. Move time forward past the AHM.
+// 4. Run a merge compaction.
+// 5. From the PostWriteSnapshot() hook, i.e. after the phase 1 snapshot is
+//    written but before phase 2, update odd rows 3, 7 and 9 so the compaction
+//    must carry those concurrent updates over (the ReupdateMissedDeltas() path).
+// 6. Ensure that the rows all look right according to what we expect.
+//
+// This test uses the following pattern. Rows with even keys are deleted, rows
+// with odd keys are used in the test. The following takes place:
+// - Rows 1 and 5 are inserted with values equaling their keys and are not mutated.
+// - Rows 3 and 9 are inserted and then updated with values equaling their keys * 10 + 1.
+// - Row 7 is deleted and then reinserted, as well as updated using successive values.
+TEST_F(TabletHistoryGcTest, TestGcWithConcurrentCompaction) {
+  FLAGS_tablet_history_max_age_sec = 100;
+
+  // Hooks that update rows 3, 7 and 9 between the snapshot and the later phase
+  // of a flush/compaction. 'offset_' selects which round of values is written.
+  class MyCommonHooks : public Tablet::FlushCompactCommonHooks {
+   public:
+    explicit MyCommonHooks(TabletHistoryGcTest* test)
+        : test_(test),
+          offset_(0) {
+    }
+
+    // Errors are returned rather than CHECK-crashing the process, so the
+    // ASSERT_OK around the triggering Flush()/Compact() reports them.
+    Status PostWriteSnapshot() OVERRIDE {
+      LocalTabletWriter writer(test_->tablet().get(), &test_->client_schema());
+      int offset = offset_.load(std::memory_order_acquire);
+      // Update our reinserted row.
+      RETURN_NOT_OK(test_->UpdateTestRow(&writer, 7, 73 + offset));
+
+      // Also update other (never-deleted) odd rows after the snapshot.
+      RETURN_NOT_OK(test_->UpdateTestRow(&writer, 3, 30 + offset));
+      RETURN_NOT_OK(test_->UpdateTestRow(&writer, 9, 90 + offset));
+      return Status::OK();
+    }
+
+    void set_offset(int offset) {
+      offset_.store(offset, std::memory_order_release);
+    }
+
+   private:
+    TabletHistoryGcTest* const test_;
+    std::atomic<int> offset_;
+  };
+
+  // NOTE: the hooks are installed before the Flush() below, so they presumably
+  // fire there too (with offset 0); the final values only reflect the last
+  // round, written during the compaction with offset 1.
+  std::shared_ptr<MyCommonHooks> hooks = std::make_shared<MyCommonHooks>(this);
+  tablet()->SetFlushCompactCommonHooksForTests(hooks);
+
+  LocalTabletWriter writer(tablet().get(), &client_schema_);
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(InsertTestRow(&writer, i, i));
+  }
+  // Also generate a reinsert.
+  ASSERT_OK(DeleteTestRow(&writer, 7));
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
+  ASSERT_OK(InsertTestRow(&writer, 7, 71));
+  ASSERT_OK(UpdateTestRow(&writer, 7, 72));
+
+  // Flush the rowset.
+  ASSERT_OK(tablet()->Flush());
+
+  // Delete every even row.
+  for (int i = 0; i < 10; i += 2) {
+    ASSERT_OK(DeleteTestRow(&writer, i));
+  }
+
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
+
+  hooks->set_offset(1);
+  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+  tablet()->SetFlushCompactCommonHooksForTests(nullptr);
+
+  vector<string> rows;
+  ASSERT_OK(IterateToStringList(&rows));
+
+  if (VLOG_IS_ON(2)) {
+    for (const string& r : rows) {
+      VLOG(2) << r;
+    }
+  }
+
+  // Only the odd rows survive. Check the count before indexing into 'rows'.
+  vector<int32_t> expected_rows = { 1, 3, 5, 7, 9 };
+  ASSERT_EQ(expected_rows.size(), rows.size());
+  for (size_t i = 0; i < expected_rows.size(); i++) {
+    int32_t key = expected_rows[i];
+    switch (key) {
+      case 1:
+      case 5:
+        ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1",
+                                                key, key));
+        break;
+      case 3:
+      case 9:
+        ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1",
+                                                key, key * 10 + 1));
+        break;
+      case 7:
+        ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1",
+                                                key, key * 10 + 4));
+        break;
+      default:
+        // Every entry of 'expected_rows' must have a case above.
+        FAIL() << "Unexpected key in expected_rows: " << key;
+    }
+  }
+}
+
+} // namespace tablet
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/be719edc/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 34013a8..2a5c7cd 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -36,6 +36,7 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/server/hybrid_clock.h"
+#include "kudu/tablet/compaction.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet_bootstrap.h"
 #include "kudu/tablet/tablet_metrics.h"
@@ -82,6 +83,41 @@ DEFINE_int32(scanner_inject_latency_on_each_batch_ms, 0,
 TAG_FLAG(scanner_inject_latency_on_each_batch_ms, unsafe);
 
 DECLARE_int32(memory_limit_warn_threshold_percentage);
+DECLARE_int32(tablet_history_max_age_sec);
+
+using google::protobuf::RepeatedPtrField;
+using kudu::consensus::ChangeConfigRequestPB;
+using kudu::consensus::ChangeConfigResponsePB;
+using kudu::consensus::CONSENSUS_CONFIG_ACTIVE;
+using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
+using kudu::consensus::Consensus;
+using kudu::consensus::ConsensusConfigType;
+using kudu::consensus::ConsensusRequestPB;
+using kudu::consensus::ConsensusResponsePB;
+using kudu::consensus::GetLastOpIdRequestPB;
+using kudu::consensus::GetNodeInstanceRequestPB;
+using kudu::consensus::GetNodeInstanceResponsePB;
+using kudu::consensus::LeaderStepDownRequestPB;
+using kudu::consensus::LeaderStepDownResponsePB;
+using kudu::consensus::RunLeaderElectionRequestPB;
+using kudu::consensus::RunLeaderElectionResponsePB;
+using kudu::consensus::StartTabletCopyRequestPB;
+using kudu::consensus::StartTabletCopyResponsePB;
+using kudu::consensus::VoteRequestPB;
+using kudu::consensus::VoteResponsePB;
+using kudu::rpc::ResultTracker;
+using kudu::rpc::RpcContext;
+using kudu::server::HybridClock;
+using kudu::tablet::AlterSchemaTransactionState;
+using kudu::tablet::Tablet;
+using kudu::tablet::TabletPeer;
+using kudu::tablet::TabletStatusPB;
+using kudu::tablet::TransactionCompletionCallback;
+using kudu::tablet::WriteTransactionState;
+using std::shared_ptr;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace cfile {
@@ -93,40 +129,6 @@ extern const char* CFILE_CACHE_HIT_BYTES_METRIC_NAME;
 namespace kudu {
 namespace tserver {
 
-using consensus::ChangeConfigRequestPB;
-using consensus::ChangeConfigResponsePB;
-using consensus::CONSENSUS_CONFIG_ACTIVE;
-using consensus::CONSENSUS_CONFIG_COMMITTED;
-using consensus::Consensus;
-using consensus::ConsensusConfigType;
-using consensus::ConsensusRequestPB;
-using consensus::ConsensusResponsePB;
-using consensus::GetLastOpIdRequestPB;
-using consensus::GetNodeInstanceRequestPB;
-using consensus::GetNodeInstanceResponsePB;
-using consensus::LeaderStepDownRequestPB;
-using consensus::LeaderStepDownResponsePB;
-using consensus::RunLeaderElectionRequestPB;
-using consensus::RunLeaderElectionResponsePB;
-using consensus::StartTabletCopyRequestPB;
-using consensus::StartTabletCopyResponsePB;
-using consensus::VoteRequestPB;
-using consensus::VoteResponsePB;
-
-using google::protobuf::RepeatedPtrField;
-using rpc::ResultTracker;
-using rpc::RpcContext;
-using std::shared_ptr;
-using std::unique_ptr;
-using std::vector;
-using strings::Substitute;
-using tablet::AlterSchemaTransactionState;
-using tablet::Tablet;
-using tablet::TabletPeer;
-using tablet::TabletStatusPB;
-using tablet::TransactionCompletionCallback;
-using tablet::WriteTransactionState;
-
 namespace {
 
 // Lookup the given tablet, ensuring that it both exists and is RUNNING.
@@ -1041,7 +1043,7 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
   ScanResultCopier collector(&data, rows_data.get(), indirect_data.get());
 
   bool has_more_results = false;
-  TabletServerErrorPB::Code error_code;
+  TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
   if (req->has_new_scan_request()) {
     const NewScanRequestPB& scan_pb = req->new_scan_request();
     scoped_refptr<TabletPeer> tablet_peer;
@@ -1509,6 +1511,37 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
     return s;
   }
 
+  // If this is a snapshot scan and the user specified a specific timestamp to
+  // scan at, then check that we are not attempting to scan at a time earlier
+  // than the ancient history mark. Only perform this check if tablet history
+  // GC is enabled.
+  //
+  // TODO: This validation essentially prohibits scans with READ_AT_SNAPSHOT
+  // when history_max_age is set to zero. There is a tablet history GC related
+  // race when the history max age is set to very low, or zero. Imagine a case
+  // where a scan was started and READ_AT_SNAPSHOT was specified without
+  // specifying a snapshot timestamp, and --tablet_history_max_age_sec=0. The
+  // above code path will select the latest timestamp (under a lock) prior to
+  // calling RowIterator::Init(), which actually opens the blocks. That means
+  // that there is an opportunity in between those two calls for tablet history
+  // GC to kick in and delete some history. In fact, we may easily not actually
+  // end up with a valid snapshot in that case. It would be more correct to
+  // initialize the row iterator and then select the latest timestamp
+  // represented by those open files in that case.
+  Timestamp ancient_history_mark;
+  tablet::HistoryGcOpts history_gc_opts = tablet->GetHistoryGcOpts();
+  if (scan_pb.read_mode() == READ_AT_SNAPSHOT &&
+      history_gc_opts.IsAncientHistory(*snap_timestamp)) {
+    // Now that we have initialized our row iterator at a snapshot, return an
+    // error if the snapshot timestamp was prior to the ancient history mark.
+    // We have to check after we open the iterator in order to avoid a TOCTOU
+    // error.
+    *error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
+    return Status::InvalidArgument("Snapshot timestamp is earlier than the ancient history
mark",
+                                   "consider increasing the value of the configuration parameter
"
+                                   "--tablet_history_max_age_sec");
+  }
+
   *has_more_results = iter->HasNext();
   TRACE("has_more: $0", *has_more_results);
   if (!*has_more_results) {


Mime
View raw message