From: todd@apache.org
To: commits@kudu.apache.org
Date: Thu, 08 Jun 2017 22:56:19 -0000
Message-Id: <33dd7bf2a5b74b8d8f389ccf974d36d1@git.apache.org>
In-Reply-To: <687e0fc8d1834590a77f1d0396484956@git.apache.org>
References: <687e0fc8d1834590a77f1d0396484956@git.apache.org>
Subject: [4/5] kudu git commit: log_block_manager: fix corruption after re-opening compacted metadata

log_block_manager: fix corruption after re-opening compacted metadata

This fixes an issue discovered on a cluster due to the following sequence
of events:

- A block manager compacts a metadata file while starting up.
- When it reopens the metadata file after replacing it with the compacted
  one, it gets a file_cache hit. Thus, the WritablePBContainer continues
  to write to the _deleted_ file instead of the compacted one. Metadata
  entries written at this point are lost, which could cause block loss in
  the case of lost CREATE records, or dangling blocks in the case of lost
  DELETEs.
- If the server continues to run for a while, the FD will be evicted from
  the cache and eventually re-opened. At that point, a further DELETE
  record could end up writing to an offset past the end of the file, since
  the write offset was incremented by the "lost" records above.
- On the next restart, the metadata file would have a "gap" of zero bytes,
  which would surface as a checksum failure and a failure to start up.

The fix is relatively simple: when we replace the metadata file, we need to
invalidate and evict the cache entry so that when we "reopen", we actually
start appending to the _new_ file and not the old, deleted one.

The bulk of the changes here are to tests:

- The stress test now enforces a minimum number of live blocks before it
  starts deleting them. It also compacts more aggressively and uses a
  smaller file cache. With these changes, I was sometimes able to
  reproduce the issue.
- A more targeted test issues a canned sequence of block creations and
  deletions that can reliably reproduce the above issue.
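The failure mode is easier to see in miniature. The sketch below uses
hypothetical PathKeyedCache and OpenFile types (stand-ins, not Kudu's real
file_cache classes) to show how a path-keyed descriptor cache keeps handing
out the old descriptor after a rename-over-replace unless the entry is
invalidated first:

    // Simplified model of the bug. PathKeyedCache and OpenFile are
    // hypothetical stand-ins for Kudu's file cache and descriptors.
    #include <cstdio>
    #include <map>
    #include <memory>
    #include <string>

    struct OpenFile {
      std::string backing_path;  // stands in for an open file descriptor
    };

    class PathKeyedCache {
     public:
      // Returns the cached descriptor if present; otherwise "opens" a new one.
      std::shared_ptr<OpenFile> Open(const std::string& path) {
        auto it = cache_.find(path);
        if (it != cache_.end()) return it->second;  // cache hit: may be stale
        auto f = std::make_shared<OpenFile>();
        f->backing_path = path;
        cache_[path] = f;
        return f;
      }

      // The fix: drop the entry so the next Open() re-opens from disk.
      void Invalidate(const std::string& path) { cache_.erase(path); }

     private:
      std::map<std::string, std::shared_ptr<OpenFile>> cache_;
    };

    int main() {
      PathKeyedCache cache;
      auto before = cache.Open("container.metadata");
      // Compaction writes container.metadata.tmp, then renames it over the
      // original. Without invalidation, the next Open() would hit the cache
      // and return 'before', whose descriptor still points at the unlinked
      // pre-compaction file, so records appended through it would be lost.
      cache.Invalidate("container.metadata");
      auto after = cache.Open("container.metadata");
      std::printf("same descriptor after invalidate? %s\n",
                  before == after ? "yes" : "no");  // prints "no"
      return 0;
    }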
Change-Id: I14b2c64685e24d27591258911db4aeb9e8020a4d
Reviewed-on: http://gerrit.cloudera.org:8080/7113
Reviewed-by: Adar Dembo
Tested-by: Kudu Jenkins

Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e77538b5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e77538b5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e77538b5
Branch: refs/heads/master
Commit: e77538b541b112555ee62cff1506bf7fd1f0e461
Parents: 8be2a59
Author: Todd Lipcon
Authored: Wed Jun 7 20:01:17 2017 -0700
Committer: Todd Lipcon
Committed: Thu Jun 8 22:47:26 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-stress-test.cc |  45 ++++++----
 src/kudu/fs/log_block_manager-test.cc    |  56 +++++++++++++
 src/kudu/fs/log_block_manager.cc         |  12 ++-
 src/kudu/util/file_cache-test.cc         |  34 ++++++++
 src/kudu/util/file_cache.cc              | 114 +++++++++++++-----------
 src/kudu/util/file_cache.h               |  23 ++++++
 6 files changed, 218 insertions(+), 66 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kudu/blob/e77538b5/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index 458f3fe..30cc6a9 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -43,10 +43,13 @@ DECLARE_int64(block_manager_max_open_files);
 DECLARE_uint64(log_container_max_size);
 DECLARE_uint64(log_container_preallocate_bytes);
 
-DEFINE_int32(test_duration_secs, 2, "Number of seconds to run the test");
+DEFINE_double(test_duration_secs, 2, "Number of seconds to run the test");
 DEFINE_int32(num_writer_threads, 4, "Number of writer threads to run");
 DEFINE_int32(num_reader_threads, 8, "Number of reader threads to run");
 DEFINE_int32(num_deleter_threads, 1, "Number of deleter threads to run");
+DEFINE_int32(minimum_live_blocks_for_delete, 1000,
+             "If there are fewer than this number of live blocks, the deleter "
+             "threads will not delete any");
 DEFINE_int32(block_group_size, 8, "Number of blocks to write per block "
              "group. Must be power of 2");
 DEFINE_int32(block_group_bytes, 32 * 1024,
@@ -104,13 +107,13 @@ class BlockManagerStressTest : public KuduTest {
     FLAGS_log_container_preallocate_bytes = 1 * 1024 * 1024;
 
     // Ensure the file cache is under stress too.
-    FLAGS_block_manager_max_open_files = 512;
+    FLAGS_block_manager_max_open_files = 32;
 
     // Maximize the amount of cleanup triggered by the extra space heuristic.
     FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
 
     // Compact block manager metadata aggressively.
-    FLAGS_log_container_live_metadata_before_compact_ratio = 0.80;
+    FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;
 
     if (FLAGS_block_manager_paths.empty()) {
       data_dirs_.push_back(test_dir_);
@@ -147,7 +150,7 @@ class BlockManagerStressTest : public KuduTest {
     return new T(env_, opts);
   }
 
-  void RunTest(int secs) {
+  void RunTest(double secs) {
     LOG(INFO) << "Starting all threads";
     this->StartThreads();
     SleepFor(MonoDelta::FromSeconds(secs));
@@ -389,8 +392,15 @@ void BlockManagerStressTest<T>::DeleterThread() {
     // Grab a block at random.
     BlockId to_delete;
     {
-      std::lock_guard<simple_spinlock> l(lock_);
-      if (written_blocks_.empty()) {
+      std::unique_lock<simple_spinlock> l(lock_);
+      // If we only have a small number of live blocks, don't delete any.
+      // This ensures that, when we restart, we always have a reasonable
+      // amount of data -- otherwise the deletion threads are likely to
+      // "keep up" with the writer threads and every restart will consist
+      // of a very small number of non-dead containers.
+      if (written_blocks_.size() < FLAGS_minimum_live_blocks_for_delete) {
+        l.unlock();
+        SleepFor(MonoDelta::FromMilliseconds(10));
         continue;
       }
 
@@ -460,6 +470,8 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
   OverrideFlagForSlowTests("block_group_size", "16");
   OverrideFlagForSlowTests("num_inconsistencies", "128");
 
+  const int kNumStarts = 3;
+
   if ((FLAGS_block_group_size & (FLAGS_block_group_size - 1)) != 0) {
     LOG(FATAL) << "block_group_size " << FLAGS_block_group_size
                << " is not a power of 2";
@@ -469,16 +481,19 @@
   LOG(INFO) << "Running on fresh block manager";
   checker.Start();
-  this->RunTest(FLAGS_test_duration_secs / 2);
+  this->RunTest(FLAGS_test_duration_secs / kNumStarts);
   NO_FATALS(this->InjectNonFatalInconsistencies());
 
-  LOG(INFO) << "Running on populated block manager";
-  this->bm_.reset(this->CreateBlockManager());
-  FsReport report;
-  ASSERT_OK(this->bm_->Open(&report));
-  ASSERT_OK(this->bm_->dd_manager()->LoadDataDirGroupFromPB(this->test_tablet_name_,
-                                                            this->test_group_pb_));
-  ASSERT_OK(report.LogAndCheckForFatalErrors());
-  this->RunTest(FLAGS_test_duration_secs / 2);
+
+  for (int i = 1; i < kNumStarts; i++) {
+    LOG(INFO) << "Running on populated block manager (restart #" << i << ")";
+    this->bm_.reset(this->CreateBlockManager());
+    FsReport report;
+    ASSERT_OK(this->bm_->Open(&report));
+    ASSERT_OK(this->bm_->dd_manager()->LoadDataDirGroupFromPB(this->test_tablet_name_,
+                                                              this->test_group_pb_));
+    ASSERT_OK(report.LogAndCheckForFatalErrors());
+    this->RunTest(FLAGS_test_duration_secs / kNumStarts);
+  }
 
   checker.Stop();
 
   LOG(INFO) << "Printing test totals";

http://git-wip-us.apache.org/repos/asf/kudu/blob/e77538b5/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index f8f155a..f3f1cfe 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -44,6 +44,8 @@ using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
+DECLARE_int64(block_manager_max_open_files);
+DECLARE_bool(cache_force_single_shard);
 DECLARE_double(log_container_excess_space_before_cleanup_fraction);
 DECLARE_double(log_container_live_metadata_before_compact_ratio);
 DECLARE_int64(log_container_max_blocks);
@@ -1198,5 +1200,59 @@ TEST_F(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
   ASSERT_EQ(last_live_aligned_bytes, report.stats.live_block_bytes_aligned);
 }
 
+// Regression test for a bug in which, after a metadata file was compacted,
+// we would not properly handle appending to the new (post-compaction)
+// metadata.
+//
+// The bug was related to a stale file descriptor left in the file_cache, so
+// this test explicitly targets that scenario.
+TEST_F(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
+  // Compact aggressively.
+  FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;
+  // Use a small file cache (smaller than the number of containers).
+  FLAGS_block_manager_max_open_files = 50;
+  // Use a single shard so that we have an accurate max cache capacity
+  // regardless of the number of cores on the machine.
+  FLAGS_cache_force_single_shard = true;
+  // Use very small containers, so that we generate a lot of them (and thus
+  // consume a lot of file descriptors).
+  FLAGS_log_container_max_blocks = 4;
+  // Reopen so the flags take effect.
+  ASSERT_OK(ReopenBlockManager(nullptr));
+
+  // Create many containers with a bunch of blocks, half of which are deleted.
+  vector<BlockId> block_ids;
+  for (int i = 0; i < 1000; i++) {
+    unique_ptr<WritableBlock> block;
+    ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
+    ASSERT_OK(block->Close());
+    if (i % 2 == 1) {
+      ASSERT_OK(bm_->DeleteBlock(block->id()));
+    } else {
+      block_ids.emplace_back(block->id());
+    }
+  }
+
+  // Reopen the block manager. This will cause it to compact all of the metadata
+  // files, since we've deleted half the blocks in every container and the
+  // threshold is set high above.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+
+  // Delete the remaining blocks in a random order. This will append to metadata
+  // files which have just been compacted. Since we have more metadata files than
+  // we have file_cache capacity, this will also generate a mix of cache hits,
+  // misses, and re-insertions.
+  std::random_shuffle(block_ids.begin(), block_ids.end());
+  for (const BlockId& b : block_ids) {
+    ASSERT_OK(bm_->DeleteBlock(b));
+  }
+
+  // Reopen to make sure that the metadata can be properly loaded and
+  // that the resulting block manager is empty.
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_EQ(0, report.stats.live_block_count);
+  ASSERT_EQ(0, report.stats.live_block_bytes_aligned);
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e77538b5/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 4579b7f..c73bd49 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -2255,9 +2255,8 @@ Status LogBlockManager::Repair(
 
     // Rewrite this metadata file. Failures are non-fatal.
     int64_t file_bytes_delta;
-    Status s = RewriteMetadataFile(StrCat(e.first, kContainerMetadataFileSuffix),
-                                   e.second,
-                                   &file_bytes_delta);
+    const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix);
+    Status s = RewriteMetadataFile(meta_path, e.second, &file_bytes_delta);
     if (!s.ok()) {
       WARN_NOT_OK(s, "could not rewrite metadata file");
       continue;
@@ -2269,6 +2268,9 @@ Status LogBlockManager::Repair(
     metadata_files_compacted++;
     metadata_bytes_delta += file_bytes_delta;
 
+    VLOG(1) << "Compacted metadata file " << meta_path
+            << " (saved " << file_bytes_delta << " bytes)";
+
   }
 
   // The data directory can be synchronized once for all of the new metadata files.
@@ -2322,6 +2324,10 @@ Status LogBlockManager::RewriteMetadataFile(const string& metadata_file_name,
                         "could not get file size of temporary metadata file");
   RETURN_NOT_OK_PREPEND(env_->RenameFile(tmp_file_name, metadata_file_name),
                         "could not rename temporary metadata file");
+  // Evict the old path from the file cache, so that when we re-open the new
+  // metadata file for write, we don't accidentally get a cache hit on the
+  // old file descriptor pointing to the now-deleted old version.
+  file_cache_.Invalidate(metadata_file_name);
   tmp_deleter.Cancel();
   *file_bytes_delta = (static_cast<int64_t>(old_metadata_size) - new_metadata_size);

http://git-wip-us.apache.org/repos/asf/kudu/blob/e77538b5/src/kudu/util/file_cache-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache-test.cc b/src/kudu/util/file_cache-test.cc
index 5ac568e..1752bd2 100644
--- a/src/kudu/util/file_cache-test.cc
+++ b/src/kudu/util/file_cache-test.cc
@@ -223,6 +223,40 @@ TYPED_TEST(FileCacheTest, TestDeletion) {
   ASSERT_EQ(this->initial_open_fds_, CountOpenFds(this->env_));
 }
 
+TYPED_TEST(FileCacheTest, TestInvalidation) {
+  const string kFile1 = this->GetTestPath("foo");
+  const string kData1 = "test data 1";
+  ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+
+  // Open the file.
+  shared_ptr<TypeParam> f;
+  ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f));
+
+  // Write a new file and rename it in place on top of file1.
+  const string kFile2 = this->GetTestPath("foo2");
+  const string kData2 = "test data 2 (longer than original)";
+  ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+  ASSERT_OK(this->env_->RenameFile(kFile2, kFile1));
+
+  // We should still be able to access the file, since it has a cached fd.
+  uint64_t size;
+  ASSERT_OK(f->Size(&size));
+  ASSERT_EQ(kData1.size(), size);
+
+  // If we invalidate it from the cache and try again, it should crash because
+  // the existing descriptor was invalidated.
+  this->cache_->Invalidate(kFile1);
+  ASSERT_DEATH({ f->Size(&size); }, "invalidated");
+
+  // But if we re-open the path again, the new descriptor should read the
+  // new data.
+  shared_ptr<TypeParam> f2;
+  ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+  ASSERT_OK(f2->Size(&size));
+  ASSERT_EQ(kData2.size(), size);
+}
+
+
 TYPED_TEST(FileCacheTest, TestHeavyReads) {
   const int kNumFiles = 20;
   const int kNumIterations = 100;

http://git-wip-us.apache.org/repos/asf/kudu/blob/e77538b5/src/kudu/util/file_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc
index b8d17cf..07babad 100644
--- a/src/kudu/util/file_cache.cc
+++ b/src/kudu/util/file_cache.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/util/file_cache.h"
 
+#include <atomic>
 #include <memory>
 #include <string>
 #include <utility>
@@ -96,7 +97,7 @@ class BaseDescriptor {
     // the next call to RunDescriptorExpiry(). Removing it here would risk a
     // deadlock on recursive acquisition of 'lock_'.
 
-    if (deleted_) {
+    if (deleted()) {
       cache()->Erase(filename());
 
       VLOG(1) << "Deleting file: " << filename();
@@ -136,8 +137,21 @@ class BaseDescriptor {
 
   // Mark this descriptor as to-be-deleted later.
   void MarkDeleted() {
-    DCHECK(!deleted_);
-    deleted_ = true;
+    DCHECK(!deleted());
+    while (true) {
+      auto v = flags_.load();
+      if (flags_.compare_exchange_weak(v, v | FILE_DELETED)) return;
+    }
+  }
+
+  // Mark this descriptor as invalidated. No further access is allowed
+  // to this file.
+  void MarkInvalidated() {
+    DCHECK(!invalidated());
+    while (true) {
+      auto v = flags_.load();
+      if (flags_.compare_exchange_weak(v, v | INVALIDATED)) return;
+    }
+  }
 
   Cache* cache() const { return file_cache_->cache_.get(); }
 
@@ -146,13 +160,17 @@ class BaseDescriptor {
 
   const string& filename() const { return file_name_; }
 
-  bool deleted() const { return deleted_; }
+  bool deleted() const { return flags_.load() & FILE_DELETED; }
+  bool invalidated() const { return flags_.load() & INVALIDATED; }
 
 private:
   FileCache<FileType>* file_cache_;
   const string file_name_;
-
-  bool deleted_ = false;
+  enum Flags {
+    FILE_DELETED = 1 << 0,
+    INVALIDATED = 1 << 1
+  };
+  std::atomic<uint8_t> flags_ {0};
 
   DISALLOW_COPY_AND_ASSIGN(BaseDescriptor);
 };
@@ -296,6 +314,7 @@ class Descriptor<RWFile> : public RWFile {
   Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const {
     ScopedOpenedDescriptor<RWFile> found(base_.LookupFromCache());
+    CHECK(!base_.invalidated());
     if (found.opened()) {
       // The file is already open in the cache, return it.
       if (out) {
@@ -398,6 +417,7 @@ class Descriptor<RandomAccessFile> : public RandomAccessFile {
   Status ReopenFileIfNecessary(
       ScopedOpenedDescriptor<RandomAccessFile>* out) const {
     ScopedOpenedDescriptor<RandomAccessFile> found(base_.LookupFromCache());
+    CHECK(!base_.invalidated());
     if (found.opened()) {
       // The file is already open in the cache, return it.
       if (out) {
@@ -479,7 +499,7 @@ Status FileCache<FileType>::OpenExistingFile(const string& file_name,
   // Check that the underlying file can be opened (no-op for found
   // descriptors). Done outside the lock.
   RETURN_NOT_OK(desc->Init());
-  *file = desc;
+  *file = std::move(desc);
   return Status::OK();
 }
@@ -506,6 +526,42 @@ Status FileCache<FileType>::DeleteFile(const string& file_name) {
 }
 
 template <class FileType>
+void FileCache<FileType>::Invalidate(const string& file_name) {
+  // Ensure that there is an invalidated descriptor in the map for this
+  // filename.
+  //
+  // This ensures that any concurrent OpenExistingFile() during this method
+  // will see the invalidation and issue a CHECK failure.
+  shared_ptr<Descriptor<FileType>> desc;
+  {
+    // Find an existing descriptor, or create one if none exists.
+    std::lock_guard<simple_spinlock> l(lock_);
+    auto it = descriptors_.find(file_name);
+    if (it != descriptors_.end()) {
+      desc = it->second.lock();
+    }
+    if (!desc) {
+      desc = std::make_shared<Descriptor<FileType>>(this, file_name);
+      descriptors_.emplace(file_name, desc);
+    }
+
+    desc->base_.MarkInvalidated();
+  }
+  // Remove it from the cache so that if the same path is opened again, we
+  // will re-open a new FD rather than retrieving one that might have been
+  // cached prior to invalidation.
+  cache_->Erase(file_name);
+
+  // Remove the invalidated descriptor from the map. We are guaranteed it
+  // is still there because we've held a strong reference to it for
+  // the duration of this method, and no other methods erase strong
+  // references from the map.
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    CHECK_EQ(1, descriptors_.erase(file_name));
+  }
+}
+
+template <class FileType>
 int FileCache<FileType>::NumDescriptorsForTests() const {
   std::lock_guard<simple_spinlock> l(lock_);
   return descriptors_.size();
@@ -552,6 +608,7 @@ Status FileCache<FileType>::FindDescriptorUnlocked(
   // Found the descriptor. Has it expired?
   shared_ptr<Descriptor<FileType>> desc = it->second.lock();
   if (desc) {
+    CHECK(!desc->base_.invalidated());
     if (desc->base_.deleted()) {
       return Status::NotFound("File already marked for deletion", file_name);
     }
@@ -584,46 +641,7 @@ void FileCache<FileType>::RunDescriptorExpiry() {
 }
 
 // Explicit specialization for callers outside this compilation unit.
-template
-FileCache<RWFile>::FileCache(
-    const string& cache_name,
-    Env* env,
-    int max_open_files,
-    const scoped_refptr<MetricEntity>& entity);
-template
-FileCache<RWFile>::~FileCache();
-template
-Status FileCache<RWFile>::Init();
-template
-Status FileCache<RWFile>::OpenExistingFile(
-    const string& file_name,
-    shared_ptr<RWFile>* file);
-template
-Status FileCache<RWFile>::DeleteFile(const string& file_name);
-template
-int FileCache<RWFile>::NumDescriptorsForTests() const;
-template
-string FileCache<RWFile>::ToDebugString() const;
-
-template
-FileCache<RandomAccessFile>::FileCache(
-    const string& cache_name,
-    Env* env,
-    int max_open_files,
-    const scoped_refptr<MetricEntity>& entity);
-template
-FileCache<RandomAccessFile>::~FileCache();
-template
-Status FileCache<RandomAccessFile>::Init();
-template
-Status FileCache<RandomAccessFile>::OpenExistingFile(
-    const string& file_name,
-    shared_ptr<RandomAccessFile>* file);
-template
-Status FileCache<RandomAccessFile>::DeleteFile(const string& file_name);
-template
-int FileCache<RandomAccessFile>::NumDescriptorsForTests() const;
-template
-string FileCache<RandomAccessFile>::ToDebugString() const;
+template class FileCache<RWFile>;
+template class FileCache<RandomAccessFile>;
 
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e77538b5/src/kudu/util/file_cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache.h b/src/kudu/util/file_cache.h
index 3ef87ad..44330d3 100644
--- a/src/kudu/util/file_cache.h
+++ b/src/kudu/util/file_cache.h
@@ -130,6 +130,29 @@ class FileCache {
   // deleted immediately.
   Status DeleteFile(const std::string& file_name);
 
+  // Invalidate the given path in the cache if present. This removes the
+  // path from the cache, and invalidates any previously-opened descriptors
+  // associated with this file.
+  //
+  // If a file with the same path is opened again, the actual file will be
+  // re-opened from disk.
+  //
+  // This operation should be used during 'rename-to-replace' patterns, e.g.:
+  //
+  //   WriteNewDataTo(tmp_path);
+  //   env->RenameFile(tmp_path, p);
+  //   file_cache->Invalidate(p);
+  //
+  // NOTE: if any reader of 'p' holds an open descriptor from the cache
+  // prior to this operation, that descriptor is invalidated and any
+  // further operations on that descriptor will result in a CHECK failure.
+  // Hence this is not safe to use without some external synchronization
+  // which prevents concurrent access to the same file.
+  //
+  // NOTE: this function must not be called concurrently on the same file name
+  // from multiple threads.
+  void Invalidate(const std::string& file_name);
+
   // Returns the number of entries in the descriptor map.
   //
   // Only intended for unit tests.
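As a usage note on the contract above: since Invalidate() CHECK-fails any
concurrently held descriptor, a caller typically wraps the whole
rename-to-replace sequence in the same synchronization that readers of the
file use. Below is a minimal sketch under that assumption;
ReplaceFileAndInvalidate is a hypothetical helper (not part of the FileCache
API), and the caller-supplied std::mutex stands in for whatever lock already
guards access to the file:

    #include <mutex>
    #include <string>

    #include "kudu/util/env.h"
    #include "kudu/util/file_cache.h"
    #include "kudu/util/status.h"

    namespace kudu {

    // Hypothetical helper: replace 'path' on disk with 'tmp_path', then drop
    // any stale cached descriptors, per the Invalidate() contract documented
    // above. 'mu' must be the same mutex that guards all readers and writers
    // of 'path', so that no descriptor for 'path' is in use while we
    // invalidate it.
    Status ReplaceFileAndInvalidate(std::mutex* mu,
                                    Env* env,
                                    FileCache<RWFile>* file_cache,
                                    const std::string& tmp_path,
                                    const std::string& path) {
      std::lock_guard<std::mutex> l(*mu);
      RETURN_NOT_OK(env->RenameFile(tmp_path, path));
      // The next OpenExistingFile() for 'path' re-opens from disk rather than
      // returning a descriptor for the now-deleted pre-rename file.
      file_cache->Invalidate(path);
      return Status::OK();
    }

    } // namespace kudu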