kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] 02/02: [tablet] Fixed the bug of DeltaTracker::CountDeletedRows
Date Wed, 21 Aug 2019 17:59:33 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit b9c429c6831cf7aa948490136eeedc984cf7e4c1
Author: oclarms <oclarms@gmail.com>
AuthorDate: Wed Aug 14 11:43:51 2019 +0800

    [tablet] Fixed the bug of DeltaTracker::CountDeletedRows
    
    When Tablet::CountLiveRows was called in a multi-threaded scenario, there
    was a chance of hitting the following failure.
    
    User stack:
    F0814 12:05:51.975797 96375 diskrowset.cc:759] Check failed: *count >= 0 (-3 vs. 0)
    *** Check failure stack trace: ***
    *** Aborted at 1565755551 (unix time) try "date -d @1565755551" if you are using GNU date
***
    PC: @     0x7f9bd20425f7 __GI_raise
    *** SIGABRT (@0x70900017872) received by PID 96370 (TID 0x7f9bce2d7700) from PID 96370;
stack trace: ***
        @     0x7f9bdaff6100 (unknown)
        @     0x7f9bd20425f7 __GI_raise
        @     0x7f9bd2043ce8 __GI_abort
        @     0x7f9bd4540c99 google::logging_fail()
        @     0x7f9bd454246d google::LogMessage::Fail()
        @     0x7f9bd45443c3 google::LogMessage::SendToLog()
        @     0x7f9bd4541fc9 google::LogMessage::Flush()
        @     0x7f9bd4544d4f google::LogMessageFatal::~LogMessageFatal()
        @     0x7f9bddc9aabe kudu::tablet::DiskRowSet::CountLiveRows()
        @     0x7f9bddbdeb79 kudu::tablet::Tablet::CountLiveRows()
        @           0x49891f kudu::tablet::MultiThreadedTabletTest<>::CollectStatisticsThread()
        @           0x4ae34b boost::_mfi::mf1<>::operator()()
        @           0x4add25 boost::_bi::list2<>::operator()<>()
        @           0x4acfe9 boost::_bi::bind_t<>::operator()()
        @           0x4ac8a6 boost::detail::function::void_function_obj_invoker0<>::invoke()
        @     0x7f9bd7116492 boost::function0<>::operator()()
        @     0x7f9bd62e5324 kudu::Thread::SuperviseThread()
        @     0x7f9bdafeedc5 start_thread
        @     0x7f9bd2103ced __clone
    
    This is because DeltaTracker lacked lock protection when modifying the
    number of live rows in rowset_metadata_ and resetting deleted_row_count_.
    This allowed deleted_row_count_ to be counted twice when calculating the
    number of live rows of a DRS. Consider the following sequence:
    | T1                                | T2
    |----------                         |----------
    |+ In DT::Flush                     |
    |  Take compact_flush_lock_ (excl)  |
    |  Take component_lock_ (excl)      |
    |  deleted_row_count_ = ...         |
    |  Release component_lock_          |
    |  + In DT::FlushDMS                |
    |    Call RSMD::IncrementLiveRows   |
    |    --> RSMD::live_row_count - deleted_row_count_
    |                                   |+ In DRS::CountLiveRows
    |                                   |  Take component_lock_ (shared)
    |                                   |  Call RSMD::live_row_count - DT::CountDeletedRows
    |                                   |  --> RSMD::live_row_count - deleted_row_count_
    |                                   |  --> we double counted deleted_row_count_ !!!
    |  Take component_lock_ (excl)      |
    |  deleted_row_count_ = 0           |
    |  Release component_lock_          |
    |  Release compact_flush_lock_      |
    
    Change-Id: I9bb4456123087778c9dc799777c5990938a84fdf
    Reviewed-on: http://gerrit.cloudera.org:8080/14061
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Adar Dembo <adar@cloudera.com>
---
 src/kudu/integration-tests/raft_consensus-itest.cc |  36 ++++-
 src/kudu/integration-tests/test_workload.cc        | 153 ++++++++++++---------
 src/kudu/integration-tests/test_workload.h         |  20 +++
 src/kudu/tablet/delta_tracker.cc                   |  15 +-
 src/kudu/tablet/delta_tracker.h                    |   2 +-
 src/kudu/tablet/diskrowset.cc                      |   2 +-
 src/kudu/tablet/metadata-test.cc                   |   2 +-
 src/kudu/tablet/mt-tablet-test.cc                  |   9 +-
 src/kudu/tablet/rowset_metadata.cc                 |  10 +-
 src/kudu/tablet/rowset_metadata.h                  |   8 +-
 10 files changed, 177 insertions(+), 80 deletions(-)

diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 4e77708..0c2c114 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -220,7 +220,7 @@ class RaftConsensusITest : public RaftConsensusITestBase {
   void AssertMajorityRequiredForElectionsAndWrites(const TabletServerMap& tablet_servers,
                                                    const string& leader_uuid);
 
-  void CreateClusterForCrashyNodesTests();
+  void CreateClusterForCrashyNodesTests(vector<string> extra_ts_flags = {});
   void DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert);
 
   // Prepare for a test where a single replica of a 3-server cluster is left
@@ -528,7 +528,7 @@ void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
                                MonoDelta::FromSeconds(10)));
 }
 
-void RaftConsensusITest::CreateClusterForCrashyNodesTests() {
+void RaftConsensusITest::CreateClusterForCrashyNodesTests(vector<string> extra_ts_flags)
{
   if (AllowSlowTests()) {
     FLAGS_num_tablet_servers = 7;
     FLAGS_num_replicas = 7;
@@ -555,6 +555,8 @@ void RaftConsensusITest::CreateClusterForCrashyNodesTests() {
   // log area.
   ts_flags.emplace_back("--log_preallocate_segments=false");
 
+  ts_flags.insert(ts_flags.end(), extra_ts_flags.begin(), extra_ts_flags.end());
+
   NO_FATALS(CreateCluster("raft_consensus-itest-crashy-nodes-cluster",
                           std::move(ts_flags)));
 }
@@ -608,7 +610,7 @@ void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int
max_rows_
   NO_FATALS(v.CheckCluster());
   NO_FATALS(v.CheckRowCount(workload->table_name(),
                             ClusterVerifier::EXACTLY,
-                            workload->rows_inserted()));
+                            workload->rows_inserted() - workload->rows_deleted()));
 }
 
 void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
@@ -952,6 +954,34 @@ TEST_F(RaftConsensusITest, InsertDuplicateKeysWithCrashyNodes) {
   NO_FATALS(DoTestCrashyNodes(&workload, 300));
 }
 
+// The same crashy nodes test as above but the keys will be deleted after insertion.
+TEST_F(RaftConsensusITest, InsertAndDeleteWithCrashyNodes) {
+  vector<string> extra_ts_flags = {
+      "--flush_threshold_mb=0",
+      "--flush_threshold_secs=1",
+      "--maintenance_manager_polling_interval_ms=10",
+      "--heartbeat_interval_ms=10",
+      "--update_tablet_stats_interval_ms=10",
+  };
+  NO_FATALS(CreateClusterForCrashyNodesTests(std::move(extra_ts_flags)));
+
+  // If the AllowSlowTests is true, test the scenario that deleting data on DRS.
+  // Otherwise, test deleting data on MRS.
+  int32_t write_interval_millis = 0;
+  int32_t write_batch_size = 5;
+  if (AllowSlowTests()) {
+    // Wait for MRS to be flushed.
+    write_interval_millis = 1000;
+    // Decrease the number of rows per batch to generate more DRSs.
+    write_batch_size = 1;
+  }
+
+  TestWorkload workload(cluster_.get());
+  workload.set_write_pattern(TestWorkload::INSERT_RANDOM_ROWS_WITH_DELETE);
+  workload.set_write_interval_millis(write_interval_millis);
+  workload.set_write_batch_size(write_batch_size);
+  NO_FATALS(DoTestCrashyNodes(&workload, 100));
+}
 
 TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
   int kNumElections = FLAGS_num_replicas;
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 7b9863e..eb2bb70 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -17,7 +17,6 @@
 
 #include "kudu/integration-tests/test_workload.h"
 
