kudu-commits mailing list archives

From t...@apache.org
Subject [2/2] incubator-kudu git commit: KUDU-969. Fix handling of crashes just before tablet metadata flush
Date Fri, 04 Mar 2016 03:53:03 GMT
KUDU-969. Fix handling of crashes just before tablet metadata flush

This commit fixes a long-standing bug with the following race:

  - a compaction is in "duplicating" phase (i.e. updates are written to both
    the input and output rowsets)
  - an update arrives which is duplicated to both input and output rowsets
  - we crash before writing the new metadata

In bootstrap, we see that the update was written to one rowset (i.e. the output
rowset) whose ID is not present in our metadata. We incorrectly assumed that
this meant the rowset had been written and then compacted away, and therefore
considered it a "flushed" store. In fact, it should be considered an
"unflushed" store.

In truth, we cannot distinguish whether this update was flushed and later compacted
away, or whether it was never flushed at all. This patch therefore removes usage of
the term 'unflushed' from the bootstrap code (see below for more details).

Despite this somewhat flawed reasoning, it turns out that most of the
bootstrap logic ended up being correct anyway. In the case above, despite
incorrectly considering the edit "flushed", the other duplicated target (the
input rowset) was considered "unflushed", so we correctly replayed the edit.

However, we did not handle this case correctly in our sanity checks regarding
pending commits at the end of bootstrap. In particular, we checked that a
pending commit did not refer to any store which was considered "flushed". In
the above scenario, the duplicated "output" was designated as "flushed" and
caused the bootstrap process to flag a corruption.

The bulk of this patch is made up of a new fault point and an integration test
which exercise crashes just prior to flushing tablet metadata. This ensures
that these scenarios in bootstrap are well covered, and that the resulting
tablet does not diverge from another replica which did not have a fault
injected.

The changes to the bootstrap code itself are actually not so large:

- First, we stop using the term 'unflushed' to refer to stores. Instead, we
  call a store 'active' if it was a current in-memory store at the time of the
  crash according to the metadata. A store is considered inactive if either:
    (a) we know it was flushed to disk (e.g. a DMS ID lower than the durable DMS
        ID for a given DRS), or
    (b) it is a DRS ID which is not currently represented in the tablet metadata.
        This could be either because the DRS was flushed and compacted away, or
        because it was a DRS that was in the process of being written when the
        server crashed.

- Because of the above terminology improvement, the patch renames
  'WasStoreAlreadyFlushed' to 'IsMemStoreActive' (and correspondingly inverts its
  return value). The logic itself is unchanged, though the comments are expanded.

- FilterMutate() and FilterInsert() are changed correspondingly to use the new
  method (WasStoreAlreadyFlushed() changed to !IsMemStoreActive())

- Now that we better understand these code paths, I replaced a section of TODO
  in FilterMutate() which previously had a DFATAL with a better comment and a
  Corruption status return.

- The checks for pending commits at the end of bootstrap were changed per the
  description above: we only need to ensure that _one_ target of each pending
  commit is active, not that _all_ targets are active.

Without the changes to tablet_bootstrap.cc, the test fails frequently with
errors like:

FAILED: Corruption: Failed log replay. Reason: CommitMsg was pending but it
    referred to flushed stores. ...

With the change, it passed 500/500 iterations:
http://dist-test.cloudera.org/job?job_id=todd.1456816652.10895

Change-Id: I1d996a470b9a7957ad3eb7fe02f22f85c32b5f9d
Reviewed-on: http://gerrit.cloudera.org:8080/2333
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <david.alves@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/826b7720
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/826b7720
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/826b7720

