http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/column_family.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/column_family.cc b/thirdparty/rocksdb/db/column_family.cc
new file mode 100644
index 0000000..b00eda0
--- /dev/null
+++ b/thirdparty/rocksdb/db/column_family.cc
@@ -0,0 +1,1137 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/column_family.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <vector>
+#include <string>
+#include <algorithm>
+#include <limits>
+
+#include "db/compaction_picker.h"
+#include "db/compaction_picker_universal.h"
+#include "db/db_impl.h"
+#include "db/internal_stats.h"
+#include "db/job_context.h"
+#include "db/table_properties_collector.h"
+#include "db/version_set.h"
+#include "db/write_controller.h"
+#include "memtable/hash_skiplist_rep.h"
+#include "monitoring/thread_status_util.h"
+#include "options/options_helper.h"
+#include "table/block_based_table_factory.h"
+#include "util/autovector.h"
+#include "util/compression.h"
+
+namespace rocksdb {
+
+ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
+ ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
+ : cfd_(column_family_data), db_(db), mutex_(mutex) {
+ if (cfd_ != nullptr) {
+ cfd_->Ref();
+ }
+}
+
+ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
+ if (cfd_ != nullptr) {
+#ifndef ROCKSDB_LITE
+ for (auto& listener : cfd_->ioptions()->listeners) {
+ listener->OnColumnFamilyHandleDeletionStarted(this);
+ }
+#endif // ROCKSDB_LITE
+  // Job id == 0 means that this is not our background process, but rather
+  // a user thread
+ JobContext job_context(0);
+ mutex_->Lock();
+ if (cfd_->Unref()) {
+ delete cfd_;
+ }
+ db_->FindObsoleteFiles(&job_context, false, true);
+ mutex_->Unlock();
+ if (job_context.HaveSomethingToDelete()) {
+ db_->PurgeObsoleteFiles(job_context);
+ }
+ job_context.Clean();
+ }
+}
+
+uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
+
+const std::string& ColumnFamilyHandleImpl::GetName() const {
+ return cfd()->GetName();
+}
+
+Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
+#ifndef ROCKSDB_LITE
+ // accessing mutable cf-options requires db mutex.
+ InstrumentedMutexLock l(mutex_);
+ *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
+ return Status::OK();
+#else
+ return Status::NotSupported();
+#endif // !ROCKSDB_LITE
+}
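
A minimal caller-side sketch of the path above (assuming a non-LITE build
and an open handle; DumpDescriptor is an illustrative name, not part of this
file): the descriptor carries the CF name plus the latest options assembled
under the DB mutex.

  #include "rocksdb/db.h"

  rocksdb::Status DumpDescriptor(rocksdb::ColumnFamilyHandle* cfh) {
    rocksdb::ColumnFamilyDescriptor desc;
    rocksdb::Status s = cfh->GetDescriptor(&desc);
    if (s.ok()) {
      // desc.name and desc.options now reflect GetLatestCFOptions().
    }
    return s;
  }
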
+
+const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
+ return cfd()->user_comparator();
+}
+
+void GetIntTblPropCollectorFactory(
+ const ImmutableCFOptions& ioptions,
+ std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
+ int_tbl_prop_collector_factories) {
+ auto& collector_factories = ioptions.table_properties_collector_factories;
+ for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
+ ++i) {
+ assert(collector_factories[i]);
+ int_tbl_prop_collector_factories->emplace_back(
+ new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
+ }
+ // Add collector to collect internal key statistics
+ int_tbl_prop_collector_factories->emplace_back(
+ new InternalKeyPropertiesCollectorFactory);
+}
+
+Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
+ if (!cf_options.compression_per_level.empty()) {
+ for (size_t level = 0; level < cf_options.compression_per_level.size();
+ ++level) {
+ if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
+ return Status::InvalidArgument(
+ "Compression type " +
+ CompressionTypeToString(cf_options.compression_per_level[level]) +
+ " is not linked with the binary.");
+ }
+ }
+ } else {
+ if (!CompressionTypeSupported(cf_options.compression)) {
+ return Status::InvalidArgument(
+ "Compression type " +
+ CompressionTypeToString(cf_options.compression) +
+ " is not linked with the binary.");
+ }
+ }
+ return Status::OK();
+}
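
A usage sketch of this check (hedged: CheckCompressionSupported is an
internal helper declared in column_family.h further below, and the codec
list here is illustrative). A non-empty compression_per_level overrides the
single compression field, mirroring the branch structure above:

  #include "rocksdb/options.h"

  rocksdb::ColumnFamilyOptions cf_options;
  cf_options.compression = rocksdb::kSnappyCompression;  // fallback codec
  cf_options.compression_per_level = {
      rocksdb::kNoCompression,      // L0: hot data, skip compression
      rocksdb::kNoCompression,      // L1
      rocksdb::kSnappyCompression,  // mid levels: cheap codec
      rocksdb::kZSTD};              // coldest levels: best ratio
  rocksdb::Status s = rocksdb::CheckCompressionSupported(cf_options);
  // s.IsInvalidArgument() if any codec above was not linked into the binary.
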
+
+Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
+ if (cf_options.inplace_update_support) {
+ return Status::InvalidArgument(
+ "In-place memtable updates (inplace_update_support) is not compatible "
+ "with concurrent writes (allow_concurrent_memtable_write)");
+ }
+ if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
+ return Status::InvalidArgument(
+ "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
+ }
+ return Status::OK();
+}
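
The corresponding user-facing configuration, as a sketch (values
illustrative): allow_concurrent_memtable_write only passes this check with a
concurrent-insert-capable memtable rep such as the default skip list, and
with in-place updates off.

  #include "rocksdb/memtablerep.h"
  #include "rocksdb/options.h"

  rocksdb::Options options;
  options.allow_concurrent_memtable_write = true;  // DBOptions side
  options.inplace_update_support = false;          // required by the check
  options.memtable_factory.reset(new rocksdb::SkipListFactory());
  // A HashSkipList or HashLinkList rep here would fail
  // CheckConcurrentWritesSupported() at open time.
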
+
+ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
+ const ColumnFamilyOptions& src) {
+ ColumnFamilyOptions result = src;
+ size_t clamp_max = std::conditional<
+ sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
+ std::integral_constant<uint64_t, 64ull << 30>>::type::value;
+ ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
+  // if user sets arena_block_size, we trust the user to use this value.
+  // Otherwise, calculate a proper value from write_buffer_size;
+ if (result.arena_block_size <= 0) {
+ result.arena_block_size = result.write_buffer_size / 8;
+
+ // Align up to 4k
+ const size_t align = 4 * 1024;
+ result.arena_block_size =
+ ((result.arena_block_size + align - 1) / align) * align;
+ }
+ result.min_write_buffer_number_to_merge =
+ std::min(result.min_write_buffer_number_to_merge,
+ result.max_write_buffer_number - 1);
+ if (result.min_write_buffer_number_to_merge < 1) {
+ result.min_write_buffer_number_to_merge = 1;
+ }
+
+ if (result.num_levels < 1) {
+ result.num_levels = 1;
+ }
+ if (result.compaction_style == kCompactionStyleLevel &&
+ result.num_levels < 2) {
+ result.num_levels = 2;
+ }
+
+ if (result.compaction_style == kCompactionStyleUniversal &&
+ db_options.allow_ingest_behind && result.num_levels < 3) {
+ result.num_levels = 3;
+ }
+
+ if (result.max_write_buffer_number < 2) {
+ result.max_write_buffer_number = 2;
+ }
+ if (result.max_write_buffer_number_to_maintain < 0) {
+ result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
+ }
+ // bloom filter size shouldn't exceed 1/4 of memtable size.
+ if (result.memtable_prefix_bloom_size_ratio > 0.25) {
+ result.memtable_prefix_bloom_size_ratio = 0.25;
+ } else if (result.memtable_prefix_bloom_size_ratio < 0) {
+ result.memtable_prefix_bloom_size_ratio = 0;
+ }
+
+ if (!result.prefix_extractor) {
+ assert(result.memtable_factory);
+ Slice name = result.memtable_factory->Name();
+ if (name.compare("HashSkipListRepFactory") == 0 ||
+ name.compare("HashLinkListRepFactory") == 0) {
+ result.memtable_factory = std::make_shared<SkipListFactory>();
+ }
+ }
+
+ if (result.compaction_style == kCompactionStyleFIFO) {
+ result.num_levels = 1;
+ // since we delete level0 files in FIFO compaction when there are too many
+ // of them, these options don't really mean anything
+ result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
+ result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
+ }
+
+ if (result.max_bytes_for_level_multiplier <= 0) {
+ result.max_bytes_for_level_multiplier = 1;
+ }
+
+ if (result.level0_file_num_compaction_trigger == 0) {
+ ROCKS_LOG_WARN(db_options.info_log.get(),
+ "level0_file_num_compaction_trigger cannot be 0");
+ result.level0_file_num_compaction_trigger = 1;
+ }
+
+ if (result.level0_stop_writes_trigger <
+ result.level0_slowdown_writes_trigger ||
+ result.level0_slowdown_writes_trigger <
+ result.level0_file_num_compaction_trigger) {
+ ROCKS_LOG_WARN(db_options.info_log.get(),
+ "This condition must be satisfied: "
+ "level0_stop_writes_trigger(%d) >= "
+ "level0_slowdown_writes_trigger(%d) >= "
+ "level0_file_num_compaction_trigger(%d)",
+ result.level0_stop_writes_trigger,
+ result.level0_slowdown_writes_trigger,
+ result.level0_file_num_compaction_trigger);
+ if (result.level0_slowdown_writes_trigger <
+ result.level0_file_num_compaction_trigger) {
+ result.level0_slowdown_writes_trigger =
+ result.level0_file_num_compaction_trigger;
+ }
+ if (result.level0_stop_writes_trigger <
+ result.level0_slowdown_writes_trigger) {
+ result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
+ }
+ ROCKS_LOG_WARN(db_options.info_log.get(),
+ "Adjust the value to "
+ "level0_stop_writes_trigger(%d)"
+ "level0_slowdown_writes_trigger(%d)"
+ "level0_file_num_compaction_trigger(%d)",
+ result.level0_stop_writes_trigger,
+ result.level0_slowdown_writes_trigger,
+ result.level0_file_num_compaction_trigger);
+ }
+
+ if (result.soft_pending_compaction_bytes_limit == 0) {
+ result.soft_pending_compaction_bytes_limit =
+ result.hard_pending_compaction_bytes_limit;
+ } else if (result.hard_pending_compaction_bytes_limit > 0 &&
+ result.soft_pending_compaction_bytes_limit >
+ result.hard_pending_compaction_bytes_limit) {
+ result.soft_pending_compaction_bytes_limit =
+ result.hard_pending_compaction_bytes_limit;
+ }
+
+ if (result.level_compaction_dynamic_level_bytes) {
+ if (result.compaction_style != kCompactionStyleLevel ||
+ db_options.db_paths.size() > 1U) {
+ // 1. level_compaction_dynamic_level_bytes only makes sense for
+ // level-based compaction.
+      // 2. we don't yet know how to make this feature and multiple
+      //    DB paths work together.
+ result.level_compaction_dynamic_level_bytes = false;
+ }
+ }
+
+ if (result.max_compaction_bytes == 0) {
+ result.max_compaction_bytes = result.target_file_size_base * 25;
+ }
+
+ return result;
+}
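
A standalone re-derivation of one sanitization step above, the
arena_block_size default of write_buffer_size / 8 rounded up to a 4 KiB
boundary (a sketch; DefaultArenaBlockSize is a hypothetical name):

  #include <cassert>
  #include <cstddef>

  // write_buffer_size / 8, rounded up (never down) to a 4 KiB multiple.
  static size_t DefaultArenaBlockSize(size_t write_buffer_size) {
    size_t block = write_buffer_size / 8;
    const size_t align = 4 * 1024;
    return ((block + align - 1) / align) * align;
  }

  int main() {
    assert(DefaultArenaBlockSize(64 << 20) == (8 << 20));  // already aligned
    assert(DefaultArenaBlockSize(100000) == 16384);        // 12500 -> 16384
  }
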
+
+int SuperVersion::dummy = 0;
+void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
+void* const SuperVersion::kSVObsolete = nullptr;
+
+SuperVersion::~SuperVersion() {
+ for (auto td : to_delete) {
+ delete td;
+ }
+}
+
+SuperVersion* SuperVersion::Ref() {
+ refs.fetch_add(1, std::memory_order_relaxed);
+ return this;
+}
+
+bool SuperVersion::Unref() {
+ // fetch_sub returns the previous value of ref
+ uint32_t previous_refs = refs.fetch_sub(1);
+ assert(previous_refs > 0);
+ return previous_refs == 1;
+}
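
The drop-the-last-reference idiom used by Ref()/Unref(), distilled into a
minimal standalone sketch (RefCounted is illustrative): fetch_sub returns
the pre-decrement value, so seeing 1 means this caller released the final
reference and owns the cleanup.

  #include <atomic>
  #include <cassert>
  #include <cstdint>

  struct RefCounted {
    std::atomic<uint32_t> refs{1};
    void Ref() { refs.fetch_add(1, std::memory_order_relaxed); }
    bool Unref() {
      uint32_t prev = refs.fetch_sub(1);
      assert(prev > 0);
      return prev == 1;  // true: caller must run the cleanup path
    }
  };
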
+
+void SuperVersion::Cleanup() {
+ assert(refs.load(std::memory_order_relaxed) == 0);
+ imm->Unref(&to_delete);
+ MemTable* m = mem->Unref();
+ if (m != nullptr) {
+ auto* memory_usage = current->cfd()->imm()->current_memory_usage();
+ assert(*memory_usage >= m->ApproximateMemoryUsage());
+ *memory_usage -= m->ApproximateMemoryUsage();
+ to_delete.push_back(m);
+ }
+ current->Unref();
+}
+
+void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
+ Version* new_current) {
+ mem = new_mem;
+ imm = new_imm;
+ current = new_current;
+ mem->Ref();
+ imm->Ref();
+ current->Ref();
+ refs.store(1, std::memory_order_relaxed);
+}
+
+namespace {
+void SuperVersionUnrefHandle(void* ptr) {
+// UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
+// destroyed. When the former happens, the thread shouldn't see kSVInUse.
+// When the latter happens, we are in ~ColumnFamilyData(), so no get should
+// happen either.
+ SuperVersion* sv = static_cast<SuperVersion*>(ptr);
+ if (sv->Unref()) {
+ sv->db_mutex->Lock();
+ sv->Cleanup();
+ sv->db_mutex->Unlock();
+ delete sv;
+ }
+}
+} // anonymous namespace
+
+ColumnFamilyData::ColumnFamilyData(
+ uint32_t id, const std::string& name, Version* _dummy_versions,
+ Cache* _table_cache, WriteBufferManager* write_buffer_manager,
+ const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
+ const EnvOptions& env_options, ColumnFamilySet* column_family_set)
+ : id_(id),
+ name_(name),
+ dummy_versions_(_dummy_versions),
+ current_(nullptr),
+ refs_(0),
+ initialized_(false),
+ dropped_(false),
+ internal_comparator_(cf_options.comparator),
+ initial_cf_options_(SanitizeOptions(db_options, cf_options)),
+ ioptions_(db_options, initial_cf_options_),
+ mutable_cf_options_(initial_cf_options_),
+ is_delete_range_supported_(
+ cf_options.table_factory->IsDeleteRangeSupported()),
+ write_buffer_manager_(write_buffer_manager),
+ mem_(nullptr),
+ imm_(ioptions_.min_write_buffer_number_to_merge,
+ ioptions_.max_write_buffer_number_to_maintain),
+ super_version_(nullptr),
+ super_version_number_(0),
+ local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
+ next_(nullptr),
+ prev_(nullptr),
+ log_number_(0),
+ column_family_set_(column_family_set),
+ pending_flush_(false),
+ pending_compaction_(false),
+ prev_compaction_needed_bytes_(0),
+ allow_2pc_(db_options.allow_2pc) {
+ Ref();
+
+ // Convert user defined table properties collector factories to internal ones.
+ GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);
+
+ // if _dummy_versions is nullptr, then this is a dummy column family.
+ if (_dummy_versions != nullptr) {
+ internal_stats_.reset(
+ new InternalStats(ioptions_.num_levels, db_options.env, this));
+ table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
+ if (ioptions_.compaction_style == kCompactionStyleLevel) {
+ compaction_picker_.reset(
+ new LevelCompactionPicker(ioptions_, &internal_comparator_));
+#ifndef ROCKSDB_LITE
+ } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
+ compaction_picker_.reset(
+ new UniversalCompactionPicker(ioptions_, &internal_comparator_));
+ } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
+ compaction_picker_.reset(
+ new FIFOCompactionPicker(ioptions_, &internal_comparator_));
+ } else if (ioptions_.compaction_style == kCompactionStyleNone) {
+ compaction_picker_.reset(new NullCompactionPicker(
+ ioptions_, &internal_comparator_));
+ ROCKS_LOG_WARN(ioptions_.info_log,
+ "Column family %s does not use any background compaction. "
+ "Compactions can only be done via CompactFiles\n",
+ GetName().c_str());
+#endif // !ROCKSDB_LITE
+ } else {
+ ROCKS_LOG_ERROR(ioptions_.info_log,
+ "Unable to recognize the specified compaction style %d. "
+ "Column family %s will use kCompactionStyleLevel.\n",
+ ioptions_.compaction_style, GetName().c_str());
+ compaction_picker_.reset(
+ new LevelCompactionPicker(ioptions_, &internal_comparator_));
+ }
+
+ if (column_family_set_->NumberOfColumnFamilies() < 10) {
+ ROCKS_LOG_INFO(ioptions_.info_log,
+ "--------------- Options for column family [%s]:\n",
+ name.c_str());
+ initial_cf_options_.Dump(ioptions_.info_log);
+ } else {
+ ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
+ }
+ }
+
+ RecalculateWriteStallConditions(mutable_cf_options_);
+}
+
+// DB mutex held
+ColumnFamilyData::~ColumnFamilyData() {
+ assert(refs_.load(std::memory_order_relaxed) == 0);
+ // remove from linked list
+ auto prev = prev_;
+ auto next = next_;
+ prev->next_ = next;
+ next->prev_ = prev;
+
+ if (!dropped_ && column_family_set_ != nullptr) {
+ // If it's dropped, it's already removed from column family set
+ // If column_family_set_ == nullptr, this is dummy CFD and not in
+ // ColumnFamilySet
+ column_family_set_->RemoveColumnFamily(this);
+ }
+
+ if (current_ != nullptr) {
+ current_->Unref();
+ }
+
+ // It would be wrong if this ColumnFamilyData is in flush_queue_ or
+ // compaction_queue_ and we destroyed it
+ assert(!pending_flush_);
+ assert(!pending_compaction_);
+
+ if (super_version_ != nullptr) {
+ // Release SuperVersion reference kept in ThreadLocalPtr.
+ // This must be done outside of mutex_ since unref handler can lock mutex.
+ super_version_->db_mutex->Unlock();
+ local_sv_.reset();
+ super_version_->db_mutex->Lock();
+
+ bool is_last_reference __attribute__((unused));
+ is_last_reference = super_version_->Unref();
+ assert(is_last_reference);
+ super_version_->Cleanup();
+ delete super_version_;
+ super_version_ = nullptr;
+ }
+
+ if (dummy_versions_ != nullptr) {
+ // List must be empty
+ assert(dummy_versions_->TEST_Next() == dummy_versions_);
+ bool deleted __attribute__((unused)) = dummy_versions_->Unref();
+ assert(deleted);
+ }
+
+ if (mem_ != nullptr) {
+ delete mem_->Unref();
+ }
+ autovector<MemTable*> to_delete;
+ imm_.current()->Unref(&to_delete);
+ for (MemTable* m : to_delete) {
+ delete m;
+ }
+}
+
+void ColumnFamilyData::SetDropped() {
+ // can't drop default CF
+ assert(id_ != 0);
+ dropped_ = true;
+ write_controller_token_.reset();
+
+ // remove from column_family_set
+ column_family_set_->RemoveColumnFamily(this);
+}
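
The public entry point that leads here is DB::DropColumnFamily(); a usage
sketch (db and cfh assumed to come from an open database):

  #include <cassert>
  #include "rocksdb/db.h"

  void DropIt(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cfh) {
    rocksdb::Status s = db->DropColumnFamily(cfh);
    assert(s.ok());
    // Reads through cfh keep working; writes now fail unless
    // WriteOptions::ignore_missing_column_families is set. Files and
    // memory are reclaimed once the last handle is released.
  }
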
+
+ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
+ return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
+}
+
+uint64_t ColumnFamilyData::OldestLogToKeep() {
+ auto current_log = GetLogNumber();
+
+ if (allow_2pc_) {
+ auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
+ auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
+
+ if (imm_prep_log > 0 && imm_prep_log < current_log) {
+ current_log = imm_prep_log;
+ }
+
+ if (mem_prep_log > 0 && mem_prep_log < current_log) {
+ current_log = mem_prep_log;
+ }
+ }
+
+ return current_log;
+}
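
A standalone restatement of the rule above (OldestLogToKeep2PC is a
hypothetical helper): a prep-section log number of 0 means "no prepared
sections", so only non-zero values can lower the floor.

  #include <algorithm>
  #include <cstdint>

  static uint64_t OldestLogToKeep2PC(uint64_t current_log,
                                     uint64_t imm_prep_log,
                                     uint64_t mem_prep_log) {
    if (imm_prep_log > 0) current_log = std::min(current_log, imm_prep_log);
    if (mem_prep_log > 0) current_log = std::min(current_log, mem_prep_log);
    return current_log;
  }
  // e.g. OldestLogToKeep2PC(12, 9, 0) == 9: logs 9..12 must be retained.
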
+
+const double kIncSlowdownRatio = 0.8;
+const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
+const double kNearStopSlowdownRatio = 0.6;
+const double kDelayRecoverSlowdownRatio = 1.4;
+
+namespace {
+// If penalize_stop is true, we further reduce slowdown rate.
+std::unique_ptr<WriteControllerToken> SetupDelay(
+ WriteController* write_controller, uint64_t compaction_needed_bytes,
+ uint64_t prev_compaction_need_bytes, bool penalize_stop,
+    bool auto_compactions_disabled) {
+ const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.
+
+ uint64_t max_write_rate = write_controller->max_delayed_write_rate();
+ uint64_t write_rate = write_controller->delayed_write_rate();
+
+  if (auto_compactions_disabled) {
+ // When auto compaction is disabled, always use the value user gave.
+ write_rate = max_write_rate;
+ } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
+ // If user gives rate less than kMinWriteRate, don't adjust it.
+ //
+ // If already delayed, need to adjust based on previous compaction debt.
+    // When two or more column families require delay, we always
+ // increase or reduce write rate based on information for one single
+ // column family. It is likely to be OK but we can improve if there is a
+ // problem.
+ // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
+ // is only available in level-based compaction
+ //
+ // If the compaction debt stays the same as previously, we also further slow
+ // down. It usually means a mem table is full. It's mainly for the case
+ // where both of flush and compaction are much slower than the speed we
+ // insert to mem tables, so we need to actively slow down before we get
+ // feedback signal from compaction and flushes to avoid the full stop
+ // because of hitting the max write buffer number.
+ //
+    // If the DB has just fallen into the stop condition, we need to further
+    // reduce the write rate to avoid hitting it again.
+ if (penalize_stop) {
+ // Penalize the near stop or stop condition by more aggressive slowdown.
+ // This is to provide the long term slowdown increase signal.
+ // The penalty is more than the reward of recovering to the normal
+ // condition.
+ write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
+ kNearStopSlowdownRatio);
+ if (write_rate < kMinWriteRate) {
+ write_rate = kMinWriteRate;
+ }
+ } else if (prev_compaction_need_bytes > 0 &&
+ prev_compaction_need_bytes <= compaction_needed_bytes) {
+ write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
+ kIncSlowdownRatio);
+ if (write_rate < kMinWriteRate) {
+ write_rate = kMinWriteRate;
+ }
+ } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
+      // We speed up by a ratio of kDecSlowdownRatio once the compaction
+      // debt has been paid down. But we'll never speed up past the write
+      // rate given by the user.
+ write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
+ kDecSlowdownRatio);
+ if (write_rate > max_write_rate) {
+ write_rate = max_write_rate;
+ }
+ }
+ }
+ return write_controller->GetDelayToken(write_rate);
+}
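
Worked numbers for the three adjustment branches above, as a runnable
sketch (the 16 MB/s starting rate is hypothetical):

  #include <cstdint>
  #include <cstdio>

  int main() {
    const double kIncSlowdownRatio = 0.8;
    const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;  // 1.25
    const double kNearStopSlowdownRatio = 0.6;
    const uint64_t rate = 16ull << 20;  // 16 MB/s delayed write rate
    // Debt growing or memtable pressure: 0.8x -> ~12.8 MB/s.
    std::printf("%llu\n", (unsigned long long)(rate * kIncSlowdownRatio));
    // Near stop or just stopped: harsher 0.6x -> ~9.6 MB/s.
    std::printf("%llu\n", (unsigned long long)(rate * kNearStopSlowdownRatio));
    // Debt paid down: 1.25x -> 20 MB/s, capped at the user-given rate.
    std::printf("%llu\n", (unsigned long long)(rate * kDecSlowdownRatio));
  }
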
+
+int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
+ int level0_slowdown_writes_trigger) {
+ // SanitizeOptions() ensures it.
+ assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
+
+ if (level0_file_num_compaction_trigger < 0) {
+ return std::numeric_limits<int>::max();
+ }
+
+ const int64_t twice_level0_trigger =
+ static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;
+
+ const int64_t one_fourth_trigger_slowdown =
+ static_cast<int64_t>(level0_file_num_compaction_trigger) +
+ ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
+ 4);
+
+ assert(twice_level0_trigger >= 0);
+ assert(one_fourth_trigger_slowdown >= 0);
+
+  // 1/4 of the way between the L0 compaction trigger threshold and the
+  // slowdown condition, or twice the compaction trigger, whichever is
+  // smaller.
+ int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
+ if (res >= port::kMaxInt32) {
+ return port::kMaxInt32;
+ } else {
+ // res fits in int
+ return static_cast<int>(res);
+ }
+}
+} // namespace
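
With the stock defaults (level0_file_num_compaction_trigger = 4,
level0_slowdown_writes_trigger = 20), the threshold computed above works
out as follows:

  #include <algorithm>
  #include <cassert>
  #include <cstdint>

  int main() {
    const int64_t trigger = 4, slowdown = 20;
    const int64_t twice = trigger * 2;                           // 8
    const int64_t quarter = trigger + (slowdown - trigger) / 4;  // 8
    // Compaction parallelism is boosted once L0 reaches 8 files.
    assert(std::min(twice, quarter) == 8);
  }
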
+
+void ColumnFamilyData::RecalculateWriteStallConditions(
+ const MutableCFOptions& mutable_cf_options) {
+ if (current_ != nullptr) {
+ auto* vstorage = current_->storage_info();
+ auto write_controller = column_family_set_->write_controller_;
+ uint64_t compaction_needed_bytes =
+ vstorage->estimated_compaction_needed_bytes();
+
+ bool was_stopped = write_controller->IsStopped();
+ bool needed_delay = write_controller->NeedsDelay();
+
+ if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
+ write_controller_token_ = write_controller->GetStopToken();
+ internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
+ ROCKS_LOG_WARN(
+ ioptions_.info_log,
+ "[%s] Stopping writes because we have %d immutable memtables "
+ "(waiting for flush), max_write_buffer_number is set to %d",
+ name_.c_str(), imm()->NumNotFlushed(),
+ mutable_cf_options.max_write_buffer_number);
+ } else if (!mutable_cf_options.disable_auto_compactions &&
+ vstorage->l0_delay_trigger_count() >=
+ mutable_cf_options.level0_stop_writes_trigger) {
+ write_controller_token_ = write_controller->GetStopToken();
+ internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
+ if (compaction_picker_->IsLevel0CompactionInProgress()) {
+ internal_stats_->AddCFStats(
+ InternalStats::LEVEL0_NUM_FILES_WITH_COMPACTION, 1);
+ }
+ ROCKS_LOG_WARN(ioptions_.info_log,
+ "[%s] Stopping writes because we have %d level-0 files",
+ name_.c_str(), vstorage->l0_delay_trigger_count());
+ } else if (!mutable_cf_options.disable_auto_compactions &&
+ mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
+ compaction_needed_bytes >=
+ mutable_cf_options.hard_pending_compaction_bytes_limit) {
+ write_controller_token_ = write_controller->GetStopToken();
+ internal_stats_->AddCFStats(
+ InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1);
+ ROCKS_LOG_WARN(
+ ioptions_.info_log,
+ "[%s] Stopping writes because of estimated pending compaction "
+ "bytes %" PRIu64,
+ name_.c_str(), compaction_needed_bytes);
+ } else if (mutable_cf_options.max_write_buffer_number > 3 &&
+ imm()->NumNotFlushed() >=
+ mutable_cf_options.max_write_buffer_number - 1) {
+ write_controller_token_ =
+ SetupDelay(write_controller, compaction_needed_bytes,
+ prev_compaction_needed_bytes_, was_stopped,
+ mutable_cf_options.disable_auto_compactions);
+ internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
+ ROCKS_LOG_WARN(
+ ioptions_.info_log,
+ "[%s] Stalling writes because we have %d immutable memtables "
+ "(waiting for flush), max_write_buffer_number is set to %d "
+ "rate %" PRIu64,
+ name_.c_str(), imm()->NumNotFlushed(),
+ mutable_cf_options.max_write_buffer_number,
+ write_controller->delayed_write_rate());
+ } else if (!mutable_cf_options.disable_auto_compactions &&
+ mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
+ vstorage->l0_delay_trigger_count() >=
+ mutable_cf_options.level0_slowdown_writes_trigger) {
+      // L0 is within two files of the stop trigger.
+ bool near_stop = vstorage->l0_delay_trigger_count() >=
+ mutable_cf_options.level0_stop_writes_trigger - 2;
+ write_controller_token_ =
+ SetupDelay(write_controller, compaction_needed_bytes,
+ prev_compaction_needed_bytes_, was_stopped || near_stop,
+ mutable_cf_options.disable_auto_compactions);
+ internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
+ if (compaction_picker_->IsLevel0CompactionInProgress()) {
+ internal_stats_->AddCFStats(
+ InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1);
+ }
+ ROCKS_LOG_WARN(ioptions_.info_log,
+ "[%s] Stalling writes because we have %d level-0 files "
+ "rate %" PRIu64,
+ name_.c_str(), vstorage->l0_delay_trigger_count(),
+ write_controller->delayed_write_rate());
+ } else if (!mutable_cf_options.disable_auto_compactions &&
+ mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
+ vstorage->estimated_compaction_needed_bytes() >=
+ mutable_cf_options.soft_pending_compaction_bytes_limit) {
+      // If the distance to the hard limit is less than 1/4 of the gap
+      // between the soft and hard bytes limits, we consider it near-stop
+      // and speed up the slowdown.
+ bool near_stop =
+ mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
+ (compaction_needed_bytes -
+ mutable_cf_options.soft_pending_compaction_bytes_limit) >
+ 3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
+ mutable_cf_options.soft_pending_compaction_bytes_limit) /
+ 4;
+
+ write_controller_token_ =
+ SetupDelay(write_controller, compaction_needed_bytes,
+ prev_compaction_needed_bytes_, was_stopped || near_stop,
+ mutable_cf_options.disable_auto_compactions);
+ internal_stats_->AddCFStats(
+ InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);
+ ROCKS_LOG_WARN(
+ ioptions_.info_log,
+ "[%s] Stalling writes because of estimated pending compaction "
+ "bytes %" PRIu64 " rate %" PRIu64,
+ name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
+ write_controller->delayed_write_rate());
+ } else {
+ if (vstorage->l0_delay_trigger_count() >=
+ GetL0ThresholdSpeedupCompaction(
+ mutable_cf_options.level0_file_num_compaction_trigger,
+ mutable_cf_options.level0_slowdown_writes_trigger)) {
+ write_controller_token_ =
+ write_controller->GetCompactionPressureToken();
+ ROCKS_LOG_INFO(
+ ioptions_.info_log,
+ "[%s] Increasing compaction threads because we have %d level-0 "
+ "files ",
+ name_.c_str(), vstorage->l0_delay_trigger_count());
+ } else if (vstorage->estimated_compaction_needed_bytes() >=
+ mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
+ // Increase compaction threads if bytes needed for compaction exceeds
+ // 1/4 of threshold for slowing down.
+ // If soft pending compaction byte limit is not set, always speed up
+ // compaction.
+ write_controller_token_ =
+ write_controller->GetCompactionPressureToken();
+ if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
+ ROCKS_LOG_INFO(
+ ioptions_.info_log,
+ "[%s] Increasing compaction threads because of estimated pending "
+ "compaction "
+ "bytes %" PRIu64,
+ name_.c_str(), vstorage->estimated_compaction_needed_bytes());
+ }
+ } else {
+ write_controller_token_.reset();
+ }
+      // If the DB recovers from delay conditions, we reward it with a rate
+      // increase of roughly double the slowdown ratio. This is to balance
+      // the long term slowdown increase signal.
+ if (needed_delay) {
+ uint64_t write_rate = write_controller->delayed_write_rate();
+ write_controller->set_delayed_write_rate(static_cast<uint64_t>(
+ static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
+ // Set the low pri limit to be 1/4 the delayed write rate.
+      // Note we don't reset this value even after the delay condition is
+      // released.
+ // Low-pri rate will continue to apply if there is a compaction
+ // pressure.
+ write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
+ 4);
+ }
+ }
+ prev_compaction_needed_bytes_ = compaction_needed_bytes;
+ }
+}
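
All of the thresholds consulted above are mutable CF options, so stall
behaviour can be retuned on a live DB through the public SetOptions() API;
a sketch (option values illustrative, db/cfh from an open database):

  #include <cassert>
  #include "rocksdb/db.h"

  void RetuneStalls(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cfh) {
    rocksdb::Status s = db->SetOptions(cfh, {
        {"level0_slowdown_writes_trigger", "30"},
        {"level0_stop_writes_trigger", "40"},
        {"soft_pending_compaction_bytes_limit", "68719476736"},     // 64 GB
        {"hard_pending_compaction_bytes_limit", "274877906944"}});  // 256 GB
    assert(s.ok());  // takes effect when the next SuperVersion is installed
  }
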
+
+const EnvOptions* ColumnFamilyData::soptions() const {
+ return &(column_family_set_->env_options_);
+}
+
+void ColumnFamilyData::SetCurrent(Version* current_version) {
+ current_ = current_version;
+}
+
+uint64_t ColumnFamilyData::GetNumLiveVersions() const {
+ return VersionSet::GetNumLiveVersions(dummy_versions_);
+}
+
+uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
+ return VersionSet::GetTotalSstFilesSize(dummy_versions_);
+}
+
+MemTable* ColumnFamilyData::ConstructNewMemtable(
+ const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
+ return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
+ write_buffer_manager_, earliest_seq, id_);
+}
+
+void ColumnFamilyData::CreateNewMemtable(
+ const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
+ if (mem_ != nullptr) {
+ delete mem_->Unref();
+ }
+ SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
+ mem_->Ref();
+}
+
+bool ColumnFamilyData::NeedsCompaction() const {
+ return compaction_picker_->NeedsCompaction(current_->storage_info());
+}
+
+Compaction* ColumnFamilyData::PickCompaction(
+ const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
+ auto* result = compaction_picker_->PickCompaction(
+ GetName(), mutable_options, current_->storage_info(), log_buffer);
+ if (result != nullptr) {
+ result->SetInputVersion(current_);
+ }
+ return result;
+}
+
+bool ColumnFamilyData::RangeOverlapWithCompaction(
+ const Slice& smallest_user_key, const Slice& largest_user_key,
+ int level) const {
+ return compaction_picker_->RangeOverlapWithCompaction(
+ smallest_user_key, largest_user_key, level);
+}
+
+const int ColumnFamilyData::kCompactAllLevels = -1;
+const int ColumnFamilyData::kCompactToBaseLevel = -2;
+
+Compaction* ColumnFamilyData::CompactRange(
+ const MutableCFOptions& mutable_cf_options, int input_level,
+ int output_level, uint32_t output_path_id, const InternalKey* begin,
+ const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
+ auto* result = compaction_picker_->CompactRange(
+ GetName(), mutable_cf_options, current_->storage_info(), input_level,
+ output_level, output_path_id, begin, end, compaction_end, conflict);
+ if (result != nullptr) {
+ result->SetInputVersion(current_);
+ }
+ return result;
+}
+
+SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
+ InstrumentedMutex* db_mutex) {
+ SuperVersion* sv = nullptr;
+ sv = GetThreadLocalSuperVersion(db_mutex);
+ sv->Ref();
+ if (!ReturnThreadLocalSuperVersion(sv)) {
+ sv->Unref();
+ }
+ return sv;
+}
+
+SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
+ InstrumentedMutex* db_mutex) {
+ SuperVersion* sv = nullptr;
+ // The SuperVersion is cached in thread local storage to avoid acquiring
+ // mutex when SuperVersion does not change since the last use. When a new
+ // SuperVersion is installed, the compaction or flush thread cleans up
+ // cached SuperVersion in all existing thread local storage. To avoid
+ // acquiring mutex for this operation, we use atomic Swap() on the thread
+ // local pointer to guarantee exclusive access. If the thread local pointer
+ // is being used while a new SuperVersion is installed, the cached
+ // SuperVersion can become stale. In that case, the background thread would
+  // have swapped in kSVObsolete. We re-check the value when returning the
+ // SuperVersion back to thread local, with an atomic compare and swap.
+ // The superversion will need to be released if detected to be stale.
+ void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
+ // Invariant:
+ // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
+ // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
+ // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
+ // (if no Scrape happens).
+ assert(ptr != SuperVersion::kSVInUse);
+ sv = static_cast<SuperVersion*>(ptr);
+ if (sv == SuperVersion::kSVObsolete ||
+ sv->version_number != super_version_number_.load()) {
+ RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
+ SuperVersion* sv_to_delete = nullptr;
+
+ if (sv && sv->Unref()) {
+ RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
+ db_mutex->Lock();
+ // NOTE: underlying resources held by superversion (sst files) might
+ // not be released until the next background job.
+ sv->Cleanup();
+ sv_to_delete = sv;
+ } else {
+ db_mutex->Lock();
+ }
+ sv = super_version_->Ref();
+ db_mutex->Unlock();
+
+ delete sv_to_delete;
+ }
+ assert(sv != nullptr);
+ return sv;
+}
+
+bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
+ assert(sv != nullptr);
+ // Put the SuperVersion back
+ void* expected = SuperVersion::kSVInUse;
+ if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
+ // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
+ // storage has not been altered and no Scrape has happened. The
+ // SuperVersion is still current.
+ return true;
+ } else {
+ // ThreadLocal scrape happened in the process of this GetImpl call (after
+ // thread local Swap() at the beginning and before CompareAndSwap()).
+ // This means the SuperVersion it holds is obsolete.
+ assert(expected == SuperVersion::kSVObsolete);
+ }
+ return false;
+}
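
The Swap/CompareAndSwap protocol used by the two functions above, distilled
into a single-slot standalone sketch (Slot is illustrative; ThreadLocalPtr
generalizes this to one slot per thread):

  #include <atomic>

  static int mark;
  static void* const kInUse = &mark;       // a reader holds the cached value
  static void* const kObsolete = nullptr;  // a writer invalidated the cache

  struct Slot {
    std::atomic<void*> ptr{kObsolete};
    // Reader: take the cached pointer, leaving the in-use mark behind.
    void* Acquire() { return ptr.exchange(kInUse); }
    // Reader: put the value back; fails iff a Scrape() ran in between.
    bool Release(void* sv) {
      void* expected = kInUse;
      return ptr.compare_exchange_strong(expected, sv);
    }
    // Writer: invalidate the cache, returning whatever was cached so the
    // caller can unref it (kInUse means a reader will notice on Release).
    void* Scrape() { return ptr.exchange(kObsolete); }
  };
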
+
+SuperVersion* ColumnFamilyData::InstallSuperVersion(
+ SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
+ db_mutex->AssertHeld();
+ return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
+}
+
+SuperVersion* ColumnFamilyData::InstallSuperVersion(
+ SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
+ const MutableCFOptions& mutable_cf_options) {
+ new_superversion->db_mutex = db_mutex;
+ new_superversion->mutable_cf_options = mutable_cf_options;
+ new_superversion->Init(mem_, imm_.current(), current_);
+ SuperVersion* old_superversion = super_version_;
+ super_version_ = new_superversion;
+ ++super_version_number_;
+ super_version_->version_number = super_version_number_;
+ // Reset SuperVersions cached in thread local storage
+ ResetThreadLocalSuperVersions();
+
+ RecalculateWriteStallConditions(mutable_cf_options);
+
+ if (old_superversion != nullptr && old_superversion->Unref()) {
+ old_superversion->Cleanup();
+ return old_superversion; // will let caller delete outside of mutex
+ }
+ return nullptr;
+}
+
+void ColumnFamilyData::ResetThreadLocalSuperVersions() {
+ autovector<void*> sv_ptrs;
+ local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
+ for (auto ptr : sv_ptrs) {
+ assert(ptr);
+ if (ptr == SuperVersion::kSVInUse) {
+ continue;
+ }
+ auto sv = static_cast<SuperVersion*>(ptr);
+ if (sv->Unref()) {
+ sv->Cleanup();
+ delete sv;
+ }
+ }
+}
+
+#ifndef ROCKSDB_LITE
+Status ColumnFamilyData::SetOptions(
+ const std::unordered_map<std::string, std::string>& options_map) {
+ MutableCFOptions new_mutable_cf_options;
+ Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
+ &new_mutable_cf_options);
+ if (s.ok()) {
+ mutable_cf_options_ = new_mutable_cf_options;
+ mutable_cf_options_.RefreshDerivedOptions(ioptions_);
+ }
+ return s;
+}
+#endif // ROCKSDB_LITE
+
+ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
+ const ImmutableDBOptions* db_options,
+ const EnvOptions& env_options,
+ Cache* table_cache,
+ WriteBufferManager* write_buffer_manager,
+ WriteController* write_controller)
+ : max_column_family_(0),
+ dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
+ ColumnFamilyOptions(), *db_options,
+ env_options, nullptr)),
+ default_cfd_cache_(nullptr),
+ db_name_(dbname),
+ db_options_(db_options),
+ env_options_(env_options),
+ table_cache_(table_cache),
+ write_buffer_manager_(write_buffer_manager),
+ write_controller_(write_controller) {
+ // initialize linked list
+ dummy_cfd_->prev_ = dummy_cfd_;
+ dummy_cfd_->next_ = dummy_cfd_;
+}
+
+ColumnFamilySet::~ColumnFamilySet() {
+ while (column_family_data_.size() > 0) {
+ // cfd destructor will delete itself from column_family_data_
+ auto cfd = column_family_data_.begin()->second;
+ cfd->Unref();
+ delete cfd;
+ }
+ dummy_cfd_->Unref();
+ delete dummy_cfd_;
+}
+
+ColumnFamilyData* ColumnFamilySet::GetDefault() const {
+ assert(default_cfd_cache_ != nullptr);
+ return default_cfd_cache_;
+}
+
+ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
+ auto cfd_iter = column_family_data_.find(id);
+ if (cfd_iter != column_family_data_.end()) {
+ return cfd_iter->second;
+ } else {
+ return nullptr;
+ }
+}
+
+ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
+ const {
+ auto cfd_iter = column_families_.find(name);
+ if (cfd_iter != column_families_.end()) {
+ auto cfd = GetColumnFamily(cfd_iter->second);
+ assert(cfd != nullptr);
+ return cfd;
+ } else {
+ return nullptr;
+ }
+}
+
+uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
+ return ++max_column_family_;
+}
+
+uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }
+
+void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
+ max_column_family_ = std::max(new_max_column_family, max_column_family_);
+}
+
+size_t ColumnFamilySet::NumberOfColumnFamilies() const {
+ return column_families_.size();
+}
+
+// under a DB mutex AND write thread
+ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
+ const std::string& name, uint32_t id, Version* dummy_versions,
+ const ColumnFamilyOptions& options) {
+ assert(column_families_.find(name) == column_families_.end());
+ ColumnFamilyData* new_cfd = new ColumnFamilyData(
+ id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
+ *db_options_, env_options_, this);
+ column_families_.insert({name, id});
+ column_family_data_.insert({id, new_cfd});
+ max_column_family_ = std::max(max_column_family_, id);
+ // add to linked list
+ new_cfd->next_ = dummy_cfd_;
+ auto prev = dummy_cfd_->prev_;
+ new_cfd->prev_ = prev;
+ prev->next_ = new_cfd;
+ dummy_cfd_->prev_ = new_cfd;
+ if (id == 0) {
+ default_cfd_cache_ = new_cfd;
+ }
+ return new_cfd;
+}
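
For orientation, the public path that funnels into CreateColumnFamily()
above, as a sketch (path and CF name illustrative):

  #include "rocksdb/db.h"

  int main() {
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::DB* db = nullptr;
    rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/cf_demo", &db);
    if (!s.ok()) return 1;
    rocksdb::ColumnFamilyHandle* cfh = nullptr;
    s = db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "metrics",
                               &cfh);
    if (!s.ok()) { delete db; return 1; }
    // ~ColumnFamilyHandleImpl() (see column_family.cc above) unrefs the CFD
    // and may trigger obsolete-file cleanup.
    delete cfh;
    delete db;
    return 0;
  }
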
+
+// REQUIRES: DB mutex held
+void ColumnFamilySet::FreeDeadColumnFamilies() {
+ autovector<ColumnFamilyData*> to_delete;
+ for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
+ if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
+ to_delete.push_back(cfd);
+ }
+ }
+ for (auto cfd : to_delete) {
+ // this is very rare, so it's not a problem that we do it under a mutex
+ delete cfd;
+ }
+}
+
+// under a DB mutex AND from a write thread
+void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
+ auto cfd_iter = column_family_data_.find(cfd->GetID());
+ assert(cfd_iter != column_family_data_.end());
+ column_family_data_.erase(cfd_iter);
+ column_families_.erase(cfd->GetName());
+}
+
+// under a DB mutex OR from a write thread
+bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
+ if (column_family_id == 0) {
+ // optimization for common case
+ current_ = column_family_set_->GetDefault();
+ } else {
+ current_ = column_family_set_->GetColumnFamily(column_family_id);
+ }
+ handle_.SetCFD(current_);
+ return current_ != nullptr;
+}
+
+uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
+ assert(current_ != nullptr);
+ return current_->GetLogNumber();
+}
+
+MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
+ assert(current_ != nullptr);
+ return current_->mem();
+}
+
+ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
+ assert(current_ != nullptr);
+ return &handle_;
+}
+
+uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
+ uint32_t column_family_id = 0;
+ if (column_family != nullptr) {
+ auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+ column_family_id = cfh->GetID();
+ }
+ return column_family_id;
+}
+
+const Comparator* GetColumnFamilyUserComparator(
+ ColumnFamilyHandle* column_family) {
+ if (column_family != nullptr) {
+ return column_family->GetComparator();
+ }
+ return nullptr;
+}
+
+} // namespace rocksdb
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/column_family.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/column_family.h b/thirdparty/rocksdb/db/column_family.h
new file mode 100644
index 0000000..3a807d2
--- /dev/null
+++ b/thirdparty/rocksdb/db/column_family.h
@@ -0,0 +1,577 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#include <unordered_map>
+#include <string>
+#include <vector>
+#include <atomic>
+
+#include "db/memtable_list.h"
+#include "db/table_cache.h"
+#include "db/table_properties_collector.h"
+#include "db/write_batch_internal.h"
+#include "db/write_controller.h"
+#include "options/cf_options.h"
+#include "rocksdb/compaction_job_stats.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "util/thread_local.h"
+
+namespace rocksdb {
+
+class Version;
+class VersionSet;
+class MemTable;
+class MemTableListVersion;
+class CompactionPicker;
+class Compaction;
+class InternalKey;
+class InternalStats;
+class ColumnFamilyData;
+class DBImpl;
+class LogBuffer;
+class InstrumentedMutex;
+class InstrumentedMutexLock;
+
+extern const double kIncSlowdownRatio;
+
+// ColumnFamilyHandleImpl is the class that clients use to access different
+// column families. It has a non-trivial destructor, which gets called when
+// the client is done using the column family
+class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
+ public:
+ // create while holding the mutex
+ ColumnFamilyHandleImpl(
+ ColumnFamilyData* cfd, DBImpl* db, InstrumentedMutex* mutex);
+ // destroy without mutex
+ virtual ~ColumnFamilyHandleImpl();
+ virtual ColumnFamilyData* cfd() const { return cfd_; }
+
+ virtual uint32_t GetID() const override;
+ virtual const std::string& GetName() const override;
+ virtual Status GetDescriptor(ColumnFamilyDescriptor* desc) override;
+ virtual const Comparator* GetComparator() const override;
+
+ private:
+ ColumnFamilyData* cfd_;
+ DBImpl* db_;
+ InstrumentedMutex* mutex_;
+};
+
+// Does not ref-count ColumnFamilyData
+// We use this dummy ColumnFamilyHandleImpl because sometimes MemTableInserter
+// calls DBImpl methods. When this happens, MemTableInserter needs access to
+// a ColumnFamilyHandle (same as the client would need). In that case, we feed
+// MemTableInserter a dummy ColumnFamilyHandle and enable it to call DBImpl
+// methods
+class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl {
+ public:
+ ColumnFamilyHandleInternal()
+ : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr) {}
+
+ void SetCFD(ColumnFamilyData* _cfd) { internal_cfd_ = _cfd; }
+ virtual ColumnFamilyData* cfd() const override { return internal_cfd_; }
+
+ private:
+ ColumnFamilyData* internal_cfd_;
+};
+
+// holds references to memtable, all immutable memtables and version
+struct SuperVersion {
+ // Accessing members of this class is not thread-safe and requires external
+ // synchronization (ie db mutex held or on write thread).
+ MemTable* mem;
+ MemTableListVersion* imm;
+ Version* current;
+ MutableCFOptions mutable_cf_options;
+ // Version number of the current SuperVersion
+ uint64_t version_number;
+
+ InstrumentedMutex* db_mutex;
+
+ // should be called outside the mutex
+ SuperVersion() = default;
+ ~SuperVersion();
+ SuperVersion* Ref();
+ // If Unref() returns true, Cleanup() should be called with mutex held
+ // before deleting this SuperVersion.
+ bool Unref();
+
+ // call these two methods with db mutex held
+  // Cleanup unrefs mem, imm and current. Also, it stores all memtables
+  // that need to be deleted in the to_delete vector. Unrefing those
+  // objects needs to be done under the mutex
+ void Cleanup();
+ void Init(MemTable* new_mem, MemTableListVersion* new_imm,
+ Version* new_current);
+
+  // The value of dummy is not actually used. kSVInUse takes its address as a
+  // mark in the thread local storage to indicate the SuperVersion is in use
+  // by a thread. This way, the value of kSVInUse is guaranteed not to
+  // conflict with any SuperVersion object address, and is portable across
+  // platforms.
+ static int dummy;
+ static void* const kSVInUse;
+ static void* const kSVObsolete;
+
+ private:
+ std::atomic<uint32_t> refs;
+ // We need to_delete because during Cleanup(), imm->Unref() returns
+ // all memtables that we need to free through this vector. We then
+ // delete all those memtables outside of mutex, during destruction
+ autovector<MemTable*> to_delete;
+};
+
+extern Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options);
+
+extern Status CheckConcurrentWritesSupported(
+ const ColumnFamilyOptions& cf_options);
+
+extern ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
+ const ColumnFamilyOptions& src);
+// Wrap user-defined table properties collector factories from cf_options
+// into internal ones in int_tbl_prop_collector_factories. Also add a system
+// internal one.
+extern void GetIntTblPropCollectorFactory(
+ const ImmutableCFOptions& ioptions,
+ std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
+ int_tbl_prop_collector_factories);
+
+class ColumnFamilySet;
+
+// This class keeps all the data that a column family needs.
+// Most methods require DB mutex held, unless otherwise noted
+class ColumnFamilyData {
+ public:
+ ~ColumnFamilyData();
+
+ // thread-safe
+ uint32_t GetID() const { return id_; }
+ // thread-safe
+ const std::string& GetName() const { return name_; }
+
+ // Ref() can only be called from a context where the caller can guarantee
+ // that ColumnFamilyData is alive (while holding a non-zero ref already,
+ // holding a DB mutex, or as the leader in a write batch group).
+ void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
+
+ // Unref decreases the reference count, but does not handle deletion
+ // when the count goes to 0. If this method returns true then the
+ // caller should delete the instance immediately, or later, by calling
+ // FreeDeadColumnFamilies(). Unref() can only be called while holding
+ // a DB mutex, or during single-threaded recovery.
+ bool Unref() {
+ int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed);
+ assert(old_refs > 0);
+ return old_refs == 1;
+ }
+
+  // SetDropped() can only be called under the following conditions:
+ // 1) Holding a DB mutex,
+ // 2) from single-threaded write thread, AND
+ // 3) from single-threaded VersionSet::LogAndApply()
+ // After dropping column family no other operation on that column family
+ // will be executed. All the files and memory will be, however, kept around
+ // until client drops the column family handle. That way, client can still
+ // access data from dropped column family.
+  // A column family can be dropped and still be alive. In that state:
+  // *) Compaction and flush are not executed on the dropped column family.
+  // *) Clients can continue reading from the column family. Writes will fail
+  //    unless WriteOptions::ignore_missing_column_families is true
+ // When the dropped column family is unreferenced, then we:
+ // *) Remove column family from the linked list maintained by ColumnFamilySet
+ // *) delete all memory associated with that column family
+ // *) delete all the files associated with that column family
+ void SetDropped();
+ bool IsDropped() const { return dropped_; }
+
+ // thread-safe
+ int NumberLevels() const { return ioptions_.num_levels; }
+
+ void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
+ uint64_t GetLogNumber() const { return log_number_; }
+
+ // thread-safe
+ const EnvOptions* soptions() const;
+ const ImmutableCFOptions* ioptions() const { return &ioptions_; }
+ // REQUIRES: DB mutex held
+ // This returns the MutableCFOptions used by current SuperVersion
+ // You should use this API to reference MutableCFOptions most of the time.
+ const MutableCFOptions* GetCurrentMutableCFOptions() const {
+ return &(super_version_->mutable_cf_options);
+ }
+ // REQUIRES: DB mutex held
+ // This returns the latest MutableCFOptions, which may be not in effect yet.
+ const MutableCFOptions* GetLatestMutableCFOptions() const {
+ return &mutable_cf_options_;
+ }
+
+ // REQUIRES: DB mutex held
+ // Build ColumnFamiliesOptions with immutable options and latest mutable
+ // options.
+ ColumnFamilyOptions GetLatestCFOptions() const;
+
+ bool is_delete_range_supported() { return is_delete_range_supported_; }
+
+#ifndef ROCKSDB_LITE
+ // REQUIRES: DB mutex held
+ Status SetOptions(
+ const std::unordered_map<std::string, std::string>& options_map);
+#endif // ROCKSDB_LITE
+
+ InternalStats* internal_stats() { return internal_stats_.get(); }
+
+ MemTableList* imm() { return &imm_; }
+ MemTable* mem() { return mem_; }
+ Version* current() { return current_; }
+ Version* dummy_versions() { return dummy_versions_; }
+ void SetCurrent(Version* _current);
+ uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held
+ uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
+ void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
+
+ // calculate the oldest log needed for the durability of this column family
+ uint64_t OldestLogToKeep();
+
+ // See Memtable constructor for explanation of earliest_seq param.
+ MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
+ SequenceNumber earliest_seq);
+ void CreateNewMemtable(const MutableCFOptions& mutable_cf_options,
+ SequenceNumber earliest_seq);
+
+ TableCache* table_cache() const { return table_cache_.get(); }
+
+ // See documentation in compaction_picker.h
+ // REQUIRES: DB mutex held
+ bool NeedsCompaction() const;
+ // REQUIRES: DB mutex held
+ Compaction* PickCompaction(const MutableCFOptions& mutable_options,
+ LogBuffer* log_buffer);
+
+ // Check if the passed range overlap with any running compactions.
+ // REQUIRES: DB mutex held
+ bool RangeOverlapWithCompaction(const Slice& smallest_user_key,
+ const Slice& largest_user_key,
+ int level) const;
+
+  // A flag to tell a manual compaction to compact all levels together
+  // instead of a specific level.
+ static const int kCompactAllLevels;
+  // A flag to tell a manual compaction that its output is the base level.
+ static const int kCompactToBaseLevel;
+ // REQUIRES: DB mutex held
+ Compaction* CompactRange(const MutableCFOptions& mutable_cf_options,
+ int input_level, int output_level,
+ uint32_t output_path_id, const InternalKey* begin,
+ const InternalKey* end, InternalKey** compaction_end,
+ bool* manual_conflict);
+
+ CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
+ // thread-safe
+ const Comparator* user_comparator() const {
+ return internal_comparator_.user_comparator();
+ }
+ // thread-safe
+ const InternalKeyComparator& internal_comparator() const {
+ return internal_comparator_;
+ }
+
+ const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
+ int_tbl_prop_collector_factories() const {
+ return &int_tbl_prop_collector_factories_;
+ }
+
+ SuperVersion* GetSuperVersion() { return super_version_; }
+ // thread-safe
+  // Return an already-referenced SuperVersion that can be used safely.
+ SuperVersion* GetReferencedSuperVersion(InstrumentedMutex* db_mutex);
+ // thread-safe
+ // Get SuperVersion stored in thread local storage. If it does not exist,
+ // get a reference from a current SuperVersion.
+ SuperVersion* GetThreadLocalSuperVersion(InstrumentedMutex* db_mutex);
+  // Try to return SuperVersion back to thread local storage. Return true on
+  // success and false on failure. It fails when the thread local storage
+  // contains anything other than the SuperVersion::kSVInUse flag.
+ bool ReturnThreadLocalSuperVersion(SuperVersion* sv);
+ // thread-safe
+ uint64_t GetSuperVersionNumber() const {
+ return super_version_number_.load();
+ }
+  // Returns a pointer to the previous SuperVersion if its reference count
+  // drops to zero and it needs deletion, or nullptr otherwise. Takes a
+  // pointer to an allocated SuperVersion as an argument, which enables
+  // clients to allocate the SuperVersion outside of the mutex.
+  // IMPORTANT: Only call this from DBImpl::InstallSuperVersion()
+ SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
+ InstrumentedMutex* db_mutex,
+ const MutableCFOptions& mutable_cf_options);
+ SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
+ InstrumentedMutex* db_mutex);
+
+ void ResetThreadLocalSuperVersions();
+
+ // Protected by DB mutex
+ void set_pending_flush(bool value) { pending_flush_ = value; }
+ void set_pending_compaction(bool value) { pending_compaction_ = value; }
+ bool pending_flush() { return pending_flush_; }
+ bool pending_compaction() { return pending_compaction_; }
+
+  // Recalculate some stall conditions, which change only during compaction,
+  // when a new memtable is added, and/or when the compaction score is
+  // recalculated. These values are used by DBImpl::MakeRoomForWrite to
+  // decide whether it needs to stall writes.
+ void RecalculateWriteStallConditions(
+ const MutableCFOptions& mutable_cf_options);
+
+ void set_initialized() { initialized_.store(true); }
+
+ bool initialized() const { return initialized_.load(); }
+
+ private:
+ friend class ColumnFamilySet;
+ ColumnFamilyData(uint32_t id, const std::string& name,
+ Version* dummy_versions, Cache* table_cache,
+ WriteBufferManager* write_buffer_manager,
+ const ColumnFamilyOptions& options,
+ const ImmutableDBOptions& db_options,
+ const EnvOptions& env_options,
+ ColumnFamilySet* column_family_set);
+
+ uint32_t id_;
+ const std::string name_;
+ Version* dummy_versions_; // Head of circular doubly-linked list of versions.
+ Version* current_; // == dummy_versions->prev_
+
+ std::atomic<int> refs_; // outstanding references to ColumnFamilyData
+ std::atomic<bool> initialized_;
+ bool dropped_; // true if client dropped it
+
+ const InternalKeyComparator internal_comparator_;
+ std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
+ int_tbl_prop_collector_factories_;
+
+ const ColumnFamilyOptions initial_cf_options_;
+ const ImmutableCFOptions ioptions_;
+ MutableCFOptions mutable_cf_options_;
+
+ const bool is_delete_range_supported_;
+
+ std::unique_ptr<TableCache> table_cache_;
+
+ std::unique_ptr<InternalStats> internal_stats_;
+
+ WriteBufferManager* write_buffer_manager_;
+
+ MemTable* mem_;
+ MemTableList imm_;
+ SuperVersion* super_version_;
+
+ // An ordinal representing the current SuperVersion. Updated by
+ // InstallSuperVersion(), i.e. incremented every time super_version_
+ // changes.
+ std::atomic<uint64_t> super_version_number_;
+
+ // Thread's local copy of SuperVersion pointer
+ // This needs to be destructed before mutex_
+ std::unique_ptr<ThreadLocalPtr> local_sv_;
+
+ // pointers for a circular linked list. we use it to support iterations over
+ // all column families that are alive (note: dropped column families can also
+ // be alive as long as client holds a reference)
+ ColumnFamilyData* next_;
+ ColumnFamilyData* prev_;
+
+ // This is the earliest log file number that contains data from this
+ // Column Family. All earlier log files must be ignored and not
+ // recovered from
+ uint64_t log_number_;
+
+ // An object that keeps all the compaction stats
+ // and picks the next compaction
+ std::unique_ptr<CompactionPicker> compaction_picker_;
+
+ ColumnFamilySet* column_family_set_;
+
+ std::unique_ptr<WriteControllerToken> write_controller_token_;
+
+ // If true --> this ColumnFamily is currently present in DBImpl::flush_queue_
+ bool pending_flush_;
+
+ // If true --> this ColumnFamily is currently present in
+ // DBImpl::compaction_queue_
+ bool pending_compaction_;
+
+ uint64_t prev_compaction_needed_bytes_;
+
+ // if the database was opened with 2pc enabled
+ bool allow_2pc_;
+};
+
+// ColumnFamilySet has interesting thread-safety requirements
+// * CreateColumnFamily() or RemoveColumnFamily() -- need to be protected by
+// the DB mutex AND executed in the write thread.
+// CreateColumnFamily() should ONLY be called from VersionSet::LogAndApply() in
+// the single-threaded write thread. It is also called during Recovery and in
+// DumpManifest().
+// RemoveColumnFamily() is only called from SetDropped(). The DB mutex needs to
+// be held and it needs to be executed from the write thread. SetDropped() also
+// guarantees that it will only be called from the single-threaded
+// LogAndApply(), but this condition is not essential.
+// * Iteration -- hold the DB mutex. You may release it in the body of the
+// iteration, but if you do, reference the column family before releasing the
+// mutex and unreference it after re-acquiring the mutex, since the column
+// family might get dropped while the DB mutex is released
+// * GetDefault() -- thread safe
+// * GetColumnFamily() -- either inside of the DB mutex or from a write thread
+// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily(),
+// NumberOfColumnFamilies -- inside of the DB mutex
+class ColumnFamilySet {
+ public:
+ // ColumnFamilySet supports iteration
+ class iterator {
+ public:
+ explicit iterator(ColumnFamilyData* cfd)
+ : current_(cfd) {}
+ iterator& operator++() {
+ // dropped column families might still be included in this iteration
+ // (we only remove them when the client drops the last reference to the
+ // column family).
+ // the dummy CFD is never dead, so this loop always terminates
+ do {
+ current_ = current_->next_;
+ } while (current_->refs_.load(std::memory_order_relaxed) == 0);
+ return *this;
+ }
+ bool operator!=(const iterator& other) {
+ return this->current_ != other.current_;
+ }
+ ColumnFamilyData* operator*() { return current_; }
+
+ private:
+ ColumnFamilyData* current_;
+ };
+
+ ColumnFamilySet(const std::string& dbname,
+ const ImmutableDBOptions* db_options,
+ const EnvOptions& env_options, Cache* table_cache,
+ WriteBufferManager* write_buffer_manager,
+ WriteController* write_controller);
+ ~ColumnFamilySet();
+
+ ColumnFamilyData* GetDefault() const;
+ // GetColumnFamily() calls return nullptr if column family is not found
+ ColumnFamilyData* GetColumnFamily(uint32_t id) const;
+ ColumnFamilyData* GetColumnFamily(const std::string& name) const;
+ // This call returns the next available column family ID. It guarantees that
+ // no column family with an ID greater than or equal to the returned value
+ // exists, either in the currently running instance or at any point in this
+ // RocksDB instance's history.
+ uint32_t GetNextColumnFamilyID();
+ uint32_t GetMaxColumnFamily();
+ void UpdateMaxColumnFamily(uint32_t new_max_column_family);
+ size_t NumberOfColumnFamilies() const;
+
+ ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
+ Version* dummy_version,
+ const ColumnFamilyOptions& options);
+
+ iterator begin() { return iterator(dummy_cfd_->next_); }
+ iterator end() { return iterator(dummy_cfd_); }
+
+ // REQUIRES: DB mutex held
+ // Don't call while iterating over ColumnFamilySet
+ void FreeDeadColumnFamilies();
+
+ Cache* get_table_cache() { return table_cache_; }
+
+ private:
+ friend class ColumnFamilyData;
+ // helper function that gets called from cfd destructor
+ // REQUIRES: DB mutex held
+ void RemoveColumnFamily(ColumnFamilyData* cfd);
+
+ // column_families_ and column_family_data_ need to be protected:
+ // * when mutating, both conditions have to be satisfied:
+ // 1. DB mutex locked
+ // 2. thread currently in the single-threaded write thread
+ // * when reading, at least one condition needs to be satisfied:
+ // 1. DB mutex locked
+ // 2. accessed from the single-threaded write thread
+ std::unordered_map<std::string, uint32_t> column_families_;
+ std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
+
+ uint32_t max_column_family_;
+ ColumnFamilyData* dummy_cfd_;
+ // We don't hold a refcount here, since the default column family always
+ // exists. We are also not responsible for cleaning up default_cfd_cache_.
+ // This is just a cache that makes the common case (accessing the default
+ // column family) faster.
+ ColumnFamilyData* default_cfd_cache_;
+
+ const std::string db_name_;
+ const ImmutableDBOptions* const db_options_;
+ const EnvOptions env_options_;
+ Cache* table_cache_;
+ WriteBufferManager* write_buffer_manager_;
+ WriteController* write_controller_;
+};
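+
+// Editorial sketch, not part of the original file: the iteration protocol
+// described in the comment above ColumnFamilySet, for callers that must drop
+// the DB mutex inside the loop. DoWorkWithoutMutex is a hypothetical
+// placeholder for whatever the caller does while unlocked.
+inline void ForEachColumnFamilySketch(
+    ColumnFamilySet* cfs, InstrumentedMutex* db_mutex,
+    void (*DoWorkWithoutMutex)(ColumnFamilyData*)) {
+  db_mutex->Lock();
+  for (auto it = cfs->begin(); it != cfs->end();) {
+    ColumnFamilyData* cfd = *it;
+    cfd->Ref();             // pin: keeps cfd alive and linked while unlocked
+    db_mutex->Unlock();
+    DoWorkWithoutMutex(cfd);
+    db_mutex->Lock();
+    ++it;                   // advance while cfd is still pinned
+    if (cfd->Unref()) {
+      delete cfd;           // we released the last reference
+    }
+  }
+  db_mutex->Unlock();
+}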
+
+// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access
+// memtables of different column families (specified by ID in the write batch)
+class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
+ public:
+ explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set)
+ : column_family_set_(column_family_set), current_(nullptr) {}
+
+ // Constructs a ColumnFamilyMemTablesImpl with the same arguments that were
+ // used to construct *orig.
+ explicit ColumnFamilyMemTablesImpl(ColumnFamilyMemTablesImpl* orig)
+ : column_family_set_(orig->column_family_set_), current_(nullptr) {}
+
+ // Sets current_ to the ColumnFamilyData with the given column_family_id.
+ // Returns false if the column family doesn't exist.
+ // REQUIRES: calls to this function on DBImpl::column_family_memtables_ must
+ // be made either under the DB mutex OR from the write thread
+ bool Seek(uint32_t column_family_id) override;
+
+ // Returns the log number of the selected column family
+ // REQUIRES: under the DB mutex OR from the write thread
+ uint64_t GetLogNumber() const override;
+
+ // REQUIRES: Seek() called first
+ // REQUIRES: calls to this function on DBImpl::column_family_memtables_ must
+ // be made either under the DB mutex OR from the write thread
+ virtual MemTable* GetMemTable() const override;
+
+ // Returns column family handle for the selected column family
+ // REQUIRES: calls to this function on DBImpl::column_family_memtables_ must
+ // be made either under the DB mutex OR from the write thread
+ virtual ColumnFamilyHandle* GetColumnFamilyHandle() override;
+
+ // Cannot be called while another thread is calling Seek().
+ // REQUIRES: calls to this function on DBImpl::column_family_memtables_ must
+ // be made either under the DB mutex OR from the write thread
+ virtual ColumnFamilyData* current() override { return current_; }
+
+ private:
+ ColumnFamilySet* column_family_set_;
+ ColumnFamilyData* current_;
+ ColumnFamilyHandleInternal handle_;
+};
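+
+// Editorial sketch, not part of the original file: how WriteBatch insertion
+// code might route an update through this interface. The function name and
+// the mem_out parameter are hypothetical.
+inline Status SeekMemTableSketch(ColumnFamilyMemTables* cf_mems,
+                                 uint32_t column_family_id,
+                                 MemTable** mem_out) {
+  // REQUIRES (as above): under the DB mutex OR called from the write thread
+  if (!cf_mems->Seek(column_family_id)) {
+    return Status::InvalidArgument("unknown column family id");
+  }
+  *mem_out = cf_mems->GetMemTable();  // memtable of the selected family
+  return Status::OK();
+}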
+
+extern uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family);
+
+extern const Comparator* GetColumnFamilyUserComparator(
+ ColumnFamilyHandle* column_family);
+
+} // namespace rocksdb
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compacted_db_impl.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compacted_db_impl.cc b/thirdparty/rocksdb/db/compacted_db_impl.cc
new file mode 100644
index 0000000..d1007d9
--- /dev/null
+++ b/thirdparty/rocksdb/db/compacted_db_impl.cc
@@ -0,0 +1,166 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+#include "db/compacted_db_impl.h"
+#include "db/db_impl.h"
+#include "db/version_set.h"
+#include "table/get_context.h"
+
+namespace rocksdb {
+
+extern void MarkKeyMayExist(void* arg);
+extern bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
+ const Slice& v, bool hit_and_return);
+
+CompactedDBImpl::CompactedDBImpl(
+ const DBOptions& options, const std::string& dbname)
+ : DBImpl(options, dbname) {
+}
+
+CompactedDBImpl::~CompactedDBImpl() {
+}
+
+size_t CompactedDBImpl::FindFile(const Slice& key) {
+ size_t left = 0;
+ size_t right = files_.num_files - 1;
+ while (left < right) {
+ size_t mid = (left + right) >> 1;
+ const FdWithKeyRange& f = files_.files[mid];
+ if (user_comparator_->Compare(ExtractUserKey(f.largest_key), key) < 0) {
+ // Key at "mid.largest" is < "target". Therefore all
+ // files at or before "mid" are uninteresting.
+ left = mid + 1;
+ } else {
+ // Key at "mid.largest" is >= "target". Therefore all files
+ // after "mid" are uninteresting.
+ right = mid;
+ }
+ }
+ return right;
+}
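+
+// Editorial sketch, not part of the original file: FindFile() above is a
+// hand-rolled lower bound over the files' largest user keys (it assumes
+// files_.num_files >= 1, which Init() guarantees). An equivalent formulation
+// via std::partition_point -- a hypothetical helper requiring <algorithm>;
+// unlike FindFile() it can return num_files when the key sorts after every
+// file's largest key:
+static size_t LowerBoundFileSketch(const LevelFilesBrief& files,
+                                   const Comparator* ucmp, const Slice& key) {
+  const FdWithKeyRange* begin = files.files;
+  const FdWithKeyRange* end = files.files + files.num_files;
+  auto it = std::partition_point(begin, end, [&](const FdWithKeyRange& f) {
+    // true for the prefix of files whose largest key sorts before the key
+    return ucmp->Compare(ExtractUserKey(f.largest_key), key) < 0;
+  });
+  return static_cast<size_t>(it - begin);
+}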
+
+Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
+ const Slice& key, PinnableSlice* value) {
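+ // The LSM has exactly one populated level (enforced by Init()), so at most
+ // one file can contain the key. Reading at kMaxSequenceNumber sees the
+ // newest version of every key; a compacted DB takes no writes, so no
+ // snapshot handling is needed.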
+ GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
+ GetContext::kNotFound, key, value, nullptr, nullptr,
+ nullptr, nullptr);
+ LookupKey lkey(key, kMaxSequenceNumber);
+ files_.files[FindFile(key)].fd.table_reader->Get(
+ options, lkey.internal_key(), &get_context);
+ if (get_context.State() == GetContext::kFound) {
+ return Status::OK();
+ }
+ return Status::NotFound();
+}
+
+std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
+ const std::vector<ColumnFamilyHandle*>&,
+ const std::vector<Slice>& keys, std::vector<std::string>* values) {
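+ // First pass: locate the candidate file for each key and let its table
+ // reader prepare the lookup (e.g. warm index/filter blocks); a key that
+ // sorts below its file's smallest key cannot exist and gets a null reader.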
+ autovector<TableReader*, 16> reader_list;
+ for (const auto& key : keys) {
+ const FdWithKeyRange& f = files_.files[FindFile(key)];
+ if (user_comparator_->Compare(key, ExtractUserKey(f.smallest_key)) < 0) {
+ reader_list.push_back(nullptr);
+ } else {
+ LookupKey lkey(key, kMaxSequenceNumber);
+ f.fd.table_reader->Prepare(lkey.internal_key());
+ reader_list.push_back(f.fd.table_reader);
+ }
+ }
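+ // Second pass: probe each prepared reader and record per-key results; keys
+ // with a null reader keep the NotFound default below.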
+ std::vector<Status> statuses(keys.size(), Status::NotFound());
+ values->resize(keys.size());
+ int idx = 0;
+ for (auto* r : reader_list) {
+ if (r != nullptr) {
+ PinnableSlice pinnable_val;
+ std::string& value = (*values)[idx];
+ GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
+ GetContext::kNotFound, keys[idx], &pinnable_val,
+ nullptr, nullptr, nullptr, nullptr);
+ LookupKey lkey(keys[idx], kMaxSequenceNumber);
+ r->Get(options, lkey.internal_key(), &get_context);
+ value.assign(pinnable_val.data(), pinnable_val.size());
+ if (get_context.State() == GetContext::kFound) {
+ statuses[idx] = Status::OK();
+ }
+ }
+ ++idx;
+ }
+ return statuses;
+}
+
+Status CompactedDBImpl::Init(const Options& options) {
+ mutex_.Lock();
+ ColumnFamilyDescriptor cf(kDefaultColumnFamilyName,
+ ColumnFamilyOptions(options));
+ Status s = Recover({cf}, true /* read only */, false, true);
+ if (s.ok()) {
+ cfd_ = reinterpret_cast<ColumnFamilyHandleImpl*>(
+ DefaultColumnFamily())->cfd();
+ delete cfd_->InstallSuperVersion(new SuperVersion(), &mutex_);
+ }
+ mutex_.Unlock();
+ if (!s.ok()) {
+ return s;
+ }
+ NewThreadStatusCfInfo(cfd_);
+ version_ = cfd_->GetSuperVersion()->current;
+ user_comparator_ = cfd_->user_comparator();
+ auto* vstorage = version_->storage_info();
+ if (vstorage->num_non_empty_levels() == 0) {
+ return Status::NotSupported("no file exists");
+ }
+ const LevelFilesBrief& l0 = vstorage->LevelFilesBrief(0);
+ // L0 should not have more than one file
+ if (l0.num_files > 1) {
+ return Status::NotSupported("L0 contains more than 1 file");
+ }
+ if (l0.num_files == 1) {
+ if (vstorage->num_non_empty_levels() > 1) {
+ return Status::NotSupported("Both L0 and other level contain files");
+ }
+ files_ = l0;
+ return Status::OK();
+ }
+
+ for (int i = 1; i < vstorage->num_non_empty_levels() - 1; ++i) {
+ if (vstorage->LevelFilesBrief(i).num_files > 0) {
+ return Status::NotSupported("Other levels also contain files");
+ }
+ }
+
+ int level = vstorage->num_non_empty_levels() - 1;
+ if (vstorage->LevelFilesBrief(level).num_files > 0) {
+ files_ = vstorage->LevelFilesBrief(level);
+ return Status::OK();
+ }
+ return Status::NotSupported("no file exists");
+}
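+
+// Editorial sketch, not part of the original file: the two LSM shapes that
+// Init() above accepts, restated as a hypothetical predicate with the same
+// logic -- either a single file in L0 with nothing below it, or files only
+// in the bottommost non-empty level.
+static bool IsFullyCompactedShapeSketch(VersionStorageInfo* vstorage) {
+  int levels = vstorage->num_non_empty_levels();
+  if (levels == 0) {
+    return false;  // "no file exists"
+  }
+  const LevelFilesBrief& l0 = vstorage->LevelFilesBrief(0);
+  if (l0.num_files > 1) {
+    return false;  // more than one L0 file
+  }
+  if (l0.num_files == 1) {
+    return levels == 1;  // one L0 file and no populated level below it
+  }
+  for (int i = 1; i < levels - 1; ++i) {
+    if (vstorage->LevelFilesBrief(i).num_files > 0) {
+      return false;  // a middle level contains files
+    }
+  }
+  return vstorage->LevelFilesBrief(levels - 1).num_files > 0;
+}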
+
+Status CompactedDBImpl::Open(const Options& options,
+ const std::string& dbname, DB** dbptr) {
+ *dbptr = nullptr;
+
+ if (options.max_open_files != -1) {
+ return Status::InvalidArgument("require max_open_files = -1");
+ }
+ if (options.merge_operator.get() != nullptr) {
+ return Status::InvalidArgument("merge operator is not supported");
+ }
+ DBOptions db_options(options);
+ std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
+ Status s = db->Init(options);
+ if (s.ok()) {
+ ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
+ "Opened the db as fully compacted mode");
+ LogFlush(db->immutable_db_options_.info_log);
+ *dbptr = db.release();
+ }
+ return s;
+}
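+
+// Editorial sketch, not part of the original file: opening a fully compacted
+// database through this entry point. The path is hypothetical and error
+// handling is minimal.
+static Status OpenCompactedSketch(const std::string& dbname, DB** dbptr) {
+  Options options;
+  options.max_open_files = -1;  // required: all table files stay open
+  // options.merge_operator must remain unset (see the checks above)
+  return CompactedDBImpl::Open(options, dbname, dbptr);
+}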
+
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compacted_db_impl.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compacted_db_impl.h b/thirdparty/rocksdb/db/compacted_db_impl.h
new file mode 100644
index 0000000..de32f21
--- /dev/null
+++ b/thirdparty/rocksdb/db/compacted_db_impl.h
@@ -0,0 +1,102 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+#include "db/db_impl.h"
+#include <vector>
+#include <string>
+
+namespace rocksdb {
+
+class CompactedDBImpl : public DBImpl {
+ public:
+ CompactedDBImpl(const DBOptions& options, const std::string& dbname);
+ virtual ~CompactedDBImpl();
+
+ static Status Open(const Options& options, const std::string& dbname,
+ DB** dbptr);
+
+ // Implementations of the DB interface
+ using DB::Get;
+ virtual Status Get(const ReadOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value) override;
+ using DB::MultiGet;
+ virtual std::vector<Status> MultiGet(
+ const ReadOptions& options,
+ const std::vector<ColumnFamilyHandle*>&,
+ const std::vector<Slice>& keys, std::vector<std::string>* values)
+ override;
+
+ using DBImpl::Put;
+ virtual Status Put(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value) override {
+ return Status::NotSupported("Not supported in compacted db mode.");
+ }
+ using DBImpl::Merge;
+ virtual Status Merge(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value) override {
+ return Status::NotSupported("Not supported in compacted db mode.");
+ }
+ using DBImpl::Delete;
+ virtual Status Delete(const WriteOptions& options,
+ ColumnFamilyHandle* column_family,
+ const Slice& key) override {
+ return Status::NotSupported("Not supported in compacted db mode.");
+ }
+ virtual Status Write(const WriteOptions& options,
+ WriteBatch* updates) override {
+ return Status::NotSupported("Not supported in compacted db mode.");
+ }
+ using DBImpl::CompactRange;
+ virtual Status CompactRange(const CompactRangeOptions& options,
+ ColumnFamilyHandle* column_family,
+ const Slice* begin, const Slice* end) override {
+ return Status::NotSupported("Not supported in compacted db mode.");
+ }
+
+ virtual Status DisableFileDeletions() override {
+ return Status::NotSupported("Not supported in compacted db mode.");
+ }
+ virtual Status EnableFileDeletions(bool force) override {
+ return Status::NotSupported("Not supported in compacted db mode.");
+ }
+ virtual Status GetLiveFiles(std::vector<std::string>&,
+ uint64_t* manifest_file_size,
+ bool flush_memtable = true) override {
+ return Status::NotSupported("Not supported in compacted db mode.");
+ }
+ using DBImpl::Flush;
+ virtual Status Flush(const FlushOptions& options,
+ ColumnFamilyHandle* column_family) override {
+ return Status::NotSupported("Not supported in compacted db mode.");
+ }
+ using DB::IngestExternalFile;
+ virtual Status IngestExternalFile(
+ ColumnFamilyHandle* column_family,
+ const std::vector<std::string>& external_files,
+ const IngestExternalFileOptions& ingestion_options) override {
+ return Status::NotSupported("Not supported in compacted db mode.");
+ }
+
+ private:
+ friend class DB;
+ inline size_t FindFile(const Slice& key);
+ Status Init(const Options& options);
+
+ ColumnFamilyData* cfd_;
+ Version* version_;
+ const Comparator* user_comparator_;
+ LevelFilesBrief files_;
+
+ // No copying allowed
+ CompactedDBImpl(const CompactedDBImpl&);
+ void operator=(const CompactedDBImpl&);
+};
+}  // namespace rocksdb
+#endif // ROCKSDB_LITE