From: jeremydyer@apache.org
To: commits@nifi.apache.org
Date: Mon, 09 Oct 2017 16:25:07 -0000
Message-Id: <6e448b11cba64483a28b84276dcc6a07@git.apache.org>
Subject: [27/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction.cc b/thirdparty/rocksdb/db/compaction.cc
new file mode 100644
index 0000000..9ea332d
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction.cc
@@ -0,0 +1,480 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/compaction.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <vector>
+
+#include "db/column_family.h"
+#include "rocksdb/compaction_filter.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
+  uint64_t sum = 0;
+  for (size_t i = 0; i < files.size() && files[i]; i++) {
+    sum += files[i]->fd.GetFileSize();
+  }
+  return sum;
+}
+
+void Compaction::SetInputVersion(Version* _input_version) {
+  input_version_ = _input_version;
+  cfd_ = input_version_->cfd();
+
+  cfd_->Ref();
+  input_version_->Ref();
+  edit_.SetColumnFamily(cfd_->GetID());
+}
+
+void Compaction::GetBoundaryKeys(
+    VersionStorageInfo* vstorage,
+    const std::vector<CompactionInputFiles>& inputs, Slice* smallest_user_key,
+    Slice* largest_user_key) {
+  bool initialized = false;
+  const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    if (inputs[i].files.empty()) {
+      continue;
+    }
+    if (inputs[i].level == 0) {
+      // we need to consider all files on level 0
+      for (const auto* f : inputs[i].files) {
+        const Slice& start_user_key = f->smallest.user_key();
+        if (!initialized ||
+            ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
+          *smallest_user_key = start_user_key;
+        }
+        const Slice& end_user_key = f->largest.user_key();
+        if (!initialized ||
+            ucmp->Compare(end_user_key, *largest_user_key) > 0) {
+          *largest_user_key = end_user_key;
+        }
+        initialized = true;
+      }
+    } else {
+      // we only need to consider the first and last file
+      const Slice& start_user_key = inputs[i].files[0]->smallest.user_key();
+      if (!initialized ||
+          ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
+        *smallest_user_key = start_user_key;
+      }
+      const Slice& end_user_key = inputs[i].files.back()->largest.user_key();
+      if (!initialized || ucmp->Compare(end_user_key, *largest_user_key) > 0) {
+        *largest_user_key = end_user_key;
+      }
+      initialized = true;
+    }
+  }
+}
+
+// helper function to determine if compaction is creating files at the
+// bottommost level
+bool Compaction::IsBottommostLevel(
+    int output_level, VersionStorageInfo* vstorage,
+    const std::vector<CompactionInputFiles>& inputs) {
+  if (inputs[0].level == 0 &&
+      inputs[0].files.back() != vstorage->LevelFiles(0).back()) {
+    return false;
+  }
+
+  Slice smallest_key, largest_key;
+  GetBoundaryKeys(vstorage, inputs, &smallest_key, &largest_key);
+
+  // Checks whether there are files living beyond the output_level.
+  // If lower levels have files, it checks for overlap between the files
+  // of the compaction process and those files.
+  // Bottom-level optimizations can be made if there are no files in
+  // lower levels or if there is no overlap with the files in
+  // the lower levels.
+  for (int i = output_level + 1; i < vstorage->num_levels(); i++) {
+    // It is not the bottommost level if there are files in a deeper
+    // level when the output level is 0, or if files in a deeper
+    // level overlap with the files to be compacted.
+    // output_level == 0 means that we want it to be considered
+    // as the bottommost level only if the last file on the level
+    // is a part of the files to be compacted - this is verified by
+    // the first if condition in this function
+    if (vstorage->NumLevelFiles(i) > 0 &&
+        (output_level == 0 ||
+         vstorage->OverlapInLevel(i, &smallest_key, &largest_key))) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// test function to validate the functionality of IsBottommostLevel()
+// function -- determines if compaction with inputs and storage is bottommost
+bool Compaction::TEST_IsBottommostLevel(
+    int output_level, VersionStorageInfo* vstorage,
+    const std::vector<CompactionInputFiles>& inputs) {
+  return IsBottommostLevel(output_level, vstorage, inputs);
+}
+
+bool Compaction::IsFullCompaction(
+    VersionStorageInfo* vstorage,
+    const std::vector<CompactionInputFiles>& inputs) {
+  size_t num_files_in_compaction = 0;
+  size_t total_num_files = 0;
+  for (int l = 0; l < vstorage->num_levels(); l++) {
+    total_num_files += vstorage->NumLevelFiles(l);
+  }
+  for (size_t i = 0; i < inputs.size(); i++) {
+    num_files_in_compaction += inputs[i].size();
+  }
+  return num_files_in_compaction == total_num_files;
+}
+
+Compaction::Compaction(VersionStorageInfo* vstorage,
+                       const ImmutableCFOptions& _immutable_cf_options,
+                       const MutableCFOptions& _mutable_cf_options,
+                       std::vector<CompactionInputFiles> _inputs,
+                       int _output_level, uint64_t _target_file_size,
+                       uint64_t _max_compaction_bytes, uint32_t _output_path_id,
+                       CompressionType _compression,
+                       std::vector<FileMetaData*> _grandparents,
+                       bool _manual_compaction, double _score,
+                       bool _deletion_compaction,
+                       CompactionReason _compaction_reason)
+    : input_vstorage_(vstorage),
+      start_level_(_inputs[0].level),
+      output_level_(_output_level),
+      max_output_file_size_(_target_file_size),
+      max_compaction_bytes_(_max_compaction_bytes),
+      immutable_cf_options_(_immutable_cf_options),
+      mutable_cf_options_(_mutable_cf_options),
+      input_version_(nullptr),
+      number_levels_(vstorage->num_levels()),
+      cfd_(nullptr),
+      output_path_id_(_output_path_id),
+      output_compression_(_compression),
+      deletion_compaction_(_deletion_compaction),
+      inputs_(std::move(_inputs)),
+      grandparents_(std::move(_grandparents)),
+      score_(_score),
+      bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
+      is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
+      is_manual_compaction_(_manual_compaction),
+      is_trivial_move_(false),
+      compaction_reason_(_compaction_reason) {
+  MarkFilesBeingCompacted(true);
+  if (is_manual_compaction_) {
+    compaction_reason_ = CompactionReason::kManualCompaction;
+  }
+
+#ifndef NDEBUG
+  for (size_t i = 1; i < inputs_.size(); ++i) {
+    assert(inputs_[i].level > inputs_[i - 1].level);
+  }
+#endif
+
+  // setup input_levels_
+  {
+    input_levels_.resize(num_input_levels());
+    for (size_t which = 0; which < num_input_levels(); which++) {
+      DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files,
+                                &arena_);
+    }
+  }
+
+  GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_);
+}
+
+Compaction::~Compaction() {
+  if (input_version_ != nullptr) {
+    input_version_->Unref();
+  }
+  if (cfd_ != nullptr) {
+    if (cfd_->Unref()) {
+      delete cfd_;
+    }
+  }
+}
+
+bool Compaction::InputCompressionMatchesOutput() const {
+  int base_level = input_vstorage_->base_level();
+  bool matches = (GetCompressionType(immutable_cf_options_, input_vstorage_,
+                                     mutable_cf_options_, start_level_,
+                                     base_level) == output_compression_);
+  if (matches) {
+    TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
+    return true;
+  }
+  TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
+  return matches;
+}
+
+bool Compaction::IsTrivialMove() const {
+  // Avoid a move if there is lots of overlapping grandparent data.
+  // Otherwise, the move could create a parent file that will require
+  // a very expensive merge later on.
+  // If start_level_ == output_level_, the purpose is to force compaction
+  // filter to be applied to that level, and thus cannot be a trivial move.
+
+  // Check if the start level has files with overlapping ranges
+  if (start_level_ == 0 && input_vstorage_->level0_non_overlapping() == false) {
+    // We cannot move files from L0 to L1 if the files are overlapping
+    return false;
+  }
+
+  if (is_manual_compaction_ &&
+      (immutable_cf_options_.compaction_filter != nullptr ||
+       immutable_cf_options_.compaction_filter_factory != nullptr)) {
+    // This is a manual compaction and we have a compaction filter that should
+    // be executed, we cannot do a trivial move
+    return false;
+  }
+
+  // Used in universal compaction, where trivial move can be done if the
+  // input files are non-overlapping
+  if ((immutable_cf_options_.compaction_options_universal.allow_trivial_move) &&
+      (output_level_ != 0)) {
+    return is_trivial_move_;
+  }
+
+  if (!(start_level_ != output_level_ && num_input_levels() == 1 &&
+        input(0, 0)->fd.GetPathId() == output_path_id() &&
+        InputCompressionMatchesOutput())) {
+    return false;
+  }
+
+  // assert inputs_.size() == 1
+
+  for (const auto& file : inputs_.front().files) {
+    std::vector<FileMetaData*> file_grand_parents;
+    if (output_level_ + 1 >= number_levels_) {
+      continue;
+    }
+    input_vstorage_->GetOverlappingInputs(output_level_ + 1, &file->smallest,
+                                          &file->largest, &file_grand_parents);
+    const auto compaction_size =
+        file->fd.GetFileSize() + TotalFileSize(file_grand_parents);
+    if (compaction_size > max_compaction_bytes_) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+void Compaction::AddInputDeletions(VersionEdit* out_edit) {
+  for (size_t which = 0; which < num_input_levels(); which++) {
+    for (size_t i = 0; i < inputs_[which].size(); i++) {
+      out_edit->DeleteFile(level(which), inputs_[which][i]->fd.GetNumber());
+    }
+  }
+}
+
+bool Compaction::KeyNotExistsBeyondOutputLevel(
+    const Slice& user_key, std::vector<size_t>* level_ptrs) const {
+  assert(input_version_ != nullptr);
+  assert(level_ptrs != nullptr);
+  assert(level_ptrs->size() == static_cast<size_t>(number_levels_));
+  assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO);
+  if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
+    return bottommost_level_;
+  }
+  if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel &&
+      output_level_ == 0) {
+    return false;
+  }
+  // Maybe use binary search to find right entry instead of linear search?
+  const Comparator* user_cmp = cfd_->user_comparator();
+  for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
+    const std::vector<FileMetaData*>& files = input_vstorage_->LevelFiles(lvl);
+    for (; level_ptrs->at(lvl) < files.size(); level_ptrs->at(lvl)++) {
+      auto* f = files[level_ptrs->at(lvl)];
+      if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
+        // We've advanced far enough
+        if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
+          // Key falls in this file's range, so it may
+          // exist beyond output level
+          return false;
+        }
+        break;
+      }
+    }
+  }
+  return true;
+}
+
+// Mark (or clear) each file that is being compacted
+void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
+  for (size_t i = 0; i < num_input_levels(); i++) {
+    for (size_t j = 0; j < inputs_[i].size(); j++) {
+      assert(mark_as_compacted ? !inputs_[i][j]->being_compacted
+                               : inputs_[i][j]->being_compacted);
+      inputs_[i][j]->being_compacted = mark_as_compacted;
+    }
+  }
+}
+
+// Sample output:
+// If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
+// print: "3@0 + 2@3 + 1@4 files to L5"
+const char* Compaction::InputLevelSummary(
+    InputLevelSummaryBuffer* scratch) const {
+  int len = 0;
+  bool is_first = true;
+  for (auto& input_level : inputs_) {
+    if (input_level.empty()) {
+      continue;
+    }
+    if (!is_first) {
+      len +=
+          snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " + ");
+    } else {
+      is_first = false;
+    }
+    len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
+                    "%" ROCKSDB_PRIszt "@%d", input_level.size(),
+                    input_level.level);
+  }
+  snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
+           " files to L%d", output_level());
+
+  return scratch->buffer;
+}
+
+uint64_t Compaction::CalculateTotalInputSize() const {
+  uint64_t size = 0;
+  for (auto& input_level : inputs_) {
+    for (auto f : input_level.files) {
+      size += f->fd.GetFileSize();
+    }
+  }
+  return size;
+}
+
+void Compaction::ReleaseCompactionFiles(Status status) {
+  MarkFilesBeingCompacted(false);
+  cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
+}
+
+void Compaction::ResetNextCompactionIndex() {
+  assert(input_version_ != nullptr);
+  input_vstorage_->ResetNextCompactionIndex(start_level_);
+}
+
+namespace {
+int InputSummary(const std::vector<FileMetaData*>& files, char* output,
+                 int len) {
+  *output = '\0';
+  int write = 0;
+  for (size_t i = 0; i < files.size(); i++) {
+    int sz = len - write;
+    int ret;
+    char sztxt[16];
+    AppendHumanBytes(files.at(i)->fd.GetFileSize(), sztxt, 16);
+    ret = snprintf(output + write, sz, "%" PRIu64 "(%s) ",
+                   files.at(i)->fd.GetNumber(), sztxt);
+    if (ret < 0 || ret >= sz) break;
+    write += ret;
+  }
+  // if files.size() is non-zero, overwrite the last space
+  return write - !!files.size();
+}
+}  // namespace
+
+void Compaction::Summary(char* output, int len) {
+  int write =
+      snprintf(output, len, "Base version %" PRIu64 " Base level %d, inputs: [",
+               input_version_->GetVersionNumber(), start_level_);
+  if (write < 0 || write >= len) {
+    return;
+  }
+
+  for (size_t level_iter = 0; level_iter < num_input_levels(); ++level_iter) {
+    if (level_iter > 0) {
+      write += snprintf(output + write, len - write, "], [");
+      if (write < 0 || write >= len) {
+        return;
+      }
+    }
+    write +=
+        InputSummary(inputs_[level_iter].files, output + write, len - write);
+    if (write < 0 || write >= len) {
+      return;
+    }
+  }
+
+  snprintf(output + write, len - write, "]");
+}
+
+uint64_t Compaction::OutputFilePreallocationSize() const {
+  uint64_t preallocation_size = 0;
+
+  if (max_output_file_size_ != port::kMaxUint64 &&
+      (cfd_->ioptions()->compaction_style == kCompactionStyleLevel ||
+       output_level() > 0)) {
+    preallocation_size = max_output_file_size_;
+  } else {
+    for (const auto& level_files : inputs_) {
+      for (const auto& file : level_files.files) {
+        preallocation_size += file->fd.GetFileSize();
+      }
+    }
+  }
+  // Over-estimate slightly so we don't end up just barely crossing
+  // the threshold
+  return preallocation_size + (preallocation_size / 10);
+}
+
+std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
+  if (!cfd_->ioptions()->compaction_filter_factory) {
+    return nullptr;
+  }
+
+  CompactionFilter::Context context;
+  context.is_full_compaction = is_full_compaction_;
+  context.is_manual_compaction = is_manual_compaction_;
+  context.column_family_id = cfd_->GetID();
+  return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
+      context);
+}
+
+bool Compaction::IsOutputLevelEmpty() const {
+  return inputs_.back().level != output_level_ || inputs_.back().empty();
+}
+
+bool Compaction::ShouldFormSubcompactions() const {
+  if (immutable_cf_options_.max_subcompactions <= 1 || cfd_ == nullptr) {
+    return false;
+  }
+  if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
+    return start_level_ == 0 && output_level_ > 0 && !IsOutputLevelEmpty();
+  } else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
+    return number_levels_ > 1 && output_level_ > 0;
+  } else {
+    return false;
+  }
+}
+
+uint64_t Compaction::MaxInputFileCreationTime() const {
+  uint64_t max_creation_time = 0;
+  for (const auto& file : inputs_[0].files) {
+    if (file->fd.table_reader != nullptr &&
+        file->fd.table_reader->GetTableProperties() != nullptr) {
+      uint64_t creation_time =
+          file->fd.table_reader->GetTableProperties()->creation_time;
+      max_creation_time = std::max(max_creation_time, creation_time);
+    }
+  }
+  return max_creation_time;
+}
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction.h b/thirdparty/rocksdb/db/compaction.h
new file mode 100644
index 0000000..7be6df2
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction.h
@@ -0,0 +1,325 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include "db/version_set.h"
+#include "options/cf_options.h"
+#include "util/arena.h"
+#include "util/autovector.h"
+
+namespace rocksdb {
+
+// The structure that manages compaction input files associated
+// with the same physical level.
+struct CompactionInputFiles {
+  int level;
+  std::vector<FileMetaData*> files;
+  inline bool empty() const { return files.empty(); }
+  inline size_t size() const { return files.size(); }
+  inline void clear() { files.clear(); }
+  inline FileMetaData* operator[](size_t i) const { return files[i]; }
+};
+
+class Version;
+class ColumnFamilyData;
+class VersionStorageInfo;
+class CompactionFilter;
+
+// A Compaction encapsulates information about a compaction.
+class Compaction {
+ public:
+  Compaction(VersionStorageInfo* input_version,
+             const ImmutableCFOptions& immutable_cf_options,
+             const MutableCFOptions& mutable_cf_options,
+             std::vector<CompactionInputFiles> inputs, int output_level,
+             uint64_t target_file_size, uint64_t max_compaction_bytes,
+             uint32_t output_path_id, CompressionType compression,
+             std::vector<FileMetaData*> grandparents,
+             bool manual_compaction = false, double score = -1,
+             bool deletion_compaction = false,
+             CompactionReason compaction_reason = CompactionReason::kUnknown);
+
+  // No copying allowed
+  Compaction(const Compaction&) = delete;
+  void operator=(const Compaction&) = delete;
+
+  ~Compaction();
+
+  // Returns the level associated with the specified compaction input level.
+  // If compaction_input_level is not specified, then input_level is set to 0.
+  int level(size_t compaction_input_level = 0) const {
+    return inputs_[compaction_input_level].level;
+  }
+
+  int start_level() const { return start_level_; }
+
+  // Outputs will go to this level
+  int output_level() const { return output_level_; }
+
+  // Returns the number of input levels in this compaction.
+  size_t num_input_levels() const { return inputs_.size(); }
+
+  // Return the object that holds the edits to the descriptor done
+  // by this compaction.
+  VersionEdit* edit() { return &edit_; }
+
+  // Returns the number of input files associated with the specified
+  // compaction input level.
+  // The function returns 0 when "compaction_input_level" < 0
+  // or "compaction_input_level" >= "num_input_levels()".
+  size_t num_input_files(size_t compaction_input_level) const {
+    if (compaction_input_level < inputs_.size()) {
+      return inputs_[compaction_input_level].size();
+    }
+    return 0;
+  }
+
+  // Returns input version of the compaction
+  Version* input_version() const { return input_version_; }
+
+  // Returns the ColumnFamilyData associated with the compaction.
+  ColumnFamilyData* column_family_data() const { return cfd_; }
+
+  // Returns the file meta data of the 'i'th input file at the
+  // specified compaction input level.
+  // REQUIREMENT: "compaction_input_level" must be >= 0 and
+  //              < "num_input_levels()"
+  FileMetaData* input(size_t compaction_input_level, size_t i) const {
+    assert(compaction_input_level < inputs_.size());
+    return inputs_[compaction_input_level][i];
+  }
+
+  // Returns the list of file meta data of the specified compaction
+  // input level.
+  // REQUIREMENT: "compaction_input_level" must be >= 0 and
+  //              < "num_input_levels()"
+  const std::vector<FileMetaData*>* inputs(
+      size_t compaction_input_level) const {
+    assert(compaction_input_level < inputs_.size());
+    return &inputs_[compaction_input_level].files;
+  }
+
+  const std::vector<CompactionInputFiles>* inputs() { return &inputs_; }
+
+  // Returns the LevelFilesBrief of the specified compaction input level.
+  const LevelFilesBrief* input_levels(size_t compaction_input_level) const {
+    return &input_levels_[compaction_input_level];
+  }
+
+  // Maximum size of files to build during this compaction.
+  uint64_t max_output_file_size() const { return max_output_file_size_; }
+
+  // What compression is used for the output
+  CompressionType output_compression() const { return output_compression_; }
+
+  // Whether the output files need to be written to the second DB path.
+  uint32_t output_path_id() const { return output_path_id_; }
+
+  // Is this a trivial compaction that can be implemented by just
+  // moving a single input file to the next level (no merging or splitting)
+  bool IsTrivialMove() const;
+
+  // If true, then the compaction can be done by simply deleting input files.
+  bool deletion_compaction() const { return deletion_compaction_; }
+
+  // Add all inputs to this compaction as delete operations to *edit.
+  void AddInputDeletions(VersionEdit* edit);
+
+  // Returns true if the available information we have guarantees that
+  // the input "user_key" does not exist in any level beyond "output_level()".
+  bool KeyNotExistsBeyondOutputLevel(const Slice& user_key,
+                                     std::vector<size_t>* level_ptrs) const;
+
+  // Clear all files to indicate that they are not being compacted
+  // Delete this compaction from the list of running compactions.
+  //
+  // Requirement: DB mutex held
+  void ReleaseCompactionFiles(Status status);
+
+  // Returns the summary of the compaction in "output" with maximum "len"
+  // in bytes. The caller is responsible for the memory management of
+  // "output".
+  void Summary(char* output, int len);
+
+  // Return the score that was used to pick this compaction run.
+  double score() const { return score_; }
+
+  // Is this compaction creating a file in the bottommost level?
+  bool bottommost_level() const { return bottommost_level_; }
+
+  // Does this compaction include all sst files?
+  bool is_full_compaction() const { return is_full_compaction_; }
+
+  // Was this compaction triggered manually by the client?
+  bool is_manual_compaction() const { return is_manual_compaction_; }
+
+  // Used when allow_trivial_move option is set in
+  // Universal compaction. If all the input files are
+  // non-overlapping, then is_trivial_move_ variable
+  // will be set true, else false
+  void set_is_trivial_move(bool trivial_move) {
+    is_trivial_move_ = trivial_move;
+  }
+
+  // Used when allow_trivial_move option is set in
+  // Universal compaction. Returns true, if the input files
+  // are non-overlapping and can be trivially moved.
+  bool is_trivial_move() const { return is_trivial_move_; }
+
+  // How many total levels are there?
+  int number_levels() const { return number_levels_; }
+
+  // Return the ImmutableCFOptions that should be used throughout the
+  // compaction procedure
+  const ImmutableCFOptions* immutable_cf_options() const {
+    return &immutable_cf_options_;
+  }
+
+  // Return the MutableCFOptions that should be used throughout the
+  // compaction procedure
+  const MutableCFOptions* mutable_cf_options() const {
+    return &mutable_cf_options_;
+  }
+
+  // Returns the size in bytes that the output file should be preallocated to.
+  // In level compaction, that is max_file_size_. In universal compaction, that
+  // is the sum of all input file sizes.
+  uint64_t OutputFilePreallocationSize() const;
+
+  void SetInputVersion(Version* input_version);
+
+  struct InputLevelSummaryBuffer {
+    char buffer[128];
+  };
+
+  const char* InputLevelSummary(InputLevelSummaryBuffer* scratch) const;
+
+  uint64_t CalculateTotalInputSize() const;
+
+  // In case of compaction error, reset the nextIndex that is used
+  // to pick up the next file to be compacted from files_by_size_
+  void ResetNextCompactionIndex();
+
+  // Create a CompactionFilter from compaction_filter_factory
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter() const;
+
+  // Is the input level corresponding to output_level_ empty?
+  bool IsOutputLevelEmpty() const;
+
+  // Should this compaction be broken up into smaller ones run in parallel?
+  bool ShouldFormSubcompactions() const;
+
+  // test function to validate the functionality of IsBottommostLevel()
+  // function -- determines if compaction with inputs and storage is bottommost
+  static bool TEST_IsBottommostLevel(
+      int output_level, VersionStorageInfo* vstorage,
+      const std::vector<CompactionInputFiles>& inputs);
+
+  TablePropertiesCollection GetOutputTableProperties() const {
+    return output_table_properties_;
+  }
+
+  void SetOutputTableProperties(TablePropertiesCollection tp) {
+    output_table_properties_ = std::move(tp);
+  }
+
+  Slice GetSmallestUserKey() const { return smallest_user_key_; }
+
+  Slice GetLargestUserKey() const { return largest_user_key_; }
+
+  CompactionReason compaction_reason() { return compaction_reason_; }
+
+  const std::vector<FileMetaData*>& grandparents() const {
+    return grandparents_;
+  }
+
+  uint64_t max_compaction_bytes() const { return max_compaction_bytes_; }
+
+  uint64_t MaxInputFileCreationTime() const;
+
+ private:
+  // mark (or clear) all files that are being compacted
+  void MarkFilesBeingCompacted(bool mark_as_compacted);
+
+  // get the smallest and largest key present in files to be compacted
+  static void GetBoundaryKeys(VersionStorageInfo* vstorage,
+                              const std::vector<CompactionInputFiles>& inputs,
+                              Slice* smallest_key, Slice* largest_key);
+
+  // helper function to determine if compaction with inputs and storage is
+  // bottommost
+  static bool IsBottommostLevel(
+      int output_level, VersionStorageInfo* vstorage,
+      const std::vector<CompactionInputFiles>& inputs);
+
+  static bool IsFullCompaction(VersionStorageInfo* vstorage,
+                               const std::vector<CompactionInputFiles>& inputs);
+
+  VersionStorageInfo* input_vstorage_;
+
+  const int start_level_;   // the lowest level to be compacted
+  const int output_level_;  // level to which output files are stored
+  uint64_t max_output_file_size_;
+  uint64_t max_compaction_bytes_;
+  const ImmutableCFOptions immutable_cf_options_;
+  const MutableCFOptions mutable_cf_options_;
+  Version* input_version_;
+  VersionEdit edit_;
+  const int number_levels_;
+  ColumnFamilyData* cfd_;
+  Arena arena_;  // Arena used to allocate space for file_levels_
+
+  const uint32_t output_path_id_;
+  CompressionType output_compression_;
+  // If true, then the compaction can be done by simply deleting input files.
+  const bool deletion_compaction_;
+
+  // Compaction input files organized by level. Constant after construction
+  const std::vector<CompactionInputFiles> inputs_;
+
+  // A copy of inputs_, organized more closely in memory
+  autovector<LevelFilesBrief, 2> input_levels_;
+
+  // State used to check for number of overlapping grandparent files
+  // (grandparent == "output_level_ + 1")
+  std::vector<FileMetaData*> grandparents_;
+  const double score_;  // score that was used to pick this compaction.
+
+  // Is this compaction creating a file in the bottommost level?
+  const bool bottommost_level_;
+  // Does this compaction include all sst files?
+  const bool is_full_compaction_;
+
+  // Is this compaction requested by the client?
+  const bool is_manual_compaction_;
+
+  // True if we can do trivial move in Universal multi level
+  // compaction
+  bool is_trivial_move_;
+
+  // Does input compression match the output compression?
+  bool InputCompressionMatchesOutput() const;
+
+  // table properties of output files
+  TablePropertiesCollection output_table_properties_;
+
+  // smallest user keys in compaction
+  Slice smallest_user_key_;
+
+  // largest user keys in compaction
+  Slice largest_user_key_;
+
+  // Reason for compaction
+  CompactionReason compaction_reason_;
+};
+
+// Utility function
+extern uint64_t TotalFileSize(const std::vector<FileMetaData*>& files);
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction_iteration_stats.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction_iteration_stats.h b/thirdparty/rocksdb/db/compaction_iteration_stats.h
new file mode 100644
index 0000000..52a666e
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction_iteration_stats.h
@@ -0,0 +1,33 @@
+// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+struct CompactionIterationStats {
+  // Compaction statistics
+
+  // Doesn't include records skipped because of
+  // CompactionFilter::Decision::kRemoveAndSkipUntil.
+  int64_t num_record_drop_user = 0;
+
+  int64_t num_record_drop_hidden = 0;
+  int64_t num_record_drop_obsolete = 0;
+  int64_t num_record_drop_range_del = 0;
+  int64_t num_range_del_drop_obsolete = 0;
+  uint64_t total_filter_time = 0;
+
+  // Input statistics
+  // TODO(noetzli): The stats are incomplete. They are lacking everything
+  // consumed by MergeHelper.
+  uint64_t num_input_records = 0;
+  uint64_t num_input_deletion_records = 0;
+  uint64_t num_input_corrupt_records = 0;
+  uint64_t total_input_raw_key_bytes = 0;
+  uint64_t total_input_raw_value_bytes = 0;
+
+  // Single-Delete diagnostics for exceptional situations
+  uint64_t num_single_del_fallthru = 0;
+  uint64_t num_single_del_mismatch = 0;
+};

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction_iterator.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction_iterator.cc b/thirdparty/rocksdb/db/compaction_iterator.cc
new file mode 100644
index 0000000..08ae197
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction_iterator.cc
@@ -0,0 +1,576 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/compaction_iterator.h"
+#include "rocksdb/listener.h"
+#include "table/internal_iterator.h"
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+CompactionEventListener::CompactionListenerValueType fromInternalValueType(
+    ValueType vt) {
+  switch (vt) {
+    case kTypeDeletion:
+      return CompactionEventListener::CompactionListenerValueType::kDelete;
+    case kTypeValue:
+      return CompactionEventListener::CompactionListenerValueType::kValue;
+    case kTypeMerge:
+      return CompactionEventListener::CompactionListenerValueType::
+          kMergeOperand;
+    case kTypeSingleDeletion:
+      return CompactionEventListener::CompactionListenerValueType::
+          kSingleDelete;
+    case kTypeRangeDeletion:
+      return CompactionEventListener::CompactionListenerValueType::kRangeDelete;
+    default:
+      assert(false);
+      return CompactionEventListener::CompactionListenerValueType::kInvalid;
+  }
+}
+#endif  // ROCKSDB_LITE
+
+CompactionIterator::CompactionIterator(
+    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
+    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
+    SequenceNumber earliest_write_conflict_snapshot, Env* env,
+    bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
+    const Compaction* compaction, const CompactionFilter* compaction_filter,
+    CompactionEventListener* compaction_listener,
+    const std::atomic<bool>* shutting_down)
+    : CompactionIterator(
+          input, cmp, merge_helper, last_sequence, snapshots,
+          earliest_write_conflict_snapshot, env, expect_valid_internal_key,
+          range_del_agg,
+          std::unique_ptr<CompactionProxy>(
+              compaction ? new CompactionProxy(compaction) : nullptr),
+          compaction_filter, compaction_listener, shutting_down) {}
+
+CompactionIterator::CompactionIterator(
+    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
+    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
+    SequenceNumber earliest_write_conflict_snapshot, Env* env,
+    bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
+    std::unique_ptr<CompactionProxy> compaction,
+    const CompactionFilter* compaction_filter,
+    CompactionEventListener* compaction_listener,
+    const std::atomic<bool>* shutting_down)
+    : input_(input),
+      cmp_(cmp),
+      merge_helper_(merge_helper),
+      snapshots_(snapshots),
+      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
+      env_(env),
+      expect_valid_internal_key_(expect_valid_internal_key),
+      range_del_agg_(range_del_agg),
+      compaction_(std::move(compaction)),
+      compaction_filter_(compaction_filter),
+#ifndef ROCKSDB_LITE
+      compaction_listener_(compaction_listener),
+#endif  // ROCKSDB_LITE
+      shutting_down_(shutting_down),
+      ignore_snapshots_(false),
+      merge_out_iter_(merge_helper_) {
+  assert(compaction_filter_ == nullptr || compaction_ != nullptr);
+  bottommost_level_ =
+      compaction_ == nullptr ? false : compaction_->bottommost_level();
+  if (compaction_ != nullptr) {
+    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
+  }
+
+  if (snapshots_->size() == 0) {
+    // optimize for fast path if there are no snapshots
+    visible_at_tip_ = true;
+    earliest_snapshot_ = kMaxSequenceNumber;
+    latest_snapshot_ = 0;
+  } else {
+    visible_at_tip_ = false;
+    earliest_snapshot_ = snapshots_->at(0);
+    latest_snapshot_ = snapshots_->back();
+  }
+  if (compaction_filter_ != nullptr) {
+    if (compaction_filter_->IgnoreSnapshots()) {
+      ignore_snapshots_ = true;
+    }
+  } else {
+    ignore_snapshots_ = false;
+  }
+  input_->SetPinnedItersMgr(&pinned_iters_mgr_);
+}
+
+CompactionIterator::~CompactionIterator() {
+  // input_ iterator lifetime is longer than pinned_iters_mgr_ lifetime
+  input_->SetPinnedItersMgr(nullptr);
+}
+
+void CompactionIterator::ResetRecordCounts() {
+  iter_stats_.num_record_drop_user = 0;
+  iter_stats_.num_record_drop_hidden = 0;
+  iter_stats_.num_record_drop_obsolete = 0;
+  iter_stats_.num_record_drop_range_del = 0;
+  iter_stats_.num_range_del_drop_obsolete = 0;
+}
+
+void CompactionIterator::SeekToFirst() {
+  NextFromInput();
+  PrepareOutput();
+}
+
+void CompactionIterator::Next() {
+  // If there is a merge output, return it before continuing to process the
+  // input.
+  if (merge_out_iter_.Valid()) {
+    merge_out_iter_.Next();
+
+    // Check if we returned all records of the merge output.
+    if (merge_out_iter_.Valid()) {
+      key_ = merge_out_iter_.key();
+      value_ = merge_out_iter_.value();
+      bool valid_key __attribute__((__unused__)) =
+          ParseInternalKey(key_, &ikey_);
+      // MergeUntil stops when it encounters a corrupt key and does not
+      // include them in the result, so we expect the keys here to be valid.
+      assert(valid_key);
+      // Keep current_key_ in sync.
+      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+      key_ = current_key_.GetInternalKey();
+      ikey_.user_key = current_key_.GetUserKey();
+      valid_ = true;
+    } else {
+      // We consumed all pinned merge operands, release pinned iterators
+      pinned_iters_mgr_.ReleasePinnedData();
+      // MergeHelper moves the iterator to the first record after the merged
+      // records, so even though we reached the end of the merge output, we do
+      // not want to advance the iterator.
+      NextFromInput();
+    }
+  } else {
+    // Only advance the input iterator if there is no merge output and the
+    // iterator is not already at the next record.
+    if (!at_next_) {
+      input_->Next();
+    }
+    NextFromInput();
+  }
+
+  if (valid_) {
+    // Record that we've outputted a record for the current key.
+    has_outputted_key_ = true;
+  }
+
+  PrepareOutput();
+}
+
+void CompactionIterator::NextFromInput() {
+  at_next_ = false;
+  valid_ = false;
+
+  while (!valid_ && input_->Valid() && !IsShuttingDown()) {
+    key_ = input_->key();
+    value_ = input_->value();
+    iter_stats_.num_input_records++;
+
+    if (!ParseInternalKey(key_, &ikey_)) {
+      // If `expect_valid_internal_key_` is false, return the corrupted key
+      // and let the caller decide what to do with it.
+      // TODO(noetzli): We should have a more elegant solution for this.
+      if (expect_valid_internal_key_) {
+        assert(!"Corrupted internal key not expected.");
+        status_ = Status::Corruption("Corrupted internal key not expected.");
+        break;
+      }
+      key_ = current_key_.SetInternalKey(key_);
+      has_current_user_key_ = false;
+      current_user_key_sequence_ = kMaxSequenceNumber;
+      current_user_key_snapshot_ = 0;
+      iter_stats_.num_input_corrupt_records++;
+      valid_ = true;
+      break;
+    }
+
+    // Update input statistics
+    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
+      iter_stats_.num_input_deletion_records++;
+    }
+    iter_stats_.total_input_raw_key_bytes += key_.size();
+    iter_stats_.total_input_raw_value_bytes += value_.size();
+
+    // If need_skip is true, we should seek the input iterator
+    // to internal key skip_until and continue from there.
+    bool need_skip = false;
+    // Points either into compaction_filter_skip_until_ or into
+    // merge_helper_->compaction_filter_skip_until_.
+    Slice skip_until;
+
+    // Check whether the user key changed. After this if statement current_key_
+    // is a copy of the current input key (maybe converted to a delete by the
+    // compaction filter). ikey_.user_key is pointing to the copy.
+    if (!has_current_user_key_ ||
+        !cmp_->Equal(ikey_.user_key, current_user_key_)) {
+      // First occurrence of this user key
+      // Copy key for output
+      key_ = current_key_.SetInternalKey(key_, &ikey_);
+      current_user_key_ = ikey_.user_key;
+      has_current_user_key_ = true;
+      has_outputted_key_ = false;
+      current_user_key_sequence_ = kMaxSequenceNumber;
+      current_user_key_snapshot_ = 0;
+
+#ifndef ROCKSDB_LITE
+      if (compaction_listener_) {
+        compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key,
+                                           fromInternalValueType(ikey_.type),
+                                           value_, ikey_.sequence, true);
+      }
+#endif  // ROCKSDB_LITE
+
+      // apply the compaction filter to the first occurrence of the user key
+      if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
+          (visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
+           ignore_snapshots_)) {
+        // If the user has specified a compaction filter and the sequence
+        // number is greater than any external snapshot, then invoke the
+        // filter. If the filter requests removal, replace the entry with a
+        // deletion marker.
+        CompactionFilter::Decision filter;
+        compaction_filter_value_.clear();
+        compaction_filter_skip_until_.Clear();
+        {
+          StopWatchNano timer(env_, true);
+          filter = compaction_filter_->FilterV2(
+              compaction_->level(), ikey_.user_key,
+              CompactionFilter::ValueType::kValue, value_,
+              &compaction_filter_value_, compaction_filter_skip_until_.rep());
+          iter_stats_.total_filter_time +=
+              env_ != nullptr ? timer.ElapsedNanos() : 0;
+        }
+
+        if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
+            cmp_->Compare(*compaction_filter_skip_until_.rep(),
+                          ikey_.user_key) <= 0) {
+          // Can't skip to a key smaller than the current one.
+          // Keep the key as per FilterV2 documentation.
+          filter = CompactionFilter::Decision::kKeep;
+        }
+
+        if (filter == CompactionFilter::Decision::kRemove) {
+          // convert the current key to a delete; key_ is pointing into
+          // current_key_ at this point, so updating current_key_ updates key()
+          ikey_.type = kTypeDeletion;
+          current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
+          // no value associated with delete
+          value_.clear();
+          iter_stats_.num_record_drop_user++;
+        } else if (filter == CompactionFilter::Decision::kChangeValue) {
+          value_ = compaction_filter_value_;
+        } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
+          need_skip = true;
+          compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
+                                                           kValueTypeForSeek);
+          skip_until = compaction_filter_skip_until_.Encode();
+        }
+      }
+    } else {
+#ifndef ROCKSDB_LITE
+      if (compaction_listener_) {
+        compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key,
+                                           fromInternalValueType(ikey_.type),
+                                           value_, ikey_.sequence, false);
+      }
+#endif  // ROCKSDB_LITE
+
+      // Update the current key to reflect the new sequence number/type without
+      // copying the user key.
+      // TODO(rven): Compaction filter does not process keys in this path
+      // Need to have the compaction filter process multiple versions
+      // if we have versions on both sides of a snapshot
+      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+      key_ = current_key_.GetInternalKey();
+      ikey_.user_key = current_key_.GetUserKey();
+    }
+
+    // If there are no snapshots, then this kv affects visibility at tip.
+    // Otherwise, search through all existing snapshots to find the earliest
+    // snapshot that is affected by this kv.
+    SequenceNumber last_sequence __attribute__((__unused__)) =
+        current_user_key_sequence_;
+    current_user_key_sequence_ = ikey_.sequence;
+    SequenceNumber last_snapshot = current_user_key_snapshot_;
+    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
+    current_user_key_snapshot_ =
+        visible_at_tip_
+            ? earliest_snapshot_
+            : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);
+
+    if (need_skip) {
+      // This case is handled below.
+    } else if (clear_and_output_next_key_) {
+      // In the previous iteration we encountered a single delete that we could
+      // not compact out. We will keep this Put, but can drop its data.
+      // (See Optimization 3, below.)
+      assert(ikey_.type == kTypeValue);
+      assert(current_user_key_snapshot_ == last_snapshot);
+
+      value_.clear();
+      valid_ = true;
+      clear_and_output_next_key_ = false;
+    } else if (ikey_.type == kTypeSingleDeletion) {
+      // We can compact out a SingleDelete if:
+      // 1) We encounter the corresponding PUT -OR- we know that this key
+      //    doesn't appear past this output level
+      //    =AND=
+      // 2) We've already returned a record in this snapshot -OR-
+      //    no write-conflict snapshot is earlier than this key's sequence
+      //    number.
+      //
+      // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
+      // allow Transactions to do write-conflict checking (if we compacted away
+      // all keys, then we wouldn't know that a write happened in this
+      // snapshot). If there is no earlier snapshot, then we know that there
+      // are no active transactions that need to know about any writes.
+      //
+      // Optimization 3:
+      // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
+      // true, then we must output a SingleDelete. In this case, we will decide
+      // to also output the PUT. While we are compacting less by outputting the
+      // PUT now, hopefully this will lead to better compaction in the future
+      // when Rule 2 is later true (i.e., we are hoping we can later compact
+      // out both the SingleDelete and the Put, while we couldn't if we only
+      // outputted the SingleDelete now).
+      // In this case, we can save space by removing the PUT's value as it will
+      // never be read.
+      //
+      // Deletes and Merges are not supported on the same key that has a
+      // SingleDelete as it is not possible to correctly do any partial
+      // compaction of such a combination of operations. The result of mixing
+      // those operations for a given key is documented as being undefined. So
+      // we can choose how to handle such combinations of operations. We will
+      // try to compact out as much as we can in these cases.
+      // We will report counts on these anomalous cases.
+
+      // The easiest way to process a SingleDelete during iteration is to peek
+      // ahead at the next key.
+      ParsedInternalKey next_ikey;
+      input_->Next();
+
+      // Check whether the next key exists, is not corrupt, and is the same key
+      // as the single delete.
+      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
+          cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
+        // Check whether the next key belongs to the same snapshot as the
+        // SingleDelete.
+        if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot) {
+          if (next_ikey.type == kTypeSingleDeletion) {
+            // We encountered two SingleDeletes in a row. This could be due to
+            // unexpected user input.
+            // Skip the first SingleDelete and let the next iteration decide
+            // how to handle the second SingleDelete
+
+            // First SingleDelete has been skipped since we already called
+            // input_->Next().
+            ++iter_stats_.num_record_drop_obsolete;
+            ++iter_stats_.num_single_del_mismatch;
+          } else if ((ikey_.sequence <= earliest_write_conflict_snapshot_) ||
+                     has_outputted_key_) {
+            // Found a matching value, we can drop the single delete and the
+            // value. It is safe to drop both records since we've already
+            // outputted a key in this snapshot, or there is no earlier
+            // snapshot (Rule 2 above).
+
+            // Note: it doesn't matter whether the second key is a Put or if it
+            // is an unexpected Merge or Delete. We will compact it out
+            // either way. We will maintain counts of how many mismatches
+            // happened
+            if (next_ikey.type != kTypeValue) {
+              ++iter_stats_.num_single_del_mismatch;
+            }
+
+            ++iter_stats_.num_record_drop_hidden;
+            ++iter_stats_.num_record_drop_obsolete;
+            // Already called input_->Next() once. Call it a second time to
+            // skip past the second key.
+            input_->Next();
+          } else {
+            // Found a matching value, but we cannot drop both keys since
+            // there is an earlier snapshot and we need to leave behind a
+            // record to know that a write happened in this snapshot (Rule 2
+            // above). Clear the value and output the SingleDelete. (The value
+            // will be outputted on the next iteration.)
+
+            // Setting valid_ to true will output the current SingleDelete
+            valid_ = true;
+
+            // Set up the Put to be outputted in the next iteration.
+            // (Optimization 3).
+            clear_and_output_next_key_ = true;
+          }
+        } else {
+          // We hit the next snapshot without hitting a put, so the iterator
+          // returns the single delete.
+          valid_ = true;
+        }
+      } else {
+        // We are at the end of the input, could not parse the next key, or hit
+        // a different key. The iterator returns the single delete if the key
+        // possibly exists beyond the current output level. We set
+        // has_current_user_key to false so that if the iterator is at the next
+        // key, we do not compare it again against the previous key at the next
+        // iteration. If the next key is corrupt, we return before the
+        // comparison, so the value of has_current_user_key does not matter.
+        has_current_user_key_ = false;
+        if (compaction_ != nullptr && ikey_.sequence <= earliest_snapshot_ &&
+            compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
+                                                       &level_ptrs_)) {
+          // Key doesn't exist outside of this range.
+          // Can compact out this SingleDelete.
+          ++iter_stats_.num_record_drop_obsolete;
+          ++iter_stats_.num_single_del_fallthru;
+        } else {
+          // Output SingleDelete
+          valid_ = true;
+        }
+      }
+
+      if (valid_) {
+        at_next_ = true;
+      }
+    } else if (last_snapshot == current_user_key_snapshot_) {
+      // If the earliest snapshot in which this key is visible
+      // is the same as the visibility of a previous instance of the
+      // same key, then this kv is not visible in any snapshot.
+      // Hidden by a newer entry for the same user key
+      // TODO(noetzli): why not > ?
+      //
+      // Note: Dropping this key will not affect TransactionDB write-conflict
+      // checking since there has already been a record returned for this key
+      // in this snapshot.
+      assert(last_sequence >= current_user_key_sequence_);
+      ++iter_stats_.num_record_drop_hidden;  // (A)
+      input_->Next();
+    } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
+               ikey_.sequence <= earliest_snapshot_ &&
+               compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
+                                                          &level_ptrs_)) {
+      // TODO(noetzli): This is the only place where we use compaction_
+      // (besides the constructor). We should probably get rid of this
+      // dependency and find a way to do similar filtering during flushes.
+      //
+      // For this user key:
+      // (1) there is no data in higher levels
+      // (2) data in lower levels will have larger sequence numbers
+      // (3) data in layers that are being compacted here and have
+      //     smaller sequence numbers will be dropped in the next
+      //     few iterations of this loop (by rule (A) above).
+      // Therefore this deletion marker is obsolete and can be dropped.
+      //
+      // Note: Dropping this Delete will not affect TransactionDB
+      // write-conflict checking since it is earlier than any snapshot.
+      ++iter_stats_.num_record_drop_obsolete;
+      input_->Next();
+    } else if (ikey_.type == kTypeMerge) {
+      if (!merge_helper_->HasOperator()) {
+        status_ = Status::InvalidArgument(
+            "merge_operator is not properly initialized.");
+        return;
+      }
+
+      pinned_iters_mgr_.StartPinning();
+      // We know the merge type entry is not hidden, otherwise we would
+      // have hit (A)
+      // We encapsulate the merge related state machine in a different
+      // object to minimize change to the existing flow.
+      Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
+                                           prev_snapshot, bottommost_level_);
+      merge_out_iter_.SeekToFirst();
+
+      if (!s.ok() && !s.IsMergeInProgress()) {
+        status_ = s;
+        return;
+      } else if (merge_out_iter_.Valid()) {
+        // NOTE: key, value, and ikey_ refer to old entries.
+        //       These will be correctly set below.
+        key_ = merge_out_iter_.key();
+        value_ = merge_out_iter_.value();
+        bool valid_key __attribute__((__unused__)) =
+            ParseInternalKey(key_, &ikey_);
+        // MergeUntil stops when it encounters a corrupt key and does not
+        // include them in the result, so we expect the keys here to be valid.
+        assert(valid_key);
+        // Keep current_key_ in sync.
+        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+        key_ = current_key_.GetInternalKey();
+        ikey_.user_key = current_key_.GetUserKey();
+        valid_ = true;
+      } else {
+        // All merge operands were filtered out. Reset the user key, since the
+        // batch consumed by the merge operator should not shadow any keys
+        // coming after the merges
+        has_current_user_key_ = false;
+        pinned_iters_mgr_.ReleasePinnedData();
+
+        if (merge_helper_->FilteredUntil(&skip_until)) {
+          need_skip = true;
+        }
+      }
+    } else {
+      // 1. new user key -OR-
+      // 2. different snapshot stripe
+      bool should_delete = range_del_agg_->ShouldDelete(
+          key_, RangeDelAggregator::RangePositioningMode::kForwardTraversal);
+      if (should_delete) {
+        ++iter_stats_.num_record_drop_hidden;
+        ++iter_stats_.num_record_drop_range_del;
+        input_->Next();
+      } else {
+        valid_ = true;
+      }
+    }
+
+    if (need_skip) {
+      input_->Seek(skip_until);
+    }
+  }
+
+  if (!valid_ && IsShuttingDown()) {
+    status_ = Status::ShutdownInProgress();
+  }
+}
+
+void CompactionIterator::PrepareOutput() {
+  // Zeroing out the sequence number leads to better compression.
+  // If this is the bottommost level (no files in lower levels)
+  // and the earliest snapshot is larger than this seqno
+  // and the userkey differs from the last userkey in compaction
+  // then we can squash the seqno to zero.
+
+  // This is safe for TransactionDB write-conflict checking since transactions
+  // only care about sequence number larger than any active snapshots.
+  if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
+      bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ &&
+      ikey_.type != kTypeMerge &&
+      !cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) {
+    assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
+    ikey_.sequence = 0;
+    current_key_.UpdateInternalKey(0, ikey_.type);
+  }
+}
+
+inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
+    SequenceNumber in, SequenceNumber* prev_snapshot) {
+  assert(snapshots_->size());
+  SequenceNumber prev __attribute__((__unused__)) = kMaxSequenceNumber;
+  for (const auto cur : *snapshots_) {
+    assert(prev == kMaxSequenceNumber || prev <= cur);
+    if (cur >= in) {
+      *prev_snapshot = prev == kMaxSequenceNumber ? 0 : prev;
+      return cur;
+    }
+    prev = cur;
+    assert(prev < kMaxSequenceNumber);
+  }
+  *prev_snapshot = prev;
+  return kMaxSequenceNumber;
+}
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction_iterator.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction_iterator.h b/thirdparty/rocksdb/db/compaction_iterator.h
new file mode 100644
index 0000000..cad2386
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction_iterator.h
@@ -0,0 +1,197 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#include <algorithm>
+#include <deque>
+#include <string>
+#include <vector>
+
+#include "db/compaction.h"
+#include "db/compaction_iteration_stats.h"
+#include "db/merge_helper.h"
+#include "db/pinned_iterators_manager.h"
+#include "db/range_del_aggregator.h"
+#include "options/cf_options.h"
+#include "rocksdb/compaction_filter.h"
+
+namespace rocksdb {
+
+class CompactionEventListener;
+
+class CompactionIterator {
+ public:
+  // A wrapper around Compaction. Has a much smaller interface, only what
+  // CompactionIterator uses. Tests can override it.
+  class CompactionProxy {
+   public:
+    explicit CompactionProxy(const Compaction* compaction)
+        : compaction_(compaction) {}
+
+    virtual ~CompactionProxy() = default;
+    virtual int level(size_t compaction_input_level = 0) const {
+      return compaction_->level();
+    }
+    virtual bool KeyNotExistsBeyondOutputLevel(
+        const Slice& user_key, std::vector<size_t>* level_ptrs) const {
+      return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
+    }
+    virtual bool bottommost_level() const {
+      return compaction_->bottommost_level();
+    }
+    virtual int number_levels() const { return compaction_->number_levels(); }
+    virtual Slice GetLargestUserKey() const {
+      return compaction_->GetLargestUserKey();
+    }
+    virtual bool allow_ingest_behind() const {
+      return compaction_->immutable_cf_options()->allow_ingest_behind;
+    }
+
+   protected:
+    CompactionProxy() = default;
+
+   private:
+    const Compaction* compaction_;
+  };
+
+  CompactionIterator(InternalIterator* input, const Comparator* cmp,
+                     MergeHelper* merge_helper, SequenceNumber last_sequence,
+                     std::vector<SequenceNumber>* snapshots,
+                     SequenceNumber earliest_write_conflict_snapshot, Env* env,
+                     bool expect_valid_internal_key,
+                     RangeDelAggregator* range_del_agg,
+                     const Compaction* compaction = nullptr,
+                     const CompactionFilter* compaction_filter = nullptr,
+                     CompactionEventListener* compaction_listener = nullptr,
+                     const std::atomic<bool>* shutting_down = nullptr);
+
+  // Constructor with custom CompactionProxy, used for tests.
+  CompactionIterator(InternalIterator* input, const Comparator* cmp,
+                     MergeHelper* merge_helper, SequenceNumber last_sequence,
+                     std::vector<SequenceNumber>* snapshots,
+                     SequenceNumber earliest_write_conflict_snapshot, Env* env,
+                     bool expect_valid_internal_key,
+                     RangeDelAggregator* range_del_agg,
+                     std::unique_ptr<CompactionProxy> compaction,
+                     const CompactionFilter* compaction_filter = nullptr,
+                     CompactionEventListener* compaction_listener = nullptr,
+                     const std::atomic<bool>* shutting_down = nullptr);
+
+  ~CompactionIterator();
+
+  void ResetRecordCounts();
+
+  // Seek to the beginning of the compaction iterator output.
+  //
+  // REQUIRED: Call only once.
+  void SeekToFirst();
+
+  // Produces the next record in the compaction.
+  //
+  // REQUIRED: SeekToFirst() has been called.
+  void Next();
+
+  // Getters
+  const Slice& key() const { return key_; }
+  const Slice& value() const { return value_; }
+  const Status& status() const { return status_; }
+  const ParsedInternalKey& ikey() const { return ikey_; }
+  bool Valid() const { return valid_; }
+  const Slice& user_key() const { return current_user_key_; }
+  const CompactionIterationStats& iter_stats() const { return iter_stats_; }
+
+ private:
+  // Processes the input stream to find the next output
+  void NextFromInput();
+
+  // Do last preparations before presenting the output to the caller. At this
+  // point this only zeroes out the sequence number if possible for better
+  // compression.
+  void PrepareOutput();
+
+  // Given a sequence number, return the sequence number of the
+  // earliest snapshot that this sequence number is visible in.
+  // The snapshots themselves are arranged in ascending order of
+  // sequence numbers.
+  // Employ a sequential search because the total number of
+  // snapshots is typically small.
+  inline SequenceNumber findEarliestVisibleSnapshot(
+      SequenceNumber in, SequenceNumber* prev_snapshot);
+
+  InternalIterator* input_;
+  const Comparator* cmp_;
+  MergeHelper* merge_helper_;
+  const std::vector<SequenceNumber>* snapshots_;
+  const SequenceNumber earliest_write_conflict_snapshot_;
+  Env* env_;
+  bool expect_valid_internal_key_;
+  RangeDelAggregator* range_del_agg_;
+  std::unique_ptr<CompactionProxy> compaction_;
+  const CompactionFilter* compaction_filter_;
+#ifndef ROCKSDB_LITE
+  CompactionEventListener* compaction_listener_;
+#endif  // ROCKSDB_LITE
+  const std::atomic<bool>* shutting_down_;
+  bool bottommost_level_;
+  bool valid_ = false;
+  bool visible_at_tip_;
+  SequenceNumber earliest_snapshot_;
+  SequenceNumber latest_snapshot_;
+  bool ignore_snapshots_;
+
+  // State
+  //
+  // Points to a copy of the current compaction iterator output (current_key_)
+  // if valid_.
+  Slice key_;
+  // Points to the value in the underlying iterator that corresponds to the
+  // current output.
+  Slice value_;
+  // The status is OK unless compaction iterator encounters a merge operand
+  // while not having a merge operator defined.
+  Status status_;
+  // Stores the user key, sequence number and type of the current compaction
+  // iterator output (or current key in the underlying iterator during
+  // NextFromInput()).
+  ParsedInternalKey ikey_;
+  // Stores whether ikey_.user_key is valid. If set to false, the user key is
+  // not compared against the current key in the underlying iterator.
+  bool has_current_user_key_ = false;
+  // If true, the input iterator is already positioned at the next record,
+  // so NextFromInput() must not advance it again.
+  bool at_next_ = false;
+  // Holds a copy of the current compaction iterator output (or current key in
+  // the underlying iterator during NextFromInput()).
+  IterKey current_key_;
+  Slice current_user_key_;
+  SequenceNumber current_user_key_sequence_;
+  SequenceNumber current_user_key_snapshot_;
+
+  // True if the iterator has already returned a record for the current key.
+  bool has_outputted_key_ = false;
+
+  // If true, clear the value of the next key and output it without applying
+  // any compaction rules. This is used for outputting a put after a single
+  // delete.
+  bool clear_and_output_next_key_ = false;
+
+  MergeOutputIterator merge_out_iter_;
+  // PinnedIteratorsManager used to pin input_ Iterator blocks while reading
+  // merge operands and then releasing them after consuming them.
+  PinnedIteratorsManager pinned_iters_mgr_;
+  std::string compaction_filter_value_;
+  InternalKey compaction_filter_skip_until_;
+  // "level_ptrs" holds indices that remember which file of an associated
+  // level we were last checking during the last call to compaction->
+  // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
+  // to pick up where it left off, since each subcompaction's key range is
+  // increasing, so a later call to the function must be looking for a key that
+  // is in or beyond the last file checked during the previous call
+  std::vector<size_t> level_ptrs_;
+  CompactionIterationStats iter_stats_;
+
+  bool IsShuttingDown() {
+    // This is a best-effort facility, so memory_order_relaxed is sufficient.
+    return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
+  }
+};
+}  // namespace rocksdb
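
----------------------------------------------------------------------
The snapshot-visibility search in CompactionIterator::findEarliestVisibleSnapshot above is the heart of the iterator's "snapshot stripe" logic, and it can be exercised in isolation. The following is a minimal standalone sketch, not RocksDB code: it mirrors the same linear scan using plain uint64_t sequence numbers, and the names FindEarliestVisibleSnapshot and kMaxSeq are illustrative stand-ins for the real SequenceNumber types.

// Standalone sketch of the earliest-visible-snapshot search above.
// Assumes snapshots are sorted in ascending order.
#include <cassert>
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

constexpr uint64_t kMaxSeq = std::numeric_limits<uint64_t>::max();

// Returns the earliest snapshot that can see sequence number `in`, and
// stores the previous (next-earlier) snapshot in *prev, or 0 if none.
uint64_t FindEarliestVisibleSnapshot(uint64_t in,
                                     const std::vector<uint64_t>& snapshots,
                                     uint64_t* prev) {
  assert(!snapshots.empty());
  uint64_t last = kMaxSeq;  // sentinel: no snapshot seen yet
  for (uint64_t cur : snapshots) {  // ascending order
    if (cur >= in) {
      *prev = (last == kMaxSeq) ? 0 : last;
      return cur;  // first snapshot at or after `in` sees this write
    }
    last = cur;
  }
  *prev = last;    // every snapshot is older than this write
  return kMaxSeq;  // visible only "at tip"
}

int main() {
  std::vector<uint64_t> snapshots = {10, 20, 40};
  uint64_t prev = 0;
  // A write with sequence 15 is first visible to snapshot 20;
  // snapshot 10 is the previous stripe boundary.
  uint64_t vis = FindEarliestVisibleSnapshot(15, snapshots, &prev);
  std::cout << vis << " " << prev << "\n";  // prints: 20 10
  return 0;
}

The pair (vis, prev) delimits the "snapshot stripe" a key version falls into, which is what NextFromInput() compares against last_snapshot to decide whether an older version of the same user key is hidden and can be dropped.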
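----------------------------------------------------------------------
The "3@0 + 2@3 + 1@4 files to L5" format documented above Compaction::InputLevelSummary() is easy to misread in log output, so a small self-contained reproduction may help. This sketch is not the RocksDB implementation: LevelInput is a hypothetical simplification of CompactionInputFiles, and it skips the ROCKSDB_PRIszt portability macro in favor of %zu.

// Sketch of the "N@level + ... files to Lx" summary format used by
// Compaction::InputLevelSummary() above.
#include <cstdio>
#include <vector>

struct LevelInput {
  size_t num_files;  // number of input files taken from this level
  int level;         // the level they come from
};

void InputLevelSummary(const std::vector<LevelInput>& inputs, int output_level,
                       char* buf, int buf_size) {
  int len = 0;
  bool is_first = true;
  for (const auto& in : inputs) {
    if (in.num_files == 0) continue;  // empty input levels are skipped
    if (!is_first) {
      len += std::snprintf(buf + len, buf_size - len, " + ");
    } else {
      is_first = false;
    }
    len += std::snprintf(buf + len, buf_size - len, "%zu@%d", in.num_files,
                         in.level);
  }
  std::snprintf(buf + len, buf_size - len, " files to L%d", output_level);
}

int main() {
  char buf[128];  // same size as Compaction::InputLevelSummaryBuffer
  InputLevelSummary({{3, 0}, {2, 3}, {1, 4}}, 5, buf, sizeof(buf));
  std::puts(buf);  // prints: 3@0 + 2@3 + 1@4 files to L5
  return 0;
}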
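----------------------------------------------------------------------
Finally, the 10% slack in Compaction::OutputFilePreallocationSize() deserves a note: preallocating slightly more than the expected output size keeps a file that lands right at the estimate from triggering one extra filesystem extent allocation at the very end of the write. A one-function illustration of the same arithmetic, under the assumption that the caller already knows the expected output size:

// The +10% slack used by OutputFilePreallocationSize() above.
#include <cstdint>
#include <iostream>

uint64_t PreallocationSize(uint64_t expected_output_bytes) {
  // Over-estimate slightly so the file doesn't end up just barely
  // crossing the preallocated boundary.
  return expected_output_bytes + expected_output_bytes / 10;
}

int main() {
  std::cout << PreallocationSize(64 << 20) << "\n";  // 64 MiB -> ~70.4 MiB
  return 0;
}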