kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] 01/02: [TTL cache] add periodic scrubbing thread
Date Fri, 03 May 2019 01:38:57 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 806d7ba4f725a599ac80355de4f6ee21ad7905af
Author: Alexey Serbin <alexey@apache.org>
AuthorDate: Tue Apr 30 16:00:36 2019 -0700

    [TTL cache] add periodic scrubbing thread
    
    This patch adds an optional periodic scrubbing thread for the TTLCache.
    If enabled, the thread invalidates expired entries, effectively evicting
    them from the cache.  With this patch, the TTLCache evicts expired
    entries not only during the insertion of new ones if the cache is at its
    capacity, but also upon detection of an entry's expiry by the scrubbing
    thread.
    
    Added corresponding test coverage as well.
    
    Change-Id: Ib3bbbcf6211930a5501a03d6a47f3d911118946c
    Reviewed-on: http://gerrit.cloudera.org:8080/13201
    Reviewed-by: Andrew Wong <awong@cloudera.com>
    Tested-by: Alexey Serbin <aserbin@cloudera.com>
---
 src/kudu/util/cache.h           |   4 +-
 src/kudu/util/ttl_cache-test.cc | 183 +++++++++++++++++++++++++++++++++++++++-
 src/kudu/util/ttl_cache.h       | 102 ++++++++++++++++++++--
 3 files changed, 276 insertions(+), 13 deletions(-)

diff --git a/src/kudu/util/cache.h b/src/kudu/util/cache.h
index 32213cc..628b452 100644
--- a/src/kudu/util/cache.h
+++ b/src/kudu/util/cache.h
@@ -239,8 +239,8 @@ class Cache {
   struct InvalidationControl;
 
   // Invalidate cache's entries, effectively evicting non-valid ones from the
-  // cache. The invalidation process iterates over cache's recency list(s),
-  // from the oldest (less relevant) entries to the newest (more relevant) ones.
+  // cache. The invalidation process iterates over the cache's recency list(s),
+  // from best candidate for eviction to the worst.
   //
   // The provided control structure 'ctl' is responsible for the following:
   //   * determine whether an entry is valid or not
diff --git a/src/kudu/util/ttl_cache-test.cc b/src/kudu/util/ttl_cache-test.cc
index e81991c..821a066 100644
--- a/src/kudu/util/ttl_cache-test.cc
+++ b/src/kudu/util/ttl_cache-test.cc
@@ -17,23 +17,29 @@
 
 #include "kudu/util/ttl_cache.h"
 
-#include <stdint.h>
-
+#include <algorithm>
+#include <atomic>
 #include <cstddef>
+#include <cstdint>
+#include <cstdlib>
 #include <memory>
+#include <set>
 #include <string>
-#include <utility>
+#include <thread>
+#include <vector>
 
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/cache.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -52,8 +58,12 @@ METRIC_DECLARE_counter(test_ttl_cache_lookups);
 METRIC_DECLARE_counter(test_ttl_cache_misses);
 METRIC_DECLARE_gauge_uint64(test_ttl_cache_memory_usage);
 
+using std::atomic;
+using std::set;
 using std::string;
+using std::thread;
 using std::unique_ptr;
+using std::vector;
 using strings::Substitute;
 
 namespace kudu {
@@ -373,7 +383,7 @@ TEST_F(TTLCacheTest, Basic) {
   ASSERT_EQ(cache_capacity + 1, GetCacheUsage());
 }
 
-// Test invalidation of expired entries in the underlying cache.
+// Test the invalidation of expired entries in the underlying cache.
 TEST_F(TTLCacheTest, InvalidationOfExpiredEntries) {
   constexpr size_t cache_capacity = 512;
   const auto entry_ttl = MonoDelta::FromMilliseconds(250);
@@ -422,4 +432,169 @@ TEST_F(TTLCacheTest, InvalidationOfExpiredEntries) {
   ASSERT_EQ(cache_capacity / 2, GetCacheUsage());
 }
 
+// Verify the auto-invalidation of expired entries in TTLCache work as expected:
+// expired entries are removed with the expecting timing and valid entries stay.
+TEST_F(TTLCacheTest, AutoInvalidationOfEntries) {
+  constexpr size_t kCacheCapacity = 128;
+  constexpr size_t kHalfCacheCapacity = kCacheCapacity / 2;
+  const auto kEntryTtl = MonoDelta::FromMilliseconds(256);
+  const auto kScrubInterval = MonoDelta::FromMilliseconds(
+      kEntryTtl.ToMilliseconds() / 8);
+  const auto kEntryTtlThreeQuarters = MonoDelta::FromMilliseconds(
+      kEntryTtl.ToMilliseconds() * 3 / 4);
+
+  TTLTestCache cache(kCacheCapacity, kEntryTtl, kScrubInterval);
+  {
+    unique_ptr<TTLCacheTestMetrics> metrics(
+        new TTLCacheTestMetrics(metric_entity_));
+    cache.SetMetrics(std::move(metrics));
+  }
+
+  // Make sure the scrubber doesn't conflate non-expired entries with expired
+  // ones while scrubbing the cache.
+  for (auto i = 0; i < kHalfCacheCapacity; ++i) {
+    cache.Put(Substitute("1$0", i), unique_ptr<TestValue>(new TestValue(i)), 1);
+  }
+  SleepFor(kEntryTtlThreeQuarters);
+  for (auto i = 0; i < kHalfCacheCapacity; ++i) {
+    cache.Put(Substitute("2$0", i), unique_ptr<TestValue>(new TestValue(i)), 1);
+  }
+  ASSERT_EQ(kCacheCapacity, GetCacheUsage());
+  SleepFor(kEntryTtlThreeQuarters);
+
+  // Apart from the OS scheduler's anomalies, the scrubbing thread should run
+  // at least once at this point. Given the timing of adding the entries into
+  // the cache, half of the entries in the cache should have been invalidated,
+  // while the other half should stay.
+  ASSERT_EQ(kHalfCacheCapacity, GetCacheEvictionsExpired());
+  ASSERT_EQ(kHalfCacheCapacity, GetCacheUsage());
+
+  // Eventually, nothing should be in the cache.
+  ASSERT_EVENTUALLY([&]{
+    ASSERT_EQ(0, GetCacheUsage());
+    ASSERT_EQ(kCacheCapacity, GetCacheEvictionsExpired());
+  });
+}
+
+// Verify the auto-invalidation of expired entries in TTLCache work as
+// expected in the presence of oustanding handles to the entries which
+// are subject to invalidation by the scrubbing thread.
+TEST_F(TTLCacheTest, AutoInvalidationOfEntriesWithOutstandingReferences) {
+  constexpr size_t kCacheCapacity = 512;
+  constexpr size_t kHalfCacheCapacity = kCacheCapacity / 2;
+  const auto kEntryTtl = MonoDelta::FromMilliseconds(50);
+  const auto kScrubInterval = MonoDelta::FromMilliseconds(
+      kEntryTtl.ToMilliseconds() / 2);
+
+  // A TTL cache with auto-scrubbing thread that gets rid of expired entries
+  // with the frequency corresponding to a half of the TTL interval.
+  TTLTestCache cache(kCacheCapacity, kEntryTtl, kScrubInterval);
+  {
+    unique_ptr<TTLCacheTestMetrics> metrics(
+        new TTLCacheTestMetrics(metric_entity_));
+    cache.SetMetrics(std::move(metrics));
+  }
+
+  {
+    const set<string> selected_keys = {
+        "0", "2", "9", "99", "127", "128", "252", "255", };
+    vector<Handle> selected_handles;
+    selected_handles.reserve(selected_keys.size());
+    for (auto i = 0; i < kHalfCacheCapacity; ++i) {
+      const auto key = Substitute("$0", i);
+      auto h = cache.Put(key, unique_ptr<TestValue>(new TestValue), 1);
+      if (ContainsKey(selected_keys, key)) {
+        selected_handles.emplace_back(std::move(h));
+      }
+    }
+    ASSERT_EQ(selected_keys.size(), selected_handles.size());
+
+    // All the entries except for the ones selected should be gone:
+    // the background thread should get rid all the expired entries.
+    // The 'selected_handles' still keep references to the selected entries,
+    // so those should not be deallocated yet.
+    ASSERT_EVENTUALLY([&]{
+      ASSERT_EQ(kHalfCacheCapacity - selected_keys.size(),
+                GetCacheEvictionsExpired());
+      ASSERT_EQ(selected_keys.size(), GetCacheUsage());
+    });
+
+    // However, even if the selected entries are not yet deallocated,
+    // they must have been removed from the cache's lookup table
+    // once the scrubbing thread invalidated them.
+    for (const auto& key : selected_keys) {
+      ASSERT_FALSE(cache.Get(key));
+    }
+  }
+  // Now, once the handles for the selected entries are gone out of scope,
+  // the cache should be empty.
+  ASSERT_EQ(kHalfCacheCapacity, GetCacheEvictionsExpired());
+  ASSERT_EQ(0, GetCacheUsage());
+}
+
+// Verify how the auto-invalidation of expired entries work in the presence
+// of concurrent read access to the entries. Also, verify that the limit on the
+// number of maximum number of processed entries per one run of the scrubbing
+// thread applies as expected.
+TEST_F(TTLCacheTest, AutoInvalidationOfEntriesLimitPerPass) {
+  constexpr size_t kCacheCapacity = 1024;
+  constexpr size_t kScrubMaxEntriesPerPass = 64;
+  const auto kEntryTtl = MonoDelta::FromMilliseconds(100);
+  const auto kScrubInterval = kEntryTtl;
+  const auto kNumRunnerThreads = 64;
+
+  // A TTL cache with auto-scrubbing thread that gets rid of expired entries.
+  // The amount of scrubbed entries per pass is limited.
+  TTLTestCache cache(kCacheCapacity, kEntryTtl,
+                     kScrubInterval, kScrubMaxEntriesPerPass);
+  {
+    unique_ptr<TTLCacheTestMetrics> metrics(
+        new TTLCacheTestMetrics(metric_entity_));
+    cache.SetMetrics(std::move(metrics));
+  }
+
+  atomic<bool> stop(false);
+  vector<thread> threads;
+  SCOPED_CLEANUP({
+    stop = true;
+    for (auto& thread : threads) {
+      thread.join();
+    }
+  });
+
+  for (auto i = 0; i < kNumRunnerThreads; ++i) {
+    threads.emplace_back([&cache, &stop] () {
+      while (!stop) {
+        const auto key = std::to_string(rand() % kCacheCapacity);
+        auto h = cache.Get(key);
+        // Keep the handle around for some time.
+        SleepFor(MonoDelta::FromNanoseconds(rand() % 5));
+      }
+    });
+  }
+
+  const auto start_time = MonoTime::Now();
+  for (auto i = 0; i < kCacheCapacity; ++i) {
+    cache.Put(std::to_string(i), unique_ptr<TestValue>(new TestValue(i)), 1);
+  }
+
+  const auto scrub_interval_ms = kScrubInterval.ToMilliseconds();
+  for (auto i = 0; i < kCacheCapacity / kScrubMaxEntriesPerPass; ++i) {
+    const auto evictions_expired_num = GetCacheEvictionsExpired();
+    const auto duration_ms = (MonoTime::Now() - start_time).ToMilliseconds();
+    const auto runs_num = 1 +
+        (duration_ms + scrub_interval_ms - 1) / scrub_interval_ms;
+    // No more than the specified number of entries should be evicted per one
+    // pass of the scrubbing thread.
+    ASSERT_LE(evictions_expired_num, runs_num * kScrubMaxEntriesPerPass);
+    SleepFor(kScrubInterval);
+  }
+  stop = true;
+
+  ASSERT_EVENTUALLY([&]{
+    ASSERT_EQ(kCacheCapacity, GetCacheEvictionsExpired());
+    ASSERT_EQ(0, GetCacheUsage());
+  });
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/ttl_cache.h b/src/kudu/util/ttl_cache.h
index 65911a8..29516a4 100644
--- a/src/kudu/util/ttl_cache.h
+++ b/src/kudu/util/ttl_cache.h
@@ -33,6 +33,8 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
 #include "kudu/util/ttl_cache_metrics.h"
 
 namespace kudu {
@@ -111,24 +113,45 @@ class TTLCache {
     Cache::UniqueHandle handle_;
   };
 
-  // Create a new TTL cache with the specified capacity and name. The cache's
-  // metric gauges are attached to the metric entity specified via the 'entity'
-  // parameter if the parameter is non-null.
+  // Create a new TTL cache with the specified capacity and name. All the
+  // entries put into the cache have the same TTL specified by the 'entry_ttl'
+  // parameter. If the 'scrubbing_period' parameter is provided with valid
+  // time interval, the cache starts a periodic scrubbing thread that evicts
+  // expired entries from the cache.
   TTLCache(size_t capacity_bytes,
            MonoDelta entry_ttl,
-           const std::string& cache_name = "")
+           MonoDelta scrubbing_period = {},
+           size_t max_scrubbed_entries_per_pass_num = 0,
+           const std::string& cache_name = "ttl-cache")
       : entry_ttl_(entry_ttl),
+        scrubbing_period_(scrubbing_period),
+        max_scrubbed_entries_per_pass_num_(max_scrubbed_entries_per_pass_num),
         metrics_(nullptr),
         eviction_cb_(new EvictionCallback(this)),
         cache_(NewCache<Cache::EvictionPolicy::FIFO>(capacity_bytes,
-                                                     cache_name)) {
+                                                     cache_name)),
+        scrubbing_thread_running_(0) {
     VLOG(1) << strings::Substitute(
         "constructed TTL cache '$0' with capacity of $1",
         cache_name, capacity_bytes);
+    if (scrubbing_period_.Initialized()) {
+      scrubbing_thread_running_.Reset(1);
+      CHECK_OK(Thread::Create(
+          "cache", strings::Substitute("$0-scrubbing", cache_name),
+          &TTLCache::ScrubExpiredEntries, this, &scrubbing_thread_));
+      VLOG(1) << strings::Substitute(
+          "started scrubbing thread for TTL cache '$0' with period of $1",
+          cache_name, scrubbing_period_.ToString());
+    }
   }
 
   // This class is not intended to be inherited from.
-  ~TTLCache() = default;
+  ~TTLCache() {
+    if (scrubbing_period_.Initialized()) {
+      scrubbing_thread_running_.CountDown();
+      scrubbing_thread_->Join();
+    }
+  }
 
   // Retrieve an entry from the cache. If a non-expired entry exists
   // for the specified key, this method returns corresponding handle.
@@ -232,15 +255,71 @@ class TTLCache {
     DISALLOW_COPY_AND_ASSIGN(EvictionCallback);
   };
 
+  // Periodically search for expired entries in the cache and remove them.
+  // This method is called from a separate thread 'scrubbing_thread_'.
+  void ScrubExpiredEntries() {
+    MonoTime time_now;
+
+    // TODO(aserbin): clarify why making this 'static const' behaves
+    //                incorrectly when compiled with LLVM 6.0.0 at CentOS 6.6
+    //                with devtoolset-3. However, it works as intended when
+    //                compiled with LLVM 6.0.0 at Mac OS X 10.11.6.
+    const Cache::InvalidationControl ctl = {
+      [&time_now](Slice /* key */, Slice value) {
+        DCHECK_EQ(sizeof(Entry), value.size());
+        const auto* entry = reinterpret_cast<const Entry*>(value.data());
+        // The entry expiration time is recorded in the entry's data. An entry
+        // is expired once current time passed the expiration time milestone.
+        return entry->exp_time > time_now;
+      },
+      [this](size_t valid_entry_count, size_t invalid_entry_count) {
+        // The TTL cache arranges its recency list in a FIFO manner: the
+        // oldest entries are in the very beginning of the list. All entries
+        // have the same TTL. All the entries in the recency list past
+        // a non-expired one must be not yet expired as well. So, when
+        // searching for expired entries, it doesn't make sense to continue
+        // iterating over the recency list once a non-expired entry
+        // has been encountered.
+        return valid_entry_count == 0 &&
+            (max_scrubbed_entries_per_pass_num_ == 0 ||
+             invalid_entry_count < max_scrubbed_entries_per_pass_num_);
+      }
+    };
+
+    while (!scrubbing_thread_running_.WaitFor(scrubbing_period_)) {
+      // Capture current time once, so the validity functor wouldn't need
+      // to call MonoTime::Now() for every entry being processed. That also
+      // makes the invalidation logic consistent with the FIFO-ordered
+      // recency list of the underlying cache: once a non-expired entry is
+      // encountered, it's guaranteed all the entries past it are non-expired
+      // as well. With advancing 'now' the latter contract would be broken.
+      time_now = MonoTime::Now();
+
+      const auto count = cache_->Invalidate(ctl);
+      VLOG(2) << strings::Substitute("invalidated $0 expired entries", count);
+    }
+  }
+
   // The validity interval for cached entries (see 'struct Entry' above).
   const MonoDelta entry_ttl_;
 
+  // The interval to run the 'scrubbing_thread_': that's the thread to scrub
+  // the cache of expired entries.
+  const MonoDelta scrubbing_period_;
+
+  // The maximum number of processed entries per one pass of the scrubbing.
+  // The scrubbing of the underlying cache assumes locking access to the cache's
+  // recency list, increasing contention with the concurrent usage of the cache.
+  // Limiting the number of entries to invalidate at once while holding the
+  // lock helps to reduce the contention.
+  const size_t max_scrubbed_entries_per_pass_num_;
+
   // A pointer to metrics specific to the TTL cache represented by this class.
   // The raw pointer is a duplicate of the pointer owned by the underlying FIFO
   // cache (see cache_ below).
   TTLCacheMetrics* metrics_;
 
-  // Invoked whenever a cached entry reaches zero references, i.e. it was
+  // Invoked whenever a cached entry reaches zero reference count, i.e. it was
   // removed from the cache and there aren't any other references
   // floating around.
   std::unique_ptr<Cache::EvictionCallback> eviction_cb_;
@@ -248,6 +327,15 @@ class TTLCache {
   // The underlying FIFO cache instance.
   std::unique_ptr<Cache> cache_;
 
+  // A synchronization primitive to communicate on running conditions between
+  // the 'scrubbing_thread_' thread and the main one.
+  CountDownLatch scrubbing_thread_running_;
+
+  // If 'scrubbing_period_' is set to a valid time interval, this thread
+  // periodically calls ScrubExpiredEntries() method until
+  // 'scrubbing_thread_running_' reaches 0.
+  scoped_refptr<Thread> scrubbing_thread_;
+
   DISALLOW_COPY_AND_ASSIGN(TTLCache);
 };
 


Mime
View raw message