nifi-commits mailing list archives

From jeremyd...@apache.org
Subject [09/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:24:49 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/version_set.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/version_set.cc b/thirdparty/rocksdb/db/version_set.cc
new file mode 100644
index 0000000..6b9611a
--- /dev/null
+++ b/thirdparty/rocksdb/db/version_set.cc
@@ -0,0 +1,3801 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/version_set.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <algorithm>
+#include <climits>
+#include <map>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <vector>
+#include "db/compaction.h"
+#include "db/internal_stats.h"
+#include "db/log_reader.h"
+#include "db/log_writer.h"
+#include "db/memtable.h"
+#include "db/merge_context.h"
+#include "db/merge_helper.h"
+#include "db/pinned_iterators_manager.h"
+#include "db/table_cache.h"
+#include "db/version_builder.h"
+#include "monitoring/file_read_sample.h"
+#include "monitoring/perf_context_imp.h"
+#include "rocksdb/env.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/write_buffer_manager.h"
+#include "table/format.h"
+#include "table/get_context.h"
+#include "table/internal_iterator.h"
+#include "table/merging_iterator.h"
+#include "table/meta_blocks.h"
+#include "table/plain_table_factory.h"
+#include "table/table_reader.h"
+#include "table/two_level_iterator.h"
+#include "util/coding.h"
+#include "util/file_reader_writer.h"
+#include "util/filename.h"
+#include "util/stop_watch.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+namespace {
+
+// Find a file in the LevelFilesBrief data structure
+// within an index range defined by left and right.
+int FindFileInRange(const InternalKeyComparator& icmp,
+    const LevelFilesBrief& file_level,
+    const Slice& key,
+    uint32_t left,
+    uint32_t right) {
+  while (left < right) {
+    uint32_t mid = (left + right) / 2;
+    const FdWithKeyRange& f = file_level.files[mid];
+    if (icmp.InternalKeyComparator::Compare(f.largest_key, key) < 0) {
+      // Key at "mid.largest" is < "target".  Therefore all
+      // files at or before "mid" are uninteresting.
+      left = mid + 1;
+    } else {
+      // Key at "mid.largest" is >= "target".  Therefore all files
+      // after "mid" are uninteresting.
+      right = mid;
+    }
+  }
+  return right;
+}
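+
+// A minimal illustrative trace (hypothetical keys): given files whose
+// largest keys are ["b", "d", "f"] and search key "c", the loop narrows
+// [left=0, right=3): mid=1 has largest "d" >= "c" so right=1; mid=0 has
+// largest "b" < "c" so left=1; left == right, so index 1 is returned --
+// the earliest file that may still contain "c".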
+
+// Class to help choose the next file to search for the particular key.
+// Searches and returns files level by level.
+// We can search level-by-level since entries never hop across
+// levels. Therefore we are guaranteed that if we find data
+// in a smaller level, later levels are irrelevant (unless we
+// are MergeInProgress).
+class FilePicker {
+ public:
+  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
+             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
+             unsigned int num_levels, FileIndexer* file_indexer,
+             const Comparator* user_comparator,
+             const InternalKeyComparator* internal_comparator)
+      : num_levels_(num_levels),
+        curr_level_(static_cast<unsigned int>(-1)),
+        returned_file_level_(static_cast<unsigned int>(-1)),
+        hit_file_level_(static_cast<unsigned int>(-1)),
+        search_left_bound_(0),
+        search_right_bound_(FileIndexer::kLevelMaxIndex),
+#ifndef NDEBUG
+        files_(files),
+#endif
+        level_files_brief_(file_levels),
+        is_hit_file_last_in_level_(false),
+        user_key_(user_key),
+        ikey_(ikey),
+        file_indexer_(file_indexer),
+        user_comparator_(user_comparator),
+        internal_comparator_(internal_comparator) {
+    // Setup member variables to search first level.
+    search_ended_ = !PrepareNextLevel();
+    if (!search_ended_) {
+      // Prefetch Level 0 table data to avoid cache miss if possible.
+      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
+        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
+        if (r) {
+          r->Prepare(ikey);
+        }
+      }
+    }
+  }
+
+  int GetCurrentLevel() const { return curr_level_; }
+
+  FdWithKeyRange* GetNextFile() {
+    while (!search_ended_) {  // Loops over different levels.
+      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
+        // Loops over all files in current level.
+        FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
+        hit_file_level_ = curr_level_;
+        is_hit_file_last_in_level_ =
+            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
+        int cmp_largest = -1;
+
+        // Do key range filtering of files and/or fractional cascading if:
+        // (1) not all the files are in level 0, or
+        // (2) there are more than 3 current level files
+        // If there are only 3 or fewer current level files in the system, we
+        // skip the key range filtering. In this case, more likely, the system
+        // is highly tuned to minimize the number of tables queried by each
+        // query, so it is unlikely that key range filtering is more efficient
+        // than querying the files directly.
+        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
+          // Check if key is within a file's range. If the search left bound
+          // and right bound point to the same file, we are sure the key
+          // falls in range.
+          assert(
+              curr_level_ == 0 ||
+              curr_index_in_curr_level_ == start_index_in_curr_level_ ||
+              user_comparator_->Compare(user_key_,
+                ExtractUserKey(f->smallest_key)) <= 0);
+
+          int cmp_smallest = user_comparator_->Compare(user_key_,
+              ExtractUserKey(f->smallest_key));
+          if (cmp_smallest >= 0) {
+            cmp_largest = user_comparator_->Compare(user_key_,
+                ExtractUserKey(f->largest_key));
+          }
+
+          // Setup file search bound for the next level based on the
+          // comparison results
+          if (curr_level_ > 0) {
+            file_indexer_->GetNextLevelIndex(curr_level_,
+                                            curr_index_in_curr_level_,
+                                            cmp_smallest, cmp_largest,
+                                            &search_left_bound_,
+                                            &search_right_bound_);
+          }
+          // Key falls out of current file's range
+          if (cmp_smallest < 0 || cmp_largest > 0) {
+            if (curr_level_ == 0) {
+              ++curr_index_in_curr_level_;
+              continue;
+            } else {
+              // Search next level.
+              break;
+            }
+          }
+        }
+#ifndef NDEBUG
+        // Sanity check to make sure that the files are correctly sorted
+        if (prev_file_) {
+          if (curr_level_ != 0) {
+            int comp_sign = internal_comparator_->Compare(
+                prev_file_->largest_key, f->smallest_key);
+            assert(comp_sign < 0);
+          } else {
+            // level == 0, the current file cannot be newer than the previous
+            // one. The compressed data structure has no seqNo attribute, so
+            // check the ordering via the full FileMetaData entries instead.
+            assert(curr_index_in_curr_level_ > 0);
+            assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
+                  files_[0][curr_index_in_curr_level_-1]));
+          }
+        }
+        prev_file_ = f;
+#endif
+        returned_file_level_ = curr_level_;
+        if (curr_level_ > 0 && cmp_largest < 0) {
+          // No more files to search in this level.
+          search_ended_ = !PrepareNextLevel();
+        } else {
+          ++curr_index_in_curr_level_;
+        }
+        return f;
+      }
+      // Start searching next level.
+      search_ended_ = !PrepareNextLevel();
+    }
+    // Search ended.
+    return nullptr;
+  }
+
+  // Getter for the level of the most recently hit file,
+  // used for the GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
+  unsigned int GetHitFileLevel() { return hit_file_level_; }
+
+  // Returns true if the most recent "hit file" (i.e., one returned by
+  // GetNextFile()) is at the last index in its level.
+  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
+
+ private:
+  unsigned int num_levels_;
+  unsigned int curr_level_;
+  unsigned int returned_file_level_;
+  unsigned int hit_file_level_;
+  int32_t search_left_bound_;
+  int32_t search_right_bound_;
+#ifndef NDEBUG
+  std::vector<FileMetaData*>* files_;
+#endif
+  autovector<LevelFilesBrief>* level_files_brief_;
+  bool search_ended_;
+  bool is_hit_file_last_in_level_;
+  LevelFilesBrief* curr_file_level_;
+  unsigned int curr_index_in_curr_level_;
+  unsigned int start_index_in_curr_level_;
+  Slice user_key_;
+  Slice ikey_;
+  FileIndexer* file_indexer_;
+  const Comparator* user_comparator_;
+  const InternalKeyComparator* internal_comparator_;
+#ifndef NDEBUG
+  FdWithKeyRange* prev_file_;
+#endif
+
+  // Setup local variables to search next level.
+  // Returns false if there are no more levels to search.
+  bool PrepareNextLevel() {
+    curr_level_++;
+    while (curr_level_ < num_levels_) {
+      curr_file_level_ = &(*level_files_brief_)[curr_level_];
+      if (curr_file_level_->num_files == 0) {
+        // When the current level is empty, the search bound generated from
+        // the upper level must be [0, -1], or [0, FileIndexer::kLevelMaxIndex]
+        // if the upper level is also empty.
+        assert(search_left_bound_ == 0);
+        assert(search_right_bound_ == -1 ||
+               search_right_bound_ == FileIndexer::kLevelMaxIndex);
+        // Since current level is empty, it will need to search all files in
+        // the next level
+        search_left_bound_ = 0;
+        search_right_bound_ = FileIndexer::kLevelMaxIndex;
+        curr_level_++;
+        continue;
+      }
+
+      // Some files may overlap each other. We find
+      // all files that overlap user_key and process them in order from
+      // newest to oldest. In the context of merge-operator, this can occur at
+      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
+      // are always compacted into a single entry).
+      int32_t start_index;
+      if (curr_level_ == 0) {
+        // On Level-0, we read through all files to check for overlap.
+        start_index = 0;
+      } else {
+        // On Level-n (n>=1), files are sorted. Binary search to find the
+        // earliest file whose largest key >= ikey. Search left bound and
+        // right bound are used to narrow the range.
+        if (search_left_bound_ == search_right_bound_) {
+          start_index = search_left_bound_;
+        } else if (search_left_bound_ < search_right_bound_) {
+          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
+            search_right_bound_ =
+                static_cast<int32_t>(curr_file_level_->num_files) - 1;
+          }
+          start_index =
+              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
+                              static_cast<uint32_t>(search_left_bound_),
+                              static_cast<uint32_t>(search_right_bound_));
+        } else {
+          // search_left_bound > search_right_bound, key does not exist in
+          // this level. Since no comparison is done in this level, it will
+          // need to search all files in the next level.
+          search_left_bound_ = 0;
+          search_right_bound_ = FileIndexer::kLevelMaxIndex;
+          curr_level_++;
+          continue;
+        }
+      }
+      start_index_in_curr_level_ = start_index;
+      curr_index_in_curr_level_ = start_index;
+#ifndef NDEBUG
+      prev_file_ = nullptr;
+#endif
+      return true;
+    }
+    // curr_level_ = num_levels_. So, no more levels to search.
+    return false;
+  }
+};
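+
+// Illustrative usage, mirroring Version::Get() further below: construct a
+// FilePicker for a lookup key, then pull candidate files from newest to
+// oldest until the key is resolved:
+//
+//   FilePicker fp(files, user_key, ikey, &level_files_brief,
+//                 num_non_empty_levels, &file_indexer, ucmp, icmp);
+//   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
+//        f = fp.GetNextFile()) {
+//     // probe f through the table cache; stop on kFound/kDeleted/kCorrupt
+//   }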
+}  // anonymous namespace
+
+VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
+
+Version::~Version() {
+  assert(refs_ == 0);
+
+  // Remove from linked list
+  prev_->next_ = next_;
+  next_->prev_ = prev_;
+
+  // Drop references to files
+  for (int level = 0; level < storage_info_.num_levels_; level++) {
+    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
+      FileMetaData* f = storage_info_.files_[level][i];
+      assert(f->refs > 0);
+      f->refs--;
+      if (f->refs <= 0) {
+        vset_->obsolete_files_.push_back(f);
+      }
+    }
+  }
+}
+
+int FindFile(const InternalKeyComparator& icmp,
+             const LevelFilesBrief& file_level,
+             const Slice& key) {
+  return FindFileInRange(icmp, file_level, key, 0,
+                         static_cast<uint32_t>(file_level.num_files));
+}
+
+void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
+        const std::vector<FileMetaData*>& files,
+        Arena* arena) {
+  assert(file_level);
+  assert(arena);
+
+  size_t num = files.size();
+  file_level->num_files = num;
+  char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
+  file_level->files = new (mem)FdWithKeyRange[num];
+
+  for (size_t i = 0; i < num; i++) {
+    Slice smallest_key = files[i]->smallest.Encode();
+    Slice largest_key = files[i]->largest.Encode();
+
+    // Copy key slice to sequential memory
+    size_t smallest_size = smallest_key.size();
+    size_t largest_size = largest_key.size();
+    mem = arena->AllocateAligned(smallest_size + largest_size);
+    memcpy(mem, smallest_key.data(), smallest_size);
+    memcpy(mem + smallest_size, largest_key.data(), largest_size);
+
+    FdWithKeyRange& f = file_level->files[i];
+    f.fd = files[i]->fd;
+    f.file_metadata = files[i];
+    f.smallest_key = Slice(mem, smallest_size);
+    f.largest_key = Slice(mem + smallest_size, largest_size);
+  }
+}
+
+static bool AfterFile(const Comparator* ucmp,
+                      const Slice* user_key, const FdWithKeyRange* f) {
+  // nullptr user_key occurs before all keys and is therefore never after *f
+  return (user_key != nullptr &&
+          ucmp->Compare(*user_key, ExtractUserKey(f->largest_key)) > 0);
+}
+
+static bool BeforeFile(const Comparator* ucmp,
+                       const Slice* user_key, const FdWithKeyRange* f) {
+  // nullptr user_key occurs after all keys and is therefore never before *f
+  return (user_key != nullptr &&
+          ucmp->Compare(*user_key, ExtractUserKey(f->smallest_key)) < 0);
+}
+
+bool SomeFileOverlapsRange(
+    const InternalKeyComparator& icmp,
+    bool disjoint_sorted_files,
+    const LevelFilesBrief& file_level,
+    const Slice* smallest_user_key,
+    const Slice* largest_user_key) {
+  const Comparator* ucmp = icmp.user_comparator();
+  if (!disjoint_sorted_files) {
+    // Need to check against all files
+    for (size_t i = 0; i < file_level.num_files; i++) {
+      const FdWithKeyRange* f = &(file_level.files[i]);
+      if (AfterFile(ucmp, smallest_user_key, f) ||
+          BeforeFile(ucmp, largest_user_key, f)) {
+        // No overlap
+      } else {
+        return true;  // Overlap
+      }
+    }
+    return false;
+  }
+
+  // Binary search over file list
+  uint32_t index = 0;
+  if (smallest_user_key != nullptr) {
+    // Find the earliest possible internal key for smallest_user_key
+    InternalKey small;
+    small.SetMaxPossibleForUserKey(*smallest_user_key);
+    index = FindFile(icmp, file_level, small.Encode());
+  }
+
+  if (index >= file_level.num_files) {
+    // beginning of range is after all files, so no overlap.
+    return false;
+  }
+
+  return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
+}
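+
+// Example for the disjoint (sorted) case, with hypothetical user-key ranges:
+// files ["a","c"] and ["e","g"]. The query range ["d","f"] binary-searches
+// to the second file (the first whose largest key >= "d") and is not before
+// it, so the function returns true; the query ["h","i"] lands past both
+// files and returns false.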
+
+namespace {
+
+// An internal iterator.  For a given version/level pair, yields
+// information about the files in the level.  For a given entry, key()
+// is the largest key that occurs in the file, and value() is a Slice
+// over the file's FileDescriptor, copied byte for byte from the
+// in-memory struct (see value() below).
+class LevelFileNumIterator : public InternalIterator {
+ public:
+  LevelFileNumIterator(const InternalKeyComparator& icmp,
+                       const LevelFilesBrief* flevel, bool should_sample)
+      : icmp_(icmp),
+        flevel_(flevel),
+        index_(static_cast<uint32_t>(flevel->num_files)),
+        current_value_(0, 0, 0),  // Marks as invalid
+        should_sample_(should_sample) {}
+  virtual bool Valid() const override { return index_ < flevel_->num_files; }
+  virtual void Seek(const Slice& target) override {
+    index_ = FindFile(icmp_, *flevel_, target);
+  }
+  virtual void SeekForPrev(const Slice& target) override {
+    SeekForPrevImpl(target, &icmp_);
+  }
+
+  virtual void SeekToFirst() override { index_ = 0; }
+  virtual void SeekToLast() override {
+    index_ = (flevel_->num_files == 0)
+                 ? 0
+                 : static_cast<uint32_t>(flevel_->num_files) - 1;
+  }
+  virtual void Next() override {
+    assert(Valid());
+    index_++;
+  }
+  virtual void Prev() override {
+    assert(Valid());
+    if (index_ == 0) {
+      index_ = static_cast<uint32_t>(flevel_->num_files);  // Marks as invalid
+    } else {
+      index_--;
+    }
+  }
+  Slice key() const override {
+    assert(Valid());
+    return flevel_->files[index_].largest_key;
+  }
+  Slice value() const override {
+    assert(Valid());
+
+    auto file_meta = flevel_->files[index_];
+    if (should_sample_) {
+      sample_file_read_inc(file_meta.file_metadata);
+    }
+    current_value_ = file_meta.fd;
+    return Slice(reinterpret_cast<const char*>(&current_value_),
+                 sizeof(FileDescriptor));
+  }
+  virtual Status status() const override { return Status::OK(); }
+
+ private:
+  const InternalKeyComparator icmp_;
+  const LevelFilesBrief* flevel_;
+  uint32_t index_;
+  mutable FileDescriptor current_value_;
+  bool should_sample_;
+};
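+
+// Illustrative behavior (hypothetical keys): for a level whose files have
+// largest keys ["d", "m", "z"], Seek("f") positions the iterator at the
+// second file; key() then yields "m" and value() yields that file's
+// FileDescriptor bytes, which LevelFileIteratorState::NewSecondaryIterator
+// below reinterprets in order to open the table.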
+
+class LevelFileIteratorState : public TwoLevelIteratorState {
+ public:
+  // @param skip_filters Disables loading/accessing the filter block
+  LevelFileIteratorState(TableCache* table_cache,
+                         const ReadOptions& read_options,
+                         const EnvOptions& env_options,
+                         const InternalKeyComparator& icomparator,
+                         HistogramImpl* file_read_hist, bool for_compaction,
+                         bool prefix_enabled, bool skip_filters, int level,
+                         RangeDelAggregator* range_del_agg)
+      : TwoLevelIteratorState(prefix_enabled),
+        table_cache_(table_cache),
+        read_options_(read_options),
+        env_options_(env_options),
+        icomparator_(icomparator),
+        file_read_hist_(file_read_hist),
+        for_compaction_(for_compaction),
+        skip_filters_(skip_filters),
+        level_(level),
+        range_del_agg_(range_del_agg) {}
+
+  InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override {
+    if (meta_handle.size() != sizeof(FileDescriptor)) {
+      return NewErrorInternalIterator(
+          Status::Corruption("FileReader invoked with unexpected value"));
+    }
+    const FileDescriptor* fd =
+        reinterpret_cast<const FileDescriptor*>(meta_handle.data());
+    return table_cache_->NewIterator(
+        read_options_, env_options_, icomparator_, *fd, range_del_agg_,
+        nullptr /* don't need reference to table */, file_read_hist_,
+        for_compaction_, nullptr /* arena */, skip_filters_, level_);
+  }
+
+  bool PrefixMayMatch(const Slice& internal_key) override {
+    return true;
+  }
+
+  bool KeyReachedUpperBound(const Slice& internal_key) override {
+    return read_options_.iterate_upper_bound != nullptr &&
+           icomparator_.user_comparator()->Compare(
+               ExtractUserKey(internal_key),
+               *read_options_.iterate_upper_bound) >= 0;
+  }
+
+ private:
+  TableCache* table_cache_;
+  const ReadOptions read_options_;
+  const EnvOptions& env_options_;
+  const InternalKeyComparator& icomparator_;
+  HistogramImpl* file_read_hist_;
+  bool for_compaction_;
+  bool skip_filters_;
+  int level_;
+  RangeDelAggregator* range_del_agg_;
+};
+
+// A wrapper around VersionBuilder that references the current version in
+// its constructor and unreferences it in its destructor.
+// Both the constructor and the destructor must be called while holding the
+// DB mutex.
+class BaseReferencedVersionBuilder {
+ public:
+  explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
+      : version_builder_(new VersionBuilder(
+            cfd->current()->version_set()->env_options(), cfd->table_cache(),
+            cfd->current()->storage_info(), cfd->ioptions()->info_log)),
+        version_(cfd->current()) {
+    version_->Ref();
+  }
+  ~BaseReferencedVersionBuilder() {
+    delete version_builder_;
+    version_->Unref();
+  }
+  VersionBuilder* version_builder() { return version_builder_; }
+
+ private:
+  VersionBuilder* version_builder_;
+  Version* version_;
+};
+}  // anonymous namespace
+
+Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
+                                   const FileMetaData* file_meta,
+                                   const std::string* fname) const {
+  auto table_cache = cfd_->table_cache();
+  auto ioptions = cfd_->ioptions();
+  Status s = table_cache->GetTableProperties(
+      vset_->env_options_, cfd_->internal_comparator(), file_meta->fd,
+      tp, true /* no io */);
+  if (s.ok()) {
+    return s;
+  }
+
+  // We only ignore errors of type `Incomplete`, since by design table
+  // properties are unavailable when the table is not in the table cache.
+  if (!s.IsIncomplete()) {
+    return s;
+  }
+
+  // The table is not present in the table cache; read the table properties
+  // directly from the properties block in the file.
+  std::unique_ptr<RandomAccessFile> file;
+  std::string file_name;
+  if (fname != nullptr) {
+    file_name = *fname;
+  } else {
+    file_name =
+      TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
+                    file_meta->fd.GetPathId());
+  }
+  s = ioptions->env->NewRandomAccessFile(file_name, &file, vset_->env_options_);
+  if (!s.ok()) {
+    return s;
+  }
+
+  TableProperties* raw_table_properties;
+  // By setting the magic number to kInvalidTableMagicNumber, we can bypass
+  // the magic number check in the footer.
+  std::unique_ptr<RandomAccessFileReader> file_reader(
+      new RandomAccessFileReader(std::move(file), file_name));
+  s = ReadTableProperties(
+      file_reader.get(), file_meta->fd.GetFileSize(),
+      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions, &raw_table_properties);
+  if (!s.ok()) {
+    return s;
+  }
+  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
+
+  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
+  return s;
+}
+
+Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
+  Status s;
+  for (int level = 0; level < storage_info_.num_levels_; level++) {
+    s = GetPropertiesOfAllTables(props, level);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  return Status::OK();
+}
+
+Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
+                                         int level) {
+  for (const auto& file_meta : storage_info_.files_[level]) {
+    auto fname =
+        TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
+                      file_meta->fd.GetPathId());
+    // If the table is already present in the table cache, load the table
+    // properties from there.
+    std::shared_ptr<const TableProperties> table_properties;
+    Status s = GetTableProperties(&table_properties, file_meta, &fname);
+    if (s.ok()) {
+      props->insert({fname, table_properties});
+    } else {
+      return s;
+    }
+  }
+
+  return Status::OK();
+}
+
+Status Version::GetPropertiesOfTablesInRange(
+    const Range* range, std::size_t n, TablePropertiesCollection* props) const {
+  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
+    for (decltype(n) i = 0; i < n; i++) {
+      // Convert user_key into a corresponding internal key.
+      InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
+      InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
+      std::vector<FileMetaData*> files;
+      storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
+                                         false);
+      for (const auto& file_meta : files) {
+        auto fname =
+            TableFileName(vset_->db_options_->db_paths,
+                          file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
+        if (props->count(fname) == 0) {
+          // If the table is already present in the table cache, load the
+          // table properties from there.
+          std::shared_ptr<const TableProperties> table_properties;
+          Status s = GetTableProperties(&table_properties, file_meta, &fname);
+          if (s.ok()) {
+            props->insert({fname, table_properties});
+          } else {
+            return s;
+          }
+        }
+      }
+    }
+  }
+
+  return Status::OK();
+}
+
+Status Version::GetAggregatedTableProperties(
+    std::shared_ptr<const TableProperties>* tp, int level) {
+  TablePropertiesCollection props;
+  Status s;
+  if (level < 0) {
+    s = GetPropertiesOfAllTables(&props);
+  } else {
+    s = GetPropertiesOfAllTables(&props, level);
+  }
+  if (!s.ok()) {
+    return s;
+  }
+
+  auto* new_tp = new TableProperties();
+  for (const auto& item : props) {
+    new_tp->Add(*item.second);
+  }
+  tp->reset(new_tp);
+  return Status::OK();
+}
+
+size_t Version::GetMemoryUsageByTableReaders() {
+  size_t total_usage = 0;
+  for (auto& file_level : storage_info_.level_files_brief_) {
+    for (size_t i = 0; i < file_level.num_files; i++) {
+      total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
+          vset_->env_options_, cfd_->internal_comparator(),
+          file_level.files[i].fd);
+    }
+  }
+  return total_usage;
+}
+
+void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
+  assert(cf_meta);
+  assert(cfd_);
+
+  cf_meta->name = cfd_->GetName();
+  cf_meta->size = 0;
+  cf_meta->file_count = 0;
+  cf_meta->levels.clear();
+
+  auto* ioptions = cfd_->ioptions();
+  auto* vstorage = storage_info();
+
+  for (int level = 0; level < cfd_->NumberLevels(); level++) {
+    uint64_t level_size = 0;
+    cf_meta->file_count += vstorage->LevelFiles(level).size();
+    std::vector<SstFileMetaData> files;
+    for (const auto& file : vstorage->LevelFiles(level)) {
+      uint32_t path_id = file->fd.GetPathId();
+      std::string file_path;
+      if (path_id < ioptions->db_paths.size()) {
+        file_path = ioptions->db_paths[path_id].path;
+      } else {
+        assert(!ioptions->db_paths.empty());
+        file_path = ioptions->db_paths.back().path;
+      }
+      files.emplace_back(
+          MakeTableFileName("", file->fd.GetNumber()), file_path,
+          file->fd.GetFileSize(), file->smallest_seqno, file->largest_seqno,
+          file->smallest.user_key().ToString(),
+          file->largest.user_key().ToString(),
+          file->stats.num_reads_sampled.load(std::memory_order_relaxed),
+          file->being_compacted);
+      level_size += file->fd.GetFileSize();
+    }
+    cf_meta->levels.emplace_back(
+        level, level_size, std::move(files));
+    cf_meta->size += level_size;
+  }
+}
+
+
+uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
+  // The estimate will be inaccurate when:
+  // (1) there exist merge keys,
+  // (2) keys are directly overwritten,
+  // (3) deletions are issued for non-existing keys, or
+  // (4) the number of samples is low.
+  if (current_num_samples_ == 0) {
+    return 0;
+  }
+
+  if (current_num_non_deletions_ <= current_num_deletions_) {
+    return 0;
+  }
+
+  uint64_t est = current_num_non_deletions_ - current_num_deletions_;
+
+  uint64_t file_count = 0;
+  for (int level = 0; level < num_levels_; ++level) {
+    file_count += files_[level].size();
+  }
+
+  if (current_num_samples_ < file_count) {
+    // casting to avoid overflowing
+    return
+      static_cast<uint64_t>(
+        (est * static_cast<double>(file_count) / current_num_samples_)
+      );
+  } else {
+    return est;
+  }
+}
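+
+// Worked example with hypothetical counts: given 1000 non-deletion entries,
+// 200 deletions, 10 files, and only 5 sampled files, est = 800 and the
+// returned estimate is scaled to 800 * 10 / 5 = 1600 to account for the
+// unsampled files.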
+
+double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
+    int level) const {
+  assert(level < num_levels_);
+  uint64_t sum_file_size_bytes = 0;
+  uint64_t sum_data_size_bytes = 0;
+  for (auto* file_meta : files_[level]) {
+    sum_file_size_bytes += file_meta->fd.GetFileSize();
+    sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
+  }
+  if (sum_file_size_bytes == 0) {
+    return -1.0;
+  }
+  return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
+}
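+
+// Worked example with hypothetical sizes: if a level's files occupy 40 MB on
+// disk while holding 100 MB of raw keys and values, the estimated
+// compression ratio is 100 / 40 = 2.5; an empty level yields the -1.0
+// sentinel.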
+
+void Version::AddIterators(const ReadOptions& read_options,
+                           const EnvOptions& soptions,
+                           MergeIteratorBuilder* merge_iter_builder,
+                           RangeDelAggregator* range_del_agg) {
+  assert(storage_info_.finalized_);
+
+  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
+    AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
+                         range_del_agg);
+  }
+}
+
+void Version::AddIteratorsForLevel(const ReadOptions& read_options,
+                                   const EnvOptions& soptions,
+                                   MergeIteratorBuilder* merge_iter_builder,
+                                   int level,
+                                   RangeDelAggregator* range_del_agg) {
+  assert(storage_info_.finalized_);
+  if (level >= storage_info_.num_non_empty_levels()) {
+    // This is an empty level
+    return;
+  } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
+    // No files in this level
+    return;
+  }
+
+  bool should_sample = should_sample_file_read();
+
+  auto* arena = merge_iter_builder->GetArena();
+  if (level == 0) {
+    // Merge all level zero files together since they may overlap
+    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
+      const auto& file = storage_info_.LevelFilesBrief(0).files[i];
+      merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
+          read_options, soptions, cfd_->internal_comparator(), file.fd,
+          range_del_agg, nullptr, cfd_->internal_stats()->GetFileReadHist(0),
+          false, arena, false /* skip_filters */, 0 /* level */));
+    }
+    if (should_sample) {
+      // Count a read for every L0 file. This is done per iterator creation
+      // rather than per Seek(), while files in other levels are recorded per
+      // seek. If users execute one range query per iterator, there may be
+      // some discrepancy here.
+      for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
+        sample_file_read_inc(meta);
+      }
+    }
+  } else {
+    // For levels > 0, we can use a concatenating iterator that sequentially
+    // walks through the non-overlapping files in the level, opening them
+    // lazily.
+    auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState));
+    auto* state = new (mem)
+        LevelFileIteratorState(cfd_->table_cache(), read_options, soptions,
+                               cfd_->internal_comparator(),
+                               cfd_->internal_stats()->GetFileReadHist(level),
+                               false /* for_compaction */,
+                               cfd_->ioptions()->prefix_extractor != nullptr,
+                               IsFilterSkipped(level), level, range_del_agg);
+    mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
+    auto* first_level_iter = new (mem) LevelFileNumIterator(
+        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
+        should_sample_file_read());
+    merge_iter_builder->AddIterator(
+        NewTwoLevelIterator(state, first_level_iter, arena, false));
+  }
+}
+
+void Version::AddRangeDelIteratorsForLevel(
+    const ReadOptions& read_options, const EnvOptions& soptions, int level,
+    std::vector<InternalIterator*>* range_del_iters) {
+  range_del_iters->clear();
+  for (size_t i = 0; i < storage_info_.LevelFilesBrief(level).num_files; i++) {
+    const auto& file = storage_info_.LevelFilesBrief(level).files[i];
+    auto* range_del_iter = cfd_->table_cache()->NewRangeTombstoneIterator(
+        read_options, soptions, cfd_->internal_comparator(), file.fd,
+        cfd_->internal_stats()->GetFileReadHist(level),
+        false /* skip_filters */, level);
+    if (range_del_iter != nullptr) {
+      range_del_iters->push_back(range_del_iter);
+    }
+  }
+}
+
+VersionStorageInfo::VersionStorageInfo(
+    const InternalKeyComparator* internal_comparator,
+    const Comparator* user_comparator, int levels,
+    CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
+    bool _force_consistency_checks)
+    : internal_comparator_(internal_comparator),
+      user_comparator_(user_comparator),
+      // cfd is nullptr if Version is dummy
+      num_levels_(levels),
+      num_non_empty_levels_(0),
+      file_indexer_(user_comparator),
+      compaction_style_(compaction_style),
+      files_(new std::vector<FileMetaData*>[num_levels_]),
+      base_level_(num_levels_ == 1 ? -1 : 1),
+      files_by_compaction_pri_(num_levels_),
+      level0_non_overlapping_(false),
+      next_file_to_compact_by_size_(num_levels_),
+      compaction_score_(num_levels_),
+      compaction_level_(num_levels_),
+      l0_delay_trigger_count_(0),
+      accumulated_file_size_(0),
+      accumulated_raw_key_size_(0),
+      accumulated_raw_value_size_(0),
+      accumulated_num_non_deletions_(0),
+      accumulated_num_deletions_(0),
+      current_num_non_deletions_(0),
+      current_num_deletions_(0),
+      current_num_samples_(0),
+      estimated_compaction_needed_bytes_(0),
+      finalized_(false),
+      force_consistency_checks_(_force_consistency_checks) {
+  if (ref_vstorage != nullptr) {
+    accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
+    accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
+    accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
+    accumulated_num_non_deletions_ =
+        ref_vstorage->accumulated_num_non_deletions_;
+    accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
+    current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
+    current_num_deletions_ = ref_vstorage->current_num_deletions_;
+    current_num_samples_ = ref_vstorage->current_num_samples_;
+  }
+}
+
+Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
+                 uint64_t version_number)
+    : env_(vset->env_),
+      cfd_(column_family_data),
+      info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
+      db_statistics_((cfd_ == nullptr) ? nullptr
+                                       : cfd_->ioptions()->statistics),
+      table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
+      merge_operator_((cfd_ == nullptr) ? nullptr
+                                        : cfd_->ioptions()->merge_operator),
+      storage_info_(
+          (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
+          (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
+          cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
+          cfd_ == nullptr ? kCompactionStyleLevel
+                          : cfd_->ioptions()->compaction_style,
+          (cfd_ == nullptr || cfd_->current() == nullptr)
+              ? nullptr
+              : cfd_->current()->storage_info(),
+          cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
+      vset_(vset),
+      next_(this),
+      prev_(this),
+      refs_(0),
+      version_number_(version_number) {}
+
+void Version::Get(const ReadOptions& read_options, const LookupKey& k,
+                  PinnableSlice* value, Status* status,
+                  MergeContext* merge_context,
+                  RangeDelAggregator* range_del_agg, bool* value_found,
+                  bool* key_exists, SequenceNumber* seq) {
+  Slice ikey = k.internal_key();
+  Slice user_key = k.user_key();
+
+  assert(status->ok() || status->IsMergeInProgress());
+
+  if (key_exists != nullptr) {
+    // will be set to false below if the key is not found
+    *key_exists = true;
+  }
+
+  PinnedIteratorsManager pinned_iters_mgr;
+  GetContext get_context(
+      user_comparator(), merge_operator_, info_log_, db_statistics_,
+      status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
+      value, value_found, merge_context, range_del_agg, this->env_, seq,
+      merge_operator_ ? &pinned_iters_mgr : nullptr);
+
+  // Pin blocks that we read to hold merge operands
+  if (merge_operator_) {
+    pinned_iters_mgr.StartPinning();
+  }
+
+  FilePicker fp(
+      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
+      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
+      user_comparator(), internal_comparator());
+  FdWithKeyRange* f = fp.GetNextFile();
+  while (f != nullptr) {
+    if (get_context.sample()) {
+      sample_file_read_inc(f->file_metadata);
+    }
+    *status = table_cache_->Get(
+        read_options, *internal_comparator(), f->fd, ikey, &get_context,
+        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
+        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
+                        fp.IsHitFileLastInLevel()),
+        fp.GetCurrentLevel());
+    // TODO: examine the behavior for corrupted key
+    if (!status->ok()) {
+      return;
+    }
+
+    switch (get_context.State()) {
+      case GetContext::kNotFound:
+        // Keep searching in other files
+        break;
+      case GetContext::kFound:
+        if (fp.GetHitFileLevel() == 0) {
+          RecordTick(db_statistics_, GET_HIT_L0);
+        } else if (fp.GetHitFileLevel() == 1) {
+          RecordTick(db_statistics_, GET_HIT_L1);
+        } else if (fp.GetHitFileLevel() >= 2) {
+          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
+        }
+        return;
+      case GetContext::kDeleted:
+        // Use empty error message for speed
+        *status = Status::NotFound();
+        return;
+      case GetContext::kCorrupt:
+        *status = Status::Corruption("corrupted key for ", user_key);
+        return;
+      case GetContext::kMerge:
+        break;
+    }
+    f = fp.GetNextFile();
+  }
+
+  if (GetContext::kMerge == get_context.State()) {
+    if (!merge_operator_) {
+      *status =  Status::InvalidArgument(
+          "merge_operator is not properly initialized.");
+      return;
+    }
+    // The merge operands are in the saver and we hit the beginning of the
+    // key's history: do a final merge of nullptr and the operands.
+    std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
+    *status = MergeHelper::TimedFullMerge(
+        merge_operator_, user_key, nullptr, merge_context->GetOperands(),
+        str_value, info_log_, db_statistics_, env_,
+        nullptr /* result_operand */, true);
+    if (LIKELY(value != nullptr)) {
+      value->PinSelf();
+    }
+  } else {
+    if (key_exists != nullptr) {
+      *key_exists = false;
+    }
+    *status = Status::NotFound(); // Use an empty error message for speed
+  }
+}
+
+bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
+  // Reaching the bottom level implies misses at all upper levels, so we'll
+  // skip checking the filters when we predict a hit.
+  return cfd_->ioptions()->optimize_filters_for_hits &&
+         (level > 0 || is_file_last_in_level) &&
+         level == storage_info_.num_non_empty_levels() - 1;
+}
+
+void VersionStorageInfo::GenerateLevelFilesBrief() {
+  level_files_brief_.resize(num_non_empty_levels_);
+  for (int level = 0; level < num_non_empty_levels_; level++) {
+    DoGenerateLevelFilesBrief(
+        &level_files_brief_[level], files_[level], &arena_);
+  }
+}
+
+void Version::PrepareApply(
+    const MutableCFOptions& mutable_cf_options,
+    bool update_stats) {
+  UpdateAccumulatedStats(update_stats);
+  storage_info_.UpdateNumNonEmptyLevels();
+  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
+  storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
+  storage_info_.GenerateFileIndexer();
+  storage_info_.GenerateLevelFilesBrief();
+  storage_info_.GenerateLevel0NonOverlapping();
+}
+
+bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
+  if (file_meta->init_stats_from_file ||
+      file_meta->compensated_file_size > 0) {
+    return false;
+  }
+  std::shared_ptr<const TableProperties> tp;
+  Status s = GetTableProperties(&tp, file_meta);
+  file_meta->init_stats_from_file = true;
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(vset_->db_options_->info_log,
+                    "Unable to load table properties for file %" PRIu64
+                    " --- %s\n",
+                    file_meta->fd.GetNumber(), s.ToString().c_str());
+    return false;
+  }
+  if (tp.get() == nullptr) return false;
+  file_meta->num_entries = tp->num_entries;
+  file_meta->num_deletions = GetDeletedKeys(tp->user_collected_properties);
+  file_meta->raw_value_size = tp->raw_value_size;
+  file_meta->raw_key_size = tp->raw_key_size;
+
+  return true;
+}
+
+void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
+  assert(file_meta->init_stats_from_file);
+  accumulated_file_size_ += file_meta->fd.GetFileSize();
+  accumulated_raw_key_size_ += file_meta->raw_key_size;
+  accumulated_raw_value_size_ += file_meta->raw_value_size;
+  accumulated_num_non_deletions_ +=
+      file_meta->num_entries - file_meta->num_deletions;
+  accumulated_num_deletions_ += file_meta->num_deletions;
+
+  current_num_non_deletions_ +=
+      file_meta->num_entries - file_meta->num_deletions;
+  current_num_deletions_ += file_meta->num_deletions;
+  current_num_samples_++;
+}
+
+void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
+  if (file_meta->init_stats_from_file) {
+    current_num_non_deletions_ -=
+        file_meta->num_entries - file_meta->num_deletions;
+    current_num_deletions_ -= file_meta->num_deletions;
+    current_num_samples_--;
+  }
+}
+
+void Version::UpdateAccumulatedStats(bool update_stats) {
+  if (update_stats) {
+    // Maximum number of table properties loaded from files.
+    const int kMaxInitCount = 20;
+    int init_count = 0;
+    // Only the first kMaxInitCount files that haven't yet been initialized
+    // from file will be updated with num_deletions. The motivation is to cap
+    // the maximum I/O per Version creation. Files are chosen from lower
+    // levels rather than higher levels because that design propagates
+    // initialization upward: once the num_deletions of lower-level files are
+    // updated, those files gain accurate compensated_file_size values,
+    // lower-to-higher-level compactions get triggered, and the resulting
+    // higher-level files then have their num_deletions updated here.
+    for (int level = 0;
+         level < storage_info_.num_levels_ && init_count < kMaxInitCount;
+         ++level) {
+      for (auto* file_meta : storage_info_.files_[level]) {
+        if (MaybeInitializeFileMetaData(file_meta)) {
+          // each FileMeta will be initialized only once.
+          storage_info_.UpdateAccumulatedStats(file_meta);
+          // when option "max_open_files" is -1, all the file metadata has
+          // already been read, so MaybeInitializeFileMetaData() won't incur
+          // any I/O cost. "max_open_files=-1" means that the table cache passed
+          // to the VersionSet and then to the ColumnFamilySet has a size of
+          // TableCache::kInfiniteCapacity
+          if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
+              TableCache::kInfiniteCapacity) {
+            continue;
+          }
+          if (++init_count >= kMaxInitCount) {
+            break;
+          }
+        }
+      }
+    }
+    // In case all sampled files contain only deletion entries, load the
+    // table properties of a file in a higher level to initialize that value.
+    for (int level = storage_info_.num_levels_ - 1;
+         storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
+         --level) {
+      for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
+           storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
+        if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
+          storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
+        }
+      }
+    }
+  }
+
+  storage_info_.ComputeCompensatedSizes();
+}
+
+void VersionStorageInfo::ComputeCompensatedSizes() {
+  static const int kDeletionWeightOnCompaction = 2;
+  uint64_t average_value_size = GetAverageValueSize();
+
+  // compute the compensated size
+  for (int level = 0; level < num_levels_; level++) {
+    for (auto* file_meta : files_[level]) {
+      // We only compute compensated_file_size for those file_meta whose
+      // compensated_file_size is uninitialized (== 0). This is true only
+      // for files that have just been created and that no other thread has
+      // access to yet, which is why we can safely mutate
+      // compensated_file_size.
+      if (file_meta->compensated_file_size == 0) {
+        file_meta->compensated_file_size = file_meta->fd.GetFileSize();
+        // We boost the size of deletion entries of a file only when the
+        // number of deletion entries is greater than the number of
+        // non-deletion entries in the file. The motivation is that in a
+        // stable workload, the number of deletion entries should be roughly
+        // equal to the number of non-deletion entries; if we compensated the
+        // size of deletion entries in a stable workload, the compensation
+        // logic might introduce an unwanted effect that changes the shape of
+        // the LSM tree.
+        if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
+          file_meta->compensated_file_size +=
+              (file_meta->num_deletions * 2 - file_meta->num_entries) *
+              average_value_size * kDeletionWeightOnCompaction;
+        }
+      }
+    }
+  }
+}
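+
+// Worked example with a hypothetical file: a 10 MB file holding 300 entries,
+// 200 of them deletions, with an average value size of 1 KB, gets
+// compensated_file_size = 10 MB + (2 * 200 - 300) * 1 KB * 2 = 10 MB +
+// 200 KB, so deletion-heavy files look larger to the compaction picker.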
+
+int VersionStorageInfo::MaxInputLevel() const {
+  if (compaction_style_ == kCompactionStyleLevel) {
+    return num_levels() - 2;
+  }
+  return 0;
+}
+
+int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
+  if (allow_ingest_behind) {
+    assert(num_levels() > 1);
+    return num_levels() - 2;
+  }
+  return num_levels() - 1;
+}
+
+void VersionStorageInfo::EstimateCompactionBytesNeeded(
+    const MutableCFOptions& mutable_cf_options) {
+  // Only implemented for level-based compaction
+  if (compaction_style_ != kCompactionStyleLevel) {
+    estimated_compaction_needed_bytes_ = 0;
+    return;
+  }
+
+  // Start from Level 0: if level 0 qualifies for compaction to level 1,
+  // we estimate the size of that compaction.
+  // Then we move on to the next level and see whether it qualifies for
+  // compaction to the next level. The size of the level is estimated as the
+  // actual size on the level plus the input bytes from the previous level,
+  // if any. If it exceeds the target, take the excess bytes as compaction
+  // input and add the size of that compaction to the total.
+  // We keep doing this for Level 2, 3, etc., until the last level, and
+  // return the accumulated bytes.
+
+  uint64_t bytes_compact_to_next_level = 0;
+  uint64_t level_size = 0;
+  for (auto* f : files_[0]) {
+    level_size += f->fd.GetFileSize();
+  }
+  // Level 0
+  bool level0_compact_triggered = false;
+  if (static_cast<int>(files_[0].size()) >=
+          mutable_cf_options.level0_file_num_compaction_trigger ||
+      level_size >= mutable_cf_options.max_bytes_for_level_base) {
+    level0_compact_triggered = true;
+    estimated_compaction_needed_bytes_ = level_size;
+    bytes_compact_to_next_level = level_size;
+  } else {
+    estimated_compaction_needed_bytes_ = 0;
+  }
+
+  // Level 1 and up.
+  uint64_t bytes_next_level = 0;
+  for (int level = base_level(); level <= MaxInputLevel(); level++) {
+    level_size = 0;
+    if (bytes_next_level > 0) {
+#ifndef NDEBUG
+      uint64_t level_size2 = 0;
+      for (auto* f : files_[level]) {
+        level_size2 += f->fd.GetFileSize();
+      }
+      assert(level_size2 == bytes_next_level);
+#endif
+      level_size = bytes_next_level;
+      bytes_next_level = 0;
+    } else {
+      for (auto* f : files_[level]) {
+        level_size += f->fd.GetFileSize();
+      }
+    }
+    if (level == base_level() && level0_compact_triggered) {
+      // Add base level size to compaction if level0 compaction triggered.
+      estimated_compaction_needed_bytes_ += level_size;
+    }
+    // Add size added by previous compaction
+    level_size += bytes_compact_to_next_level;
+    bytes_compact_to_next_level = 0;
+    uint64_t level_target = MaxBytesForLevel(level);
+    if (level_size > level_target) {
+      bytes_compact_to_next_level = level_size - level_target;
+      // Estimate the actual compaction fan-out ratio as size ratio between
+      // the two levels.
+
+      assert(bytes_next_level == 0);
+      if (level + 1 < num_levels_) {
+        for (auto* f : files_[level + 1]) {
+          bytes_next_level += f->fd.GetFileSize();
+        }
+      }
+      if (bytes_next_level > 0) {
+        assert(level_size > 0);
+        estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
+            static_cast<double>(bytes_compact_to_next_level) *
+            (static_cast<double>(bytes_next_level) /
+                 static_cast<double>(level_size) +
+             1));
+      }
+    }
+  }
+}
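+
+// Worked example with hypothetical sizes (in MB, level style): L0 holds 300
+// and is over its trigger, L1 holds 1200 against a 1000 target, L2 holds
+// 5000. L0 contributes 300 and pulls in the base level's 1200 (total 1500);
+// L1's input grows to 1200 + 300 = 1500, exceeding its target by 500, which
+// fans out by (5000 / 1500 + 1) ~= 4.33 into ~2166 more, for an overall
+// estimate of ~3666, assuming L2 stays under its own target.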
+
+namespace {
+uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
+                                 const std::vector<FileMetaData*>& files) {
+  uint32_t ttl_expired_files_count = 0;
+
+  int64_t _current_time;
+  auto status = ioptions.env->GetCurrentTime(&_current_time);
+  if (status.ok()) {
+    const uint64_t current_time = static_cast<uint64_t>(_current_time);
+    for (auto f : files) {
+      if (!f->being_compacted && f->fd.table_reader != nullptr &&
+          f->fd.table_reader->GetTableProperties() != nullptr) {
+        auto creation_time =
+            f->fd.table_reader->GetTableProperties()->creation_time;
+        if (creation_time > 0 &&
+            creation_time <
+                (current_time - ioptions.compaction_options_fifo.ttl)) {
+          ttl_expired_files_count++;
+        }
+      }
+    }
+  }
+  return ttl_expired_files_count;
+}
+}  // anonymous namespace
+
+void VersionStorageInfo::ComputeCompactionScore(
+    const ImmutableCFOptions& immutable_cf_options,
+    const MutableCFOptions& mutable_cf_options) {
+  for (int level = 0; level <= MaxInputLevel(); level++) {
+    double score;
+    if (level == 0) {
+      // We treat level-0 specially by bounding the number of files
+      // instead of number of bytes for two reasons:
+      //
+      // (1) With larger write-buffer sizes, it is nice not to do too
+      // many level-0 compactions.
+      //
+      // (2) The files in level-0 are merged on every read and
+      // therefore we wish to avoid too many files when the individual
+      // file size is small (perhaps because of a small write-buffer
+      // setting, or very high compression ratios, or lots of
+      // overwrites/deletions).
+      int num_sorted_runs = 0;
+      uint64_t total_size = 0;
+      for (auto* f : files_[level]) {
+        if (!f->being_compacted) {
+          total_size += f->compensated_file_size;
+          num_sorted_runs++;
+        }
+      }
+      if (compaction_style_ == kCompactionStyleUniversal) {
+        // For universal compaction, we use the level0 score to indicate
+        // compaction score for the whole DB. Add other levels as if
+        // they were L0 files.
+        for (int i = 1; i < num_levels(); i++) {
+          if (!files_[i].empty() && !files_[i][0]->being_compacted) {
+            num_sorted_runs++;
+          }
+        }
+      }
+
+      if (compaction_style_ == kCompactionStyleFIFO) {
+        score =
+            static_cast<double>(total_size) /
+            immutable_cf_options.compaction_options_fifo.max_table_files_size;
+        if (immutable_cf_options.compaction_options_fifo.allow_compaction) {
+          score = std::max(
+              static_cast<double>(num_sorted_runs) /
+                  mutable_cf_options.level0_file_num_compaction_trigger,
+              score);
+        }
+        if (immutable_cf_options.compaction_options_fifo.ttl > 0) {
+          score = std::max(static_cast<double>(GetExpiredTtlFilesCount(
+                               immutable_cf_options, files_[level])),
+                           score);
+        }
+
+      } else {
+        score = static_cast<double>(num_sorted_runs) /
+                mutable_cf_options.level0_file_num_compaction_trigger;
+        if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
+          // Level-based involves L0->L0 compactions that can lead to oversized
+          // L0 files. Take into account size as well to avoid later giant
+          // compactions to the base level.
+          score = std::max(
+              score, static_cast<double>(total_size) /
+                     mutable_cf_options.max_bytes_for_level_base);
+        }
+      }
+    } else {
+      // Compute the ratio of current size to size limit.
+      uint64_t level_bytes_no_compacting = 0;
+      for (auto f : files_[level]) {
+        if (!f->being_compacted) {
+          level_bytes_no_compacting += f->compensated_file_size;
+        }
+      }
+      score = static_cast<double>(level_bytes_no_compacting) /
+              MaxBytesForLevel(level);
+    }
+    compaction_level_[level] = level;
+    compaction_score_[level] = score;
+  }
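+
+  // Worked example with hypothetical numbers (level style): 6 non-compacting
+  // L0 files against a trigger of 4 give L0 a score of 6 / 4 = 1.5; an L1
+  // holding 300 MB of non-compacting data against a 200 MB limit likewise
+  // scores 1.5. The sort below then orders the levels by score.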
+
+  // Sort all the levels based on their score. Higher scores get listed
+  // first. Use bubble sort because the number of entries is small.
+  for (int i = 0; i < num_levels() - 2; i++) {
+    for (int j = i + 1; j < num_levels() - 1; j++) {
+      if (compaction_score_[i] < compaction_score_[j]) {
+        double score = compaction_score_[i];
+        int level = compaction_level_[i];
+        compaction_score_[i] = compaction_score_[j];
+        compaction_level_[i] = compaction_level_[j];
+        compaction_score_[j] = score;
+        compaction_level_[j] = level;
+      }
+    }
+  }
+  ComputeFilesMarkedForCompaction();
+  EstimateCompactionBytesNeeded(mutable_cf_options);
+}
+
+void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
+  files_marked_for_compaction_.clear();
+  int last_qualify_level = 0;
+
+  // Do not include files from the last level that holds data.
+  // If the table properties collector suggests a file on the last level,
+  // we should not move it to a new level.
+  for (int level = num_levels() - 1; level >= 1; level--) {
+    if (!files_[level].empty()) {
+      last_qualify_level = level - 1;
+      break;
+    }
+  }
+
+  for (int level = 0; level <= last_qualify_level; level++) {
+    for (auto* f : files_[level]) {
+      if (!f->being_compacted && f->marked_for_compaction) {
+        files_marked_for_compaction_.emplace_back(level, f);
+      }
+    }
+  }
+}
+
+namespace {
+
+// used to sort files by size
+struct Fsize {
+  size_t index;
+  FileMetaData* file;
+};
+
+// Comparator that is used to sort files based on their size.
+// In normal mode: descending size.
+bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
+  return (first.file->compensated_file_size >
+      second.file->compensated_file_size);
+}
+} // anonymous namespace
+
+void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
+  auto* level_files = &files_[level];
+  // Must not overlap
+#ifndef NDEBUG
+  if (level > 0 && !level_files->empty() &&
+      internal_comparator_->Compare(
+          (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
+    auto* f2 = (*level_files)[level_files->size() - 1];
+    if (info_log != nullptr) {
+      Error(info_log, "Adding new file %" PRIu64
+                      " range (%s, %s) to level %d but overlapping "
+                      "with existing file %" PRIu64 " %s %s",
+            f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
+            f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
+            f2->smallest.DebugString(true).c_str(),
+            f2->largest.DebugString(true).c_str());
+      LogFlush(info_log);
+    }
+    assert(false);
+  }
+#endif
+  f->refs++;
+  level_files->push_back(f);
+}
+
+// Version::PrepareApply() needs to be called before calling this function,
+// or the following functions need to have been called:
+// 1. UpdateNumNonEmptyLevels();
+// 2. CalculateBaseBytes();
+// 3. UpdateFilesByCompactionPri();
+// 4. GenerateFileIndexer();
+// 5. GenerateLevelFilesBrief();
+// 6. GenerateLevel0NonOverlapping();
+void VersionStorageInfo::SetFinalized() {
+  finalized_ = true;
+#ifndef NDEBUG
+  if (compaction_style_ != kCompactionStyleLevel) {
+    // Not level based compaction.
+    return;
+  }
+  assert(base_level_ < 0 || num_levels() == 1 ||
+         (base_level_ >= 1 && base_level_ < num_levels()));
+  // Verify that every level in [1, base_level) is empty (L0 is exempt)
+  for (int level = 1; level < base_level(); level++) {
+    assert(NumLevelBytes(level) == 0);
+  }
+  uint64_t max_bytes_prev_level = 0;
+  for (int level = base_level(); level < num_levels() - 1; level++) {
+    if (LevelFiles(level).size() == 0) {
+      continue;
+    }
+    assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
+    max_bytes_prev_level = MaxBytesForLevel(level);
+  }
+  int num_non_empty_non_l0_levels = 0;
+  for (int level = 0; level < num_levels(); level++) {
+    assert(LevelFiles(level).size() == 0 ||
+           LevelFiles(level).size() == LevelFilesBrief(level).num_files);
+    if (level > 0 && NumLevelBytes(level) > 0) {
+      num_non_empty_non_l0_levels++;
+    }
+    if (LevelFiles(level).size() > 0) {
+      assert(level < num_non_empty_levels());
+    }
+  }
+  assert(compaction_level_.size() > 0);
+  assert(compaction_level_.size() == compaction_score_.size());
+#endif
+}
+
+void VersionStorageInfo::UpdateNumNonEmptyLevels() {
+  num_non_empty_levels_ = num_levels_;
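+  // Scan from the top level down: trailing empty levels keep shrinking the
+  // count, and the first non-empty level encountered fixes it.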
+  for (int i = num_levels_ - 1; i >= 0; i--) {
+    if (files_[i].size() != 0) {
+      return;
+    } else {
+      num_non_empty_levels_ = i;
+    }
+  }
+}
+
+namespace {
+// Sort `temp` based on the ratio of overlapping size to file size
+void SortFileByOverlappingRatio(
+    const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
+    const std::vector<FileMetaData*>& next_level_files,
+    std::vector<Fsize>* temp) {
+  std::unordered_map<uint64_t, uint64_t> file_to_order;
+  auto next_level_it = next_level_files.begin();
+
+  for (auto& file : files) {
+    uint64_t overlapping_bytes = 0;
+    // Skip files in the next level that lie entirely below the current
+    // file's smallest key
+    while (next_level_it != next_level_files.end() &&
+           icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
+      next_level_it++;
+    }
+
+    while (next_level_it != next_level_files.end() &&
+           icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
+      overlapping_bytes += (*next_level_it)->fd.file_size;
+
+      if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
+        // The next-level file crosses the current file's largest-key
+        // boundary; it may also overlap the next file, so stop without
+        // consuming it.
+        break;
+      }
+      next_level_it++;
+    }
+
+    assert(file->fd.file_size != 0);
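+    // Scale by 1024 so that integer division keeps some precision: e.g. 4 MB
+    // of next-level overlap for a 2 MB file maps to 2048, while a file with
+    // no overlap maps to 0 and sorts first (cheapest to compact).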
+    file_to_order[file->fd.GetNumber()] =
+        overlapping_bytes * 1024u / file->fd.file_size;
+  }
+
+  std::sort(temp->begin(), temp->end(),
+            [&](const Fsize& f1, const Fsize& f2) -> bool {
+              return file_to_order[f1.file->fd.GetNumber()] <
+                     file_to_order[f2.file->fd.GetNumber()];
+            });
+}
+}  // namespace
+
+void VersionStorageInfo::UpdateFilesByCompactionPri(
+    CompactionPri compaction_pri) {
+  if (compaction_style_ == kCompactionStyleFIFO ||
+      compaction_style_ == kCompactionStyleUniversal) {
+    // Not needed for FIFO or universal compaction
+    return;
+  }
+  // No need to sort the highest level because it is never compacted.
+  for (int level = 0; level < num_levels() - 1; level++) {
+    const std::vector<FileMetaData*>& files = files_[level];
+    auto& files_by_compaction_pri = files_by_compaction_pri_[level];
+    assert(files_by_compaction_pri.size() == 0);
+
+    // populate a temp vector for sorting based on size
+    std::vector<Fsize> temp(files.size());
+    for (size_t i = 0; i < files.size(); i++) {
+      temp[i].index = i;
+      temp[i].file = files[i];
+    }
+
+    // sort the top kNumberFilesToSort files based on file size
+    size_t num = VersionStorageInfo::kNumberFilesToSort;
+    if (num > temp.size()) {
+      num = temp.size();
+    }
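+    // For kByCompensatedSize only the first `num` entries need to be fully
+    // ordered, so std::partial_sort avoids sorting the whole level.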
+    switch (compaction_pri) {
+      case kByCompensatedSize:
+        std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
+                          CompareCompensatedSizeDescending);
+        break;
+      case kOldestLargestSeqFirst:
+        std::sort(temp.begin(), temp.end(),
+                  [](const Fsize& f1, const Fsize& f2) -> bool {
+                    return f1.file->largest_seqno < f2.file->largest_seqno;
+                  });
+        break;
+      case kOldestSmallestSeqFirst:
+        std::sort(temp.begin(), temp.end(),
+                  [](const Fsize& f1, const Fsize& f2) -> bool {
+                    return f1.file->smallest_seqno < f2.file->smallest_seqno;
+                  });
+        break;
+      case kMinOverlappingRatio:
+        SortFileByOverlappingRatio(*internal_comparator_, files_[level],
+                                   files_[level + 1], &temp);
+        break;
+      default:
+        assert(false);
+    }
+    assert(temp.size() == files.size());
+
+    // initialize files_by_compaction_pri_
+    for (size_t i = 0; i < temp.size(); i++) {
+      files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
+    }
+    next_file_to_compact_by_size_[level] = 0;
+    assert(files_[level].size() == files_by_compaction_pri_[level].size());
+  }
+}
+
+void VersionStorageInfo::GenerateLevel0NonOverlapping() {
+  assert(!finalized_);
+  level0_non_overlapping_ = true;
+  if (level_files_brief_.size() == 0) {
+    return;
+  }
+
+  // A copy of L0 files sorted by smallest key
+  std::vector<FdWithKeyRange> level0_sorted_file(
+      level_files_brief_[0].files,
+      level_files_brief_[0].files + level_files_brief_[0].num_files);
+  std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
+            [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
+              return (internal_comparator_->Compare(f1.smallest_key,
+                                                    f2.smallest_key) < 0);
+            });
+
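+  // With the files ordered by smallest key, any overlap must show up between
+  // adjacent entries, so a single linear scan over neighbors suffices.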
+  for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
+    FdWithKeyRange& f = level0_sorted_file[i];
+    FdWithKeyRange& prev = level0_sorted_file[i - 1];
+    if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
+      level0_non_overlapping_ = false;
+      break;
+    }
+  }
+}
+
+void Version::Ref() {
+  ++refs_;
+}
+
+bool Version::Unref() {
+  assert(refs_ >= 1);
+  --refs_;
+  if (refs_ == 0) {
+    delete this;
+    return true;
+  }
+  return false;
+}
+
+bool VersionStorageInfo::OverlapInLevel(int level,
+                                        const Slice* smallest_user_key,
+                                        const Slice* largest_user_key) {
+  if (level >= num_non_empty_levels_) {
+    // empty level, no overlap
+    return false;
+  }
+  return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
+                               level_files_brief_[level], smallest_user_key,
+                               largest_user_key);
+}
+
+// Store in "*inputs" all files in "level" that overlap [begin,end]
+// If hint_index is specified, then it points to a file in the
+// overlapping range.
+// The file_index returns a pointer to any file in an overlapping range.
+void VersionStorageInfo::GetOverlappingInputs(
+    int level, const InternalKey* begin, const InternalKey* end,
+    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
+    bool expand_range) const {
+  if (level >= num_non_empty_levels_) {
+    // this level is empty, no overlapping inputs
+    return;
+  }
+
+  inputs->clear();
+  Slice user_begin, user_end;
+  if (begin != nullptr) {
+    user_begin = begin->user_key();
+  }
+  if (end != nullptr) {
+    user_end = end->user_key();
+  }
+  if (file_index) {
+    *file_index = -1;
+  }
+  const Comparator* user_cmp = user_comparator_;
+  if (begin != nullptr && end != nullptr && level > 0) {
+    GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs,
+                                          hint_index, file_index);
+    return;
+  }
+
+  for (size_t i = 0; i < level_files_brief_[level].num_files; ) {
+    FdWithKeyRange* f = &(level_files_brief_[level].files[i++]);
+    const Slice file_start = ExtractUserKey(f->smallest_key);
+    const Slice file_limit = ExtractUserKey(f->largest_key);
+    if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
+      // "f" is completely before specified range; skip it
+    } else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
+      // "f" is completely after specified range; skip it
+    } else {
+      inputs->push_back(files_[level][i-1]);
+      if (level == 0 && expand_range) {
+        // Level-0 files may overlap each other.  So check if the newly
+        // added file has expanded the range.  If so, restart search.
+        if (begin != nullptr && user_cmp->Compare(file_start, user_begin) < 0) {
+          user_begin = file_start;
+          inputs->clear();
+          i = 0;
+        } else if (end != nullptr
+            && user_cmp->Compare(file_limit, user_end) > 0) {
+          user_end = file_limit;
+          inputs->clear();
+          i = 0;
+        }
+      } else if (file_index) {
+        *file_index = static_cast<int>(i) - 1;
+      }
+    }
+  }
+}
+
+// Store in "*inputs" files in "level" that within range [begin,end]
+// Guarantee a "clean cut" boundary between the files in inputs
+// and the surrounding files and the maxinum number of files.
+// This will ensure that no parts of a key are lost during compaction.
+// If hint_index is specified, then it points to a file in the range.
+// The file_index returns a pointer to any file in an overlapping range.
+void VersionStorageInfo::GetCleanInputsWithinInterval(
+    int level, const InternalKey* begin, const InternalKey* end,
+    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
+  if (level >= num_non_empty_levels_) {
+    // this level is empty, no inputs within range
+    return;
+  }
+
+  inputs->clear();
+  Slice user_begin, user_end;
+  if (begin != nullptr) {
+    user_begin = begin->user_key();
+  }
+  if (end != nullptr) {
+    user_end = end->user_key();
+  }
+  if (file_index) {
+    *file_index = -1;
+  }
+  if (begin != nullptr && end != nullptr && level > 0) {
+    GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs,
+                                          hint_index, file_index,
+                                          true /* within_interval */);
+  }
+}
+
+// Store in "*inputs" all files in "level" that overlap [begin,end]
+// Employ binary search to find at least one file that overlaps the
+// specified range. From that file, iterate backwards and
+// forwards to find all overlapping files.
+// if within_range is set, then only store the maximum clean inputs
+// within range [begin, end]. "clean" means there is a boudnary
+// between the files in "*inputs" and the surrounding files
+void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
+    int level, const Slice& user_begin, const Slice& user_end,
+    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
+    bool within_interval) const {
+  assert(level > 0);
+  int min = 0;
+  int mid = 0;
+  int max = static_cast<int>(files_[level].size()) - 1;
+  bool foundOverlap = false;
+  const Comparator* user_cmp = user_comparator_;
+
+  // If the caller already knows the index of a file that overlaps,
+  // we can skip the binary search.
+  if (hint_index != -1) {
+    mid = hint_index;
+    foundOverlap = true;
+  }
+
+  while (!foundOverlap && min <= max) {
+    mid = (min + max)/2;
+    FdWithKeyRange* f = &(level_files_brief_[level].files[mid]);
+    const Slice file_start = ExtractUserKey(f->smallest_key);
+    const Slice file_limit = ExtractUserKey(f->largest_key);
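+    // In overlap mode a file is ruled out only if it lies entirely on one
+    // side of the range; in within_interval mode the whole file must fit
+    // inside [user_begin, user_end], so the opposite file boundary is tested.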
+    if ((!within_interval && user_cmp->Compare(file_limit, user_begin) < 0) ||
+        (within_interval && user_cmp->Compare(file_start, user_begin) < 0)) {
+      min = mid + 1;
+    } else if ((!within_interval &&
+                user_cmp->Compare(user_end, file_start) < 0) ||
+               (within_interval &&
+                user_cmp->Compare(user_end, file_limit) < 0)) {
+      max = mid - 1;
+    } else {
+      foundOverlap = true;
+      break;
+    }
+  }
+
+  // If there were no overlapping files, return immediately.
+  if (!foundOverlap) {
+    return;
+  }
+  // returns the index where an overlap is found
+  if (file_index) {
+    *file_index = mid;
+  }
+
+  int start_index, end_index;
+  if (within_interval) {
+    ExtendFileRangeWithinInterval(level, user_begin, user_end, mid, &start_index,
+                                  &end_index);
+  } else {
+    ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid,
+                                       &start_index, &end_index);
+  }
+  assert(end_index >= start_index);
+  // insert overlapping files into vector
+  for (int i = start_index; i <= end_index; i++) {
+    inputs->push_back(files_[level][i]);
+  }
+}
+
+// Store in *start_index and *end_index the range of all files in
+// "level" that overlap [begin,end].
+// mid_index specifies the index of at least one file that
+// overlaps the specified range. From that file, iterate backward
+// and forward to find all overlapping files.
+// Uses the LevelFilesBrief data structure to make the search faster.
+void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
+    int level, const Slice& user_begin, const Slice& user_end,
+    unsigned int mid_index, int* start_index, int* end_index) const {
+  const Comparator* user_cmp = user_comparator_;
+  const FdWithKeyRange* files = level_files_brief_[level].files;
+#ifndef NDEBUG
+  {
+    // assert that the file at mid_index overlaps with the range
+    assert(mid_index < level_files_brief_[level].num_files);
+    const FdWithKeyRange* f = &files[mid_index];
+    const Slice fstart = ExtractUserKey(f->smallest_key);
+    const Slice flimit = ExtractUserKey(f->largest_key);
+    if (user_cmp->Compare(fstart, user_begin) >= 0) {
+      assert(user_cmp->Compare(fstart, user_end) <= 0);
+    } else {
+      assert(user_cmp->Compare(flimit, user_begin) >= 0);
+    }
+  }
+#endif
+  *start_index = mid_index + 1;
+  *end_index = mid_index;
+  int count __attribute__((unused)) = 0;
+
+  // check backwards from 'mid' to lower indices
+  for (int i = mid_index; i >= 0 ; i--) {
+    const FdWithKeyRange* f = &files[i];
+    const Slice file_limit = ExtractUserKey(f->largest_key);
+    if (user_cmp->Compare(file_limit, user_begin) >= 0) {
+      *start_index = i;
+      assert((count++, true));
+    } else {
+      break;
+    }
+  }
+  // check forward from 'mid+1' to higher indices
+  for (unsigned int i = mid_index+1;
+       i < level_files_brief_[level].num_files; i++) {
+    const FdWithKeyRange* f = &files[i];
+    const Slice file_start = ExtractUserKey(f->smallest_key);
+    if (user_cmp->Compare(file_start, user_end) <= 0) {
+      assert((count++, true));
+      *end_index = i;
+    } else {
+      break;
+    }
+  }
+  assert(count == *end_index - *start_index + 1);
+}
+
+// Store in *start_index and *end_index the clean range of all files in
+// "level" within [begin,end].
+// mid_index specifies the index of at least one file within
+// the specified range. From that file, iterate backward
+// and forward to find all overlapping files and then "shrink" to
+// the clean range required.
+// Uses the LevelFilesBrief data structure to make the search faster.
+void VersionStorageInfo::ExtendFileRangeWithinInterval(
+    int level, const Slice& user_begin, const Slice& user_end,
+    unsigned int mid_index, int* start_index, int* end_index) const {
+  assert(level != 0);
+  const Comparator* user_cmp = user_comparator_;
+  const FdWithKeyRange* files = level_files_brief_[level].files;
+#ifndef NDEBUG
+  {
+    // assert that the file at mid_index is within the range
+    assert(mid_index < level_files_brief_[level].num_files);
+    const FdWithKeyRange* f = &files[mid_index];
+    const Slice fstart = ExtractUserKey(f->smallest_key);
+    const Slice flimit = ExtractUserKey(f->largest_key);
+    assert(user_cmp->Compare(fstart, user_begin) >= 0 &&
+           user_cmp->Compare(flimit, user_end) <= 0);
+  }
+#endif
+  ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid_index,
+                                     start_index, end_index);
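+  // Start from the overlapping range, then trim files that extend past
+  // [user_begin, user_end] or share a boundary user key with a neighboring
+  // file outside the range, so that no user key is split across the
+  // compaction boundary.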
+  int left = *start_index;
+  int right = *end_index;
+  // shrink from left to right
+  while (left <= right) {
+    const Slice& first_key_in_range = ExtractUserKey(files[left].smallest_key);
+    if (user_cmp->Compare(first_key_in_range, user_begin) < 0) {
+      left++;
+      continue;
+    }
+    if (left > 0) {  // If not first file
+      const Slice& last_key_before =
+          ExtractUserKey(files[left - 1].largest_key);
+      if (user_cmp->Equal(first_key_in_range, last_key_before)) {
+        // The first user key in range overlaps with the previous file's last
+        // key
+        left++;
+        continue;
+      }
+    }
+    break;
+  }
+  // shrink from right to left
+  while (left <= right) {
+    const Slice last_key_in_range = ExtractUserKey(files[right].largest_key);
+    if (user_cmp->Compare(last_key_in_range, user_end) > 0) {
+      right--;
+      continue;
+    }
+    if (right < static_cast<int>(level_files_brief_[level].num_files) -
+                    1) {  // If not the last file
+      const Slice first_key_after =
+          ExtractUserKey(files[right + 1].smallest_key);
+      if (user_cmp->Equal(last_key_in_range, first_key_after)) {
+        // The last user key in range overlaps with the next file's first key
+        right--;
+        continue;
+      }
+    }
+    break;
+  }
+
+  *start_index = left;
+  *end_index = right;
+}
+
+uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
+  assert(level >= 0);
+  assert(level < num_levels());
+  return TotalFileSize(files_[level]);
+}
+
+const char* VersionStorageInfo::LevelSummary(
+    LevelSummaryStorage* scratch) const {
+  int len = 0;
+  if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
+    assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
+    len = snprintf(scratch->buffer, sizeof(scratch->buffer),
+                   "base level %d max bytes base %" PRIu64 " ", base_level_,
+                   level_max_bytes_[base_level_]);
+  }
+  len +=
+      snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
+  for (int i = 0; i < num_levels(); i++) {
+    int sz = sizeof(scratch->buffer) - len;
+    int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
+    if (ret < 0 || ret >= sz) break;
+    len += ret;
+  }
+  if (len > 0) {
+    // overwrite the last space
+    --len;
+  }
+  len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
+                  "] max score %.2f", compaction_score_[0]);
+
+  if (!files_marked_for_compaction_.empty()) {
+    snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
+             " (%" ROCKSDB_PRIszt " files need compaction)",
+             files_marked_for_compaction_.size());
+  }
+
+  return scratch->buffer;
+}
+
+const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
+                                                 int level) const {
+  int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
+  for (const auto& f : files_[level]) {
+    int sz = sizeof(scratch->buffer) - len;
+    char sztxt[16];
+    AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
+    int ret = snprintf(scratch->buffer + len, sz,
+                       "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
+                       f->fd.GetNumber(), f->smallest_seqno, sztxt,
+                       static_cast<int>(f->being_compacted));
+    if (ret < 0 || ret >= sz)
+      break;
+    len += ret;
+  }
+  // overwrite the last space (only if files_[level].size() is non-zero)
+  if (files_[level].size() && len > 0) {
+    --len;
+  }
+  snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
+  return scratch->buffer;
+}
+
+int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
+  uint64_t result = 0;
+  std::vector<FileMetaData*> overlaps;
+  for (int level = 1; level < num_levels() - 1; level++) {
+    for (const auto& f : files_[level]) {
+      GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
+      const uint64_t sum = TotalFileSize(overlaps);
+      if (sum > result) {
+        result = sum;
+      }
+    }
+  }
+  return result;
+}
+
+uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
+  // Note: the result for level zero is not really used since we set
+  // the level-0 compaction threshold based on number of files.
+  assert(level >= 0);
+  assert(level < static_cast<int>(level_max_bytes_.size()));
+  return level_max_bytes_[level];
+}
+
+void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
+                                            const MutableCFOptions& options) {
+  // Special logic to set the number of sorted runs.
+  // This matches the previous behavior when all files were in L0.
+  int num_l0_count = static_cast<int>(files_[0].size());
+  if (compaction_style_ == kCompactionStyleUniversal) {
+    // For universal compaction, we use level0 score to indicate
+    // compaction score for the whole DB. Adding other levels as if
+    // they are L0 files.
+    for (int i = 1; i < num_levels(); i++) {
+      if (!files_[i].empty()) {
+        num_l0_count++;
+      }
+    }
+  }
+  set_l0_delay_trigger_count(num_l0_count);
+
+  level_max_bytes_.resize(ioptions.num_levels);
+  if (!ioptions.level_compaction_dynamic_level_bytes) {
+    base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
+
+    // Calculate level targets for the static max_bytes_for_level_base case
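+    // e.g. with max_bytes_for_level_base == 256 MB, a 10x multiplier, and
+    // the per-level multiplier additions left at 1, the fixed targets are
+    // L1 = 256 MB, L2 = 2,560 MB, L3 = 25,600 MB, and so on, independent of
+    // how much data the DB currently holds.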
+    for (int i = 0; i < ioptions.num_levels; ++i) {
+      if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
+        level_max_bytes_[i] = options.max_bytes_for_level_base;
+      } else if (i > 1) {
+        level_max_bytes_[i] = MultiplyCheckOverflow(
+            MultiplyCheckOverflow(level_max_bytes_[i - 1],
+                                  options.max_bytes_for_level_multiplier),
+            options.MaxBytesMultiplerAdditional(i - 1));
+      } else {
+        level_max_bytes_[i] = options.max_bytes_for_level_base;
+      }
+    }
+  } else {
+    uint64_t max_level_size = 0;
+
+    int first_non_empty_level = -1;
+    // Find the size of the non-L0 level with the most data.
+    // The size of the last level cannot be used because it can be empty or
+    // smaller than previous levels after compaction.
+    for (int i = 1; i < num_levels_; i++) {
+      uint64_t total_size = 0;
+      for (const auto& f : files_[i]) {
+        total_size += f->fd.GetFileSize();
+      }
+      if (total_size > 0 && first_non_empty_level == -1) {
+        first_non_empty_level = i;
+      }
+      if (total_size > max_level_size) {
+        max_level_size = total_size;
+      }
+    }
+
+    // Prefill every level's max bytes to disallow compaction from there.
+    for (int i = 0; i < num_levels_; i++) {
+      level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
+    }
+
+    if (max_level_size == 0) {
+      // No data for L1 and up. L0 compacts to last level directly.
+      // No compaction from L1+ needs to be scheduled.
+      base_level_ = num_levels_ - 1;
+    } else {
+      uint64_t base_bytes_max = options.max_bytes_for_level_base;
+      uint64_t base_bytes_min = static_cast<uint64_t>(
+          base_bytes_max / options.max_bytes_for_level_multiplier);
+
+      // Check whether the last level's target size can be max_level_size
+      uint64_t cur_level_size = max_level_size;
+      for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
+        // Divide by the multiplier; the cast truncates the result
+        cur_level_size = static_cast<uint64_t>(
+            cur_level_size / options.max_bytes_for_level_multiplier);
+      }
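+      // e.g. with num_levels_ == 7, first_non_empty_level == 4, a 10x
+      // multiplier, and max_level_size == 1 GB, the loop divides twice,
+      // leaving cur_level_size == 10 MB as the candidate target for L4.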
+
+      // Calculate base level and its size.
+      uint64_t base_level_size;
+      if (cur_level_size <= base_bytes_min) {
+        // Case 1. If we make the target size of the last level equal to
+        // max_level_size, the target size of the first non-empty level
+        // would be smaller than base_bytes_min. We set it to
+        // base_bytes_min + 1 instead.
+        base_level_size = base_bytes_min + 1U;
+        base_level_ = first_non_empty_level;
+        ROCKS_LOG_WARN(ioptions.info_log,
+                       "More existing levels in DB than needed. "
+                       "max_bytes_for_level_multiplier may not be guaranteed.");
+      } else {
+        // Find base level (where L0 data is compacted to).
+        base_level_ = first_non_empty_level;
+        while (base_level_ > 1 && cur_level_size > base_bytes_max) {
+          --base_level_;
+          cur_level_size = static_cast<uint64_t>(
+              cur_level_size / options.max_bytes_for_level_multiplier);
+        }
+        if (cur_level_size > base_bytes_max) {
+          // Even L1 will be too large
+          assert(base_level_ == 1);
+          base_level_size = base_bytes_max;
+        } else {
+          base_level_size = cur_level_size;
+        }
+      }
+
+      uint64_t level_size = base_level_size;
+      for (int i = base_level_; i < num_levels_; i++) {
+        if (i > base_level_) {
+          level_size = MultiplyCheckOverflow(
+              level_size, options.max_bytes_for_level_multiplier);
+        }
+        // Don't set any level below base_bytes_max. Otherwise, the LSM can
+        // assume an hourglass shape where L1+ sizes are smaller than L0. This
+        // causes compaction scoring, which depends on level sizes, to favor L1+
+        // at the expense of L0, which may fill up and stall.
+        level_max_bytes_[i] = std::max(level_size, base_bytes_max);
+      }
+    }
+  }
+}
+
+uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
+  // Estimate the live data size by adding up the size of the last level for all
+  // key ranges. Note: Estimate depends on the ordering of files in level 0
+  // because files in level 0 can be overlapping.
+  uint64_t size = 0;
+
+  auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
+    return internal_comparator_->Compare(*x, *y) < 0;
+  };
+  // (Ordered) map of largest keys in non-overlapping files
+  std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
+
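+  // Walk the levels from the oldest (bottom) level up; a file contributes to
+  // the estimate only if its key range is not already covered by a deeper
+  // file recorded in `ranges`.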
+  for (int l = num_levels_ - 1; l >= 0; l--) {
+    bool found_end = false;
+    for (auto file : files_[l]) {
+      // Find the first file where the largest key is larger than the smallest
+      // key of the current file. If this file does not overlap with the
+      // current file, none of the files in the map does. If there is
+      // no potential overlap, we can safely insert the rest of this level
+      // (if the level is not 0) into the map without checking again because
+      // the elements in the level are sorted and non-overlapping.
+      auto lb = (found_end && l != 0) ?
+        ranges.end() : ranges.lower_bound(&file->smallest);
+      found_end = (lb == ranges.end());
+      if (found_end || internal_comparator_->Compare(
+            file->largest, (*lb).second->smallest) < 0) {
+          ranges.emplace_hint(lb, &file->largest, file);
+          size += file->fd.file_size;
+      }
+    }
+  }
+  return size;
+}
+
+
+void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
+  for (int level = 0; level < storage_info_.num_levels(); level++) {
+    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
+    for (const auto& file : files) {
+      live->push_back(file->fd);
+    }
+  }
+}
+
+std::string Version::DebugString(bool hex, bool print_stats) const {
+  std::string r;
+  for (int level = 0; level < storage_info_.num_levels_; level++) {
+    // E.g.,
+    //   --- level 1 ---
+    //   17:123['a' .. 'd']
+    //   20:43['e' .. 'g']
+    //
+    // if print_stats=true:
+    //   17:123['a' .. 'd'](4096)
+    r.append("--- level ");
+    AppendNumberTo(&r, level);
+    r.append(" --- version# ");
+    AppendNumberTo(&r, version_number_);
+    r.append(" ---\n");
+    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
+    for (size_t i = 0; i < files.size(); i++) {
+      r.push_back(' ');
+      AppendNumberTo(&r, files[i]->fd.GetNumber());
+      r.push_back(':');
+      AppendNumberTo(&r, files[i]->fd.GetFileSize());
+      r.append("[");
+      r.append(files[i]->smallest.DebugString(hex));
+      r.append(" .. ");
+      r.append(files[i]->largest.DebugString(hex));
+      r.append("]");
+      if (print_stats) {
+        r.append("(");
+        r.append(ToString(
+            files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
+        r.append(")");
+      }
+      r.append("\n");
+    }
+  }
+  return r;
+}
+
+// this is used to batch writes to the manifest file
+struct VersionSet::ManifestWriter {
+  Status status;
+  bool done;
+  InstrumentedCondVar cv;
+  ColumnFamilyData* cfd;
+  const autovector<VersionEdit*>& edit_list;
+
+  explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
+                          const autovector<VersionEdit*>& e)
+      : done(false), cv(mu), cfd(_cfd), edit_list(e) {}
+};
+
+VersionSet::VersionSet(const std::string& dbname,
+                       const ImmutableDBOptions* db_options,
+                       const EnvOptions& storage_options, Cache* table_cache,
+                       WriteBufferManager* write_buffer_manager,
+                       WriteController* write_controller)
+    : column_family_set_(
+          new ColumnFamilySet(dbname, db_options, storage_options, table_cache,
+                              write_buffer_manager, write_controller)),
+      env_(db_options->env),
+      dbname_(dbname),
+      db_options_(db_options),
+      next_file_number_(2),
+      manifest_file_number_(0),  // Filled by Recover()
+      pending_manifest_file_number_(0),
+      last_sequence_(0),
+      last_to_be_written_sequence_(0),
+      prev_log_number_(0),
+      current_version_number_(0),
+      manifest_file_size_(0),
+      env_options_(storage_options),
+      env_options_compactions_(
+          env_->OptimizeForCompactionTableRead(env_options_, *db_options_)) {}
+
+void CloseTables(void* ptr, size_t) {
+  TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
+  table_reader->Close();
+}
+
+VersionSet::~VersionSet() {
+  // we need to delete column_family_set_ because its destructor depends on
+  // VersionSet
+  Cache* table_cache = column_family_set_->get_table_cache();
+  table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */);
+  column_family_set_.reset();
+  for (auto file : obsolete_files_) {
+    if (file->table_reader_handle) {
+      table_cache->Release(file->table_reader_handle);
+      TableCache::Evict(table_cache, file->fd.GetNumber());
+    }
+    delete file;
+  }
+  obsolete_files_.clear();
+}
+
+void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
+                               Version* v) {
+  // compute new compaction score
+  v->storage_info()->ComputeCompactionScore(
+      *column_family_data->ioptions(),
+      *column_family_data->GetLatestM

<TRUNCATED>
