kudu-commits mailing list archives

From t...@apache.org
Subject [3/4] kudu git commit: KUDU-1567. Decouple hard-minimum WAL segment retention from target
Date Thu, 06 Oct 2016 02:57:18 GMT
KUDU-1567. Decouple hard-minimum WAL segment retention from target

This changes the behavior around the "minimum log segments to retain".
Previously, the maintenance manager considered it high priority to flush
any in-memory store which was retaining more than this number of log
segments. With the default log_min_segments_to_retain=2, this caused the
maintenance manager to trigger very small flushes (128MB) regardless of
the size of flush_threshold_mb. The end result here was high write
amplification.

Testing with -log_min_segments_to_retain=50 indicated that write
performance could be improved by about 2x and write amplification reduced
by about 1.7x by removing this aggressive flush behavior.

However, raising 'log_min_segments_to_retain' to 50 also had the
unfortunate side effect of _always_ retaining 50 segments, regardless of
whether they were actually needed for durability. In a long-running
cluster, most tablets are not actively being loaded into at such a high
rate, and retaining 50 segments would mean unnecessary disk usage as
well as longer startup times in the absence of a solution to KUDU-38.

Thus, this patch takes the approach of decoupling the two ideas into two
separate configurations:

1) the original 'log_min_segments_to_retain', which can be left very
   low, and now is really only useful for things like post-mortem
   debugging. A future commit could change this to 1 or possibly even 0.

2) a new 'log_target_replay_size_mb' flag, which
   indicates the amount of retained log data at which point the MM
   should schedule flushes of in-memory stores.

With the new defaults, we should have the following behavior:
- an MRS can fill up until the logs reach 1GB. At that point, the MM
  will begin flushing.
- after a flush, the logs will be GCed down to 2 segments.
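To make the decoupling concrete, here is a minimal sketch of the two flush triggers described above. `OldRuleWantsFlush`, `NewRuleWantsFlush` and `kMB` are hypothetical names, not Kudu APIs. The `>=` comparison is an assumption based on the wording "until the logs reach 1GB", and the real maintenance manager scoring is not reproduced here.

```cpp
#include <cassert>
#include <cstdint>

constexpr int64_t kMB = 1024 * 1024;

// Hypothetical model of the OLD trigger: any in-memory store retaining more
// than log_min_segments_to_retain segments was treated as high priority.
bool OldRuleWantsFlush(int segments_retained, int min_segments_to_retain) {
  return segments_retained > min_segments_to_retain;
}

// Hypothetical model of the NEW trigger: flush once the WAL bytes that would
// have to be replayed to rebuild the store reach the target
// (log_target_replay_size_mb, assumed to default to 1024, i.e. 1GB).
bool NewRuleWantsFlush(int64_t replay_size_bytes, int64_t target_replay_size_mb) {
  assert(target_replay_size_mb >= 0);
  return replay_size_bytes >= target_replay_size_mb * kMB;
}
```

With a 1GB target, a store whose data spans 100MB of WAL no longer qualifies for a log-driven flush, whereas under the old rule any store spanning three segments with log_min_segments_to_retain=2 would have.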

As follow-on work, we can consider the following ideas:
- log_min_segments_to_retain no longer determines flushes, so it
  would be safe to set it to 0 (i.e. only retain the in-progress log).
  However, this will likely need a bit more stress testing and will
  require updating various tests.
- along the same lines, we can consider adding functionality to the
  log such that, if a tablet hasn't received writes in a long time
  and the log size is greater than some threshold, it would perform
  an "early roll" and not preallocate the next segment. This could
  save disk space for "inactive" tablets as well as decrease startup
  time.

Change-Id: I31400e2200f9ce3eeb63f4bc948bc630e8c1115f
Reviewed-on: http://gerrit.cloudera.org:8080/4470
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dralves@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/127438af
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/127438af
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/127438af

Branch: refs/heads/master
Commit: 127438af30356f1afedb862166c907ff754d1c55
Parents: 0ee10ea
Author: Todd Lipcon <todd@apache.org>
Authored: Mon Sep 19 17:16:15 2016 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu Oct 6 01:15:30 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log-test.cc                  | 64 +++++++++++---------
 src/kudu/consensus/log.cc                       | 28 ++++-----
 src/kudu/consensus/log.h                        | 34 ++++++++---
 .../integration-tests/raft_consensus-itest.cc   |  4 +-
 src/kudu/tablet/tablet-test.cc                  | 42 +++++++------
 src/kudu/tablet/tablet.cc                       | 49 +++++++--------
 src/kudu/tablet/tablet.h                        | 29 ++++-----
 src/kudu/tablet/tablet_peer.cc                  |  8 +--
 src/kudu/tablet/tablet_peer.h                   |  7 +--
 src/kudu/tablet/tablet_peer_mm_ops.cc           | 20 +++---
 src/kudu/util/maintenance_manager-test.cc       | 18 ++++--
 src/kudu/util/maintenance_manager.cc            | 19 ++++--
 12 files changed, 183 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/consensus/log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index c755dd8..907a99c 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -490,6 +490,16 @@ TEST_F(LogTest, TestGCWithLogRunning) {
   ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments))
   ASSERT_EQ(4, segments.size()) << DumpSegmentsToString(segments);
 
+  // Logs should be retained for durability even if this puts it above the
+  // maximum configured number of segments.
+  {
+    google::FlagSaver saver;
+    FLAGS_log_min_segments_to_retain = 1;
+    FLAGS_log_max_segments_to_retain = 1;
+    ASSERT_OK(log_->GC(retention, &num_gced_segments));
+    ASSERT_EQ(0, num_gced_segments);
+  }
+
   // Freeing the first 2 anchors should allow GC of them.
   ASSERT_OK(log_anchor_registry_->Unregister(anchors[0]));
   ASSERT_OK(log_anchor_registry_->Unregister(anchors[1]));
@@ -972,45 +982,45 @@ TEST_F(LogTest, TestReadLogWithReplacedReplicates) {
 
 // Test various situations where we expect different segments depending on what the
 // min log index is.
-TEST_F(LogTest, TestGetMaxIndexesToSegmentSizeMap) {
+TEST_F(LogTest, TestGetGCableDataSize) {
   FLAGS_log_min_segments_to_retain = 2;
   ASSERT_OK(BuildLog());
 
   const int kNumTotalSegments = 5;
   const int kNumOpsPerSegment = 5;
+  const int kSegmentSizeBytes = 315;
   OpId op_id = MakeOpId(1, 10);
   // Create 5 segments, starting from log index 10, with 5 ops per segment.
+  // [10-14], [15-19], [20-24], [25-29], [30-34]
   ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment,
                                               &op_id, nullptr));
 
-  std::map<int64_t, int64_t> max_idx_to_segment_size;
-
-  // Check getting all the segments we can get rid of (5 - 2).
-  log_->GetMaxIndexesToSegmentSizeMap(10, &max_idx_to_segment_size);
-  ASSERT_EQ(3, max_idx_to_segment_size.size());
+  // GCing through the first op should not be able to remove any logs.
+  EXPECT_EQ(0, log_->GetGCableDataSize(RetentionIndexes(10)));
 
   // Check that even when the min index is the last index from the oldest segment,
-  // we still return 3.
-  log_->GetMaxIndexesToSegmentSizeMap(14, &max_idx_to_segment_size);
-  ASSERT_EQ(3, max_idx_to_segment_size.size());
-
-  // Check that if the first segment is GCable, we get 2 back.
-  log_->GetMaxIndexesToSegmentSizeMap(15, &max_idx_to_segment_size);
-  ASSERT_EQ(2, max_idx_to_segment_size.size());
-
-  // Check that if the min index is at the very end of the only segment we can get rid of that we
-  // get 1 back.
-  log_->GetMaxIndexesToSegmentSizeMap(24, &max_idx_to_segment_size);
-  ASSERT_EQ(1, max_idx_to_segment_size.size());
-
-  // Check that we don't get anything back when there's nothing we want to get rid of.
-  log_->GetMaxIndexesToSegmentSizeMap(25, &max_idx_to_segment_size);
-  ASSERT_EQ(0, max_idx_to_segment_size.size());
-
-  // Sanity check that even if the min log index is the newest op that nothing breaks and that
-  // we get 0 segments back.
-  log_->GetMaxIndexesToSegmentSizeMap(35, &max_idx_to_segment_size);
-  ASSERT_EQ(0, max_idx_to_segment_size.size());
+  // we still return the same result.
+  EXPECT_EQ(0, log_->GetGCableDataSize(RetentionIndexes(14)));
+
+  // Check that if the first segment is GCable, we return its size.
+  EXPECT_EQ(kSegmentSizeBytes, log_->GetGCableDataSize(RetentionIndexes(15)));
+
+  // GCable index at the end of the third segment. Should only be able to GC the first
+  // two.
+  EXPECT_EQ(kSegmentSizeBytes * 2, log_->GetGCableDataSize(RetentionIndexes(24)));
+
+  // GCing through the first op in the fourth segment should be able to remove
+  // the first three.
+  EXPECT_EQ(kSegmentSizeBytes * 3, log_->GetGCableDataSize(RetentionIndexes(25)));
+
+  // Even if we could GC all of the ops written, we should respect the 'log_min_segments_to_retain'
+  // setting and not GC the last two.
+  EXPECT_EQ(kSegmentSizeBytes * 3, log_->GetGCableDataSize(RetentionIndexes(35)));
+
+  // If we change the configuration, we should be able to GC all of the closed segments.
+  // The last segment is not GCable because it is still open.
+  FLAGS_log_min_segments_to_retain = 0;
+  EXPECT_EQ(kSegmentSizeBytes * 4, log_->GetGCableDataSize(RetentionIndexes(35)));
 }
 
 // Regression test. Check that failed preallocation returns an error instead of

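The expectations in TestGetGCableDataSize can be reproduced with a small standalone model, which is handy for checking the arithmetic. This is a sketch, not Kudu's Log::GetGCableDataSize(): `SegmentInfo` and `GCableBytes` are hypothetical, only the durability retention index is modeled (peer catch-up retention and log_max_segments_to_retain are ignored), and the open segment is assumed to count toward log_min_segments_to_retain, which is what the test's expected values imply.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified description of one WAL segment.
struct SegmentInfo {
  int64_t max_replicate_index;  // last op index stored in the segment
  bool closed;                  // false for the in-progress (open) segment
  int64_t file_size;            // bytes on disk
};

// A segment is GCable only if it is closed and all of its ops are below
// 'retention_index'. Scanning stops at the first segment failing either check,
// and the newest 'min_segments_to_retain' segments are never counted.
int64_t GCableBytes(const std::vector<SegmentInfo>& oldest_first,
                    int64_t retention_index, int min_segments_to_retain) {
  assert(min_segments_to_retain >= 0);
  std::size_t prefix = 0;
  while (prefix < oldest_first.size() && oldest_first[prefix].closed &&
         oldest_first[prefix].max_replicate_index < retention_index) {
    ++prefix;
  }
  const std::size_t keep = static_cast<std::size_t>(min_segments_to_retain);
  const std::size_t max_gcable =
      oldest_first.size() > keep ? oldest_first.size() - keep : 0;
  prefix = std::min(prefix, max_gcable);

  int64_t total = 0;
  for (std::size_t i = 0; i < prefix; ++i) total += oldest_first[i].file_size;
  return total;
}
```

Given five 315-byte segments covering [10-14] through [30-34], the last of which is still open, this returns 0, 0, 315, 630, 945 and 945 bytes for retention indexes 10, 14, 15, 24, 25 and 35, and 1260 bytes for index 35 once the minimum is set to 0, matching the assertions in the test.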
http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 4a72048..b77257a 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -18,6 +18,7 @@
 #include "kudu/consensus/log.h"
 
 #include <algorithm>
+#include <boost/range/adaptor/reversed.hpp>
 #include <limits>
 #include <mutex>
 
@@ -752,28 +753,27 @@ Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
   return Status::OK();
 }
 
-void Log::GetGCableDataSize(RetentionIndexes retention_indexes, int64_t* total_size) const {
+int64_t Log::GetGCableDataSize(RetentionIndexes retention_indexes) const {
   CHECK_GE(retention_indexes.for_durability, 0);
   SegmentSequence segments_to_delete;
-  *total_size = 0;
   {
     shared_lock<rw_spinlock> l(state_lock_.get_lock());
     CHECK_EQ(kLogWriting, log_state_);
     Status s = GetSegmentsToGCUnlocked(retention_indexes, &segments_to_delete);
 
     if (!s.ok() || segments_to_delete.empty()) {
-      return;
+      return 0;
     }
   }
+  int64_t total_size = 0;
   for (const scoped_refptr<ReadableLogSegment>& segment : segments_to_delete) {
-    *total_size += segment->file_size();
+    total_size += segment->file_size();
   }
+  return total_size;
 }
 
-void Log::GetMaxIndexesToSegmentSizeMap(int64_t idx_for_durability,
-                                        std::map<int64_t, int64_t>* max_idx_to_segment_size)
-                                        const {
-  max_idx_to_segment_size->clear();
+void Log::GetReplaySizeMap(std::map<int64_t, int64_t>* replay_size) const {
+  replay_size->clear();
   SegmentSequence segments;
   {
     shared_lock<rw_spinlock> l(state_lock_.get_lock());
@@ -781,12 +781,12 @@ void Log::GetMaxIndexesToSegmentSizeMap(int64_t idx_for_durability,
     CHECK_OK(reader_->GetSegmentsSnapshot(&segments));
   }
 
-  int gc_prefix = GetPrefixSizeToGC(RetentionIndexes(idx_for_durability),
-                                    segments);
-  for (int i = gc_prefix; i < segments.size() - FLAGS_log_min_segments_to_retain; i++) {
-    if (!segments[i]->HasFooter()) break;
-    int64_t max_repl_idx = segments[i]->footer().max_replicate_index();
-    (*max_idx_to_segment_size)[max_repl_idx] = segments[i]->file_size();
+  int64_t cumulative_size = 0;
+  for (const auto& segment : boost::adaptors::reverse(segments)) {
+    if (!segment->HasFooter()) continue;
+    cumulative_size += segment->file_size();
+    int64_t max_repl_idx = segment->footer().max_replicate_index();
+    (*replay_size)[max_repl_idx] = cumulative_size;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/consensus/log.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index a43de21..fce13c4 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -198,15 +198,31 @@ class Log : public RefCountedThreadSafe<Log> {
   Status GC(RetentionIndexes retention_indexes, int* num_gced);
 
   // Computes the amount of bytes that would have been GC'd if Log::GC had been called.
-  void GetGCableDataSize(RetentionIndexes retention_indexes, int64_t* total_size) const;
-
-  // Returns a map of log index -> segment size, of all the segments that currently cannot be GCed
-  // because of an anchor on the given 'idx_for_durability' log index. Note that, even if
-  // these segments are being retained for peer catchup, they are treated as 'GCable' here,
-  // since the purpose of this method is to bound startup time by flushing in-memory stores
-  // which refer to operations far back in the log.
-  void GetMaxIndexesToSegmentSizeMap(int64_t idx_for_durability,
-                                     std::map<int64_t, int64_t>* max_idx_to_segment_size) const;
+  int64_t GetGCableDataSize(RetentionIndexes retention_indexes) const;
+
+  // Returns a map which can be used to determine the cumulative size of log segments
+  // containing entries at or above any given log index.
+  //
+  // For example, if the current log segments are:
+  //
+  //    Indexes    Size
+  //    ------------------
+  //    [1-100]    20MB
+  //    [101-200]  15MB
+  //    [201-300]  10MB
+  //    [301-???]  <open>   (counts as 0MB)
+  //
+  // This function will return:
+  //
+  //    {100 => 45MB,
+  //     200 => 25MB,
+  //     300 => 10MB}
+  //
+  // In other words, an anchor on any index <= 100 would retain 45MB of logs,
+  // and any anchor on 100 < index <= 200 would retain 25MB of logs, etc.
+  //
+  // Note that the returned values are in units of bytes, not MB.
+  void GetReplaySizeMap(std::map<int64_t, int64_t>* replay_size) const;
 
   // Returns the file system location of the currently active WAL segment.
   const std::string& ActiveSegmentPathForTests() const {

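The cumulative map documented for Log::GetReplaySizeMap() can be built in a single newest-to-oldest pass. A minimal sketch, assuming a hypothetical `SegmentSummary` struct in place of Kudu's ReadableLogSegment and plain reverse iterators in place of boost::adaptors::reverse; `BuildReplaySizeMap` is likewise a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical, simplified segment description.
struct SegmentSummary {
  int64_t max_replicate_index;  // only meaningful when has_footer is true
  bool has_footer;              // false for the open segment being written
  int64_t file_size;            // bytes
};

// Walk from newest to oldest, skip the open segment (it counts as 0 bytes),
// and map each closed segment's max index to its own size plus the sizes of
// all newer closed segments.
std::map<int64_t, int64_t> BuildReplaySizeMap(
    const std::vector<SegmentSummary>& oldest_first) {
  std::map<int64_t, int64_t> replay_size;
  int64_t cumulative_size = 0;
  for (auto it = oldest_first.rbegin(); it != oldest_first.rend(); ++it) {
    if (!it->has_footer) continue;
    assert(it->file_size >= 0);  // sizes are byte counts
    cumulative_size += it->file_size;
    replay_size[it->max_replicate_index] = cumulative_size;
  }
  return replay_size;
}
```

With closed segments of 20MB, 15MB and 10MB ending at indexes 100, 200 and 300, plus an open segment, it produces {100 => 45MB, 200 => 25MB, 300 => 10MB}, the example from the header comment.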
http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 5d94a9b..3b44243 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -389,6 +389,7 @@ void RaftConsensusITest::AddFlagsForLogRolls(vector<string>* extra_tserver_flags
   extra_tserver_flags->push_back("--log_min_segments_to_retain=1");
   extra_tserver_flags->push_back("--log_max_segments_to_retain=3");
   extra_tserver_flags->push_back("--maintenance_manager_polling_interval_ms=100");
+  extra_tserver_flags->push_back("--log_target_replay_size_mb=1");
 }
 
 // Test that we can retrieve the permanent uuid of a server running
@@ -627,7 +628,8 @@ TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
     "--log_async_preallocate_segments=false",
     // Run the maintenance manager frequently so that we don't have to wait
     // long for GC.
-    "--maintenance_manager_polling_interval_ms=100"
+    "--maintenance_manager_polling_interval_ms=100",
+    "--log_target_replay_size_mb=1"
   };
   BuildAndStart(extra_flags);
   TServerDetails* replica = (*tablet_replicas_.begin()).second;

http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/tablet/tablet-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index 21ce05c..e2844d9 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -950,37 +950,39 @@ TYPED_TEST(TestTablet, TestMetricsInit) {
 }
 
 // Test that we find the correct log segment size for different indexes.
-TEST(TestTablet, TestGetLogRetentionSizeForIndex) {
-  std::map<int64_t, int64_t> idx_size_map;
-  // We build a map that represents 3 logs. The key is the index where that log ends, and the value
-  // is its size.
-  idx_size_map[3] = 1;
-  idx_size_map[6] = 10;
-  idx_size_map[9] = 100;
-
-  // The default value should return a size of 0.
+TEST(TestTablet, TestGetReplaySizeForIndex) {
+  std::map<int64_t, int64_t> replay_size_map;
+
+  // We build a map that represents 3 logs.
+  // See Log::GetReplaySizeMap(...) for details.
+  replay_size_map[100] = 45;
+  replay_size_map[200] = 25;
+  replay_size_map[300] = 10;
+
+  // -1 indicates that no logs are anchored, and thus it should report
+  // that no logs need to be replayed.
   int64_t min_log_index = -1;
-  ASSERT_EQ(Tablet::GetLogRetentionSizeForIndex(min_log_index, idx_size_map), 0);
+  EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 0);
 
-  // A value at the beginning of the first segment retains all the logs.
+  // A value in or before the first segment retains all the logs.
   min_log_index = 1;
-  ASSERT_EQ(Tablet::GetLogRetentionSizeForIndex(min_log_index, idx_size_map), 111);
+  EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 45);
 
   // A value at the end of the first segment also retains everything.
-  min_log_index = 3;
-  ASSERT_EQ(Tablet::GetLogRetentionSizeForIndex(min_log_index, idx_size_map), 111);
+  min_log_index = 100;
+  EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 45);
 
   // Beginning of second segment, only retain that one and the next.
-  min_log_index = 4;
-  ASSERT_EQ(Tablet::GetLogRetentionSizeForIndex(min_log_index, idx_size_map), 110);
+  min_log_index = 101;
+  EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 25);
 
   // Beginning of third segment, only retain that one.
-  min_log_index = 7;
-  ASSERT_EQ(Tablet::GetLogRetentionSizeForIndex(min_log_index, idx_size_map), 100);
+  min_log_index = 201;
+  EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 10);
 
   // A value after all the passed segments doesn't retain anything.
-  min_log_index = 10;
-  ASSERT_EQ(Tablet::GetLogRetentionSizeForIndex(min_log_index, idx_size_map), 0);
+  min_log_index = 301;
+  EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 0);
 }
 
 } // namespace tablet

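Because each map value already includes all newer segments, the lookup in the new Tablet::GetReplaySizeForIndex() reduces to one std::map::lower_bound call instead of the old linear sum over segments. A standalone sketch of that lookup; `ReplaySizeForIndex` is a hypothetical free-function name, and the `>= -1` precondition is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Keys are each closed segment's max replicate index; values are cumulative
// replay sizes (see Log::GetReplaySizeMap()).
int64_t ReplaySizeForIndex(int64_t min_log_index,
                           const std::map<int64_t, int64_t>& size_map) {
  assert(min_log_index >= -1);
  // -1 means no anchor is held, so nothing would need to be replayed.
  if (size_map.empty() || min_log_index == -1) return 0;
  // The first segment whose max index is >= min_log_index is the oldest one
  // that must be replayed; its value already covers every newer segment.
  auto it = size_map.lower_bound(min_log_index);
  return it == size_map.end() ? 0 : it->second;
}
```

Against the map used in the test ({100: 45, 200: 25, 300: 10}) it returns 0 for -1, 45 for 1 and 100, 25 for 101, 10 for 201 and 0 for 301.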
http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index a0c3f3a..0185619 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1421,12 +1421,11 @@ bool Tablet::MemRowSetEmpty() const {
   return comps->memrowset->empty();
 }
 
-size_t Tablet::MemRowSetLogRetentionSize(const MaxIdxToSegmentMap& max_idx_to_segment_size) const {
+size_t Tablet::MemRowSetLogReplaySize(const ReplaySizeMap& replay_size_map) const {
   scoped_refptr<TabletComponents> comps;
   GetComponents(&comps);
 
-  return GetLogRetentionSizeForIndex(comps->memrowset->MinUnflushedLogIndex(),
-                                     max_idx_to_segment_size);
+  return GetReplaySizeForIndex(comps->memrowset->MinUnflushedLogIndex(), replay_size_map);
 }
 
 size_t Tablet::EstimateOnDiskSize() const {
@@ -1468,31 +1467,29 @@ bool Tablet::DeltaMemRowSetEmpty() const {
   return true;
 }
 
-void Tablet::GetInfoForBestDMSToFlush(const MaxIdxToSegmentMap& max_idx_to_segment_size,
-                                      int64_t* mem_size, int64_t* retention_size) const {
-  shared_ptr<RowSet> rowset = FindBestDMSToFlush(max_idx_to_segment_size);
+void Tablet::GetInfoForBestDMSToFlush(const ReplaySizeMap& replay_size_map,
+                                      int64_t* mem_size, int64_t* replay_size) const {
+  shared_ptr<RowSet> rowset = FindBestDMSToFlush(replay_size_map);
 
   if (rowset) {
-    *retention_size = GetLogRetentionSizeForIndex(rowset->MinUnflushedLogIndex(),
-                                            max_idx_to_segment_size);
+    *replay_size = GetReplaySizeForIndex(rowset->MinUnflushedLogIndex(),
+                                         replay_size_map);
     *mem_size = rowset->DeltaMemStoreSize();
   } else {
-    *retention_size = 0;
+    *replay_size = 0;
     *mem_size = 0;
   }
 }
 
-Status Tablet::FlushDMSWithHighestRetention(const MaxIdxToSegmentMap&
-                                            max_idx_to_segment_size) const {
-  shared_ptr<RowSet> rowset = FindBestDMSToFlush(max_idx_to_segment_size);
+Status Tablet::FlushDMSWithHighestRetention(const ReplaySizeMap& replay_size_map) const {
+  shared_ptr<RowSet> rowset = FindBestDMSToFlush(replay_size_map);
   if (rowset) {
     return rowset->FlushDeltas();
   }
   return Status::OK();
 }
 
-shared_ptr<RowSet> Tablet::FindBestDMSToFlush(const MaxIdxToSegmentMap&
-                                              max_idx_to_segment_size) const {
+shared_ptr<RowSet> Tablet::FindBestDMSToFlush(const ReplaySizeMap& replay_size_map) const {
   scoped_refptr<TabletComponents> comps;
   GetComponents(&comps);
   int64_t mem_size = 0;
@@ -1502,8 +1499,8 @@ shared_ptr<RowSet> Tablet::FindBestDMSToFlush(const MaxIdxToSegmentMap&
     if (rowset->DeltaMemStoreEmpty()) {
       continue;
     }
-    int64_t size = GetLogRetentionSizeForIndex(rowset->MinUnflushedLogIndex(),
-                                               max_idx_to_segment_size);
+    int64_t size = GetReplaySizeForIndex(rowset->MinUnflushedLogIndex(),
+                                         replay_size_map);
     if ((size > retention_size) ||
         (size == retention_size &&
          (rowset->DeltaMemStoreSize() > mem_size))) {
@@ -1515,19 +1512,19 @@ shared_ptr<RowSet> Tablet::FindBestDMSToFlush(const MaxIdxToSegmentMap&
   return best_dms;
 }
 
-int64_t Tablet::GetLogRetentionSizeForIndex(int64_t min_log_index,
-                                            const MaxIdxToSegmentMap& max_idx_to_segment_size) {
-  if (max_idx_to_segment_size.size() == 0 || min_log_index == -1) {
+int64_t Tablet::GetReplaySizeForIndex(int64_t min_log_index,
+                                      const ReplaySizeMap& size_map) {
+  // If min_log_index is -1, that indicates that there is no anchor held
+  // for the tablet, and therefore no logs would need to be replayed.
+  if (size_map.empty() || min_log_index == -1) {
     return 0;
   }
-  int64_t total_size = 0;
-  for (const MaxIdxToSegmentMap::value_type& entry : max_idx_to_segment_size) {
-    if (min_log_index > entry.first) {
-      continue; // We're not in this segment, probably someone else is retaining it.
-    }
-    total_size += entry.second;
+
+  const auto& it = size_map.lower_bound(min_log_index);
+  if (it == size_map.end()) {
+    return 0;
   }
-  return total_size;
+  return it->second;
 }
 
 Status Tablet::FlushBiggestDMS() {

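FindBestDMSToFlush() ranks delta memstores by the replay size of their oldest unflushed op and breaks ties by in-memory size. The sketch below restates that visible comparison over plain structs. `DmsCandidate`, `ReplaySizeFor` and `BestDmsToFlush` are hypothetical, and starting the best replay size at 0 is an assumption, since the diff elides those initialization lines.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical per-rowset summary used only for this sketch.
struct DmsCandidate {
  bool dms_empty;
  int64_t min_unflushed_log_index;
  int64_t dms_mem_size;
};

// Same lookup as Tablet::GetReplaySizeForIndex().
int64_t ReplaySizeFor(int64_t min_log_index, const std::map<int64_t, int64_t>& m) {
  assert(min_log_index >= -1);
  if (m.empty() || min_log_index == -1) return 0;
  auto it = m.lower_bound(min_log_index);
  return it == m.end() ? 0 : it->second;
}

// Returns the index of the non-empty DMS that forces the most log replay,
// preferring the larger in-memory size on ties, or -1 if none qualifies.
int BestDmsToFlush(const std::vector<DmsCandidate>& rowsets,
                   const std::map<int64_t, int64_t>& replay_size_map) {
  int best = -1;
  int64_t best_replay = 0;
  int64_t best_mem = 0;
  for (std::size_t i = 0; i < rowsets.size(); ++i) {
    const DmsCandidate& r = rowsets[i];
    if (r.dms_empty) continue;
    const int64_t replay = ReplaySizeFor(r.min_unflushed_log_index, replay_size_map);
    if (replay > best_replay ||
        (replay == best_replay && r.dms_mem_size > best_mem)) {
      best = static_cast<int>(i);
      best_replay = replay;
      best_mem = r.dms_mem_size;
    }
  }
  return best;
}
```

For example, with the map {100: 45, 200: 25, 300: 10}, a DMS anchored at index 60 with 8 bytes in memory beats one anchored at 50 with 5 bytes (both would replay 45), and both beat one anchored at 150 (replay 25).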
http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/tablet/tablet.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 72b08f4..fcc1c01 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -76,7 +76,7 @@ class WriteTransactionState;
 
 class Tablet {
  public:
-  typedef std::map<int64_t, int64_t> MaxIdxToSegmentMap;
+  typedef std::map<int64_t, int64_t> ReplaySizeMap;
   friend class CompactRowSetsOp;
   friend class FlushMRSOp;
 
@@ -260,8 +260,9 @@ class Tablet {
   // This method takes a read lock on component_lock_ and is thread-safe.
   bool MemRowSetEmpty() const;
 
-  // Returns the size in bytes for the MRS's log retention.
-  size_t MemRowSetLogRetentionSize(const MaxIdxToSegmentMap& max_idx_to_segment_size) const;
+  // Returns the size in bytes of WALs that would need to be replayed to restore
+  // the current MRS.
+  size_t MemRowSetLogReplaySize(const ReplaySizeMap& replay_size_map) const;
 
   // Estimate the total on-disk size of this tablet, in bytes.
   size_t EstimateOnDiskSize() const;
@@ -272,13 +273,13 @@ class Tablet {
   // Same as MemRowSetEmpty(), but for the DMS.
   bool DeltaMemRowSetEmpty() const;
 
-  // Fills in the in-memory size and retention size in bytes for the DMS with the
+  // Fills in the in-memory size and replay size in bytes for the DMS with the
   // highest retention.
-  void GetInfoForBestDMSToFlush(const MaxIdxToSegmentMap& max_idx_to_segment_size,
-                                int64_t* mem_size, int64_t* retention_size) const;
+  void GetInfoForBestDMSToFlush(const ReplaySizeMap& replay_size_map,
+                                int64_t* mem_size, int64_t* replay_size) const;
 
   // Flushes the DMS with the highest retention.
-  Status FlushDMSWithHighestRetention(const MaxIdxToSegmentMap& max_idx_to_segment_size) const;
+  Status FlushDMSWithHighestRetention(const ReplaySizeMap& replay_size_map) const;
 
   // Flush only the biggest DMS
   Status FlushBiggestDMS();
@@ -389,7 +390,7 @@ class Tablet {
  private:
   friend class Iterator;
   friend class TabletPeerTest;
-  FRIEND_TEST(TestTablet, TestGetLogRetentionSizeForIndex);
+  FRIEND_TEST(TestTablet, TestGetReplaySizeForIndex);
 
   Status FlushUnlocked();
 
@@ -490,15 +491,15 @@ class Tablet {
   Status GetMappedReadProjection(const Schema& projection,
                                  Schema *mapped_projection) const;
 
-  Status CheckRowInTablet(const ConstContiguousRow& probe) const;
+  Status CheckRowInTablet(const ConstContiguousRow& row) const;
 
   // Helper method to find the rowset that has the DMS with the highest retention.
-  std::shared_ptr<RowSet> FindBestDMSToFlush(
-      const MaxIdxToSegmentMap& max_idx_to_segment_size) const;
+  std::shared_ptr<RowSet> FindBestDMSToFlush(const ReplaySizeMap& replay_size_map) const;
 
-  // Helper method to find how many bytes this index retains.
-  static int64_t GetLogRetentionSizeForIndex(int64_t min_log_index,
-                                             const MaxIdxToSegmentMap& max_idx_to_segment_size);
+  // Helper method to find how many bytes need to be replayed to restore in-memory
+  // state from this index.
+  static int64_t GetReplaySizeForIndex(int64_t min_log_index,
+                                       const ReplaySizeMap& size_map);
 
   std::string LogPrefix() const;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/tablet/tablet_peer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.cc b/src/kudu/tablet/tablet_peer.cc
index ae6ac91..46ce6ec 100644
--- a/src/kudu/tablet/tablet_peer.cc
+++ b/src/kudu/tablet/tablet_peer.cc
@@ -54,6 +54,7 @@
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
 
+using std::map;
 using std::shared_ptr;
 using std::unique_ptr;
 
@@ -474,16 +475,15 @@ log::RetentionIndexes TabletPeer::GetRetentionIndexes() const {
   return ret;
 }
 
-Status TabletPeer::GetMaxIndexesToSegmentSizeMap(MaxIdxToSegmentSizeMap* idx_size_map) const {
+Status TabletPeer::GetReplaySizeMap(map<int64_t, int64_t>* replay_size_map) const {
   RETURN_NOT_OK(CheckRunning());
-  log::RetentionIndexes retention = GetRetentionIndexes();
-  log_->GetMaxIndexesToSegmentSizeMap(retention.for_durability, idx_size_map);
+  log_->GetReplaySizeMap(replay_size_map);
   return Status::OK();
 }
 
 Status TabletPeer::GetGCableDataSize(int64_t* retention_size) const {
   RETURN_NOT_OK(CheckRunning());
-  log_->GetGCableDataSize(GetRetentionIndexes(), retention_size);
+  *retention_size = log_->GetGCableDataSize(GetRetentionIndexes());
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/tablet/tablet_peer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.h b/src/kudu/tablet/tablet_peer.h
index c43c83a..762925b 100644
--- a/src/kudu/tablet/tablet_peer.h
+++ b/src/kudu/tablet/tablet_peer.h
@@ -79,8 +79,6 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
                    public consensus::ReplicaTransactionFactory,
                    public TabletStatusListener {
  public:
-  typedef std::map<int64_t, int64_t> MaxIdxToSegmentSizeMap;
-
   TabletPeer(const scoped_refptr<TabletMetadata>& meta,
              const consensus::RaftPeerPB& local_peer_pb, ThreadPool* apply_pool,
              Callback<void(const std::string& reason)> mark_dirty_clbk);
@@ -204,11 +202,10 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
   // Used for selection of log segments to delete during Log GC.
   log::RetentionIndexes GetRetentionIndexes() const;
 
-  // Returns a map of log index -> segment size, of all the segments that currently cannot be GCed
-  // because in-memory structures have anchors in them.
+  // See Log::GetReplaySizeMap(...).
   //
   // Returns a non-ok status if the tablet isn't running.
-  Status GetMaxIndexesToSegmentSizeMap(MaxIdxToSegmentSizeMap* idx_size_map) const;
+  Status GetReplaySizeMap(std::map<int64_t, int64_t>* replay_size_map) const;
 
   // Returns the amount of bytes that would be GC'd if RunLogGC() was called.
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/tablet/tablet_peer_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer_mm_ops.cc b/src/kudu/tablet/tablet_peer_mm_ops.cc
index d116cda..7ccaddf 100644
--- a/src/kudu/tablet/tablet_peer_mm_ops.cc
+++ b/src/kudu/tablet/tablet_peer_mm_ops.cc
@@ -92,9 +92,9 @@ void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats
 void FlushMRSOp::UpdateStats(MaintenanceOpStats* stats) {
   std::lock_guard<simple_spinlock> l(lock_);
 
-  map<int64_t, int64_t> max_idx_to_segment_size;
+  map<int64_t, int64_t> replay_size_map;
   if (tablet_peer_->tablet()->MemRowSetEmpty() ||
-      !tablet_peer_->GetMaxIndexesToSegmentSizeMap(&max_idx_to_segment_size).ok()) {
+      !tablet_peer_->GetReplaySizeMap(&replay_size_map).ok()) {
     return;
   }
 
@@ -105,9 +105,9 @@ void FlushMRSOp::UpdateStats(MaintenanceOpStats* stats) {
 
   stats->set_ram_anchored(tablet_peer_->tablet()->MemRowSetSize());
   stats->set_logs_retained_bytes(
-      tablet_peer_->tablet()->MemRowSetLogRetentionSize(max_idx_to_segment_size));
+      tablet_peer_->tablet()->MemRowSetLogReplaySize(replay_size_map));
 
-  // TODO: use workload statistics here to find out how "hot" the tablet has
+  // TODO(todd): use workload statistics here to find out how "hot" the tablet has
   // been in the last 5 minutes.
   FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(
       stats,
@@ -150,12 +150,12 @@ void FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
   std::lock_guard<simple_spinlock> l(lock_);
   int64_t dms_size;
   int64_t retention_size;
-  map<int64_t, int64_t> max_idx_to_segment_size;
+  map<int64_t, int64_t> max_idx_to_replay_size;
   if (tablet_peer_->tablet()->DeltaMemRowSetEmpty() ||
-      !tablet_peer_->GetMaxIndexesToSegmentSizeMap(&max_idx_to_segment_size).ok()) {
+      !tablet_peer_->GetReplaySizeMap(&max_idx_to_replay_size).ok()) {
     return;
   }
-  tablet_peer_->tablet()->GetInfoForBestDMSToFlush(max_idx_to_segment_size,
+  tablet_peer_->tablet()->GetInfoForBestDMSToFlush(max_idx_to_replay_size,
                                                    &dms_size, &retention_size);
 
   stats->set_ram_anchored(dms_size);
@@ -168,13 +168,13 @@ void FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
 }
 
 void FlushDeltaMemStoresOp::Perform() {
-  map<int64_t, int64_t> max_idx_to_segment_size;
-  if (!tablet_peer_->GetMaxIndexesToSegmentSizeMap(&max_idx_to_segment_size).ok()) {
+  map<int64_t, int64_t> max_idx_to_replay_size;
+  if (!tablet_peer_->GetReplaySizeMap(&max_idx_to_replay_size).ok()) {
     LOG(WARNING) << "Won't flush deltas since tablet shutting down: " << tablet_peer_->tablet_id();
     return;
   }
   KUDU_CHECK_OK_PREPEND(tablet_peer_->tablet()->FlushDMSWithHighestRetention(
-                            max_idx_to_segment_size),
+                            max_idx_to_replay_size),
                         Substitute("Failed to flush DMS on $0",
                                    tablet_peer_->tablet()->tablet_id()));
   {
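The flush ops now ask for a "replay size map" instead of a per-segment size map. The diff does not show how that map is built, so the following is a minimal sketch under one assumption: the map is keyed by each retained segment's maximum op index, and each value is the cumulative size of that segment plus every newer one. With that layout, the WAL an anchored op would force to be replayed becomes a single `lower_bound` lookup. `Segment`, `BuildReplaySizeMap()` and `ReplaySizeForIndex()` are hypothetical names, not Kudu APIs.

```cpp
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical description of one retained WAL segment.
struct Segment {
  int64_t max_index;   // highest op index stored in the segment
  int64_t size_bytes;  // on-disk size of the segment
};

// Assumption: 'segments' is ordered oldest-first (ascending max_index).
// Key = segment's max op index; value = bytes of this segment plus all
// newer segments, i.e. what must be replayed if an anchor lands here.
std::map<int64_t, int64_t> BuildReplaySizeMap(const std::vector<Segment>& segments) {
  std::map<int64_t, int64_t> replay_size;
  int64_t cumulative = 0;
  // Walk newest to oldest so each entry accumulates every later segment.
  for (auto it = segments.rbegin(); it != segments.rend(); ++it) {
    cumulative += it->size_bytes;
    replay_size[it->max_index] = cumulative;
  }
  return replay_size;
}

// Bytes of WAL to replay if the oldest unflushed op is 'min_anchored_index':
// the first segment whose max index is >= that index is the one containing it.
int64_t ReplaySizeForIndex(int64_t min_anchored_index,
                           const std::map<int64_t, int64_t>& replay_size) {
  auto it = replay_size.lower_bound(min_anchored_index);
  return it == replay_size.end() ? 0 : it->second;
}
```

For segments with (max index, size) of (10, 100), (20, 200), (30, 300), (40, 400), this sketch yields the map 10→1000, 20→900, 30→700, 40→400, and an anchor at index 15 maps to 900 bytes of replay.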

http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/util/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index cb124af..282578b 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -43,6 +43,8 @@ METRIC_DEFINE_histogram(test, maintenance_op_duration,
                         "Maintenance Operation Duration",
                         kudu::MetricUnit::kSeconds, "", 60000000LU, 2);
 
+DECLARE_int64(log_target_replay_size_mb);
+
 namespace kudu {
 
 const int kHistorySize = 4;
@@ -249,19 +251,21 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
 
 // Test that ops are prioritized correctly when we add log retention.
 TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
+  const int64_t kMB = 1024 * 1024;
+
   manager_->Shutdown();
 
   TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE, test_tracker_);
   op1.set_ram_anchored(0);
-  op1.set_logs_retained_bytes(100);
+  op1.set_logs_retained_bytes(100 * kMB);
 
   TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
   op2.set_ram_anchored(100);
-  op2.set_logs_retained_bytes(100);
+  op2.set_logs_retained_bytes(100 * kMB);
 
   TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
   op3.set_ram_anchored(200);
-  op3.set_logs_retained_bytes(100);
+  op3.set_logs_retained_bytes(100 * kMB);
 
   manager_->RegisterOp(&op1);
   manager_->RegisterOp(&op2);
@@ -272,7 +276,13 @@ TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
 
   manager_->UnregisterOp(&op1);
 
-  // Low IO is taken care of, now we find the op clears the most log retention and ram.
+  // Low IO is taken care of, now we find the op that clears the most log retention and ram.
+  // However, with the default settings, we won't bother running any of these operations
+  // which only retain 100MB of logs.
+  ASSERT_EQ(nullptr, manager_->FindBestOp());
+
+  // If we change the target WAL size, we will select these ops.
+  FLAGS_log_target_replay_size_mb = 50;
   ASSERT_EQ(&op3, manager_->FindBestOp());
 
   manager_->UnregisterOp(&op3);
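The test's retained-log values change from `100` to `100 * kMB` because the new gate in `FindBestOp()` compares whole megabytes: `most_logs_retained_bytes / 1024 / 1024 >= FLAGS_log_target_replay_size_mb`. A raw 100 bytes truncates to 0 MB and could never reach a positive target. The sketch below isolates that comparison; `MeetsReplayTarget()` is a hypothetical stand-alone helper, not a Kudu function.

```cpp
#include <cstdint>

constexpr int64_t kMB = 1024 * 1024;

// Mirrors the condition added to FindBestOp(): integer division truncates,
// so only whole megabytes of retained log count toward the target.
bool MeetsReplayTarget(int64_t logs_retained_bytes, int64_t target_replay_size_mb) {
  return logs_retained_bytes / 1024 / 1024 >= target_replay_size_mb;
}
```

With the default target of 1024 MB, `MeetsReplayTarget(100 * kMB, 1024)` is false, which is why the test first expects `nullptr`; lowering the target to 50 makes it true. Because of truncation, `1024 * kMB - 1` bytes counts as 1023 MB and still falls short of a 1024 MB target.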

http://git-wip-us.apache.org/repos/asf/kudu/blob/127438af/src/kudu/util/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 11fa614..973849a 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -57,6 +57,13 @@ DEFINE_bool(enable_maintenance_manager, true,
        "Enable the maintenance manager, runs compaction and tablet cleaning tasks.");
 TAG_FLAG(enable_maintenance_manager, unsafe);
 
+DEFINE_int64(log_target_replay_size_mb, 1024,
+             "The target maximum size of logs to be replayed at startup. If a tablet "
+             "has in-memory operations that are causing more than this size of logs "
+             "to be retained, then the maintenance manager will prioritize flushing "
+             "these operations to disk.");
+TAG_FLAG(log_target_replay_size_mb, experimental);
+
 namespace kudu {
 
 MaintenanceOpStats::MaintenanceOpStats() {
@@ -229,8 +236,9 @@ void MaintenanceManager::RunSchedulerThread() {
 // - If there's an Op that we can run quickly that frees log retention, we run it.
 // - If we've hit the overall process memory limit (note: this includes memory that the Ops cannot
 //   free), we run the Op with the highest RAM usage.
-// - If there are Ops that retain logs, we run the one that has the highest retention (and if many
-//   qualify, then we run the one that also frees up the most RAM).
+// - If there are Ops that are retaining logs past our target replay size, we run the one that has
+//   the highest retention (and if many qualify, then we run the one that also frees up the
+//   most RAM).
 // - Finally, if there's nothing else that we really need to do, we run the Op that will improve
 //   performance the most.
 //
@@ -334,11 +342,12 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
     return most_mem_anchored_op;
   }
 
-  if (most_logs_retained_bytes_op) {
+  if (most_logs_retained_bytes_op &&
+      most_logs_retained_bytes / 1024 / 1024 >= FLAGS_log_target_replay_size_mb) {
     VLOG_AND_TRACE("maintenance", 1)
             << "Performing " << most_logs_retained_bytes_op->name() << ", "
-            << "because it can free up more logs " << "at " << most_logs_retained_bytes
-            << " bytes";
+            << "because it can free up more logs (" << most_logs_retained_bytes
+            << " bytes)";
     return most_logs_retained_bytes_op;
   }
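The updated scheduler comment describes the log-retention step: among ops that retain logs, pick the one with the highest retention, break ties by the RAM it frees, and (new in this patch) act only once that retention reaches the target replay size. The following is a simplified sketch of just that step, applied to the op2/op3 values from `TestLogRetentionPrioritization`. `OpStats` and `PickLogRetentionOp()` are hypothetical; the real `FindBestOp()` also considers runnability, low-IO ops and memory pressure before reaching this point.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for MaintenanceOpStats.
struct OpStats {
  std::string name;
  int64_t ram_anchored;
  int64_t logs_retained_bytes;
};

// Returns the op to run for log retention, or nullptr if none qualifies.
// Highest logs_retained_bytes wins; ties go to the op anchoring more RAM.
const OpStats* PickLogRetentionOp(const std::vector<OpStats>& ops,
                                  int64_t target_replay_size_mb) {
  const OpStats* best = nullptr;
  for (const OpStats& op : ops) {
    if (op.logs_retained_bytes == 0) continue;  // retains nothing: not a candidate
    if (best == nullptr ||
        op.logs_retained_bytes > best->logs_retained_bytes ||
        (op.logs_retained_bytes == best->logs_retained_bytes &&
         op.ram_anchored > best->ram_anchored)) {
      best = &op;
    }
  }
  // Only schedule a flush once retained logs reach the target (whole MB).
  if (best != nullptr &&
      best->logs_retained_bytes / 1024 / 1024 >= target_replay_size_mb) {
    return best;
  }
  return nullptr;
}
```

With op2 (100 bytes RAM, 100 MB of logs) and op3 (200 bytes RAM, 100 MB of logs), a 1024 MB target selects nothing, while a 50 MB target selects op3 on the RAM tie-break, matching the test's two assertions.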
 