Branch: refs/heads/master
Commit: 826b7720f8998796b0a6b84ca6ab8f96aa8ba603
Parents: eb03604
Author: Todd Lipcon <todd@apache.org>
Authored: Fri Feb 26 18:03:42 2016 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Fri Mar 4 03:51:01 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/delete_table-test.cc |   9 +-
 .../integration-tests/external_mini_cluster.cc  |  14 ++
 .../integration-tests/external_mini_cluster.h   |   4 +
 src/kudu/integration-tests/ts_recovery-itest.cc | 187 +++++++++++++++++++
 src/kudu/tablet/tablet.cc                       |  19 ++
 src/kudu/tablet/tablet_bootstrap.cc             | 138 +++++++-------
 6 files changed, 298 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/826b7720/src/kudu/integration-tests/delete_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_table-test.cc b/src/kudu/integration-tests/delete_table-test.cc
index 49144d9..d139caa 100644
--- a/src/kudu/integration-tests/delete_table-test.cc
+++ b/src/kudu/integration-tests/delete_table-test.cc
@@ -188,12 +188,9 @@ void DeleteTableTest::WaitForTabletDeletedOnTS(int index,
 }
 
 void DeleteTableTest::WaitForTSToCrash(int index) {
-  ExternalTabletServer* ts = cluster_->tablet_server(index);
-  for (int i = 0; i < 6000; i++) { // wait 60sec
-    if (!ts->IsProcessAlive()) return;
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-  FAIL() << "TS " << ts->instance_id().permanent_uuid() << " did not crash!";
+  auto ts = cluster_->tablet_server(index);
+  SCOPED_TRACE(ts->instance_id().permanent_uuid());
+  ASSERT_OK(ts->WaitForCrash(MonoDelta::FromSeconds(60)));
 }
 
 void DeleteTableTest::WaitForAllTSToCrash() {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/826b7720/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index 2f15edb..d358d1c 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/integration-tests/external_mini_cluster.h"
 
+#include <algorithm>
 #include <gtest/gtest.h>
 #include <memory>
 #include <rapidjson/document.h>
@@ -614,6 +615,19 @@ bool ExternalDaemon::IsProcessAlive() const {
   return s.IsTimedOut();
 }
 
+Status ExternalDaemon::WaitForCrash(const MonoDelta& timeout) const {
+  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
+  deadline.AddDelta(timeout);
+
+  int i = 1;
+  while (MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
+    if (!IsProcessAlive()) return Status::OK();
+    int sleep_ms = std::min(i++ * 10, 200);
+    SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+  }
+  return Status::TimedOut(Substitute("Process did not crash within $0", timeout.ToString()));
+}
+
 pid_t ExternalDaemon::pid() const {
   return process_->pid();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/826b7720/src/kudu/integration-tests/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.h b/src/kudu/integration-tests/external_mini_cluster.h
index d19fc98..4e06a98 100644
--- a/src/kudu/integration-tests/external_mini_cluster.h
+++ b/src/kudu/integration-tests/external_mini_cluster.h
@@ -308,6 +308,10 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
   // explicitly call Shutdown().
   bool IsProcessAlive() const;
 
+  // Wait for this process to crash, or the given timeout to
+  // elapse. If the process is already crashed, returns immediately.
+  Status WaitForCrash(const MonoDelta& timeout) const;
+
   virtual void Shutdown();
 
   const std::string& data_dir() const { return data_dir_; }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/826b7720/src/kudu/integration-tests/ts_recovery-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index 6818dc9..4ab891f 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -15,9 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/client/client.h"
 #include "kudu/integration-tests/cluster_verifier.h"
 #include "kudu/integration-tests/external_mini_cluster.h"
 #include "kudu/integration-tests/test_workload.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/test_util.h"
 
 #include <string>
@@ -26,6 +29,23 @@ using std::string;
 
 namespace kudu {
 
+using client::KuduClient;
+using client::KuduClientBuilder;
+using client::KuduInsert;
+using client::KuduSession;
+using client::KuduTable;
+using client::KuduUpdate;
+using client::sp::shared_ptr;
+
+namespace {
+// Generate a row key such that an increasing sequence (0...N) ends up spreading writes
+// across the key space as several sequential streams rather than a single
+// sequential stream.
+int IntToKey(int i) {
+  return 100000000 * (i % 3) + i;
+}
+} // anonymous namespace
+
 class TsRecoveryITest : public KuduTest {
  public:
   virtual void TearDown() OVERRIDE {
@@ -135,4 +155,171 @@ TEST_F(TsRecoveryITest, TestCrashDuringLogReplay) {
                                        MonoDelta::FromSeconds(30)));
 }
 
+// A set of threads which pick rows which are known to exist in the table
+// and issue random updates against them.
+class UpdaterThreads {
+ public:
+  static const int kNumThreads = 4;
+
+  // 'inserted' is an atomic integer which stores the number of rows
+  // which have been inserted up to that point.
+  UpdaterThreads(AtomicInt<int32_t>* inserted,
+                 const shared_ptr<KuduClient>& client,
+                 const shared_ptr<KuduTable>& table)
+    : should_run_(false),
+      inserted_(inserted),
+      client_(client),
+      table_(table) {
+  }
+
+  // Start running the updater threads.
+  void Start() {
+    CHECK(!should_run_.Load());
+    should_run_.Store(true);
+    threads_.resize(kNumThreads);
+    for (int i = 0; i < threads_.size(); i++) {
+      CHECK_OK(kudu::Thread::Create("test", "updater",
+                                    &UpdaterThreads::Run, this,
+                                    &threads_[i]));
+    }
+  }
+
+  // Stop running the updater threads, and wait for them to exit.
+  void StopAndJoin() {
+    CHECK(should_run_.Load());
+    should_run_.Store(false);
+
+    for (const auto& t : threads_) {
+      t->Join();
+    }
+    threads_.clear();
+  }
+
+ protected:
+  void Run() {
+    Random rng(GetRandomSeed32());
+    shared_ptr<KuduSession> session = client_->NewSession();
+    session->SetTimeoutMillis(2000);
+    CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+    while (should_run_.Load()) {
+      int i = inserted_->Load();
+      if (i == 0) continue;
+
+      gscoped_ptr<KuduUpdate> up(table_->NewUpdate());
+      CHECK_OK(up->mutable_row()->SetInt32("key", IntToKey(rng.Uniform(i) + 1)));
+      CHECK_OK(up->mutable_row()->SetInt32("int_val", rng.Next32()));
+      CHECK_OK(session->Apply(up.release()));
+      // The server might crash due to a compaction while we're still updating.
+      // That's OK - we expect the main thread to shut us down quickly.
+      WARN_NOT_OK(session->Flush(), "failed to flush updates");
+    }
+  }
+
+  AtomicBool should_run_;
+  AtomicInt<int32_t>* inserted_;
+  shared_ptr<KuduClient> client_;
+  shared_ptr<KuduTable> table_;
+  vector<scoped_refptr<Thread> > threads_;
+};
+
+// Parameterized test which acts as a regression test for KUDU-969.
+//
+// This test is parameterized on the name of a fault point configuration.
+// The fault points exercise crashes right before flushing the tablet metadata
+// during a flush/compaction. Meanwhile, a set of threads hammer rows with
+// a lot of updates. The goal here is to trigger the following race:
+//
+// - a compaction is in "duplicating" phase (i.e. updates are written to both
+//   the input and output rowsets)
+// - we crash (due to the fault point) before writing the new metadata
+//
+// This exercises the bootstrap code path which replays duplicated updates
+// in the case where the flush did not complete. Prior to fixing KUDU-969,
+// these updates would be mistakenly considered as "already flushed", despite
+// the fact that they were only written to the input rowset's memory stores, and
+// never hit disk.
+class Kudu969Test : public TsRecoveryITest,
+                    public ::testing::WithParamInterface<const char*> {
+};
+INSTANTIATE_TEST_CASE_P(DifferentFaultPoints,
+                        Kudu969Test,
+                        ::testing::Values("fault_crash_before_flush_tablet_meta_after_compaction",
+                                          "fault_crash_before_flush_tablet_meta_after_flush_mrs"));
+
+TEST_P(Kudu969Test, Test) {
+  if (!AllowSlowTests()) return;
+
+  // We use a replicated cluster here so that the 'REPLICATE' messages
+  // and 'COMMIT' messages are spread out further in time, and it's
+  // more likely to trigger races. We can also verify that the server
+  // with the injected fault recovers to the same state as the other
+  // servers.
+  ExternalMiniClusterOptions opts;
+  opts.num_tablet_servers = 3;
+
+  // Jack up the number of maintenance manager threads to try to trigger
+  // concurrency bugs where a compaction and a flush might be happening
+  // at the same time during the crash.
+  opts.extra_tserver_flags.push_back("--maintenance_manager_num_threads=3");
+  cluster_.reset(new ExternalMiniCluster(opts));
+  ASSERT_OK(cluster_->Start());
+
+  // Set a small flush threshold so that we flush a lot (causing more compactions
+  // as well).
+  cluster_->SetFlag(cluster_->tablet_server(0), "flush_threshold_mb", "1");
+
+  // Use TestWorkload to create a table
+  TestWorkload work(cluster_.get());
+  work.set_num_replicas(3);
+  work.Setup();
+
+  // Open the client and table.
+  KuduClientBuilder builder;
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(builder, &client));
+  shared_ptr<KuduTable> table;
+  CHECK_OK(client->OpenTable(work.table_name(), &table));
+
+  // Keep track of how many rows have been inserted.
+  AtomicInt<int32_t> inserted(0);
+
+  // Start updater threads.
+  UpdaterThreads updater(&inserted, client, table);
+  updater.Start();
+
+  // Enable the fault point to crash after a few flushes or compactions.
+  auto ts = cluster_->tablet_server(0);
+  cluster_->SetFlag(ts, GetParam(), "0.3");
+
+  // Insert some data.
+  shared_ptr<KuduSession> session = client->NewSession();
+  session->SetTimeoutMillis(1000);
+  CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  for (int i = 1; ts->IsProcessAlive(); i++) {
+    gscoped_ptr<KuduInsert> ins(table->NewInsert());
+    ASSERT_OK(ins->mutable_row()->SetInt32("key", IntToKey(i)));
+    ASSERT_OK(ins->mutable_row()->SetInt32("int_val", i));
+    ASSERT_OK(ins->mutable_row()->SetNull("string_val"));
+    ASSERT_OK(session->Apply(ins.release()));
+    if (i % 100 == 0) {
+      WARN_NOT_OK(session->Flush(), "could not flush session");
+      inserted.Store(i);
+    }
+  }
+  LOG(INFO) << "successfully detected TS crash!";
+  updater.StopAndJoin();
+
+  // Restart the TS to trigger bootstrap, and wait for it to start up.
+  ts->Shutdown();
+  ASSERT_OK(ts->Restart());
+  ASSERT_OK(cluster_->WaitForTabletsRunning(ts, MonoDelta::FromSeconds(90)));
+
+  // Verify that the bootstrapped server matches the other replicas, which
+  // had no faults.
+  ClusterVerifier v(cluster_.get());
+  v.SetVerificationTimeout(MonoDelta::FromSeconds(30));
+  NO_FATALS(v.CheckCluster());
+}
+
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/826b7720/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index ab5b6f4..ad01767 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -58,6 +58,7 @@
 #include "kudu/util/bloom_filter.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/env.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/logging.h"
@@ -87,6 +88,16 @@ DEFINE_double(tablet_bloom_target_fp_rate, 0.01f,
               "required for bloom filters.");
 TAG_FLAG(tablet_bloom_target_fp_rate, advanced);
 
+
+DEFINE_double(fault_crash_before_flush_tablet_meta_after_compaction, 0.0,
+              "Fraction of the time, during compaction, to crash before flushing metadata");
+TAG_FLAG(fault_crash_before_flush_tablet_meta_after_compaction, unsafe);
+
+DEFINE_double(fault_crash_before_flush_tablet_meta_after_flush_mrs, 0.0,
+              "Fraction of the time, while flushing an MRS, to crash before flushing metadata");
+TAG_FLAG(fault_crash_before_flush_tablet_meta_after_flush_mrs, unsafe);
+
+
 METRIC_DEFINE_entity(tablet);
 METRIC_DEFINE_gauge_size(tablet, memrowset_size, "MemRowSet Memory Usage",
                          kudu::MetricUnit::kBytes,
@@ -1345,6 +1356,14 @@ Status Tablet::DoCompactionOrFlush(const RowSetsInCompaction &input, int64_t mrs
   // ------------------------------
   // Flush was successful.
 
+  // Run fault points used by some integration tests.
+  if (input.num_rowsets() > 1) {
+    MAYBE_FAULT(FLAGS_fault_crash_before_flush_tablet_meta_after_compaction);
+  } else if (input.num_rowsets() == 1 &&
+             input.rowsets()[0]->EstimateOnDiskSize() == 0) {
+    MAYBE_FAULT(FLAGS_fault_crash_before_flush_tablet_meta_after_flush_mrs);
+  }
+
   // Write out the new Tablet Metadata and remove old rowsets.
   RETURN_NOT_OK_PREPEND(FlushMetadata(input.rowsets(), new_drs_metas, mrs_being_flushed),
                         "Failed to flush new tablet metadata");

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/826b7720/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 782a973..fb46b61 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -110,17 +110,30 @@ using tserver::WriteRequestPB;
 struct ReplayState;
 
 // Information from the tablet metadata which indicates which data was
-// flushed prior to this restart.
+// flushed prior to this restart and which memory stores are still active.
 //
 // We take a snapshot of this information at the beginning of the bootstrap
 // process so that we can allow compactions and flushes to run during bootstrap
 // without confusing our tracking of flushed stores.
+//
+// NOTE: automatic flushes and compactions are not currently scheduled during
+// bootstrap. However, flushes may still be triggered due to operations like
+// alter-table.
 class FlushedStoresSnapshot {
  public:
   FlushedStoresSnapshot() {}
   Status InitFrom(const TabletMetadata& meta);
 
-  bool WasStoreAlreadyFlushed(const MemStoreTargetPB& target) const;
+  // Return true if the given memory store is still active (i.e. edits that were
+  // originally written to this memory store should be replayed during the bootstrap
+  // process).
+  //
+  // NOTE: a store may be inactive for either of two reasons. Either:
+  // (a) the store was flushed to disk, OR
+  // (b) the store was in the process of being written by a flush or compaction
+  //     but the process crashed before the associated tablet metadata update
+  //     was committed.
+  bool IsMemStoreActive(const MemStoreTargetPB& target) const;
 
  private:
   int64_t last_durable_mrs_id_;
@@ -244,13 +257,9 @@ class TabletBootstrap {
                       RowOp* op,
                       const OperationResultPB& op_result);
 
-  // Returns whether all the stores that are referred to in the commit
-  // message are already flushed.
-  bool AreAllStoresAlreadyFlushed(const CommitMsg& commit);
-
-  // Returns whether there is any store that is referred to in the commit
-  // message that is already flushed.
-  bool AreAnyStoresAlreadyFlushed(const CommitMsg& commit);
+  // Returns true if any of the memory stores referenced in 'commit' are still
+  // active, in which case the operation needs to be replayed.
+  bool AreAnyStoresActive(const CommitMsg& commit);
 
   void DumpReplayStateToLog(const ReplayState& state);
 
@@ -261,9 +270,9 @@ class TabletBootstrap {
   Status ApplyCommitMessage(ReplayState* state, LogEntryPB* commit_entry);
   Status HandleEntryPair(LogEntryPB* replicate_entry, LogEntryPB* commit_entry);
 
-  // Checks that an orphaned commit message is actually irrelevant, i.e that the
-  // data stores it refers to are already flushed.
-  Status CheckOrphanedCommitAlreadyFlushed(const CommitMsg& commit);
+  // Checks that an orphaned commit message is actually irrelevant, i.e that none
+  // of the data stores it refers to are live.
+  Status CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit);
 
   // Decodes a Timestamp from the provided string and updates the clock
   // with it.
@@ -821,7 +830,7 @@ Status TabletBootstrap::HandleCommitMessage(ReplayState* state, LogEntryPB* comm
   if (state->pending_replicates.empty() ||
       (*state->pending_replicates.begin()).first > committed_op_id.index()) {
     VLOG_WITH_PREFIX(2) << "Found orphaned commit for " << committed_op_id;
-    RETURN_NOT_OK(CheckOrphanedCommitAlreadyFlushed(commit_entry->commit()));
+    RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(commit_entry->commit()));
     stats_.orphaned_commits++;
     delete commit_entry;
     return Status::OK();
@@ -859,21 +868,10 @@ Status TabletBootstrap::HandleCommitMessage(ReplayState* state, LogEntryPB* comm
   return Status::OK();
 }
 
-bool TabletBootstrap::AreAllStoresAlreadyFlushed(const CommitMsg& commit) {
-  for (const OperationResultPB& op_result : commit.result().ops()) {
-    for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
-      if (!flushed_stores_.WasStoreAlreadyFlushed(mutated_store)) {
-        return false;
-      }
-    }
-  }
-  return true;
-}
-
-bool TabletBootstrap::AreAnyStoresAlreadyFlushed(const CommitMsg& commit) {
+bool TabletBootstrap::AreAnyStoresActive(const CommitMsg& commit) {
   for (const OperationResultPB& op_result : commit.result().ops()) {
     for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
-      if (flushed_stores_.WasStoreAlreadyFlushed(mutated_store)) {
+      if (flushed_stores_.IsMemStoreActive(mutated_store)) {
         return true;
       }
     }
@@ -881,14 +879,15 @@ bool TabletBootstrap::AreAnyStoresAlreadyFlushed(const CommitMsg& commit) {
   return false;
 }
 
-Status TabletBootstrap::CheckOrphanedCommitAlreadyFlushed(const CommitMsg& commit) {
-  if (!AreAllStoresAlreadyFlushed(commit)) {
+Status TabletBootstrap::CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit) {
+  if (AreAnyStoresActive(commit)) {
     TabletSuperBlockPB super;
     WARN_NOT_OK(meta_->ToSuperBlock(&super), LogPrefix() + "Couldn't build TabletSuperBlockPB");
     return Status::Corruption(Substitute("CommitMsg was orphaned but it referred to "
-        "unflushed stores. Commit: $0. TabletMetadata: $1", commit.ShortDebugString(),
+        "stores which need replay. Commit: $0. TabletMetadata: $1", commit.ShortDebugString(),
         super.ShortDebugString()));
   }
+
   return Status::OK();
 }
 
@@ -920,7 +919,7 @@ Status TabletBootstrap::ApplyCommitMessage(ReplayState* state, LogEntryPB* commi
     stats_.ops_committed++;
   } else {
     stats_.orphaned_commits++;
-    RETURN_NOT_OK(CheckOrphanedCommitAlreadyFlushed(commit_entry->commit()));
+    RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(commit_entry->commit()));
   }
 
   return Status::OK();
@@ -1080,26 +1079,27 @@ Status TabletBootstrap::PlaySegments(ConsensusBootstrapInfo* consensus_info) {
   }
 
   // If we have non-applied commits they all must belong to pending operations and
-  // they should only pertain to unflushed stores.
+  // they should only pertain to stores which are still active.
   if (!state.pending_commits.empty()) {
     for (const OpIndexToEntryMap::value_type& entry : state.pending_commits) {
       if (!ContainsKey(state.pending_replicates, entry.first)) {
         DumpReplayStateToLog(state);
         return Status::Corruption("Had orphaned commits at the end of replay.");
       }
-      if (AreAnyStoresAlreadyFlushed(entry.second->commit())) {
+      if (entry.second->commit().op_type() == WRITE_OP &&
+          !AreAnyStoresActive(entry.second->commit())) {
         DumpReplayStateToLog(state);
         TabletSuperBlockPB super;
         WARN_NOT_OK(meta_->ToSuperBlock(&super), "Couldn't build TabletSuperBlockPB.");
-        return Status::Corruption(Substitute("CommitMsg was pending but it referred to "
-            "flushed stores. Commit: $0. TabletMetadata: $1",
+        return Status::Corruption(Substitute("CommitMsg was pending but it did not refer "
+            "to any active memory stores. Commit: $0. TabletMetadata: $1",
             entry.second->commit().ShortDebugString(), super.ShortDebugString()));
       }
     }
   }
 
   // Note that we don't pass the information contained in the pending commits along with
-  // ConsensusBootstrapInfo. We know that this is safe as they must refer to unflushed
+  // ConsensusBootstrapInfo. We know that this is safe as they must refer to active
   // stores (we make doubly sure above).
   //
   // Example/Explanation:
@@ -1120,7 +1120,7 @@ Status TabletBootstrap::PlaySegments(ConsensusBootstrapInfo* consensus_info) {
   // pass them both as "pending" to consensus to be applied again.
   //
   // The reason why it is safe to simply disregard 10.11's commit is that we know that
-  // it must refer only to unflushed stores. We know this because one important flush/compact
+  // it must refer only to active stores. We know this because one important flush/compact
   // pre-condition is:
   // - No flush will become visible on reboot (meaning we won't durably update the tablet
   //   metadata), unless the snapshot under which the flush/compact was performed has no
@@ -1348,13 +1348,14 @@ Status TabletBootstrap::FilterInsert(WriteTransactionState* tx_state,
                                      const OperationResultPB& op_result) {
   DCHECK_EQ(op->decoded_op.type, RowOperationsPB::INSERT);
 
+  // INSERTs are never duplicated.
   if (PREDICT_FALSE(op_result.mutated_stores_size() != 1 ||
                     !op_result.mutated_stores(0).has_mrs_id())) {
     return Status::Corruption(Substitute("Insert operation result must have an mrs_id: $0",
                                          op_result.ShortDebugString()));
   }
   // check if the insert is already flushed
-  if (flushed_stores_.WasStoreAlreadyFlushed(op_result.mutated_stores(0))) {
+  if (!flushed_stores_.IsMemStoreActive(op_result.mutated_stores(0))) {
     if (VLOG_IS_ON(1)) {
       VLOG_WITH_PREFIX(1) << "Skipping insert that was already flushed. OpId: "
                           << tx_state->op_id().DebugString()
@@ -1383,37 +1384,35 @@ Status TabletBootstrap::FilterMutate(WriteTransactionState* tx_state,
   }
 
   // The mutation may have been duplicated, so we'll check whether any of the
-  // output targets was "unflushed".
-  int num_unflushed_stores = 0;
+  // output targets was active.
+  int num_active_stores = 0;
   for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
-    if (!flushed_stores_.WasStoreAlreadyFlushed(mutated_store)) {
-      num_unflushed_stores++;
+    if (flushed_stores_.IsMemStoreActive(mutated_store)) {
+      num_active_stores++;
     } else {
       if (VLOG_IS_ON(1)) {
         string mutation = op->decoded_op.changelist.ToString(*tablet_->schema());
         VLOG_WITH_PREFIX(1) << "Skipping mutation to " << mutated_store.ShortDebugString()
-                            << " that was already flushed. "
-                            << "OpId: " << tx_state->op_id().DebugString();
+                            << " that was not active. OpId: " << tx_state->op_id().DebugString();
       }
     }
   }
 
-  if (num_unflushed_stores == 0) {
+  if (num_active_stores == 0) {
     // The mutation was fully flushed.
     op->SetFailed(Status::AlreadyPresent("Update was already flushed."));
     stats_.mutations_ignored++;
     return Status::OK();
   }
 
-  if (num_unflushed_stores == 2) {
-    // 18:47 < dralves> off the top of my head, if we crashed before writing the meta
-    //                  at the end of a flush/compation then both mutations could
-    //                  potentually be considered unflushed
-    // This case is not currently covered by any tests -- we need to add test coverage
-    // for this. See KUDU-218. It's likely the correct behavior is just to apply the edit,
-    // ie not fatal below.
-    LOG_WITH_PREFIX(DFATAL) << "TODO: add test coverage for case where op is unflushed "
-                            << "in both duplicated targets";
+  if (PREDICT_FALSE(num_active_stores == 2)) {
+    // It's not possible for a duplicated mutation to refer to two stores which are still
+    // active. Either the mutation arrived before the metadata was flushed, in which case
+    // the 'first' store is live, or it arrived just after it was flushed, in which case
+    // the 'second' store was live. But at no time should the metadata refer to both the
+    // 'input' and 'output' stores of a compaction.
+    return Status::Corruption("Mutation was duplicated to two stores that are considered live",
+                              op_result.ShortDebugString());
   }
 
   return Status::OK();
@@ -1448,32 +1447,37 @@ Status FlushedStoresSnapshot::InitFrom(const TabletMetadata& meta) {
   return Status::OK();
 }
 
-bool FlushedStoresSnapshot::WasStoreAlreadyFlushed(const MemStoreTargetPB& target) const {
+bool FlushedStoresSnapshot::IsMemStoreActive(const MemStoreTargetPB& target) const {
   if (target.has_mrs_id()) {
     DCHECK(!target.has_rs_id());
     DCHECK(!target.has_dms_id());
 
-    // The original mutation went to the MRS. It is flushed if it went to an MRS
-    // with a lower ID than the latest flushed one.
-    return target.mrs_id() <= last_durable_mrs_id_;
+    // The original mutation went to the MRS. If this MRS has not yet been made
+    // durable, it needs to be replayed.
+    return target.mrs_id() > last_durable_mrs_id_;
   } else {
+
     // The original mutation went to a DRS's delta store.
+    DCHECK(target.has_rs_id());
+
     int64_t last_durable_dms_id;
     if (!FindCopy(flushed_dms_by_drs_id_, target.rs_id(), &last_durable_dms_id)) {
-      // if we have no data about this RowSet, then it must have been flushed and
-      // then deleted.
-      // TODO: how do we avoid a race where we get an update on a rowset before
-      // it is persisted? add docs about the ordering of flush.
-      return true;
+      // If we have no data about this DRS, then there are two cases:
+      //
+      // 1) The DRS has already been flushed, but then later got removed because
+      // it got compacted away. Since it was flushed, we don't need to replay it.
+      //
+      // 2) The DRS was in the process of being written, but we had not yet flushed the
+      // TabletMetadata update that includes it. We only write to an in-progress DRS like
+      // this when we are in the 'duplicating' phase of a compaction. In that case,
+      // the other duplicated 'target' should still be present in the metadata, and we
+      // can base our decision on that one.
+      return false;
     }
 
     // If the original rowset that we applied the edit to exists, check whether
     // the edit was in a flushed DMS or a live one.
-    if (target.dms_id() <= last_durable_dms_id) {
-      return true;
-    }
-
-    return false;
+    return target.dms_id() > last_durable_dms_id;
   }
 }
 

