Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4E485200B6B for ; Fri, 9 Sep 2016 21:13:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4C11B160AA3; Fri, 9 Sep 2016 19:13:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EA9EC160AD5 for ; Fri, 9 Sep 2016 21:13:44 +0200 (CEST) Received: (qmail 18547 invoked by uid 500); 9 Sep 2016 19:13:44 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 18489 invoked by uid 99); 9 Sep 2016 19:13:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Sep 2016 19:13:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A9CA2E2EF4; Fri, 9 Sep 2016 19:13:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: todd@apache.org To: commits@kudu.apache.org Date: Fri, 09 Sep 2016 19:13:45 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/4] kudu git commit: KUDU-236 (part 1). 
Implement tablet history GC archived-at: Fri, 09 Sep 2016 19:13:46 -0000 http://git-wip-us.apache.org/repos/asf/kudu/blob/be719edc/src/kudu/tablet/tablet_history_gc-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc new file mode 100644 index 0000000..02a7b3e --- /dev/null +++ b/src/kudu/tablet/tablet_history_gc-test.cc @@ -0,0 +1,511 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "kudu/server/hybrid_clock.h" +#include "kudu/tablet/compaction.h" +#include "kudu/tablet/mvcc.h" +#include "kudu/tablet/tablet.h" +#include "kudu/tablet/tablet_metrics.h" +#include "kudu/tablet/tablet-test-base.h" + +DECLARE_int32(tablet_history_max_age_sec); +DECLARE_bool(use_mock_wall_clock); + +using kudu::server::HybridClock; + +// Specify row regex to match on. Empty string means don't match anything. +#define ASSERT_DEBUG_DUMP_ROWS_MATCH(pattern) do { \ + const std::string& _pat = (pattern); \ + vector _rows; \ + ASSERT_OK(tablet()->DebugDump(&_rows)); \ + /* Ignore the non-data (formatting) lines in the output. 
*/ \ + std::string _base_pat = R"(^Dumping|^-|^MRS|^RowSet)"; \ + if (!_pat.empty()) _base_pat += "|"; \ + ASSERT_STRINGS_ALL_MATCH(_rows, _base_pat + _pat); \ +} while (0) + +namespace kudu { +namespace tablet { + +class TabletHistoryGcTest : public TabletTestBase> { + public: + typedef TabletTestBase> Superclass; + + TabletHistoryGcTest() + : Superclass(TabletHarness::Options::HYBRID_CLOCK) { + FLAGS_use_mock_wall_clock = true; + } + + virtual void SetUp() OVERRIDE { + NO_FATALS(TabletTestBase>::SetUp()); + // Mock clock defaults to 0 and this screws up the AHM calculation which ends up negative. + down_cast(clock())->SetMockClockWallTimeForTests(GetCurrentTimeMicros()); + } + + protected: + enum ToFlush { + FLUSH, + NO_FLUSH + }; + + void InsertOriginalRows(int num_rowsets, uint64_t rows_per_rowset); + void AddTimeToHybridClock(MonoDelta delta) { + uint64_t now = HybridClock::GetPhysicalValueMicros(clock()->Now()); + uint64_t new_time = now + delta.ToMicroseconds(); + down_cast(clock())->SetMockClockWallTimeForTests(new_time); + } + + int64_t TotalNumRows() const { return num_rowsets_ * rows_per_rowset_; } + + TestRowVerifier GenRowsEqualVerifier(int32_t expected_val) { + return [=](int32_t key, int32_t val) -> bool { return val == expected_val; }; + } + + const TestRowVerifier kRowsEqual0 = GenRowsEqualVerifier(0); + const TestRowVerifier kRowsEqual1 = GenRowsEqualVerifier(1); + const TestRowVerifier kRowsEqual2 = GenRowsEqualVerifier(2); + + const int kStartRow = 0; + int num_rowsets_ = 3; + int rows_per_rowset_ = 300; +}; + +void TabletHistoryGcTest::InsertOriginalRows(int num_rowsets, uint64_t rows_per_rowset) { + ClampRowCount(num_rowsets * rows_per_rowset); + for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) { + InsertTestRows(rowset_id * rows_per_rowset, rows_per_rowset, 0); + ASSERT_OK(tablet()->Flush()); + } + ASSERT_EQ(num_rowsets, tablet()->num_rowsets()); +} + +// Test that we do not generate undos for redo operations that are older than 
+// the AHM during major delta compaction. +TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) { + FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second. + + NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_)); + NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0)); + Timestamp time_after_insert = clock()->Now(); + + // Timestamps recorded after each round of updates. + Timestamp post_update_ts[2]; + + // Mutate all of the rows, setting val=1. Then again for val=2. + LocalTabletWriter writer(tablet().get(), &client_schema_); + for (int val = 1; val <= 2; val++) { + for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) { + ASSERT_OK(UpdateTestRow(&writer, row_idx, val)); + } + // We must flush the DMS before major compaction can operate on these REDOs. + for (int i = 0; i < num_rowsets_; i++) { + tablet()->FlushBiggestDMS(); + } + post_update_ts[val - 1] = clock()->Now(); + } + + // Move the AHM beyond our mutations, which are represented as REDOs. + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2))); + + // Current-time reads should give us 2, but reads from the past should give + // us 0 or 1. + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), + time_after_insert, kRowsEqual0)); + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), + post_update_ts[0], kRowsEqual1)); + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), + post_update_ts[1], kRowsEqual2)); + NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual2)); + + // Run major delta compaction. + for (int i = 0; i < num_rowsets_; i++) { + ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION)); + } + + // Now, we should have base data = 2 with no other historical values. + // Major delta compaction will not remove UNDOs, so we expect a single UNDO DELETE as well. 
+ ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=2\) Undos: \[@[[:digit:]]+\(DELETE\)\] Redos: \[\]$)"); +} + +// Test that major delta compaction works when run on a subset of columns: +// 1. Insert rows and flush to DiskRowSets. +// 2. Mutate two columns. +// 3. Move time forward. +// 4. Run major delta compaction on a single column. +// 5. Make sure we don't lose anything unexpected. +TEST_F(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns) { + FLAGS_tablet_history_max_age_sec = 100; + + num_rowsets_ = 3; + rows_per_rowset_ = 20; + + NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_)); + NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0)); + + LocalTabletWriter writer(tablet().get(), &client_schema_); + for (int32_t row_key = 0; row_key < TotalNumRows(); row_key++) { + KuduPartialRow row(&client_schema_); + setup_.BuildRowKey(&row, row_key); + ASSERT_OK_FAST(row.SetInt32(1, 1)); + ASSERT_OK_FAST(row.SetInt32(2, 2)); + ASSERT_OK_FAST(writer.Update(row)); + } + for (int i = 0; i < num_rowsets_; i++) { + tablet()->FlushBiggestDMS(); + } + + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); + + vector> rowsets; + tablet()->GetRowSetsForTests(&rowsets); + for (int i = 0; i < num_rowsets_; i++) { + DiskRowSet* drs = down_cast(rowsets[i].get()); + vector col_ids_to_compact = { schema_.column_id(2) }; + ASSERT_OK(drs->MajorCompactDeltaStoresWithColumnIds(col_ids_to_compact, + tablet()->GetHistoryGcOpts())); + } + + ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=2\) Undos: \[@[[:digit:]]+\(DELETE\)\] )" + R"(Redos: \[@[[:digit:]]+\(SET key_idx=1\)\]$)"); + + vector rows; + ASSERT_OK(IterateToStringList(&rows)); + ASSERT_EQ(TotalNumRows(), rows.size()); +} + +// Tests the following two MRS flush scenarios: +// 1. Verify that no UNDO is generated after inserting a row into the MRS, +// waiting for the AHM to pass, then flushing the MRS. +// 2. 
Same as #1 but delete the inserted row from the MRS before waiting for +// the AHM to pass. +TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMRSFlush) { + FLAGS_tablet_history_max_age_sec = 100; + + Timestamp time_before_insert = clock()->Now(); + LocalTabletWriter writer(tablet().get(), &client_schema_); + for (int32_t i = kStartRow; i < TotalNumRows(); i++) { + ASSERT_OK(InsertTestRow(&writer, i, 0)); + } + Timestamp time_after_insert = clock()->Now(); + for (int32_t i = kStartRow; i < TotalNumRows(); i++) { + ASSERT_OK(DeleteTestRow(&writer, i)); + } + Timestamp time_after_delete = clock()->Now(); + + // Move the clock. + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); + + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0, + time_before_insert, boost::none)); + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), + time_after_insert, kRowsEqual0)); + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0, + time_after_delete, boost::none)); + + // Now flush the MRS. No trace should remain after this. + ASSERT_OK(tablet()->Flush()); + ASSERT_DEBUG_DUMP_ROWS_MATCH(""); + + for (const auto& rsmd : tablet()->metadata()->rowsets()) { + ASSERT_EQ(0, rsmd->undo_delta_blocks().size()); + } + ASSERT_EQ(0, tablet()->EstimateOnDiskSize()); + + // Now check the same thing (flush not generating an UNDO), but without the + // delete following the insert. We do it with a single row. + + ASSERT_OK(InsertTestRow(&writer, kStartRow, 0)); + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); + ASSERT_OK(tablet()->Flush()); + // There should be no undo blocks, despite flushing an insert. 
+ for (const auto& rsmd : tablet()->metadata()->rowsets()) { + ASSERT_EQ(0, rsmd->undo_delta_blocks().size()); + } + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 1, Timestamp(0), kRowsEqual0)); + ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=0\) Undos: \[\] Redos: \[\]$)"); +} + +// Test that undos get GCed on a merge compaction. +// In this test, we GC the UNDO that undoes the insert. +TEST_F(TabletHistoryGcTest, TestUndoGCOnMergeCompaction) { + FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second. + + Timestamp time_before_insert = clock()->Now(); + NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_)); + NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0)); + + // The earliest thing we can see is an empty tablet. + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0, time_before_insert, boost::none)); + + // Move the clock so the insert is prior to the AHM, then compact. + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2))); + ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); + + // Now the only thing we can see is the base data. + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), time_before_insert, + kRowsEqual0)); + ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=0\) Undos: \[\] Redos: \[\]$)"); +} + +// Test that we GC the history and existence of entire deleted rows on a merge compaction. +TEST_F(TabletHistoryGcTest, TestRowRemovalGCOnMergeCompaction) { + FLAGS_tablet_history_max_age_sec = 100; // Keep history for 100 seconds. + + NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_)); + NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0)); + + Timestamp prev_time = clock()->Now(); + + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); + + // Delete all of the rows in the tablet. 
+ LocalTabletWriter writer(tablet().get(), &client_schema_); + for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) { + ASSERT_OK(DeleteTestRow(&writer, row_idx)); + } + ASSERT_OK(tablet()->Flush()); + ASSERT_DEBUG_DUMP_ROWS_MATCH( + R"(int32 val=0\) Undos: \[@[[:digit:]]+\(DELETE\)\] Redos: \[@[[:digit:]]+\(DELETE\)\]$)"); + + // Compaction at this time will only remove the initial UNDO records. The + // DELETE REDOs are too recent. + ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); + ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=0\) Undos: \[\] Redos: \[@[[:digit:]]+\(DELETE\)\]$)"); + + // Move the AHM so that the delete is now prior to it. + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); + + // Now that even the deletion is prior to the AHM, all of the on-disk data + // will be GCed. + ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); + ASSERT_DEBUG_DUMP_ROWS_MATCH(""); + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0, prev_time, boost::none)); + ASSERT_EQ(0, tablet()->EstimateOnDiskSize()); +} + +// Test that we don't over-aggressively GC history prior to the AHM. +TEST_F(TabletHistoryGcTest, TestNoUndoGCUntilAncientHistoryMark) { + FLAGS_tablet_history_max_age_sec = 1000; // 1000 seconds before we GC history. + + NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_)); + + Timestamp prev_time = clock()->Now(); + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2))); + + // Mutate all of the rows. + LocalTabletWriter writer(tablet().get(), &client_schema_); + for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) { + SCOPED_TRACE(Substitute("Row index: $0", row_idx)); + ASSERT_OK(UpdateTestRow(&writer, row_idx, 1)); + } + + // Current-time reads should give us 1, but reads from the past should give us 0. 
+ NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual1)); + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), prev_time, + kRowsEqual0)); + + for (int i = 0; i < num_rowsets_; i++) { + ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION)); + ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION)); + } + ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); + + // Still read 0 from the past. + NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), prev_time, + kRowsEqual0)); + ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=1\) Undos: \[@[[:digit:]]+\(SET val=0\), )" + R"(@[[:digit:]]+\(DELETE\)\] Redos: \[\]$)"); +} + +// Test that "ghost" rows (deleted on one rowset, reinserted on another) don't +// get revived after history GC. +TEST_F(TabletHistoryGcTest, TestGhostRowsNotRevived) { + FLAGS_tablet_history_max_age_sec = 100; + + LocalTabletWriter writer(tablet().get(), &client_schema_); + for (int i = 0; i <= 2; i++) { + ASSERT_OK(InsertTestRow(&writer, 0, i)); + ASSERT_OK(DeleteTestRow(&writer, 0)); + ASSERT_OK(tablet()->Flush()); + } + + // Create one more rowset on disk which has just an INSERT (ie a non-ghost row). + ASSERT_OK(InsertTestRow(&writer, 0, 3)); + ASSERT_OK(tablet()->Flush()); + + // Move the clock, then compact. This should result in a rowset with just one + // row in it. + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); + ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); + + // We should end up with a single row as base data. + NO_FATALS(VerifyTestRows(0, 1)); + ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=3\) Undos: \[\] Redos: \[\])"); +} + +// Test to ensure that nothing bad happens when we partially GC different rows +// in a rowset. We delete alternating keys to end up with a mix of GCed and +// non-GCed rows in each rowset. 
+TEST_F(TabletHistoryGcTest, TestGcOnAlternatingRows) { + FLAGS_tablet_history_max_age_sec = 100; + num_rowsets_ = 3; + rows_per_rowset_ = 5; + + LocalTabletWriter writer(tablet().get(), &client_schema_); + for (int rowset_id = 0; rowset_id < num_rowsets_; rowset_id++) { + for (int i = 0; i < rows_per_rowset_; i++) { + int32_t row_key = rowset_id * rows_per_rowset_ + i; + ASSERT_OK(InsertTestRow(&writer, row_key, 0)); + } + ASSERT_OK(tablet()->Flush()); + } + + // Delete all the odd rows. + for (int32_t row_key = 1; row_key < TotalNumRows(); row_key += 2) { + ASSERT_OK(DeleteTestRow(&writer, row_key)); + } + + // Move the clock and compact. We should end up with even rows. + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); + ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); + + vector rows; + ASSERT_OK(IterateToStringList(&rows)); + ASSERT_EQ(TotalNumRows() / 2 + 1, rows.size()); + + // Even row keys are assigned negative values in this test framework and so + // end up sorted negatively. + std::reverse(rows.begin(), rows.end()); + + int i = 0; + for (int32_t row_key = 0; row_key < TotalNumRows(); row_key += 2) { + ASSERT_STR_CONTAINS(rows[i], Substitute("int64 key=$0, int32 key_idx=$1, int32 val=0", + -1 * row_key, row_key)); + i++; + } +} + +// Ensure that ReupdateMissedDeltas() doesn't reupdate the wrong row. +// 1. Insert rows and flush. +// 2. Delete some rows. +// 3. Move time forward. +// 4. Begin merge compaction. +// 5. Insert some of the deleted rows after phase 1 snapshot is written but before phase 2. +// 6. Update some of the rows in-between the deleted rows. +// 7. Ensure that the rows all look right according to what we expect. +// +// This test uses the following pattern. Rows with even keys are deleted, rows +// with odd keys are used in the test. The following takes place: +// - Rows 1 and 5 are inserted with values equaling their keys and are not mutated. 
+// - Rows 3 and 9 are inserted and then updated with values equaling their keys * 10 + 1. +// - Row 7 is deleted and then reinserted, as well as updated using successive values. +TEST_F(TabletHistoryGcTest, TestGcWithConcurrentCompaction) { + FLAGS_tablet_history_max_age_sec = 100; + + class MyCommonHooks : public Tablet::FlushCompactCommonHooks { + public: + explicit MyCommonHooks(TabletHistoryGcTest* test) + : test_(test), + offset_(0) { + } + + Status PostWriteSnapshot() OVERRIDE { + LocalTabletWriter writer(test_->tablet().get(), &test_->client_schema()); + int offset = offset_.load(std::memory_order_acquire); + // Update our reinserted row. + CHECK_OK(test_->UpdateTestRow(&writer, 7, 73 + offset)); + + // Also insert and update other rows after the flush. + CHECK_OK(test_->UpdateTestRow(&writer, 3, 30 + offset)); + CHECK_OK(test_->UpdateTestRow(&writer, 9, 90 + offset)); + return Status::OK(); + } + + void set_offset(int offset) { + offset_.store(offset, std::memory_order_release); + } + + private: + TabletHistoryGcTest* const test_; + std::atomic offset_; + }; + + std::shared_ptr hooks = std::make_shared(this); + tablet()->SetFlushCompactCommonHooksForTests(hooks); + + LocalTabletWriter writer(tablet().get(), &client_schema_); + for (int i = 0; i < 10; i++) { + ASSERT_OK(InsertTestRow(&writer, i, i)); + } + // Also generate a reinsert. + ASSERT_OK(DeleteTestRow(&writer, 7)); + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); + ASSERT_OK(InsertTestRow(&writer, 7, 71)); + CHECK_OK(UpdateTestRow(&writer, 7, 72)); + + // Flush the rowset. + ASSERT_OK(tablet()->Flush()); + + // Delete every even row. 
+ for (int i = 0; i < 10; i += 2) { + ASSERT_OK(DeleteTestRow(&writer, i)); + } + + NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); + + hooks->set_offset(1); + ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); + tablet()->SetFlushCompactCommonHooksForTests(nullptr); + + vector rows; + ASSERT_OK(IterateToStringList(&rows)); + + if (VLOG_IS_ON(2)) { + for (const string& r : rows) { + VLOG(2) << r; + } + } + + vector expected_rows = { 1, 3, 5, 7, 9 }; + for (int i = 0; i < expected_rows.size(); i++) { + int32_t key = expected_rows[i]; + switch (key) { + case 1: + case 5: + ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1", + key, key)); + break; + case 3: + case 9: + ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1", + key, key * 10 + 1)); + break; + case 7: + ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1", + key, key * 10 + 4)); + break; + default: + break; + } + } +} + +} // namespace tablet +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/be719edc/src/kudu/tserver/tablet_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 34013a8..2a5c7cd 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -36,6 +36,7 @@ #include "kudu/rpc/rpc_context.h" #include "kudu/rpc/rpc_sidecar.h" #include "kudu/server/hybrid_clock.h" +#include "kudu/tablet/compaction.h" #include "kudu/tablet/metadata.pb.h" #include "kudu/tablet/tablet_bootstrap.h" #include "kudu/tablet/tablet_metrics.h" @@ -82,6 +83,41 @@ DEFINE_int32(scanner_inject_latency_on_each_batch_ms, 0, TAG_FLAG(scanner_inject_latency_on_each_batch_ms, unsafe); DECLARE_int32(memory_limit_warn_threshold_percentage); +DECLARE_int32(tablet_history_max_age_sec); + +using google::protobuf::RepeatedPtrField; +using kudu::consensus::ChangeConfigRequestPB; +using 
kudu::consensus::ChangeConfigResponsePB; +using kudu::consensus::CONSENSUS_CONFIG_ACTIVE; +using kudu::consensus::CONSENSUS_CONFIG_COMMITTED; +using kudu::consensus::Consensus; +using kudu::consensus::ConsensusConfigType; +using kudu::consensus::ConsensusRequestPB; +using kudu::consensus::ConsensusResponsePB; +using kudu::consensus::GetLastOpIdRequestPB; +using kudu::consensus::GetNodeInstanceRequestPB; +using kudu::consensus::GetNodeInstanceResponsePB; +using kudu::consensus::LeaderStepDownRequestPB; +using kudu::consensus::LeaderStepDownResponsePB; +using kudu::consensus::RunLeaderElectionRequestPB; +using kudu::consensus::RunLeaderElectionResponsePB; +using kudu::consensus::StartTabletCopyRequestPB; +using kudu::consensus::StartTabletCopyResponsePB; +using kudu::consensus::VoteRequestPB; +using kudu::consensus::VoteResponsePB; +using kudu::rpc::ResultTracker; +using kudu::rpc::RpcContext; +using kudu::server::HybridClock; +using kudu::tablet::AlterSchemaTransactionState; +using kudu::tablet::Tablet; +using kudu::tablet::TabletPeer; +using kudu::tablet::TabletStatusPB; +using kudu::tablet::TransactionCompletionCallback; +using kudu::tablet::WriteTransactionState; +using std::shared_ptr; +using std::unique_ptr; +using std::vector; +using strings::Substitute; namespace kudu { namespace cfile { @@ -93,40 +129,6 @@ extern const char* CFILE_CACHE_HIT_BYTES_METRIC_NAME; namespace kudu { namespace tserver { -using consensus::ChangeConfigRequestPB; -using consensus::ChangeConfigResponsePB; -using consensus::CONSENSUS_CONFIG_ACTIVE; -using consensus::CONSENSUS_CONFIG_COMMITTED; -using consensus::Consensus; -using consensus::ConsensusConfigType; -using consensus::ConsensusRequestPB; -using consensus::ConsensusResponsePB; -using consensus::GetLastOpIdRequestPB; -using consensus::GetNodeInstanceRequestPB; -using consensus::GetNodeInstanceResponsePB; -using consensus::LeaderStepDownRequestPB; -using consensus::LeaderStepDownResponsePB; -using 
consensus::RunLeaderElectionRequestPB; -using consensus::RunLeaderElectionResponsePB; -using consensus::StartTabletCopyRequestPB; -using consensus::StartTabletCopyResponsePB; -using consensus::VoteRequestPB; -using consensus::VoteResponsePB; - -using google::protobuf::RepeatedPtrField; -using rpc::ResultTracker; -using rpc::RpcContext; -using std::shared_ptr; -using std::unique_ptr; -using std::vector; -using strings::Substitute; -using tablet::AlterSchemaTransactionState; -using tablet::Tablet; -using tablet::TabletPeer; -using tablet::TabletStatusPB; -using tablet::TransactionCompletionCallback; -using tablet::WriteTransactionState; - namespace { // Lookup the given tablet, ensuring that it both exists and is RUNNING. @@ -1041,7 +1043,7 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req, ScanResultCopier collector(&data, rows_data.get(), indirect_data.get()); bool has_more_results = false; - TabletServerErrorPB::Code error_code; + TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR; if (req->has_new_scan_request()) { const NewScanRequestPB& scan_pb = req->new_scan_request(); scoped_refptr tablet_peer; @@ -1509,6 +1511,37 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer, return s; } + // If this is a snapshot scan and the user specified a specific timestamp to + // scan at, then check that we are not attempting to scan at a time earlier + // than the ancient history mark. Only perform this check if tablet history + // GC is enabled. + // + // TODO: This validation essentially prohibits scans with READ_AT_SNAPSHOT + // when history_max_age is set to zero. There is a tablet history GC related + // race when the history max age is set to very low, or zero. Imagine a case + // where a scan was started and READ_AT_SNAPSHOT was specified without + // specifying a snapshot timestamp, and --tablet_history_max_age_sec=0. 
The + // above code path will select the latest timestamp (under a lock) prior to + // calling RowIterator::Init(), which actually opens the blocks. That means + // that there is an opportunity in between those two calls for tablet history + // GC to kick in and delete some history. In fact, we may easily not actually + // end up with a valid snapshot in that case. It would be more correct to + // initialize the row iterator and then select the latest timestamp + // represented by those open files in that case. + Timestamp ancient_history_mark; + tablet::HistoryGcOpts history_gc_opts = tablet->GetHistoryGcOpts(); + if (scan_pb.read_mode() == READ_AT_SNAPSHOT && + history_gc_opts.IsAncientHistory(*snap_timestamp)) { + // Now that we have initialized our row iterator at a snapshot, return an + // error if the snapshot timestamp was prior to the ancient history mark. + // We have to check after we open the iterator in order to avoid a TOCTOU + // error. + *error_code = TabletServerErrorPB::INVALID_SNAPSHOT; + return Status::InvalidArgument("Snapshot timestamp is earlier than the ancient history mark", + "consider increasing the value of the configuration parameter " + "--tablet_history_max_age_sec"); + } + *has_more_results = iter->HasNext(); TRACE("has_more: $0", *has_more_results); if (!*has_more_results) {