nifi-commits mailing list archives

From jeremyd...@apache.org
Subject [20/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:25:00 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_files.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_files.cc b/thirdparty/rocksdb/db/db_impl_files.cc
new file mode 100644
index 0000000..e44e423
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_files.cc
@@ -0,0 +1,548 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "db/db_impl.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+#include <inttypes.h>
+#include "db/event_helpers.h"
+#include "util/file_util.h"
+#include "util/sst_file_manager_impl.h"
+
+
+namespace rocksdb {
+uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
+  if (!allow_2pc()) {
+    return 0;
+  }
+
+  uint64_t min_log = 0;
+
+  // we must look through the memtables for two phase transactions
+  // that have been committed but not yet flushed
+  for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
+    if (loop_cfd->IsDropped()) {
+      continue;
+    }
+
+    auto log = loop_cfd->imm()->GetMinLogContainingPrepSection();
+
+    if (log > 0 && (min_log == 0 || log < min_log)) {
+      min_log = log;
+    }
+
+    log = loop_cfd->mem()->GetMinLogContainingPrepSection();
+
+    if (log > 0 && (min_log == 0 || log < min_log)) {
+      min_log = log;
+    }
+  }
+
+  return min_log;
+}
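+// For example, with two column families where cf0's immutable memtables hold
+// a prep section from log 7 and its active memtable holds one from log 9,
+// while cf1 reports 0 (no prep sections at all), the function above returns 7.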
+
+void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
+  assert(log != 0);
+  std::lock_guard<std::mutex> lock(prep_heap_mutex_);
+  auto it = prepared_section_completed_.find(log);
+  assert(it != prepared_section_completed_.end());
+  it->second += 1;
+}
+
+void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
+  assert(log != 0);
+  std::lock_guard<std::mutex> lock(prep_heap_mutex_);
+  min_log_with_prep_.push(log);
+  auto it = prepared_section_completed_.find(log);
+  if (it == prepared_section_completed_.end()) {
+    prepared_section_completed_[log] = 0;
+  }
+}
+
+uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
+
+  if (!allow_2pc()) {
+    return 0;
+  }
+
+  std::lock_guard<std::mutex> lock(prep_heap_mutex_);
+  uint64_t min_log = 0;
+
+  // first we look in the prepared heap where we keep
+  // track of transactions that have been prepared (written to WAL)
+  // but not yet committed.
+  while (!min_log_with_prep_.empty()) {
+    min_log = min_log_with_prep_.top();
+
+    auto it = prepared_section_completed_.find(min_log);
+
+    // value was marked as 'deleted' from heap
+    if (it != prepared_section_completed_.end() && it->second > 0) {
+      it->second -= 1;
+      min_log_with_prep_.pop();
+
+      // back to square one...
+      min_log = 0;
+      continue;
+    } else {
+      // found a valid value
+      break;
+    }
+  }
+
+  return min_log;
+}
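+// For illustration, the intended interplay of the three helpers above (a
+// sketch; not an actual call site in this file):
+//
+//   MarkLogAsContainingPrepSection(7);       // txn A prepared in WAL 7
+//   MarkLogAsContainingPrepSection(9);       // txn B prepared in WAL 9
+//   FindMinLogContainingOutstandingPrep();   // -> 7
+//   MarkLogAsHavingPrepSectionFlushed(7);    // txn A committed and flushed
+//   FindMinLogContainingOutstandingPrep();   // lazily pops 7, -> 9
+//
+// Entries are never erased from the heap eagerly; MarkLogAsHavingPrepSectionFlushed
+// only bumps a counter, and the pop happens on the next Find call.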
+
+uint64_t DBImpl::MinLogNumberToKeep() {
+  uint64_t log_number = versions_->MinLogNumber();
+
+  if (allow_2pc()) {
+    // if we are using 2pc we must consider logs containing prepared
+    // sections of outstanding transactions.
+    //
+    // We must check min logs with outstanding prep before we check
+    // logs referenced by memtables because a log referenced by the
+    // first data structure could transition to the second under us.
+    //
+    // TODO(horuff): iterating over all column families under db mutex;
+    // should find a more optimal solution
+    auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
+
+    if (min_log_in_prep_heap != 0 && min_log_in_prep_heap < log_number) {
+      log_number = min_log_in_prep_heap;
+    }
+
+    auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
+
+    if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < log_number) {
+      log_number = min_log_refed_by_mem;
+    }
+  }
+  return log_number;
+}
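+// For example, if versions_->MinLogNumber() is 12 but an outstanding prepared
+// section still lives in log 9 and a committed-but-unflushed one is referenced
+// by a memtable in log 10, the function above returns 9, keeping both WAL
+// files from being deleted.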
+
+// * Returns the list of live files in 'sst_live'.
+// * If it's doing a full scan, returns the list of all files in the
+//   filesystem in 'full_scan_candidate_files'; otherwise, gets obsolete
+//   files from VersionSet.
+// no_full_scan = true -- never do the full scan using GetChildren()
+// force = false -- don't force the full scan, except every
+//   mutable_db_options_.delete_obsolete_files_period_micros
+// force = true -- force the full scan
+void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
+                               bool no_full_scan) {
+  mutex_.AssertHeld();
+
+  // if deletion is disabled, do nothing
+  if (disable_delete_obsolete_files_ > 0) {
+    return;
+  }
+
+  bool doing_the_full_scan = false;
+
+  // logic for figuring out if we're doing the full scan
+  if (no_full_scan) {
+    doing_the_full_scan = false;
+  } else if (force ||
+             mutable_db_options_.delete_obsolete_files_period_micros == 0) {
+    doing_the_full_scan = true;
+  } else {
+    const uint64_t now_micros = env_->NowMicros();
+    if ((delete_obsolete_files_last_run_ +
+         mutable_db_options_.delete_obsolete_files_period_micros) <
+        now_micros) {
+      doing_the_full_scan = true;
+      delete_obsolete_files_last_run_ = now_micros;
+    }
+  }
+
+  // Don't delete files that might currently be written to by compaction
+  // threads.
+  // Since job_context->min_pending_output is set, until file scan finishes,
+  // mutex_ cannot be released. Otherwise, we might see no min_pending_output
+  // here but later find newer generated unfinalized files while scanning.
+  if (!pending_outputs_.empty()) {
+    job_context->min_pending_output = *pending_outputs_.begin();
+  } else {
+    // delete all of them
+    job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
+  }
+
+  // Get obsolete files.  This function will also update the list of
+  // pending files in VersionSet().
+  versions_->GetObsoleteFiles(&job_context->sst_delete_files,
+                              &job_context->manifest_delete_files,
+                              job_context->min_pending_output);
+
+  // store the current filenum, lognum, etc
+  job_context->manifest_file_number = versions_->manifest_file_number();
+  job_context->pending_manifest_file_number =
+      versions_->pending_manifest_file_number();
+  job_context->log_number = MinLogNumberToKeep();
+
+  job_context->prev_log_number = versions_->prev_log_number();
+
+  versions_->AddLiveFiles(&job_context->sst_live);
+  if (doing_the_full_scan) {
+    for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
+         path_id++) {
+      // Set of all files in the directory. We'll exclude files that are still
+      // alive in subsequent processing.
+      std::vector<std::string> files;
+      env_->GetChildren(immutable_db_options_.db_paths[path_id].path,
+                        &files);  // Ignore errors
+      for (std::string file : files) {
+        // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
+        job_context->full_scan_candidate_files.emplace_back(
+            "/" + file, static_cast<uint32_t>(path_id));
+      }
+    }
+
+    // Add log files in wal_dir
+    if (immutable_db_options_.wal_dir != dbname_) {
+      std::vector<std::string> log_files;
+      env_->GetChildren(immutable_db_options_.wal_dir,
+                        &log_files);  // Ignore errors
+      for (std::string log_file : log_files) {
+        job_context->full_scan_candidate_files.emplace_back(log_file, 0);
+      }
+    }
+    // Add info log files in db_log_dir
+    if (!immutable_db_options_.db_log_dir.empty() &&
+        immutable_db_options_.db_log_dir != dbname_) {
+      std::vector<std::string> info_log_files;
+      // Ignore errors
+      env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
+      for (std::string log_file : info_log_files) {
+        job_context->full_scan_candidate_files.emplace_back(log_file, 0);
+      }
+    }
+  }
+
+  // logs_ is empty when called during recovery, in which case there can't yet
+  // be any tracked obsolete logs
+  if (!alive_log_files_.empty() && !logs_.empty()) {
+    uint64_t min_log_number = job_context->log_number;
+    size_t num_alive_log_files = alive_log_files_.size();
+    // find newly obsoleted log files
+    while (alive_log_files_.begin()->number < min_log_number) {
+      auto& earliest = *alive_log_files_.begin();
+      if (immutable_db_options_.recycle_log_file_num >
+          log_recycle_files.size()) {
+        ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                       "adding log %" PRIu64 " to recycle list\n",
+                       earliest.number);
+        log_recycle_files.push_back(earliest.number);
+      } else {
+        job_context->log_delete_files.push_back(earliest.number);
+      }
+      if (job_context->size_log_to_delete == 0) {
+        job_context->prev_total_log_size = total_log_size_;
+        job_context->num_alive_log_files = num_alive_log_files;
+      }
+      job_context->size_log_to_delete += earliest.size;
+      total_log_size_ -= earliest.size;
+      if (concurrent_prepare_) {
+        log_write_mutex_.Lock();
+      }
+      alive_log_files_.pop_front();
+      if (concurrent_prepare_) {
+        log_write_mutex_.Unlock();
+      }
+      // Current log should always stay alive since it can't have
+      // number < MinLogNumber().
+      assert(alive_log_files_.size());
+    }
+    while (!logs_.empty() && logs_.front().number < min_log_number) {
+      auto& log = logs_.front();
+      if (log.getting_synced) {
+        log_sync_cv_.Wait();
+        // logs_ could have changed while we were waiting.
+        continue;
+      }
+      logs_to_free_.push_back(log.ReleaseWriter());
+      {
+        InstrumentedMutexLock wl(&log_write_mutex_);
+        logs_.pop_front();
+      }
+    }
+    // Current log cannot be obsolete.
+    assert(!logs_.empty());
+  }
+
+  // We're just cleaning up for DB::Write().
+  assert(job_context->logs_to_free.empty());
+  job_context->logs_to_free = logs_to_free_;
+  job_context->log_recycle_files.assign(log_recycle_files.begin(),
+                                        log_recycle_files.end());
+  logs_to_free_.clear();
+}
+
+namespace {
+bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
+                          const JobContext::CandidateFileInfo& second) {
+  if (first.file_name > second.file_name) {
+    return true;
+  } else if (first.file_name < second.file_name) {
+    return false;
+  } else {
+    return (first.path_id > second.path_id);
+  }
+}
+};  // namespace
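+// For illustration (assuming CandidateFileInfo compares equal on matching
+// file_name and path_id): sorting {"/000012.sst", 0}, {"/000011.sst", 0},
+// {"/000012.sst", 0} with the comparator above groups the duplicate
+// "/000012.sst" entries together, so the std::unique pass in
+// PurgeObsoleteFiles() can drop one of them.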
+
+// Delete obsolete files and log the status and details of each file deletion
+void DBImpl::DeleteObsoleteFileImpl(Status file_deletion_status, int job_id,
+                                    const std::string& fname, FileType type,
+                                    uint64_t number, uint32_t path_id) {
+  if (type == kTableFile) {
+    file_deletion_status =
+        DeleteSSTFile(&immutable_db_options_, fname, path_id);
+  } else {
+    file_deletion_status = env_->DeleteFile(fname);
+  }
+  if (file_deletion_status.ok()) {
+    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
+                    "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
+                    fname.c_str(), type, number,
+                    file_deletion_status.ToString().c_str());
+  } else if (env_->FileExists(fname).IsNotFound()) {
+    ROCKS_LOG_INFO(
+        immutable_db_options_.info_log,
+        "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
+        " -- %s\n",
+        job_id, fname.c_str(), type, number,
+        file_deletion_status.ToString().c_str());
+  } else {
+    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
+                    "[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n",
+                    job_id, fname.c_str(), type, number,
+                    file_deletion_status.ToString().c_str());
+  }
+  if (type == kTableFile) {
+    EventHelpers::LogAndNotifyTableFileDeletion(
+        &event_logger_, job_id, number, fname, file_deletion_status, GetName(),
+        immutable_db_options_.listeners);
+  }
+}
+
+// Diffs the candidate files against the live files; those that do not
+// belong to the live set are possibly removed. Also removes all the
+// files in sst_delete_files and log_delete_files.
+// It is not necessary to hold the mutex when invoking this method.
+void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
+  // we'd better have something to delete
+  assert(state.HaveSomethingToDelete());
+
+  // this checks if FindObsoleteFiles() was run before. If not, don't do
+  // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
+  // run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
+  if (state.manifest_file_number == 0) {
+    return;
+  }
+
+  // Now, convert live list to an unordered map, WITHOUT mutex held;
+  // set is slow.
+  std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
+  for (const FileDescriptor& fd : state.sst_live) {
+    sst_live_map[fd.GetNumber()] = &fd;
+  }
+  std::unordered_set<uint64_t> log_recycle_files_set(
+      state.log_recycle_files.begin(), state.log_recycle_files.end());
+
+  auto candidate_files = state.full_scan_candidate_files;
+  candidate_files.reserve(
+      candidate_files.size() + state.sst_delete_files.size() +
+      state.log_delete_files.size() + state.manifest_delete_files.size());
+  // We may ignore the dbname when generating the file names.
+  const char* kDumbDbName = "";
+  for (auto file : state.sst_delete_files) {
+    candidate_files.emplace_back(
+        MakeTableFileName(kDumbDbName, file->fd.GetNumber()),
+        file->fd.GetPathId());
+    if (file->table_reader_handle) {
+      table_cache_->Release(file->table_reader_handle);
+    }
+    delete file;
+  }
+
+  for (auto file_num : state.log_delete_files) {
+    if (file_num > 0) {
+      candidate_files.emplace_back(LogFileName(kDumbDbName, file_num), 0);
+    }
+  }
+  for (const auto& filename : state.manifest_delete_files) {
+    candidate_files.emplace_back(filename, 0);
+  }
+
+  // dedup state.candidate_files so we don't try to delete the same
+  // file twice
+  std::sort(candidate_files.begin(), candidate_files.end(),
+            CompareCandidateFile);
+  candidate_files.erase(
+      std::unique(candidate_files.begin(), candidate_files.end()),
+      candidate_files.end());
+
+  if (state.prev_total_log_size > 0) {
+    ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                   "[JOB %d] Try to delete WAL files size %" PRIu64
+                   ", prev total WAL file size %" PRIu64
+                   ", number of live WAL files %" ROCKSDB_PRIszt ".\n",
+                   state.job_id, state.size_log_to_delete,
+                   state.prev_total_log_size, state.num_alive_log_files);
+  }
+
+  std::vector<std::string> old_info_log_files;
+  InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
+                                dbname_);
+  for (const auto& candidate_file : candidate_files) {
+    std::string to_delete = candidate_file.file_name;
+    uint32_t path_id = candidate_file.path_id;
+    uint64_t number;
+    FileType type;
+    // Ignore file if we cannot recognize it.
+    if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
+      continue;
+    }
+
+    bool keep = true;
+    switch (type) {
+      case kLogFile:
+        keep = ((number >= state.log_number) ||
+                (number == state.prev_log_number) ||
+                (log_recycle_files_set.find(number) !=
+                 log_recycle_files_set.end()));
+        break;
+      case kDescriptorFile:
+        // Keep my manifest file, and any newer incarnations'
+        // (can happen during manifest roll)
+        keep = (number >= state.manifest_file_number);
+        break;
+      case kTableFile:
+        // If the second condition is not there, this makes
+        // DontDeletePendingOutputs fail
+        keep = (sst_live_map.find(number) != sst_live_map.end()) ||
+               number >= state.min_pending_output;
+        break;
+      case kTempFile:
+        // Any temp files that are currently being written to must
+        // be recorded in pending_outputs_, which is inserted into "live".
+        // Also, SetCurrentFile creates a temp file when writing out new
+        // manifest, which is equal to state.pending_manifest_file_number. We
+        // should not delete that file
+        //
+        // TODO(yhchiang): carefully modify the third condition to safely
+        //                 remove the temp options files.
+        keep = (sst_live_map.find(number) != sst_live_map.end()) ||
+               (number == state.pending_manifest_file_number) ||
+               (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
+        break;
+      case kInfoLogFile:
+        keep = true;
+        if (number != 0) {
+          old_info_log_files.push_back(to_delete);
+        }
+        break;
+      case kCurrentFile:
+      case kDBLockFile:
+      case kIdentityFile:
+      case kMetaDatabase:
+      case kOptionsFile:
+      case kBlobFile:
+        keep = true;
+        break;
+    }
+
+    if (keep) {
+      continue;
+    }
+
+    std::string fname;
+    if (type == kTableFile) {
+      // evict from cache
+      TableCache::Evict(table_cache_.get(), number);
+      fname = TableFileName(immutable_db_options_.db_paths, number, path_id);
+    } else {
+      fname = ((type == kLogFile) ? immutable_db_options_.wal_dir : dbname_) +
+              "/" + to_delete;
+    }
+
+#ifndef ROCKSDB_LITE
+    if (type == kLogFile && (immutable_db_options_.wal_ttl_seconds > 0 ||
+                             immutable_db_options_.wal_size_limit_mb > 0)) {
+      wal_manager_.ArchiveWALFile(fname, number);
+      continue;
+    }
+#endif  // !ROCKSDB_LITE
+
+    Status file_deletion_status;
+    if (schedule_only) {
+      InstrumentedMutexLock guard_lock(&mutex_);
+      SchedulePendingPurge(fname, type, number, path_id, state.job_id);
+    } else {
+      DeleteObsoleteFileImpl(file_deletion_status, state.job_id, fname, type,
+                             number, path_id);
+    }
+  }
+
+  // Delete old info log files.
+  size_t old_info_log_file_count = old_info_log_files.size();
+  if (old_info_log_file_count != 0 &&
+      old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
+    std::sort(old_info_log_files.begin(), old_info_log_files.end());
+    size_t end =
+        old_info_log_file_count - immutable_db_options_.keep_log_file_num;
+    for (unsigned int i = 0; i <= end; i++) {
+      std::string& to_delete = old_info_log_files.at(i);
+      std::string full_path_to_delete =
+          (immutable_db_options_.db_log_dir.empty()
+               ? dbname_
+               : immutable_db_options_.db_log_dir) +
+          "/" + to_delete;
+      ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                     "[JOB %d] Delete info log file %s\n", state.job_id,
+                     full_path_to_delete.c_str());
+      Status s = env_->DeleteFile(full_path_to_delete);
+      if (!s.ok()) {
+        if (env_->FileExists(full_path_to_delete).IsNotFound()) {
+          ROCKS_LOG_INFO(
+              immutable_db_options_.info_log,
+              "[JOB %d] Tried to delete non-existing info log file %s FAILED "
+              "-- %s\n",
+              state.job_id, to_delete.c_str(), s.ToString().c_str());
+        } else {
+          ROCKS_LOG_ERROR(immutable_db_options_.info_log,
+                          "[JOB %d] Delete info log file %s FAILED -- %s\n",
+                          state.job_id, to_delete.c_str(),
+                          s.ToString().c_str());
+        }
+      }
+    }
+  }
+#ifndef ROCKSDB_LITE
+  wal_manager_.PurgeObsoleteWALFiles();
+#endif  // ROCKSDB_LITE
+  LogFlush(immutable_db_options_.info_log);
+}
+
+void DBImpl::DeleteObsoleteFiles() {
+  mutex_.AssertHeld();
+  JobContext job_context(next_job_id_.fetch_add(1));
+  FindObsoleteFiles(&job_context, true);
+
+  mutex_.Unlock();
+  if (job_context.HaveSomethingToDelete()) {
+    PurgeObsoleteFiles(job_context);
+  }
+  job_context.Clean();
+  mutex_.Lock();
+}
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_open.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_open.cc b/thirdparty/rocksdb/db/db_impl_open.cc
new file mode 100644
index 0000000..bc94b60
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_open.cc
@@ -0,0 +1,1129 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "db/db_impl.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+#include <inttypes.h>
+
+#include "db/builder.h"
+#include "options/options_helper.h"
+#include "rocksdb/wal_filter.h"
+#include "table/block_based_table_factory.h"
+#include "util/rate_limiter.h"
+#include "util/sst_file_manager_impl.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+Options SanitizeOptions(const std::string& dbname,
+                        const Options& src) {
+  auto db_options = SanitizeOptions(dbname, DBOptions(src));
+  ImmutableDBOptions immutable_db_options(db_options);
+  auto cf_options =
+      SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src));
+  return Options(db_options, cf_options);
+}
+
+DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
+  DBOptions result(src);
+
+  // A result.max_open_files value of -1 means an "infinite" number of open files.
+  if (result.max_open_files != -1) {
+    int max_max_open_files = port::GetMaxOpenFiles();
+    if (max_max_open_files == -1) {
+      max_max_open_files = 0x400000;
+    }
+    ClipToRange(&result.max_open_files, 20, max_max_open_files);
+  }
+
+  if (result.info_log == nullptr) {
+    Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
+    if (!s.ok()) {
+      // No place suitable for logging
+      result.info_log = nullptr;
+    }
+  }
+
+  if (!result.write_buffer_manager) {
+    result.write_buffer_manager.reset(
+        new WriteBufferManager(result.db_write_buffer_size));
+  }
+  auto bg_job_limits = DBImpl::GetBGJobLimits(result.max_background_flushes,
+                                              result.max_background_compactions,
+                                              result.max_background_jobs,
+                                              true /* parallelize_compactions */);
+  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
+                                           Env::Priority::LOW);
+  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
+                                           Env::Priority::HIGH);
+
+  if (result.rate_limiter.get() != nullptr) {
+    if (result.bytes_per_sync == 0) {
+      result.bytes_per_sync = 1024 * 1024;
+    }
+  }
+
+  if (result.delayed_write_rate == 0) {
+    if (result.rate_limiter.get() != nullptr) {
+      result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
+    }
+    if (result.delayed_write_rate == 0) {
+      result.delayed_write_rate = 16 * 1024 * 1024;
+    }
+  }
+
+  if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
+    result.recycle_log_file_num = false;
+  }
+
+  if (result.recycle_log_file_num &&
+      (result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
+       result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
+    // kPointInTimeRecovery is indistinguishable from
+    // kTolerateCorruptedTailRecords in recycle mode since we define
+    // the "end" of the log as the first corrupt record we encounter.
+    // kAbsoluteConsistency doesn't make sense because even a clean
+    // shutdown leaves old junk at the end of the log file.
+    result.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
+  }
+
+  if (result.wal_dir.empty()) {
+    // Use dbname as default
+    result.wal_dir = dbname;
+  }
+  if (result.wal_dir.back() == '/') {
+    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
+  }
+
+  if (result.db_paths.size() == 0) {
+    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
+  }
+
+  if (result.use_direct_io_for_flush_and_compaction &&
+      result.compaction_readahead_size == 0) {
+    TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
+    result.compaction_readahead_size = 1024 * 1024 * 2;
+  }
+
+  if (result.compaction_readahead_size > 0 ||
+      result.use_direct_io_for_flush_and_compaction) {
+    result.new_table_reader_for_compaction_inputs = true;
+  }
+
+  // Force flush on DB open if 2PC is enabled, since with 2PC we have no
+  // guarantee that consecutive log files have consecutive sequence id, which
+  // makes recovery complicated.
+  if (result.allow_2pc) {
+    result.avoid_flush_during_recovery = false;
+  }
+
+  return result;
+}
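+// For example, the sanitization above clips max_open_files = 5 up to 20,
+// defaults bytes_per_sync to 1MB when a rate limiter is configured, and falls
+// back to a 16MB/s delayed_write_rate when neither the option nor a rate
+// limiter supplies a value.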
+
+namespace {
+
+Status SanitizeOptionsByTable(
+    const DBOptions& db_opts,
+    const std::vector<ColumnFamilyDescriptor>& column_families) {
+  Status s;
+  for (auto cf : column_families) {
+    s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+  return Status::OK();
+}
+
+static Status ValidateOptions(
+    const DBOptions& db_options,
+    const std::vector<ColumnFamilyDescriptor>& column_families) {
+  Status s;
+
+  for (auto& cfd : column_families) {
+    s = CheckCompressionSupported(cfd.options);
+    if (s.ok() && db_options.allow_concurrent_memtable_write) {
+      s = CheckConcurrentWritesSupported(cfd.options);
+    }
+    if (!s.ok()) {
+      return s;
+    }
+    if (db_options.db_paths.size() > 1) {
+      if ((cfd.options.compaction_style != kCompactionStyleUniversal) &&
+          (cfd.options.compaction_style != kCompactionStyleLevel)) {
+        return Status::NotSupported(
+            "More than one DB paths are only supported in "
+            "universal and level compaction styles. ");
+      }
+    }
+    if (cfd.options.compaction_options_fifo.ttl > 0) {
+      if (db_options.max_open_files != -1) {
+        return Status::NotSupported(
+            "FIFO Compaction with TTL is only supported when files are always "
+            "kept open (set max_open_files = -1). ");
+      }
+      if (cfd.options.table_factory->Name() !=
+          BlockBasedTableFactory().Name()) {
+        return Status::NotSupported(
+            "FIFO Compaction with TTL is only supported in "
+            "Block-Based Table format. ");
+      }
+    }
+  }
+
+  if (db_options.db_paths.size() > 4) {
+    return Status::NotSupported(
+        "More than four DB paths are not supported yet. ");
+  }
+
+  if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
+    // Protect against assert in PosixMMapReadableFile constructor
+    return Status::NotSupported(
+        "If memory mapped reads (allow_mmap_reads) are enabled "
+        "then direct I/O reads (use_direct_reads) must be disabled. ");
+  }
+
+  if (db_options.allow_mmap_writes &&
+      db_options.use_direct_io_for_flush_and_compaction) {
+    return Status::NotSupported(
+        "If memory mapped writes (allow_mmap_writes) are enabled "
+        "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
+        "be disabled. ");
+  }
+
+  if (db_options.keep_log_file_num == 0) {
+    return Status::InvalidArgument("keep_log_file_num must be greater than 0");
+  }
+
+  return Status::OK();
+}
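+// For example, opening with both allow_mmap_reads and use_direct_reads set,
+// or with keep_log_file_num == 0, is rejected by the checks above with
+// Status::NotSupported and Status::InvalidArgument respectively.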
+} // namespace
+Status DBImpl::NewDB() {
+  VersionEdit new_db;
+  new_db.SetLogNumber(0);
+  new_db.SetNextFile(2);
+  new_db.SetLastSequence(0);
+
+  Status s;
+
+  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
+  const std::string manifest = DescriptorFileName(dbname_, 1);
+  {
+    unique_ptr<WritableFile> file;
+    EnvOptions env_options = env_->OptimizeForManifestWrite(env_options_);
+    s = NewWritableFile(env_, manifest, &file, env_options);
+    if (!s.ok()) {
+      return s;
+    }
+    file->SetPreallocationBlockSize(
+        immutable_db_options_.manifest_preallocation_size);
+    unique_ptr<WritableFileWriter> file_writer(
+        new WritableFileWriter(std::move(file), env_options));
+    log::Writer log(std::move(file_writer), 0, false);
+    std::string record;
+    new_db.EncodeTo(&record);
+    s = log.AddRecord(record);
+    if (s.ok()) {
+      s = SyncManifest(env_, &immutable_db_options_, log.file());
+    }
+  }
+  if (s.ok()) {
+    // Make "CURRENT" file that points to the new manifest file.
+    s = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir());
+  } else {
+    env_->DeleteFile(manifest);
+  }
+  return s;
+}
+
+Status DBImpl::Directories::CreateAndNewDirectory(
+    Env* env, const std::string& dirname,
+    std::unique_ptr<Directory>* directory) const {
+  // We call CreateDirIfMissing() as the directory may already exist (if we
+  // are reopening a DB), when this happens we don't want creating the
+  // directory to cause an error. However, we need to check if creating the
+  // directory fails or else we may get an obscure message about the lock
+  // file not existing. One real-world example of this occurring is if
+  // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
+  // when dbname_ is "dir/db" but "dir" doesn't exist.
+  Status s = env->CreateDirIfMissing(dirname);
+  if (!s.ok()) {
+    return s;
+  }
+  return env->NewDirectory(dirname, directory);
+}
+
+Status DBImpl::Directories::SetDirectories(
+    Env* env, const std::string& dbname, const std::string& wal_dir,
+    const std::vector<DbPath>& data_paths) {
+  Status s = CreateAndNewDirectory(env, dbname, &db_dir_);
+  if (!s.ok()) {
+    return s;
+  }
+  if (!wal_dir.empty() && dbname != wal_dir) {
+    s = CreateAndNewDirectory(env, wal_dir, &wal_dir_);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  data_dirs_.clear();
+  for (auto& p : data_paths) {
+    const std::string db_path = p.path;
+    if (db_path == dbname) {
+      data_dirs_.emplace_back(nullptr);
+    } else {
+      std::unique_ptr<Directory> path_directory;
+      s = CreateAndNewDirectory(env, db_path, &path_directory);
+      if (!s.ok()) {
+        return s;
+      }
+      data_dirs_.emplace_back(path_directory.release());
+    }
+  }
+  assert(data_dirs_.size() == data_paths.size());
+  return Status::OK();
+}
+
+Status DBImpl::Recover(
+    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
+    bool error_if_log_file_exist, bool error_if_data_exists_in_logs) {
+  mutex_.AssertHeld();
+
+  bool is_new_db = false;
+  assert(db_lock_ == nullptr);
+  if (!read_only) {
+    Status s = directories_.SetDirectories(env_, dbname_,
+                                           immutable_db_options_.wal_dir,
+                                           immutable_db_options_.db_paths);
+    if (!s.ok()) {
+      return s;
+    }
+
+    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
+    if (!s.ok()) {
+      return s;
+    }
+
+    s = env_->FileExists(CurrentFileName(dbname_));
+    if (s.IsNotFound()) {
+      if (immutable_db_options_.create_if_missing) {
+        s = NewDB();
+        is_new_db = true;
+        if (!s.ok()) {
+          return s;
+        }
+      } else {
+        return Status::InvalidArgument(
+            dbname_, "does not exist (create_if_missing is false)");
+      }
+    } else if (s.ok()) {
+      if (immutable_db_options_.error_if_exists) {
+        return Status::InvalidArgument(
+            dbname_, "exists (error_if_exists is true)");
+      }
+    } else {
+      // Unexpected error reading file
+      assert(s.IsIOError());
+      return s;
+    }
+    // Check for the IDENTITY file and create it if not there
+    s = env_->FileExists(IdentityFileName(dbname_));
+    if (s.IsNotFound()) {
+      s = SetIdentityFile(env_, dbname_);
+      if (!s.ok()) {
+        return s;
+      }
+    } else if (!s.ok()) {
+      assert(s.IsIOError());
+      return s;
+    }
+  }
+
+  Status s = versions_->Recover(column_families, read_only);
+  if (immutable_db_options_.paranoid_checks && s.ok()) {
+    s = CheckConsistency();
+  }
+  if (s.ok()) {
+    SequenceNumber next_sequence(kMaxSequenceNumber);
+    default_cf_handle_ = new ColumnFamilyHandleImpl(
+        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
+    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
+    single_column_family_mode_ =
+        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
+
+    // Recover from all newer log files than the ones named in the
+    // descriptor (new log files may have been added by the previous
+    // incarnation without registering them in the descriptor).
+    //
+    // Note that prev_log_number() is no longer used, but we pay
+    // attention to it in case we are recovering a database
+    // produced by an older version of rocksdb.
+    std::vector<std::string> filenames;
+    s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
+    if (!s.ok()) {
+      return s;
+    }
+
+    std::vector<uint64_t> logs;
+    for (size_t i = 0; i < filenames.size(); i++) {
+      uint64_t number;
+      FileType type;
+      if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
+        if (is_new_db) {
+          return Status::Corruption(
+              "While creating a new Db, wal_dir contains "
+              "existing log file: ",
+              filenames[i]);
+        } else {
+          logs.push_back(number);
+        }
+      }
+    }
+
+    if (logs.size() > 0) {
+      if (error_if_log_file_exist) {
+        return Status::Corruption(
+            "The db was opened in readonly mode with error_if_log_file_exist"
+            "flag but a log file already exists");
+      } else if (error_if_data_exists_in_logs) {
+        for (auto& log : logs) {
+          std::string fname = LogFileName(immutable_db_options_.wal_dir, log);
+          uint64_t bytes;
+          s = env_->GetFileSize(fname, &bytes);
+          if (s.ok()) {
+            if (bytes > 0) {
+              return Status::Corruption(
+                  "error_if_data_exists_in_logs is set but there are data "
+                  " in log files.");
+            }
+          }
+        }
+      }
+    }
+
+    if (!logs.empty()) {
+      // Recover in the order in which the logs were generated
+      std::sort(logs.begin(), logs.end());
+      s = RecoverLogFiles(logs, &next_sequence, read_only);
+      if (!s.ok()) {
+        // Clear memtables if recovery failed
+        for (auto cfd : *versions_->GetColumnFamilySet()) {
+          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
+                                 kMaxSequenceNumber);
+        }
+      }
+    }
+  }
+
+  // Initial value
+  max_total_in_memory_state_ = 0;
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
+    max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
+                                  mutable_cf_options->max_write_buffer_number;
+  }
+
+  return s;
+}
+
+// REQUIRES: log_numbers are sorted in ascending order
+Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
+                               SequenceNumber* next_sequence, bool read_only) {
+  struct LogReporter : public log::Reader::Reporter {
+    Env* env;
+    Logger* info_log;
+    const char* fname;
+    Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
+    virtual void Corruption(size_t bytes, const Status& s) override {
+      ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
+                     (this->status == nullptr ? "(ignoring error) " : ""),
+                     fname, static_cast<int>(bytes), s.ToString().c_str());
+      if (this->status != nullptr && this->status->ok()) {
+        *this->status = s;
+      }
+    }
+  };
+
+  mutex_.AssertHeld();
+  Status status;
+  std::unordered_map<int, VersionEdit> version_edits;
+  // no need to refcount because iteration is under mutex
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    VersionEdit edit;
+    edit.SetColumnFamily(cfd->GetID());
+    version_edits.insert({cfd->GetID(), edit});
+  }
+  int job_id = next_job_id_.fetch_add(1);
+  {
+    auto stream = event_logger_.Log();
+    stream << "job" << job_id << "event"
+           << "recovery_started";
+    stream << "log_files";
+    stream.StartArray();
+    for (auto log_number : log_numbers) {
+      stream << log_number;
+    }
+    stream.EndArray();
+  }
+
+#ifndef ROCKSDB_LITE
+  if (immutable_db_options_.wal_filter != nullptr) {
+    std::map<std::string, uint32_t> cf_name_id_map;
+    std::map<uint32_t, uint64_t> cf_lognumber_map;
+    for (auto cfd : *versions_->GetColumnFamilySet()) {
+      cf_name_id_map.insert(
+        std::make_pair(cfd->GetName(), cfd->GetID()));
+      cf_lognumber_map.insert(
+        std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
+    }
+
+    immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
+                                                               cf_name_id_map);
+  }
+#endif
+
+  bool stop_replay_by_wal_filter = false;
+  bool stop_replay_for_corruption = false;
+  bool flushed = false;
+  for (auto log_number : log_numbers) {
+    // The previous incarnation may not have written any MANIFEST
+    // records after allocating this log number.  So we manually
+    // update the file number allocation counter in VersionSet.
+    versions_->MarkFileNumberUsedDuringRecovery(log_number);
+    // Open the log file
+    std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
+
+    ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                   "Recovering log #%" PRIu64 " mode %d", log_number,
+                   immutable_db_options_.wal_recovery_mode);
+    auto logFileDropped = [this, &fname]() {
+      uint64_t bytes;
+      if (env_->GetFileSize(fname, &bytes).ok()) {
+        auto info_log = immutable_db_options_.info_log.get();
+        ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
+                       static_cast<int>(bytes));
+      }
+    };
+    if (stop_replay_by_wal_filter) {
+      logFileDropped();
+      continue;
+    }
+
+    unique_ptr<SequentialFileReader> file_reader;
+    {
+      unique_ptr<SequentialFile> file;
+      status = env_->NewSequentialFile(fname, &file,
+                                       env_->OptimizeForLogRead(env_options_));
+      if (!status.ok()) {
+        MaybeIgnoreError(&status);
+        if (!status.ok()) {
+          return status;
+        } else {
+          // Failed to open one log file, but that's ok.
+          // Try the next one.
+          continue;
+        }
+      }
+      file_reader.reset(new SequentialFileReader(std::move(file)));
+    }
+
+    // Create the log reader.
+    LogReporter reporter;
+    reporter.env = env_;
+    reporter.info_log = immutable_db_options_.info_log.get();
+    reporter.fname = fname.c_str();
+    if (!immutable_db_options_.paranoid_checks ||
+        immutable_db_options_.wal_recovery_mode ==
+            WALRecoveryMode::kSkipAnyCorruptedRecords) {
+      reporter.status = nullptr;
+    } else {
+      reporter.status = &status;
+    }
+    // We intentionally make log::Reader do checksumming even if
+    // paranoid_checks==false so that corruptions cause entire commits
+    // to be skipped instead of propagating bad information (like overly
+    // large sequence numbers).
+    log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
+                       &reporter, true /*checksum*/, 0 /*initial_offset*/,
+                       log_number);
+
+    // Determine if we should tolerate incomplete records at the tail end of
+    // the log.
+    // Read all the records and add to a memtable.
+    std::string scratch;
+    Slice record;
+    WriteBatch batch;
+
+    while (!stop_replay_by_wal_filter &&
+           reader.ReadRecord(&record, &scratch,
+                             immutable_db_options_.wal_recovery_mode) &&
+           status.ok()) {
+      if (record.size() < WriteBatchInternal::kHeader) {
+        reporter.Corruption(record.size(),
+                            Status::Corruption("log record too small"));
+        continue;
+      }
+      WriteBatchInternal::SetContents(&batch, record);
+      SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
+
+      if (immutable_db_options_.wal_recovery_mode ==
+          WALRecoveryMode::kPointInTimeRecovery) {
+        // In point-in-time recovery mode, if sequence ids of log files are
+        // consecutive, we continue recovery despite corruption. This could
+        // happen when we open and write to a corrupted DB, where sequence id
+        // will start from the last sequence id we recovered.
+        if (sequence == *next_sequence) {
+          stop_replay_for_corruption = false;
+        }
+        if (stop_replay_for_corruption) {
+          logFileDropped();
+          break;
+        }
+      }
+
+#ifndef ROCKSDB_LITE
+      if (immutable_db_options_.wal_filter != nullptr) {
+        WriteBatch new_batch;
+        bool batch_changed = false;
+
+        WalFilter::WalProcessingOption wal_processing_option =
+            immutable_db_options_.wal_filter->LogRecordFound(
+                log_number, fname, batch, &new_batch, &batch_changed);
+
+        switch (wal_processing_option) {
+          case WalFilter::WalProcessingOption::kContinueProcessing:
+            // do nothing, proceed normally
+            break;
+          case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
+            // skip current record
+            continue;
+          case WalFilter::WalProcessingOption::kStopReplay:
+            // skip current record and stop replay
+            stop_replay_by_wal_filter = true;
+            continue;
+          case WalFilter::WalProcessingOption::kCorruptedRecord: {
+            status =
+                Status::Corruption("Corruption reported by Wal Filter ",
+                                   immutable_db_options_.wal_filter->Name());
+            MaybeIgnoreError(&status);
+            if (!status.ok()) {
+              reporter.Corruption(record.size(), status);
+              continue;
+            }
+            break;
+          }
+          default: {
+            assert(false);  // unhandled case
+            status = Status::NotSupported(
+                "Unknown WalProcessingOption returned"
+                " by Wal Filter ",
+                immutable_db_options_.wal_filter->Name());
+            MaybeIgnoreError(&status);
+            if (!status.ok()) {
+              return status;
+            } else {
+              // Ignore the error with current record processing.
+              continue;
+            }
+          }
+        }
+
+        if (batch_changed) {
+          // Make sure that the count in the new batch is
+          // within the original count.
+          int new_count = WriteBatchInternal::Count(&new_batch);
+          int original_count = WriteBatchInternal::Count(&batch);
+          if (new_count > original_count) {
+            ROCKS_LOG_FATAL(
+                immutable_db_options_.info_log,
+                "Recovering log #%" PRIu64
+                " mode %d log filter %s returned "
+                "more records (%d) than original (%d) which is not allowed. "
+                "Aborting recovery.",
+                log_number, immutable_db_options_.wal_recovery_mode,
+                immutable_db_options_.wal_filter->Name(), new_count,
+                original_count);
+            status = Status::NotSupported(
+                "More than original # of records "
+                "returned by Wal Filter ",
+                immutable_db_options_.wal_filter->Name());
+            return status;
+          }
+          // Set the same sequence number in the new_batch
+          // as the original batch.
+          WriteBatchInternal::SetSequence(&new_batch,
+                                          WriteBatchInternal::Sequence(&batch));
+          batch = new_batch;
+        }
+      }
+#endif  // ROCKSDB_LITE
+
+      // If a column family was not found, it might mean that the WAL write
+      // batch references a column family that was dropped after the
+      // insert. We don't want to fail the whole write batch in that case --
+      // we just ignore the update.
+      // That's why we set ignore missing column families to true
+      bool has_valid_writes = false;
+      status = WriteBatchInternal::InsertInto(
+          &batch, column_family_memtables_.get(), &flush_scheduler_, true,
+          log_number, this, false /* concurrent_memtable_writes */,
+          next_sequence, &has_valid_writes);
+      MaybeIgnoreError(&status);
+      if (!status.ok()) {
+        // We are treating this as a failure while reading since we read valid
+        // blocks that do not form coherent data
+        reporter.Corruption(record.size(), status);
+        continue;
+      }
+
+      if (has_valid_writes && !read_only) {
+        // we can do this because this is called before the client has access
+        // to the DB and there is only a single thread operating on the DB
+        ColumnFamilyData* cfd;
+
+        while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
+          cfd->Unref();
+          // If this asserts, it means that InsertInto failed in
+          // filtering updates to already-flushed column families
+          assert(cfd->GetLogNumber() <= log_number);
+          auto iter = version_edits.find(cfd->GetID());
+          assert(iter != version_edits.end());
+          VersionEdit* edit = &iter->second;
+          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
+          if (!status.ok()) {
+            // Reflect errors immediately so that conditions like full
+            // file-systems cause the DB::Open() to fail.
+            return status;
+          }
+          flushed = true;
+
+          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
+                                 *next_sequence);
+        }
+      }
+    }
+
+    if (!status.ok()) {
+      if (immutable_db_options_.wal_recovery_mode ==
+          WALRecoveryMode::kSkipAnyCorruptedRecords) {
+        // We should ignore all errors unconditionally
+        status = Status::OK();
+      } else if (immutable_db_options_.wal_recovery_mode ==
+                 WALRecoveryMode::kPointInTimeRecovery) {
+        // We should ignore the error but not continue replaying
+        status = Status::OK();
+        stop_replay_for_corruption = true;
+        ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                       "Point in time recovered to log #%" PRIu64
+                       " seq #%" PRIu64,
+                       log_number, *next_sequence);
+      } else {
+        assert(immutable_db_options_.wal_recovery_mode ==
+                   WALRecoveryMode::kTolerateCorruptedTailRecords ||
+               immutable_db_options_.wal_recovery_mode ==
+                   WALRecoveryMode::kAbsoluteConsistency);
+        return status;
+      }
+    }
+
+    flush_scheduler_.Clear();
+    auto last_sequence = *next_sequence - 1;
+    if ((*next_sequence != kMaxSequenceNumber) &&
+        (versions_->LastSequence() <= last_sequence)) {
+      versions_->SetLastToBeWrittenSequence(last_sequence);
+      versions_->SetLastSequence(last_sequence);
+    }
+  }
+
+  // True if there's any data in the WALs; if not, we can skip re-processing
+  // them later
+  bool data_seen = false;
+  if (!read_only) {
+    // no need to refcount since client still doesn't have access
+    // to the DB and can not drop column families while we iterate
+    auto max_log_number = log_numbers.back();
+    for (auto cfd : *versions_->GetColumnFamilySet()) {
+      auto iter = version_edits.find(cfd->GetID());
+      assert(iter != version_edits.end());
+      VersionEdit* edit = &iter->second;
+
+      if (cfd->GetLogNumber() > max_log_number) {
+        // Column family cfd has already flushed the data
+        // from all logs. Memtable has to be empty because
+        // we filter the updates based on log_number
+        // (in WriteBatch::InsertInto)
+        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
+        assert(edit->NumEntries() == 0);
+        continue;
+      }
+
+      // flush the final memtable (if non-empty)
+      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
+        // If flush happened in the middle of recovery (e.g. due to memtable
+        // being full), we flush at the end. Otherwise we'll need to record
+        // where we were at the last flush, which makes the logic complicated.
+        if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
+          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
+          if (!status.ok()) {
+            // Recovery failed
+            break;
+          }
+          flushed = true;
+
+          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
+                                 versions_->LastSequence());
+        }
+        data_seen = true;
+      }
+
+      // write MANIFEST with update
+      // writing log_number in the manifest means that any log file
+      // with number strictly less than (log_number + 1) is already
+      // recovered and should be ignored on next reincarnation.
+      // Since we already recovered max_log_number, we want all logs
+      // with numbers `<= max_log_number` (includes this one) to be ignored
+      if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
+        edit->SetLogNumber(max_log_number + 1);
+      }
+      // we must mark the next log number as used, even though it's
+      // not actually used. That is because VersionSet assumes that
+      // VersionSet::next_file_number_ is always strictly greater than any
+      // log number
+      versions_->MarkFileNumberUsedDuringRecovery(max_log_number + 1);
+      status = versions_->LogAndApply(
+          cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
+      if (!status.ok()) {
+        // Recovery failed
+        break;
+      }
+    }
+  }
+
+  if (data_seen && !flushed) {
+    // Mark these as alive so they'll be considered for deletion later by
+    // FindObsoleteFiles()
+    if (concurrent_prepare_) {
+      log_write_mutex_.Lock();
+    }
+    for (auto log_number : log_numbers) {
+      alive_log_files_.push_back(LogFileNumberSize(log_number));
+    }
+    if (concurrent_prepare_) {
+      log_write_mutex_.Unlock();
+    }
+  }
+
+  event_logger_.Log() << "job" << job_id << "event"
+                      << "recovery_finished";
+
+  return status;
+}
+
+Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
+                                           MemTable* mem, VersionEdit* edit) {
+  mutex_.AssertHeld();
+  const uint64_t start_micros = env_->NowMicros();
+  FileMetaData meta;
+  auto pending_outputs_inserted_elem =
+      CaptureCurrentFileNumberInPendingOutputs();
+  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
+  ReadOptions ro;
+  ro.total_order_seek = true;
+  Arena arena;
+  Status s;
+  TableProperties table_properties;
+  {
+    ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
+    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
+                    "[%s] [WriteLevel0TableForRecovery]"
+                    " Level-0 table #%" PRIu64 ": started",
+                    cfd->GetName().c_str(), meta.fd.GetNumber());
+
+    // Get the latest mutable cf options while the mutex is still locked
+    const MutableCFOptions mutable_cf_options =
+        *cfd->GetLatestMutableCFOptions();
+    bool paranoid_file_checks =
+        cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
+
+    int64_t _current_time = 0;
+    env_->GetCurrentTime(&_current_time);  // ignore error
+    const uint64_t current_time = static_cast<uint64_t>(_current_time);
+
+    {
+      mutex_.Unlock();
+
+      SequenceNumber earliest_write_conflict_snapshot;
+      std::vector<SequenceNumber> snapshot_seqs =
+          snapshots_.GetAll(&earliest_write_conflict_snapshot);
+
+      EnvOptions optimized_env_options =
+          env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
+      s = BuildTable(
+          dbname_, env_, *cfd->ioptions(), mutable_cf_options,
+          optimized_env_options, cfd->table_cache(), iter.get(),
+          std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
+          &meta, cfd->internal_comparator(),
+          cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
+          snapshot_seqs, earliest_write_conflict_snapshot,
+          GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
+          cfd->ioptions()->compression_opts, paranoid_file_checks,
+          cfd->internal_stats(), TableFileCreationReason::kRecovery,
+          &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
+          -1 /* level */, current_time);
+      LogFlush(immutable_db_options_.info_log);
+      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
+                      "[%s] [WriteLevel0TableForRecovery]"
+                      " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
+                      cfd->GetName().c_str(), meta.fd.GetNumber(),
+                      meta.fd.GetFileSize(), s.ToString().c_str());
+      mutex_.Lock();
+    }
+  }
+  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
+
+  // Note that if file_size is zero, the file has been deleted and
+  // should not be added to the manifest.
+  int level = 0;
+  if (s.ok() && meta.fd.GetFileSize() > 0) {
+    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
+                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
+                  meta.smallest_seqno, meta.largest_seqno,
+                  meta.marked_for_compaction);
+  }
+
+  InternalStats::CompactionStats stats(1);
+  stats.micros = env_->NowMicros() - start_micros;
+  stats.bytes_written = meta.fd.GetFileSize();
+  stats.num_output_files = 1;
+  cfd->internal_stats()->AddCompactionStats(level, stats);
+  cfd->internal_stats()->AddCFStats(
+      InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize());
+  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
+  return s;
+}
+
+Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
+  DBOptions db_options(options);
+  ColumnFamilyOptions cf_options(options);
+  std::vector<ColumnFamilyDescriptor> column_families;
+  column_families.push_back(
+      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+  std::vector<ColumnFamilyHandle*> handles;
+  Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
+  if (s.ok()) {
+    assert(handles.size() == 1);
+    // we can delete the handle since DBImpl always holds a reference to the
+    // default column family
+    delete handles[0];
+  }
+  return s;
+}
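+// A minimal usage sketch of the convenience overload above (illustrative
+// only; the path and error handling are placeholders):
+//
+//   rocksdb::Options options;
+//   options.create_if_missing = true;
+//   rocksdb::DB* db = nullptr;
+//   rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/example_db", &db);
+//   if (s.ok()) {
+//     // ... use db ...
+//     delete db;
+//   }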
+
+Status DB::Open(const DBOptions& db_options, const std::string& dbname,
+                const std::vector<ColumnFamilyDescriptor>& column_families,
+                std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
+  Status s = SanitizeOptionsByTable(db_options, column_families);
+  if (!s.ok()) {
+    return s;
+  }
+
+  s = ValidateOptions(db_options, column_families);
+  if (!s.ok()) {
+    return s;
+  }
+
+  *dbptr = nullptr;
+  handles->clear();
+
+  size_t max_write_buffer_size = 0;
+  for (auto cf : column_families) {
+    max_write_buffer_size =
+        std::max(max_write_buffer_size, cf.options.write_buffer_size);
+  }
+
+  DBImpl* impl = new DBImpl(db_options, dbname);
+  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
+  if (s.ok()) {
+    for (auto db_path : impl->immutable_db_options_.db_paths) {
+      s = impl->env_->CreateDirIfMissing(db_path.path);
+      if (!s.ok()) {
+        break;
+      }
+    }
+  }
+
+  if (!s.ok()) {
+    delete impl;
+    return s;
+  }
+
+  s = impl->CreateArchivalDirectory();
+  if (!s.ok()) {
+    delete impl;
+    return s;
+  }
+  impl->mutex_.Lock();
+  // Handles create_if_missing, error_if_exists
+  s = impl->Recover(column_families);
+  if (s.ok()) {
+    uint64_t new_log_number = impl->versions_->NewFileNumber();
+    unique_ptr<WritableFile> lfile;
+    EnvOptions soptions(db_options);
+    EnvOptions opt_env_options =
+        impl->immutable_db_options_.env->OptimizeForLogWrite(
+            soptions, BuildDBOptions(impl->immutable_db_options_,
+                                     impl->mutable_db_options_));
+    s = NewWritableFile(
+        impl->immutable_db_options_.env,
+        LogFileName(impl->immutable_db_options_.wal_dir, new_log_number),
+        &lfile, opt_env_options);
+    if (s.ok()) {
+      lfile->SetPreallocationBlockSize(
+          impl->GetWalPreallocateBlockSize(max_write_buffer_size));
+      {
+        InstrumentedMutexLock wl(&impl->log_write_mutex_);
+        impl->logfile_number_ = new_log_number;
+        unique_ptr<WritableFileWriter> file_writer(
+            new WritableFileWriter(std::move(lfile), opt_env_options));
+        impl->logs_.emplace_back(
+            new_log_number,
+            new log::Writer(
+                std::move(file_writer), new_log_number,
+                impl->immutable_db_options_.recycle_log_file_num > 0));
+      }
+
+      // set column family handles
+      for (auto cf : column_families) {
+        auto cfd =
+            impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
+        if (cfd != nullptr) {
+          handles->push_back(
+              new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
+          impl->NewThreadStatusCfInfo(cfd);
+        } else {
+          if (db_options.create_missing_column_families) {
+            // missing column family, create it
+            ColumnFamilyHandle* handle;
+            impl->mutex_.Unlock();
+            s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
+            impl->mutex_.Lock();
+            if (s.ok()) {
+              handles->push_back(handle);
+            } else {
+              break;
+            }
+          } else {
+            s = Status::InvalidArgument("Column family not found: ", cf.name);
+            break;
+          }
+        }
+      }
+    }
+    if (s.ok()) {
+      for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
+        delete impl->InstallSuperVersionAndScheduleWork(
+            cfd, nullptr, *cfd->GetLatestMutableCFOptions());
+      }
+      if (impl->concurrent_prepare_) {
+        impl->log_write_mutex_.Lock();
+      }
+      impl->alive_log_files_.push_back(
+          DBImpl::LogFileNumberSize(impl->logfile_number_));
+      if (impl->concurrent_prepare_) {
+        impl->log_write_mutex_.Unlock();
+      }
+      impl->DeleteObsoleteFiles();
+      s = impl->directories_.GetDbDir()->Fsync();
+    }
+  }
+
+  if (s.ok()) {
+    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
+      if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
+        auto* vstorage = cfd->current()->storage_info();
+        for (int i = 1; i < vstorage->num_levels(); ++i) {
+          int num_files = vstorage->NumLevelFiles(i);
+          if (num_files > 0) {
+            s = Status::InvalidArgument(
+                "Not all files are at level 0. Cannot "
+                "open with FIFO compaction style.");
+            break;
+          }
+        }
+      }
+      if (!cfd->mem()->IsSnapshotSupported()) {
+        impl->is_snapshot_supported_ = false;
+      }
+      if (cfd->ioptions()->merge_operator != nullptr &&
+          !cfd->mem()->IsMergeOperatorSupported()) {
+        s = Status::InvalidArgument(
+            "The memtable does not support merge operator, but "
+            "options.merge_operator is non-null for column family",
+            cfd->GetName().c_str());
+      }
+      if (!s.ok()) {
+        break;
+      }
+    }
+  }
+  TEST_SYNC_POINT("DBImpl::Open:Opened");
+  Status persist_options_status;
+  if (s.ok()) {
+    // Persist RocksDB Options before scheduling the compaction.
+    // WriteOptionsFile() releases and re-acquires the mutex internally.
+    persist_options_status = impl->WriteOptionsFile(
+        false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
+
+    *dbptr = impl;
+    impl->opened_successfully_ = true;
+    impl->MaybeScheduleFlushOrCompaction();
+  }
+  impl->mutex_.Unlock();
+
+#ifndef ROCKSDB_LITE
+  auto sfm = static_cast<SstFileManagerImpl*>(
+      impl->immutable_db_options_.sst_file_manager.get());
+  if (s.ok() && sfm) {
+    // Notify SstFileManager about all sst files that already exist in
+    // db_paths[0] when the DB is opened.
+    auto& db_path = impl->immutable_db_options_.db_paths[0];
+    std::vector<std::string> existing_files;
+    impl->immutable_db_options_.env->GetChildren(db_path.path, &existing_files);
+    for (auto& file_name : existing_files) {
+      uint64_t file_number;
+      FileType file_type;
+      std::string file_path = db_path.path + "/" + file_name;
+      if (ParseFileName(file_name, &file_number, &file_type) &&
+          file_type == kTableFile) {
+        sfm->OnAddFile(file_path);
+      }
+    }
+  }
+#endif  // !ROCKSDB_LITE
+
+  if (s.ok()) {
+    ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl);
+    LogFlush(impl->immutable_db_options_.info_log);
+    if (!persist_options_status.ok()) {
+      s = Status::IOError(
+          "DB::Open() failed --- Unable to persist Options file",
+          persist_options_status.ToString());
+    }
+  }
+  if (!s.ok()) {
+    for (auto* h : *handles) {
+      delete h;
+    }
+    handles->clear();
+    delete impl;
+    *dbptr = nullptr;
+  }
+  return s;
+}
+}  // namespace rocksdb
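
The two DB::Open() overloads above are the public entry points carried in by this file: the single-Options form delegates to the column-family form and deletes the default column family handle on the caller's behalf. The following is a minimal usage sketch only; the include path, database directory, and option values are illustrative assumptions, not part of this commit.

// Sketch: opening a database through the convenience DB::Open() overload.
#include <cassert>
#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;  // handled inside DBImpl::Recover()

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/minifi_example_db", &db);
  assert(s.ok());

  // Writes go to the default column family; the column-family overload
  // would return explicit handles instead.
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());

  delete db;  // closes the database
  return 0;
}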

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_readonly.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_readonly.cc b/thirdparty/rocksdb/db/db_impl_readonly.cc
new file mode 100644
index 0000000..d69eecb
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_readonly.cc
@@ -0,0 +1,197 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "db/db_impl_readonly.h"
+
+#include "db/compacted_db_impl.h"
+#include "db/db_impl.h"
+#include "db/db_iter.h"
+#include "db/merge_context.h"
+#include "db/range_del_aggregator.h"
+#include "monitoring/perf_context_imp.h"
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+
+DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
+                               const std::string& dbname)
+    : DBImpl(db_options, dbname) {
+  ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                 "Opening the db in read only mode");
+  LogFlush(immutable_db_options_.info_log);
+}
+
+DBImplReadOnly::~DBImplReadOnly() {
+}
+
+// Implementations of the DB interface
+Status DBImplReadOnly::Get(const ReadOptions& read_options,
+                           ColumnFamilyHandle* column_family, const Slice& key,
+                           PinnableSlice* pinnable_val) {
+  assert(pinnable_val != nullptr);
+  Status s;
+  SequenceNumber snapshot = versions_->LastSequence();
+  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  auto cfd = cfh->cfd();
+  SuperVersion* super_version = cfd->GetSuperVersion();
+  MergeContext merge_context;
+  RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);
+  LookupKey lkey(key, snapshot);
+  if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
+                              &range_del_agg, read_options)) {
+    pinnable_val->PinSelf();
+  } else {
+    PERF_TIMER_GUARD(get_from_output_files_time);
+    super_version->current->Get(read_options, lkey, pinnable_val, &s,
+                                &merge_context, &range_del_agg);
+  }
+  return s;
+}
+
+Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
+                                      ColumnFamilyHandle* column_family) {
+  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  auto cfd = cfh->cfd();
+  SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
+  SequenceNumber latest_snapshot = versions_->LastSequence();
+  auto db_iter = NewArenaWrappedDbIterator(
+      env_, read_options, *cfd->ioptions(),
+      (read_options.snapshot != nullptr
+           ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
+                 ->number_
+           : latest_snapshot),
+      super_version->mutable_cf_options.max_sequential_skip_in_iterations,
+      super_version->version_number);
+  auto internal_iter =
+      NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
+                          db_iter->GetRangeDelAggregator());
+  db_iter->SetIterUnderDBIter(internal_iter);
+  return db_iter;
+}
+
+Status DBImplReadOnly::NewIterators(
+    const ReadOptions& read_options,
+    const std::vector<ColumnFamilyHandle*>& column_families,
+    std::vector<Iterator*>* iterators) {
+  if (iterators == nullptr) {
+    return Status::InvalidArgument("iterators not allowed to be nullptr");
+  }
+  iterators->clear();
+  iterators->reserve(column_families.size());
+  SequenceNumber latest_snapshot = versions_->LastSequence();
+
+  for (auto cfh : column_families) {
+    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
+    auto* sv = cfd->GetSuperVersion()->Ref();
+    auto* db_iter = NewArenaWrappedDbIterator(
+        env_, read_options, *cfd->ioptions(),
+        (read_options.snapshot != nullptr
+             ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
+                   ->number_
+             : latest_snapshot),
+        sv->mutable_cf_options.max_sequential_skip_in_iterations,
+        sv->version_number);
+    auto* internal_iter =
+        NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
+                            db_iter->GetRangeDelAggregator());
+    db_iter->SetIterUnderDBIter(internal_iter);
+    iterators->push_back(db_iter);
+  }
+
+  return Status::OK();
+}
+
+Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
+                           DB** dbptr, bool error_if_log_file_exist) {
+  *dbptr = nullptr;
+
+  // First try to open the DB as a fully compacted DB.
+  Status s;
+  s = CompactedDBImpl::Open(options, dbname, dbptr);
+  if (s.ok()) {
+    return s;
+  }
+
+  DBOptions db_options(options);
+  ColumnFamilyOptions cf_options(options);
+  std::vector<ColumnFamilyDescriptor> column_families;
+  column_families.push_back(
+      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+  std::vector<ColumnFamilyHandle*> handles;
+
+  s = DB::OpenForReadOnly(db_options, dbname, column_families, &handles, dbptr);
+  if (s.ok()) {
+    assert(handles.size() == 1);
+    // We can delete the handle since DBImpl always holds a
+    // reference to the default column family.
+    delete handles[0];
+  }
+  return s;
+}
+
+Status DB::OpenForReadOnly(
+    const DBOptions& db_options, const std::string& dbname,
+    const std::vector<ColumnFamilyDescriptor>& column_families,
+    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
+    bool error_if_log_file_exist) {
+  *dbptr = nullptr;
+  handles->clear();
+
+  DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
+  impl->mutex_.Lock();
+  Status s = impl->Recover(column_families, true /* read only */,
+                           error_if_log_file_exist);
+  if (s.ok()) {
+    // set column family handles
+    for (auto cf : column_families) {
+      auto cfd =
+          impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
+      if (cfd == nullptr) {
+        s = Status::InvalidArgument("Column family not found: ", cf.name);
+        break;
+      }
+      handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
+    }
+  }
+  if (s.ok()) {
+    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
+      delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
+    }
+  }
+  impl->mutex_.Unlock();
+  if (s.ok()) {
+    *dbptr = impl;
+    for (auto* h : *handles) {
+      impl->NewThreadStatusCfInfo(
+          reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
+    }
+  } else {
+    for (auto h : *handles) {
+      delete h;
+    }
+    handles->clear();
+    delete impl;
+  }
+  return s;
+}
+
+#else  // !ROCKSDB_LITE
+
+Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
+                           DB** dbptr, bool error_if_log_file_exist) {
+  return Status::NotSupported("Not supported in ROCKSDB_LITE.");
+}
+
+Status DB::OpenForReadOnly(
+    const DBOptions& db_options, const std::string& dbname,
+    const std::vector<ColumnFamilyDescriptor>& column_families,
+    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
+    bool error_if_log_file_exist) {
+  return Status::NotSupported("Not supported in ROCKSDB_LITE.");
+}
+#endif  // !ROCKSDB_LITE
+
+}   // namespace rocksdb
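
DB::OpenForReadOnly() above first attempts CompactedDBImpl::Open() and only falls back to the DBImplReadOnly path when that fails. A hedged sketch of calling it from application code follows; the database path and error handling are illustrative assumptions.

// Sketch: opening an existing database read-only and reading one key.
#include <cassert>
#include <string>
#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  rocksdb::DB* db = nullptr;

  // error_if_log_file_exist is left at its default here.
  rocksdb::Status s =
      rocksdb::DB::OpenForReadOnly(options, "/tmp/minifi_example_db", &db);
  if (!s.ok()) {
    return 1;  // e.g. the database does not exist yet
  }

  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "key", &value);
  assert(s.ok() || s.IsNotFound());

  delete db;
  return 0;
}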

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_readonly.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_readonly.h b/thirdparty/rocksdb/db/db_impl_readonly.h
new file mode 100644
index 0000000..9bdc95c
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_readonly.h
@@ -0,0 +1,123 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include "db/db_impl.h"
+#include <vector>
+#include <string>
+
+namespace rocksdb {
+
+class DBImplReadOnly : public DBImpl {
+ public:
+  DBImplReadOnly(const DBOptions& options, const std::string& dbname);
+  virtual ~DBImplReadOnly();
+
+  // Implementations of the DB interface
+  using DB::Get;
+  virtual Status Get(const ReadOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     PinnableSlice* value) override;
+
+  // TODO: Implement ReadOnly MultiGet?
+
+  using DBImpl::NewIterator;
+  virtual Iterator* NewIterator(const ReadOptions&,
+                                ColumnFamilyHandle* column_family) override;
+
+  virtual Status NewIterators(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>& column_families,
+      std::vector<Iterator*>* iterators) override;
+
+  using DBImpl::Put;
+  virtual Status Put(const WriteOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     const Slice& value) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+  using DBImpl::Merge;
+  virtual Status Merge(const WriteOptions& options,
+                       ColumnFamilyHandle* column_family, const Slice& key,
+                       const Slice& value) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+  using DBImpl::Delete;
+  virtual Status Delete(const WriteOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const Slice& key) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+  using DBImpl::SingleDelete;
+  virtual Status SingleDelete(const WriteOptions& options,
+                              ColumnFamilyHandle* column_family,
+                              const Slice& key) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+  virtual Status Write(const WriteOptions& options,
+                       WriteBatch* updates) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+  using DBImpl::CompactRange;
+  virtual Status CompactRange(const CompactRangeOptions& options,
+                              ColumnFamilyHandle* column_family,
+                              const Slice* begin, const Slice* end) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  using DBImpl::CompactFiles;
+  virtual Status CompactFiles(
+      const CompactionOptions& compact_options,
+      ColumnFamilyHandle* column_family,
+      const std::vector<std::string>& input_file_names,
+      const int output_level, const int output_path_id = -1) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  virtual Status DisableFileDeletions() override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  virtual Status EnableFileDeletions(bool force) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+  virtual Status GetLiveFiles(std::vector<std::string>&,
+                              uint64_t* manifest_file_size,
+                              bool flush_memtable = true) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  using DBImpl::Flush;
+  virtual Status Flush(const FlushOptions& options,
+                       ColumnFamilyHandle* column_family) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  using DBImpl::SyncWAL;
+  virtual Status SyncWAL() override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  using DB::IngestExternalFile;
+  virtual Status IngestExternalFile(
+      ColumnFamilyHandle* column_family,
+      const std::vector<std::string>& external_files,
+      const IngestExternalFileOptions& ingestion_options) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+ private:
+  friend class DB;
+
+  // No copying allowed
+  DBImplReadOnly(const DBImplReadOnly&);
+  void operator=(const DBImplReadOnly&);
+};
+}  // namespace rocksdb
+
+#endif  // !ROCKSDB_LITE
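
Every mutating override declared above (Put, Merge, Delete, Write, CompactRange, Flush, and so on) returns Status::NotSupported in read-only mode. A short sketch of what a caller observes; the database path is an illustrative assumption.

// Sketch: write attempts on a read-only handle fail with NotSupported.
#include <cassert>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::OpenForReadOnly(
      rocksdb::Options(), "/tmp/minifi_example_db", &db);
  if (!s.ok()) {
    return 1;
  }

  // Routed to DBImplReadOnly::Put, which rejects the operation.
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.IsNotSupported());

  delete db;
  return 0;
}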

