nifi-commits mailing list archives

From jeremyd...@apache.org
Subject [24/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:25:04 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction_picker_universal.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction_picker_universal.cc b/thirdparty/rocksdb/db/compaction_picker_universal.cc
new file mode 100644
index 0000000..ce48026
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction_picker_universal.cc
@@ -0,0 +1,747 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/compaction_picker_universal.h"
+#ifndef ROCKSDB_LITE
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <limits>
+#include <queue>
+#include <string>
+#include <utility>
+#include "db/column_family.h"
+#include "monitoring/statistics.h"
+#include "util/filename.h"
+#include "util/log_buffer.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+namespace {
+// Used in universal compaction when trivial move is enabled.
+// This structure is used for the construction of a min heap
+// that contains the file metadata, the level of the file,
+// and the index of the file in that level.
+
+struct InputFileInfo {
+  InputFileInfo() : f(nullptr) {}
+
+  FileMetaData* f;
+  size_t level;
+  size_t index;
+};
+
+// Used in universal compaction when trivial move is enabled.
+// This comparator is used for the construction of a min heap
+// ordered by the smallest key of the file.
+struct SmallestKeyHeapComparator {
+  explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; }
+
+  bool operator()(InputFileInfo i1, InputFileInfo i2) const {
+    return (ucmp_->Compare(i1.f->smallest.user_key(),
+                           i2.f->smallest.user_key()) > 0);
+  }
+
+ private:
+  const Comparator* ucmp_;
+};
+
+typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
+                            SmallestKeyHeapComparator>
+    SmallestKeyHeap;
+
+// This function creates the heap that is used to determine whether the
+// files are overlapping during universal compaction when the
+// allow_trivial_move option is set.
+SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
+  SmallestKeyHeap smallest_key_priority_q =
+      SmallestKeyHeap(SmallestKeyHeapComparator(ucmp));
+
+  InputFileInfo input_file;
+
+  for (size_t l = 0; l < c->num_input_levels(); l++) {
+    if (c->num_input_files(l) != 0) {
+      if (l == 0 && c->start_level() == 0) {
+        for (size_t i = 0; i < c->num_input_files(0); i++) {
+          input_file.f = c->input(0, i);
+          input_file.level = 0;
+          input_file.index = i;
+          smallest_key_priority_q.push(std::move(input_file));
+        }
+      } else {
+        input_file.f = c->input(l, 0);
+        input_file.level = l;
+        input_file.index = 0;
+        smallest_key_priority_q.push(std::move(input_file));
+      }
+    }
+  }
+  return smallest_key_priority_q;
+}
+
+#ifndef NDEBUG
+// smallest_seqno and largest_seqno are set iff `files` is not empty.
+void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
+                             SequenceNumber* smallest_seqno,
+                             SequenceNumber* largest_seqno) {
+  bool is_first = true;
+  for (FileMetaData* f : files) {
+    assert(f->smallest_seqno <= f->largest_seqno);
+    if (is_first) {
+      is_first = false;
+      *smallest_seqno = f->smallest_seqno;
+      *largest_seqno = f->largest_seqno;
+    } else {
+      if (f->smallest_seqno < *smallest_seqno) {
+        *smallest_seqno = f->smallest_seqno;
+      }
+      if (f->largest_seqno > *largest_seqno) {
+        *largest_seqno = f->largest_seqno;
+      }
+    }
+  }
+}
+#endif
+}  // namespace
+
+// Algorithm that checks to see if there are any overlapping
+// files in the input
+bool UniversalCompactionPicker::IsInputFilesNonOverlapping(Compaction* c) {
+  auto comparator = icmp_->user_comparator();
+  int first_iter = 1;
+
+  InputFileInfo prev, curr, next;
+
+  SmallestKeyHeap smallest_key_priority_q =
+      create_level_heap(c, icmp_->user_comparator());
+
+  while (!smallest_key_priority_q.empty()) {
+    curr = smallest_key_priority_q.top();
+    smallest_key_priority_q.pop();
+
+    if (first_iter) {
+      prev = curr;
+      first_iter = 0;
+    } else {
+      if (comparator->Compare(prev.f->largest.user_key(),
+                              curr.f->smallest.user_key()) >= 0) {
+        // found overlapping files, return false
+        return false;
+      }
+      assert(comparator->Compare(curr.f->largest.user_key(),
+                                 prev.f->largest.user_key()) > 0);
+      prev = curr;
+    }
+
+    next.f = nullptr;
+
+    if (curr.level != 0 && curr.index < c->num_input_files(curr.level) - 1) {
+      next.f = c->input(curr.level, curr.index + 1);
+      next.level = curr.level;
+      next.index = curr.index + 1;
+    }
+
+    if (next.f) {
+      smallest_key_priority_q.push(std::move(next));
+    }
+  }
+  return true;
+}
+
+bool UniversalCompactionPicker::NeedsCompaction(
+    const VersionStorageInfo* vstorage) const {
+  const int kLevel0 = 0;
+  return vstorage->CompactionScore(kLevel0) >= 1;
+}
+
+void UniversalCompactionPicker::SortedRun::Dump(char* out_buf,
+                                                size_t out_buf_size,
+                                                bool print_path) const {
+  if (level == 0) {
+    assert(file != nullptr);
+    if (file->fd.GetPathId() == 0 || !print_path) {
+      snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
+    } else {
+      snprintf(out_buf, out_buf_size, "file %" PRIu64
+                                      "(path "
+                                      "%" PRIu32 ")",
+               file->fd.GetNumber(), file->fd.GetPathId());
+    }
+  } else {
+    snprintf(out_buf, out_buf_size, "level %d", level);
+  }
+}
+
+void UniversalCompactionPicker::SortedRun::DumpSizeInfo(
+    char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
+  if (level == 0) {
+    assert(file != nullptr);
+    snprintf(out_buf, out_buf_size,
+             "file %" PRIu64 "[%" ROCKSDB_PRIszt
+             "] "
+             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
+             file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(),
+             file->compensated_file_size);
+  } else {
+    snprintf(out_buf, out_buf_size,
+             "level %d[%" ROCKSDB_PRIszt
+             "] "
+             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
+             level, sorted_run_count, size, compensated_file_size);
+  }
+}
+
+std::vector<UniversalCompactionPicker::SortedRun>
+UniversalCompactionPicker::CalculateSortedRuns(
+    const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions) {
+  std::vector<UniversalCompactionPicker::SortedRun> ret;
+  for (FileMetaData* f : vstorage.LevelFiles(0)) {
+    ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
+                     f->being_compacted);
+  }
+  for (int level = 1; level < vstorage.num_levels(); level++) {
+    uint64_t total_compensated_size = 0U;
+    uint64_t total_size = 0U;
+    bool being_compacted = false;
+    bool is_first = true;
+    for (FileMetaData* f : vstorage.LevelFiles(level)) {
+      total_compensated_size += f->compensated_file_size;
+      total_size += f->fd.GetFileSize();
+      if (ioptions.compaction_options_universal.allow_trivial_move == true) {
+        if (f->being_compacted) {
+          being_compacted = f->being_compacted;
+        }
+      } else {
+        // Compaction always includes all files for a non-zero level, so for a
+        // non-zero level, all the files should share the same being_compacted
+        // value.
+        // This assumption is only valid when
+        // ioptions.compaction_options_universal.allow_trivial_move is false
+        assert(is_first || f->being_compacted == being_compacted);
+      }
+      if (is_first) {
+        being_compacted = f->being_compacted;
+        is_first = false;
+      }
+    }
+    if (total_compensated_size > 0) {
+      ret.emplace_back(level, nullptr, total_size, total_compensated_size,
+                       being_compacted);
+    }
+  }
+  return ret;
+}
+
+// Universal style of compaction. Pick files that are contiguous in
+// time-range to compact.
+//
+Compaction* UniversalCompactionPicker::PickCompaction(
+    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+    VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
+  const int kLevel0 = 0;
+  double score = vstorage->CompactionScore(kLevel0);
+  std::vector<SortedRun> sorted_runs =
+      CalculateSortedRuns(*vstorage, ioptions_);
+
+  if (sorted_runs.size() == 0 ||
+      sorted_runs.size() <
+          (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) {
+    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n",
+                     cf_name.c_str());
+    TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
+                             nullptr);
+    return nullptr;
+  }
+  VersionStorageInfo::LevelSummaryStorage tmp;
+  ROCKS_LOG_BUFFER_MAX_SZ(
+      log_buffer, 3072,
+      "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n",
+      cf_name.c_str(), sorted_runs.size(), vstorage->LevelSummary(&tmp));
+
+  // Check for size amplification first.
+  Compaction* c;
+  if ((c = PickCompactionToReduceSizeAmp(cf_name, mutable_cf_options, vstorage,
+                                         score, sorted_runs, log_buffer)) !=
+      nullptr) {
+    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: compacting for size amp\n",
+                     cf_name.c_str());
+  } else {
+    // Size amplification is within limits. Try reducing read
+    // amplification while maintaining file size ratios.
+    unsigned int ratio = ioptions_.compaction_options_universal.size_ratio;
+
+    if ((c = PickCompactionToReduceSortedRuns(
+             cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX,
+             sorted_runs, log_buffer)) != nullptr) {
+      ROCKS_LOG_BUFFER(log_buffer,
+                       "[%s] Universal: compacting for size ratio\n",
+                       cf_name.c_str());
+    } else {
+      // Size amplification and file size ratios are within configured limits.
+      // If max read amplification is exceeding configured limits, then force
+      // compaction without looking at filesize ratios and try to reduce
+      // the number of files to fewer than level0_file_num_compaction_trigger.
+      // This is guaranteed by NeedsCompaction()
+      assert(sorted_runs.size() >=
+             static_cast<size_t>(
+                 mutable_cf_options.level0_file_num_compaction_trigger));
+      // Get the total number of sorted runs that are not being compacted
+      int num_sr_not_compacted = 0;
+      for (size_t i = 0; i < sorted_runs.size(); i++) {
+        if (sorted_runs[i].being_compacted == false) {
+          num_sr_not_compacted++;
+        }
+      }
+
+      // The number of sorted runs that are not being compacted is greater than
+      // the maximum allowed number of sorted runs
+      if (num_sr_not_compacted >
+          mutable_cf_options.level0_file_num_compaction_trigger) {
+        unsigned int num_files =
+            num_sr_not_compacted -
+            mutable_cf_options.level0_file_num_compaction_trigger + 1;
+        if ((c = PickCompactionToReduceSortedRuns(
+                 cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
+                 num_files, sorted_runs, log_buffer)) != nullptr) {
+          ROCKS_LOG_BUFFER(log_buffer,
+                           "[%s] Universal: compacting for file num -- %u\n",
+                           cf_name.c_str(), num_files);
+        }
+      }
+    }
+  }
+  if (c == nullptr) {
+    TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
+                             nullptr);
+    return nullptr;
+  }
+
+  if (ioptions_.compaction_options_universal.allow_trivial_move == true) {
+    c->set_is_trivial_move(IsInputFilesNonOverlapping(c));
+  }
+
+// validate that all the chosen files of L0 are non-overlapping in time
+#ifndef NDEBUG
+  SequenceNumber prev_smallest_seqno = 0U;
+  bool is_first = true;
+
+  size_t level_index = 0U;
+  if (c->start_level() == 0) {
+    for (auto f : *c->inputs(0)) {
+      assert(f->smallest_seqno <= f->largest_seqno);
+      if (is_first) {
+        is_first = false;
+      }
+      prev_smallest_seqno = f->smallest_seqno;
+    }
+    level_index = 1U;
+  }
+  for (; level_index < c->num_input_levels(); level_index++) {
+    if (c->num_input_files(level_index) != 0) {
+      SequenceNumber smallest_seqno = 0U;
+      SequenceNumber largest_seqno = 0U;
+      GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
+                              &largest_seqno);
+      if (is_first) {
+        is_first = false;
+      } else if (prev_smallest_seqno > 0) {
+        // A level is considered as the bottommost level if there are
+        // no files in higher levels or if files in higher levels do
+        // not overlap with the files being compacted. Sequence numbers
+        // of files in bottommost level can be set to 0 to help
+        // compression. As a result, the following assert may not hold
+        // if the prev_smallest_seqno is 0.
+        assert(prev_smallest_seqno > largest_seqno);
+      }
+      prev_smallest_seqno = smallest_seqno;
+    }
+  }
+#endif
+  // update statistics
+  MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION,
+              c->inputs(0)->size());
+
+  RegisterCompaction(c);
+
+  TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
+                           c);
+  return c;
+}
+
+uint32_t UniversalCompactionPicker::GetPathId(
+    const ImmutableCFOptions& ioptions, uint64_t file_size) {
+  // Two conditions need to be satisfied:
+  // (1) the target path needs to be able to hold the file's size
+  // (2) Total size left in this and previous paths need to be not
+  //     smaller than expected future file size before this new file is
+  //     compacted, which is estimated based on size_ratio.
+  // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
+  // we will make sure the target file, probably with size of 16, will be
+  // placed in a path so that eventually when new files are generated and
+  // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
+  // before the path we chose.
+  //
+  // TODO(sdong): now the case of multiple column families is not
+  // considered in this algorithm. So the target size can be violated in
+  // that case. We need to improve it.
+  uint64_t accumulated_size = 0;
+  uint64_t future_size =
+      file_size * (100 - ioptions.compaction_options_universal.size_ratio) /
+      100;
+  uint32_t p = 0;
+  assert(!ioptions.db_paths.empty());
+  for (; p < ioptions.db_paths.size() - 1; p++) {
+    uint64_t target_size = ioptions.db_paths[p].target_size;
+    if (target_size > file_size &&
+        accumulated_size + (target_size - file_size) > future_size) {
+      return p;
+    }
+    accumulated_size += target_size;
+  }
+  return p;
+}
+
+//
+// Consider compaction files based on their size differences with
+// the next file in time order.
+//
+Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
+    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+    VersionStorageInfo* vstorage, double score, unsigned int ratio,
+    unsigned int max_number_of_files_to_compact,
+    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
+  unsigned int min_merge_width =
+      ioptions_.compaction_options_universal.min_merge_width;
+  unsigned int max_merge_width =
+      ioptions_.compaction_options_universal.max_merge_width;
+
+  const SortedRun* sr = nullptr;
+  bool done = false;
+  size_t start_index = 0;
+  unsigned int candidate_count = 0;
+
+  unsigned int max_files_to_compact =
+      std::min(max_merge_width, max_number_of_files_to_compact);
+  min_merge_width = std::max(min_merge_width, 2U);
+
+  // Caller checks the size before executing this function. This invariant is
+  // important because otherwise we may have a possible integer underflow when
+  // dealing with unsigned types.
+  assert(sorted_runs.size() > 0);
+
+  // Considers a candidate file only if it is smaller than the
+  // total size accumulated so far.
+  for (size_t loop = 0; loop < sorted_runs.size(); loop++) {
+    candidate_count = 0;
+
+    // Skip files that are already being compacted
+    for (sr = nullptr; loop < sorted_runs.size(); loop++) {
+      sr = &sorted_runs[loop];
+
+      if (!sr->being_compacted) {
+        candidate_count = 1;
+        break;
+      }
+      char file_num_buf[kFormatFileNumberBufSize];
+      sr->Dump(file_num_buf, sizeof(file_num_buf));
+      ROCKS_LOG_BUFFER(log_buffer,
+                       "[%s] Universal: %s"
+                       "[%d] being compacted, skipping",
+                       cf_name.c_str(), file_num_buf, loop);
+
+      sr = nullptr;
+    }
+
+    // This file is not being compacted. Consider it as the
+    // first candidate to be compacted.
+    uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
+    if (sr != nullptr) {
+      char file_num_buf[kFormatFileNumberBufSize];
+      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
+      ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Possible candidate %s[%d].",
+                       cf_name.c_str(), file_num_buf, loop);
+    }
+
+    // Check if the succeeding files need compaction.
+    for (size_t i = loop + 1;
+         candidate_count < max_files_to_compact && i < sorted_runs.size();
+         i++) {
+      const SortedRun* succeeding_sr = &sorted_runs[i];
+      if (succeeding_sr->being_compacted) {
+        break;
+      }
+      // Pick files if the total/last candidate file size (increased by the
+      // specified ratio) is still larger than the next candidate file.
+      // candidate_size is the total size of files picked so far with the
+      // default kCompactionStopStyleTotalSize; with
+      // kCompactionStopStyleSimilarSize, it's simply the size of the last
+      // picked file.
+      double sz = candidate_size * (100.0 + ratio) / 100.0;
+      if (sz < static_cast<double>(succeeding_sr->size)) {
+        break;
+      }
+      if (ioptions_.compaction_options_universal.stop_style ==
+          kCompactionStopStyleSimilarSize) {
+        // Similar-size stopping rule: also check the last picked file isn't
+        // far larger than the next candidate file.
+        sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
+        if (sz < static_cast<double>(candidate_size)) {
+          // If the small file we've encountered begins a run of similar-size
+          // files, we'll pick them up on a future iteration of the outer
+          // loop. If it's some lonely straggler, it'll eventually get picked
+          // by the last-resort read amp strategy which disregards size ratios.
+          break;
+        }
+        candidate_size = succeeding_sr->compensated_file_size;
+      } else {  // default kCompactionStopStyleTotalSize
+        candidate_size += succeeding_sr->compensated_file_size;
+      }
+      candidate_count++;
+    }
+
+    // Found a series of consecutive files that need compaction.
+    if (candidate_count >= (unsigned int)min_merge_width) {
+      start_index = loop;
+      done = true;
+      break;
+    } else {
+      for (size_t i = loop;
+           i < loop + candidate_count && i < sorted_runs.size(); i++) {
+        const SortedRun* skipping_sr = &sorted_runs[i];
+        char file_num_buf[256];
+        skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
+        ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Skipping %s",
+                         cf_name.c_str(), file_num_buf);
+      }
+    }
+  }
+  if (!done || candidate_count <= 1) {
+    return nullptr;
+  }
+  size_t first_index_after = start_index + candidate_count;
+  // Compression is enabled only if the files older than this compaction
+  // have not already reached the configured compression size percentage.
+  bool enable_compression = true;
+  int ratio_to_compress =
+      ioptions_.compaction_options_universal.compression_size_percent;
+  if (ratio_to_compress >= 0) {
+    uint64_t total_size = 0;
+    for (auto& sorted_run : sorted_runs) {
+      total_size += sorted_run.compensated_file_size;
+    }
+
+    uint64_t older_file_size = 0;
+    for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) {
+      older_file_size += sorted_runs[i].size;
+      if (older_file_size * 100L >= total_size * (long)ratio_to_compress) {
+        enable_compression = false;
+        break;
+      }
+    }
+  }
+
+  uint64_t estimated_total_size = 0;
+  for (unsigned int i = 0; i < first_index_after; i++) {
+    estimated_total_size += sorted_runs[i].size;
+  }
+  uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
+  int start_level = sorted_runs[start_index].level;
+  int output_level;
+  if (first_index_after == sorted_runs.size()) {
+    output_level = vstorage->num_levels() - 1;
+  } else if (sorted_runs[first_index_after].level == 0) {
+    output_level = 0;
+  } else {
+    output_level = sorted_runs[first_index_after].level - 1;
+  }
+
+  // last level is reserved for the files ingested behind
+  if (ioptions_.allow_ingest_behind &&
+      (output_level == vstorage->num_levels() - 1)) {
+    assert(output_level > 1);
+    output_level--;
+  }
+
+  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    inputs[i].level = start_level + static_cast<int>(i);
+  }
+  for (size_t i = start_index; i < first_index_after; i++) {
+    auto& picking_sr = sorted_runs[i];
+    if (picking_sr.level == 0) {
+      FileMetaData* picking_file = picking_sr.file;
+      inputs[0].files.push_back(picking_file);
+    } else {
+      auto& files = inputs[picking_sr.level - start_level].files;
+      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
+        files.push_back(f);
+      }
+    }
+    char file_num_buf[256];
+    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
+    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(),
+                     file_num_buf);
+  }
+
+  CompactionReason compaction_reason;
+  if (max_number_of_files_to_compact == UINT_MAX) {
+    compaction_reason = CompactionReason::kUniversalSortedRunNum;
+  } else {
+    compaction_reason = CompactionReason::kUniversalSizeRatio;
+  }
+  return new Compaction(
+      vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level,
+      mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id,
+      GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level,
+                         1, enable_compression),
+      /* grandparents */ {}, /* is manual */ false, score,
+      false /* deletion_compaction */, compaction_reason);
+}
+
+// Look at overall size amplification. If size amplification
+// exceeds the configured value, then do a compaction
+// of the candidate files all the way up to the earliest
+// base file (overrides configured values of file-size ratios,
+// min_merge_width and max_merge_width).
+//
+Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
+    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+    VersionStorageInfo* vstorage, double score,
+    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
+  // percentage flexibility while reducing size amplification
+  uint64_t ratio =
+      ioptions_.compaction_options_universal.max_size_amplification_percent;
+
+  unsigned int candidate_count = 0;
+  uint64_t candidate_size = 0;
+  size_t start_index = 0;
+  const SortedRun* sr = nullptr;
+
+  // Skip files that are already being compacted
+  for (size_t loop = 0; loop < sorted_runs.size() - 1; loop++) {
+    sr = &sorted_runs[loop];
+    if (!sr->being_compacted) {
+      start_index = loop;  // Consider this as the first candidate.
+      break;
+    }
+    char file_num_buf[kFormatFileNumberBufSize];
+    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
+    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: skipping %s[%d] compacted %s",
+                     cf_name.c_str(), file_num_buf, loop,
+                     " cannot be a candidate to reduce size amp.\n");
+    sr = nullptr;
+  }
+
+  if (sr == nullptr) {
+    return nullptr;  // no candidate files
+  }
+  {
+    char file_num_buf[kFormatFileNumberBufSize];
+    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
+    ROCKS_LOG_BUFFER(
+        log_buffer,
+        "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
+        cf_name.c_str(), file_num_buf, start_index, " to reduce size amp.\n");
+  }
+
+  // keep adding up all the remaining files
+  for (size_t loop = start_index; loop < sorted_runs.size() - 1; loop++) {
+    sr = &sorted_runs[loop];
+    if (sr->being_compacted) {
+      char file_num_buf[kFormatFileNumberBufSize];
+      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
+      ROCKS_LOG_BUFFER(
+          log_buffer, "[%s] Universal: Possible candidate %s[%d] %s",
+          cf_name.c_str(), file_num_buf, start_index,
+          " is already being compacted. No size amp reduction possible.\n");
+      return nullptr;
+    }
+    candidate_size += sr->compensated_file_size;
+    candidate_count++;
+  }
+  if (candidate_count == 0) {
+    return nullptr;
+  }
+
+  // size of earliest file
+  uint64_t earliest_file_size = sorted_runs.back().size;
+
+  // size amplification = percentage of additional size
+  if (candidate_size * 100 < ratio * earliest_file_size) {
+    ROCKS_LOG_BUFFER(
+        log_buffer,
+        "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
+        " earliest-file-size %" PRIu64,
+        cf_name.c_str(), candidate_size, earliest_file_size);
+    return nullptr;
+  } else {
+    ROCKS_LOG_BUFFER(
+        log_buffer,
+        "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
+        " earliest-file-size %" PRIu64,
+        cf_name.c_str(), candidate_size, earliest_file_size);
+  }
+  assert(start_index < sorted_runs.size() - 1);
+
+  // Estimate total file size
+  uint64_t estimated_total_size = 0;
+  for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
+    estimated_total_size += sorted_runs[loop].size;
+  }
+  uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
+  int start_level = sorted_runs[start_index].level;
+
+  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    inputs[i].level = start_level + static_cast<int>(i);
+  }
+  // We always compact all the files, so always compress.
+  for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
+    auto& picking_sr = sorted_runs[loop];
+    if (picking_sr.level == 0) {
+      FileMetaData* f = picking_sr.file;
+      inputs[0].files.push_back(f);
+    } else {
+      auto& files = inputs[picking_sr.level - start_level].files;
+      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
+        files.push_back(f);
+      }
+    }
+    char file_num_buf[256];
+    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
+    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: size amp picking %s",
+                     cf_name.c_str(), file_num_buf);
+  }
+
+  // output files at the bottommost level, unless it's reserved
+  int output_level = vstorage->num_levels() - 1;
+  // last level is reserved for the files ingested behind
+  if (ioptions_.allow_ingest_behind) {
+    assert(output_level > 1);
+    output_level--;
+  }
+
+  return new Compaction(
+      vstorage, ioptions_, mutable_cf_options, std::move(inputs),
+      output_level, mutable_cf_options.MaxFileSizeForLevel(output_level),
+      /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
+      GetCompressionType(ioptions_, vstorage, mutable_cf_options,
+                         output_level, 1),
+      /* grandparents */ {}, /* is manual */ false, score,
+      false /* deletion_compaction */,
+      CompactionReason::kUniversalSizeAmplification);
+}
+}  // namespace rocksdb
+
+#endif  // !ROCKSDB_LITE
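
As a reading aid for PickCompactionToReduceSortedRuns above, here is a minimal
standalone sketch of its size-ratio stopping rule under the default
kCompactionStopStyleTotalSize; the helper name and the example run sizes are
hypothetical, not part of RocksDB:

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Count how many leading sorted runs would be grouped into one
    // compaction: keep widening the window while the accumulated size,
    // inflated by `ratio` percent, still covers the next run.
    std::size_t CountCandidateRuns(const std::vector<uint64_t>& run_sizes,
                                   unsigned int ratio) {
      if (run_sizes.empty()) return 0;
      uint64_t accumulated = run_sizes[0];
      std::size_t count = 1;
      for (std::size_t i = 1; i < run_sizes.size(); i++) {
        double sz = accumulated * (100.0 + ratio) / 100.0;
        if (sz < static_cast<double>(run_sizes[i])) break;  // next run too big
        accumulated += run_sizes[i];
        count++;
      }
      return count;
    }

With run sizes (1, 1, 2, 4, 8) and ratio 1, all five runs are grouped,
matching the (1, 1, 2, 4, 8) -> 16 example in GetPathId's comment.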

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/compaction_picker_universal.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/compaction_picker_universal.h b/thirdparty/rocksdb/db/compaction_picker_universal.h
new file mode 100644
index 0000000..3f2bed3
--- /dev/null
+++ b/thirdparty/rocksdb/db/compaction_picker_universal.h
@@ -0,0 +1,91 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include "db/compaction_picker.h"
+
+namespace rocksdb {
+class UniversalCompactionPicker : public CompactionPicker {
+ public:
+  UniversalCompactionPicker(const ImmutableCFOptions& ioptions,
+                            const InternalKeyComparator* icmp)
+      : CompactionPicker(ioptions, icmp) {}
+  virtual Compaction* PickCompaction(const std::string& cf_name,
+                                     const MutableCFOptions& mutable_cf_options,
+                                     VersionStorageInfo* vstorage,
+                                     LogBuffer* log_buffer) override;
+
+  virtual int MaxOutputLevel() const override { return NumberLevels() - 1; }
+
+  virtual bool NeedsCompaction(
+      const VersionStorageInfo* vstorage) const override;
+
+ private:
+  struct SortedRun {
+    SortedRun(int _level, FileMetaData* _file, uint64_t _size,
+              uint64_t _compensated_file_size, bool _being_compacted)
+        : level(_level),
+          file(_file),
+          size(_size),
+          compensated_file_size(_compensated_file_size),
+          being_compacted(_being_compacted) {
+      assert(compensated_file_size > 0);
+      assert(level != 0 || file != nullptr);
+    }
+
+    void Dump(char* out_buf, size_t out_buf_size,
+              bool print_path = false) const;
+
+    // sorted_run_count is added into the string to print
+    void DumpSizeInfo(char* out_buf, size_t out_buf_size,
+                      size_t sorted_run_count) const;
+
+    int level;
+    // `file` will be null for level > 0. For level = 0, the sorted run is
+    // for this file.
+    FileMetaData* file;
+    // For level > 0, `size` and `compensated_file_size` are the sums of the
+    // sizes of all files in the level. `being_compacted` should be the same
+    // for all files in a non-zero level; use that value here.
+    uint64_t size;
+    uint64_t compensated_file_size;
+    bool being_compacted;
+  };
+
+  // Pick Universal compaction to limit read amplification
+  Compaction* PickCompactionToReduceSortedRuns(
+      const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+      VersionStorageInfo* vstorage, double score, unsigned int ratio,
+      unsigned int num_files, const std::vector<SortedRun>& sorted_runs,
+      LogBuffer* log_buffer);
+
+  // Pick Universal compaction to limit space amplification.
+  Compaction* PickCompactionToReduceSizeAmp(
+      const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+      VersionStorageInfo* vstorage, double score,
+      const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer);
+
+  // Used in universal compaction when the allow_trivial_move
+  // option is set. Checks whether there are any overlapping files
+  // in the input. Returns true if the input files are
+  // non-overlapping.
+  bool IsInputFilesNonOverlapping(Compaction* c);
+
+  static std::vector<SortedRun> CalculateSortedRuns(
+      const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions);
+
+  // Pick a path ID to place a newly generated file, with its estimated file
+  // size.
+  static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
+                            uint64_t file_size);
+};
+}  // namespace rocksdb
+#endif  // !ROCKSDB_LITE
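
For intuition about the IsInputFilesNonOverlapping declaration above, this is
a self-contained sketch of the same min-heap overlap check over plain
(smallest, largest) key pairs; the type and function names are hypothetical
simplifications of the FileMetaData-based implementation:

    #include <queue>
    #include <string>
    #include <utility>
    #include <vector>

    using KeyRange = std::pair<std::string, std::string>;  // (smallest, largest)

    // Pop ranges in ascending order of smallest key; the inputs overlap iff
    // some range starts at or before the previous range's largest key.
    bool RangesNonOverlapping(std::vector<KeyRange> files) {
      auto cmp = [](const KeyRange& a, const KeyRange& b) {
        return a.first > b.first;  // min-heap on the smallest key
      };
      std::priority_queue<KeyRange, std::vector<KeyRange>, decltype(cmp)> heap(
          cmp, std::move(files));
      bool first = true;
      std::string prev_largest;
      while (!heap.empty()) {
        KeyRange curr = heap.top();
        heap.pop();
        if (!first && curr.first <= prev_largest) {
          return false;  // found overlapping ranges
        }
        first = false;
        prev_largest = curr.second;
      }
      return true;
    }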

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/convenience.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/convenience.cc b/thirdparty/rocksdb/db/convenience.cc
new file mode 100644
index 0000000..8ee31ca
--- /dev/null
+++ b/thirdparty/rocksdb/db/convenience.cc
@@ -0,0 +1,58 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+
+#ifndef ROCKSDB_LITE
+
+#include "rocksdb/convenience.h"
+
+#include "db/db_impl.h"
+#include "util/cast_util.h"
+
+namespace rocksdb {
+
+void CancelAllBackgroundWork(DB* db, bool wait) {
+  (static_cast_with_check<DBImpl, DB>(db->GetRootDB()))
+      ->CancelAllBackgroundWork(wait);
+}
+
+Status DeleteFilesInRange(DB* db, ColumnFamilyHandle* column_family,
+                          const Slice* begin, const Slice* end) {
+  return (static_cast_with_check<DBImpl, DB>(db->GetRootDB()))
+      ->DeleteFilesInRange(column_family, begin, end);
+}
+
+Status VerifySstFileChecksum(const Options& options,
+                             const EnvOptions& env_options,
+                             const std::string& file_path) {
+  unique_ptr<RandomAccessFile> file;
+  uint64_t file_size;
+  InternalKeyComparator internal_comparator(options.comparator);
+  ImmutableCFOptions ioptions(options);
+
+  Status s = ioptions.env->NewRandomAccessFile(file_path, &file, env_options);
+  if (s.ok()) {
+    s = ioptions.env->GetFileSize(file_path, &file_size);
+  } else {
+    return s;
+  }
+  unique_ptr<TableReader> table_reader;
+  std::unique_ptr<RandomAccessFileReader> file_reader(
+      new RandomAccessFileReader(std::move(file), file_path));
+  s = ioptions.table_factory->NewTableReader(
+      TableReaderOptions(ioptions, env_options, internal_comparator,
+                         false /* skip_filters */, -1 /* level */),
+      std::move(file_reader), file_size, &table_reader,
+      false /* prefetch_index_and_filter_in_cache */);
+  if (!s.ok()) {
+    return s;
+  }
+  s = table_reader->VerifyChecksum();
+  return s;
+}
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE
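
Both entry points above take their arguments exactly as declared in the file;
a hedged usage sketch follows, with the key bounds, file path, and error
handling purely illustrative:

    #include <cstdio>
    #include "rocksdb/convenience.h"
    #include "rocksdb/db.h"
    #include "rocksdb/env.h"

    // Drop SST files wholly contained in ["a", "m") without iterating keys;
    // data straddling the bounds survives and still needs a conventional
    // range delete.
    void DropKeyRange(rocksdb::DB* db) {
      rocksdb::Slice begin("a"), end("m");
      rocksdb::Status s = rocksdb::DeleteFilesInRange(
          db, db->DefaultColumnFamily(), &begin, &end);
      if (!s.ok()) {
        std::fprintf(stderr, "DeleteFilesInRange: %s\n",
                     s.ToString().c_str());
      }
    }

    // Verify one table file's checksums offline via VerifySstFileChecksum
    // above; the path is hypothetical.
    void CheckTable(const rocksdb::Options& options) {
      rocksdb::Status s = rocksdb::VerifySstFileChecksum(
          options, rocksdb::EnvOptions(), "/tmp/db/000123.sst");
      std::fprintf(stderr, "verify: %s\n", s.ToString().c_str());
    }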

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_filesnapshot.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_filesnapshot.cc b/thirdparty/rocksdb/db/db_filesnapshot.cc
new file mode 100644
index 0000000..e266bf1
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_filesnapshot.cc
@@ -0,0 +1,149 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+
+#ifndef ROCKSDB_LITE
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <stdint.h>
+#include <algorithm>
+#include <string>
+#include "db/db_impl.h"
+#include "db/job_context.h"
+#include "db/version_set.h"
+#include "port/port.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "util/file_util.h"
+#include "util/filename.h"
+#include "util/mutexlock.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+Status DBImpl::DisableFileDeletions() {
+  InstrumentedMutexLock l(&mutex_);
+  ++disable_delete_obsolete_files_;
+  if (disable_delete_obsolete_files_ == 1) {
+    ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled");
+  } else {
+    ROCKS_LOG_WARN(immutable_db_options_.info_log,
+                   "File Deletions Disabled, but already disabled. Counter: %d",
+                   disable_delete_obsolete_files_);
+  }
+  return Status::OK();
+}
+
+Status DBImpl::EnableFileDeletions(bool force) {
+  // Job id == 0 means that this is not our background process, but rather
+  // a user thread.
+  JobContext job_context(0);
+  bool should_purge_files = false;
+  {
+    InstrumentedMutexLock l(&mutex_);
+    if (force) {
+      // if force, we need to enable file deletions right away
+      disable_delete_obsolete_files_ = 0;
+    } else if (disable_delete_obsolete_files_ > 0) {
+      --disable_delete_obsolete_files_;
+    }
+    if (disable_delete_obsolete_files_ == 0)  {
+      ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
+      should_purge_files = true;
+      FindObsoleteFiles(&job_context, true);
+    } else {
+      ROCKS_LOG_WARN(
+          immutable_db_options_.info_log,
+          "File Deletions Enable, but not really enabled. Counter: %d",
+          disable_delete_obsolete_files_);
+    }
+  }
+  if (should_purge_files)  {
+    PurgeObsoleteFiles(job_context);
+  }
+  job_context.Clean();
+  LogFlush(immutable_db_options_.info_log);
+  return Status::OK();
+}
+
+int DBImpl::IsFileDeletionsEnabled() const {
+  return disable_delete_obsolete_files_;
+}
+
+Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
+                            uint64_t* manifest_file_size,
+                            bool flush_memtable) {
+  *manifest_file_size = 0;
+
+  mutex_.Lock();
+
+  if (flush_memtable) {
+    // flush all dirty data to disk.
+    Status status;
+    for (auto cfd : *versions_->GetColumnFamilySet()) {
+      if (cfd->IsDropped()) {
+        continue;
+      }
+      cfd->Ref();
+      mutex_.Unlock();
+      status = FlushMemTable(cfd, FlushOptions());
+      TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
+      TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");
+      mutex_.Lock();
+      cfd->Unref();
+      if (!status.ok()) {
+        break;
+      }
+    }
+    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
+
+    if (!status.ok()) {
+      mutex_.Unlock();
+      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Cannot Flush data %s\n",
+                      status.ToString().c_str());
+      return status;
+    }
+  }
+
+  // Make a set of all of the live *.sst files
+  std::vector<FileDescriptor> live;
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    if (cfd->IsDropped()) {
+      continue;
+    }
+    cfd->current()->AddLiveFiles(&live);
+  }
+
+  ret.clear();
+  ret.reserve(live.size() + 3);  // *.sst + CURRENT + MANIFEST + OPTIONS
+
+  // Create names of the live files. The names are not absolute
+  // paths; instead they are relative to dbname_.
+  for (auto live_file : live) {
+    ret.push_back(MakeTableFileName("", live_file.GetNumber()));
+  }
+
+  ret.push_back(CurrentFileName(""));
+  ret.push_back(DescriptorFileName("", versions_->manifest_file_number()));
+  ret.push_back(OptionsFileName("", versions_->options_file_number()));
+
+  // find length of manifest file while holding the mutex lock
+  *manifest_file_size = versions_->manifest_file_size();
+
+  mutex_.Unlock();
+  return Status::OK();
+}
+
+Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
+  return wal_manager_.GetSortedWalFiles(files);
+}
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE
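
These hooks exist to support external backups: pin the live file set,
enumerate it, copy, then release. A sketch under that assumption, with the
copy step elided, the function name hypothetical, and the logging
illustrative:

    #include <cstdint>
    #include <cstdio>
    #include <string>
    #include <vector>
    #include "rocksdb/db.h"

    void SnapshotLiveFiles(rocksdb::DB* db) {
      db->DisableFileDeletions();  // obsolete files are retained from here on
      std::vector<std::string> files;
      uint64_t manifest_size = 0;
      rocksdb::Status s =
          db->GetLiveFiles(files, &manifest_size, /*flush_memtable=*/true);
      if (s.ok()) {
        for (const std::string& f : files) {
          std::printf("live (relative to dbname): %s\n", f.c_str());  // copy f
        }
      }
      db->EnableFileDeletions(false /* force */);  // balance the disable above
    }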

