nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeremyd...@apache.org
Subject [21/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:25:01 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_compaction_flush.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_compaction_flush.cc b/thirdparty/rocksdb/db/db_impl_compaction_flush.cc
new file mode 100644
index 0000000..3e686fe
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_compaction_flush.cc
@@ -0,0 +1,1910 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "db/db_impl.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+#include <inttypes.h>
+
+#include "db/builder.h"
+#include "db/event_helpers.h"
+#include "monitoring/iostats_context_imp.h"
+#include "monitoring/perf_context_imp.h"
+#include "monitoring/thread_status_updater.h"
+#include "monitoring/thread_status_util.h"
+#include "util/sst_file_manager_impl.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+// Syncs every WAL file whose number is below the current logfile_number_, then
+// fsyncs the WAL directory.
+//
+// REQUIRES: mutex_ is held by the caller. When there is at least one log to
+// sync, the mutex is released for the duration of the file/directory syncs and
+// re-acquired before returning.
+//
+// Returns OK, or the first sync error encountered. On error the status is
+// passed to the background-error listeners and, unless a listener resets it
+// to OK, stored in bg_error_.
+Status DBImpl::SyncClosedLogs(JobContext* job_context) {
+  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
+  mutex_.AssertHeld();
+  autovector<log::Writer*, 1> logs_to_sync;
+  uint64_t current_log_number = logfile_number_;
+  // Wait while the oldest closed log is already being synced elsewhere.
+  while (logs_.front().number < current_log_number &&
+         logs_.front().getting_synced) {
+    log_sync_cv_.Wait();
+  }
+  // Claim every closed log (number < current) for this sync pass.
+  for (auto it = logs_.begin();
+       it != logs_.end() && it->number < current_log_number; ++it) {
+    auto& log = *it;
+    assert(!log.getting_synced);
+    log.getting_synced = true;
+    logs_to_sync.push_back(log.writer);
+  }
+
+  Status s;
+  if (!logs_to_sync.empty()) {
+    mutex_.Unlock();
+
+    for (log::Writer* log : logs_to_sync) {
+      ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                     "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
+                     log->get_log_number());
+      s = log->file()->Sync(immutable_db_options_.use_fsync);
+      if (!s.ok()) {
+        // Stop at the first failure: continuing would let a later successful
+        // Sync() overwrite the error, and the logs would then be reported as
+        // synced even though one of them is not.
+        break;
+      }
+    }
+    if (s.ok()) {
+      s = directories_.GetWalDir()->Fsync();
+    }
+
+    mutex_.Lock();
+
+    // "number <= current_log_number - 1" is equivalent to
+    // "number < current_log_number".
+    MarkLogsSynced(current_log_number - 1, true, s);
+    if (!s.ok()) {
+      Status new_bg_error = s;
+      // may temporarily unlock and lock the mutex.
+      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                            BackgroundErrorReason::kFlush,
+                                            &new_bg_error, &mutex_);
+      if (!new_bg_error.ok()) {
+        bg_error_ = new_bg_error;
+      }
+      TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
+      return s;
+    }
+  }
+  return s;
+}
+
+// Runs a FlushJob for `cfd`, writing its pending immutable memtable(s) to an
+// output file, and notifies listeners / the SST file manager about the result.
+//
+// REQUIRES: mutex_ is held (asserted) and `cfd` has a flush pending (asserted).
+// The mutex may be temporarily released by the listener notifications, by
+// SyncClosedLogs() and by flush_job.Run().
+//
+// cfd                 column family to flush.
+// mutable_cf_options  options handed to the flush job and to the listeners.
+// made_progress       optional out-parameter; set when the flush succeeds.
+// job_context         supplies the job id and is forwarded to the flush job.
+// log_buffer          receives the level summary on success.
+//
+// Returns the status of the WAL sync / flush job. Failures may additionally
+// be recorded in bg_error_ (see the paranoid_checks branch below).
+Status DBImpl::FlushMemTableToOutputFile(
+    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
+    bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) {
+  mutex_.AssertHeld();
+  assert(cfd->imm()->NumNotFlushed() != 0);
+  assert(cfd->imm()->IsFlushPending());
+
+  SequenceNumber earliest_write_conflict_snapshot;
+  std::vector<SequenceNumber> snapshot_seqs =
+      snapshots_.GetAll(&earliest_write_conflict_snapshot);
+
+  FlushJob flush_job(
+      dbname_, cfd, immutable_db_options_, mutable_cf_options, env_options_,
+      versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
+      earliest_write_conflict_snapshot, job_context, log_buffer,
+      directories_.GetDbDir(), directories_.GetDataDir(0U),
+      GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
+      &event_logger_, mutable_cf_options.report_bg_io_stats);
+
+  FileMetaData file_meta;
+
+  flush_job.PickMemTable();
+
+#ifndef ROCKSDB_LITE
+  // may temporarily unlock and lock the mutex.
+  // NOTE(review): file_meta is still default-constructed here (it is only
+  // handed to flush_job.Run() further down), so the file number and sequence
+  // numbers reported to OnFlushBegin come from an unpopulated FileMetaData --
+  // confirm this is intended.
+  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id,
+                     flush_job.GetTableProperties());
+#endif  // ROCKSDB_LITE
+
+  Status s;
+  // NOTE(review): the comment below talks about "more than one" column
+  // family, but the condition is `> 0` -- confirm which one is intended.
+  if (logfile_number_ > 0 &&
+      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0) {
+    // If there are more than one column families, we need to make sure that
+    // all the log files except the most recent one are synced. Otherwise if
+    // the host crashes after flushing and before WAL is persistent, the
+    // flushed SST may contain data from write batches whose updates to
+    // other column families are missing.
+    // SyncClosedLogs() may unlock and re-lock the db_mutex.
+    s = SyncClosedLogs(job_context);
+  }
+
+  // Within flush_job.Run, rocksdb may call event listener to notify
+  // file creation and deletion.
+  //
+  // Note that flush_job.Run will unlock and lock the db_mutex,
+  // and EventListener callback will be called when the db_mutex
+  // is unlocked by the current thread.
+  if (s.ok()) {
+    s = flush_job.Run(&file_meta);
+  } else {
+    // The WAL sync failed, so the flush is abandoned instead of run.
+    flush_job.Cancel();
+  }
+
+  if (s.ok()) {
+    InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context,
+                                              mutable_cf_options);
+    if (made_progress) {
+      *made_progress = 1;
+    }
+    VersionStorageInfo::LevelSummaryStorage tmp;
+    ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
+                     cfd->GetName().c_str(),
+                     cfd->current()->storage_info()->LevelSummary(&tmp));
+  }
+
+  // Failure path: only record a background error when paranoid_checks is on,
+  // the failure is not a shutdown, and no background error is set yet.
+  if (!s.ok() && !s.IsShutdownInProgress() &&
+      immutable_db_options_.paranoid_checks && bg_error_.ok()) {
+    Status new_bg_error = s;
+    // may temporarily unlock and lock the mutex.
+    EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                          BackgroundErrorReason::kFlush,
+                                          &new_bg_error, &mutex_);
+    if (!new_bg_error.ok()) {
+      // if a bad error happened (not ShutdownInProgress), paranoid_checks is
+      // true, and the error isn't handled by callback, mark DB read-only
+      bg_error_ = new_bg_error;
+    }
+  }
+  if (s.ok()) {
+#ifndef ROCKSDB_LITE
+    // may temporarily unlock and lock the mutex.
+    NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
+                           job_context->job_id, flush_job.GetTableProperties());
+    auto sfm = static_cast<SstFileManagerImpl*>(
+        immutable_db_options_.sst_file_manager.get());
+    if (sfm) {
+      // Notify sst_file_manager that a new file was added
+      std::string file_path = MakeTableFileName(
+          immutable_db_options_.db_paths[0].path, file_meta.fd.GetNumber());
+      sfm->OnAddFile(file_path);
+      // Exceeding the space limit is reported as a background IOError, even
+      // though the flush itself (and the returned status) succeeded.
+      if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) {
+        Status new_bg_error = Status::IOError("Max allowed space was reached");
+        TEST_SYNC_POINT_CALLBACK(
+            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
+            &new_bg_error);
+        // may temporarily unlock and lock the mutex.
+        EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                              BackgroundErrorReason::kFlush,
+                                              &new_bg_error, &mutex_);
+        if (!new_bg_error.ok()) {
+          bg_error_ = new_bg_error;
+        }
+      }
+    }
+#endif  // ROCKSDB_LITE
+  }
+  return s;
+}
+
+// Calls OnFlushBegin() on every registered listener.
+//
+// Must be entered with mutex_ held; the mutex is released while the listeners
+// run and re-acquired before returning. Does nothing when no listeners are
+// registered, when the DB is shutting down, or in ROCKSDB_LITE builds.
+void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
+                                const MutableCFOptions& mutable_cf_options,
+                                int job_id, TableProperties prop) {
+#ifndef ROCKSDB_LITE
+  if (immutable_db_options_.listeners.empty()) {
+    return;
+  }
+  mutex_.AssertHeld();
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    return;
+  }
+  // Evaluate the L0 write-stall triggers while the mutex is still held.
+  auto* storage = cfd->current()->storage_info();
+  const bool slowdown_triggered =
+      storage->NumLevelFiles(0) >=
+      mutable_cf_options.level0_slowdown_writes_trigger;
+  const bool stop_triggered =
+      storage->NumLevelFiles(0) >=
+      mutable_cf_options.level0_stop_writes_trigger;
+
+  // Listeners are invoked without the DB mutex.
+  mutex_.Unlock();
+  {
+    FlushJobInfo flush_info;
+    flush_info.cf_name = cfd->GetName();
+    // TODO(yhchiang): make db_paths dynamic in case flush does not
+    //                 go to L0 in the future.
+    flush_info.file_path = MakeTableFileName(
+        immutable_db_options_.db_paths[0].path, file_meta->fd.GetNumber());
+    flush_info.thread_id = env_->GetThreadID();
+    flush_info.job_id = job_id;
+    flush_info.triggered_writes_slowdown = slowdown_triggered;
+    flush_info.triggered_writes_stop = stop_triggered;
+    flush_info.smallest_seqno = file_meta->smallest_seqno;
+    flush_info.largest_seqno = file_meta->largest_seqno;
+    flush_info.table_properties = prop;
+    for (auto observer : immutable_db_options_.listeners) {
+      observer->OnFlushBegin(this, flush_info);
+    }
+  }
+  mutex_.Lock();
+  // bg_cv_ is intentionally not signaled here; per the original note it is
+  // signaled at the end of the flush process.
+#endif  // ROCKSDB_LITE
+}
+
+// Calls OnFlushCompleted() on every registered listener.
+//
+// Must be entered with mutex_ held; the mutex is released while the listeners
+// run and re-acquired before returning. Does nothing when no listeners are
+// registered, when the DB is shutting down, or in ROCKSDB_LITE builds.
+void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
+                                    FileMetaData* file_meta,
+                                    const MutableCFOptions& mutable_cf_options,
+                                    int job_id, TableProperties prop) {
+#ifndef ROCKSDB_LITE
+  if (immutable_db_options_.listeners.empty()) {
+    return;
+  }
+  mutex_.AssertHeld();
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    return;
+  }
+  // Evaluate the L0 write-stall triggers while the mutex is still held.
+  auto* storage = cfd->current()->storage_info();
+  const bool slowdown_triggered =
+      storage->NumLevelFiles(0) >=
+      mutable_cf_options.level0_slowdown_writes_trigger;
+  const bool stop_triggered =
+      storage->NumLevelFiles(0) >=
+      mutable_cf_options.level0_stop_writes_trigger;
+
+  // Listeners are invoked without the DB mutex.
+  mutex_.Unlock();
+  {
+    FlushJobInfo flush_info;
+    flush_info.cf_name = cfd->GetName();
+    // TODO(yhchiang): make db_paths dynamic in case flush does not
+    //                 go to L0 in the future.
+    flush_info.file_path = MakeTableFileName(
+        immutable_db_options_.db_paths[0].path, file_meta->fd.GetNumber());
+    flush_info.thread_id = env_->GetThreadID();
+    flush_info.job_id = job_id;
+    flush_info.triggered_writes_slowdown = slowdown_triggered;
+    flush_info.triggered_writes_stop = stop_triggered;
+    flush_info.smallest_seqno = file_meta->smallest_seqno;
+    flush_info.largest_seqno = file_meta->largest_seqno;
+    flush_info.table_properties = prop;
+    for (auto observer : immutable_db_options_.listeners) {
+      observer->OnFlushCompleted(this, flush_info);
+    }
+  }
+  mutex_.Lock();
+  // bg_cv_ is intentionally not signaled here; per the original note it is
+  // signaled at the end of the flush process.
+#endif  // ROCKSDB_LITE
+}
+
+// Manually compacts the key range [begin, end] of `column_family`.
+//
+// Steps, in order:
+//   1. Flush the column family's memtable.
+//   2. Find the highest level (>= 1) that overlaps the range.
+//   3. Run manual compaction(s): a single all-levels compaction for universal
+//      style with more than one level, otherwise one compaction per level
+//      from 0 up to the level found in step 2.
+//   4. If options.change_level is set, pause background work and move the
+//      output to options.target_level via ReFitLevel().
+//   5. Re-schedule any background flush/compaction.
+//
+// Returns InvalidArgument when options.target_path_id is out of range,
+// otherwise the first failing status of the steps above (or OK).
+Status DBImpl::CompactRange(const CompactRangeOptions& options,
+                            ColumnFamilyHandle* column_family,
+                            const Slice* begin, const Slice* end) {
+  if (options.target_path_id >= immutable_db_options_.db_paths.size()) {
+    return Status::InvalidArgument("Invalid target path ID");
+  }
+
+  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  auto cfd = cfh->cfd();
+  bool exclusive = options.exclusive_manual_compaction;
+
+  Status s = FlushMemTable(cfd, FlushOptions());
+  if (!s.ok()) {
+    LogFlush(immutable_db_options_.info_log);
+    return s;
+  }
+
+  // Highest level >= 1 with files overlapping the range; stays 0 when only
+  // level 0 (or nothing) overlaps, because the scan starts at level 1.
+  int max_level_with_files = 0;
+  {
+    InstrumentedMutexLock l(&mutex_);
+    Version* base = cfd->current();
+    for (int level = 1; level < base->storage_info()->num_non_empty_levels();
+         level++) {
+      if (base->storage_info()->OverlapInLevel(level, begin, end)) {
+        max_level_with_files = level;
+      }
+    }
+  }
+
+  // Tracks where the compacted data ended up; consumed by ReFitLevel() below.
+  int final_output_level = 0;
+  if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
+      cfd->NumberLevels() > 1) {
+    // Always compact all files together.
+    final_output_level = cfd->NumberLevels() - 1;
+    // if bottom most level is reserved
+    if (immutable_db_options_.allow_ingest_behind) {
+      final_output_level--;
+    }
+    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
+                            final_output_level, options.target_path_id,
+                            begin, end, exclusive);
+  } else {
+    for (int level = 0; level <= max_level_with_files; level++) {
+      int output_level;
+      // in case the compaction is universal or if we're compacting the
+      // bottom-most level, the output level will be the same as input one.
+      // level 0 can never be the bottommost level (i.e. if all files are in
+      // level 0, we will compact to level 1)
+      if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
+          cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
+        output_level = level;
+      } else if (level == max_level_with_files && level > 0) {
+        if (options.bottommost_level_compaction ==
+            BottommostLevelCompaction::kSkip) {
+          // Skip bottommost level compaction
+          continue;
+        } else if (options.bottommost_level_compaction ==
+                       BottommostLevelCompaction::kIfHaveCompactionFilter &&
+                   cfd->ioptions()->compaction_filter == nullptr &&
+                   cfd->ioptions()->compaction_filter_factory == nullptr) {
+          // Skip bottommost level compaction since we don't have a compaction
+          // filter
+          continue;
+        }
+        output_level = level;
+      } else {
+        output_level = level + 1;
+        // With dynamic level bytes, L0 is compacted to the base level, which
+        // is passed as a sentinel value rather than a fixed level number.
+        if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
+            cfd->ioptions()->level_compaction_dynamic_level_bytes &&
+            level == 0) {
+          output_level = ColumnFamilyData::kCompactToBaseLevel;
+        }
+      }
+      s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
+                              begin, end, exclusive);
+      if (!s.ok()) {
+        break;
+      }
+      if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
+        final_output_level = cfd->NumberLevels() - 1;
+      } else if (output_level > final_output_level) {
+        final_output_level = output_level;
+      }
+      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
+      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
+    }
+  }
+  if (!s.ok()) {
+    LogFlush(immutable_db_options_.info_log);
+    return s;
+  }
+
+  if (options.change_level) {
+    ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                   "[RefitLevel] waiting for background threads to stop");
+    s = PauseBackgroundWork();
+    if (s.ok()) {
+      s = ReFitLevel(cfd, final_output_level, options.target_level);
+    }
+    // Called unconditionally so the pause above is always undone.
+    ContinueBackgroundWork();
+  }
+  LogFlush(immutable_db_options_.info_log);
+
+  {
+    InstrumentedMutexLock l(&mutex_);
+    // an automatic compaction that has been scheduled might have been
+    // preempted by the manual compactions. Need to schedule it back.
+    MaybeScheduleFlushOrCompaction();
+  }
+
+  return s;
+}
+
+// Compacts the explicitly named input files of `column_family` into
+// `output_level`, running the compaction in the calling thread.
+//
+// Returns NotSupported in ROCKSDB_LITE builds, InvalidArgument for a null
+// column family handle, and otherwise the status of CompactFilesImpl().
+// Obsolete files are collected and purged before returning.
+Status DBImpl::CompactFiles(
+    const CompactionOptions& compact_options,
+    ColumnFamilyHandle* column_family,
+    const std::vector<std::string>& input_file_names,
+    const int output_level, const int output_path_id) {
+#ifdef ROCKSDB_LITE
+  // CompactFiles() is unavailable in the lite build.
+  return Status::NotSupported("Not supported in ROCKSDB LITE");
+#else
+  if (column_family == nullptr) {
+    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
+  }
+
+  auto* handle_impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  auto cfd = handle_impl->cfd();
+  assert(cfd);
+
+  Status status;
+  JobContext job_context(0, true);
+  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
+                       immutable_db_options_.info_log.get());
+
+  // Take a reference on the current super version; its Version is the input
+  // to the compaction below.
+  SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+  {
+    InstrumentedMutexLock lock(&mutex_);
+
+    // Unlocks/relocks the mutex while waiting for running
+    // IngestExternalFile() calls to finish.
+    WaitForIngestFile();
+
+    status = CompactFilesImpl(compact_options, cfd, super_version->current,
+                              input_file_names, output_level, output_path_id,
+                              &job_context, &log_buffer);
+  }
+  const bool last_reference = super_version->Unref();
+  if (last_reference) {
+    mutex_.Lock();
+    super_version->Cleanup();
+    mutex_.Unlock();
+    delete super_version;
+  }
+
+  // Collect obsolete files under the mutex. A failed compaction forces a
+  // full scan, because job_context does not catch all files created by a
+  // compaction that failed.
+  {
+    InstrumentedMutexLock lock(&mutex_);
+    const bool compaction_failed = !status.ok();
+    FindObsoleteFiles(&job_context, compaction_failed);
+  }  // mutex released here
+
+  // Deletion happens without the mutex. The buffered info log is flushed
+  // first, before any file is purged.
+  const bool files_to_purge = job_context.HaveSomethingToDelete();
+  if (files_to_purge || !log_buffer.IsEmpty()) {
+    log_buffer.FlushBufferToLog();
+    if (files_to_purge) {
+      // No mutex is held at this point, so no Unlock()/Lock() is needed.
+      PurgeObsoleteFiles(job_context);
+    }
+    job_context.Clean();
+  }
+
+  return status;
+#endif  // ROCKSDB_LITE
+}
+
+#ifndef ROCKSDB_LITE
+// Implementation of CompactFiles(): validates the requested input files,
+// builds a Compaction for them, runs it in the calling thread and installs
+// the result.
+//
+// REQUIRES: mutex_ is held (asserted). The mutex is released while the
+// compaction job runs and re-acquired before the result is installed.
+//
+// compact_options   user options for this compaction.
+// cfd               column family being compacted.
+// version           Version the input files are resolved against.
+// input_file_names  table file names to compact.
+// output_level      level the output is written to.
+// output_path_id    index into db_paths; a negative value selects path 0
+//                   when exactly one db_path is configured and is otherwise
+//                   rejected with NotSupported.
+// job_context       supplies the job id and collects superversion cleanup.
+// log_buffer        buffered info log used by the compaction job.
+//
+// Returns ShutdownInProgress, NotSupported, Aborted, an input sanitization
+// error, or the status of installing the compaction result.
+Status DBImpl::CompactFilesImpl(
+    const CompactionOptions& compact_options, ColumnFamilyData* cfd,
+    Version* version, const std::vector<std::string>& input_file_names,
+    const int output_level, int output_path_id, JobContext* job_context,
+    LogBuffer* log_buffer) {
+  mutex_.AssertHeld();
+
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    return Status::ShutdownInProgress();
+  }
+
+  std::unordered_set<uint64_t> input_set;
+  // Iterate by reference: the names are only read, no need to copy strings.
+  for (const auto& file_name : input_file_names) {
+    input_set.insert(TableFileNameToNumber(file_name));
+  }
+
+  ColumnFamilyMetaData cf_meta;
+  // TODO(yhchiang): can directly use version here if none of the
+  // following functions call is pluggable to external developers.
+  version->GetColumnFamilyMetaData(&cf_meta);
+
+  if (output_path_id < 0) {
+    if (immutable_db_options_.db_paths.size() == 1U) {
+      output_path_id = 0;
+    } else {
+      return Status::NotSupported(
+          "Automatic output path selection is not "
+          "yet supported in CompactFiles()");
+    }
+  }
+
+  Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
+      &input_set, cf_meta, output_level);
+  if (!s.ok()) {
+    return s;
+  }
+
+  std::vector<CompactionInputFiles> input_files;
+  s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
+      &input_files, &input_set, version->storage_info(), compact_options);
+  if (!s.ok()) {
+    return s;
+  }
+
+  for (const auto& inputs : input_files) {
+    if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
+      return Status::Aborted(
+          "Some of the necessary compaction input "
+          "files are already being compacted");
+    }
+  }
+
+  unique_ptr<Compaction> c;
+  assert(cfd->compaction_picker());
+  c.reset(cfd->compaction_picker()->CompactFiles(
+      compact_options, input_files, output_level, version->storage_info(),
+      *cfd->GetLatestMutableCFOptions(), output_path_id));
+  if (!c) {
+    return Status::Aborted("Another Level 0 compaction is running");
+  }
+
+  // At this point, CompactFiles will be run.
+  // The counter is bumped only after the last early return above, so that an
+  // aborted call cannot leave it permanently incremented. The mutex is held
+  // the whole time, so no other thread observes the later increment.
+  bg_compaction_scheduled_++;
+
+  c->SetInputVersion(version);
+  // deletion compaction currently not allowed in CompactFiles.
+  assert(!c->deletion_compaction());
+
+  SequenceNumber earliest_write_conflict_snapshot;
+  std::vector<SequenceNumber> snapshot_seqs =
+      snapshots_.GetAll(&earliest_write_conflict_snapshot);
+
+  auto pending_outputs_inserted_elem =
+      CaptureCurrentFileNumberInPendingOutputs();
+
+  assert(is_snapshot_supported_ || snapshots_.empty());
+  CompactionJob compaction_job(
+      job_context->job_id, c.get(), immutable_db_options_, env_options_,
+      versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
+      directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_,
+      snapshot_seqs, earliest_write_conflict_snapshot, table_cache_,
+      &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
+      c->mutable_cf_options()->report_bg_io_stats, dbname_,
+      nullptr);  // Here we pass a nullptr for CompactionJobStats because
+                 // CompactFiles does not trigger OnCompactionCompleted(),
+                 // which is the only place where CompactionJobStats is
+                 // returned.  The idea of not triggering
+                 // OnCompactionCompleted() is that CompactFiles runs in the
+                 // caller thread, so the user should always know when it
+                 // completes.  As a result, it makes less sense to notify
+                 // the users something they should already know.
+                 //
+                 // In the future, if we would like to add CompactionJobStats
+                 // support for CompactFiles, we should have CompactFiles API
+                 // pass a pointer of CompactionJobStats as the out-value
+                 // instead of using EventListener.
+
+  // Creating a compaction influences the compaction score because the score
+  // takes running compactions into account (by skipping files that are already
+  // being compacted). Since we just changed compaction score, we recalculate it
+  // here.
+  version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
+                                                  *c->mutable_cf_options());
+
+  compaction_job.Prepare();
+
+  // The compaction itself runs without the DB mutex.
+  mutex_.Unlock();
+  TEST_SYNC_POINT("CompactFilesImpl:0");
+  TEST_SYNC_POINT("CompactFilesImpl:1");
+  compaction_job.Run();
+  TEST_SYNC_POINT("CompactFilesImpl:2");
+  TEST_SYNC_POINT("CompactFilesImpl:3");
+  mutex_.Lock();
+
+  Status status = compaction_job.Install(*c->mutable_cf_options());
+  if (status.ok()) {
+    InstallSuperVersionAndScheduleWorkWrapper(
+        c->column_family_data(), job_context, *c->mutable_cf_options());
+  }
+  // NOTE(review): this passes `s` (the input-selection status, always OK
+  // here) rather than `status` (the install result) -- confirm intended.
+  c->ReleaseCompactionFiles(s);
+
+  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
+
+  if (status.ok()) {
+    // Done
+  } else if (status.IsShutdownInProgress()) {
+    // Ignore compaction errors found during shutting down
+  } else {
+    ROCKS_LOG_WARN(immutable_db_options_.info_log,
+                   "[%s] [JOB %d] Compaction error: %s",
+                   c->column_family_data()->GetName().c_str(),
+                   job_context->job_id, status.ToString().c_str());
+    if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
+      Status new_bg_error = status;
+      // may temporarily unlock and lock the mutex.
+      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                            BackgroundErrorReason::kCompaction,
+                                            &new_bg_error, &mutex_);
+      if (!new_bg_error.ok()) {
+        bg_error_ = new_bg_error;
+      }
+    }
+  }
+
+  c.reset();
+
+  // Wake up anyone waiting for the scheduled-compaction count to reach zero.
+  bg_compaction_scheduled_--;
+  if (bg_compaction_scheduled_ == 0) {
+    bg_cv_.SignalAll();
+  }
+
+  return status;
+}
+#endif  // ROCKSDB_LITE
+
+// Blocks until no bottom-priority compaction, compaction or flush is
+// scheduled, then bumps the pause counters. Always returns OK.
+Status DBImpl::PauseBackgroundWork() {
+  InstrumentedMutexLock lock(&mutex_);
+  // The compaction pause count is raised before waiting; the work pause
+  // count only once everything in flight has drained.
+  ++bg_compaction_paused_;
+  for (;;) {
+    const bool work_in_flight = bg_bottom_compaction_scheduled_ > 0 ||
+                                bg_compaction_scheduled_ > 0 ||
+                                bg_flush_scheduled_ > 0;
+    if (!work_in_flight) {
+      break;
+    }
+    bg_cv_.Wait();
+  }
+  ++bg_work_paused_;
+  return Status::OK();
+}
+
+// Undoes one PauseBackgroundWork() call. Returns InvalidArgument when
+// background work is not currently paused; otherwise decrements both pause
+// counters and, once the last pause is lifted, reschedules background work.
+Status DBImpl::ContinueBackgroundWork() {
+  InstrumentedMutexLock lock(&mutex_);
+  if (bg_work_paused_ == 0) {
+    return Status::InvalidArgument();
+  }
+  assert(bg_work_paused_ > 0);
+  assert(bg_compaction_paused_ > 0);
+  --bg_compaction_paused_;
+  // Checking bg_work_paused_ alone is enough, since it is never greater
+  // than bg_compaction_paused_.
+  if (--bg_work_paused_ == 0) {
+    MaybeScheduleFlushOrCompaction();
+  }
+  return Status::OK();
+}
+
+// Calls OnCompactionCompleted() on every registered listener with a
+// CompactionJobInfo describing compaction `c`.
+//
+// Must be entered with mutex_ held; the mutex is released while the info is
+// assembled and the listeners run, and re-acquired before returning. Does
+// nothing when no listeners are registered, when the DB is shutting down, or
+// in ROCKSDB_LITE builds.
+//
+// cfd                   column family the compaction belongs to.
+// c                     the finished compaction (inputs, outputs, reason).
+// st                    final status of the compaction, reported as-is.
+// compaction_job_stats  statistics copied into the notification.
+// job_id                id of the compaction job.
+void DBImpl::NotifyOnCompactionCompleted(
+    ColumnFamilyData* cfd, Compaction *c, const Status &st,
+    const CompactionJobStats& compaction_job_stats,
+    const int job_id) {
+#ifndef ROCKSDB_LITE
+  if (immutable_db_options_.listeners.size() == 0U) {
+    return;
+  }
+  mutex_.AssertHeld();
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    return;
+  }
+  // release lock while notifying events
+  mutex_.Unlock();
+  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
+  {
+    CompactionJobInfo info;
+    info.cf_name = cfd->GetName();
+    info.status = st;
+    info.thread_id = env_->GetThreadID();
+    info.job_id = job_id;
+    info.base_input_level = c->start_level();
+    info.output_level = c->output_level();
+    info.stats = compaction_job_stats;
+    info.table_properties = c->GetOutputTableProperties();
+    info.compaction_reason = c->compaction_reason();
+    info.compression = c->output_compression();
+    for (size_t i = 0; i < c->num_input_levels(); ++i) {
+      for (const auto fmd : *c->inputs(i)) {
+        auto fn = TableFileName(immutable_db_options_.db_paths,
+                                fmd->fd.GetNumber(), fmd->fd.GetPathId());
+        info.input_files.push_back(fn);
+        // Only look up properties for inputs not already covered by the
+        // output table properties copied above.
+        if (info.table_properties.count(fn) == 0) {
+          std::shared_ptr<const TableProperties> tp;
+          auto s = cfd->current()->GetTableProperties(&tp, fmd, &fn);
+          if (s.ok()) {
+            info.table_properties[fn] = tp;
+          }
+        }
+      }
+    }
+    // Bind by reference: each element carries a full FileMetaData, which the
+    // loop only reads.
+    for (const auto& newf : c->edit()->GetNewFiles()) {
+      info.output_files.push_back(TableFileName(immutable_db_options_.db_paths,
+                                                newf.second.fd.GetNumber(),
+                                                newf.second.fd.GetPathId()));
+    }
+    for (auto listener : immutable_db_options_.listeners) {
+      listener->OnCompactionCompleted(this, info);
+    }
+  }
+  mutex_.Lock();
+  // bg_cv_ is not signaled here; presumably the caller signals it once the
+  // compaction job finishes.
+#endif  // ROCKSDB_LITE
+}
+
+// Moves every file of `level` in `cfd` to another level by applying a
+// VersionEdit (no data is rewritten).
+//
+// REQUIREMENT: block all background work by calling PauseBackgroundWork()
+// before calling this function
+//
+// cfd           column family whose files are moved.
+// level         source level; must be below cfd->NumberLevels() (asserted).
+// target_level  destination level; a negative value lets
+//               FindMinimumEmptyLevelFitting() choose the destination.
+//
+// Returns InvalidArgument when target_level is out of range, NotSupported
+// when another refit is in progress or when the move is not allowed (out of
+// level 0, or across non-empty levels), otherwise the LogAndApply() status.
+Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
+  assert(level < cfd->NumberLevels());
+  if (target_level >= cfd->NumberLevels()) {
+    return Status::InvalidArgument("Target level exceeds number of levels");
+  }
+
+  std::unique_ptr<SuperVersion> superversion_to_free;
+  std::unique_ptr<SuperVersion> new_superversion(new SuperVersion());
+
+  Status status;
+
+  InstrumentedMutexLock guard_lock(&mutex_);
+
+  // only allow one thread refitting
+  if (refitting_level_) {
+    ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                   "[ReFitLevel] another thread is refitting");
+    return Status::NotSupported("another thread is refitting");
+  }
+  // From here on every exit path must clear refitting_level_ again;
+  // otherwise all later calls would be rejected as "another thread is
+  // refitting".
+  refitting_level_ = true;
+
+  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
+  // move to a smaller level
+  int to_level = target_level;
+  if (target_level < 0) {
+    to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
+  }
+
+  auto* vstorage = cfd->current()->storage_info();
+  if (to_level > level) {
+    if (level == 0) {
+      refitting_level_ = false;
+      return Status::NotSupported(
+          "Cannot change from level 0 to other levels.");
+    }
+    // Check levels are empty for a trivial move
+    for (int l = level + 1; l <= to_level; l++) {
+      if (vstorage->NumLevelFiles(l) > 0) {
+        refitting_level_ = false;
+        return Status::NotSupported(
+            "Levels between source and target are not empty for a move.");
+      }
+    }
+  }
+  if (to_level != level) {
+    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
+                    "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
+                    cfd->current()->DebugString().data());
+
+    // Re-register each file of the source level under the destination level.
+    VersionEdit edit;
+    edit.SetColumnFamily(cfd->GetID());
+    for (const auto& f : vstorage->LevelFiles(level)) {
+      edit.DeleteFile(level, f->fd.GetNumber());
+      edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
+                   f->fd.GetFileSize(), f->smallest, f->largest,
+                   f->smallest_seqno, f->largest_seqno,
+                   f->marked_for_compaction);
+    }
+    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
+                    "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
+                    edit.DebugString().data());
+
+    status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
+                                    directories_.GetDbDir());
+    // NOTE(review): the new super version is installed regardless of the
+    // LogAndApply() status -- confirm this is intended.
+    superversion_to_free.reset(InstallSuperVersionAndScheduleWork(
+        cfd, new_superversion.release(), mutable_cf_options));
+
+    ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
+                    cfd->GetName().c_str(), status.ToString().data());
+
+    if (status.ok()) {
+      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
+                      "[%s] After refitting:\n%s", cfd->GetName().c_str(),
+                      cfd->current()->DebugString().data());
+    }
+  }
+
+  refitting_level_ = false;
+
+  return status;
+}
+
+// Returns the number of levels of the given column family.
+int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
+  auto* handle_impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  auto* cfd = handle_impl->cfd();
+  return cfd->NumberLevels();
+}
+
+// Always reports level 0; the column family argument is not consulted.
+int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) {
+  const int max_mem_compaction_level = 0;
+  return max_mem_compaction_level;
+}
+
+// Returns level0_stop_writes_trigger from the column family's current super
+// version. The value is read under mutex_.
+int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
+  auto* handle_impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  InstrumentedMutexLock lock(&mutex_);
+  const auto& cf_options =
+      handle_impl->cfd()->GetSuperVersion()->mutable_cf_options;
+  return cf_options.level0_stop_writes_trigger;
+}
+
+// Flushes the memtable of the given column family and returns the status of
+// FlushMemTable().
+Status DBImpl::Flush(const FlushOptions& flush_options,
+                     ColumnFamilyHandle* column_family) {
+  auto* cfd =
+      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+  return FlushMemTable(cfd, flush_options);
+}
+
+// Runs a manual compaction on `cfd` and blocks until it is marked done.
+//
+//   input_level / output_level / output_path_id: copied into the
+//       ManualCompactionState handed to the compaction picker.
+//   begin / end: optional user-key bounds. They are ignored (treated as
+//       nullptr) for universal and FIFO compaction styles.
+//   exclusive: when true, first waits until no LOW or BOTTOM priority
+//       compaction is scheduled.
+//   disallow_trivial_move: copied into the manual compaction state.
+//
+// Returns manual.status. Acquires mutex_ for the remainder of the call
+// (bg_cv_.Wait() is used while waiting).
+Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
+                                   int output_level, uint32_t output_path_id,
+                                   const Slice* begin, const Slice* end,
+                                   bool exclusive, bool disallow_trivial_move) {
+  assert(input_level == ColumnFamilyData::kCompactAllLevels ||
+         input_level >= 0);
+
+  InternalKey begin_storage, end_storage;
+
+  bool scheduled = false;
+  bool manual_conflict = false;
+  ManualCompactionState manual;
+  manual.cfd = cfd;
+  manual.input_level = input_level;
+  manual.output_level = output_level;
+  manual.output_path_id = output_path_id;
+  manual.done = false;
+  manual.in_progress = false;
+  manual.incomplete = false;
+  manual.exclusive = exclusive;
+  manual.disallow_trivial_move = disallow_trivial_move;
+  // For universal and FIFO compaction, we enforce every manual compaction to
+  // compact all files, so the user-supplied key bounds are dropped.
+  const bool compact_all_files =
+      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
+      cfd->ioptions()->compaction_style == kCompactionStyleFIFO;
+  if (begin == nullptr || compact_all_files) {
+    manual.begin = nullptr;
+  } else {
+    begin_storage.SetMaxPossibleForUserKey(*begin);
+    manual.begin = &begin_storage;
+  }
+  if (end == nullptr || compact_all_files) {
+    manual.end = nullptr;
+  } else {
+    end_storage.SetMinPossibleForUserKey(*end);
+    manual.end = &end_storage;
+  }
+
+  TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
+  TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
+  InstrumentedMutexLock l(&mutex_);
+
+  // When a manual compaction arrives, temporarily disable scheduling of
+  // non-manual compactions and wait until the number of scheduled compaction
+  // jobs drops to zero. This is needed to ensure that this manual compaction
+  // can compact any range of keys/files.
+  //
+  // HasPendingManualCompaction() is true when at least one thread is inside
+  // RunManualCompaction(), i.e. during that time no other compaction will
+  // get scheduled (see MaybeScheduleFlushOrCompaction).
+  //
+  // Note that the following loop doesn't stop more than one thread calling
+  // RunManualCompaction() from getting to the second while loop below.
+  // However, only one of them will actually schedule compaction, while
+  // others will wait on a condition variable until it completes.
+
+  AddManualCompaction(&manual);
+  TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
+  if (exclusive) {
+    while (bg_bottom_compaction_scheduled_ > 0 ||
+           bg_compaction_scheduled_ > 0) {
+      TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
+      ROCKS_LOG_INFO(
+          immutable_db_options_.info_log,
+          "[%s] Manual compaction waiting for all other scheduled background "
+          "compactions to finish",
+          cfd->GetName().c_str());
+      bg_cv_.Wait();
+    }
+  }
+
+  ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                 "[%s] Manual compaction starting", cfd->GetName().c_str());
+
+  // We don't check bg_error_ here, because if we get the error in compaction,
+  // the compaction will set manual.status to bg_error_ and set manual.done to
+  // true.
+  while (!manual.done) {
+    assert(HasPendingManualCompaction());
+    manual_conflict = false;
+    // Initialized explicitly: the assignment below only happens when the
+    // short-circuit chain reaches CompactRange(), so never leave this
+    // pointer indeterminate.
+    Compaction* compaction = nullptr;
+    if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
+        scheduled ||
+        ((manual.manual_end = &manual.tmp_storage1) &&
+         ((compaction = manual.cfd->CompactRange(
+               *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
+               manual.output_level, manual.output_path_id, manual.begin,
+               manual.end, &manual.manual_end, &manual_conflict)) == nullptr) &&
+         manual_conflict)) {
+      // exclusive manual compactions should not see a conflict during
+      // CompactRange
+      assert(!exclusive || !manual_conflict);
+      // Running either this or some other manual compaction
+      bg_cv_.Wait();
+      if (scheduled && manual.incomplete == true) {
+        // The scheduled job finished without completing the range; allow
+        // another round to be picked and scheduled.
+        assert(!manual.in_progress);
+        scheduled = false;
+        manual.incomplete = false;
+      }
+    } else if (!scheduled) {
+      if (compaction == nullptr) {
+        // Nothing was picked and there was no conflict: the work is done.
+        manual.done = true;
+        bg_cv_.SignalAll();
+        continue;
+      }
+      // Ownership of `ca` (and the PrepickedCompaction it points to) is
+      // released by BGWorkCompaction or UnscheduleCallback.
+      CompactionArg* ca = new CompactionArg;
+      ca->db = this;
+      ca->prepicked_compaction = new PrepickedCompaction;
+      ca->prepicked_compaction->manual_compaction_state = &manual;
+      ca->prepicked_compaction->compaction = compaction;
+      manual.incomplete = false;
+      bg_compaction_scheduled_++;
+      env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
+                     &DBImpl::UnscheduleCallback);
+      scheduled = true;
+    }
+  }
+
+  assert(!manual.in_progress);
+  assert(HasPendingManualCompaction());
+  RemoveManualCompaction(&manual);
+  bg_cv_.SignalAll();
+  return manual.status;
+}
+
+// Switches `cfd` to a new memtable and schedules a flush of the old one.
+// When `writes_stopped` is false the call enters/exits the write thread
+// unbatched around the memtable switch. If the switch succeeded and
+// flush_options.wait is set, blocks in WaitForFlushMemTable() before
+// returning. Returns OK immediately when there is nothing to flush.
+Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
+                             const FlushOptions& flush_options,
+                             bool writes_stopped) {
+  Status switch_status;
+  {
+    WriteContext write_context;
+    InstrumentedMutexLock lock(&mutex_);
+
+    const bool nothing_to_flush =
+        cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty();
+    if (nothing_to_flush) {
+      return Status::OK();
+    }
+
+    const bool must_enter_write_thread = !writes_stopped;
+    WriteThread::Writer unbatched_writer;
+    if (must_enter_write_thread) {
+      write_thread_.EnterUnbatched(&unbatched_writer, &mutex_);
+    }
+
+    // Note: SwitchMemtable() releases and reacquires the mutex while it runs.
+    switch_status = SwitchMemtable(cfd, &write_context);
+
+    if (must_enter_write_thread) {
+      write_thread_.ExitUnbatched(&unbatched_writer);
+    }
+
+    cfd->imm()->FlushRequested();
+
+    // Queue the flush and let the scheduler pick it up.
+    SchedulePendingFlush(cfd);
+    MaybeScheduleFlushOrCompaction();
+  }
+
+  if (!switch_status.ok() || !flush_options.wait) {
+    return switch_status;
+  }
+  // Caller asked to wait: block until the flush completes.
+  return WaitForFlushMemTable(cfd);
+}
+
+// Blocks on bg_cv_ (under mutex_) until `cfd` has no unflushed immutable
+// memtables or a background error is recorded. Returns early with
+// ShutdownInProgress when the DB is shutting down and with InvalidArgument
+// when the column family has been dropped. Otherwise returns bg_error_ if it
+// is set, else OK.
+Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
+  InstrumentedMutexLock lock(&mutex_);
+  while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) {
+    if (shutting_down_.load(std::memory_order_acquire)) {
+      return Status::ShutdownInProgress();
+    }
+    if (cfd->IsDropped()) {
+      // A dropped CF is never flushed by FlushJob, so NumNotFlushed() would
+      // never reach zero and this loop would spin forever without this exit.
+      return Status::InvalidArgument("Cannot flush a dropped CF");
+    }
+    bg_cv_.Wait();
+  }
+
+  Status result;
+  if (!bg_error_.ok()) {
+    result = bg_error_;
+  }
+  return result;
+}
+
+// Sets disable_auto_compactions=false on every handle in
+// `column_family_handles`. All handles are attempted even if one fails; the
+// status of the last failing SetOptions() call is returned (OK if none fail).
+Status DBImpl::EnableAutoCompaction(
+    const std::vector<ColumnFamilyHandle*>& column_family_handles) {
+  Status last_failure;
+  for (ColumnFamilyHandle* handle : column_family_handles) {
+    const Status outcome =
+        this->SetOptions(handle, {{"disable_auto_compactions", "false"}});
+    if (outcome.ok()) {
+      continue;
+    }
+    last_failure = outcome;
+  }
+  return last_failure;
+}
+
+// Hands queued flush and compaction work to the Env thread pools, up to the
+// limits returned by GetBGJobLimits(). Must be called with mutex_ held.
+// Does nothing if the DB has not opened successfully, background work is
+// paused, or the DB is shutting down. Automatic compactions are additionally
+// skipped while compactions are paused or an exclusive manual compaction is
+// pending.
+void DBImpl::MaybeScheduleFlushOrCompaction() {
+  mutex_.AssertHeld();
+  if (!opened_successfully_) {
+    // Compaction may introduce data race to DB open
+    return;
+  }
+  if (bg_work_paused_ > 0) {
+    // Background work has been paused
+    return;
+  }
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    // DB is being deleted; no more background compactions
+    return;
+  }
+
+  const auto limits = GetBGJobLimits();
+  const bool high_pri_pool_empty =
+      env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
+  if (!high_pri_pool_empty) {
+    // Normal case: flushes run in the high-priority pool.
+    while (unscheduled_flushes_ > 0 &&
+           bg_flush_scheduled_ < limits.max_flushes) {
+      unscheduled_flushes_--;
+      bg_flush_scheduled_++;
+      env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
+    }
+  } else {
+    // Special case: the high-pri (flush) pool has no threads, so flushes
+    // share the low-pri (compaction) pool and count against its jobs too.
+    while (unscheduled_flushes_ > 0 &&
+           bg_flush_scheduled_ + bg_compaction_scheduled_ <
+               limits.max_flushes) {
+      unscheduled_flushes_--;
+      bg_flush_scheduled_++;
+      env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
+    }
+  }
+
+  // Background compaction paused, or only manual compactions may run:
+  // do not schedule automatic compactions.
+  if (bg_compaction_paused_ > 0 || HasExclusiveManualCompaction()) {
+    return;
+  }
+
+  while (bg_compaction_scheduled_ < limits.max_compactions &&
+         unscheduled_compactions_ > 0) {
+    CompactionArg* job_arg = new CompactionArg;
+    job_arg->db = this;
+    job_arg->prepicked_compaction = nullptr;
+    bg_compaction_scheduled_++;
+    unscheduled_compactions_--;
+    env_->Schedule(&DBImpl::BGWorkCompaction, job_arg, Env::Priority::LOW,
+                   this, &DBImpl::UnscheduleCallback);
+  }
+}
+
+// Computes the current flush/compaction limits from this DB's options and
+// the write controller's speed-up flag. Requires mutex_ to be held.
+DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
+  mutex_.AssertHeld();
+  const bool speedup_needed = write_controller_.NeedSpeedupCompaction();
+  return GetBGJobLimits(immutable_db_options_.max_background_flushes,
+                        mutable_db_options_.max_background_compactions,
+                        mutable_db_options_.max_background_jobs,
+                        speedup_needed);
+}
+
+// Derives flush and compaction limits from the raw option values.
+//
+// When both max_background_flushes and max_background_compactions are -1,
+// the limits are split out of max_background_jobs (a quarter for flushes,
+// the rest for compactions). Otherwise the two explicit values are used.
+// Every limit is at least 1. When `parallelize_compactions` is false the
+// compaction limit is forced to 1.
+DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
+                                           int max_background_compactions,
+                                           int max_background_jobs,
+                                           bool parallelize_compactions) {
+  const bool use_explicit_limits =
+      max_background_flushes != -1 || max_background_compactions != -1;
+
+  BGJobLimits limits;
+  if (use_explicit_limits) {
+    // Compatibility path for users who haven't migrated to
+    // max_background_jobs, which computes the limits automatically.
+    limits.max_flushes = std::max(1, max_background_flushes);
+    limits.max_compactions = std::max(1, max_background_compactions);
+  } else {
+    // First-cut policy for max_background_jobs: a quarter of the jobs go to
+    // flushes, the remainder to compactions.
+    limits.max_flushes = std::max(1, max_background_jobs / 4);
+    limits.max_compactions =
+        std::max(1, max_background_jobs - limits.max_flushes);
+  }
+
+  if (!parallelize_compactions) {
+    // Throttle background compactions until parallelism is deemed necessary.
+    limits.max_compactions = 1;
+  }
+  return limits;
+}
+
+// Appends `cfd` to compaction_queue_, taking a reference on it and marking
+// it as pending compaction. `cfd` must not already be marked pending.
+void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
+  assert(!cfd->pending_compaction());
+  cfd->Ref();  // the queue entry holds its own reference
+  compaction_queue_.push_back(cfd);
+  cfd->set_pending_compaction(true);
+}
+
+// Removes and returns the head of compaction_queue_, clearing its
+// pending-compaction flag. The queue must be non-empty. This function does
+// not call Unref() on the returned entry.
+ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
+  assert(!compaction_queue_.empty());
+  ColumnFamilyData* const head = compaction_queue_.front();
+  compaction_queue_.pop_front();
+  assert(head->pending_compaction());
+  head->set_pending_compaction(false);
+  return head;
+}
+
+// Appends `cfd` to flush_queue_, taking a reference on it and marking it as
+// pending flush. `cfd` must not already be marked pending.
+void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd) {
+  assert(!cfd->pending_flush());
+  cfd->Ref();  // the queue entry holds its own reference
+  flush_queue_.push_back(cfd);
+  cfd->set_pending_flush(true);
+}
+
+// Removes and returns the head of flush_queue_, clearing its pending-flush
+// flag. The queue must be non-empty. This function does not call Unref() on
+// the returned entry.
+ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() {
+  assert(!flush_queue_.empty());
+  ColumnFamilyData* const head = flush_queue_.front();
+  flush_queue_.pop_front();
+  assert(head->pending_flush());
+  head->set_pending_flush(false);
+  return head;
+}
+
+// Queues `cfd` for flushing and bumps unscheduled_flushes_, unless it is
+// already queued or its immutable memtable list has no flush pending.
+void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd) {
+  if (cfd->pending_flush() || !cfd->imm()->IsFlushPending()) {
+    return;
+  }
+  AddToFlushQueue(cfd);
+  ++unscheduled_flushes_;
+}
+
+// Queues `cfd` for compaction and bumps unscheduled_compactions_, unless it
+// is already queued or reports that it does not need compaction.
+void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
+  if (cfd->pending_compaction() || !cfd->NeedsCompaction()) {
+    return;
+  }
+  AddToCompactionQueue(cfd);
+  ++unscheduled_compactions_;
+}
+
+// Records a file to be purged later by appending a PurgeFileInfo built from
+// the arguments to purge_queue_. Requires mutex_ to be held.
+void DBImpl::SchedulePendingPurge(std::string fname, FileType type,
+                                  uint64_t number, uint32_t path_id,
+                                  int job_id) {
+  mutex_.AssertHeld();
+  purge_queue_.push_back(PurgeFileInfo(fname, type, number, path_id, job_id));
+}
+
+// Thread-pool entry point for a flush job. `db` is the DBImpl* that was
+// passed to Env::Schedule().
+void DBImpl::BGWorkFlush(void* db) {
+  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
+  TEST_SYNC_POINT("DBImpl::BGWorkFlush");
+  DBImpl* const self = static_cast<DBImpl*>(db);
+  self->BackgroundCallFlush();
+  TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
+}
+
+// Thread-pool entry point for a LOW priority compaction job. `arg` is a
+// heap-allocated CompactionArg; it is copied and freed up front, and the
+// PrepickedCompaction it referenced (possibly null) is freed once
+// BackgroundCallCompaction() returns.
+void DBImpl::BGWorkCompaction(void* arg) {
+  auto* const heap_arg = static_cast<CompactionArg*>(arg);
+  CompactionArg job = *heap_arg;
+  delete heap_arg;
+  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
+  TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
+  PrepickedCompaction* const prepicked = job.prepicked_compaction;
+  job.db->BackgroundCallCompaction(prepicked, Env::Priority::LOW);
+  delete prepicked;
+}
+
+// Thread-pool entry point for a BOTTOM priority compaction job. `arg` is a
+// heap-allocated CompactionArg that must carry a prepicked, non-manual
+// compaction. Both the argument and its PrepickedCompaction are freed here.
+void DBImpl::BGWorkBottomCompaction(void* arg) {
+  auto* const heap_arg = static_cast<CompactionArg*>(arg);
+  CompactionArg job = *heap_arg;
+  delete heap_arg;
+  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
+  TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
+  auto* const prepicked = job.prepicked_compaction;
+  assert(prepicked && prepicked->compaction &&
+         !prepicked->manual_compaction_state);
+  job.db->BackgroundCallCompaction(prepicked, Env::Priority::BOTTOM);
+  delete prepicked;
+}
+
+// Thread-pool entry point for a purge job. `db` is the DBImpl* that was
+// passed to Env::Schedule().
+void DBImpl::BGWorkPurge(void* db) {
+  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
+  TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
+  DBImpl* const self = static_cast<DBImpl*>(db);
+  self->BackgroundCallPurge();
+  TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
+}
+
+// Cleanup hook passed to Env::Schedule() alongside the compaction entry
+// points. Frees the CompactionArg, its PrepickedCompaction (if any), and the
+// Compaction that the PrepickedCompaction holds (if any).
+void DBImpl::UnscheduleCallback(void* arg) {
+  auto* const heap_arg = static_cast<CompactionArg*>(arg);
+  auto* const prepicked = heap_arg->prepicked_compaction;
+  delete heap_arg;
+  if (prepicked != nullptr) {
+    // Deleting a null pointer is a no-op, so no separate check is needed.
+    delete prepicked->compaction;
+    delete prepicked;
+  }
+  TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
+}
+
+// Picks the first flushable column family from flush_queue_ and flushes it
+// via FlushMemTableToOutputFile(). Requires mutex_ to be held.
+//
+// Returns bg_error_ if it is set, ShutdownInProgress if the DB is shutting
+// down, the flush status if a column family was flushed, and OK if the queue
+// held nothing flushable. Queue entries that are dropped or no longer have a
+// pending flush are unreferenced and skipped.
+Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
+                               LogBuffer* log_buffer) {
+  mutex_.AssertHeld();
+
+  Status status = bg_error_;
+  if (!status.ok()) {
+    return status;
+  }
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    return Status::ShutdownInProgress();
+  }
+
+  ColumnFamilyData* cfd = nullptr;
+  while (cfd == nullptr && !flush_queue_.empty()) {
+    // Entries in the queue already hold a reference.
+    ColumnFamilyData* const candidate = PopFirstFromFlushQueue();
+    const bool flushable =
+        !candidate->IsDropped() && candidate->imm()->IsFlushPending();
+    if (flushable) {
+      cfd = candidate;
+    } else if (candidate->Unref()) {
+      // Can't flush this CF and we held the last reference to it.
+      delete candidate;
+    }
+  }
+
+  if (cfd == nullptr) {
+    return status;
+  }
+
+  const MutableCFOptions mutable_cf_options =
+      *cfd->GetLatestMutableCFOptions();
+  auto limits = GetBGJobLimits();
+  ROCKS_LOG_BUFFER(
+      log_buffer,
+      "Calling FlushMemTableToOutputFile with column "
+      "family [%s], flush slots available %d, compaction slots available %d, "
+      "flush slots scheduled %d, compaction slots scheduled %d",
+      cfd->GetName().c_str(), limits.max_flushes, limits.max_compactions,
+      bg_flush_scheduled_, bg_compaction_scheduled_);
+  status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
+                                     job_context, log_buffer);
+  // Drop the reference that the queue entry was holding.
+  if (cfd->Unref()) {
+    delete cfd;
+  }
+  return status;
+}
+
+// Body of a background flush job: runs BackgroundFlush() under mutex_,
+// backs off for one second on a non-shutdown error, purges obsolete files
+// outside the mutex, then decrements the flush counters, reschedules further
+// work and signals bg_cv_.
+void DBImpl::BackgroundCallFlush() {
+  bool made_progress = false;
+  JobContext job_context(next_job_id_.fetch_add(1), true);
+
+  TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
+
+  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
+                       immutable_db_options_.info_log.get());
+  {
+    InstrumentedMutexLock l(&mutex_);
+    assert(bg_flush_scheduled_);
+    num_running_flushes_++;
+
+    auto pending_outputs_inserted_elem =
+        CaptureCurrentFileNumberInPendingOutputs();
+
+    Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer);
+    if (!s.ok() && !s.IsShutdownInProgress()) {
+      // Wait a little bit before retrying background flush in
+      // case this is an environmental problem and we do not want to
+      // chew up resources for failed flushes for the duration of
+      // the problem.
+      uint64_t error_cnt =
+        default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
+      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
+      mutex_.Unlock();
+      // The ", " separator keeps the status text and the counter label from
+      // running together (matches the compaction error message).
+      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
+                      "Waiting after background flush error: %s, "
+                      "Accumulated background error counts: %" PRIu64,
+                      s.ToString().c_str(), error_cnt);
+      log_buffer.FlushBufferToLog();
+      LogFlush(immutable_db_options_.info_log);
+      env_->SleepForMicroseconds(1000000);
+      mutex_.Lock();
+    }
+
+    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
+
+    // If flush failed, we want to delete all temporary files that we might have
+    // created. Thus, we force full scan in FindObsoleteFiles()
+    FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
+    // delete unnecessary files if any, this is done outside the mutex
+    if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
+      mutex_.Unlock();
+      // Have to flush the info logs before bg_flush_scheduled_--
+      // because if bg_flush_scheduled_ becomes 0 and the lock is
+      // released, the destructor of DB can kick in and destroy all the
+      // states of DB so info_log might not be available after that point.
+      // It also applies to access other states that DB owns.
+      log_buffer.FlushBufferToLog();
+      if (job_context.HaveSomethingToDelete()) {
+        PurgeObsoleteFiles(job_context);
+      }
+      job_context.Clean();
+      mutex_.Lock();
+    }
+
+    assert(num_running_flushes_ > 0);
+    num_running_flushes_--;
+    bg_flush_scheduled_--;
+    // See if there's more work to be done
+    MaybeScheduleFlushOrCompaction();
+    bg_cv_.SignalAll();
+    // IMPORTANT: there should be no code after calling SignalAll. This call may
+    // signal the DB destructor that it's OK to proceed with destruction. In
+    // that case, all DB variables will be deallocated and referencing them
+    // will cause trouble.
+  }
+}
+
+// Body of a background compaction job, called from the LOW and BOTTOM
+// priority thread-pool entry points.
+//
+//   prepicked_compaction: forwarded unchanged to BackgroundCompaction().
+//   bg_thread_pri: LOW or BOTTOM; selects which scheduled-job counter is
+//       asserted on entry and decremented on exit.
+//
+// Runs BackgroundCompaction() under mutex_, backs off for one second on a
+// non-shutdown error, purges obsolete files outside the mutex, then updates
+// the counters, reschedules further work and signals bg_cv_ when a waiter
+// may need waking.
+void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
+                                      Env::Priority bg_thread_pri) {
+  bool made_progress = false;
+  JobContext job_context(next_job_id_.fetch_add(1), true);
+  TEST_SYNC_POINT("BackgroundCallCompaction:0");
+  MaybeDumpStats();
+  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
+                       immutable_db_options_.info_log.get());
+  {
+    InstrumentedMutexLock l(&mutex_);
+
+    // This call will unlock/lock the mutex to wait for current running
+    // IngestExternalFile() calls to finish.
+    WaitForIngestFile();
+
+    num_running_compactions_++;
+
+    auto pending_outputs_inserted_elem =
+        CaptureCurrentFileNumberInPendingOutputs();
+
+    // The counter matching this job's priority must be non-zero.
+    assert((bg_thread_pri == Env::Priority::BOTTOM &&
+            bg_bottom_compaction_scheduled_) ||
+           (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
+    Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
+                                    prepicked_compaction);
+    TEST_SYNC_POINT("BackgroundCallCompaction:1");
+    if (!s.ok() && !s.IsShutdownInProgress()) {
+      // Wait a little bit before retrying background compaction in
+      // case this is an environmental problem and we do not want to
+      // chew up resources for failed compactions for the duration of
+      // the problem.
+      uint64_t error_cnt =
+          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
+      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
+      mutex_.Unlock();
+      log_buffer.FlushBufferToLog();
+      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
+                      "Waiting after background compaction error: %s, "
+                      "Accumulated background error counts: %" PRIu64,
+                      s.ToString().c_str(), error_cnt);
+      LogFlush(immutable_db_options_.info_log);
+      env_->SleepForMicroseconds(1000000);
+      mutex_.Lock();
+    }
+
+    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
+
+    // If compaction failed, we want to delete all temporary files that we might
+    // have created (they might not be all recorded in job_context in case of a
+    // failure). Thus, we force full scan in FindObsoleteFiles()
+    FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
+
+    // delete unnecessary files if any, this is done outside the mutex
+    if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
+      mutex_.Unlock();
+      // Have to flush the info logs before the scheduled-compaction counter
+      // is decremented, because if the compaction counters become 0 and the
+      // lock is released, the destructor of DB can kick in and destroy all
+      // the states of DB so info_log might not be available after that point.
+      // It also applies to access other states that DB owns.
+      log_buffer.FlushBufferToLog();
+      if (job_context.HaveSomethingToDelete()) {
+        PurgeObsoleteFiles(job_context);
+      }
+      job_context.Clean();
+      mutex_.Lock();
+    }
+
+    assert(num_running_compactions_ > 0);
+    num_running_compactions_--;
+    // Decrement the counter for the pool this job ran in.
+    if (bg_thread_pri == Env::Priority::LOW) {
+      bg_compaction_scheduled_--;
+    } else {
+      assert(bg_thread_pri == Env::Priority::BOTTOM);
+      bg_bottom_compaction_scheduled_--;
+    }
+
+    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
+
+    // See if there's more work to be done
+    MaybeScheduleFlushOrCompaction();
+    if (made_progress ||
+        (bg_compaction_scheduled_ == 0 &&
+         bg_bottom_compaction_scheduled_ == 0) ||
+        HasPendingManualCompaction()) {
+      // signal if
+      // * made_progress -- need to wakeup DelayWrite
+      // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
+      // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
+      // If none of this is true, there is no need to signal since nobody is
+      // waiting for it
+      bg_cv_.SignalAll();
+    }
+    // IMPORTANT: there should be no code after calling SignalAll. This call may
+    // signal the DB destructor that it's OK to proceed with destruction. In
+    // that case, all DB variables will be deallocated and referencing them
+    // will cause trouble.
+  }
+}
+
+Status DBImpl::BackgroundCompaction(bool* made_progress,
+                                    JobContext* job_context,
+                                    LogBuffer* log_buffer,
+                                    PrepickedCompaction* prepicked_compaction) {
+  ManualCompactionState* manual_compaction =
+      prepicked_compaction == nullptr
+          ? nullptr
+          : prepicked_compaction->manual_compaction_state;
+  *made_progress = false;
+  mutex_.AssertHeld();
+  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
+
+  bool is_manual = (manual_compaction != nullptr);
+  unique_ptr<Compaction> c;
+  if (prepicked_compaction != nullptr &&
+      prepicked_compaction->compaction != nullptr) {
+    c.reset(prepicked_compaction->compaction);
+  }
+  bool is_prepicked = is_manual || c;
+
+  // (manual_compaction->in_progress == false);
+  bool trivial_move_disallowed =
+      is_manual && manual_compaction->disallow_trivial_move;
+
+  CompactionJobStats compaction_job_stats;
+  Status status = bg_error_;
+  if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
+    status = Status::ShutdownInProgress();
+  }
+
+  if (!status.ok()) {
+    if (is_manual) {
+      manual_compaction->status = status;
+      manual_compaction->done = true;
+      manual_compaction->in_progress = false;
+      manual_compaction = nullptr;
+    }
+    return status;
+  }
+
+  if (is_manual) {
+    // another thread cannot pick up the same work
+    manual_compaction->in_progress = true;
+  }
+
+  // InternalKey manual_end_storage;
+  // InternalKey* manual_end = &manual_end_storage;
+  if (is_manual) {
+    ManualCompactionState* m = manual_compaction;
+    assert(m->in_progress);
+    if (!c) {
+      m->done = true;
+      m->manual_end = nullptr;
+      ROCKS_LOG_BUFFER(log_buffer,
+                       "[%s] Manual compaction from level-%d from %s .. "
+                       "%s; nothing to do\n",
+                       m->cfd->GetName().c_str(), m->input_level,
+                       (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
+                       (m->end ? m->end->DebugString().c_str() : "(end)"));
+    } else {
+      ROCKS_LOG_BUFFER(
+          log_buffer,
+          "[%s] Manual compaction from level-%d to level-%d from %s .. "
+          "%s; will stop at %s\n",
+          m->cfd->GetName().c_str(), m->input_level, c->output_level(),
+          (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
+          (m->end ? m->end->DebugString().c_str() : "(end)"),
+          ((m->done || m->manual_end == nullptr)
+               ? "(end)"
+               : m->manual_end->DebugString().c_str()));
+    }
+  } else if (!is_prepicked && !compaction_queue_.empty()) {
+    if (HaveManualCompaction(compaction_queue_.front())) {
+      // Can't compact right now, but try again later
+      TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
+
+      // Stay in the compaction queue.
+      unscheduled_compactions_++;
+
+      return Status::OK();
+    }
+
+    // cfd is referenced here
+    auto cfd = PopFirstFromCompactionQueue();
+    // We unreference here because the following code will take a Ref() on
+    // this cfd if it is going to use it (Compaction class holds a
+    // reference).
+    // This will all happen under a mutex so we don't have to be afraid of
+    // somebody else deleting it.
+    if (cfd->Unref()) {
+      delete cfd;
+      // This was the last reference of the column family, so no need to
+      // compact.
+      return Status::OK();
+    }
+
+    // Pick up latest mutable CF Options and use it throughout the
+    // compaction job
+    // Compaction makes a copy of the latest MutableCFOptions. It should be used
+    // throughout the compaction procedure to make sure consistency. It will
+    // eventually be installed into SuperVersion
+    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
+    if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
+      // NOTE: try to avoid unnecessary copy of MutableCFOptions if
+      // compaction is not necessary. Need to make sure mutex is held
+      // until we make a copy in the following code
+      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
+      c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
+      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
+      if (c != nullptr) {
+        // update statistics
+        MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
+                    c->inputs(0)->size());
+        // There are three things that can change compaction score:
+        // 1) When flush or compaction finish. This case is covered by
+        // InstallSuperVersionAndScheduleWork
+        // 2) When MutableCFOptions changes. This case is also covered by
+        // InstallSuperVersionAndScheduleWork, because this is when the new
+        // options take effect.
+        // 3) When we Pick a new compaction, we "remove" those files being
+        // compacted from the calculation, which then influences compaction
+        // score. Here we check if we need the new compaction even without the
+        // files that are currently being compacted. If we need another
+        // compaction, we might be able to execute it in parallel, so we add it
+        // to the queue and schedule a new thread.
+        if (cfd->NeedsCompaction()) {
+          // Yes, we need more compactions!
+          AddToCompactionQueue(cfd);
+          ++unscheduled_compactions_;
+          MaybeScheduleFlushOrCompaction();
+        }
+      }
+    }
+  }
+
+  if (!c) {
+    // Nothing to do
+    ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
+  } else if (c->deletion_compaction()) {
+    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
+    // file if there is alive snapshot pointing to it
+    assert(c->num_input_files(1) == 0);
+    assert(c->level() == 0);
+    assert(c->column_family_data()->ioptions()->compaction_style ==
+           kCompactionStyleFIFO);
+
+    compaction_job_stats.num_input_files = c->num_input_files(0);
+
+    for (const auto& f : *c->inputs(0)) {
+      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
+    }
+    status = versions_->LogAndApply(c->column_family_data(),
+                                    *c->mutable_cf_options(), c->edit(),
+                                    &mutex_, directories_.GetDbDir());
+    InstallSuperVersionAndScheduleWorkWrapper(
+        c->column_family_data(), job_context, *c->mutable_cf_options());
+    ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
+                     c->column_family_data()->GetName().c_str(),
+                     c->num_input_files(0));
+    *made_progress = true;
+  } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
+    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
+    // Instrument for event update
+    // TODO(yhchiang): add op details for showing trivial-move.
+    ThreadStatusUtil::SetColumnFamily(
+        c->column_family_data(), c->column_family_data()->ioptions()->env,
+        immutable_db_options_.enable_thread_tracking);
+    ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
+
+    compaction_job_stats.num_input_files = c->num_input_files(0);
+
+    // Move files to next level
+    int32_t moved_files = 0;
+    int64_t moved_bytes = 0;
+    for (unsigned int l = 0; l < c->num_input_levels(); l++) {
+      if (c->level(l) == c->output_level()) {
+        continue;
+      }
+      for (size_t i = 0; i < c->num_input_files(l); i++) {
+        FileMetaData* f = c->input(l, i);
+        c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
+        c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
+                           f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
+                           f->largest, f->smallest_seqno, f->largest_seqno,
+                           f->marked_for_compaction);
+
+        ROCKS_LOG_BUFFER(log_buffer, "[%s] Moving #%" PRIu64
+                                     " to level-%d %" PRIu64 " bytes\n",
+                         c->column_family_data()->GetName().c_str(),
+                         f->fd.GetNumber(), c->output_level(),
+                         f->fd.GetFileSize());
+        ++moved_files;
+        moved_bytes += f->fd.GetFileSize();
+      }
+    }
+
+    status = versions_->LogAndApply(c->column_family_data(),
+                                    *c->mutable_cf_options(), c->edit(),
+                                    &mutex_, directories_.GetDbDir());
+    // Use latest MutableCFOptions
+    InstallSuperVersionAndScheduleWorkWrapper(
+        c->column_family_data(), job_context, *c->mutable_cf_options());
+
+    VersionStorageInfo::LevelSummaryStorage tmp;
+    c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
+                                                             moved_bytes);
+    {
+      event_logger_.LogToBuffer(log_buffer)
+          << "job" << job_context->job_id << "event"
+          << "trivial_move"
+          << "destination_level" << c->output_level() << "files" << moved_files
+          << "total_files_size" << moved_bytes;
+    }
+    ROCKS_LOG_BUFFER(
+        log_buffer,
+        "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
+        c->column_family_data()->GetName().c_str(), moved_files,
+        c->output_level(), moved_bytes, status.ToString().c_str(),
+        c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
+    *made_progress = true;
+
+    // Clear Instrument
+    ThreadStatusUtil::ResetThreadStatus();
+  } else if (c->column_family_data()->ioptions()->compaction_style ==
+                 kCompactionStyleUniversal &&
+             !is_prepicked && c->output_level() > 0 &&
+             c->output_level() ==
+                 c->column_family_data()
+                     ->current()
+                     ->storage_info()
+                     ->MaxOutputLevel(
+                         immutable_db_options_.allow_ingest_behind) &&
+             env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
+    // Forward universal compactions involving last level to the bottom pool
+    // if it exists, such that long-running compactions can't block short-
+    // lived ones, like L0->L0s.
+    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
+    CompactionArg* ca = new CompactionArg;
+    ca->db = this;
+    ca->prepicked_compaction = new PrepickedCompaction;
+    ca->prepicked_compaction->compaction = c.release();
+    ca->prepicked_compaction->manual_compaction_state = nullptr;
+    ++bg_bottom_compaction_scheduled_;
+    env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
+                   this, &DBImpl::UnscheduleCallback);
+  } else {
+    int output_level  __attribute__((unused)) = c->output_level();
+    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
+                             &output_level);
+
+    SequenceNumber earliest_write_conflict_snapshot;
+    std::vector<SequenceNumber> snapshot_seqs =
+        snapshots_.GetAll(&earliest_write_conflict_snapshot);
+
+    assert(is_snapshot_supported_ || snapshots_.empty());
+    CompactionJob compaction_job(
+        job_context->job_id, c.get(), immutable_db_options_, env_options_,
+        versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
+        directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
+        &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
+        table_cache_, &event_logger_,
+        c->mutable_cf_options()->paranoid_file_checks,
+        c->mutable_cf_options()->report_bg_io_stats, dbname_,
+        &compaction_job_stats);
+    compaction_job.Prepare();
+
+    mutex_.Unlock();
+    compaction_job.Run();
+    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
+    mutex_.Lock();
+
+    status = compaction_job.Install(*c->mutable_cf_options());
+    if (status.ok()) {
+      InstallSuperVersionAndScheduleWorkWrapper(
+          c->column_family_data(), job_context, *c->mutable_cf_options());
+    }
+    *made_progress = true;
+  }
+  if (c != nullptr) {
+    c->ReleaseCompactionFiles(status);
+    *made_progress = true;
+    NotifyOnCompactionCompleted(
+        c->column_family_data(), c.get(), status,
+        compaction_job_stats, job_context->job_id);
+  }
+  // this will unref its input_version and column_family_data
+  c.reset();
+
+  if (status.ok()) {
+    // Done
+  } else if (status.IsShutdownInProgress()) {
+    // Ignore compaction errors found during shutting down
+  } else {
+    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
+                   status.ToString().c_str());
+    if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
+      Status new_bg_error = status;
+      // may temporarily unlock and lock the mutex.
+      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                            BackgroundErrorReason::kCompaction,
+                                            &new_bg_error, &mutex_);
+      if (!new_bg_error.ok()) {
+        bg_error_ = new_bg_error;
+      }
+    }
+  }
+
+  if (is_manual) {
+    ManualCompactionState* m = manual_compaction;
+    if (!status.ok()) {
+      m->status = status;
+      m->done = true;
+    }
+    // For universal compaction:
+    //   Because universal compaction always happens at level 0, so one
+    //   compaction will pick up all overlapped files. No files will be
+    //   filtered out due to size limit and left for a successive compaction.
+    //   So we can safely conclude the current compaction.
+    //
+    //   Also note that, if we don't stop here, then the current compaction
+    //   writes a new file back to level 0, which will be used in successive
+    //   compaction. Hence the manual compaction will never finish.
+    //
+    // Stop the compaction if manual_end points to nullptr -- this means
+    // that we compacted the whole range. manual_end should always point
+    // to nullptr in case of universal compaction
+    if (m->manual_end == nullptr) {
+      m->done = true;
+    }
+    if (!m->done) {
+      // We only compacted part of the requested range.  Update *m
+      // to the range that is left to be compacted.
+      // Universal and FIFO compactions should always compact the whole range
+      assert(m->cfd->ioptions()->compaction_style !=
+                 kCompactionStyleUniversal ||
+             m->cfd->ioptions()->num_levels > 1);
+      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
+      m->tmp_storage = *m->manual_end;
+      m->begin = &m->tmp_storage;
+      m->incomplete = true;
+    }
+    m->in_progress = false; // not being processed anymore
+  }
+  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
+  return status;
+}
+
+bool DBImpl::HasPendingManualCompaction() {
+  return !manual_compaction_dequeue_.empty();  // any request still queued?
+}
+
+void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
+  manual_compaction_dequeue_.emplace_back(m);  // append at the queue's tail
+}
+
+void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
+  // Drop the first queue entry that is exactly `m`. A request that is not in
+  // the queue at all is treated as a programming error (see assert below).
+  auto& pending = manual_compaction_dequeue_;
+  for (auto pos = pending.begin(); pos != pending.end(); ++pos) {
+    if (*pos != m) {
+      continue;
+    }
+    pending.erase(pos);
+    return;
+  }
+  // Reached only when `m` was not found in the queue.
+  assert(false);
+}
+
+bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
+  // Answers true while manual compaction `m` still has to wait.
+  if (num_running_ingest_file_ > 0) {
+    // Outstanding IngestExternalFile() calls have to finish first.
+    return true;
+  }
+  if (m->exclusive) {
+    // An exclusive request waits for every scheduled background
+    // compaction, bottom-priority ones included.
+    return (bg_bottom_compaction_scheduled_ > 0 ||
+            bg_compaction_scheduled_ > 0);
+  }
+  // Otherwise `m` is held back by an overlapping request that sits ahead of
+  // it in the queue and has not started running yet.
+  bool passed_self = false;
+  for (ManualCompactionState* other : manual_compaction_dequeue_) {
+    if (other == m) {
+      passed_self = true;
+      continue;
+    }
+    const bool blocks_m =
+        MCOverlap(m, other) && !passed_self && !other->in_progress;
+    if (blocks_m) {
+      return true;
+    }
+  }
+  return false;
+}
+
+bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
+  // Walks the manual-compaction queue; nothing is removed here.
+  for (ManualCompactionState* mc : manual_compaction_dequeue_) {
+    // Any exclusive request counts, whichever column family it targets.
+    if (mc->exclusive) {
+      return true;
+    }
+    // A request for `cfd` counts only while it is still waiting: once it is
+    // in progress or done it is ignored, so automatic compaction is allowed
+    // while a manual compaction is running.
+    const bool waiting = !mc->in_progress && !mc->done;
+    if (mc->cfd == cfd && waiting) {
+      return true;
+    }
+  }
+  return false;
+}
+
+bool DBImpl::HasExclusiveManualCompaction() {
+  // Reports whether any queued manual compaction is marked exclusive.
+  // The queue itself is left untouched.
+  bool found_exclusive = false;
+  for (const ManualCompactionState* mc : manual_compaction_dequeue_) {
+    if (mc->exclusive) {
+      found_exclusive = true;
+      break;
+    }
+  }
+  return found_exclusive;
+}
+
+bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
+  // Two manual compactions are treated as overlapping when either one is
+  // exclusive, or when both target the same column family. Only those two
+  // fields are consulted; key ranges are not compared here, so this is a
+  // coarse, column-family-level test.
+  const bool either_exclusive = m->exclusive || m1->exclusive;
+  const bool same_cf = (m->cfd == m1->cfd);
+  return either_exclusive || same_cf;
+}
+
+// A JobContext is created and destroyed outside of the DB mutex, which is
+// conveniently used to:
+// * allocate one SuperVersion outside of the lock -- new_superversion
+// * delete SuperVersions outside of the lock -- superversions_to_free
+//
+// If this wrapper is called twice with the same job_context, the
+// preallocated SuperVersion was already consumed by the first call, so
+// new_superversion is null and InstallSuperVersionAndScheduleWork()
+// allocates a fresh one while the mutex is held. That case is rare. Every
+// replaced SuperVersion is appended to superversions_to_free.
+void DBImpl::InstallSuperVersionAndScheduleWorkWrapper(
+    ColumnFamilyData* cfd, JobContext* job_context,
+    const MutableCFOptions& mutable_cf_options) {
+  mutex_.AssertHeld();
+  SuperVersion* const preallocated = job_context->new_superversion;
+  SuperVersion* const replaced =
+      InstallSuperVersionAndScheduleWork(cfd, preallocated, mutable_cf_options);
+  // The preallocated object has been handed over; do not offer it again.
+  job_context->new_superversion = nullptr;
+  job_context->superversions_to_free.push_back(replaced);
+}
+
+SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
+    ColumnFamilyData* cfd, SuperVersion* new_sv,
+    const MutableCFOptions& mutable_cf_options) {
+  mutex_.AssertHeld();  // asserts that mutex_ is held on entry
+  // Installs a SuperVersion on cfd, schedules follow-up work and updates
+  // max_total_in_memory_state_. First: the outgoing memtable budget (0 if none).
+  size_t old_memtable_size = 0;
+  auto* old_sv = cfd->GetSuperVersion();
+  if (old_sv) {
+    old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
+                        old_sv->mutable_cf_options.max_write_buffer_number;
+  }
+  // Use new_sv when provided; otherwise allocate a SuperVersion right here.
+  auto* old = cfd->InstallSuperVersion(
+      new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);
+
+  // Whenever we install new SuperVersion, we might need to issue new flushes or
+  // compactions.
+  SchedulePendingFlush(cfd);
+  SchedulePendingCompaction(cfd);
+  MaybeScheduleFlushOrCompaction();
+
+  // Swap cfd's old budget for its new one in max_total_in_memory_state_.
+  max_total_in_memory_state_ =
+      max_total_in_memory_state_ - old_memtable_size +
+      mutable_cf_options.write_buffer_size *
+      mutable_cf_options.max_write_buffer_number;
+  return old;  // whatever cfd->InstallSuperVersion() handed back
+}
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_debug.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_debug.cc b/thirdparty/rocksdb/db/db_impl_debug.cc
new file mode 100644
index 0000000..de5b66f
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_debug.cc
@@ -0,0 +1,200 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef NDEBUG
+
+#include "db/db_impl.h"
+#include "monitoring/thread_status_updater.h"
+
+namespace rocksdb {
+
+uint64_t DBImpl::TEST_GetLevel0TotalSize() {
+  InstrumentedMutexLock guard(&mutex_);  // default CF, level 0, under mutex_
+  return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
+}
+
+void DBImpl::TEST_HandleWALFull() {
+  WriteContext ctx;  // declared before the lock, so destroyed after unlock
+  InstrumentedMutexLock guard(&mutex_);
+  HandleWALFull(&ctx);
+}
+
+int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
+    ColumnFamilyHandle* column_family) {
+  // A null handle selects the default column family. The handle is resolved
+  // before the mutex is taken.
+  ColumnFamilyData* const cfd =
+      (column_family == nullptr)
+          ? default_cf_handle_->cfd()
+          : reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+  // Only the lookup on the current version runs under the DB mutex.
+  InstrumentedMutexLock guard(&mutex_);
+  return cfd->current()->storage_info()->MaxNextLevelOverlappingBytes();
+}
+
+void DBImpl::TEST_GetFilesMetaData(
+    ColumnFamilyHandle* column_family,
+    std::vector<std::vector<FileMetaData>>* metadata) {
+  // Copies the FileMetaData of every file in column_family's current version
+  // into *metadata, one inner vector per level. The level count is taken
+  // from that column family's own storage info: the argument-less
+  // NumberLevels() reports the default column family, which can differ.
+  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+  InstrumentedMutexLock l(&mutex_);
+  const auto* vstorage = cfd->current()->storage_info();
+  metadata->assign(vstorage->num_levels(), std::vector<FileMetaData>());
+  for (int level = 0; level < vstorage->num_levels(); ++level) {
+    for (const FileMetaData* f : vstorage->LevelFiles(level)) {
+      (*metadata)[level].push_back(*f);
+    }
+  }
+}
+
+uint64_t DBImpl::TEST_Current_Manifest_FileNo() {
+  return versions_->manifest_file_number();  // NOTE(review): mutex_ not taken
+}
+
+Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
+                                 const Slice* end,
+                                 ColumnFamilyHandle* column_family,
+                                 bool disallow_trivial_move) {
+  // A null handle selects the default column family.
+  ColumnFamilyData* const cfd =
+      (column_family == nullptr)
+          ? default_cf_handle_->cfd()
+          : reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+  // Universal and FIFO compaction use the input level as the output level;
+  // every other compaction style outputs to level + 1.
+  int output_level = level + 1;
+  const auto style = cfd->ioptions()->compaction_style;
+  if (style == kCompactionStyleUniversal || style == kCompactionStyleFIFO) {
+    output_level = level;
+  }
+  return RunManualCompaction(cfd, level, output_level, 0, begin, end, true,
+                             disallow_trivial_move);
+}
+
+Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) {
+  // Calls FlushMemTable() for the column family behind `cfh`; a null handle
+  // selects the default column family. `wait` is passed through as
+  // FlushOptions::wait.
+  ColumnFamilyData* const target =
+      (cfh == nullptr)
+          ? default_cf_handle_->cfd()
+          : reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
+  FlushOptions flush_opts;
+  flush_opts.wait = wait;
+  return FlushMemTable(target, flush_opts);
+}
+
+Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
+  // Forwards to WaitForFlushMemTable() for the column family behind the
+  // handle and returns its Status unchanged. A null handle selects the
+  // default column family.
+  ColumnFamilyData* const cfd =
+      (column_family == nullptr)
+          ? default_cf_handle_->cfd()
+          : reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+  return WaitForFlushMemTable(cfd);
+}
+
+Status DBImpl::TEST_WaitForCompact() {
+  // Blocks on bg_cv_ while any background compaction (regular or
+  // bottom-priority) or flush is scheduled and bg_error_ is still OK.
+  // TODO: a bug here. This function actually does not necessarily
+  // wait for compact. It actually waits for scheduled compaction
+  // OR flush to finish.
+  // Returns bg_error_ as it stands once the loop exits.
+  InstrumentedMutexLock l(&mutex_);
+  while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
+          bg_flush_scheduled_) &&
+         bg_error_.ok()) {
+    bg_cv_.Wait();
+  }
+  return bg_error_;
+}
+
+void DBImpl::TEST_LockMutex() {
+  mutex_.Lock();  // still locked on return; pair with TEST_UnlockMutex()
+}
+
+void DBImpl::TEST_UnlockMutex() {
+  mutex_.Unlock();  // counterpart of TEST_LockMutex()
+}
+
+void* DBImpl::TEST_BeginWrite() {
+  WriteThread::Writer* const writer = new WriteThread::Writer();
+  write_thread_.EnterUnbatched(writer, &mutex_);
+  return static_cast<void*>(writer);  // opaque token for TEST_EndWrite()
+}
+
+void DBImpl::TEST_EndWrite(void* w) {
+  auto* const handle = static_cast<WriteThread::Writer*>(w);
+  write_thread_.ExitUnbatched(handle);
+  delete handle;  // expects the pointer returned by TEST_BeginWrite()
+}
+
+size_t DBImpl::TEST_LogsToFreeSize() {
+  InstrumentedMutexLock guard(&mutex_);  // size is read under the DB mutex
+  return logs_to_free_.size();
+}
+
+uint64_t DBImpl::TEST_LogfileNumber() {
+  InstrumentedMutexLock guard(&mutex_);  // value is read under the DB mutex
+  return logfile_number_;
+}
+
+Status DBImpl::TEST_GetAllImmutableCFOptions(
+    std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map) {
+  // Snapshot (name, options) pairs under the DB mutex, then fill the
+  // caller's map after the mutex has been released.
+  std::vector<std::pair<std::string, const ImmutableCFOptions*>> snapshot;
+  {
+    InstrumentedMutexLock guard(&mutex_);
+    for (auto cfd : *versions_->GetColumnFamilySet()) {
+      snapshot.emplace_back(cfd->GetName(), cfd->ioptions());
+    }
+  }
+  iopts_map->clear();
+  for (const auto& entry : snapshot) {
+    // insert() keeps the first entry should a name ever repeat.
+    iopts_map->insert(entry);
+  }
+  return Status::OK();
+}
+
+uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() {
+  return FindMinLogContainingOutstandingPrep();  // plain test-only forwarder
+}
+
+uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
+  return FindMinPrepLogReferencedByMemTable();  // plain test-only forwarder
+}
+
+Status DBImpl::TEST_GetLatestMutableCFOptions(
+    ColumnFamilyHandle* column_family, MutableCFOptions* mutable_cf_options) {
+  InstrumentedMutexLock guard(&mutex_);  // the copy below runs under mutex_
+  ColumnFamilyData* const cfd =
+      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+  *mutable_cf_options = *cfd->GetLatestMutableCFOptions();
+  return Status::OK();
+}
+
+int DBImpl::TEST_BGCompactionsAllowed() const {
+  InstrumentedMutexLock guard(&mutex_);  // limits are read under the DB mutex
+  return GetBGJobLimits().max_compactions;
+}
+
+int DBImpl::TEST_BGFlushesAllowed() const {
+  InstrumentedMutexLock guard(&mutex_);  // limits are read under the DB mutex
+  return GetBGJobLimits().max_flushes;
+}
+
+}  // namespace rocksdb
+#endif  // NDEBUG

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_experimental.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_experimental.cc b/thirdparty/rocksdb/db/db_impl_experimental.cc
new file mode 100644
index 0000000..0d01075
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_experimental.cc
@@ -0,0 +1,152 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/db_impl.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <vector>
+
+#include "db/column_family.h"
+#include "db/job_context.h"
+#include "db/version_set.h"
+#include "rocksdb/status.h"
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family,
+                                   const Slice* begin, const Slice* end) {
+  // Marks the files overlapping the begin..end user-key range for compaction
+  // on every level but the last non-empty one, then triggers scheduling.
+  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+  InternalKey start_key, end_key;
+  if (begin != nullptr) {
+    start_key.SetMaxPossibleForUserKey(*begin);
+  }
+  if (end != nullptr) {
+    end_key.SetMinPossibleForUserKey(*end);
+  }
+  InternalKey* const lower = (begin == nullptr) ? nullptr : &start_key;
+  InternalKey* const upper = (end == nullptr) ? nullptr : &end_key;
+  {
+    InstrumentedMutexLock guard(&mutex_);
+    auto* vstorage = cfd->current()->storage_info();
+    for (int lvl = 0; lvl < vstorage->num_non_empty_levels() - 1; ++lvl) {
+      std::vector<FileMetaData*> overlapping;
+      vstorage->GetOverlappingInputs(lvl, lower, upper, &overlapping);
+      for (FileMetaData* file : overlapping) {
+        file->marked_for_compaction = true;
+      }
+    }
+    // More files are now eligible, so refresh the compaction score first.
+    vstorage->ComputeCompactionScore(*cfd->ioptions(),
+                                     *cfd->GetLatestMutableCFOptions());
+    SchedulePendingCompaction(cfd);
+    MaybeScheduleFlushOrCompaction();
+  }
+  return Status::OK();
+}
+
+Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
+  assert(column_family);
+  // Moves all level-0 files of column_family to target_level via a VersionEdit.
+  if (target_level < 1) {  // level 0 itself is not accepted as a destination
+    ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                   "PromoteL0 FAILED. Invalid target level %d\n", target_level);
+    return Status::InvalidArgument("Invalid target level");
+  }
+  // Each precondition failure below logs and returns InvalidArgument.
+  Status status;
+  VersionEdit edit;
+  JobContext job_context(next_job_id_.fetch_add(1), true);
+  {
+    InstrumentedMutexLock l(&mutex_);
+    auto* cfd = static_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+    const auto* vstorage = cfd->current()->storage_info();
+
+    if (target_level >= vstorage->num_levels()) {
+      ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                     "PromoteL0 FAILED. Target level %d does not exist\n",
+                     target_level);
+      job_context.Clean();  // runs while mutex_ is still held
+      return Status::InvalidArgument("Target level does not exist");
+    }
+
+    // Sort a local copy of the L0 file list by each file's largest key.
+    const InternalKeyComparator* icmp = &cfd->internal_comparator();
+    auto l0_files = vstorage->LevelFiles(0);
+    std::sort(l0_files.begin(), l0_files.end(),
+              [icmp](FileMetaData* f1, FileMetaData* f2) {
+                return icmp->Compare(f1->largest, f2->largest) < 0;
+              });
+
+    // Check that no L0 file is being compacted and that they have
+    // non-overlapping ranges.
+    for (size_t i = 0; i < l0_files.size(); ++i) {
+      auto f = l0_files[i];
+      if (f->being_compacted) {
+        ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                       "PromoteL0 FAILED. File %" PRIu64 " being compacted\n",
+                       f->fd.GetNumber());
+        job_context.Clean();
+        return Status::InvalidArgument("PromoteL0 called during L0 compaction");
+      }
+
+      if (i == 0) continue;  // the first file has no predecessor to compare
+      auto prev_f = l0_files[i - 1];
+      if (icmp->Compare(prev_f->largest, f->smallest) >= 0) {
+        ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                       "PromoteL0 FAILED. Files %" PRIu64 " and %" PRIu64
+                       " have overlapping ranges\n",
+                       prev_f->fd.GetNumber(), f->fd.GetNumber());
+        job_context.Clean();
+        return Status::InvalidArgument("L0 has overlapping files");
+      }
+    }
+
+    // Check that levels 1 through target_level (inclusive) are all empty.
+    for (int level = 1; level <= target_level; ++level) {
+      if (vstorage->NumLevelFiles(level) > 0) {
+        ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                       "PromoteL0 FAILED. Level %d not empty\n", level);
+        job_context.Clean();
+        return Status::InvalidArgument(
+            "All levels up to target_level "
+            "must be empty");
+      }
+    }
+    // Build the edit: drop each file from level 0, re-add it at target_level.
+    edit.SetColumnFamily(cfd->GetID());
+    for (const auto& f : l0_files) {
+      edit.DeleteFile(0, f->fd.GetNumber());
+      edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
+                   f->fd.GetFileSize(), f->smallest, f->largest,
+                   f->smallest_seqno, f->largest_seqno,
+                   f->marked_for_compaction);
+    }
+
+    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
+                                    &edit, &mutex_, directories_.GetDbDir());
+    if (status.ok()) {  // install a new SuperVersion only if the edit applied
+      InstallSuperVersionAndScheduleWorkWrapper(
+          cfd, &job_context, *cfd->GetLatestMutableCFOptions());
+    }
+  }  // lock released here
+  LogFlush(immutable_db_options_.info_log);
+  job_context.Clean();
+
+  return status;  // the LogAndApply() result on this path
+}
+#endif  // ROCKSDB_LITE
+
+}  // namespace rocksdb


Mime
View raw message