nifi-commits mailing list archives

From jeremyd...@apache.org
Subject [28/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:25:08 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/column_family.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/column_family.cc b/thirdparty/rocksdb/db/column_family.cc
new file mode 100644
index 0000000..b00eda0
--- /dev/null
+++ b/thirdparty/rocksdb/db/column_family.cc
@@ -0,0 +1,1137 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/column_family.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <vector>
+#include <string>
+#include <algorithm>
+#include <limits>
+
+#include "db/compaction_picker.h"
+#include "db/compaction_picker_universal.h"
+#include "db/db_impl.h"
+#include "db/internal_stats.h"
+#include "db/job_context.h"
+#include "db/table_properties_collector.h"
+#include "db/version_set.h"
+#include "db/write_controller.h"
+#include "memtable/hash_skiplist_rep.h"
+#include "monitoring/thread_status_util.h"
+#include "options/options_helper.h"
+#include "table/block_based_table_factory.h"
+#include "util/autovector.h"
+#include "util/compression.h"
+
+namespace rocksdb {
+
+ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
+    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
+    : cfd_(column_family_data), db_(db), mutex_(mutex) {
+  if (cfd_ != nullptr) {
+    cfd_->Ref();
+  }
+}
+
+ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
+  if (cfd_ != nullptr) {
+#ifndef ROCKSDB_LITE
+    for (auto& listener : cfd_->ioptions()->listeners) {
+      listener->OnColumnFamilyHandleDeletionStarted(this);
+    }
+#endif  // ROCKSDB_LITE
+    // Job id == 0 means that this is not our background process, but rather
+    // a user thread.
+    JobContext job_context(0);
+    mutex_->Lock();
+    if (cfd_->Unref()) {
+      delete cfd_;
+    }
+    db_->FindObsoleteFiles(&job_context, false, true);
+    mutex_->Unlock();
+    if (job_context.HaveSomethingToDelete()) {
+      db_->PurgeObsoleteFiles(job_context);
+    }
+    job_context.Clean();
+  }
+}
+
+uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
+
+const std::string& ColumnFamilyHandleImpl::GetName() const {
+  return cfd()->GetName();
+}
+
+Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
+#ifndef ROCKSDB_LITE
+  // accessing mutable cf-options requires db mutex.
+  InstrumentedMutexLock l(mutex_);
+  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
+  return Status::OK();
+#else
+  return Status::NotSupported();
+#endif  // !ROCKSDB_LITE
+}
+
+const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
+  return cfd()->user_comparator();
+}
+
+void GetIntTblPropCollectorFactory(
+    const ImmutableCFOptions& ioptions,
+    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
+        int_tbl_prop_collector_factories) {
+  auto& collector_factories = ioptions.table_properties_collector_factories;
+  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
+       ++i) {
+    assert(collector_factories[i]);
+    int_tbl_prop_collector_factories->emplace_back(
+        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
+  }
+  // Add collector to collect internal key statistics
+  int_tbl_prop_collector_factories->emplace_back(
+      new InternalKeyPropertiesCollectorFactory);
+}
+
+Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
+  if (!cf_options.compression_per_level.empty()) {
+    for (size_t level = 0; level < cf_options.compression_per_level.size();
+         ++level) {
+      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
+        return Status::InvalidArgument(
+            "Compression type " +
+            CompressionTypeToString(cf_options.compression_per_level[level]) +
+            " is not linked with the binary.");
+      }
+    }
+  } else {
+    if (!CompressionTypeSupported(cf_options.compression)) {
+      return Status::InvalidArgument(
+          "Compression type " +
+          CompressionTypeToString(cf_options.compression) +
+          " is not linked with the binary.");
+    }
+  }
+  return Status::OK();
+}
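+// Editor's sketch, not part of this commit: one way a caller might use the
+// check above, falling back to kNoCompression (which is always supported)
+// when the configured codec is not linked in. The function name and the
+// fallback policy are hypothetical.
+static Status ValidateCompressionOrFallbackExample(ColumnFamilyOptions* opts) {
+  Status s = CheckCompressionSupported(*opts);
+  if (!s.ok()) {
+    // Drop any per-level overrides and use the uncompressed codec.
+    opts->compression_per_level.clear();
+    opts->compression = kNoCompression;
+    s = CheckCompressionSupported(*opts);
+  }
+  return s;
+}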
+
+Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
+  if (cf_options.inplace_update_support) {
+    return Status::InvalidArgument(
+        "In-place memtable updates (inplace_update_support) is not compatible "
+        "with concurrent writes (allow_concurrent_memtable_write)");
+  }
+  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
+    return Status::InvalidArgument(
+        "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
+  }
+  return Status::OK();
+}
+
+ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
+                                    const ColumnFamilyOptions& src) {
+  ColumnFamilyOptions result = src;
+  size_t clamp_max = std::conditional<
+      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
+      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
+  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
+  // If the user sets arena_block_size, we trust that value. Otherwise,
+  // calculate a proper value from write_buffer_size.
+  if (result.arena_block_size <= 0) {
+    result.arena_block_size = result.write_buffer_size / 8;
+
+    // Align up to 4k
+    const size_t align = 4 * 1024;
+    result.arena_block_size =
+        ((result.arena_block_size + align - 1) / align) * align;
+  }
+  result.min_write_buffer_number_to_merge =
+      std::min(result.min_write_buffer_number_to_merge,
+               result.max_write_buffer_number - 1);
+  if (result.min_write_buffer_number_to_merge < 1) {
+    result.min_write_buffer_number_to_merge = 1;
+  }
+
+  if (result.num_levels < 1) {
+    result.num_levels = 1;
+  }
+  if (result.compaction_style == kCompactionStyleLevel &&
+      result.num_levels < 2) {
+    result.num_levels = 2;
+  }
+
+  if (result.compaction_style == kCompactionStyleUniversal &&
+      db_options.allow_ingest_behind && result.num_levels < 3) {
+    result.num_levels = 3;
+  }
+
+  if (result.max_write_buffer_number < 2) {
+    result.max_write_buffer_number = 2;
+  }
+  if (result.max_write_buffer_number_to_maintain < 0) {
+    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
+  }
+  // bloom filter size shouldn't exceed 1/4 of memtable size.
+  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
+    result.memtable_prefix_bloom_size_ratio = 0.25;
+  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
+    result.memtable_prefix_bloom_size_ratio = 0;
+  }
+
+  if (!result.prefix_extractor) {
+    assert(result.memtable_factory);
+    Slice name = result.memtable_factory->Name();
+    if (name.compare("HashSkipListRepFactory") == 0 ||
+        name.compare("HashLinkListRepFactory") == 0) {
+      result.memtable_factory = std::make_shared<SkipListFactory>();
+    }
+  }
+
+  if (result.compaction_style == kCompactionStyleFIFO) {
+    result.num_levels = 1;
+    // since we delete level0 files in FIFO compaction when there are too many
+    // of them, these options don't really mean anything
+    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
+    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
+  }
+
+  if (result.max_bytes_for_level_multiplier <= 0) {
+    result.max_bytes_for_level_multiplier = 1;
+  }
+
+  if (result.level0_file_num_compaction_trigger == 0) {
+    ROCKS_LOG_WARN(db_options.info_log.get(),
+                   "level0_file_num_compaction_trigger cannot be 0");
+    result.level0_file_num_compaction_trigger = 1;
+  }
+
+  if (result.level0_stop_writes_trigger <
+          result.level0_slowdown_writes_trigger ||
+      result.level0_slowdown_writes_trigger <
+          result.level0_file_num_compaction_trigger) {
+    ROCKS_LOG_WARN(db_options.info_log.get(),
+                   "This condition must be satisfied: "
+                   "level0_stop_writes_trigger(%d) >= "
+                   "level0_slowdown_writes_trigger(%d) >= "
+                   "level0_file_num_compaction_trigger(%d)",
+                   result.level0_stop_writes_trigger,
+                   result.level0_slowdown_writes_trigger,
+                   result.level0_file_num_compaction_trigger);
+    if (result.level0_slowdown_writes_trigger <
+        result.level0_file_num_compaction_trigger) {
+      result.level0_slowdown_writes_trigger =
+          result.level0_file_num_compaction_trigger;
+    }
+    if (result.level0_stop_writes_trigger <
+        result.level0_slowdown_writes_trigger) {
+      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
+    }
+    ROCKS_LOG_WARN(db_options.info_log.get(),
+                   "Adjust the value to "
+                   "level0_stop_writes_trigger(%d)"
+                   "level0_slowdown_writes_trigger(%d)"
+                   "level0_file_num_compaction_trigger(%d)",
+                   result.level0_stop_writes_trigger,
+                   result.level0_slowdown_writes_trigger,
+                   result.level0_file_num_compaction_trigger);
+  }
+
+  if (result.soft_pending_compaction_bytes_limit == 0) {
+    result.soft_pending_compaction_bytes_limit =
+        result.hard_pending_compaction_bytes_limit;
+  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
+             result.soft_pending_compaction_bytes_limit >
+                 result.hard_pending_compaction_bytes_limit) {
+    result.soft_pending_compaction_bytes_limit =
+        result.hard_pending_compaction_bytes_limit;
+  }
+
+  if (result.level_compaction_dynamic_level_bytes) {
+    if (result.compaction_style != kCompactionStyleLevel ||
+        db_options.db_paths.size() > 1U) {
+      // 1. level_compaction_dynamic_level_bytes only makes sense for
+      //    level-based compaction.
+      // 2. we don't yet know how to make both of this feature and multiple
+      //    DB path work.
+      result.level_compaction_dynamic_level_bytes = false;
+    }
+  }
+
+  if (result.max_compaction_bytes == 0) {
+    result.max_compaction_bytes = result.target_file_size_base * 25;
+  }
+
+  return result;
+}
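+// Editor's sketch, not part of this commit: the 4 KiB align-up used for
+// arena_block_size above, isolated with a worked example. The helper name is
+// hypothetical.
+static size_t AlignUpTo4KExample(size_t n) {
+  const size_t align = 4 * 1024;
+  // e.g. write_buffer_size = 100000 gives n = 100000 / 8 = 12500, which
+  // rounds up to ((12500 + 4095) / 4096) * 4096 = 16384.
+  return ((n + align - 1) / align) * align;
+}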
+
+int SuperVersion::dummy = 0;
+void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
+void* const SuperVersion::kSVObsolete = nullptr;
+
+SuperVersion::~SuperVersion() {
+  for (auto td : to_delete) {
+    delete td;
+  }
+}
+
+SuperVersion* SuperVersion::Ref() {
+  refs.fetch_add(1, std::memory_order_relaxed);
+  return this;
+}
+
+bool SuperVersion::Unref() {
+  // fetch_sub returns the previous value of ref
+  uint32_t previous_refs = refs.fetch_sub(1);
+  assert(previous_refs > 0);
+  return previous_refs == 1;
+}
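+// Editor's sketch, not part of this commit: the release discipline implied by
+// Unref(). Unref() only reports that the last reference was dropped; the
+// caller must then run Cleanup() under the DB mutex and delete the object
+// (compare SuperVersionUnrefHandle below). The function name is hypothetical.
+static void ReleaseSuperVersionExample(SuperVersion* sv,
+                                       InstrumentedMutex* db_mutex) {
+  if (sv->Unref()) {
+    db_mutex->Lock();
+    sv->Cleanup();
+    db_mutex->Unlock();
+    delete sv;
+  }
+}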
+
+void SuperVersion::Cleanup() {
+  assert(refs.load(std::memory_order_relaxed) == 0);
+  imm->Unref(&to_delete);
+  MemTable* m = mem->Unref();
+  if (m != nullptr) {
+    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
+    assert(*memory_usage >= m->ApproximateMemoryUsage());
+    *memory_usage -= m->ApproximateMemoryUsage();
+    to_delete.push_back(m);
+  }
+  current->Unref();
+}
+
+void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
+                        Version* new_current) {
+  mem = new_mem;
+  imm = new_imm;
+  current = new_current;
+  mem->Ref();
+  imm->Ref();
+  current->Ref();
+  refs.store(1, std::memory_order_relaxed);
+}
+
+namespace {
+void SuperVersionUnrefHandle(void* ptr) {
+  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
+  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
+  // When the latter happens, we are in ~ColumnFamilyData(), so no get should
+  // happen either.
+  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
+  if (sv->Unref()) {
+    sv->db_mutex->Lock();
+    sv->Cleanup();
+    sv->db_mutex->Unlock();
+    delete sv;
+  }
+}
+}  // anonymous namespace
+
+ColumnFamilyData::ColumnFamilyData(
+    uint32_t id, const std::string& name, Version* _dummy_versions,
+    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
+    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
+    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
+    : id_(id),
+      name_(name),
+      dummy_versions_(_dummy_versions),
+      current_(nullptr),
+      refs_(0),
+      initialized_(false),
+      dropped_(false),
+      internal_comparator_(cf_options.comparator),
+      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
+      ioptions_(db_options, initial_cf_options_),
+      mutable_cf_options_(initial_cf_options_),
+      is_delete_range_supported_(
+          cf_options.table_factory->IsDeleteRangeSupported()),
+      write_buffer_manager_(write_buffer_manager),
+      mem_(nullptr),
+      imm_(ioptions_.min_write_buffer_number_to_merge,
+           ioptions_.max_write_buffer_number_to_maintain),
+      super_version_(nullptr),
+      super_version_number_(0),
+      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
+      next_(nullptr),
+      prev_(nullptr),
+      log_number_(0),
+      column_family_set_(column_family_set),
+      pending_flush_(false),
+      pending_compaction_(false),
+      prev_compaction_needed_bytes_(0),
+      allow_2pc_(db_options.allow_2pc) {
+  Ref();
+
+  // Convert user defined table properties collector factories to internal ones.
+  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);
+
+  // if _dummy_versions is nullptr, then this is a dummy column family.
+  if (_dummy_versions != nullptr) {
+    internal_stats_.reset(
+        new InternalStats(ioptions_.num_levels, db_options.env, this));
+    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
+    if (ioptions_.compaction_style == kCompactionStyleLevel) {
+      compaction_picker_.reset(
+          new LevelCompactionPicker(ioptions_, &internal_comparator_));
+#ifndef ROCKSDB_LITE
+    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
+      compaction_picker_.reset(
+          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
+    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
+      compaction_picker_.reset(
+          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
+    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
+      compaction_picker_.reset(new NullCompactionPicker(
+          ioptions_, &internal_comparator_));
+      ROCKS_LOG_WARN(ioptions_.info_log,
+                     "Column family %s does not use any background compaction. "
+                     "Compactions can only be done via CompactFiles\n",
+                     GetName().c_str());
+#endif  // !ROCKSDB_LITE
+    } else {
+      ROCKS_LOG_ERROR(ioptions_.info_log,
+                      "Unable to recognize the specified compaction style %d. "
+                      "Column family %s will use kCompactionStyleLevel.\n",
+                      ioptions_.compaction_style, GetName().c_str());
+      compaction_picker_.reset(
+          new LevelCompactionPicker(ioptions_, &internal_comparator_));
+    }
+
+    if (column_family_set_->NumberOfColumnFamilies() < 10) {
+      ROCKS_LOG_INFO(ioptions_.info_log,
+                     "--------------- Options for column family [%s]:\n",
+                     name.c_str());
+      initial_cf_options_.Dump(ioptions_.info_log);
+    } else {
+      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
+    }
+  }
+
+  RecalculateWriteStallConditions(mutable_cf_options_);
+}
+
+// DB mutex held
+ColumnFamilyData::~ColumnFamilyData() {
+  assert(refs_.load(std::memory_order_relaxed) == 0);
+  // remove from linked list
+  auto prev = prev_;
+  auto next = next_;
+  prev->next_ = next;
+  next->prev_ = prev;
+
+  if (!dropped_ && column_family_set_ != nullptr) {
+    // If it's dropped, it's already removed from column family set
+    // If column_family_set_ == nullptr, this is dummy CFD and not in
+    // ColumnFamilySet
+    column_family_set_->RemoveColumnFamily(this);
+  }
+
+  if (current_ != nullptr) {
+    current_->Unref();
+  }
+
+  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
+  // compaction_queue_ and we destroyed it
+  assert(!pending_flush_);
+  assert(!pending_compaction_);
+
+  if (super_version_ != nullptr) {
+    // Release SuperVersion reference kept in ThreadLocalPtr.
+    // This must be done outside of mutex_ since unref handler can lock mutex.
+    super_version_->db_mutex->Unlock();
+    local_sv_.reset();
+    super_version_->db_mutex->Lock();
+
+    bool is_last_reference __attribute__((unused));
+    is_last_reference = super_version_->Unref();
+    assert(is_last_reference);
+    super_version_->Cleanup();
+    delete super_version_;
+    super_version_ = nullptr;
+  }
+
+  if (dummy_versions_ != nullptr) {
+    // List must be empty
+    assert(dummy_versions_->TEST_Next() == dummy_versions_);
+    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
+    assert(deleted);
+  }
+
+  if (mem_ != nullptr) {
+    delete mem_->Unref();
+  }
+  autovector<MemTable*> to_delete;
+  imm_.current()->Unref(&to_delete);
+  for (MemTable* m : to_delete) {
+    delete m;
+  }
+}
+
+void ColumnFamilyData::SetDropped() {
+  // can't drop default CF
+  assert(id_ != 0);
+  dropped_ = true;
+  write_controller_token_.reset();
+
+  // remove from column_family_set
+  column_family_set_->RemoveColumnFamily(this);
+}
+
+ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
+  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
+}
+
+uint64_t ColumnFamilyData::OldestLogToKeep() {
+  auto current_log = GetLogNumber();
+
+  if (allow_2pc_) {
+    auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
+    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
+
+    if (imm_prep_log > 0 && imm_prep_log < current_log) {
+      current_log = imm_prep_log;
+    }
+
+    if (mem_prep_log > 0 && mem_prep_log < current_log) {
+      current_log = mem_prep_log;
+    }
+  }
+
+  return current_log;
+}
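+// Editor's worked example, not part of this commit: with the column family's
+// log number at 12 and 2PC enabled, if an immutable memtable still holds a
+// prepare section from log 9 and the active memtable one from log 11, the
+// method returns 9, so logs 9..12 are kept for recovery.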
+
+const double kIncSlowdownRatio = 0.8;
+const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
+const double kNearStopSlowdownRatio = 0.6;
+const double kDelayRecoverSlowdownRatio = 1.4;
+
+namespace {
+// If penalize_stop is true, we further reduce the slowdown rate.
+std::unique_ptr<WriteControllerToken> SetupDelay(
+    WriteController* write_controller, uint64_t compaction_needed_bytes,
+    uint64_t prev_compaction_need_bytes, bool penalize_stop,
+    bool auto_compactions_disabled) {
+  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.
+
+  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
+  uint64_t write_rate = write_controller->delayed_write_rate();
+
+  if (auto_compactions_disabled) {
+    // When auto compaction is disabled, always use the value user gave.
+    write_rate = max_write_rate;
+  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
+    // If the user gives a rate less than kMinWriteRate, don't adjust it.
+    //
+    // If already delayed, we need to adjust based on previous compaction debt.
+    // When two or more column families require delay, we always increase or
+    // reduce the write rate based on information for one single column
+    // family. This is likely to be OK, but we can improve it if it turns out
+    // to be a problem.
+    // Ignore the compaction_needed_bytes = 0 case, because
+    // compaction_needed_bytes is only available in level-based compaction.
+    //
+    // If the compaction debt stays the same as before, we also slow down
+    // further. It usually means a memtable is full. This is mainly for the
+    // case where both flush and compaction are much slower than the speed at
+    // which we insert into memtables, so we need to actively slow down before
+    // we get a feedback signal from compactions and flushes, to avoid a full
+    // stop caused by hitting the max write buffer number.
+    //
+    // If the DB has just fallen into the stop condition, we need to further
+    // reduce the write rate to avoid the stop condition.
+    if (penalize_stop) {
+      // Penalize the near stop or stop condition by more aggressive slowdown.
+      // This is to provide the long term slowdown increase signal.
+      // The penalty is more than the reward of recovering to the normal
+      // condition.
+      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
+                                         kNearStopSlowdownRatio);
+      if (write_rate < kMinWriteRate) {
+        write_rate = kMinWriteRate;
+      }
+    } else if (prev_compaction_need_bytes > 0 &&
+               prev_compaction_need_bytes <= compaction_needed_bytes) {
+      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
+                                         kIncSlowdownRatio);
+      if (write_rate < kMinWriteRate) {
+        write_rate = kMinWriteRate;
+      }
+    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
+      // We speed up by a ratio of kDecSlowdownRatio once we have paid down
+      // compaction debt. But we'll never speed up to faster than the write
+      // rate given by users.
+      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
+                                         kDecSlowdownRatio);
+      if (write_rate > max_write_rate) {
+        write_rate = max_write_rate;
+      }
+    }
+  }
+  return write_controller->GetDelayToken(write_rate);
+}
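+// Editor's worked example, not part of this commit: starting from a delayed
+// write rate of 16 MB/s, one penalized step multiplies by
+// kNearStopSlowdownRatio (0.6) down to 9.6 MB/s; a regular slowdown step
+// multiplies by kIncSlowdownRatio (0.8) down to 12.8 MB/s; once compaction
+// debt shrinks, each step multiplies by kDecSlowdownRatio (1.25), capped at
+// max_delayed_write_rate.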
+
+int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
+                                    int level0_slowdown_writes_trigger) {
+  // SanitizeOptions() ensures it.
+  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
+
+  if (level0_file_num_compaction_trigger < 0) {
+    return std::numeric_limits<int>::max();
+  }
+
+  const int64_t twice_level0_trigger =
+      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;
+
+  const int64_t one_fourth_trigger_slowdown =
+      static_cast<int64_t>(level0_file_num_compaction_trigger) +
+      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
+       4);
+
+  assert(twice_level0_trigger >= 0);
+  assert(one_fourth_trigger_slowdown >= 0);
+
+  // 1/4 of the way between L0 compaction trigger threshold and slowdown
+  // condition.
+  // Or twice as compaction trigger, if it is smaller.
+  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
+  if (res >= port::kMaxInt32) {
+    return port::kMaxInt32;
+  } else {
+    // res fits in int
+    return static_cast<int>(res);
+  }
+}
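+// Editor's worked example, not part of this commit: with
+// level0_file_num_compaction_trigger = 4 and
+// level0_slowdown_writes_trigger = 20, the candidates are 2 * 4 = 8 and
+// 4 + (20 - 4) / 4 = 8, so compaction pressure is signalled at 8 L0 files.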
+}  // namespace
+
+void ColumnFamilyData::RecalculateWriteStallConditions(
+      const MutableCFOptions& mutable_cf_options) {
+  if (current_ != nullptr) {
+    auto* vstorage = current_->storage_info();
+    auto write_controller = column_family_set_->write_controller_;
+    uint64_t compaction_needed_bytes =
+        vstorage->estimated_compaction_needed_bytes();
+
+    bool was_stopped = write_controller->IsStopped();
+    bool needed_delay = write_controller->NeedsDelay();
+
+    if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
+      write_controller_token_ = write_controller->GetStopToken();
+      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
+      ROCKS_LOG_WARN(
+          ioptions_.info_log,
+          "[%s] Stopping writes because we have %d immutable memtables "
+          "(waiting for flush), max_write_buffer_number is set to %d",
+          name_.c_str(), imm()->NumNotFlushed(),
+          mutable_cf_options.max_write_buffer_number);
+    } else if (!mutable_cf_options.disable_auto_compactions &&
+               vstorage->l0_delay_trigger_count() >=
+                   mutable_cf_options.level0_stop_writes_trigger) {
+      write_controller_token_ = write_controller->GetStopToken();
+      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
+      if (compaction_picker_->IsLevel0CompactionInProgress()) {
+        internal_stats_->AddCFStats(
+            InternalStats::LEVEL0_NUM_FILES_WITH_COMPACTION, 1);
+      }
+      ROCKS_LOG_WARN(ioptions_.info_log,
+                     "[%s] Stopping writes because we have %d level-0 files",
+                     name_.c_str(), vstorage->l0_delay_trigger_count());
+    } else if (!mutable_cf_options.disable_auto_compactions &&
+               mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
+               compaction_needed_bytes >=
+                   mutable_cf_options.hard_pending_compaction_bytes_limit) {
+      write_controller_token_ = write_controller->GetStopToken();
+      internal_stats_->AddCFStats(
+          InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1);
+      ROCKS_LOG_WARN(
+          ioptions_.info_log,
+          "[%s] Stopping writes because of estimated pending compaction "
+          "bytes %" PRIu64,
+          name_.c_str(), compaction_needed_bytes);
+    } else if (mutable_cf_options.max_write_buffer_number > 3 &&
+               imm()->NumNotFlushed() >=
+                   mutable_cf_options.max_write_buffer_number - 1) {
+      write_controller_token_ =
+          SetupDelay(write_controller, compaction_needed_bytes,
+                     prev_compaction_needed_bytes_, was_stopped,
+                     mutable_cf_options.disable_auto_compactions);
+      internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
+      ROCKS_LOG_WARN(
+          ioptions_.info_log,
+          "[%s] Stalling writes because we have %d immutable memtables "
+          "(waiting for flush), max_write_buffer_number is set to %d "
+          "rate %" PRIu64,
+          name_.c_str(), imm()->NumNotFlushed(),
+          mutable_cf_options.max_write_buffer_number,
+          write_controller->delayed_write_rate());
+    } else if (!mutable_cf_options.disable_auto_compactions &&
+               mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
+               vstorage->l0_delay_trigger_count() >=
+                   mutable_cf_options.level0_slowdown_writes_trigger) {
+      // L0 is within two files of the stop trigger.
+      bool near_stop = vstorage->l0_delay_trigger_count() >=
+                       mutable_cf_options.level0_stop_writes_trigger - 2;
+      write_controller_token_ =
+          SetupDelay(write_controller, compaction_needed_bytes,
+                     prev_compaction_needed_bytes_, was_stopped || near_stop,
+                     mutable_cf_options.disable_auto_compactions);
+      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
+      if (compaction_picker_->IsLevel0CompactionInProgress()) {
+        internal_stats_->AddCFStats(
+            InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1);
+      }
+      ROCKS_LOG_WARN(ioptions_.info_log,
+                     "[%s] Stalling writes because we have %d level-0 files "
+                     "rate %" PRIu64,
+                     name_.c_str(), vstorage->l0_delay_trigger_count(),
+                     write_controller->delayed_write_rate());
+    } else if (!mutable_cf_options.disable_auto_compactions &&
+               mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
+               vstorage->estimated_compaction_needed_bytes() >=
+                   mutable_cf_options.soft_pending_compaction_bytes_limit) {
+      // If the distance to the hard limit is less than 1/4 of the gap between
+      // the soft and hard bytes limits, we consider it near-stop and speed up
+      // the slowdown.
+      bool near_stop =
+          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
+          (compaction_needed_bytes -
+           mutable_cf_options.soft_pending_compaction_bytes_limit) >
+              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
+                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
+                  4;
+
+      write_controller_token_ =
+          SetupDelay(write_controller, compaction_needed_bytes,
+                     prev_compaction_needed_bytes_, was_stopped || near_stop,
+                     mutable_cf_options.disable_auto_compactions);
+      internal_stats_->AddCFStats(
+          InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);
+      ROCKS_LOG_WARN(
+          ioptions_.info_log,
+          "[%s] Stalling writes because of estimated pending compaction "
+          "bytes %" PRIu64 " rate %" PRIu64,
+          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
+          write_controller->delayed_write_rate());
+    } else {
+      if (vstorage->l0_delay_trigger_count() >=
+          GetL0ThresholdSpeedupCompaction(
+              mutable_cf_options.level0_file_num_compaction_trigger,
+              mutable_cf_options.level0_slowdown_writes_trigger)) {
+        write_controller_token_ =
+            write_controller->GetCompactionPressureToken();
+        ROCKS_LOG_INFO(
+            ioptions_.info_log,
+            "[%s] Increasing compaction threads because we have %d level-0 "
+            "files ",
+            name_.c_str(), vstorage->l0_delay_trigger_count());
+      } else if (vstorage->estimated_compaction_needed_bytes() >=
+                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
+        // Increase compaction threads if bytes needed for compaction exceeds
+        // 1/4 of threshold for slowing down.
+        // If soft pending compaction byte limit is not set, always speed up
+        // compaction.
+        write_controller_token_ =
+            write_controller->GetCompactionPressureToken();
+        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
+          ROCKS_LOG_INFO(
+              ioptions_.info_log,
+              "[%s] Increasing compaction threads because of estimated pending "
+              "compaction "
+              "bytes %" PRIu64,
+              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
+        }
+      } else {
+        write_controller_token_.reset();
+      }
+      // If the DB recovers from delay conditions, we reward with reducing
+      // double the slowdown ratio. This is to balance the long term slowdown
+      // increase signal.
+      if (needed_delay) {
+        uint64_t write_rate = write_controller->delayed_write_rate();
+        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
+            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
+        // Set the low pri limit to be 1/4 the delayed write rate.
+        // Note we don't reset this value even after the delay condition is
+        // released.
+        // Low-pri rate will continue to apply if there is a compaction
+        // pressure.
+        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
+                                                                    4);
+      }
+    }
+    prev_compaction_needed_bytes_ = compaction_needed_bytes;
+  }
+}
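+// Editor's worked example, not part of this commit, for the near-stop test
+// above: with soft_pending_compaction_bytes_limit = 64 GB and
+// hard_pending_compaction_bytes_limit = 256 GB, the gap is 192 GB, so the
+// slowdown is treated as near-stop once compaction debt exceeds
+// 64 GB + 3 * 192 GB / 4 = 208 GB, i.e. within 48 GB of the hard limit.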
+
+const EnvOptions* ColumnFamilyData::soptions() const {
+  return &(column_family_set_->env_options_);
+}
+
+void ColumnFamilyData::SetCurrent(Version* current_version) {
+  current_ = current_version;
+}
+
+uint64_t ColumnFamilyData::GetNumLiveVersions() const {
+  return VersionSet::GetNumLiveVersions(dummy_versions_);
+}
+
+uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
+  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
+}
+
+MemTable* ColumnFamilyData::ConstructNewMemtable(
+    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
+  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
+                      write_buffer_manager_, earliest_seq, id_);
+}
+
+void ColumnFamilyData::CreateNewMemtable(
+    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
+  if (mem_ != nullptr) {
+    delete mem_->Unref();
+  }
+  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
+  mem_->Ref();
+}
+
+bool ColumnFamilyData::NeedsCompaction() const {
+  return compaction_picker_->NeedsCompaction(current_->storage_info());
+}
+
+Compaction* ColumnFamilyData::PickCompaction(
+    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
+  auto* result = compaction_picker_->PickCompaction(
+      GetName(), mutable_options, current_->storage_info(), log_buffer);
+  if (result != nullptr) {
+    result->SetInputVersion(current_);
+  }
+  return result;
+}
+
+bool ColumnFamilyData::RangeOverlapWithCompaction(
+    const Slice& smallest_user_key, const Slice& largest_user_key,
+    int level) const {
+  return compaction_picker_->RangeOverlapWithCompaction(
+      smallest_user_key, largest_user_key, level);
+}
+
+const int ColumnFamilyData::kCompactAllLevels = -1;
+const int ColumnFamilyData::kCompactToBaseLevel = -2;
+
+Compaction* ColumnFamilyData::CompactRange(
+    const MutableCFOptions& mutable_cf_options, int input_level,
+    int output_level, uint32_t output_path_id, const InternalKey* begin,
+    const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
+  auto* result = compaction_picker_->CompactRange(
+      GetName(), mutable_cf_options, current_->storage_info(), input_level,
+      output_level, output_path_id, begin, end, compaction_end, conflict);
+  if (result != nullptr) {
+    result->SetInputVersion(current_);
+  }
+  return result;
+}
+
+SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
+    InstrumentedMutex* db_mutex) {
+  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
+  sv->Ref();
+  if (!ReturnThreadLocalSuperVersion(sv)) {
+    sv->Unref();
+  }
+  return sv;
+}
+
+SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
+    InstrumentedMutex* db_mutex) {
+  SuperVersion* sv = nullptr;
+  // The SuperVersion is cached in thread local storage to avoid acquiring
+  // mutex when SuperVersion does not change since the last use. When a new
+  // SuperVersion is installed, the compaction or flush thread cleans up
+  // cached SuperVersion in all existing thread local storage. To avoid
+  // acquiring mutex for this operation, we use atomic Swap() on the thread
+  // local pointer to guarantee exclusive access. If the thread local pointer
+  // is being used while a new SuperVersion is installed, the cached
+  // SuperVersion can become stale. In that case, the background thread would
+  // have swapped in kSVObsolete. We re-check the value when returning the
+  // SuperVersion back to thread local, with an atomic compare-and-swap.
+  // The SuperVersion will need to be released if detected to be stale.
+  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
+  // Invariant:
+  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
+  // (2) the Swap above (always) installs kSVInUse; ThreadLocal storage
+  // should only hold kSVInUse until the ReturnThreadLocalSuperVersion call
+  // (if no Scrape happens).
+  assert(ptr != SuperVersion::kSVInUse);
+  sv = static_cast<SuperVersion*>(ptr);
+  if (sv == SuperVersion::kSVObsolete ||
+      sv->version_number != super_version_number_.load()) {
+    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
+    SuperVersion* sv_to_delete = nullptr;
+
+    if (sv && sv->Unref()) {
+      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
+      db_mutex->Lock();
+      // NOTE: underlying resources held by superversion (sst files) might
+      // not be released until the next background job.
+      sv->Cleanup();
+      sv_to_delete = sv;
+    } else {
+      db_mutex->Lock();
+    }
+    sv = super_version_->Ref();
+    db_mutex->Unlock();
+
+    delete sv_to_delete;
+  }
+  assert(sv != nullptr);
+  return sv;
+}
+
+bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
+  assert(sv != nullptr);
+  // Put the SuperVersion back
+  void* expected = SuperVersion::kSVInUse;
+  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
+    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
+    // storage has not been altered and no Scrape has happened. The
+    // SuperVersion is still current.
+    return true;
+  } else {
+    // ThreadLocal scrape happened in the process of this GetImpl call (after
+    // thread local Swap() at the beginning and before CompareAndSwap()).
+    // This means the SuperVersion it holds is obsolete.
+    assert(expected == SuperVersion::kSVObsolete);
+  }
+  return false;
+}
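+// Editor's sketch, not part of this commit: the acquire/use/return cycle the
+// two methods above implement, from a reader's point of view. The function
+// name is hypothetical.
+static void ThreadLocalSuperVersionUsageExample(ColumnFamilyData* cfd,
+                                                InstrumentedMutex* db_mutex) {
+  SuperVersion* sv = cfd->GetThreadLocalSuperVersion(db_mutex);
+  // ... perform reads through sv->mem, sv->imm and sv->current here ...
+  if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
+    // A scrape marked the cached value obsolete while we held it, so the
+    // reference could not be handed back; release it instead.
+    if (sv->Unref()) {
+      db_mutex->Lock();
+      sv->Cleanup();
+      db_mutex->Unlock();
+      delete sv;
+    }
+  }
+}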
+
+SuperVersion* ColumnFamilyData::InstallSuperVersion(
+    SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
+  db_mutex->AssertHeld();
+  return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
+}
+
+SuperVersion* ColumnFamilyData::InstallSuperVersion(
+    SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
+    const MutableCFOptions& mutable_cf_options) {
+  new_superversion->db_mutex = db_mutex;
+  new_superversion->mutable_cf_options = mutable_cf_options;
+  new_superversion->Init(mem_, imm_.current(), current_);
+  SuperVersion* old_superversion = super_version_;
+  super_version_ = new_superversion;
+  ++super_version_number_;
+  super_version_->version_number = super_version_number_;
+  // Reset SuperVersions cached in thread local storage
+  ResetThreadLocalSuperVersions();
+
+  RecalculateWriteStallConditions(mutable_cf_options);
+
+  if (old_superversion != nullptr && old_superversion->Unref()) {
+    old_superversion->Cleanup();
+    return old_superversion;  // will let caller delete outside of mutex
+  }
+  return nullptr;
+}
+
+void ColumnFamilyData::ResetThreadLocalSuperVersions() {
+  autovector<void*> sv_ptrs;
+  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
+  for (auto ptr : sv_ptrs) {
+    assert(ptr);
+    if (ptr == SuperVersion::kSVInUse) {
+      continue;
+    }
+    auto sv = static_cast<SuperVersion*>(ptr);
+    if (sv->Unref()) {
+      sv->Cleanup();
+      delete sv;
+    }
+  }
+}
+
+#ifndef ROCKSDB_LITE
+Status ColumnFamilyData::SetOptions(
+      const std::unordered_map<std::string, std::string>& options_map) {
+  MutableCFOptions new_mutable_cf_options;
+  Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
+                                          &new_mutable_cf_options);
+  if (s.ok()) {
+    mutable_cf_options_ = new_mutable_cf_options;
+    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
+  }
+  return s;
+}
+#endif  // ROCKSDB_LITE
+
+ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
+                                 const ImmutableDBOptions* db_options,
+                                 const EnvOptions& env_options,
+                                 Cache* table_cache,
+                                 WriteBufferManager* write_buffer_manager,
+                                 WriteController* write_controller)
+    : max_column_family_(0),
+      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
+                                      ColumnFamilyOptions(), *db_options,
+                                      env_options, nullptr)),
+      default_cfd_cache_(nullptr),
+      db_name_(dbname),
+      db_options_(db_options),
+      env_options_(env_options),
+      table_cache_(table_cache),
+      write_buffer_manager_(write_buffer_manager),
+      write_controller_(write_controller) {
+  // initialize linked list
+  dummy_cfd_->prev_ = dummy_cfd_;
+  dummy_cfd_->next_ = dummy_cfd_;
+}
+
+ColumnFamilySet::~ColumnFamilySet() {
+  while (column_family_data_.size() > 0) {
+    // cfd destructor will delete itself from column_family_data_
+    auto cfd = column_family_data_.begin()->second;
+    cfd->Unref();
+    delete cfd;
+  }
+  dummy_cfd_->Unref();
+  delete dummy_cfd_;
+}
+
+ColumnFamilyData* ColumnFamilySet::GetDefault() const {
+  assert(default_cfd_cache_ != nullptr);
+  return default_cfd_cache_;
+}
+
+ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
+  auto cfd_iter = column_family_data_.find(id);
+  if (cfd_iter != column_family_data_.end()) {
+    return cfd_iter->second;
+  } else {
+    return nullptr;
+  }
+}
+
+ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
+    const {
+  auto cfd_iter = column_families_.find(name);
+  if (cfd_iter != column_families_.end()) {
+    auto cfd = GetColumnFamily(cfd_iter->second);
+    assert(cfd != nullptr);
+    return cfd;
+  } else {
+    return nullptr;
+  }
+}
+
+uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
+  return ++max_column_family_;
+}
+
+uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }
+
+void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
+  max_column_family_ = std::max(new_max_column_family, max_column_family_);
+}
+
+size_t ColumnFamilySet::NumberOfColumnFamilies() const {
+  return column_families_.size();
+}
+
+// under a DB mutex AND write thread
+ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
+    const std::string& name, uint32_t id, Version* dummy_versions,
+    const ColumnFamilyOptions& options) {
+  assert(column_families_.find(name) == column_families_.end());
+  ColumnFamilyData* new_cfd = new ColumnFamilyData(
+      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
+      *db_options_, env_options_, this);
+  column_families_.insert({name, id});
+  column_family_data_.insert({id, new_cfd});
+  max_column_family_ = std::max(max_column_family_, id);
+  // add to linked list
+  new_cfd->next_ = dummy_cfd_;
+  auto prev = dummy_cfd_->prev_;
+  new_cfd->prev_ = prev;
+  prev->next_ = new_cfd;
+  dummy_cfd_->prev_ = new_cfd;
+  if (id == 0) {
+    default_cfd_cache_ = new_cfd;
+  }
+  return new_cfd;
+}
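+// Editor's note, not part of this commit: the list above is circular and
+// doubly linked, with dummy_cfd_ as the sentinel, so inserting before the
+// sentinel appends in creation order:
+//   dummy_cfd_ <-> cf1 <-> cf2 <-> new_cfd <-> dummy_cfd_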
+
+// REQUIRES: DB mutex held
+void ColumnFamilySet::FreeDeadColumnFamilies() {
+  autovector<ColumnFamilyData*> to_delete;
+  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
+    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
+      to_delete.push_back(cfd);
+    }
+  }
+  for (auto cfd : to_delete) {
+    // this is very rare, so it's not a problem that we do it under a mutex
+    delete cfd;
+  }
+}
+
+// under a DB mutex AND from a write thread
+void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
+  auto cfd_iter = column_family_data_.find(cfd->GetID());
+  assert(cfd_iter != column_family_data_.end());
+  column_family_data_.erase(cfd_iter);
+  column_families_.erase(cfd->GetName());
+}
+
+// under a DB mutex OR from a write thread
+bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
+  if (column_family_id == 0) {
+    // optimization for common case
+    current_ = column_family_set_->GetDefault();
+  } else {
+    current_ = column_family_set_->GetColumnFamily(column_family_id);
+  }
+  handle_.SetCFD(current_);
+  return current_ != nullptr;
+}
+
+uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
+  assert(current_ != nullptr);
+  return current_->GetLogNumber();
+}
+
+MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
+  assert(current_ != nullptr);
+  return current_->mem();
+}
+
+ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
+  assert(current_ != nullptr);
+  return &handle_;
+}
+
+uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
+  uint32_t column_family_id = 0;
+  if (column_family != nullptr) {
+    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+    column_family_id = cfh->GetID();
+  }
+  return column_family_id;
+}
+
+const Comparator* GetColumnFamilyUserComparator(
+    ColumnFamilyHandle* column_family) {
+  if (column_family != nullptr) {
+    return column_family->GetComparator();
+  }
+  return nullptr;
+}
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/column_family.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/column_family.h b/thirdparty/rocksdb/db/column_family.h
new file mode 100644
index 0000000..3a807d2
--- /dev/null
+++ b/thirdparty/rocksdb/db/column_family.h
@@ -0,0 +1,577 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#include <unordered_map>
+#include <string>
+#include <vector>
+#include <atomic>
+
+#include "db/memtable_list.h"
+#include "db/table_cache.h"
+#include "db/table_properties_collector.h"
+#include "db/write_batch_internal.h"
+#include "db/write_controller.h"
+#include "options/cf_options.h"
+#include "rocksdb/compaction_job_stats.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "util/thread_local.h"
+
+namespace rocksdb {
+
+class Version;
+class VersionSet;
+class MemTable;
+class MemTableListVersion;
+class CompactionPicker;
+class Compaction;
+class InternalKey;
+class InternalStats;
+class ColumnFamilyData;
+class DBImpl;
+class LogBuffer;
+class InstrumentedMutex;
+class InstrumentedMutexLock;
+
+extern const double kIncSlowdownRatio;
+
+// ColumnFamilyHandleImpl is the class that clients use to access different
+// column families. It has a non-trivial destructor, which gets called when
+// the client is done using the column family.
+class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
+ public:
+  // create while holding the mutex
+  ColumnFamilyHandleImpl(
+      ColumnFamilyData* cfd, DBImpl* db, InstrumentedMutex* mutex);
+  // destroy without mutex
+  virtual ~ColumnFamilyHandleImpl();
+  virtual ColumnFamilyData* cfd() const { return cfd_; }
+
+  virtual uint32_t GetID() const override;
+  virtual const std::string& GetName() const override;
+  virtual Status GetDescriptor(ColumnFamilyDescriptor* desc) override;
+  virtual const Comparator* GetComparator() const override;
+
+ private:
+  ColumnFamilyData* cfd_;
+  DBImpl* db_;
+  InstrumentedMutex* mutex_;
+};
+
+// Does not ref-count ColumnFamilyData
+// We use this dummy ColumnFamilyHandleImpl because sometimes MemTableInserter
+// calls DBImpl methods. When this happens, MemTableInserter needs access to a
+// ColumnFamilyHandle (same as the client would need). In that case, we feed
+// MemTableInserter a dummy ColumnFamilyHandle and enable it to call DBImpl
+// methods.
+class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl {
+ public:
+  ColumnFamilyHandleInternal()
+      : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr) {}
+
+  void SetCFD(ColumnFamilyData* _cfd) { internal_cfd_ = _cfd; }
+  virtual ColumnFamilyData* cfd() const override { return internal_cfd_; }
+
+ private:
+  ColumnFamilyData* internal_cfd_;
+};
+
+// holds references to memtable, all immutable memtables and version
+struct SuperVersion {
+  // Accessing members of this class is not thread-safe and requires external
+  // synchronization (i.e. db mutex held or on the write thread).
+  MemTable* mem;
+  MemTableListVersion* imm;
+  Version* current;
+  MutableCFOptions mutable_cf_options;
+  // Version number of the current SuperVersion
+  uint64_t version_number;
+
+  InstrumentedMutex* db_mutex;
+
+  // should be called outside the mutex
+  SuperVersion() = default;
+  ~SuperVersion();
+  SuperVersion* Ref();
+  // If Unref() returns true, Cleanup() should be called with mutex held
+  // before deleting this SuperVersion.
+  bool Unref();
+
+  // call these two methods with db mutex held
+  // Cleanup unrefs mem, imm and current. Also, it stores all memtables
+  // that need to be deleted in the to_delete vector. Unrefing those
+  // objects must be done while holding the mutex.
+  void Cleanup();
+  void Init(MemTable* new_mem, MemTableListVersion* new_imm,
+            Version* new_current);
+
+  // The value of dummy is not actually used. kSVInUse uses its address as a
+  // marker in thread local storage to indicate the SuperVersion is in use by
+  // a thread. This way, the value of kSVInUse is guaranteed not to conflict
+  // with any SuperVersion object address and is portable across platforms.
+  static int dummy;
+  static void* const kSVInUse;
+  static void* const kSVObsolete;
+
+ private:
+  std::atomic<uint32_t> refs;
+  // We need to_delete because during Cleanup(), imm->Unref() returns
+  // all memtables that we need to free through this vector. We then
+  // delete all those memtables outside of mutex, during destruction
+  autovector<MemTable*> to_delete;
+};
+
+extern Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options);
+
+extern Status CheckConcurrentWritesSupported(
+    const ColumnFamilyOptions& cf_options);
+
+extern ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
+                                           const ColumnFamilyOptions& src);
+// Wrap the user-defined table properties collector factories from
+// `cf_options` into internal ones in int_tbl_prop_collector_factories.
+// Also add a system-internal one.
+extern void GetIntTblPropCollectorFactory(
+    const ImmutableCFOptions& ioptions,
+    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
+        int_tbl_prop_collector_factories);
+
+class ColumnFamilySet;
+
+// This class keeps all the data that a column family needs.
+// Most methods require DB mutex held, unless otherwise noted
+class ColumnFamilyData {
+ public:
+  ~ColumnFamilyData();
+
+  // thread-safe
+  uint32_t GetID() const { return id_; }
+  // thread-safe
+  const std::string& GetName() const { return name_; }
+
+  // Ref() can only be called from a context where the caller can guarantee
+  // that ColumnFamilyData is alive (while holding a non-zero ref already,
+  // holding a DB mutex, or as the leader in a write batch group).
+  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
+
+  // Unref decreases the reference count, but does not handle deletion
+  // when the count goes to 0.  If this method returns true then the
+  // caller should delete the instance immediately, or later, by calling
+  // FreeDeadColumnFamilies().  Unref() can only be called while holding
+  // a DB mutex, or during single-threaded recovery.
+  bool Unref() {
+    int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed);
+    assert(old_refs > 0);
+    return old_refs == 1;
+  }
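+  // Editor's sketch, not part of this commit, of the Ref/Unref contract:
+  //   cfd->Ref();                     // pin while the DB mutex is held
+  //   ... use cfd ...
+  //   if (cfd->Unref()) delete cfd;   // or defer to FreeDeadColumnFamilies()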
+
+  // SetDropped() can only be called under following conditions:
+  // 1) Holding a DB mutex,
+  // 2) from single-threaded write thread, AND
+  // 3) from single-threaded VersionSet::LogAndApply()
+  // After dropping a column family, no other operation on that column family
+  // will be executed. All the files and memory will, however, be kept around
+  // until the client drops the column family handle. That way, the client can
+  // still access data from the dropped column family.
+  // A column family can be dropped and still alive. In that state:
+  // *) Compaction and flush are not executed on the dropped column family.
+  // *) The client can continue reading from the column family. Writes will
+  // fail unless WriteOptions::ignore_missing_column_families is true.
+  // When the dropped column family is unreferenced, then we:
+  // *) Remove column family from the linked list maintained by ColumnFamilySet
+  // *) delete all memory associated with that column family
+  // *) delete all the files associated with that column family
+  void SetDropped();
+  bool IsDropped() const { return dropped_; }
+
+  // thread-safe
+  int NumberLevels() const { return ioptions_.num_levels; }
+
+  void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
+  uint64_t GetLogNumber() const { return log_number_; }
+
+  // thread-safe
+  const EnvOptions* soptions() const;
+  const ImmutableCFOptions* ioptions() const { return &ioptions_; }
+  // REQUIRES: DB mutex held
+  // This returns the MutableCFOptions used by current SuperVersion
+  // You should use this API to reference MutableCFOptions most of the time.
+  const MutableCFOptions* GetCurrentMutableCFOptions() const {
+    return &(super_version_->mutable_cf_options);
+  }
+  // REQUIRES: DB mutex held
+  // This returns the latest MutableCFOptions, which may be not in effect yet.
+  const MutableCFOptions* GetLatestMutableCFOptions() const {
+    return &mutable_cf_options_;
+  }
+
+  // REQUIRES: DB mutex held
+  // Build ColumnFamiliesOptions with immutable options and latest mutable
+  // options.
+  ColumnFamilyOptions GetLatestCFOptions() const;
+
+  bool is_delete_range_supported() { return is_delete_range_supported_; }
+
+#ifndef ROCKSDB_LITE
+  // REQUIRES: DB mutex held
+  Status SetOptions(
+      const std::unordered_map<std::string, std::string>& options_map);
+#endif  // ROCKSDB_LITE
+
+  InternalStats* internal_stats() { return internal_stats_.get(); }
+
+  MemTableList* imm() { return &imm_; }
+  MemTable* mem() { return mem_; }
+  Version* current() { return current_; }
+  Version* dummy_versions() { return dummy_versions_; }
+  void SetCurrent(Version* _current);
+  uint64_t GetNumLiveVersions() const;    // REQUIRES: DB mutex held
+  uint64_t GetTotalSstFilesSize() const;  // REQUIRES: DB mutex held
+  void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
+
+  // calculate the oldest log needed for the durability of this column family
+  uint64_t OldestLogToKeep();
+
+  // See Memtable constructor for explanation of earliest_seq param.
+  MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
+                                 SequenceNumber earliest_seq);
+  void CreateNewMemtable(const MutableCFOptions& mutable_cf_options,
+                         SequenceNumber earliest_seq);
+
+  TableCache* table_cache() const { return table_cache_.get(); }
+
+  // See documentation in compaction_picker.h
+  // REQUIRES: DB mutex held
+  bool NeedsCompaction() const;
+  // REQUIRES: DB mutex held
+  Compaction* PickCompaction(const MutableCFOptions& mutable_options,
+                             LogBuffer* log_buffer);
+
+  // Check if the passed range overlap with any running compactions.
+  // REQUIRES: DB mutex held
+  bool RangeOverlapWithCompaction(const Slice& smallest_user_key,
+                                  const Slice& largest_user_key,
+                                  int level) const;
+
+  // A flag telling a manual compaction to compact all levels together
+  // instead of a specific level.
+  static const int kCompactAllLevels;
+  // A flag telling a manual compaction that its output is the base level.
+  static const int kCompactToBaseLevel;
+  // REQUIRES: DB mutex held
+  Compaction* CompactRange(const MutableCFOptions& mutable_cf_options,
+                           int input_level, int output_level,
+                           uint32_t output_path_id, const InternalKey* begin,
+                           const InternalKey* end, InternalKey** compaction_end,
+                           bool* manual_conflict);
+
+  CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
+  // thread-safe
+  const Comparator* user_comparator() const {
+    return internal_comparator_.user_comparator();
+  }
+  // thread-safe
+  const InternalKeyComparator& internal_comparator() const {
+    return internal_comparator_;
+  }
+
+  const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
+  int_tbl_prop_collector_factories() const {
+    return &int_tbl_prop_collector_factories_;
+  }
+
+  SuperVersion* GetSuperVersion() { return super_version_; }
+  // thread-safe
+  // Returns an already-referenced SuperVersion that can be used safely.
+  SuperVersion* GetReferencedSuperVersion(InstrumentedMutex* db_mutex);
+  // thread-safe
+  // Get the SuperVersion stored in thread local storage. If it does not
+  // exist, get a reference from the current SuperVersion.
+  SuperVersion* GetThreadLocalSuperVersion(InstrumentedMutex* db_mutex);
+  // Try to return the SuperVersion back to thread local storage. Return true
+  // on success and false on failure. It fails when the thread local storage
+  // contains anything other than the SuperVersion::kSVInUse flag.
+  bool ReturnThreadLocalSuperVersion(SuperVersion* sv);
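+  //
+  // A minimal usage sketch (hypothetical caller; "db_mutex" stands in for the
+  // DB mutex) pairing the two calls above so the common read path avoids
+  // taking the DB mutex:
+  //
+  //   SuperVersion* sv = cfd->GetThreadLocalSuperVersion(&db_mutex);
+  //   // ... read through sv->mem / sv->imm / sv->current ...
+  //   if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
+  //     if (sv->Unref()) {       // slot was invalidated; drop our reference
+  //       db_mutex.Lock();
+  //       sv->Cleanup();         // cleanup requires the DB mutex
+  //       db_mutex.Unlock();
+  //       delete sv;
+  //     }
+  //   }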
+  // thread-safe
+  uint64_t GetSuperVersionNumber() const {
+    return super_version_number_.load();
+  }
+  // Returns a pointer to the previous SuperVersion if its reference count has
+  // dropped to zero and it needs deletion, or nullptr otherwise. Takes a
+  // pointer to an allocated SuperVersion as an argument so that clients can
+  // allocate the SuperVersion outside of the mutex.
+  // IMPORTANT: Only call this from DBImpl::InstallSuperVersion()
+  SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
+                                    InstrumentedMutex* db_mutex,
+                                    const MutableCFOptions& mutable_cf_options);
+  SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
+                                    InstrumentedMutex* db_mutex);
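+  //
+  // A minimal sketch of the intended call pattern inside DBImpl (everything
+  // other than the methods above is assumed):
+  //
+  //   SuperVersion* new_sv = new SuperVersion();  // allocated outside mutex
+  //   db_mutex.Lock();
+  //   SuperVersion* old_sv = cfd->InstallSuperVersion(new_sv, &db_mutex);
+  //   db_mutex.Unlock();
+  //   delete old_sv;  // nullptr unless the previous SuperVersion needs
+  //                   // deletion; deleting nullptr is a no-op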
+
+  void ResetThreadLocalSuperVersions();
+
+  // Protected by DB mutex
+  void set_pending_flush(bool value) { pending_flush_ = value; }
+  void set_pending_compaction(bool value) { pending_compaction_ = value; }
+  bool pending_flush() { return pending_flush_; }
+  bool pending_compaction() { return pending_compaction_; }
+
+  // Recalculate some conditions, which change only during compaction, the
+  // addition of a new memtable, and/or recalculation of the compaction
+  // score. These values are used in DBImpl::MakeRoomForWrite to decide
+  // whether it needs to impose a write stall.
+  void RecalculateWriteStallConditions(
+      const MutableCFOptions& mutable_cf_options);
+
+  void set_initialized() { initialized_.store(true); }
+
+  bool initialized() const { return initialized_.load(); }
+
+ private:
+  friend class ColumnFamilySet;
+  ColumnFamilyData(uint32_t id, const std::string& name,
+                   Version* dummy_versions, Cache* table_cache,
+                   WriteBufferManager* write_buffer_manager,
+                   const ColumnFamilyOptions& options,
+                   const ImmutableDBOptions& db_options,
+                   const EnvOptions& env_options,
+                   ColumnFamilySet* column_family_set);
+
+  uint32_t id_;
+  const std::string name_;
+  Version* dummy_versions_;  // Head of circular doubly-linked list of versions.
+  Version* current_;         // == dummy_versions->prev_
+
+  std::atomic<int> refs_;      // outstanding references to ColumnFamilyData
+  std::atomic<bool> initialized_;
+  bool dropped_;               // true if client dropped it
+
+  const InternalKeyComparator internal_comparator_;
+  std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
+      int_tbl_prop_collector_factories_;
+
+  const ColumnFamilyOptions initial_cf_options_;
+  const ImmutableCFOptions ioptions_;
+  MutableCFOptions mutable_cf_options_;
+
+  const bool is_delete_range_supported_;
+
+  std::unique_ptr<TableCache> table_cache_;
+
+  std::unique_ptr<InternalStats> internal_stats_;
+
+  WriteBufferManager* write_buffer_manager_;
+
+  MemTable* mem_;
+  MemTableList imm_;
+  SuperVersion* super_version_;
+
+  // An ordinal representing the current SuperVersion. Updated by
+  // InstallSuperVersion(), i.e. incremented every time super_version_
+  // changes.
+  std::atomic<uint64_t> super_version_number_;
+
+  // Thread's local copy of SuperVersion pointer
+  // This needs to be destructed before mutex_
+  std::unique_ptr<ThreadLocalPtr> local_sv_;
+
+  // pointers for a circular linked list. we use it to support iterations over
+  // all column families that are alive (note: dropped column families can also
+  // be alive as long as client holds a reference)
+  ColumnFamilyData* next_;
+  ColumnFamilyData* prev_;
+
+  // This is the earliest log file number that contains data from this
+  // Column Family. All earlier log files must be ignored and not
+  // recovered from
+  uint64_t log_number_;
+
+  // An object that keeps all the compaction stats
+  // and picks the next compaction
+  std::unique_ptr<CompactionPicker> compaction_picker_;
+
+  ColumnFamilySet* column_family_set_;
+
+  std::unique_ptr<WriteControllerToken> write_controller_token_;
+
+  // If true --> this ColumnFamily is currently present in DBImpl::flush_queue_
+  bool pending_flush_;
+
+  // If true --> this ColumnFamily is currently present in
+  // DBImpl::compaction_queue_
+  bool pending_compaction_;
+
+  uint64_t prev_compaction_needed_bytes_;
+
+  // if the database was opened with 2pc enabled
+  bool allow_2pc_;
+};
+
+// ColumnFamilySet has interesting thread-safety requirements
+// * CreateColumnFamily() or RemoveColumnFamily() -- need to be protected by DB
+// mutex AND executed in the write thread.
+// CreateColumnFamily() should ONLY be called from VersionSet::LogAndApply()
+// AND from the single-threaded write thread. It is also called during
+// Recovery and in DumpManifest().
+// RemoveColumnFamily() is only called from SetDropped(). DB mutex needs to be
+// held and it needs to be executed from the write thread. SetDropped() also
+// guarantees that it will be called only from single-threaded LogAndApply(),
+// but this condition is not that important.
+// * Iteration -- hold the DB mutex, but you can release it in the body of
+// the iteration. If you do release it, reference the column family before
+// releasing the mutex and unreference it after you reacquire it, since the
+// column family might get dropped while the DB mutex is released
+// * GetDefault() -- thread safe
+// * GetColumnFamily() -- either inside of DB mutex or from a write thread
+// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily(),
+// NumberOfColumnFamilies() -- inside of DB mutex
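+//
+// A minimal sketch (hypothetical caller) of the iteration rule above, with
+// the DB mutex released inside the loop body:
+//
+//   db_mutex.Lock();
+//   for (auto cfd : *column_family_set) {
+//     cfd->Ref();                 // keep the CFD alive across the unlock
+//     db_mutex.Unlock();
+//     // ... work that must not hold the DB mutex ...
+//     db_mutex.Lock();
+//     cfd->Unref();               // actual deletion is deferred to
+//   }                             // FreeDeadColumnFamilies()
+//   db_mutex.Unlock();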
+class ColumnFamilySet {
+ public:
+  // ColumnFamilySet supports iteration
+  class iterator {
+   public:
+    explicit iterator(ColumnFamilyData* cfd)
+        : current_(cfd) {}
+    iterator& operator++() {
+      // dropped column families might still be included in this iteration
+      // (we only remove them when the client drops the last reference to the
+      // column family).
+      // the dummy CFD is never dead, so this loop will never be infinite
+      do {
+        current_ = current_->next_;
+      } while (current_->refs_.load(std::memory_order_relaxed) == 0);
+      return *this;
+    }
+    bool operator!=(const iterator& other) {
+      return this->current_ != other.current_;
+    }
+    ColumnFamilyData* operator*() { return current_; }
+
+   private:
+    ColumnFamilyData* current_;
+  };
+
+  ColumnFamilySet(const std::string& dbname,
+                  const ImmutableDBOptions* db_options,
+                  const EnvOptions& env_options, Cache* table_cache,
+                  WriteBufferManager* write_buffer_manager,
+                  WriteController* write_controller);
+  ~ColumnFamilySet();
+
+  ColumnFamilyData* GetDefault() const;
+  // GetColumnFamily() calls return nullptr if column family is not found
+  ColumnFamilyData* GetColumnFamily(uint32_t id) const;
+  ColumnFamilyData* GetColumnFamily(const std::string& name) const;
+  // this call will return the next available column family ID. It guarantees
+  // that there is no column family with an ID greater than or equal to the
+  // returned value in the currently running instance or at any time in the
+  // history of this RocksDB instance.
+  uint32_t GetNextColumnFamilyID();
+  uint32_t GetMaxColumnFamily();
+  void UpdateMaxColumnFamily(uint32_t new_max_column_family);
+  size_t NumberOfColumnFamilies() const;
+
+  ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
+                                       Version* dummy_version,
+                                       const ColumnFamilyOptions& options);
+
+  iterator begin() { return iterator(dummy_cfd_->next_); }
+  iterator end() { return iterator(dummy_cfd_); }
+
+  // REQUIRES: DB mutex held
+  // Don't call while iterating over ColumnFamilySet
+  void FreeDeadColumnFamilies();
+
+  Cache* get_table_cache() { return table_cache_; }
+
+ private:
+  friend class ColumnFamilyData;
+  // helper function that gets called from cfd destructor
+  // REQUIRES: DB mutex held
+  void RemoveColumnFamily(ColumnFamilyData* cfd);
+
+  // column_families_ and column_family_data_ need to be protected:
+  // * when mutating, both conditions have to be satisfied:
+  // 1. DB mutex locked
+  // 2. executing in the single-threaded write thread
+  // * when reading, at least one condition needs to be satisfied:
+  // 1. DB mutex locked
+  // 2. accessed from the single-threaded write thread
+  std::unordered_map<std::string, uint32_t> column_families_;
+  std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
+
+  uint32_t max_column_family_;
+  ColumnFamilyData* dummy_cfd_;
+  // We don't hold the refcount here, since the default column family always
+  // exists. We are also not responsible for cleaning up default_cfd_cache_.
+  // This is just a cache that makes the common case (accessing the default
+  // column family) faster
+  ColumnFamilyData* default_cfd_cache_;
+
+  const std::string db_name_;
+  const ImmutableDBOptions* const db_options_;
+  const EnvOptions env_options_;
+  Cache* table_cache_;
+  WriteBufferManager* write_buffer_manager_;
+  WriteController* write_controller_;
+};
+
+// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access
+// memtables of different column families (specified by ID in the write batch)
+class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
+ public:
+  explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set)
+      : column_family_set_(column_family_set), current_(nullptr) {}
+
+  // Constructs a ColumnFamilyMemTablesImpl equivalent to one constructed
+  // with the arguments used to construct *orig.
+  explicit ColumnFamilyMemTablesImpl(ColumnFamilyMemTablesImpl* orig)
+      : column_family_set_(orig->column_family_set_), current_(nullptr) {}
+
+  // Sets current_ to the ColumnFamilyData with column_family_id.
+  // Returns false if the column family doesn't exist.
+  // REQUIRES: calls to this function on DBImpl::column_family_memtables_
+  //           should be made under a DB mutex OR from a write thread
+  bool Seek(uint32_t column_family_id) override;
+
+  // Returns log number of the selected column family
+  // REQUIRES: under a DB mutex OR from a write thread
+  uint64_t GetLogNumber() const override;
+
+  // REQUIRES: Seek() called first
+  // REQUIRES: calls to this function on DBImpl::column_family_memtables_
+  //           should be made under a DB mutex OR from a write thread
+  virtual MemTable* GetMemTable() const override;
+
+  // Returns the column family handle for the selected column family
+  // REQUIRES: calls to this function on DBImpl::column_family_memtables_
+  //           should be made under a DB mutex OR from a write thread
+  virtual ColumnFamilyHandle* GetColumnFamilyHandle() override;
+
+  // Cannot be called while another thread is calling Seek().
+  // REQUIRES: calls to this function on DBImpl::column_family_memtables_
+  //           should be made under a DB mutex OR from a write thread
+  virtual ColumnFamilyData* current() override { return current_; }
+
+ private:
+  ColumnFamilySet* column_family_set_;
+  ColumnFamilyData* current_;
+  ColumnFamilyHandleInternal handle_;
+};
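+
+// A minimal sketch (hypothetical, following the REQUIRES notes above) of how
+// a write-thread consumer drives ColumnFamilyMemTablesImpl when applying one
+// write batch entry:
+//
+//   if (mem_tables->Seek(column_family_id)) {
+//     MemTable* mem = mem_tables->GetMemTable();
+//     // ... apply the entry to mem ...
+//   } else {
+//     // column family was dropped or never existed; skip the entry
+//   }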
+
+extern uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family);
+
+extern const Comparator* GetColumnFamilyUserComparator(
+    ColumnFamilyHandle* column_family);
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compacted_db_impl.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compacted_db_impl.cc b/thirdparty/rocksdb/db/compacted_db_impl.cc
new file mode 100644
index 0000000..d1007d9
--- /dev/null
+++ b/thirdparty/rocksdb/db/compacted_db_impl.cc
@@ -0,0 +1,166 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+#include "db/compacted_db_impl.h"
+#include "db/db_impl.h"
+#include "db/version_set.h"
+#include "table/get_context.h"
+
+namespace rocksdb {
+
+extern void MarkKeyMayExist(void* arg);
+extern bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
+                      const Slice& v, bool hit_and_return);
+
+CompactedDBImpl::CompactedDBImpl(
+  const DBOptions& options, const std::string& dbname)
+  : DBImpl(options, dbname) {
+}
+
+CompactedDBImpl::~CompactedDBImpl() {
+}
+
+size_t CompactedDBImpl::FindFile(const Slice& key) {
+  size_t left = 0;
+  size_t right = files_.num_files - 1;
+  while (left < right) {
+    size_t mid = (left + right) >> 1;
+    const FdWithKeyRange& f = files_.files[mid];
+    if (user_comparator_->Compare(ExtractUserKey(f.largest_key), key) < 0) {
+      // Key at "mid.largest" is < "target".  Therefore all
+      // files at or before "mid" are uninteresting.
+      left = mid + 1;
+    } else {
+      // Key at "mid.largest" is >= "target".  Therefore all files
+      // after "mid" are uninteresting.
+      right = mid;
+    }
+  }
+  return right;
+}
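+
+// A quick worked example of FindFile()'s invariant (illustrative only): with
+// per-file largest user keys {"c", "g", "m"} and target key "h", the binary
+// search narrows to index 2 ("m"), the leftmost file whose largest key is
+// >= "h"; Get() below then probes only that one file.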
+
+Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
+                            const Slice& key, PinnableSlice* value) {
+  GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
+                         GetContext::kNotFound, key, value, nullptr, nullptr,
+                         nullptr, nullptr);
+  LookupKey lkey(key, kMaxSequenceNumber);
+  files_.files[FindFile(key)].fd.table_reader->Get(
+      options, lkey.internal_key(), &get_context);
+  if (get_context.State() == GetContext::kFound) {
+    return Status::OK();
+  }
+  return Status::NotFound();
+}
+
+std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
+    const std::vector<ColumnFamilyHandle*>&,
+    const std::vector<Slice>& keys, std::vector<std::string>* values) {
+  autovector<TableReader*, 16> reader_list;
+  for (const auto& key : keys) {
+    const FdWithKeyRange& f = files_.files[FindFile(key)];
+    if (user_comparator_->Compare(key, ExtractUserKey(f.smallest_key)) < 0) {
+      reader_list.push_back(nullptr);
+    } else {
+      LookupKey lkey(key, kMaxSequenceNumber);
+      f.fd.table_reader->Prepare(lkey.internal_key());
+      reader_list.push_back(f.fd.table_reader);
+    }
+  }
+  std::vector<Status> statuses(keys.size(), Status::NotFound());
+  values->resize(keys.size());
+  int idx = 0;
+  for (auto* r : reader_list) {
+    if (r != nullptr) {
+      PinnableSlice pinnable_val;
+      std::string& value = (*values)[idx];
+      GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
+                             GetContext::kNotFound, keys[idx], &pinnable_val,
+                             nullptr, nullptr, nullptr, nullptr);
+      LookupKey lkey(keys[idx], kMaxSequenceNumber);
+      r->Get(options, lkey.internal_key(), &get_context);
+      value.assign(pinnable_val.data(), pinnable_val.size());
+      if (get_context.State() == GetContext::kFound) {
+        statuses[idx] = Status::OK();
+      }
+    }
+    ++idx;
+  }
+  return statuses;
+}
+
+Status CompactedDBImpl::Init(const Options& options) {
+  mutex_.Lock();
+  ColumnFamilyDescriptor cf(kDefaultColumnFamilyName,
+                            ColumnFamilyOptions(options));
+  Status s = Recover({cf}, true /* read only */, false, true);
+  if (s.ok()) {
+    cfd_ = reinterpret_cast<ColumnFamilyHandleImpl*>(
+              DefaultColumnFamily())->cfd();
+    delete cfd_->InstallSuperVersion(new SuperVersion(), &mutex_);
+  }
+  mutex_.Unlock();
+  if (!s.ok()) {
+    return s;
+  }
+  NewThreadStatusCfInfo(cfd_);
+  version_ = cfd_->GetSuperVersion()->current;
+  user_comparator_ = cfd_->user_comparator();
+  auto* vstorage = version_->storage_info();
+  if (vstorage->num_non_empty_levels() == 0) {
+    return Status::NotSupported("no file exists");
+  }
+  const LevelFilesBrief& l0 = vstorage->LevelFilesBrief(0);
+  // L0 should not have more than one file
+  if (l0.num_files > 1) {
+    return Status::NotSupported("L0 contains more than 1 file");
+  }
+  if (l0.num_files == 1) {
+    if (vstorage->num_non_empty_levels() > 1) {
+      return Status::NotSupported("Both L0 and other level contain files");
+    }
+    files_ = l0;
+    return Status::OK();
+  }
+
+  for (int i = 1; i < vstorage->num_non_empty_levels() - 1; ++i) {
+    if (vstorage->LevelFilesBrief(i).num_files > 0) {
+      return Status::NotSupported("Other levels also contain files");
+    }
+  }
+
+  int level = vstorage->num_non_empty_levels() - 1;
+  if (vstorage->LevelFilesBrief(level).num_files > 0) {
+    files_ = vstorage->LevelFilesBrief(level);
+    return Status::OK();
+  }
+  return Status::NotSupported("no file exists");
+}
+
+Status CompactedDBImpl::Open(const Options& options,
+                             const std::string& dbname, DB** dbptr) {
+  *dbptr = nullptr;
+
+  if (options.max_open_files != -1) {
+    return Status::InvalidArgument("require max_open_files = -1");
+  }
+  if (options.merge_operator.get() != nullptr) {
+    return Status::InvalidArgument("merge operator is not supported");
+  }
+  DBOptions db_options(options);
+  std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
+  Status s = db->Init(options);
+  if (s.ok()) {
+    ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
+                   "Opened the db as fully compacted mode");
+    LogFlush(db->immutable_db_options_.info_log);
+    *dbptr = db.release();
+  }
+  return s;
+}
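+
+// A minimal usage sketch (illustrative; error handling elided). It reflects
+// the two preconditions enforced above: max_open_files must be -1 and no
+// merge operator may be configured.
+//
+//   Options options;
+//   options.max_open_files = -1;
+//   DB* db = nullptr;
+//   Status s = CompactedDBImpl::Open(options, "/path/to/db", &db);
+//   if (s.ok()) {
+//     std::string value;
+//     s = db->Get(ReadOptions(), "some_key", &value);
+//     delete db;
+//   }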
+
+}   // namespace rocksdb
+#endif  // ROCKSDB_LITE

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compacted_db_impl.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compacted_db_impl.h b/thirdparty/rocksdb/db/compacted_db_impl.h
new file mode 100644
index 0000000..de32f21
--- /dev/null
+++ b/thirdparty/rocksdb/db/compacted_db_impl.h
@@ -0,0 +1,102 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+#include "db/db_impl.h"
+#include <vector>
+#include <string>
+
+namespace rocksdb {
+
+class CompactedDBImpl : public DBImpl {
+ public:
+  CompactedDBImpl(const DBOptions& options, const std::string& dbname);
+  virtual ~CompactedDBImpl();
+
+  static Status Open(const Options& options, const std::string& dbname,
+                     DB** dbptr);
+
+  // Implementations of the DB interface
+  using DB::Get;
+  virtual Status Get(const ReadOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     PinnableSlice* value) override;
+  using DB::MultiGet;
+  virtual std::vector<Status> MultiGet(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>&,
+      const std::vector<Slice>& keys, std::vector<std::string>* values)
+    override;
+
+  using DBImpl::Put;
+  virtual Status Put(const WriteOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     const Slice& value) override {
+    return Status::NotSupported("Not supported in compacted db mode.");
+  }
+  using DBImpl::Merge;
+  virtual Status Merge(const WriteOptions& options,
+                       ColumnFamilyHandle* column_family, const Slice& key,
+                       const Slice& value) override {
+    return Status::NotSupported("Not supported in compacted db mode.");
+  }
+  using DBImpl::Delete;
+  virtual Status Delete(const WriteOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const Slice& key) override {
+    return Status::NotSupported("Not supported in compacted db mode.");
+  }
+  virtual Status Write(const WriteOptions& options,
+                       WriteBatch* updates) override {
+    return Status::NotSupported("Not supported in compacted db mode.");
+  }
+  using DBImpl::CompactRange;
+  virtual Status CompactRange(const CompactRangeOptions& options,
+                              ColumnFamilyHandle* column_family,
+                              const Slice* begin, const Slice* end) override {
+    return Status::NotSupported("Not supported in compacted db mode.");
+  }
+
+  virtual Status DisableFileDeletions() override {
+    return Status::NotSupported("Not supported in compacted db mode.");
+  }
+  virtual Status EnableFileDeletions(bool force) override {
+    return Status::NotSupported("Not supported in compacted db mode.");
+  }
+  virtual Status GetLiveFiles(std::vector<std::string>&,
+                              uint64_t* manifest_file_size,
+                              bool flush_memtable = true) override {
+    return Status::NotSupported("Not supported in compacted db mode.");
+  }
+  using DBImpl::Flush;
+  virtual Status Flush(const FlushOptions& options,
+                       ColumnFamilyHandle* column_family) override {
+    return Status::NotSupported("Not supported in compacted db mode.");
+  }
+  using DB::IngestExternalFile;
+  virtual Status IngestExternalFile(
+      ColumnFamilyHandle* column_family,
+      const std::vector<std::string>& external_files,
+      const IngestExternalFileOptions& ingestion_options) override {
+    return Status::NotSupported("Not supported in compacted db mode.");
+  }
+
+ private:
+  friend class DB;
+  inline size_t FindFile(const Slice& key);
+  Status Init(const Options& options);
+
+  ColumnFamilyData* cfd_;
+  Version* version_;
+  const Comparator* user_comparator_;
+  LevelFilesBrief files_;
+
+  // No copying allowed
+  CompactedDBImpl(const CompactedDBImpl&);
+  void operator=(const CompactedDBImpl&);
+};
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE

