nifi-commits mailing list archives

From jeremyd...@apache.org
Subject [27/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:25:07 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction.cc b/thirdparty/rocksdb/db/compaction.cc
new file mode 100644
index 0000000..9ea332d
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction.cc
@@ -0,0 +1,480 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/compaction.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <vector>
+
+#include "db/column_family.h"
+#include "rocksdb/compaction_filter.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
+  uint64_t sum = 0;
+  for (size_t i = 0; i < files.size() && files[i]; i++) {
+    sum += files[i]->fd.GetFileSize();
+  }
+  return sum;
+}
+
+void Compaction::SetInputVersion(Version* _input_version) {
+  input_version_ = _input_version;
+  cfd_ = input_version_->cfd();
+
+  cfd_->Ref();
+  input_version_->Ref();
+  edit_.SetColumnFamily(cfd_->GetID());
+}
+
+void Compaction::GetBoundaryKeys(
+    VersionStorageInfo* vstorage,
+    const std::vector<CompactionInputFiles>& inputs, Slice* smallest_user_key,
+    Slice* largest_user_key) {
+  bool initialized = false;
+  const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    if (inputs[i].files.empty()) {
+      continue;
+    }
+    if (inputs[i].level == 0) {
+      // we need to consider all files on level 0
+      for (const auto* f : inputs[i].files) {
+        const Slice& start_user_key = f->smallest.user_key();
+        if (!initialized ||
+            ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
+          *smallest_user_key = start_user_key;
+        }
+        const Slice& end_user_key = f->largest.user_key();
+        if (!initialized ||
+            ucmp->Compare(end_user_key, *largest_user_key) > 0) {
+          *largest_user_key = end_user_key;
+        }
+        initialized = true;
+      }
+    } else {
+      // we only need to consider the first and last file
+      const Slice& start_user_key = inputs[i].files[0]->smallest.user_key();
+      if (!initialized ||
+          ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
+        *smallest_user_key = start_user_key;
+      }
+      const Slice& end_user_key = inputs[i].files.back()->largest.user_key();
+      if (!initialized || ucmp->Compare(end_user_key, *largest_user_key) > 0) {
+        *largest_user_key = end_user_key;
+      }
+      initialized = true;
+    }
+  }
+}
+
+// helper function to determine if compaction is creating files at the
+// bottommost level
+bool Compaction::IsBottommostLevel(
+    int output_level, VersionStorageInfo* vstorage,
+    const std::vector<CompactionInputFiles>& inputs) {
+  if (inputs[0].level == 0 &&
+      inputs[0].files.back() != vstorage->LevelFiles(0).back()) {
+    return false;
+  }
+
+  Slice smallest_key, largest_key;
+  GetBoundaryKeys(vstorage, inputs, &smallest_key, &largest_key);
+
+  // Checks whether there are files living beyond the output_level.
+  // If lower levels have files, it checks for overlap between files
+  // in the compaction process and those files.
+  // Bottommost-level optimizations can be made if there are no files in
+  // lower levels or if there is no overlap with the files in
+  // the lower levels.
+  for (int i = output_level + 1; i < vstorage->num_levels(); i++) {
+    // It is not the bottommost level if there are files in higher
+    // levels when the output level is 0 or if there are files in
+    // higher levels which overlap with files to be compacted.
+    // output_level == 0 means that we want it to be considered
+    // as the bottommost level only if the last file on the level
+    // is a part of the files to be compacted - this is verified by
+    // the first if condition in this function
+    if (vstorage->NumLevelFiles(i) > 0 &&
+        (output_level == 0 ||
+         vstorage->OverlapInLevel(i, &smallest_key, &largest_key))) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// test function to validate the functionality of IsBottommostLevel()
+// function -- determines if compaction with inputs and storage is bottommost
+bool Compaction::TEST_IsBottommostLevel(
+    int output_level, VersionStorageInfo* vstorage,
+    const std::vector<CompactionInputFiles>& inputs) {
+  return IsBottommostLevel(output_level, vstorage, inputs);
+}
+
+bool Compaction::IsFullCompaction(
+    VersionStorageInfo* vstorage,
+    const std::vector<CompactionInputFiles>& inputs) {
+  size_t num_files_in_compaction = 0;
+  size_t total_num_files = 0;
+  for (int l = 0; l < vstorage->num_levels(); l++) {
+    total_num_files += vstorage->NumLevelFiles(l);
+  }
+  for (size_t i = 0; i < inputs.size(); i++) {
+    num_files_in_compaction += inputs[i].size();
+  }
+  return num_files_in_compaction == total_num_files;
+}
+
+Compaction::Compaction(VersionStorageInfo* vstorage,
+                       const ImmutableCFOptions& _immutable_cf_options,
+                       const MutableCFOptions& _mutable_cf_options,
+                       std::vector<CompactionInputFiles> _inputs,
+                       int _output_level, uint64_t _target_file_size,
+                       uint64_t _max_compaction_bytes, uint32_t _output_path_id,
+                       CompressionType _compression,
+                       std::vector<FileMetaData*> _grandparents,
+                       bool _manual_compaction, double _score,
+                       bool _deletion_compaction,
+                       CompactionReason _compaction_reason)
+    : input_vstorage_(vstorage),
+      start_level_(_inputs[0].level),
+      output_level_(_output_level),
+      max_output_file_size_(_target_file_size),
+      max_compaction_bytes_(_max_compaction_bytes),
+      immutable_cf_options_(_immutable_cf_options),
+      mutable_cf_options_(_mutable_cf_options),
+      input_version_(nullptr),
+      number_levels_(vstorage->num_levels()),
+      cfd_(nullptr),
+      output_path_id_(_output_path_id),
+      output_compression_(_compression),
+      deletion_compaction_(_deletion_compaction),
+      inputs_(std::move(_inputs)),
+      grandparents_(std::move(_grandparents)),
+      score_(_score),
+      bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
+      is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
+      is_manual_compaction_(_manual_compaction),
+      is_trivial_move_(false),
+      compaction_reason_(_compaction_reason) {
+  MarkFilesBeingCompacted(true);
+  if (is_manual_compaction_) {
+    compaction_reason_ = CompactionReason::kManualCompaction;
+  }
+
+#ifndef NDEBUG
+  for (size_t i = 1; i < inputs_.size(); ++i) {
+    assert(inputs_[i].level > inputs_[i - 1].level);
+  }
+#endif
+
+  // setup input_levels_
+  {
+    input_levels_.resize(num_input_levels());
+    for (size_t which = 0; which < num_input_levels(); which++) {
+      DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files,
+                                &arena_);
+    }
+  }
+
+  GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_);
+}
+
+Compaction::~Compaction() {
+  if (input_version_ != nullptr) {
+    input_version_->Unref();
+  }
+  if (cfd_ != nullptr) {
+    if (cfd_->Unref()) {
+      delete cfd_;
+    }
+  }
+}
+
+bool Compaction::InputCompressionMatchesOutput() const {
+  int base_level = input_vstorage_->base_level();
+  bool matches = (GetCompressionType(immutable_cf_options_, input_vstorage_,
+                                     mutable_cf_options_, start_level_,
+                                     base_level) == output_compression_);
+  if (matches) {
+    TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
+    return true;
+  }
+  TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
+  return matches;
+}
+
+bool Compaction::IsTrivialMove() const {
+  // Avoid a move if there is lots of overlapping grandparent data.
+  // Otherwise, the move could create a parent file that will require
+  // a very expensive merge later on.
+  // If start_level_== output_level_, the purpose is to force compaction
+  // filter to be applied to that level, and thus cannot be a trivial move.
+
+  // Check if the start level has files with overlapping ranges
+  if (start_level_ == 0 && input_vstorage_->level0_non_overlapping() == false) {
+    // We cannot move files from L0 to L1 if the files are overlapping
+    return false;
+  }
+
+  if (is_manual_compaction_ &&
+      (immutable_cf_options_.compaction_filter != nullptr ||
+       immutable_cf_options_.compaction_filter_factory != nullptr)) {
+    // This is a manual compaction and we have a compaction filter that should
+    // be executed; we cannot do a trivial move.
+    return false;
+  }
+
+  // Used in universal compaction, where a trivial move can be done if the
+  // input files are non-overlapping
+  if ((immutable_cf_options_.compaction_options_universal.allow_trivial_move) &&
+      (output_level_ != 0)) {
+    return is_trivial_move_;
+  }
+
+  if (!(start_level_ != output_level_ && num_input_levels() == 1 &&
+          input(0, 0)->fd.GetPathId() == output_path_id() &&
+          InputCompressionMatchesOutput())) {
+    return false;
+  }
+
+  // assert inputs_.size() == 1
+
+  for (const auto& file : inputs_.front().files) {
+    std::vector<FileMetaData*> file_grand_parents;
+    if (output_level_ + 1 >= number_levels_) {
+      continue;
+    }
+    input_vstorage_->GetOverlappingInputs(output_level_ + 1, &file->smallest,
+                                          &file->largest, &file_grand_parents);
+    const auto compaction_size =
+        file->fd.GetFileSize() + TotalFileSize(file_grand_parents);
+    if (compaction_size > max_compaction_bytes_) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+void Compaction::AddInputDeletions(VersionEdit* out_edit) {
+  for (size_t which = 0; which < num_input_levels(); which++) {
+    for (size_t i = 0; i < inputs_[which].size(); i++) {
+      out_edit->DeleteFile(level(which), inputs_[which][i]->fd.GetNumber());
+    }
+  }
+}
+
+bool Compaction::KeyNotExistsBeyondOutputLevel(
+    const Slice& user_key, std::vector<size_t>* level_ptrs) const {
+  assert(input_version_ != nullptr);
+  assert(level_ptrs != nullptr);
+  assert(level_ptrs->size() == static_cast<size_t>(number_levels_));
+  assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO);
+  if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
+    return bottommost_level_;
+  }
+  if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel &&
+      output_level_ == 0) {
+    return false;
+  }
+  // Maybe use binary search to find right entry instead of linear search?
+  const Comparator* user_cmp = cfd_->user_comparator();
+  for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
+    const std::vector<FileMetaData*>& files = input_vstorage_->LevelFiles(lvl);
+    for (; level_ptrs->at(lvl) < files.size(); level_ptrs->at(lvl)++) {
+      auto* f = files[level_ptrs->at(lvl)];
+      if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
+        // We've advanced far enough
+        if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
+          // Key falls in this file's range, so it may
+          // exist beyond the output level
+          return false;
+        }
+        break;
+      }
+    }
+  }
+  return true;
+}
+
+// Mark (or clear) each file that is being compacted
+void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
+  for (size_t i = 0; i < num_input_levels(); i++) {
+    for (size_t j = 0; j < inputs_[i].size(); j++) {
+      assert(mark_as_compacted ? !inputs_[i][j]->being_compacted
+                               : inputs_[i][j]->being_compacted);
+      inputs_[i][j]->being_compacted = mark_as_compacted;
+    }
+  }
+}
+
+// Sample output:
+// If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
+// print: "3@0 + 2@3 + 1@4 files to L5"
+const char* Compaction::InputLevelSummary(
+    InputLevelSummaryBuffer* scratch) const {
+  int len = 0;
+  bool is_first = true;
+  for (auto& input_level : inputs_) {
+    if (input_level.empty()) {
+      continue;
+    }
+    if (!is_first) {
+      len +=
+          snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " + ");
+    } else {
+      is_first = false;
+    }
+    len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
+                    "%" ROCKSDB_PRIszt "@%d", input_level.size(),
+                    input_level.level);
+  }
+  snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
+           " files to L%d", output_level());
+
+  return scratch->buffer;
+}
+
+uint64_t Compaction::CalculateTotalInputSize() const {
+  uint64_t size = 0;
+  for (auto& input_level : inputs_) {
+    for (auto f : input_level.files) {
+      size += f->fd.GetFileSize();
+    }
+  }
+  return size;
+}
+
+void Compaction::ReleaseCompactionFiles(Status status) {
+  MarkFilesBeingCompacted(false);
+  cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
+}
+
+void Compaction::ResetNextCompactionIndex() {
+  assert(input_version_ != nullptr);
+  input_vstorage_->ResetNextCompactionIndex(start_level_);
+}
+
+namespace {
+int InputSummary(const std::vector<FileMetaData*>& files, char* output,
+                 int len) {
+  *output = '\0';
+  int write = 0;
+  for (size_t i = 0; i < files.size(); i++) {
+    int sz = len - write;
+    int ret;
+    char sztxt[16];
+    AppendHumanBytes(files.at(i)->fd.GetFileSize(), sztxt, 16);
+    ret = snprintf(output + write, sz, "%" PRIu64 "(%s) ",
+                   files.at(i)->fd.GetNumber(), sztxt);
+    if (ret < 0 || ret >= sz) break;
+    write += ret;
+  }
+  // if files.size() is non-zero, overwrite the last space
+  return write - !!files.size();
+}
+}  // namespace
+
+void Compaction::Summary(char* output, int len) {
+  int write =
+      snprintf(output, len, "Base version %" PRIu64 " Base level %d, inputs: [",
+               input_version_->GetVersionNumber(), start_level_);
+  if (write < 0 || write >= len) {
+    return;
+  }
+
+  for (size_t level_iter = 0; level_iter < num_input_levels(); ++level_iter) {
+    if (level_iter > 0) {
+      write += snprintf(output + write, len - write, "], [");
+      if (write < 0 || write >= len) {
+        return;
+      }
+    }
+    write +=
+        InputSummary(inputs_[level_iter].files, output + write, len - write);
+    if (write < 0 || write >= len) {
+      return;
+    }
+  }
+
+  snprintf(output + write, len - write, "]");
+}
+
+uint64_t Compaction::OutputFilePreallocationSize() const {
+  uint64_t preallocation_size = 0;
+
+  if (max_output_file_size_ != port::kMaxUint64 &&
+      (cfd_->ioptions()->compaction_style == kCompactionStyleLevel ||
+       output_level() > 0)) {
+    preallocation_size = max_output_file_size_;
+  } else {
+    for (const auto& level_files : inputs_) {
+      for (const auto& file : level_files.files) {
+        preallocation_size += file->fd.GetFileSize();
+      }
+    }
+  }
+  // Over-estimate slightly so we don't end up just barely crossing
+  // the threshold
+  return preallocation_size + (preallocation_size / 10);
+}
+
+std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
+  if (!cfd_->ioptions()->compaction_filter_factory) {
+    return nullptr;
+  }
+
+  CompactionFilter::Context context;
+  context.is_full_compaction = is_full_compaction_;
+  context.is_manual_compaction = is_manual_compaction_;
+  context.column_family_id = cfd_->GetID();
+  return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
+      context);
+}
+
+bool Compaction::IsOutputLevelEmpty() const {
+  return inputs_.back().level != output_level_ || inputs_.back().empty();
+}
+
+bool Compaction::ShouldFormSubcompactions() const {
+  if (immutable_cf_options_.max_subcompactions <= 1 || cfd_ == nullptr) {
+    return false;
+  }
+  if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
+    return start_level_ == 0 && output_level_ > 0 && !IsOutputLevelEmpty();
+  } else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
+    return number_levels_ > 1 && output_level_ > 0;
+  } else {
+    return false;
+  }
+}
+
+uint64_t Compaction::MaxInputFileCreationTime() const {
+  uint64_t max_creation_time = 0;
+  for (const auto& file : inputs_[0].files) {
+    if (file->fd.table_reader != nullptr &&
+        file->fd.table_reader->GetTableProperties() != nullptr) {
+      uint64_t creation_time =
+          file->fd.table_reader->GetTableProperties()->creation_time;
+      max_creation_time = std::max(max_creation_time, creation_time);
+    }
+  }
+  return max_creation_time;
+}
+
+}  // namespace rocksdb
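
For orientation while reviewing this vendored file: GetBoundaryKeys() above is essentially a min/max scan with one twist. L0 files can overlap arbitrarily, so every file must be examined, while sorted levels keep files in key order, so only the first and last file matter. A minimal standalone sketch of that idea, using std::string in place of RocksDB's Slice/FileMetaData and an empty string as the "uninitialized" sentinel (all names here are illustrative, not part of this patch):

    #include <string>
    #include <vector>

    // Each file covers an inclusive user-key range [smallest, largest].
    struct FileRange {
      std::string smallest;
      std::string largest;
    };

    // Fold one level's files into the running [lo, hi] bounds.
    void Bounds(const std::vector<FileRange>& files, bool is_level0,
                std::string* lo, std::string* hi) {
      if (files.empty()) return;
      auto update = [&](const FileRange& f) {
        if (lo->empty() || f.smallest < *lo) *lo = f.smallest;
        if (hi->empty() || f.largest > *hi) *hi = f.largest;
      };
      if (is_level0) {
        for (const auto& f : files) update(f);  // overlapping: scan all files
      } else {
        update(files.front());  // sorted level: the two ends are enough
        update(files.back());
      }
    }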

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction.h b/thirdparty/rocksdb/db/compaction.h
new file mode 100644
index 0000000..7be6df2
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction.h
@@ -0,0 +1,325 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include "db/version_set.h"
+#include "options/cf_options.h"
+#include "util/arena.h"
+#include "util/autovector.h"
+
+namespace rocksdb {
+
+// The structure that manages compaction input files associated
+// with the same physical level.
+struct CompactionInputFiles {
+  int level;
+  std::vector<FileMetaData*> files;
+  inline bool empty() const { return files.empty(); }
+  inline size_t size() const { return files.size(); }
+  inline void clear() { files.clear(); }
+  inline FileMetaData* operator[](size_t i) const { return files[i]; }
+};
+
+class Version;
+class ColumnFamilyData;
+class VersionStorageInfo;
+class CompactionFilter;
+
+// A Compaction encapsulates information about a compaction.
+class Compaction {
+ public:
+  Compaction(VersionStorageInfo* input_version,
+             const ImmutableCFOptions& immutable_cf_options,
+             const MutableCFOptions& mutable_cf_options,
+             std::vector<CompactionInputFiles> inputs, int output_level,
+             uint64_t target_file_size, uint64_t max_compaction_bytes,
+             uint32_t output_path_id, CompressionType compression,
+             std::vector<FileMetaData*> grandparents,
+             bool manual_compaction = false, double score = -1,
+             bool deletion_compaction = false,
+             CompactionReason compaction_reason = CompactionReason::kUnknown);
+
+  // No copying allowed
+  Compaction(const Compaction&) = delete;
+  void operator=(const Compaction&) = delete;
+
+  ~Compaction();
+
+  // Returns the level associated with the specified compaction input level.
+  // If compaction_input_level is not specified, it defaults to 0.
+  int level(size_t compaction_input_level = 0) const {
+    return inputs_[compaction_input_level].level;
+  }
+
+  int start_level() const { return start_level_; }
+
+  // Outputs will go to this level
+  int output_level() const { return output_level_; }
+
+  // Returns the number of input levels in this compaction.
+  size_t num_input_levels() const { return inputs_.size(); }
+
+  // Return the object that holds the edits to the descriptor done
+  // by this compaction.
+  VersionEdit* edit() { return &edit_; }
+
+  // Returns the number of input files associated to the specified
+  // compaction input level.
+  // The function will return 0 when "compaction_input_level" < 0
+  // or "compaction_input_level" >= "num_input_levels()".
+  size_t num_input_files(size_t compaction_input_level) const {
+    if (compaction_input_level < inputs_.size()) {
+      return inputs_[compaction_input_level].size();
+    }
+    return 0;
+  }
+
+  // Returns input version of the compaction
+  Version* input_version() const { return input_version_; }
+
+  // Returns the ColumnFamilyData associated with the compaction.
+  ColumnFamilyData* column_family_data() const { return cfd_; }
+
+  // Returns the file meta data of the 'i'th input file at the
+  // specified compaction input level.
+  // REQUIREMENT: "compaction_input_level" must be >= 0 and
+  //              < "num_input_levels()"
+  FileMetaData* input(size_t compaction_input_level, size_t i) const {
+    assert(compaction_input_level < inputs_.size());
+    return inputs_[compaction_input_level][i];
+  }
+
+  // Returns the list of file meta data of the specified compaction
+  // input level.
+  // REQUIREMENT: "compaction_input_level" must be >= 0 and
+  //              < "num_input_levels()"
+  const std::vector<FileMetaData*>* inputs(
+      size_t compaction_input_level) const {
+    assert(compaction_input_level < inputs_.size());
+    return &inputs_[compaction_input_level].files;
+  }
+
+  const std::vector<CompactionInputFiles>* inputs() { return &inputs_; }
+
+  // Returns the LevelFilesBrief of the specified compaction input level.
+  const LevelFilesBrief* input_levels(size_t compaction_input_level) const {
+    return &input_levels_[compaction_input_level];
+  }
+
+  // Maximum size of files to build during this compaction.
+  uint64_t max_output_file_size() const { return max_output_file_size_; }
+
+  // Compression type for output files
+  CompressionType output_compression() const { return output_compression_; }
+
+  // Which DB path the output files should be written to.
+  uint32_t output_path_id() const { return output_path_id_; }
+
+  // Is this a trivial compaction that can be implemented by just
+  // moving a single input file to the next level (no merging or splitting)
+  bool IsTrivialMove() const;
+
+  // If true, then the compaction can be done by simply deleting input files.
+  bool deletion_compaction() const { return deletion_compaction_; }
+
+  // Add all inputs to this compaction as delete operations to *edit.
+  void AddInputDeletions(VersionEdit* edit);
+
+  // Returns true if the available information we have guarantees that
+  // the input "user_key" does not exist in any level beyond "output_level()".
+  bool KeyNotExistsBeyondOutputLevel(const Slice& user_key,
+                                     std::vector<size_t>* level_ptrs) const;
+
+  // Clear all files to indicate that they are not being compacted.
+  // Delete this compaction from the list of running compactions.
+  //
+  // Requirement: DB mutex held
+  void ReleaseCompactionFiles(Status status);
+
+  // Returns the summary of the compaction in "output" with maximum "len"
+  // in bytes.  The caller is responsible for the memory management of
+  // "output".
+  void Summary(char* output, int len);
+
+  // Return the score that was used to pick this compaction run.
+  double score() const { return score_; }
+
+  // Is this compaction creating a file in the bottommost level?
+  bool bottommost_level() const { return bottommost_level_; }
+
+  // Does this compaction include all sst files?
+  bool is_full_compaction() const { return is_full_compaction_; }
+
+  // Was this compaction triggered manually by the client?
+  bool is_manual_compaction() const { return is_manual_compaction_; }
+
+  // Used when the allow_trivial_move option is set in
+  // Universal compaction. If all the input files are
+  // non-overlapping, then is_trivial_move_ will be set to
+  // true, else false
+  void set_is_trivial_move(bool trivial_move) {
+    is_trivial_move_ = trivial_move;
+  }
+
+  // Used when the allow_trivial_move option is set in
+  // Universal compaction. Returns true if the input files
+  // are non-overlapping and can be trivially moved.
+  bool is_trivial_move() const { return is_trivial_move_; }
+
+  // How many total levels are there?
+  int number_levels() const { return number_levels_; }
+
+  // Return the ImmutableCFOptions that should be used throughout the compaction
+  // procedure
+  const ImmutableCFOptions* immutable_cf_options() const {
+    return &immutable_cf_options_;
+  }
+
+  // Return the MutableCFOptions that should be used throughout the compaction
+  // procedure
+  const MutableCFOptions* mutable_cf_options() const {
+    return &mutable_cf_options_;
+  }
+
+  // Returns the size in bytes that the output file should be preallocated to.
+  // In level compaction, that is max_output_file_size_. In universal
+  // compaction, that is the sum of all input file sizes.
+  uint64_t OutputFilePreallocationSize() const;
+
+  void SetInputVersion(Version* input_version);
+
+  struct InputLevelSummaryBuffer {
+    char buffer[128];
+  };
+
+  const char* InputLevelSummary(InputLevelSummaryBuffer* scratch) const;
+
+  uint64_t CalculateTotalInputSize() const;
+
+  // In case of compaction error, reset the nextIndex that is used
+  // to pick up the next file to be compacted from files_by_size_
+  void ResetNextCompactionIndex();
+
+  // Create a CompactionFilter from compaction_filter_factory
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter() const;
+
+  // Is the input level corresponding to output_level_ empty?
+  bool IsOutputLevelEmpty() const;
+
+  // Should this compaction be broken up into smaller ones run in parallel?
+  bool ShouldFormSubcompactions() const;
+
+  // test function to validate the functionality of IsBottommostLevel()
+  // function -- determines if compaction with inputs and storage is bottommost
+  static bool TEST_IsBottommostLevel(
+      int output_level, VersionStorageInfo* vstorage,
+      const std::vector<CompactionInputFiles>& inputs);
+
+  TablePropertiesCollection GetOutputTableProperties() const {
+    return output_table_properties_;
+  }
+
+  void SetOutputTableProperties(TablePropertiesCollection tp) {
+    output_table_properties_ = std::move(tp);
+  }
+
+  Slice GetSmallestUserKey() const { return smallest_user_key_; }
+
+  Slice GetLargestUserKey() const { return largest_user_key_; }
+
+  CompactionReason compaction_reason() { return compaction_reason_; }
+
+  const std::vector<FileMetaData*>& grandparents() const {
+    return grandparents_;
+  }
+
+  uint64_t max_compaction_bytes() const { return max_compaction_bytes_; }
+
+  uint64_t MaxInputFileCreationTime() const;
+
+ private:
+  // mark (or clear) all files that are being compacted
+  void MarkFilesBeingCompacted(bool mark_as_compacted);
+
+  // get the smallest and largest key present in files to be compacted
+  static void GetBoundaryKeys(VersionStorageInfo* vstorage,
+                              const std::vector<CompactionInputFiles>& inputs,
+                              Slice* smallest_key, Slice* largest_key);
+
+  // helper function to determine if compaction with inputs and storage is
+  // bottommost
+  static bool IsBottommostLevel(
+      int output_level, VersionStorageInfo* vstorage,
+      const std::vector<CompactionInputFiles>& inputs);
+
+  static bool IsFullCompaction(VersionStorageInfo* vstorage,
+                               const std::vector<CompactionInputFiles>& inputs);
+
+  VersionStorageInfo* input_vstorage_;
+
+  const int start_level_;    // the lowest level to be compacted
+  const int output_level_;  // level to which output files are stored
+  uint64_t max_output_file_size_;
+  uint64_t max_compaction_bytes_;
+  const ImmutableCFOptions immutable_cf_options_;
+  const MutableCFOptions mutable_cf_options_;
+  Version* input_version_;
+  VersionEdit edit_;
+  const int number_levels_;
+  ColumnFamilyData* cfd_;
+  Arena arena_;          // Arena used to allocate space for input_levels_
+
+  const uint32_t output_path_id_;
+  CompressionType output_compression_;
+  // If true, then the compaction can be done by simply deleting input files.
+  const bool deletion_compaction_;
+
+  // Compaction input files organized by level. Constant after construction
+  const std::vector<CompactionInputFiles> inputs_;
+
+  // A copy of inputs_, organized more closely in memory
+  autovector<LevelFilesBrief, 2> input_levels_;
+
+  // State used to check for number of overlapping grandparent files
+  // (grandparent == "output_level_ + 1")
+  std::vector<FileMetaData*> grandparents_;
+  const double score_;         // score that was used to pick this compaction.
+
+  // Is this compaction creating a file in the bottommost level?
+  const bool bottommost_level_;
+  // Does this compaction include all sst files?
+  const bool is_full_compaction_;
+
+  // Is this compaction requested by the client?
+  const bool is_manual_compaction_;
+
+  // True if we can do a trivial move in Universal multi-level
+  // compaction
+  bool is_trivial_move_;
+
+  // Does input compression match the output compression?
+  bool InputCompressionMatchesOutput() const;
+
+  // table properties of output files
+  TablePropertiesCollection output_table_properties_;
+
+  // smallest user key in compaction
+  Slice smallest_user_key_;
+
+  // largest user key in compaction
+  Slice largest_user_key_;
+
+  // Reason for compaction
+  CompactionReason compaction_reason_;
+};
+
+// Utility function
+extern uint64_t TotalFileSize(const std::vector<FileMetaData*>& files);
+
+}  // namespace rocksdb
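
For orientation: the trivial-move machinery declared above (IsTrivialMove(), is_trivial_move_, set_is_trivial_move()) reduces to a short predicate over the compaction's shape, implemented in compaction.cc. A hedged standalone restatement with plain booleans standing in for the member state (illustrative only, not the upstream API):

    #include <cstdint>

    // A move is "trivial" when a file can be relinked to the output level
    // without being rewritten: one input level, a real level change, same
    // DB path, same compression, and bounded overlap with grandparents.
    bool TrivialMoveAllowed(bool l0_files_overlap, bool manual_with_filter,
                            bool single_input_level, bool level_changes,
                            bool same_path, bool same_compression,
                            uint64_t grandparent_overlap_bytes,
                            uint64_t max_compaction_bytes) {
      if (l0_files_overlap) return false;    // overlapping L0 files must merge
      if (manual_with_filter) return false;  // the compaction filter must run
      return single_input_level && level_changes && same_path &&
             same_compression &&
             grandparent_overlap_bytes <= max_compaction_bytes;
    }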

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction_iteration_stats.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction_iteration_stats.h b/thirdparty/rocksdb/db/compaction_iteration_stats.h
new file mode 100644
index 0000000..52a666e
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction_iteration_stats.h
@@ -0,0 +1,33 @@
+//  Copyright (c) 2016-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+struct CompactionIterationStats {
+  // Compaction statistics
+
+  // Doesn't include records skipped because of
+  // CompactionFilter::Decision::kRemoveAndSkipUntil.
+  int64_t num_record_drop_user = 0;
+
+  int64_t num_record_drop_hidden = 0;
+  int64_t num_record_drop_obsolete = 0;
+  int64_t num_record_drop_range_del = 0;
+  int64_t num_range_del_drop_obsolete = 0;
+  uint64_t total_filter_time = 0;
+
+  // Input statistics
+  // TODO(noetzli): The stats are incomplete. They are lacking everything
+  // consumed by MergeHelper.
+  uint64_t num_input_records = 0;
+  uint64_t num_input_deletion_records = 0;
+  uint64_t num_input_corrupt_records = 0;
+  uint64_t total_input_raw_key_bytes = 0;
+  uint64_t total_input_raw_value_bytes = 0;
+
+  // Single-Delete diagnostics for exceptional situations
+  uint64_t num_single_del_fallthru = 0;
+  uint64_t num_single_del_mismatch = 0;
+};
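
Because CompactionIterationStats is a plain aggregate with zero-initialized members, counters from parallel subcompactions can be combined by field-wise addition. A sketch of such a helper (hypothetical; RocksDB's actual stats aggregation lives elsewhere), using the struct exactly as declared above:

    // Hypothetical helper: fold one iteration's counters into a total.
    void AddStats(CompactionIterationStats* total,
                  const CompactionIterationStats& part) {
      total->num_record_drop_user += part.num_record_drop_user;
      total->num_record_drop_hidden += part.num_record_drop_hidden;
      total->num_record_drop_obsolete += part.num_record_drop_obsolete;
      total->num_input_records += part.num_input_records;
      total->total_input_raw_key_bytes += part.total_input_raw_key_bytes;
      total->total_input_raw_value_bytes += part.total_input_raw_value_bytes;
      // ...the remaining fields follow the same pattern.
    }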

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction_iterator.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction_iterator.cc b/thirdparty/rocksdb/db/compaction_iterator.cc
new file mode 100644
index 0000000..08ae197
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction_iterator.cc
@@ -0,0 +1,576 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "db/compaction_iterator.h"
+#include "rocksdb/listener.h"
+#include "table/internal_iterator.h"
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+CompactionEventListener::CompactionListenerValueType fromInternalValueType(
+    ValueType vt) {
+  switch (vt) {
+    case kTypeDeletion:
+      return CompactionEventListener::CompactionListenerValueType::kDelete;
+    case kTypeValue:
+      return CompactionEventListener::CompactionListenerValueType::kValue;
+    case kTypeMerge:
+      return CompactionEventListener::CompactionListenerValueType::
+          kMergeOperand;
+    case kTypeSingleDeletion:
+      return CompactionEventListener::CompactionListenerValueType::
+          kSingleDelete;
+    case kTypeRangeDeletion:
+      return CompactionEventListener::CompactionListenerValueType::kRangeDelete;
+    default:
+      assert(false);
+      return CompactionEventListener::CompactionListenerValueType::kInvalid;
+  }
+}
+#endif  // ROCKSDB_LITE
+
+CompactionIterator::CompactionIterator(
+    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
+    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
+    SequenceNumber earliest_write_conflict_snapshot, Env* env,
+    bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
+    const Compaction* compaction, const CompactionFilter* compaction_filter,
+    CompactionEventListener* compaction_listener,
+    const std::atomic<bool>* shutting_down)
+    : CompactionIterator(
+          input, cmp, merge_helper, last_sequence, snapshots,
+          earliest_write_conflict_snapshot, env, expect_valid_internal_key,
+          range_del_agg,
+          std::unique_ptr<CompactionProxy>(
+              compaction ? new CompactionProxy(compaction) : nullptr),
+          compaction_filter, compaction_listener, shutting_down) {}
+
+CompactionIterator::CompactionIterator(
+    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
+    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
+    SequenceNumber earliest_write_conflict_snapshot, Env* env,
+    bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
+    std::unique_ptr<CompactionProxy> compaction,
+    const CompactionFilter* compaction_filter,
+    CompactionEventListener* compaction_listener,
+    const std::atomic<bool>* shutting_down)
+    : input_(input),
+      cmp_(cmp),
+      merge_helper_(merge_helper),
+      snapshots_(snapshots),
+      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
+      env_(env),
+      expect_valid_internal_key_(expect_valid_internal_key),
+      range_del_agg_(range_del_agg),
+      compaction_(std::move(compaction)),
+      compaction_filter_(compaction_filter),
+#ifndef ROCKSDB_LITE
+      compaction_listener_(compaction_listener),
+#endif  // ROCKSDB_LITE
+      shutting_down_(shutting_down),
+      ignore_snapshots_(false),
+      merge_out_iter_(merge_helper_) {
+  assert(compaction_filter_ == nullptr || compaction_ != nullptr);
+  bottommost_level_ =
+      compaction_ == nullptr ? false : compaction_->bottommost_level();
+  if (compaction_ != nullptr) {
+    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
+  }
+
+  if (snapshots_->size() == 0) {
+    // optimize for fast path if there are no snapshots
+    visible_at_tip_ = true;
+    earliest_snapshot_ = kMaxSequenceNumber;
+    latest_snapshot_ = 0;
+  } else {
+    visible_at_tip_ = false;
+    earliest_snapshot_ = snapshots_->at(0);
+    latest_snapshot_ = snapshots_->back();
+  }
+  if (compaction_filter_ != nullptr) {
+    if (compaction_filter_->IgnoreSnapshots()) {
+      ignore_snapshots_ = true;
+    }
+  } else {
+    ignore_snapshots_ = false;
+  }
+  input_->SetPinnedItersMgr(&pinned_iters_mgr_);
+}
+
+CompactionIterator::~CompactionIterator() {
+  // input_ iterator's lifetime is longer than pinned_iters_mgr_'s lifetime
+  input_->SetPinnedItersMgr(nullptr);
+}
+
+void CompactionIterator::ResetRecordCounts() {
+  iter_stats_.num_record_drop_user = 0;
+  iter_stats_.num_record_drop_hidden = 0;
+  iter_stats_.num_record_drop_obsolete = 0;
+  iter_stats_.num_record_drop_range_del = 0;
+  iter_stats_.num_range_del_drop_obsolete = 0;
+}
+
+void CompactionIterator::SeekToFirst() {
+  NextFromInput();
+  PrepareOutput();
+}
+
+void CompactionIterator::Next() {
+  // If there is a merge output, return it before continuing to process the
+  // input.
+  if (merge_out_iter_.Valid()) {
+    merge_out_iter_.Next();
+
+    // Check if we returned all records of the merge output.
+    if (merge_out_iter_.Valid()) {
+      key_ = merge_out_iter_.key();
+      value_ = merge_out_iter_.value();
+      bool valid_key __attribute__((__unused__)) =
+          ParseInternalKey(key_, &ikey_);
+      // MergeUntil stops when it encounters a corrupt key and does not
+      // include it in the result, so we expect the keys here to be valid.
+      assert(valid_key);
+      // Keep current_key_ in sync.
+      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+      key_ = current_key_.GetInternalKey();
+      ikey_.user_key = current_key_.GetUserKey();
+      valid_ = true;
+    } else {
+      // We consumed all pinned merge operands, release pinned iterators
+      pinned_iters_mgr_.ReleasePinnedData();
+      // MergeHelper moves the iterator to the first record after the merged
+      // records, so even though we reached the end of the merge output, we do
+      // not want to advance the iterator.
+      NextFromInput();
+    }
+  } else {
+    // Only advance the input iterator if there is no merge output and the
+    // iterator is not already at the next record.
+    if (!at_next_) {
+      input_->Next();
+    }
+    NextFromInput();
+  }
+
+  if (valid_) {
+    // Record that we've outputted a record for the current key.
+    has_outputted_key_ = true;
+  }
+
+  PrepareOutput();
+}
+
+void CompactionIterator::NextFromInput() {
+  at_next_ = false;
+  valid_ = false;
+
+  while (!valid_ && input_->Valid() && !IsShuttingDown()) {
+    key_ = input_->key();
+    value_ = input_->value();
+    iter_stats_.num_input_records++;
+
+    if (!ParseInternalKey(key_, &ikey_)) {
+      // If `expect_valid_internal_key_` is false, return the corrupted key
+      // and let the caller decide what to do with it.
+      // TODO(noetzli): We should have a more elegant solution for this.
+      if (expect_valid_internal_key_) {
+        assert(!"Corrupted internal key not expected.");
+        status_ = Status::Corruption("Corrupted internal key not expected.");
+        break;
+      }
+      key_ = current_key_.SetInternalKey(key_);
+      has_current_user_key_ = false;
+      current_user_key_sequence_ = kMaxSequenceNumber;
+      current_user_key_snapshot_ = 0;
+      iter_stats_.num_input_corrupt_records++;
+      valid_ = true;
+      break;
+    }
+
+    // Update input statistics
+    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
+      iter_stats_.num_input_deletion_records++;
+    }
+    iter_stats_.total_input_raw_key_bytes += key_.size();
+    iter_stats_.total_input_raw_value_bytes += value_.size();
+
+    // If need_skip is true, we should seek the input iterator
+    // to internal key skip_until and continue from there.
+    bool need_skip = false;
+    // Points either into compaction_filter_skip_until_ or into
+    // merge_helper_->compaction_filter_skip_until_.
+    Slice skip_until;
+
+    // Check whether the user key changed. After this if statement current_key_
+    // is a copy of the current input key (maybe converted to a delete by the
+    // compaction filter). ikey_.user_key is pointing to the copy.
+    if (!has_current_user_key_ ||
+        !cmp_->Equal(ikey_.user_key, current_user_key_)) {
+      // First occurrence of this user key
+      // Copy key for output
+      key_ = current_key_.SetInternalKey(key_, &ikey_);
+      current_user_key_ = ikey_.user_key;
+      has_current_user_key_ = true;
+      has_outputted_key_ = false;
+      current_user_key_sequence_ = kMaxSequenceNumber;
+      current_user_key_snapshot_ = 0;
+
+#ifndef ROCKSDB_LITE
+      if (compaction_listener_) {
+        compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key,
+                                           fromInternalValueType(ikey_.type),
+                                           value_, ikey_.sequence, true);
+      }
+#endif  // ROCKSDB_LITE
+
+      // apply the compaction filter to the first occurrence of the user key
+      if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
+          (visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
+           ignore_snapshots_)) {
+        // If the user has specified a compaction filter and the sequence
+        // number is greater than any external snapshot, then invoke the
+        // filter. If the filter returns the kRemove decision,
+        // replace the entry with a deletion marker.
+        CompactionFilter::Decision filter;
+        compaction_filter_value_.clear();
+        compaction_filter_skip_until_.Clear();
+        {
+          StopWatchNano timer(env_, true);
+          filter = compaction_filter_->FilterV2(
+              compaction_->level(), ikey_.user_key,
+              CompactionFilter::ValueType::kValue, value_,
+              &compaction_filter_value_, compaction_filter_skip_until_.rep());
+          iter_stats_.total_filter_time +=
+              env_ != nullptr ? timer.ElapsedNanos() : 0;
+        }
+
+        if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
+            cmp_->Compare(*compaction_filter_skip_until_.rep(),
+                          ikey_.user_key) <= 0) {
+          // Can't skip to a key smaller than the current one.
+          // Keep the key as per FilterV2 documentation.
+          filter = CompactionFilter::Decision::kKeep;
+        }
+
+        if (filter == CompactionFilter::Decision::kRemove) {
+          // convert the current key to a delete; key_ is pointing into
+          // current_key_ at this point, so updating current_key_ updates key()
+          ikey_.type = kTypeDeletion;
+          current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
+          // no value associated with delete
+          value_.clear();
+          iter_stats_.num_record_drop_user++;
+        } else if (filter == CompactionFilter::Decision::kChangeValue) {
+          value_ = compaction_filter_value_;
+        } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
+          need_skip = true;
+          compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
+                                                           kValueTypeForSeek);
+          skip_until = compaction_filter_skip_until_.Encode();
+        }
+      }
+    } else {
+#ifndef ROCKSDB_LITE
+      if (compaction_listener_) {
+        compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key,
+                                           fromInternalValueType(ikey_.type),
+                                           value_, ikey_.sequence, false);
+      }
+#endif  // ROCKSDB_LITE
+
+      // Update the current key to reflect the new sequence number/type without
+      // copying the user key.
+      // TODO(rven): Compaction filter does not process keys in this path
+      // Need to have the compaction filter process multiple versions
+      // if we have versions on both sides of a snapshot
+      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+      key_ = current_key_.GetInternalKey();
+      ikey_.user_key = current_key_.GetUserKey();
+    }
+
+    // If there are no snapshots, then this kv affects visibility at tip.
+    // Otherwise, search through all existing snapshots to find the earliest
+    // snapshot that is affected by this kv.
+    SequenceNumber last_sequence __attribute__((__unused__)) =
+        current_user_key_sequence_;
+    current_user_key_sequence_ = ikey_.sequence;
+    SequenceNumber last_snapshot = current_user_key_snapshot_;
+    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
+    current_user_key_snapshot_ =
+        visible_at_tip_
+            ? earliest_snapshot_
+            : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);
+
+    if (need_skip) {
+      // This case is handled below.
+    } else if (clear_and_output_next_key_) {
+      // In the previous iteration we encountered a single delete that we could
+      // not compact out.  We will keep this Put, but can drop its data.
+      // (See Optimization 3, below.)
+      assert(ikey_.type == kTypeValue);
+      assert(current_user_key_snapshot_ == last_snapshot);
+
+      value_.clear();
+      valid_ = true;
+      clear_and_output_next_key_ = false;
+    } else if (ikey_.type == kTypeSingleDeletion) {
+      // We can compact out a SingleDelete if:
+      // 1) We encounter the corresponding PUT -OR- we know that this key
+      //    doesn't appear past this output level
+      // =AND=
+      // 2) We've already returned a record in this snapshot -OR-
+      //    there is no earlier earliest_write_conflict_snapshot.
+      //
+      // Rule 1 is needed for SingleDelete correctness.  Rule 2 is needed to
+      // allow Transactions to do write-conflict checking (if we compacted away
+      // all keys, then we wouldn't know that a write happened in this
+      // snapshot).  If there is no earlier snapshot, then we know that there
+      // are no active transactions that need to know about any writes.
+      //
+      // Optimization 3:
+      // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
+      // true, then we must output a SingleDelete.  In this case, we will decide
+      // to also output the PUT.  While we are compacting less by outputting the
+      // PUT now, hopefully this will lead to better compaction in the future
+      // when Rule 2 is later true (i.e., we are hoping we can later compact out
+      // both the SingleDelete and the Put, while we couldn't if we only
+      // outputted the SingleDelete now).
+      // In this case, we can save space by removing the PUT's value as it will
+      // never be read.
+      //
+      // Deletes and Merges are not supported on the same key that has a
+      // SingleDelete as it is not possible to correctly do any partial
+      // compaction of such a combination of operations.  The result of mixing
+      // those operations for a given key is documented as being undefined.  So
+      // we can choose how to handle such combinations of operations.  We will
+      // try to compact out as much as we can in these cases.
+      // We will report counts on these anomalous cases.
+
+      // The easiest way to process a SingleDelete during iteration is to peek
+      // ahead at the next key.
+      ParsedInternalKey next_ikey;
+      input_->Next();
+
+      // Check whether the next key exists, is not corrupt, and is the same key
+      // as the single delete.
+      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
+          cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
+        // Check whether the next key belongs to the same snapshot as the
+        // SingleDelete.
+        if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot) {
+          if (next_ikey.type == kTypeSingleDeletion) {
+            // We encountered two SingleDeletes in a row.  This could be due to
+            // unexpected user input.
+            // Skip the first SingleDelete and let the next iteration decide how
+            // to handle the second SingleDelete
+
+            // First SingleDelete has been skipped since we already called
+            // input_->Next().
+            ++iter_stats_.num_record_drop_obsolete;
+            ++iter_stats_.num_single_del_mismatch;
+          } else if ((ikey_.sequence <= earliest_write_conflict_snapshot_) ||
+                     has_outputted_key_) {
+            // Found a matching value, we can drop the single delete and the
+            // value.  It is safe to drop both records since we've already
+            // outputted a key in this snapshot, or there is no earlier
+            // snapshot (Rule 2 above).
+
+            // Note: it doesn't matter whether the second key is a Put or if it
+            // is an unexpected Merge or Delete.  We will compact it out
+            // either way. We will maintain counts of how many mismatches
+            // happened
+            if (next_ikey.type != kTypeValue) {
+              ++iter_stats_.num_single_del_mismatch;
+            }
+
+            ++iter_stats_.num_record_drop_hidden;
+            ++iter_stats_.num_record_drop_obsolete;
+            // Already called input_->Next() once.  Call it a second time to
+            // skip past the second key.
+            input_->Next();
+          } else {
+            // Found a matching value, but we cannot drop both keys since
+            // there is an earlier snapshot and we need to leave behind a record
+            // to know that a write happened in this snapshot (Rule 2 above).
+            // Clear the value and output the SingleDelete. (The value will be
+            // outputted on the next iteration.)
+
+            // Setting valid_ to true will output the current SingleDelete
+            valid_ = true;
+
+            // Set up the Put to be outputted in the next iteration.
+            // (Optimization 3).
+            clear_and_output_next_key_ = true;
+          }
+        } else {
+          // We hit the next snapshot without hitting a put, so the iterator
+          // returns the single delete.
+          valid_ = true;
+        }
+      } else {
+        // We are at the end of the input, could not parse the next key, or hit
+        // a different key. The iterator returns the single delete if the key
+        // possibly exists beyond the current output level.  We set
+        // has_current_user_key to false so that if the iterator is at the next
+        // key, we do not compare it again against the previous key at the next
+        // iteration. If the next key is corrupt, we return before the
+        // comparison, so the value of has_current_user_key does not matter.
+        has_current_user_key_ = false;
+        if (compaction_ != nullptr && ikey_.sequence <= earliest_snapshot_ &&
+            compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
+                                                       &level_ptrs_)) {
+          // Key doesn't exist outside of this range.
+          // Can compact out this SingleDelete.
+          ++iter_stats_.num_record_drop_obsolete;
+          ++iter_stats_.num_single_del_fallthru;
+        } else {
+          // Output SingleDelete
+          valid_ = true;
+        }
+      }
+
+      if (valid_) {
+        at_next_ = true;
+      }
+    } else if (last_snapshot == current_user_key_snapshot_) {
+      // If the earliest snapshot in which this key is visible is the
+      // same as the visibility of a previous instance of the
+      // same key, then this kv is not visible in any snapshot.
+      // It is hidden by a newer entry for the same user key.
+      // TODO(noetzli): why not > ?
+      //
+      // Note: Dropping this key will not affect TransactionDB write-conflict
+      // checking since there has already been a record returned for this key
+      // in this snapshot.
+      assert(last_sequence >= current_user_key_sequence_);
+      ++iter_stats_.num_record_drop_hidden;  // (A)
+      input_->Next();
+    } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
+               ikey_.sequence <= earliest_snapshot_ &&
+               compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
+                                                          &level_ptrs_)) {
+      // TODO(noetzli): This is the only place where we use compaction_
+      // (besides the constructor). We should probably get rid of this
+      // dependency and find a way to do similar filtering during flushes.
+      //
+      // For this user key:
+      // (1) there is no data in higher levels
+      // (2) data in lower levels will have larger sequence numbers
+      // (3) data in layers that are being compacted here and have
+      //     smaller sequence numbers will be dropped in the next
+      //     few iterations of this loop (by rule (A) above).
+      // Therefore this deletion marker is obsolete and can be dropped.
+      //
+      // Note:  Dropping this Delete will not affect TransactionDB
+      // write-conflict checking since it is earlier than any snapshot.
+      ++iter_stats_.num_record_drop_obsolete;
+      input_->Next();
+    } else if (ikey_.type == kTypeMerge) {
+      if (!merge_helper_->HasOperator()) {
+        status_ = Status::InvalidArgument(
+            "merge_operator is not properly initialized.");
+        return;
+      }
+
+      pinned_iters_mgr_.StartPinning();
+      // We know the merge type entry is not hidden, otherwise we would
+      // have hit (A)
+      // We encapsulate the merge related state machine in a different
+      // object to minimize change to the existing flow.
+      Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
+                                           prev_snapshot, bottommost_level_);
+      merge_out_iter_.SeekToFirst();
+
+      if (!s.ok() && !s.IsMergeInProgress()) {
+        status_ = s;
+        return;
+      } else if (merge_out_iter_.Valid()) {
+        // NOTE: key, value, and ikey_ refer to old entries.
+        //       These will be correctly set below.
+        key_ = merge_out_iter_.key();
+        value_ = merge_out_iter_.value();
+        bool valid_key __attribute__((__unused__)) =
+            ParseInternalKey(key_, &ikey_);
+        // MergeUntil stops when it encounters a corrupt key and does not
+        // include it in the result, so we expect the keys here to be valid.
+        assert(valid_key);
+        // Keep current_key_ in sync.
+        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+        key_ = current_key_.GetInternalKey();
+        ikey_.user_key = current_key_.GetUserKey();
+        valid_ = true;
+      } else {
+        // All merge operands were filtered out. Reset the user key, since the
+        // batch consumed by the merge operator should not shadow any keys
+        // coming after the merges.
+        has_current_user_key_ = false;
+        pinned_iters_mgr_.ReleasePinnedData();
+
+        if (merge_helper_->FilteredUntil(&skip_until)) {
+          need_skip = true;
+        }
+      }
+    } else {
+      // 1. new user key -OR-
+      // 2. different snapshot stripe
+      bool should_delete = range_del_agg_->ShouldDelete(
+          key_, RangeDelAggregator::RangePositioningMode::kForwardTraversal);
+      if (should_delete) {
+        ++iter_stats_.num_record_drop_hidden;
+        ++iter_stats_.num_record_drop_range_del;
+        input_->Next();
+      } else {
+        valid_ = true;
+      }
+    }
+
+    if (need_skip) {
+      input_->Seek(skip_until);
+    }
+  }
+
+  if (!valid_ && IsShuttingDown()) {
+    status_ = Status::ShutdownInProgress();
+  }
+}
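+
+// Worked illustration of the SingleDelete rules above (added commentary,
+// not upstream code): for user key "k", input SingleDelete(seq=12)
+// directly followed by Put(seq=10) in the same snapshot stripe:
+//   - if seq 12 <= earliest_write_conflict_snapshot_ or a record for "k"
+//     was already output in this stripe, both entries are dropped
+//     (Rules 1 and 2 hold);
+//   - otherwise the SingleDelete is emitted and the Put follows with its
+//     value cleared (Optimization 3);
+//   - if a snapshot separates them (the Put's seq <= prev_snapshot), only
+//     the SingleDelete is emitted.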
+
+void CompactionIterator::PrepareOutput() {
+  // Zeroing out the sequence number leads to better compression.
+  // If this is the bottommost level (no files in lower levels),
+  // the earliest snapshot is larger than this seqno,
+  // and the user key differs from the last user key in the compaction,
+  // then we can squash the seqno to zero.
+
+  // This is safe for TransactionDB write-conflict checking since transactions
+  // only care about sequence number larger than any active snapshots.
+  if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
+      bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ &&
+      ikey_.type != kTypeMerge &&
+      !cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) {
+    assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
+    ikey_.sequence = 0;
+    current_key_.UpdateInternalKey(0, ikey_.type);
+  }
+}
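+
+// The guard above can be read as a single predicate. A simplified sketch
+// (parameter names are invented; the real code compares user keys through
+// the column family comparator):
+//
+//   #include <cstdint>
+//
+//   bool CanZeroOutSeqnum(bool bottommost, bool allow_ingest_behind,
+//                         uint64_t seqno, uint64_t earliest_snapshot,
+//                         bool is_merge, bool equals_largest_user_key) {
+//     // Zeroing is safe only when no lower level may hold an older version
+//     // of the key, no live snapshot can still observe this version, the
+//     // entry is not a merge operand, and the key is not the compaction's
+//     // largest user key.
+//     return bottommost && !allow_ingest_behind &&
+//            seqno <= earliest_snapshot && !is_merge &&
+//            !equals_largest_user_key;
+//   }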
+
+inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
+    SequenceNumber in, SequenceNumber* prev_snapshot) {
+  assert(snapshots_->size());
+  SequenceNumber prev __attribute__((__unused__)) = kMaxSequenceNumber;
+  for (const auto cur : *snapshots_) {
+    assert(prev == kMaxSequenceNumber || prev <= cur);
+    if (cur >= in) {
+      *prev_snapshot = prev == kMaxSequenceNumber ? 0 : prev;
+      return cur;
+    }
+    prev = cur;
+    assert(prev < kMaxSequenceNumber);
+  }
+  *prev_snapshot = prev;
+  return kMaxSequenceNumber;
+}
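+
+// A standalone restatement of the scan above (illustrative only): with the
+// snapshot list sorted in ascending order, return the earliest snapshot at
+// or above `in`, and report the next-older snapshot through `prev_snapshot`.
+//
+//   #include <cstdint>
+//   #include <vector>
+//
+//   const uint64_t kNoSnapshot = UINT64_MAX;  // plays the role of
+//                                             // kMaxSequenceNumber
+//
+//   uint64_t EarliestVisible(const std::vector<uint64_t>& snapshots,
+//                            uint64_t in, uint64_t* prev_snapshot) {
+//     uint64_t prev = kNoSnapshot;
+//     for (uint64_t cur : snapshots) {
+//       if (cur >= in) {
+//         *prev_snapshot = (prev == kNoSnapshot) ? 0 : prev;
+//         return cur;
+//       }
+//       prev = cur;
+//     }
+//     *prev_snapshot = prev;  // `in` is newer than every snapshot
+//     return kNoSnapshot;     // visible only at the tip
+//   }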
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction_iterator.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction_iterator.h b/thirdparty/rocksdb/db/compaction_iterator.h
new file mode 100644
index 0000000..cad2386
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction_iterator.h
@@ -0,0 +1,197 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#include <algorithm>
+#include <deque>
+#include <string>
+#include <vector>
+
+#include "db/compaction.h"
+#include "db/compaction_iteration_stats.h"
+#include "db/merge_helper.h"
+#include "db/pinned_iterators_manager.h"
+#include "db/range_del_aggregator.h"
+#include "options/cf_options.h"
+#include "rocksdb/compaction_filter.h"
+
+namespace rocksdb {
+
+class CompactionEventListener;
+
+class CompactionIterator {
+ public:
+  // A wrapper around Compaction. Has a much smaller interface, only what
+  // CompactionIterator uses. Tests can override it.
+  class CompactionProxy {
+   public:
+    explicit CompactionProxy(const Compaction* compaction)
+        : compaction_(compaction) {}
+
+    virtual ~CompactionProxy() = default;
+    virtual int level(size_t compaction_input_level = 0) const {
+      return compaction_->level();
+    }
+    virtual bool KeyNotExistsBeyondOutputLevel(
+        const Slice& user_key, std::vector<size_t>* level_ptrs) const {
+      return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
+    }
+    virtual bool bottommost_level() const {
+      return compaction_->bottommost_level();
+    }
+    virtual int number_levels() const { return compaction_->number_levels(); }
+    virtual Slice GetLargestUserKey() const {
+      return compaction_->GetLargestUserKey();
+    }
+    virtual bool allow_ingest_behind() const {
+      return compaction_->immutable_cf_options()->allow_ingest_behind;
+    }
+
+   protected:
+    CompactionProxy() = default;
+
+   private:
+    const Compaction* compaction_;
+  };
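+
+  // Because every method above is virtual and a protected default
+  // constructor is provided, a test can substitute a stub that returns
+  // canned answers. A hypothetical sketch (the class name and the returned
+  // values are invented for illustration):
+  //
+  //   class StubCompactionProxy : public CompactionProxy {
+  //    public:
+  //     StubCompactionProxy() : CompactionProxy() {}
+  //     int level(size_t = 0) const override { return 0; }
+  //     bool KeyNotExistsBeyondOutputLevel(
+  //         const Slice&, std::vector<size_t>*) const override {
+  //       return false;
+  //     }
+  //     bool bottommost_level() const override { return true; }
+  //     int number_levels() const override { return 1; }
+  //     Slice GetLargestUserKey() const override { return Slice("zzzz"); }
+  //     bool allow_ingest_behind() const override { return false; }
+  //   };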
+
+  CompactionIterator(InternalIterator* input, const Comparator* cmp,
+                     MergeHelper* merge_helper, SequenceNumber last_sequence,
+                     std::vector<SequenceNumber>* snapshots,
+                     SequenceNumber earliest_write_conflict_snapshot, Env* env,
+                     bool expect_valid_internal_key,
+                     RangeDelAggregator* range_del_agg,
+                     const Compaction* compaction = nullptr,
+                     const CompactionFilter* compaction_filter = nullptr,
+                     CompactionEventListener* compaction_listener = nullptr,
+                     const std::atomic<bool>* shutting_down = nullptr);
+
+  // Constructor with custom CompactionProxy, used for tests.
+  CompactionIterator(InternalIterator* input, const Comparator* cmp,
+                     MergeHelper* merge_helper, SequenceNumber last_sequence,
+                     std::vector<SequenceNumber>* snapshots,
+                     SequenceNumber earliest_write_conflict_snapshot, Env* env,
+                     bool expect_valid_internal_key,
+                     RangeDelAggregator* range_del_agg,
+                     std::unique_ptr<CompactionProxy> compaction,
+                     const CompactionFilter* compaction_filter = nullptr,
+                     CompactionEventListener* compaction_listener = nullptr,
+                     const std::atomic<bool>* shutting_down = nullptr);
+
+  ~CompactionIterator();
+
+  void ResetRecordCounts();
+
+  // Seek to the beginning of the compaction iterator output.
+  //
+  // REQUIRED: Call only once.
+  void SeekToFirst();
+
+  // Produces the next record in the compaction.
+  //
+  // REQUIRED: SeekToFirst() has been called.
+  void Next();
+
+  // Getters
+  const Slice& key() const { return key_; }
+  const Slice& value() const { return value_; }
+  const Status& status() const { return status_; }
+  const ParsedInternalKey& ikey() const { return ikey_; }
+  bool Valid() const { return valid_; }
+  const Slice& user_key() const { return current_user_key_; }
+  const CompactionIterationStats& iter_stats() const { return iter_stats_; }
+
+ private:
+  // Processes the input stream to find the next output
+  void NextFromInput();
+
+  // Do final preparations before presenting the output to the caller. At
+  // this point this only zeroes out the sequence number if possible for
+  // better compression.
+  void PrepareOutput();
+
+  // Given a sequence number, return the sequence number of the
+  // earliest snapshot that this sequence number is visible in.
+  // The snapshots themselves are arranged in ascending order of
+  // sequence numbers.
+  // Employ a sequential search because the total number of
+  // snapshots is typically small.
+  inline SequenceNumber findEarliestVisibleSnapshot(
+      SequenceNumber in, SequenceNumber* prev_snapshot);
+
+  InternalIterator* input_;
+  const Comparator* cmp_;
+  MergeHelper* merge_helper_;
+  const std::vector<SequenceNumber>* snapshots_;
+  const SequenceNumber earliest_write_conflict_snapshot_;
+  Env* env_;
+  bool expect_valid_internal_key_;
+  RangeDelAggregator* range_del_agg_;
+  std::unique_ptr<CompactionProxy> compaction_;
+  const CompactionFilter* compaction_filter_;
+#ifndef ROCKSDB_LITE
+  CompactionEventListener* compaction_listener_;
+#endif  // ROCKSDB_LITE
+  const std::atomic<bool>* shutting_down_;
+  bool bottommost_level_;
+  bool valid_ = false;
+  bool visible_at_tip_;
+  SequenceNumber earliest_snapshot_;
+  SequenceNumber latest_snapshot_;
+  bool ignore_snapshots_;
+
+  // State
+  //
+  // Points to a copy of the current compaction iterator output (current_key_)
+  // if valid_.
+  Slice key_;
+  // Points to the value in the underlying iterator that corresponds to the
+  // current output.
+  Slice value_;
+  // The status is OK unless the compaction iterator encounters a merge
+  // operand while no merge operator is defined.
+  Status status_;
+  // Stores the user key, sequence number and type of the current compaction
+  // iterator output (or current key in the underlying iterator during
+  // NextFromInput()).
+  ParsedInternalKey ikey_;
+  // Stores whether ikey_.user_key is valid. If set to false, the user key is
+  // not compared against the current key in the underlying iterator.
+  bool has_current_user_key_ = false;
+  // True if the underlying input_ iterator is already positioned at the
+  // record following the current output (e.g. after MergeUntil has consumed
+  // the merge operands), so the next call to Next() must not advance it
+  // again.
+  bool at_next_ = false;
+  // Holds a copy of the current compaction iterator output (or current key in
+  // the underlying iterator during NextFromInput()).
+  IterKey current_key_;
+  Slice current_user_key_;
+  SequenceNumber current_user_key_sequence_;
+  SequenceNumber current_user_key_snapshot_;
+
+  // True if the iterator has already returned a record for the current key.
+  bool has_outputted_key_ = false;
+
+  // If true, truncate the value of the next key and output it without
+  // applying any compaction rules. This is used for outputting a put after
+  // a single delete.
+  bool clear_and_output_next_key_ = false;
+
+  MergeOutputIterator merge_out_iter_;
+  // PinnedIteratorsManager used to pin input_ iterator blocks while reading
+  // merge operands, and to release them once they have been consumed.
+  PinnedIteratorsManager pinned_iters_mgr_;
+  std::string compaction_filter_value_;
+  InternalKey compaction_filter_skip_until_;
+  // "level_ptrs" holds indices that remember which file of an associated
+  // level we were last checking during the last call to compaction->
+  // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
+  // to pick up where it left off, since each subcompaction's key range is
+  // increasing, so a later call must be looking for a key that is in or
+  // beyond the last file checked during the previous call.
+  std::vector<size_t> level_ptrs_;
+  CompactionIterationStats iter_stats_;
+
+  bool IsShuttingDown() {
+    // This is a best-effort facility, so memory_order_relaxed is sufficient.
+    return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
+  }
+};
+}  // namespace rocksdb
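+
+// A sketch of the intended consumption pattern (assumed caller code, e.g. a
+// compaction job; WriteToOutput is a placeholder), following the contract
+// documented above: call SeekToFirst() exactly once, call Next() until
+// Valid() returns false, then inspect status():
+//
+//   #include <cassert>
+//
+//   void DrainCompactionIterator(rocksdb::CompactionIterator* c_iter) {
+//     c_iter->SeekToFirst();
+//     while (c_iter->Valid()) {
+//       // key() is the internal key of the current output record; value()
+//       // points at the matching value in the underlying input iterator.
+//       WriteToOutput(c_iter->key(), c_iter->value());
+//       c_iter->Next();
+//     }
+//     // A non-OK status can mean a merge operand was seen without a merge
+//     // operator, or ShutdownInProgress if the DB is closing.
+//     assert(c_iter->status().ok() ||
+//            c_iter->status().IsShutdownInProgress());
+//   }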

