From commits-return-7356-archive-asf-public=cust-asf.ponee.io@kudu.apache.org Fri May 3 01:38:58 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id B5E7018067E for ; Fri, 3 May 2019 03:38:57 +0200 (CEST) Received: (qmail 56406 invoked by uid 500); 3 May 2019 01:38:57 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 56383 invoked by uid 99); 3 May 2019 01:38:56 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 May 2019 01:38:56 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 678A4871DB; Fri, 3 May 2019 01:38:56 +0000 (UTC) Date: Fri, 03 May 2019 01:38:57 +0000 To: "commits@kudu.apache.org" Subject: [kudu] 01/02: [TTL cache] add periodic scrubbing thread MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: alexey@apache.org In-Reply-To: <155684753634.14828.15047862741160821136@gitbox.apache.org> References: <155684753634.14828.15047862741160821136@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: kudu X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Rev: 806d7ba4f725a599ac80355de4f6ee21ad7905af X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20190503013856.678A4871DB@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. 
alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git commit 806d7ba4f725a599ac80355de4f6ee21ad7905af Author: Alexey Serbin AuthorDate: Tue Apr 30 16:00:36 2019 -0700 [TTL cache] add periodic scrubbing thread This patch adds an optional periodic scrubbing thread for the TTLCache. If enabled, the thread invalidates expired entries, effectively evicting them from the cache. With this patch, the TTLCache evicts expired entries not only during the insertion of new ones if the cache is at its capacity, but also upon detection of an entry's expiry by the scrubbing thread. Added corresponding test coverage as well. Change-Id: Ib3bbbcf6211930a5501a03d6a47f3d911118946c Reviewed-on: http://gerrit.cloudera.org:8080/13201 Reviewed-by: Andrew Wong Tested-by: Alexey Serbin --- src/kudu/util/cache.h | 4 +- src/kudu/util/ttl_cache-test.cc | 183 +++++++++++++++++++++++++++++++++++++++- src/kudu/util/ttl_cache.h | 102 ++++++++++++++++++++-- 3 files changed, 276 insertions(+), 13 deletions(-) diff --git a/src/kudu/util/cache.h b/src/kudu/util/cache.h index 32213cc..628b452 100644 --- a/src/kudu/util/cache.h +++ b/src/kudu/util/cache.h @@ -239,8 +239,8 @@ class Cache { struct InvalidationControl; // Invalidate cache's entries, effectively evicting non-valid ones from the - // cache. The invalidation process iterates over cache's recency list(s), - // from the oldest (less relevant) entries to the newest (more relevant) ones. + // cache. The invalidation process iterates over the cache's recency list(s), + // from best candidate for eviction to the worst. 
// // The provided control structure 'ctl' is responsible for the following: // * determine whether an entry is valid or not diff --git a/src/kudu/util/ttl_cache-test.cc b/src/kudu/util/ttl_cache-test.cc index e81991c..821a066 100644 --- a/src/kudu/util/ttl_cache-test.cc +++ b/src/kudu/util/ttl_cache-test.cc @@ -17,23 +17,29 @@ #include "kudu/util/ttl_cache.h" -#include - +#include +#include #include +#include +#include #include +#include #include -#include +#include +#include #include #include #include #include "kudu/gutil/integral_types.h" +#include "kudu/gutil/map-util.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/cache.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/slice.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" @@ -52,8 +58,12 @@ METRIC_DECLARE_counter(test_ttl_cache_lookups); METRIC_DECLARE_counter(test_ttl_cache_misses); METRIC_DECLARE_gauge_uint64(test_ttl_cache_memory_usage); +using std::atomic; +using std::set; using std::string; +using std::thread; using std::unique_ptr; +using std::vector; using strings::Substitute; namespace kudu { @@ -373,7 +383,7 @@ TEST_F(TTLCacheTest, Basic) { ASSERT_EQ(cache_capacity + 1, GetCacheUsage()); } -// Test invalidation of expired entries in the underlying cache. +// Test the invalidation of expired entries in the underlying cache. TEST_F(TTLCacheTest, InvalidationOfExpiredEntries) { constexpr size_t cache_capacity = 512; const auto entry_ttl = MonoDelta::FromMilliseconds(250); @@ -422,4 +432,169 @@ TEST_F(TTLCacheTest, InvalidationOfExpiredEntries) { ASSERT_EQ(cache_capacity / 2, GetCacheUsage()); } +// Verify the auto-invalidation of expired entries in TTLCache works as expected: +// expired entries are removed with the expected timing and valid entries stay. 
+TEST_F(TTLCacheTest, AutoInvalidationOfEntries) { + constexpr size_t kCacheCapacity = 128; + constexpr size_t kHalfCacheCapacity = kCacheCapacity / 2; + const auto kEntryTtl = MonoDelta::FromMilliseconds(256); + const auto kScrubInterval = MonoDelta::FromMilliseconds( + kEntryTtl.ToMilliseconds() / 8); + const auto kEntryTtlThreeQuarters = MonoDelta::FromMilliseconds( + kEntryTtl.ToMilliseconds() * 3 / 4); + + TTLTestCache cache(kCacheCapacity, kEntryTtl, kScrubInterval); + { + unique_ptr metrics( + new TTLCacheTestMetrics(metric_entity_)); + cache.SetMetrics(std::move(metrics)); + } + + // Make sure the scrubber doesn't conflate non-expired entries with expired + // ones while scrubbing the cache. + for (auto i = 0; i < kHalfCacheCapacity; ++i) { + cache.Put(Substitute("1$0", i), unique_ptr(new TestValue(i)), 1); + } + SleepFor(kEntryTtlThreeQuarters); + for (auto i = 0; i < kHalfCacheCapacity; ++i) { + cache.Put(Substitute("2$0", i), unique_ptr(new TestValue(i)), 1); + } + ASSERT_EQ(kCacheCapacity, GetCacheUsage()); + SleepFor(kEntryTtlThreeQuarters); + + // Apart from the OS scheduler's anomalies, the scrubbing thread should run + // at least once at this point. Given the timing of adding the entries into + // the cache, half of the entries in the cache should have been invalidated, + // while the other half should stay. + ASSERT_EQ(kHalfCacheCapacity, GetCacheEvictionsExpired()); + ASSERT_EQ(kHalfCacheCapacity, GetCacheUsage()); + + // Eventually, nothing should be in the cache. + ASSERT_EVENTUALLY([&]{ + ASSERT_EQ(0, GetCacheUsage()); + ASSERT_EQ(kCacheCapacity, GetCacheEvictionsExpired()); + }); +} + +// Verify the auto-invalidation of expired entries in TTLCache works as +// expected in the presence of outstanding handles to the entries which +// are subject to invalidation by the scrubbing thread. 
+TEST_F(TTLCacheTest, AutoInvalidationOfEntriesWithOutstandingReferences) { + constexpr size_t kCacheCapacity = 512; + constexpr size_t kHalfCacheCapacity = kCacheCapacity / 2; + const auto kEntryTtl = MonoDelta::FromMilliseconds(50); + const auto kScrubInterval = MonoDelta::FromMilliseconds( + kEntryTtl.ToMilliseconds() / 2); + + // A TTL cache with auto-scrubbing thread that gets rid of expired entries + // with the frequency corresponding to a half of the TTL interval. + TTLTestCache cache(kCacheCapacity, kEntryTtl, kScrubInterval); + { + unique_ptr metrics( + new TTLCacheTestMetrics(metric_entity_)); + cache.SetMetrics(std::move(metrics)); + } + + { + const set selected_keys = { + "0", "2", "9", "99", "127", "128", "252", "255", }; + vector selected_handles; + selected_handles.reserve(selected_keys.size()); + for (auto i = 0; i < kHalfCacheCapacity; ++i) { + const auto key = Substitute("$0", i); + auto h = cache.Put(key, unique_ptr(new TestValue), 1); + if (ContainsKey(selected_keys, key)) { + selected_handles.emplace_back(std::move(h)); + } + } + ASSERT_EQ(selected_keys.size(), selected_handles.size()); + + // All the entries except for the ones selected should be gone: + // the background thread should get rid of all the expired entries. + // The 'selected_handles' still keep references to the selected entries, + // so those should not be deallocated yet. + ASSERT_EVENTUALLY([&]{ + ASSERT_EQ(kHalfCacheCapacity - selected_keys.size(), + GetCacheEvictionsExpired()); + ASSERT_EQ(selected_keys.size(), GetCacheUsage()); + }); + + // However, even if the selected entries are not yet deallocated, + // they must have been removed from the cache's lookup table + // once the scrubbing thread invalidated them. + for (const auto& key : selected_keys) { + ASSERT_FALSE(cache.Get(key)); + } + } + // Now, once the handles for the selected entries have gone out of scope, + // the cache should be empty. 
+ ASSERT_EQ(kHalfCacheCapacity, GetCacheEvictionsExpired()); + ASSERT_EQ(0, GetCacheUsage()); +} + +// Verify how the auto-invalidation of expired entries works in the presence +// of concurrent read access to the entries. Also, verify that the limit on the +// maximum number of processed entries per one run of the scrubbing +// thread applies as expected. +TEST_F(TTLCacheTest, AutoInvalidationOfEntriesLimitPerPass) { + constexpr size_t kCacheCapacity = 1024; + constexpr size_t kScrubMaxEntriesPerPass = 64; + const auto kEntryTtl = MonoDelta::FromMilliseconds(100); + const auto kScrubInterval = kEntryTtl; + const auto kNumRunnerThreads = 64; + + // A TTL cache with auto-scrubbing thread that gets rid of expired entries. + // The number of scrubbed entries per pass is limited. + TTLTestCache cache(kCacheCapacity, kEntryTtl, + kScrubInterval, kScrubMaxEntriesPerPass); + { + unique_ptr metrics( + new TTLCacheTestMetrics(metric_entity_)); + cache.SetMetrics(std::move(metrics)); + } + + atomic stop(false); + vector threads; + SCOPED_CLEANUP({ + stop = true; + for (auto& thread : threads) { + thread.join(); + } + }); + + for (auto i = 0; i < kNumRunnerThreads; ++i) { + threads.emplace_back([&cache, &stop] () { + while (!stop) { + const auto key = std::to_string(rand() % kCacheCapacity); + auto h = cache.Get(key); + // Keep the handle around for some time. 
+ SleepFor(MonoDelta::FromNanoseconds(rand() % 5)); + } + }); + } + + const auto start_time = MonoTime::Now(); + for (auto i = 0; i < kCacheCapacity; ++i) { + cache.Put(std::to_string(i), unique_ptr(new TestValue(i)), 1); + } + + const auto scrub_interval_ms = kScrubInterval.ToMilliseconds(); + for (auto i = 0; i < kCacheCapacity / kScrubMaxEntriesPerPass; ++i) { + const auto evictions_expired_num = GetCacheEvictionsExpired(); + const auto duration_ms = (MonoTime::Now() - start_time).ToMilliseconds(); + const auto runs_num = 1 + + (duration_ms + scrub_interval_ms - 1) / scrub_interval_ms; + // No more than the specified number of entries should be evicted per one + // pass of the scrubbing thread. + ASSERT_LE(evictions_expired_num, runs_num * kScrubMaxEntriesPerPass); + SleepFor(kScrubInterval); + } + stop = true; + + ASSERT_EVENTUALLY([&]{ + ASSERT_EQ(kCacheCapacity, GetCacheEvictionsExpired()); + ASSERT_EQ(0, GetCacheUsage()); + }); +} + } // namespace kudu diff --git a/src/kudu/util/ttl_cache.h b/src/kudu/util/ttl_cache.h index 65911a8..29516a4 100644 --- a/src/kudu/util/ttl_cache.h +++ b/src/kudu/util/ttl_cache.h @@ -33,6 +33,8 @@ #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/slice.h" +#include "kudu/util/status.h" +#include "kudu/util/thread.h" #include "kudu/util/ttl_cache_metrics.h" namespace kudu { @@ -111,24 +113,45 @@ class TTLCache { Cache::UniqueHandle handle_; }; - // Create a new TTL cache with the specified capacity and name. The cache's - // metric gauges are attached to the metric entity specified via the 'entity' - // parameter if the parameter is non-null. + // Create a new TTL cache with the specified capacity and name. All the + // entries put into the cache have the same TTL specified by the 'entry_ttl' + // parameter. If the 'scrubbing_period' parameter is provided with valid + // time interval, the cache starts a periodic scrubbing thread that evicts + // expired entries from the cache. 
TTLCache(size_t capacity_bytes, MonoDelta entry_ttl, - const std::string& cache_name = "") + MonoDelta scrubbing_period = {}, + size_t max_scrubbed_entries_per_pass_num = 0, + const std::string& cache_name = "ttl-cache") : entry_ttl_(entry_ttl), + scrubbing_period_(scrubbing_period), + max_scrubbed_entries_per_pass_num_(max_scrubbed_entries_per_pass_num), metrics_(nullptr), eviction_cb_(new EvictionCallback(this)), cache_(NewCache(capacity_bytes, - cache_name)) { + cache_name)), + scrubbing_thread_running_(0) { VLOG(1) << strings::Substitute( "constructed TTL cache '$0' with capacity of $1", cache_name, capacity_bytes); + if (scrubbing_period_.Initialized()) { + scrubbing_thread_running_.Reset(1); + CHECK_OK(Thread::Create( + "cache", strings::Substitute("$0-scrubbing", cache_name), + &TTLCache::ScrubExpiredEntries, this, &scrubbing_thread_)); + VLOG(1) << strings::Substitute( + "started scrubbing thread for TTL cache '$0' with period of $1", + cache_name, scrubbing_period_.ToString()); + } } // This class is not intended to be inherited from. - ~TTLCache() = default; + ~TTLCache() { + if (scrubbing_period_.Initialized()) { + scrubbing_thread_running_.CountDown(); + scrubbing_thread_->Join(); + } + } // Retrieve an entry from the cache. If a non-expired entry exists // for the specified key, this method returns corresponding handle. @@ -232,15 +255,71 @@ class TTLCache { DISALLOW_COPY_AND_ASSIGN(EvictionCallback); }; + // Periodically search for expired entries in the cache and remove them. + // This method is called from a separate thread 'scrubbing_thread_'. + void ScrubExpiredEntries() { + MonoTime time_now; + + // TODO(aserbin): clarify why making this 'static const' behaves + // incorrectly when compiled with LLVM 6.0.0 at CentOS 6.6 + // with devtoolset-3. However, it works as intended when + // compiled with LLVM 6.0.0 at Mac OS X 10.11.6. 
+ const Cache::InvalidationControl ctl = { + [&time_now](Slice /* key */, Slice value) { + DCHECK_EQ(sizeof(Entry), value.size()); + const auto* entry = reinterpret_cast(value.data()); + // The entry expiration time is recorded in the entry's data. An entry + // is expired once current time passed the expiration time milestone. + return entry->exp_time > time_now; + }, + [this](size_t valid_entry_count, size_t invalid_entry_count) { + // The TTL cache arranges its recency list in a FIFO manner: the + // oldest entries are in the very beginning of the list. All entries + // have the same TTL. All the entries in the recency list past + // a non-expired one must be not yet expired as well. So, when + // searching for expired entries, it doesn't make sense to continue + // iterating over the recency list once a non-expired entry + // has been encountered. + return valid_entry_count == 0 && + (max_scrubbed_entries_per_pass_num_ == 0 || + invalid_entry_count < max_scrubbed_entries_per_pass_num_); + } + }; + + while (!scrubbing_thread_running_.WaitFor(scrubbing_period_)) { + // Capture current time once, so the validity functor wouldn't need + // to call MonoTime::Now() for every entry being processed. That also + // makes the invalidation logic consistent with the FIFO-ordered + // recency list of the underlying cache: once a non-expired entry is + // encountered, it's guaranteed all the entries past it are non-expired + // as well. With advancing 'now' the latter contract would be broken. + time_now = MonoTime::Now(); + + const auto count = cache_->Invalidate(ctl); + VLOG(2) << strings::Substitute("invalidated $0 expired entries", count); + } + } + // The validity interval for cached entries (see 'struct Entry' above). const MonoDelta entry_ttl_; + // The interval to run the 'scrubbing_thread_': that's the thread to scrub + // the cache of expired entries. + const MonoDelta scrubbing_period_; + + // The maximum number of processed entries per one pass of the scrubbing. 
+ // The scrubbing of the underlying cache assumes locking access to the cache's + // recency list, increasing contention with the concurrent usage of the cache. + // Limiting the number of entries to invalidate at once while holding the + // lock helps to reduce the contention. + const size_t max_scrubbed_entries_per_pass_num_; + // A pointer to metrics specific to the TTL cache represented by this class. // The raw pointer is a duplicate of the pointer owned by the underlying FIFO // cache (see cache_ below). TTLCacheMetrics* metrics_; - // Invoked whenever a cached entry reaches zero references, i.e. it was + // Invoked whenever a cached entry reaches zero reference count, i.e. it was // removed from the cache and there aren't any other references // floating around. std::unique_ptr eviction_cb_; @@ -248,6 +327,15 @@ class TTLCache { // The underlying FIFO cache instance. std::unique_ptr cache_; + // A synchronization primitive to communicate on running conditions between + // the 'scrubbing_thread_' thread and the main one. + CountDownLatch scrubbing_thread_running_; + + // If 'scrubbing_period_' is set to a valid time interval, this thread + // periodically calls ScrubExpiredEntries() method until + // 'scrubbing_thread_running_' reaches 0. + scoped_refptr scrubbing_thread_; + DISALLOW_COPY_AND_ASSIGN(TTLCache); };