Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C2E6A200D24 for ; Mon, 9 Oct 2017 18:24:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C18461609E0; Mon, 9 Oct 2017 16:24:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B0673160BFD for ; Mon, 9 Oct 2017 18:24:44 +0200 (CEST) Received: (qmail 66471 invoked by uid 500); 9 Oct 2017 16:24:43 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 65581 invoked by uid 99); 9 Oct 2017 16:24:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Oct 2017 16:24:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 75B91F5D46; Mon, 9 Oct 2017 16:24:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jeremydyer@apache.org To: commits@nifi.apache.org Date: Mon, 09 Oct 2017 16:25:05 -0000 Message-Id: <414831400bc94d8fb7653256d2dbedfd@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [25/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB archived-at: Mon, 09 Oct 2017 16:24:47 -0000 http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction_picker.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/compaction_picker.cc b/thirdparty/rocksdb/db/compaction_picker.cc new file mode 100644 index 0000000..c6a5674 --- /dev/null +++ b/thirdparty/rocksdb/db/compaction_picker.cc @@ -0,0 +1,1591 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/compaction_picker.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include +#include +#include +#include +#include +#include "db/column_family.h" +#include "monitoring/statistics.h" +#include "util/filename.h" +#include "util/log_buffer.h" +#include "util/random.h" +#include "util/string_util.h" +#include "util/sync_point.h" + +namespace rocksdb { + +namespace { +uint64_t TotalCompensatedFileSize(const std::vector& files) { + uint64_t sum = 0; + for (size_t i = 0; i < files.size() && files[i]; i++) { + sum += files[i]->compensated_file_size; + } + return sum; +} + +bool FindIntraL0Compaction(const std::vector& level_files, + size_t min_files_to_compact, + uint64_t max_compact_bytes_per_del_file, + CompactionInputFiles* comp_inputs) { + size_t compact_bytes = level_files[0]->fd.file_size; + size_t compact_bytes_per_del_file = port::kMaxSizet; + // compaction range will be [0, span_len). + size_t span_len; + // pull in files until the amount of compaction work per deleted file begins + // increasing. + size_t new_compact_bytes_per_del_file = 0; + for (span_len = 1; span_len < level_files.size(); ++span_len) { + compact_bytes += level_files[span_len]->fd.file_size; + new_compact_bytes_per_del_file = compact_bytes / span_len; + if (level_files[span_len]->being_compacted || + new_compact_bytes_per_del_file > compact_bytes_per_del_file) { + break; + } + compact_bytes_per_del_file = new_compact_bytes_per_del_file; + } + + if (span_len >= min_files_to_compact && + new_compact_bytes_per_del_file < max_compact_bytes_per_del_file) { + assert(comp_inputs != nullptr); + comp_inputs->level = 0; + for (size_t i = 0; i < span_len; ++i) { + comp_inputs->files.push_back(level_files[i]); + } + return true; + } + return false; +} +} // anonymous namespace + +// Determine compression type, based on user options, level of the output +// file and whether compression is disabled. +// If enable_compression is false, then compression is always disabled no +// matter what the values of the other two parameters are. +// Otherwise, the compression type is determined based on options and level. +CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, + const VersionStorageInfo* vstorage, + const MutableCFOptions& mutable_cf_options, + int level, int base_level, + const bool enable_compression) { + if (!enable_compression) { + // disable compression + return kNoCompression; + } + + // If bottommost_compression is set and we are compacting to the + // bottommost level then we should use it. + if (ioptions.bottommost_compression != kDisableCompressionOption && + level > base_level && level >= (vstorage->num_non_empty_levels() - 1)) { + return ioptions.bottommost_compression; + } + // If the user has specified a different compression level for each level, + // then pick the compression for that level. + if (!ioptions.compression_per_level.empty()) { + assert(level == 0 || level >= base_level); + int idx = (level == 0) ? 0 : level - base_level + 1; + + const int n = static_cast(ioptions.compression_per_level.size()) - 1; + // It is possible for level_ to be -1; in that case, we use level + // 0's compression. This occurs mostly in backwards compatibility + // situations when the builder doesn't know what level the file + // belongs to. Likewise, if level is beyond the end of the + // specified compression levels, use the last value. + return ioptions.compression_per_level[std::max(0, std::min(idx, n))]; + } else { + return mutable_cf_options.compression; + } +} + +CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions, + const InternalKeyComparator* icmp) + : ioptions_(ioptions), icmp_(icmp) {} + +CompactionPicker::~CompactionPicker() {} + +// Delete this compaction from the list of running compactions. +void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) { + UnregisterCompaction(c); + if (!status.ok()) { + c->ResetNextCompactionIndex(); + } +} + +void CompactionPicker::GetRange(const CompactionInputFiles& inputs, + InternalKey* smallest, + InternalKey* largest) const { + const int level = inputs.level; + assert(!inputs.empty()); + smallest->Clear(); + largest->Clear(); + + if (level == 0) { + for (size_t i = 0; i < inputs.size(); i++) { + FileMetaData* f = inputs[i]; + if (i == 0) { + *smallest = f->smallest; + *largest = f->largest; + } else { + if (icmp_->Compare(f->smallest, *smallest) < 0) { + *smallest = f->smallest; + } + if (icmp_->Compare(f->largest, *largest) > 0) { + *largest = f->largest; + } + } + } + } else { + *smallest = inputs[0]->smallest; + *largest = inputs[inputs.size() - 1]->largest; + } +} + +void CompactionPicker::GetRange(const CompactionInputFiles& inputs1, + const CompactionInputFiles& inputs2, + InternalKey* smallest, + InternalKey* largest) const { + assert(!inputs1.empty() || !inputs2.empty()); + if (inputs1.empty()) { + GetRange(inputs2, smallest, largest); + } else if (inputs2.empty()) { + GetRange(inputs1, smallest, largest); + } else { + InternalKey smallest1, smallest2, largest1, largest2; + GetRange(inputs1, &smallest1, &largest1); + GetRange(inputs2, &smallest2, &largest2); + *smallest = + icmp_->Compare(smallest1, smallest2) < 0 ? smallest1 : smallest2; + *largest = icmp_->Compare(largest1, largest2) < 0 ? largest2 : largest1; + } +} + +void CompactionPicker::GetRange(const std::vector& inputs, + InternalKey* smallest, + InternalKey* largest) const { + InternalKey current_smallest; + InternalKey current_largest; + bool initialized = false; + for (const auto& in : inputs) { + if (in.empty()) { + continue; + } + GetRange(in, ¤t_smallest, ¤t_largest); + if (!initialized) { + *smallest = current_smallest; + *largest = current_largest; + initialized = true; + } else { + if (icmp_->Compare(current_smallest, *smallest) < 0) { + *smallest = current_smallest; + } + if (icmp_->Compare(current_largest, *largest) > 0) { + *largest = current_largest; + } + } + } + assert(initialized); +} + +bool CompactionPicker::ExpandInputsToCleanCut(const std::string& cf_name, + VersionStorageInfo* vstorage, + CompactionInputFiles* inputs) { + // This isn't good compaction + assert(!inputs->empty()); + + const int level = inputs->level; + // GetOverlappingInputs will always do the right thing for level-0. + // So we don't need to do any expansion if level == 0. + if (level == 0) { + return true; + } + + InternalKey smallest, largest; + + // Keep expanding inputs until we are sure that there is a "clean cut" + // boundary between the files in input and the surrounding files. + // This will ensure that no parts of a key are lost during compaction. + int hint_index = -1; + size_t old_size; + do { + old_size = inputs->size(); + GetRange(*inputs, &smallest, &largest); + inputs->clear(); + vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files, + hint_index, &hint_index); + } while (inputs->size() > old_size); + + // we started off with inputs non-empty and the previous loop only grew + // inputs. thus, inputs should be non-empty here + assert(!inputs->empty()); + + // If, after the expansion, there are files that are already under + // compaction, then we must drop/cancel this compaction. + if (AreFilesInCompaction(inputs->files)) { + return false; + } + return true; +} + +bool CompactionPicker::RangeOverlapWithCompaction( + const Slice& smallest_user_key, const Slice& largest_user_key, + int level) const { + const Comparator* ucmp = icmp_->user_comparator(); + for (Compaction* c : compactions_in_progress_) { + if (c->output_level() == level && + ucmp->Compare(smallest_user_key, c->GetLargestUserKey()) <= 0 && + ucmp->Compare(largest_user_key, c->GetSmallestUserKey()) >= 0) { + // Overlap + return true; + } + } + // Did not overlap with any running compaction in level `level` + return false; +} + +bool CompactionPicker::FilesRangeOverlapWithCompaction( + const std::vector& inputs, int level) const { + bool is_empty = true; + for (auto& in : inputs) { + if (!in.empty()) { + is_empty = false; + break; + } + } + if (is_empty) { + // No files in inputs + return false; + } + + InternalKey smallest, largest; + GetRange(inputs, &smallest, &largest); + return RangeOverlapWithCompaction(smallest.user_key(), largest.user_key(), + level); +} + +// Returns true if any one of specified files are being compacted +bool CompactionPicker::AreFilesInCompaction( + const std::vector& files) { + for (size_t i = 0; i < files.size(); i++) { + if (files[i]->being_compacted) { + return true; + } + } + return false; +} + +Compaction* CompactionPicker::CompactFiles( + const CompactionOptions& compact_options, + const std::vector& input_files, int output_level, + VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options, + uint32_t output_path_id) { + assert(input_files.size()); + + // TODO(rven ): we might be able to run concurrent level 0 compaction + // if the key ranges of the two compactions do not overlap, but for now + // we do not allow it. + if ((input_files[0].level == 0) && !level0_compactions_in_progress_.empty()) { + return nullptr; + } + // This compaction output could overlap with a running compaction + if (FilesRangeOverlapWithCompaction(input_files, output_level)) { + return nullptr; + } + auto c = + new Compaction(vstorage, ioptions_, mutable_cf_options, input_files, + output_level, compact_options.output_file_size_limit, + mutable_cf_options.max_compaction_bytes, output_path_id, + compact_options.compression, /* grandparents */ {}, true); + + // If it's level 0 compaction, make sure we don't execute any other level 0 + // compactions in parallel + RegisterCompaction(c); + return c; +} + +Status CompactionPicker::GetCompactionInputsFromFileNumbers( + std::vector* input_files, + std::unordered_set* input_set, const VersionStorageInfo* vstorage, + const CompactionOptions& compact_options) const { + if (input_set->size() == 0U) { + return Status::InvalidArgument( + "Compaction must include at least one file."); + } + assert(input_files); + + std::vector matched_input_files; + matched_input_files.resize(vstorage->num_levels()); + int first_non_empty_level = -1; + int last_non_empty_level = -1; + // TODO(yhchiang): use a lazy-initialized mapping from + // file_number to FileMetaData in Version. + for (int level = 0; level < vstorage->num_levels(); ++level) { + for (auto file : vstorage->LevelFiles(level)) { + auto iter = input_set->find(file->fd.GetNumber()); + if (iter != input_set->end()) { + matched_input_files[level].files.push_back(file); + input_set->erase(iter); + last_non_empty_level = level; + if (first_non_empty_level == -1) { + first_non_empty_level = level; + } + } + } + } + + if (!input_set->empty()) { + std::string message( + "Cannot find matched SST files for the following file numbers:"); + for (auto fn : *input_set) { + message += " "; + message += ToString(fn); + } + return Status::InvalidArgument(message); + } + + for (int level = first_non_empty_level; level <= last_non_empty_level; + ++level) { + matched_input_files[level].level = level; + input_files->emplace_back(std::move(matched_input_files[level])); + } + + return Status::OK(); +} + +// Returns true if any one of the parent files are being compacted +bool CompactionPicker::IsRangeInCompaction(VersionStorageInfo* vstorage, + const InternalKey* smallest, + const InternalKey* largest, + int level, int* level_index) { + std::vector inputs; + assert(level < NumberLevels()); + + vstorage->GetOverlappingInputs(level, smallest, largest, &inputs, + *level_index, level_index); + return AreFilesInCompaction(inputs); +} + +// Populates the set of inputs of all other levels that overlap with the +// start level. +// Now we assume all levels except start level and output level are empty. +// Will also attempt to expand "start level" if that doesn't expand +// "output level" or cause "level" to include a file for compaction that has an +// overlapping user-key with another file. +// REQUIRES: input_level and output_level are different +// REQUIRES: inputs->empty() == false +// Returns false if files on parent level are currently in compaction, which +// means that we can't compact them +bool CompactionPicker::SetupOtherInputs( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, CompactionInputFiles* inputs, + CompactionInputFiles* output_level_inputs, int* parent_index, + int base_index) { + assert(!inputs->empty()); + assert(output_level_inputs->empty()); + const int input_level = inputs->level; + const int output_level = output_level_inputs->level; + assert(input_level != output_level); + + // For now, we only support merging two levels, start level and output level. + // We need to assert other levels are empty. + for (int l = input_level + 1; l < output_level; l++) { + assert(vstorage->NumLevelFiles(l) == 0); + } + + InternalKey smallest, largest; + + // Get the range one last time. + GetRange(*inputs, &smallest, &largest); + + // Populate the set of next-level files (inputs_GetOutputLevelInputs()) to + // include in compaction + vstorage->GetOverlappingInputs(output_level, &smallest, &largest, + &output_level_inputs->files, *parent_index, + parent_index); + if (AreFilesInCompaction(output_level_inputs->files)) { + return false; + } + if (!output_level_inputs->empty()) { + if (!ExpandInputsToCleanCut(cf_name, vstorage, output_level_inputs)) { + return false; + } + } + + // See if we can further grow the number of inputs in "level" without + // changing the number of "level+1" files we pick up. We also choose NOT + // to expand if this would cause "level" to include some entries for some + // user key, while excluding other entries for the same user key. This + // can happen when one user key spans multiple files. + if (!output_level_inputs->empty()) { + const uint64_t limit = mutable_cf_options.max_compaction_bytes; + const uint64_t output_level_inputs_size = + TotalCompensatedFileSize(output_level_inputs->files); + const uint64_t inputs_size = TotalCompensatedFileSize(inputs->files); + bool expand_inputs = false; + + CompactionInputFiles expanded_inputs; + expanded_inputs.level = input_level; + // Get closed interval of output level + InternalKey all_start, all_limit; + GetRange(*inputs, *output_level_inputs, &all_start, &all_limit); + bool try_overlapping_inputs = true; + vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit, + &expanded_inputs.files, base_index, nullptr); + uint64_t expanded_inputs_size = + TotalCompensatedFileSize(expanded_inputs.files); + if (!ExpandInputsToCleanCut(cf_name, vstorage, &expanded_inputs)) { + try_overlapping_inputs = false; + } + if (try_overlapping_inputs && expanded_inputs.size() > inputs->size() && + output_level_inputs_size + expanded_inputs_size < limit && + !AreFilesInCompaction(expanded_inputs.files)) { + InternalKey new_start, new_limit; + GetRange(expanded_inputs, &new_start, &new_limit); + CompactionInputFiles expanded_output_level_inputs; + expanded_output_level_inputs.level = output_level; + vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit, + &expanded_output_level_inputs.files, + *parent_index, parent_index); + assert(!expanded_output_level_inputs.empty()); + if (!AreFilesInCompaction(expanded_output_level_inputs.files) && + ExpandInputsToCleanCut(cf_name, vstorage, + &expanded_output_level_inputs) && + expanded_output_level_inputs.size() == output_level_inputs->size()) { + expand_inputs = true; + } + } + if (!expand_inputs) { + vstorage->GetCleanInputsWithinInterval(input_level, &all_start, + &all_limit, &expanded_inputs.files, + base_index, nullptr); + expanded_inputs_size = TotalCompensatedFileSize(expanded_inputs.files); + if (expanded_inputs.size() > inputs->size() && + output_level_inputs_size + expanded_inputs_size < limit && + !AreFilesInCompaction(expanded_inputs.files)) { + expand_inputs = true; + } + } + if (expand_inputs) { + ROCKS_LOG_INFO(ioptions_.info_log, + "[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt + "(%" PRIu64 "+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt + "+%" ROCKSDB_PRIszt " (%" PRIu64 "+%" PRIu64 "bytes)\n", + cf_name.c_str(), input_level, inputs->size(), + output_level_inputs->size(), inputs_size, + output_level_inputs_size, expanded_inputs.size(), + output_level_inputs->size(), expanded_inputs_size, + output_level_inputs_size); + inputs->files = expanded_inputs.files; + } + } + return true; +} + +void CompactionPicker::GetGrandparents( + VersionStorageInfo* vstorage, const CompactionInputFiles& inputs, + const CompactionInputFiles& output_level_inputs, + std::vector* grandparents) { + InternalKey start, limit; + GetRange(inputs, output_level_inputs, &start, &limit); + // Compute the set of grandparent files that overlap this compaction + // (parent == level+1; grandparent == level+2) + if (output_level_inputs.level + 1 < NumberLevels()) { + vstorage->GetOverlappingInputs(output_level_inputs.level + 1, &start, + &limit, grandparents); + } +} + +Compaction* CompactionPicker::CompactRange( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end, bool* manual_conflict) { + // CompactionPickerFIFO has its own implementation of compact range + assert(ioptions_.compaction_style != kCompactionStyleFIFO); + + if (input_level == ColumnFamilyData::kCompactAllLevels) { + assert(ioptions_.compaction_style == kCompactionStyleUniversal); + + // Universal compaction with more than one level always compacts all the + // files together to the last level. + assert(vstorage->num_levels() > 1); + // DBImpl::CompactRange() set output level to be the last level + if (ioptions_.allow_ingest_behind) { + assert(output_level == vstorage->num_levels() - 2); + } else { + assert(output_level == vstorage->num_levels() - 1); + } + // DBImpl::RunManualCompaction will make full range for universal compaction + assert(begin == nullptr); + assert(end == nullptr); + *compaction_end = nullptr; + + int start_level = 0; + for (; start_level < vstorage->num_levels() && + vstorage->NumLevelFiles(start_level) == 0; + start_level++) { + } + if (start_level == vstorage->num_levels()) { + return nullptr; + } + + if ((start_level == 0) && (!level0_compactions_in_progress_.empty())) { + *manual_conflict = true; + // Only one level 0 compaction allowed + return nullptr; + } + + std::vector inputs(vstorage->num_levels() - + start_level); + for (int level = start_level; level < vstorage->num_levels(); level++) { + inputs[level - start_level].level = level; + auto& files = inputs[level - start_level].files; + for (FileMetaData* f : vstorage->LevelFiles(level)) { + files.push_back(f); + } + if (AreFilesInCompaction(files)) { + *manual_conflict = true; + return nullptr; + } + } + + // 2 non-exclusive manual compactions could run at the same time producing + // overlaping outputs in the same level. + if (FilesRangeOverlapWithCompaction(inputs, output_level)) { + // This compaction output could potentially conflict with the output + // of a currently running compaction, we cannot run it. + *manual_conflict = true; + return nullptr; + } + + Compaction* c = new Compaction( + vstorage, ioptions_, mutable_cf_options, std::move(inputs), + output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), + /* max_compaction_bytes */ LLONG_MAX, output_path_id, + GetCompressionType(ioptions_, vstorage, mutable_cf_options, + output_level, 1), + /* grandparents */ {}, /* is manual */ true); + RegisterCompaction(c); + return c; + } + + CompactionInputFiles inputs; + inputs.level = input_level; + bool covering_the_whole_range = true; + + // All files are 'overlapping' in universal style compaction. + // We have to compact the entire range in one shot. + if (ioptions_.compaction_style == kCompactionStyleUniversal) { + begin = nullptr; + end = nullptr; + } + + vstorage->GetOverlappingInputs(input_level, begin, end, &inputs.files); + if (inputs.empty()) { + return nullptr; + } + + if ((input_level == 0) && (!level0_compactions_in_progress_.empty())) { + // Only one level 0 compaction allowed + TEST_SYNC_POINT("CompactionPicker::CompactRange:Conflict"); + *manual_conflict = true; + return nullptr; + } + + // Avoid compacting too much in one shot in case the range is large. + // But we cannot do this for level-0 since level-0 files can overlap + // and we must not pick one file and drop another older file if the + // two files overlap. + if (input_level > 0) { + const uint64_t limit = mutable_cf_options.max_compaction_bytes; + uint64_t total = 0; + for (size_t i = 0; i + 1 < inputs.size(); ++i) { + uint64_t s = inputs[i]->compensated_file_size; + total += s; + if (total >= limit) { + **compaction_end = inputs[i + 1]->smallest; + covering_the_whole_range = false; + inputs.files.resize(i + 1); + break; + } + } + } + assert(output_path_id < static_cast(ioptions_.db_paths.size())); + + if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs) == false) { + // manual compaction is now multi-threaded, so it can + // happen that ExpandWhileOverlapping fails + // we handle it higher in RunManualCompaction + *manual_conflict = true; + return nullptr; + } + + if (covering_the_whole_range) { + *compaction_end = nullptr; + } + + CompactionInputFiles output_level_inputs; + if (output_level == ColumnFamilyData::kCompactToBaseLevel) { + assert(input_level == 0); + output_level = vstorage->base_level(); + assert(output_level > 0); + } + output_level_inputs.level = output_level; + if (input_level != output_level) { + int parent_index = -1; + if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs, + &output_level_inputs, &parent_index, -1)) { + // manual compaction is now multi-threaded, so it can + // happen that SetupOtherInputs fails + // we handle it higher in RunManualCompaction + *manual_conflict = true; + return nullptr; + } + } + + std::vector compaction_inputs({inputs}); + if (!output_level_inputs.empty()) { + compaction_inputs.push_back(output_level_inputs); + } + for (size_t i = 0; i < compaction_inputs.size(); i++) { + if (AreFilesInCompaction(compaction_inputs[i].files)) { + *manual_conflict = true; + return nullptr; + } + } + + // 2 non-exclusive manual compactions could run at the same time producing + // overlaping outputs in the same level. + if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) { + // This compaction output could potentially conflict with the output + // of a currently running compaction, we cannot run it. + *manual_conflict = true; + return nullptr; + } + + std::vector grandparents; + GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents); + Compaction* compaction = new Compaction( + vstorage, ioptions_, mutable_cf_options, std::move(compaction_inputs), + output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), + mutable_cf_options.max_compaction_bytes, output_path_id, + GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, + vstorage->base_level()), + std::move(grandparents), /* is manual compaction */ true); + + TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); + RegisterCompaction(compaction); + + // Creating a compaction influences the compaction score because the score + // takes running compactions into account (by skipping files that are already + // being compacted). Since we just changed compaction score, we recalculate it + // here + vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options); + + return compaction; +} + +#ifndef ROCKSDB_LITE +namespace { +// Test whether two files have overlapping key-ranges. +bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a, + const SstFileMetaData& b) { + if (c->Compare(a.smallestkey, b.smallestkey) >= 0) { + if (c->Compare(a.smallestkey, b.largestkey) <= 0) { + // b.smallestkey <= a.smallestkey <= b.largestkey + return true; + } + } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) { + // a.smallestkey < b.smallestkey <= a.largestkey + return true; + } + if (c->Compare(a.largestkey, b.largestkey) <= 0) { + if (c->Compare(a.largestkey, b.smallestkey) >= 0) { + // b.smallestkey <= a.largestkey <= b.largestkey + return true; + } + } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) { + // a.smallestkey <= b.largestkey < a.largestkey + return true; + } + return false; +} +} // namespace + +Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels( + std::unordered_set* input_files, + const ColumnFamilyMetaData& cf_meta, const int output_level) const { + auto& levels = cf_meta.levels; + auto comparator = icmp_->user_comparator(); + + // TODO(yhchiang): If there is any input files of L1 or up and there + // is at least one L0 files. All L0 files older than the L0 file needs + // to be included. Otherwise, it is a false conditoin + + // TODO(yhchiang): add is_adjustable to CompactionOptions + + // the smallest and largest key of the current compaction input + std::string smallestkey; + std::string largestkey; + // a flag for initializing smallest and largest key + bool is_first = false; + const int kNotFound = -1; + + // For each level, it does the following things: + // 1. Find the first and the last compaction input files + // in the current level. + // 2. Include all files between the first and the last + // compaction input files. + // 3. Update the compaction key-range. + // 4. For all remaining levels, include files that have + // overlapping key-range with the compaction key-range. + for (int l = 0; l <= output_level; ++l) { + auto& current_files = levels[l].files; + int first_included = static_cast(current_files.size()); + int last_included = kNotFound; + + // identify the first and the last compaction input files + // in the current level. + for (size_t f = 0; f < current_files.size(); ++f) { + if (input_files->find(TableFileNameToNumber(current_files[f].name)) != + input_files->end()) { + first_included = std::min(first_included, static_cast(f)); + last_included = std::max(last_included, static_cast(f)); + if (is_first == false) { + smallestkey = current_files[f].smallestkey; + largestkey = current_files[f].largestkey; + is_first = true; + } + } + } + if (last_included == kNotFound) { + continue; + } + + if (l != 0) { + // expend the compaction input of the current level if it + // has overlapping key-range with other non-compaction input + // files in the same level. + while (first_included > 0) { + if (comparator->Compare(current_files[first_included - 1].largestkey, + current_files[first_included].smallestkey) < + 0) { + break; + } + first_included--; + } + + while (last_included < static_cast(current_files.size()) - 1) { + if (comparator->Compare(current_files[last_included + 1].smallestkey, + current_files[last_included].largestkey) > 0) { + break; + } + last_included++; + } + } + + // include all files between the first and the last compaction input files. + for (int f = first_included; f <= last_included; ++f) { + if (current_files[f].being_compacted) { + return Status::Aborted("Necessary compaction input file " + + current_files[f].name + + " is currently being compacted."); + } + input_files->insert(TableFileNameToNumber(current_files[f].name)); + } + + // update smallest and largest key + if (l == 0) { + for (int f = first_included; f <= last_included; ++f) { + if (comparator->Compare(smallestkey, current_files[f].smallestkey) > + 0) { + smallestkey = current_files[f].smallestkey; + } + if (comparator->Compare(largestkey, current_files[f].largestkey) < 0) { + largestkey = current_files[f].largestkey; + } + } + } else { + if (comparator->Compare(smallestkey, + current_files[first_included].smallestkey) > 0) { + smallestkey = current_files[first_included].smallestkey; + } + if (comparator->Compare(largestkey, + current_files[last_included].largestkey) < 0) { + largestkey = current_files[last_included].largestkey; + } + } + + SstFileMetaData aggregated_file_meta; + aggregated_file_meta.smallestkey = smallestkey; + aggregated_file_meta.largestkey = largestkey; + + // For all lower levels, include all overlapping files. + // We need to add overlapping files from the current level too because even + // if there no input_files in level l, we would still need to add files + // which overlap with the range containing the input_files in levels 0 to l + // Level 0 doesn't need to be handled this way because files are sorted by + // time and not by key + for (int m = std::max(l, 1); m <= output_level; ++m) { + for (auto& next_lv_file : levels[m].files) { + if (HaveOverlappingKeyRanges(comparator, aggregated_file_meta, + next_lv_file)) { + if (next_lv_file.being_compacted) { + return Status::Aborted( + "File " + next_lv_file.name + + " that has overlapping key range with one of the compaction " + " input file is currently being compacted."); + } + input_files->insert(TableFileNameToNumber(next_lv_file.name)); + } + } + } + } + return Status::OK(); +} + +Status CompactionPicker::SanitizeCompactionInputFiles( + std::unordered_set* input_files, + const ColumnFamilyMetaData& cf_meta, const int output_level) const { + assert(static_cast(cf_meta.levels.size()) - 1 == + cf_meta.levels[cf_meta.levels.size() - 1].level); + if (output_level >= static_cast(cf_meta.levels.size())) { + return Status::InvalidArgument( + "Output level for column family " + cf_meta.name + + " must between [0, " + + ToString(cf_meta.levels[cf_meta.levels.size() - 1].level) + "]."); + } + + if (output_level > MaxOutputLevel()) { + return Status::InvalidArgument( + "Exceed the maximum output level defined by " + "the current compaction algorithm --- " + + ToString(MaxOutputLevel())); + } + + if (output_level < 0) { + return Status::InvalidArgument("Output level cannot be negative."); + } + + if (input_files->size() == 0) { + return Status::InvalidArgument( + "A compaction must contain at least one file."); + } + + Status s = SanitizeCompactionInputFilesForAllLevels(input_files, cf_meta, + output_level); + + if (!s.ok()) { + return s; + } + + // for all input files, check whether the file number matches + // any currently-existing files. + for (auto file_num : *input_files) { + bool found = false; + for (auto level_meta : cf_meta.levels) { + for (auto file_meta : level_meta.files) { + if (file_num == TableFileNameToNumber(file_meta.name)) { + if (file_meta.being_compacted) { + return Status::Aborted("Specified compaction input file " + + MakeTableFileName("", file_num) + + " is already being compacted."); + } + found = true; + break; + } + } + if (found) { + break; + } + } + if (!found) { + return Status::InvalidArgument( + "Specified compaction input file " + MakeTableFileName("", file_num) + + " does not exist in column family " + cf_meta.name + "."); + } + } + + return Status::OK(); +} +#endif // !ROCKSDB_LITE + +void CompactionPicker::RegisterCompaction(Compaction* c) { + if (c == nullptr) { + return; + } + assert(ioptions_.compaction_style != kCompactionStyleLevel || + c->output_level() == 0 || + !FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level())); + if (c->start_level() == 0 || + ioptions_.compaction_style == kCompactionStyleUniversal) { + level0_compactions_in_progress_.insert(c); + } + compactions_in_progress_.insert(c); +} + +void CompactionPicker::UnregisterCompaction(Compaction* c) { + if (c == nullptr) { + return; + } + if (c->start_level() == 0 || + ioptions_.compaction_style == kCompactionStyleUniversal) { + level0_compactions_in_progress_.erase(c); + } + compactions_in_progress_.erase(c); +} + +bool LevelCompactionPicker::NeedsCompaction( + const VersionStorageInfo* vstorage) const { + if (!vstorage->FilesMarkedForCompaction().empty()) { + return true; + } + for (int i = 0; i <= vstorage->MaxInputLevel(); i++) { + if (vstorage->CompactionScore(i) >= 1) { + return true; + } + } + return false; +} + +namespace { +// A class to build a leveled compaction step-by-step. +class LevelCompactionBuilder { + public: + LevelCompactionBuilder(const std::string& cf_name, + VersionStorageInfo* vstorage, + CompactionPicker* compaction_picker, + LogBuffer* log_buffer, + const MutableCFOptions& mutable_cf_options, + const ImmutableCFOptions& ioptions) + : cf_name_(cf_name), + vstorage_(vstorage), + compaction_picker_(compaction_picker), + log_buffer_(log_buffer), + mutable_cf_options_(mutable_cf_options), + ioptions_(ioptions) {} + + // Pick and return a compaction. + Compaction* PickCompaction(); + + // Pick the initial files to compact to the next level. (or together + // in Intra-L0 compactions) + void SetupInitialFiles(); + + // If the initial files are from L0 level, pick other L0 + // files if needed. + bool SetupOtherL0FilesIfNeeded(); + + // Based on initial files, setup other files need to be compacted + // in this compaction, accordingly. + bool SetupOtherInputsIfNeeded(); + + Compaction* GetCompaction(); + + // For the specfied level, pick a file that we want to compact. + // Returns false if there is no file to compact. + // If it returns true, inputs->files.size() will be exactly one. + // If level is 0 and there is already a compaction on that level, this + // function will return false. + bool PickFileToCompact(); + + // For L0->L0, picks the longest span of files that aren't currently + // undergoing compaction for which work-per-deleted-file decreases. The span + // always starts from the newest L0 file. + // + // Intra-L0 compaction is independent of all other files, so it can be + // performed even when L0->base_level compactions are blocked. + // + // Returns true if `inputs` is populated with a span of files to be compacted; + // otherwise, returns false. + bool PickIntraL0Compaction(); + + // If there is any file marked for compaction, put put it into inputs. + void PickFilesMarkedForCompaction(); + + const std::string& cf_name_; + VersionStorageInfo* vstorage_; + CompactionPicker* compaction_picker_; + LogBuffer* log_buffer_; + int start_level_ = -1; + int output_level_ = -1; + int parent_index_ = -1; + int base_index_ = -1; + double start_level_score_ = 0; + bool is_manual_ = false; + CompactionInputFiles start_level_inputs_; + std::vector compaction_inputs_; + CompactionInputFiles output_level_inputs_; + std::vector grandparents_; + CompactionReason compaction_reason_ = CompactionReason::kUnknown; + + const MutableCFOptions& mutable_cf_options_; + const ImmutableCFOptions& ioptions_; + // Pick a path ID to place a newly generated file, with its level + static uint32_t GetPathId(const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options, + int level); + + static const int kMinFilesForIntraL0Compaction = 4; +}; + +void LevelCompactionBuilder::PickFilesMarkedForCompaction() { + if (vstorage_->FilesMarkedForCompaction().empty()) { + return; + } + + auto continuation = [&](std::pair level_file) { + // If it's being compacted it has nothing to do here. + // If this assert() fails that means that some function marked some + // files as being_compacted, but didn't call ComputeCompactionScore() + assert(!level_file.second->being_compacted); + start_level_ = level_file.first; + output_level_ = + (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1; + + if (start_level_ == 0 && + !compaction_picker_->level0_compactions_in_progress()->empty()) { + return false; + } + + start_level_inputs_.files = {level_file.second}; + start_level_inputs_.level = start_level_; + return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, + &start_level_inputs_); + }; + + // take a chance on a random file first + Random64 rnd(/* seed */ reinterpret_cast(vstorage_)); + size_t random_file_index = static_cast(rnd.Uniform( + static_cast(vstorage_->FilesMarkedForCompaction().size()))); + + if (continuation(vstorage_->FilesMarkedForCompaction()[random_file_index])) { + // found the compaction! + return; + } + + for (auto& level_file : vstorage_->FilesMarkedForCompaction()) { + if (continuation(level_file)) { + // found the compaction! + return; + } + } + start_level_inputs_.files.clear(); +} + +void LevelCompactionBuilder::SetupInitialFiles() { + // Find the compactions by size on all levels. + bool skipped_l0_to_base = false; + for (int i = 0; i < compaction_picker_->NumberLevels() - 1; i++) { + start_level_score_ = vstorage_->CompactionScore(i); + start_level_ = vstorage_->CompactionScoreLevel(i); + assert(i == 0 || start_level_score_ <= vstorage_->CompactionScore(i - 1)); + if (start_level_score_ >= 1) { + if (skipped_l0_to_base && start_level_ == vstorage_->base_level()) { + // If L0->base_level compaction is pending, don't schedule further + // compaction from base level. Otherwise L0->base_level compaction + // may starve. + continue; + } + output_level_ = + (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1; + if (PickFileToCompact()) { + // found the compaction! + if (start_level_ == 0) { + // L0 score = `num L0 files` / `level0_file_num_compaction_trigger` + compaction_reason_ = CompactionReason::kLevelL0FilesNum; + } else { + // L1+ score = `Level files size` / `MaxBytesForLevel` + compaction_reason_ = CompactionReason::kLevelMaxLevelSize; + } + break; + } else { + // didn't find the compaction, clear the inputs + start_level_inputs_.clear(); + if (start_level_ == 0) { + skipped_l0_to_base = true; + // L0->base_level may be blocked due to ongoing L0->base_level + // compactions. It may also be blocked by an ongoing compaction from + // base_level downwards. + // + // In these cases, to reduce L0 file count and thus reduce likelihood + // of write stalls, we can attempt compacting a span of files within + // L0. + if (PickIntraL0Compaction()) { + output_level_ = 0; + compaction_reason_ = CompactionReason::kLevelL0FilesNum; + break; + } + } + } + } + } + + // if we didn't find a compaction, check if there are any files marked for + // compaction + if (start_level_inputs_.empty()) { + is_manual_ = true; + parent_index_ = base_index_ = -1; + PickFilesMarkedForCompaction(); + if (!start_level_inputs_.empty()) { + compaction_reason_ = CompactionReason::kFilesMarkedForCompaction; + } + } +} + +bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() { + if (start_level_ == 0 && output_level_ != 0) { + // Two level 0 compaction won't run at the same time, so don't need to worry + // about files on level 0 being compacted. + assert(compaction_picker_->level0_compactions_in_progress()->empty()); + InternalKey smallest, largest; + compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest); + // Note that the next call will discard the file we placed in + // c->inputs_[0] earlier and replace it with an overlapping set + // which will include the picked file. + start_level_inputs_.files.clear(); + vstorage_->GetOverlappingInputs(0, &smallest, &largest, + &start_level_inputs_.files); + + // If we include more L0 files in the same compaction run it can + // cause the 'smallest' and 'largest' key to get extended to a + // larger range. So, re-invoke GetRange to get the new key range + compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest); + if (compaction_picker_->IsRangeInCompaction( + vstorage_, &smallest, &largest, output_level_, &parent_index_)) { + return false; + } + } + assert(!start_level_inputs_.files.empty()); + + return true; +} + +bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() { + // Setup input files from output level. For output to L0, we only compact + // spans of files that do not interact with any pending compactions, so don't + // need to consider other levels. + if (output_level_ != 0) { + output_level_inputs_.level = output_level_; + if (!compaction_picker_->SetupOtherInputs( + cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_, + &output_level_inputs_, &parent_index_, base_index_)) { + return false; + } + + compaction_inputs_.push_back(start_level_inputs_); + if (!output_level_inputs_.empty()) { + compaction_inputs_.push_back(output_level_inputs_); + } + + // In some edge cases we could pick a compaction that will be compacting + // a key range that overlap with another running compaction, and both + // of them have the same output level. This could happen if + // (1) we are running a non-exclusive manual compaction + // (2) AddFile ingest a new file into the LSM tree + // We need to disallow this from happening. + if (compaction_picker_->FilesRangeOverlapWithCompaction(compaction_inputs_, + output_level_)) { + // This compaction output could potentially conflict with the output + // of a currently running compaction, we cannot run it. + return false; + } + compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_, + output_level_inputs_, &grandparents_); + } else { + compaction_inputs_.push_back(start_level_inputs_); + } + return true; +} + +Compaction* LevelCompactionBuilder::PickCompaction() { + // Pick up the first file to start compaction. It may have been extended + // to a clean cut. + SetupInitialFiles(); + if (start_level_inputs_.empty()) { + return nullptr; + } + assert(start_level_ >= 0 && output_level_ >= 0); + + // If it is a L0 -> base level compaction, we need to set up other L0 + // files if needed. + if (!SetupOtherL0FilesIfNeeded()) { + return nullptr; + } + + // Pick files in the output level and expand more files in the start level + // if needed. + if (!SetupOtherInputsIfNeeded()) { + return nullptr; + } + + // Form a compaction object containing the files we picked. + Compaction* c = GetCompaction(); + + TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c); + + return c; +} + +Compaction* LevelCompactionBuilder::GetCompaction() { + auto c = new Compaction( + vstorage_, ioptions_, mutable_cf_options_, std::move(compaction_inputs_), + output_level_, mutable_cf_options_.MaxFileSizeForLevel(output_level_), + mutable_cf_options_.max_compaction_bytes, + GetPathId(ioptions_, mutable_cf_options_, output_level_), + GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, + output_level_, vstorage_->base_level()), + std::move(grandparents_), is_manual_, start_level_score_, + false /* deletion_compaction */, compaction_reason_); + + // If it's level 0 compaction, make sure we don't execute any other level 0 + // compactions in parallel + compaction_picker_->RegisterCompaction(c); + + // Creating a compaction influences the compaction score because the score + // takes running compactions into account (by skipping files that are already + // being compacted). Since we just changed compaction score, we recalculate it + // here + vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_); + return c; +} + +/* + * Find the optimal path to place a file + * Given a level, finds the path where levels up to it will fit in levels + * up to and including this path + */ +uint32_t LevelCompactionBuilder::GetPathId( + const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options, int level) { + uint32_t p = 0; + assert(!ioptions.db_paths.empty()); + + // size remaining in the most recent path + uint64_t current_path_size = ioptions.db_paths[0].target_size; + + uint64_t level_size; + int cur_level = 0; + + level_size = mutable_cf_options.max_bytes_for_level_base; + + // Last path is the fallback + while (p < ioptions.db_paths.size() - 1) { + if (level_size <= current_path_size) { + if (cur_level == level) { + // Does desired level fit in this path? + return p; + } else { + current_path_size -= level_size; + level_size = static_cast( + level_size * mutable_cf_options.max_bytes_for_level_multiplier); + cur_level++; + continue; + } + } + p++; + current_path_size = ioptions.db_paths[p].target_size; + } + return p; +} + +bool LevelCompactionBuilder::PickFileToCompact() { + // level 0 files are overlapping. So we cannot pick more + // than one concurrent compactions at this level. This + // could be made better by looking at key-ranges that are + // being compacted at level 0. + if (start_level_ == 0 && + !compaction_picker_->level0_compactions_in_progress()->empty()) { + TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0"); + return false; + } + + start_level_inputs_.clear(); + + assert(start_level_ >= 0); + + // Pick the largest file in this level that is not already + // being compacted + const std::vector& file_size = + vstorage_->FilesByCompactionPri(start_level_); + const std::vector& level_files = + vstorage_->LevelFiles(start_level_); + + unsigned int cmp_idx; + for (cmp_idx = vstorage_->NextCompactionIndex(start_level_); + cmp_idx < file_size.size(); cmp_idx++) { + int index = file_size[cmp_idx]; + auto* f = level_files[index]; + + // do not pick a file to compact if it is being compacted + // from n-1 level. + if (f->being_compacted) { + continue; + } + + start_level_inputs_.files.push_back(f); + start_level_inputs_.level = start_level_; + if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, + &start_level_inputs_) || + compaction_picker_->FilesRangeOverlapWithCompaction( + {start_level_inputs_}, output_level_)) { + // A locked (pending compaction) input-level file was pulled in due to + // user-key overlap. + start_level_inputs_.clear(); + continue; + } + + // Now that input level is fully expanded, we check whether any output files + // are locked due to pending compaction. + // + // Note we rely on ExpandInputsToCleanCut() to tell us whether any output- + // level files are locked, not just the extra ones pulled in for user-key + // overlap. + InternalKey smallest, largest; + compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest); + CompactionInputFiles output_level_inputs; + output_level_inputs.level = output_level_; + vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest, + &output_level_inputs.files); + if (!output_level_inputs.empty() && + !compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, + &output_level_inputs)) { + start_level_inputs_.clear(); + continue; + } + base_index_ = index; + break; + } + + // store where to start the iteration in the next call to PickCompaction + vstorage_->SetNextCompactionIndex(start_level_, cmp_idx); + + return start_level_inputs_.size() > 0; +} + +bool LevelCompactionBuilder::PickIntraL0Compaction() { + start_level_inputs_.clear(); + const std::vector& level_files = + vstorage_->LevelFiles(0 /* level */); + if (level_files.size() < + static_cast( + mutable_cf_options_.level0_file_num_compaction_trigger + 2) || + level_files[0]->being_compacted) { + // If L0 isn't accumulating much files beyond the regular trigger, don't + // resort to L0->L0 compaction yet. + return false; + } + return FindIntraL0Compaction(level_files, kMinFilesForIntraL0Compaction, + port::kMaxUint64, &start_level_inputs_); +} +} // namespace + +Compaction* LevelCompactionPicker::PickCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + LevelCompactionBuilder builder(cf_name, vstorage, this, log_buffer, + mutable_cf_options, ioptions_); + return builder.PickCompaction(); +} + +#ifndef ROCKSDB_LITE +bool FIFOCompactionPicker::NeedsCompaction( + const VersionStorageInfo* vstorage) const { + const int kLevel0 = 0; + return vstorage->CompactionScore(kLevel0) >= 1; +} + +namespace { +uint64_t GetTotalFilesSize( + const std::vector& files) { + uint64_t total_size = 0; + for (const auto& f : files) { + total_size += f->fd.file_size; + } + return total_size; +} +} // anonymous namespace + +Compaction* FIFOCompactionPicker::PickTTLCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + assert(ioptions_.compaction_options_fifo.ttl > 0); + + const int kLevel0 = 0; + const std::vector& level_files = vstorage->LevelFiles(kLevel0); + uint64_t total_size = GetTotalFilesSize(level_files); + + int64_t _current_time; + auto status = ioptions_.env->GetCurrentTime(&_current_time); + if (!status.ok()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: Couldn't get current time: %s. " + "Not doing compactions based on TTL. ", + cf_name.c_str(), status.ToString().c_str()); + return nullptr; + } + const uint64_t current_time = static_cast(_current_time); + + std::vector inputs; + inputs.emplace_back(); + inputs[0].level = 0; + + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + auto f = *ritr; + if (f->fd.table_reader != nullptr && + f->fd.table_reader->GetTableProperties() != nullptr) { + auto creation_time = + f->fd.table_reader->GetTableProperties()->creation_time; + if (creation_time == 0 || + creation_time >= + (current_time - ioptions_.compaction_options_fifo.ttl)) { + break; + } + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); + } + } + + // Return a nullptr and proceed to size-based FIFO compaction if: + // 1. there are no files older than ttl OR + // 2. there are a few files older than ttl, but deleting them will not bring + // the total size to be less than max_table_files_size threshold. + if (inputs[0].files.empty() || + total_size > ioptions_.compaction_options_fifo.max_table_files_size) { + return nullptr; + } + + for (const auto& f : inputs[0].files) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with creation time %" PRIu64 " for deletion", + cf_name.c_str(), f->fd.GetNumber(), + f->fd.table_reader->GetTableProperties()->creation_time); + } + + Compaction* c = new Compaction( + vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, + kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), + /* is deletion compaction */ true, CompactionReason::kFIFOTtl); + return c; +} + +Compaction* FIFOCompactionPicker::PickSizeCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + const int kLevel0 = 0; + const std::vector& level_files = vstorage->LevelFiles(kLevel0); + uint64_t total_size = GetTotalFilesSize(level_files); + + if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size || + level_files.size() == 0) { + // total size not exceeded + if (ioptions_.compaction_options_fifo.allow_compaction && + level_files.size() > 0) { + CompactionInputFiles comp_inputs; + if (FindIntraL0Compaction( + level_files, + mutable_cf_options + .level0_file_num_compaction_trigger /* min_files_to_compact */, + mutable_cf_options.write_buffer_size, &comp_inputs)) { + Compaction* c = new Compaction( + vstorage, ioptions_, mutable_cf_options, {comp_inputs}, 0, + 16 * 1024 * 1024 /* output file size limit */, + 0 /* max compaction bytes, not applicable */, + 0 /* output path ID */, mutable_cf_options.compression, {}, + /* is manual */ false, vstorage->CompactionScore(0), + /* is deletion compaction */ false, + CompactionReason::kFIFOReduceNumFiles); + return c; + } + } + + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: nothing to do. Total size %" PRIu64 + ", max size %" PRIu64 "\n", + cf_name.c_str(), total_size, + ioptions_.compaction_options_fifo.max_table_files_size); + return nullptr; + } + + if (!level0_compactions_in_progress_.empty()) { + ROCKS_LOG_BUFFER( + log_buffer, + "[%s] FIFO compaction: Already executing compaction. No need " + "to run parallel compactions since compactions are very fast", + cf_name.c_str()); + return nullptr; + } + + std::vector inputs; + inputs.emplace_back(); + inputs[0].level = 0; + + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + auto f = *ritr; + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); + char tmp_fsize[16]; + AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with size %s for deletion", + cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); + if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) { + break; + } + } + + Compaction* c = new Compaction( + vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, + kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), + /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); + return c; +} + +Compaction* FIFOCompactionPicker::PickCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + assert(vstorage->num_levels() == 1); + + Compaction* c = nullptr; + if (ioptions_.compaction_options_fifo.ttl > 0) { + c = PickTTLCompaction(cf_name, mutable_cf_options, vstorage, log_buffer); + } + if (c == nullptr) { + c = PickSizeCompaction(cf_name, mutable_cf_options, vstorage, log_buffer); + } + RegisterCompaction(c); + return c; +} + +Compaction* FIFOCompactionPicker::CompactRange( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end, bool* manual_conflict) { + assert(input_level == 0); + assert(output_level == 0); + *compaction_end = nullptr; + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log); + Compaction* c = + PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer); + log_buffer.FlushBufferToLog(); + return c; +} + +#endif // !ROCKSDB_LITE + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction_picker.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/compaction_picker.h b/thirdparty/rocksdb/db/compaction_picker.h new file mode 100644 index 0000000..f44139c --- /dev/null +++ b/thirdparty/rocksdb/db/compaction_picker.h @@ -0,0 +1,298 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include +#include +#include +#include +#include + +#include "db/compaction.h" +#include "db/version_set.h" +#include "options/cf_options.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +class LogBuffer; +class Compaction; +class VersionStorageInfo; +struct CompactionInputFiles; + +class CompactionPicker { + public: + CompactionPicker(const ImmutableCFOptions& ioptions, + const InternalKeyComparator* icmp); + virtual ~CompactionPicker(); + + // Pick level and inputs for a new compaction. + // Returns nullptr if there is no compaction to be done. + // Otherwise returns a pointer to a heap-allocated object that + // describes the compaction. Caller should delete the result. + virtual Compaction* PickCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, + LogBuffer* log_buffer) = 0; + + // Return a compaction object for compacting the range [begin,end] in + // the specified level. Returns nullptr if there is nothing in that + // level that overlaps the specified range. Caller should delete + // the result. + // + // The returned Compaction might not include the whole requested range. + // In that case, compaction_end will be set to the next key that needs + // compacting. In case the compaction will compact the whole range, + // compaction_end will be set to nullptr. + // Client is responsible for compaction_end storage -- when called, + // *compaction_end should point to valid InternalKey! + virtual Compaction* CompactRange( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end, bool* manual_conflict); + + // The maximum allowed output level. Default value is NumberLevels() - 1. + virtual int MaxOutputLevel() const { return NumberLevels() - 1; } + + virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const = 0; + +// Sanitize the input set of compaction input files. +// When the input parameters do not describe a valid compaction, the +// function will try to fix the input_files by adding necessary +// files. If it's not possible to conver an invalid input_files +// into a valid one by adding more files, the function will return a +// non-ok status with specific reason. +#ifndef ROCKSDB_LITE + Status SanitizeCompactionInputFiles(std::unordered_set* input_files, + const ColumnFamilyMetaData& cf_meta, + const int output_level) const; +#endif // ROCKSDB_LITE + + // Free up the files that participated in a compaction + // + // Requirement: DB mutex held + void ReleaseCompactionFiles(Compaction* c, Status status); + + // Returns true if any one of the specified files are being compacted + bool AreFilesInCompaction(const std::vector& files); + + // Takes a list of CompactionInputFiles and returns a (manual) Compaction + // object. + Compaction* CompactFiles(const CompactionOptions& compact_options, + const std::vector& input_files, + int output_level, VersionStorageInfo* vstorage, + const MutableCFOptions& mutable_cf_options, + uint32_t output_path_id); + + // Converts a set of compaction input file numbers into + // a list of CompactionInputFiles. + Status GetCompactionInputsFromFileNumbers( + std::vector* input_files, + std::unordered_set* input_set, + const VersionStorageInfo* vstorage, + const CompactionOptions& compact_options) const; + + // Is there currently a compaction involving level 0 taking place + bool IsLevel0CompactionInProgress() const { + return !level0_compactions_in_progress_.empty(); + } + + // Return true if the passed key range overlap with a compaction output + // that is currently running. + bool RangeOverlapWithCompaction(const Slice& smallest_user_key, + const Slice& largest_user_key, + int level) const; + + // Stores the minimal range that covers all entries in inputs in + // *smallest, *largest. + // REQUIRES: inputs is not empty + void GetRange(const CompactionInputFiles& inputs, InternalKey* smallest, + InternalKey* largest) const; + + // Stores the minimal range that covers all entries in inputs1 and inputs2 + // in *smallest, *largest. + // REQUIRES: inputs is not empty + void GetRange(const CompactionInputFiles& inputs1, + const CompactionInputFiles& inputs2, InternalKey* smallest, + InternalKey* largest) const; + + // Stores the minimal range that covers all entries in inputs + // in *smallest, *largest. + // REQUIRES: inputs is not empty (at least on entry have one file) + void GetRange(const std::vector& inputs, + InternalKey* smallest, InternalKey* largest) const; + + int NumberLevels() const { return ioptions_.num_levels; } + + // Add more files to the inputs on "level" to make sure that + // no newer version of a key is compacted to "level+1" while leaving an older + // version in a "level". Otherwise, any Get() will search "level" first, + // and will likely return an old/stale value for the key, since it always + // searches in increasing order of level to find the value. This could + // also scramble the order of merge operands. This function should be + // called any time a new Compaction is created, and its inputs_[0] are + // populated. + // + // Will return false if it is impossible to apply this compaction. + bool ExpandInputsToCleanCut(const std::string& cf_name, + VersionStorageInfo* vstorage, + CompactionInputFiles* inputs); + + // Returns true if any one of the parent files are being compacted + bool IsRangeInCompaction(VersionStorageInfo* vstorage, + const InternalKey* smallest, + const InternalKey* largest, int level, int* index); + + // Returns true if the key range that `inputs` files cover overlap with the + // key range of a currently running compaction. + bool FilesRangeOverlapWithCompaction( + const std::vector& inputs, int level) const; + + bool SetupOtherInputs(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, + CompactionInputFiles* inputs, + CompactionInputFiles* output_level_inputs, + int* parent_index, int base_index); + + void GetGrandparents(VersionStorageInfo* vstorage, + const CompactionInputFiles& inputs, + const CompactionInputFiles& output_level_inputs, + std::vector* grandparents); + + // Register this compaction in the set of running compactions + void RegisterCompaction(Compaction* c); + + // Remove this compaction from the set of running compactions + void UnregisterCompaction(Compaction* c); + + std::set* level0_compactions_in_progress() { + return &level0_compactions_in_progress_; + } + std::unordered_set* compactions_in_progress() { + return &compactions_in_progress_; + } + + protected: + const ImmutableCFOptions& ioptions_; + +// A helper function to SanitizeCompactionInputFiles() that +// sanitizes "input_files" by adding necessary files. +#ifndef ROCKSDB_LITE + virtual Status SanitizeCompactionInputFilesForAllLevels( + std::unordered_set* input_files, + const ColumnFamilyMetaData& cf_meta, const int output_level) const; +#endif // ROCKSDB_LITE + + // Keeps track of all compactions that are running on Level0. + // Protected by DB mutex + std::set level0_compactions_in_progress_; + + // Keeps track of all compactions that are running. + // Protected by DB mutex + std::unordered_set compactions_in_progress_; + + const InternalKeyComparator* const icmp_; +}; + +class LevelCompactionPicker : public CompactionPicker { + public: + LevelCompactionPicker(const ImmutableCFOptions& ioptions, + const InternalKeyComparator* icmp) + : CompactionPicker(ioptions, icmp) {} + virtual Compaction* PickCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, + LogBuffer* log_buffer) override; + + virtual bool NeedsCompaction( + const VersionStorageInfo* vstorage) const override; +}; + +#ifndef ROCKSDB_LITE +class FIFOCompactionPicker : public CompactionPicker { + public: + FIFOCompactionPicker(const ImmutableCFOptions& ioptions, + const InternalKeyComparator* icmp) + : CompactionPicker(ioptions, icmp) {} + + virtual Compaction* PickCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* version, + LogBuffer* log_buffer) override; + + virtual Compaction* CompactRange( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end, bool* manual_conflict) override; + + // The maximum allowed output level. Always returns 0. + virtual int MaxOutputLevel() const override { return 0; } + + virtual bool NeedsCompaction( + const VersionStorageInfo* vstorage) const override; + + private: + Compaction* PickTTLCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* version, + LogBuffer* log_buffer); + + Compaction* PickSizeCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* version, + LogBuffer* log_buffer); +}; + +class NullCompactionPicker : public CompactionPicker { + public: + NullCompactionPicker(const ImmutableCFOptions& ioptions, + const InternalKeyComparator* icmp) + : CompactionPicker(ioptions, icmp) {} + virtual ~NullCompactionPicker() {} + + // Always return "nullptr" + Compaction* PickCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, + LogBuffer* log_buffer) override { + return nullptr; + } + + // Always return "nullptr" + Compaction* CompactRange(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, int input_level, + int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end, + bool* manual_conflict) override { + return nullptr; + } + + // Always returns false. + virtual bool NeedsCompaction( + const VersionStorageInfo* vstorage) const override { + return false; + } +}; +#endif // !ROCKSDB_LITE + +CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, + const VersionStorageInfo* vstorage, + const MutableCFOptions& mutable_cf_options, + int level, int base_level, + const bool enable_compression = true); + +} // namespace rocksdb