-#include <cstddef>
 #include <memory>
 #include <ostream>
 
@@ -43,6 +42,8 @@ namespace kudu {
 
 using client::KuduClient;
 using client::KuduColumnSchema;
+using client::KuduDelete;
+using client::KuduError;
 using client::KuduInsert;
 using client::KuduScanBatch;
 using client::KuduScanner;
@@ -54,6 +55,8 @@ using client::KuduUpdate;
 using client::sp::shared_ptr;
 using cluster::MiniCluster;
 
+using std::vector;
+
 const char* const TestWorkload::kDefaultTableName = "test-workload";
 
 TestWorkload::TestWorkload(MiniCluster* cluster)
@@ -65,6 +68,7 @@ TestWorkload::TestWorkload(MiniCluster* cluster)
     // high-stress workloads.
     read_timeout_millis_(60000),
     write_batch_size_(50),
+    write_interval_millis_(0),
     write_timeout_millis_(20000),
     timeout_allowed_(false),
     not_found_allowed_(false),
@@ -77,6 +81,7 @@ TestWorkload::TestWorkload(MiniCluster* cluster)
     start_latch_(0),
     should_run_(false),
     rows_inserted_(0),
+    rows_deleted_(0),
     batches_completed_(0),
     sequential_key_gen_(0) {
   // Make the default write pattern random row inserts.
@@ -87,11 +92,11 @@ TestWorkload::~TestWorkload() {
   StopAndJoin();
 }
 
-void TestWorkload::set_schema(const client::KuduSchema& schema) {
+void TestWorkload::set_schema(const KuduSchema& schema) {
   // Do some sanity checks on the schema. They reflect how the rest of
   // TestWorkload is going to use the schema.
   CHECK_GT(schema.num_columns(), 0) << "Schema should have at least one column";
-  std::vector<int> key_indexes;
+  vector<int> key_indexes;
   schema.GetPrimaryKeyColumnIndexes(&key_indexes);
   CHECK_LE(1, key_indexes.size()) << "Schema should have at least one key column";
   CHECK_EQ(0, key_indexes[0]) << "Schema's first key column should be index 0";
@@ -137,73 +142,69 @@ void TestWorkload::WriteThread() {
 
   while (should_run_.Load()) {
     int inserted = 0;
-    for (int i = 0; i < write_batch_size_; i++) {
-      if (write_pattern_ == UPDATE_ONE_ROW) {
-        gscoped_ptr<KuduUpdate> update(table->NewUpdate());
-        KuduPartialRow* row = update->mutable_row();
-        tools::GenerateDataForRow(schema_, 0, &rng_, row);
-        CHECK_OK(session->Apply(update.release()));
-      } else {
-        gscoped_ptr<KuduInsert> insert(table->NewInsert());
-        KuduPartialRow* row = insert->mutable_row();
-        int32_t key;
-        if (write_pattern_ == INSERT_SEQUENTIAL_ROWS) {
-          key = sequential_key_gen_.Increment();
+    int deleted = 0;
+    vector<int32_t> keys;
+    // Write insert or update row to cluster.
+    {
+      for (int i = 0; i < write_batch_size_; i++) {
+        if (write_pattern_ == UPDATE_ONE_ROW) {
+          gscoped_ptr<KuduUpdate> update(table->NewUpdate());
+          KuduPartialRow* row = update->mutable_row();
+          tools::GenerateDataForRow(schema_, 0, &rng_, row);
+          CHECK_OK(session->Apply(update.release()));
         } else {
-          key = rng_.Next();
-          if (write_pattern_ == INSERT_WITH_MANY_DUP_KEYS) {
-            key %= kNumRowsForDuplicateKeyWorkload;
+          gscoped_ptr<KuduInsert> insert(table->NewInsert());
+          KuduPartialRow* row = insert->mutable_row();
+          int32_t key;
+          if (write_pattern_ == INSERT_SEQUENTIAL_ROWS) {
+            key = sequential_key_gen_.Increment();
+          } else {
+            key = rng_.Next();
+            if (write_pattern_ == INSERT_WITH_MANY_DUP_KEYS) {
+              key %= kNumRowsForDuplicateKeyWorkload;
+            }
           }
-        }
-
-        tools::GenerateDataForRow(schema_, key, &rng_, row);
-        if (payload_bytes_) {
-          // Note: overriding payload_bytes_ requires the "simple" schema.
-          std::string test_payload(payload_bytes_.get(), '0');
-          CHECK_OK(row->SetStringCopy(2, test_payload));
-        }
-        CHECK_OK(session->Apply(insert.release()));
-        inserted++;
-      }
-    }
-
-    Status s = session->Flush();
-
-    if (PREDICT_FALSE(!s.ok())) {
-      std::vector<client::KuduError*> errors;
-      ElementDeleter d(&errors);
-      bool overflow;
-      session->GetPendingErrors(&errors, &overflow);
-      CHECK(!overflow);
-      for (const client::KuduError* e : errors) {
-        if (timeout_allowed_ && e->status().IsTimedOut()) {
-          continue;
-        }
-
-        if (not_found_allowed_ && e->status().IsNotFound()) {
-          continue;
-        }
+          keys.push_back(key);
 
-        if (already_present_allowed_ && e->status().IsAlreadyPresent()) {
-          continue;
-        }
-
-        if (network_error_allowed_ && e->status().IsNetworkError()) {
-          continue;
-        }
+          tools::GenerateDataForRow(schema_, key, &rng_, row);
+          if (payload_bytes_) {
+            // Note: overriding payload_bytes_ requires the "simple" schema.
+            std::string test_payload(payload_bytes_.get(), '0');
+            CHECK_OK(row->SetStringCopy(2, test_payload));
+          }
+          CHECK_OK(session->Apply(insert.release()));
 
-        if (remote_error_allowed_ && e->status().IsRemoteError()) {
-          continue;
+          inserted++;
         }
-
-        LOG(FATAL) << e->status().ToString();
       }
-      inserted -= errors.size();
+      Status s = session->Flush();
+      if (PREDICT_FALSE(!s.ok())) {
+        inserted -= GetNumberOfErrors(session.get());
+      }
+      if (inserted > 0) {
+        rows_inserted_.IncrementBy(inserted);
+        batches_completed_.Increment();
+      }
     }
-
-    if (inserted > 0) {
-      rows_inserted_.IncrementBy(inserted);
-      batches_completed_.Increment();
+    if (PREDICT_FALSE(write_interval_millis_ > 0)) {
+      SleepFor(MonoDelta::FromMilliseconds(write_interval_millis_));
+    }
+    // Write delete row to cluster.
+    if (write_pattern_ == INSERT_RANDOM_ROWS_WITH_DELETE) {
+      for (auto key : keys) {
+        gscoped_ptr<KuduDelete> op(table->NewDelete());
+        KuduPartialRow* row = op->mutable_row();
+        tools::WriteValueToColumn(schema_, 0, key, row);
+        CHECK_OK(session->Apply(op.release()));
+        deleted++;
+      }
+      Status s = session->Flush();
+      if (PREDICT_FALSE(!s.ok())) {
+        deleted -= GetNumberOfErrors(session.get());
+      }
+      if (deleted > 0) {
+        rows_deleted_.IncrementBy(deleted);
+      }
     }
   }
 }
@@ -220,7 +221,10 @@ void TestWorkload::ReadThread() {
     CHECK_OK(scanner.SetTimeoutMillis(read_timeout_millis_));
     CHECK_OK(scanner.SetFaultTolerant());
 
-    int64_t expected_row_count = rows_inserted_.Load();
+    // Note: when INSERT_RANDOM_ROWS_WITH_DELETE is used, ReadThread doesn't really verify
+    // anything except that a scan works.
+    int64_t expected_row_count = write_pattern_ == INSERT_RANDOM_ROWS_WITH_DELETE ?
+                                 0 : rows_inserted_.Load();
     size_t row_count = 0;
 
     CHECK_OK(scanner.Open());
@@ -234,6 +238,25 @@ void TestWorkload::ReadThread() {
   }
 }
 
+size_t TestWorkload::GetNumberOfErrors(KuduSession* session) {
+  vector<KuduError*> errors;
+  ElementDeleter d(&errors);
+  bool overflow;
+  session->GetPendingErrors(&errors, &overflow);
+  CHECK(!overflow);
+  for (const KuduError* e : errors) {
+    if ((timeout_allowed_ && e->status().IsTimedOut()) ||
+        (not_found_allowed_ && e->status().IsNotFound()) ||
+        (already_present_allowed_ && e->status().IsAlreadyPresent()) ||
+        (network_error_allowed_ && e->status().IsNetworkError()) ||
+        (remote_error_allowed_ && e->status().IsRemoteError())) {
+      continue;
+    }
+    LOG(FATAL) << e->status().ToString();
+  }
+  return errors.size();
+}
+
 shared_ptr<KuduClient> TestWorkload::CreateClient() {
   CHECK_OK(cluster_->CreateClient(&client_builder_, &client_));
   return client_;
@@ -259,7 +282,7 @@ void TestWorkload::Setup() {
 
   if (!table_exists) {
     // Create split rows.
-    std::vector<const KuduPartialRow*> splits;
+    vector<const KuduPartialRow*> splits;
     for (int i = 1; i < num_tablets_; i++) {
       KuduPartialRow* r = schema_.NewRow();
       CHECK_OK(r->SetInt32("key", MathLimits<int32_t>::kMax / num_tablets_ * i));
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index 689f139..00a35af 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -17,6 +17,7 @@
 #ifndef KUDU_INTEGRATION_TESTS_TEST_WORKLOAD_H
 #define KUDU_INTEGRATION_TESTS_TEST_WORKLOAD_H
 
+#include <cstddef>
 #include <cstdint>
 #include <ostream>
 #include <string>
@@ -76,6 +77,10 @@ class TestWorkload {
     write_batch_size_ = s;
   }
 
+  void set_write_interval_millis(int t) {
+    write_interval_millis_ = t;
+  }
+
   void set_client_default_rpc_timeout_millis(int t) {
     client_builder_.default_rpc_timeout(MonoDelta::FromMilliseconds(t));
   }
@@ -149,6 +154,11 @@ class TestWorkload {
     // duplicate, but with 32-bit keys, they won't be frequent.
     INSERT_RANDOM_ROWS,
 
+    // Insert random rows, then delete them.
+    // This may cause an occasional duplicate, but with 32-bit keys, they won't be frequent.
+    // This requires two flush operations.
+    INSERT_RANDOM_ROWS_WITH_DELETE,
+
     // All threads generate updates against a single row.
     UPDATE_ONE_ROW,
 
@@ -170,6 +180,7 @@ class TestWorkload {
         set_already_present_allowed(true);
         break;
       case INSERT_RANDOM_ROWS:
+      case INSERT_RANDOM_ROWS_WITH_DELETE:
       case UPDATE_ONE_ROW:
       case INSERT_SEQUENTIAL_ROWS:
         set_already_present_allowed(false);
@@ -199,6 +210,12 @@ class TestWorkload {
     return rows_inserted_.Load();
   }
 
+  // Return the number of rows deleted so far. This may be called either
+  // during or after the write workload.
+  int64_t rows_deleted() const {
+    return rows_deleted_.Load();
+  }
+
   // Return the number of batches in which we have successfully inserted at
   // least one row.
   // NOTE: it is not safe to assume that this is exactly equal to the number
@@ -214,6 +231,7 @@ class TestWorkload {
   void OpenTable(client::sp::shared_ptr<client::KuduTable>* table);
   void WriteThread();
   void ReadThread();
+  size_t GetNumberOfErrors(client::KuduSession* session);
 
   cluster::MiniCluster* cluster_;
   client::KuduClientBuilder client_builder_;
@@ -225,6 +243,7 @@ class TestWorkload {
   int num_read_threads_;
   int read_timeout_millis_;
   int write_batch_size_;
+  int write_interval_millis_;
   int write_timeout_millis_;
   bool timeout_allowed_;
   bool not_found_allowed_;
@@ -241,6 +260,7 @@ class TestWorkload {
   CountDownLatch start_latch_;
   AtomicBool should_run_;
   AtomicInt<int64_t> rows_inserted_;
+  AtomicInt<int64_t> rows_deleted_;
   AtomicInt<int64_t> batches_completed_;
   AtomicInt<int32_t> sequential_key_gen_;
 
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 4ea8684..770e3aa 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -744,10 +744,16 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
                                             dfr));
   VLOG_WITH_PREFIX(1) << "Opened new delta block " << block_id.ToString() <<
" for read";
 
-  // Merge the deleted row count of the old DMS to the RowSetMetadata if necessary.
-  rowset_metadata_->IncrementLiveRows(-deleted_row_count_);
-
-  RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(), block_id));
+  {
+    // Merge the deleted row count of the old DMS to the RowSetMetadata
+    // and reset deleted_row_count_ should be atomic, so we lock the
+    // component_lock_ in exclusive mode.
+    std::lock_guard<rw_spinlock> lock(component_lock_);
+    RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(),
+                                                             deleted_row_count_,
+                                                             block_id));
+    deleted_row_count_ = 0;
+  }
   if (flush_type == FLUSH_METADATA) {
     RETURN_NOT_OK_PREPEND(rowset_metadata_->Flush(),
                           Substitute("Unable to commit Delta block metadata for: $0",
@@ -814,7 +820,6 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType
flush_
     CHECK_EQ(redo_delta_stores_[idx], old_dms)
         << "Another thread modified the delta store list during flush";
     redo_delta_stores_[idx] = dfr;
-    deleted_row_count_ = 0;
   }
 
   return Status::OK();
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index 072b6e6..a4ea0a1 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -373,7 +373,7 @@ class DeltaTracker {
   // TODO(perf): this needs to be more fine grained
   mutable Mutex compact_flush_lock_;
 
-  // Number of deleted rows for a DMS that is currently being flushed.
+  // Number of deleted rows for a DMS that is currently being flushed.
   // When the flush completes, this is merged into the RowSetMetadata
   // and reset.
   int64_t deleted_row_count_;
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 8143fc3..b2395a9 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -467,7 +467,7 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
     s = cur_redo_writer_->FinishAndReleaseBlock(block_transaction_.get());
     if (!s.IsAborted()) {
       RETURN_NOT_OK(s);
-      cur_drs_metadata_->CommitRedoDeltaDataBlock(0, cur_redo_ds_block_id_);
+      cur_drs_metadata_->CommitRedoDeltaDataBlock(0, 0, cur_redo_ds_block_id_);
     } else {
       DCHECK_EQ(cur_redo_delta_stats->min_timestamp(), Timestamp::kMax);
     }
diff --git a/src/kudu/tablet/metadata-test.cc b/src/kudu/tablet/metadata-test.cc
index ec77b00..e67529d 100644
--- a/src/kudu/tablet/metadata-test.cc
+++ b/src/kudu/tablet/metadata-test.cc
@@ -46,7 +46,7 @@ class MetadataTest : public KuduTest {
     tablet_meta_ = new TabletMetadata(nullptr, "fake-tablet");
     CHECK_OK(RowSetMetadata::CreateNew(tablet_meta_.get(), 0, &meta_));
     for (int i = 0; i < all_blocks_.size(); i++) {
-      CHECK_OK(meta_->CommitRedoDeltaDataBlock(i, all_blocks_[i]));
+      CHECK_OK(meta_->CommitRedoDeltaDataBlock(i, 0, all_blocks_[i]));
     }
     CHECK_EQ(4, meta_->redo_delta_blocks().size());
   }
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 2e3afaa..dab3913 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -35,6 +35,7 @@
 #include "kudu/common/rowblock.h"
 #include "kudu/common/rowid.h"
 #include "kudu/common/schema.h"
+#include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/local_tablet_writer.h"
@@ -408,13 +409,18 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
       "num_rowsets");
     shared_ptr<TimeSeries> memrowset_size_ts = ts_collector_.GetTimeSeries(
       "memrowset_kb");
+    shared_ptr<TimeSeries> num_live_rows_ts = ts_collector_.GetTimeSeries(
+      "num_live_rows");
 
     while (running_insert_count_.count() > 0) {
       num_rowsets_ts->SetValue(tablet()->num_rowsets());
       memrowset_size_ts->SetValue(tablet()->MemRowSetSize() / 1024.0);
+      int64_t num_live_rows;
+      ignore_result(tablet()->CountLiveRows(&num_live_rows));
+      num_live_rows_ts->SetValue(num_live_rows);
 
       // Wait, unless the inserters are all done.
-      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(250));
+      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(10));
     }
   }
 
@@ -491,6 +497,7 @@ TYPED_TEST(MultiThreadedTabletTest, DeleteAndReinsert) {
   FLAGS_flusher_initial_frequency_ms = 1;
   FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01f;
   FLAGS_tablet_delta_store_minor_compact_max = 10;
+  this->StartThreads(1, &TestFixture::CollectStatisticsThread);
   this->StartThreads(FLAGS_num_flush_threads, &TestFixture::FlushThread);
   this->StartThreads(FLAGS_num_compact_threads, &TestFixture::CompactThread);
   this->StartThreads(FLAGS_num_undo_delta_gc_threads, &TestFixture::DeleteAncientUndoDeltasThread);
diff --git a/src/kudu/tablet/rowset_metadata.cc b/src/kudu/tablet/rowset_metadata.cc
index 6bd19ce..6c9705c 100644
--- a/src/kudu/tablet/rowset_metadata.cc
+++ b/src/kudu/tablet/rowset_metadata.cc
@@ -187,10 +187,12 @@ void RowSetMetadata::SetColumnDataBlocks(const std::map<ColumnId,
BlockId>& bloc
 }
 
 Status RowSetMetadata::CommitRedoDeltaDataBlock(int64_t dms_id,
+                                                int64_t num_deleted_rows,
                                                 const BlockId& block_id) {
   std::lock_guard<LockType> l(lock_);
   last_durable_redo_dms_id_ = dms_id;
   redo_delta_blocks_.push_back(block_id);
+  IncrementLiveRowsUnlocked(-num_deleted_rows);
   return Status::OK();
 }
 
@@ -278,14 +280,18 @@ void RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update,
   blocks_by_col_id_.shrink_to_fit();
 }
 
-void RowSetMetadata::IncrementLiveRows(int64_t row_count) {
+void RowSetMetadata::IncrementLiveRowsUnlocked(int64_t row_count) {
   if (tablet_metadata_->supports_live_row_count() && row_count != 0) {
-    std::lock_guard<LockType> l(lock_);
     live_row_count_ += row_count;
     DCHECK_GE(live_row_count_, 0);
   }
 }
 
+void RowSetMetadata::IncrementLiveRows(int64_t row_count) {
+  std::lock_guard<LockType> l(lock_);
+  IncrementLiveRowsUnlocked(row_count);
+}
+
 int64_t RowSetMetadata::live_row_count() const {
   std::lock_guard<LockType> l(lock_);
   DCHECK_GE(live_row_count_, 0);
diff --git a/src/kudu/tablet/rowset_metadata.h b/src/kudu/tablet/rowset_metadata.h
index 94af6f3..7f67b63 100644
--- a/src/kudu/tablet/rowset_metadata.h
+++ b/src/kudu/tablet/rowset_metadata.h
@@ -117,7 +117,11 @@ class RowSetMetadata {
 
   void SetColumnDataBlocks(const std::map<ColumnId, BlockId>& blocks_by_col_id);
 
-  Status CommitRedoDeltaDataBlock(int64_t dms_id, const BlockId& block_id);
+  // Atomically commit the new redo delta block to RowSetMetadata.
+  // This atomic operation includes updates to last_durable_redo_dms_id_ and live_row_count_.
+  Status CommitRedoDeltaDataBlock(int64_t dms_id,
+                                  int64_t num_deleted_rows,
+                                  const BlockId& block_id);
 
   Status CommitUndoDeltaDataBlock(const BlockId& block_id);
 
@@ -256,6 +260,8 @@ class RowSetMetadata {
 
   Status InitFromPB(const RowSetDataPB& pb);
 
+  void IncrementLiveRowsUnlocked(int64_t row_count);
+
   TabletMetadata* const tablet_metadata_;
   bool initted_;
   int64_t id_;


Mime
View raw message