http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_files.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_files.cc b/thirdparty/rocksdb/db/db_impl_files.cc
new file mode 100644
index 0000000..e44e423
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_files.cc
@@ -0,0 +1,548 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "db/db_impl.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+#include <inttypes.h>
+#include "db/event_helpers.h"
+#include "util/file_util.h"
+#include "util/sst_file_manager_impl.h"
+
+
+namespace rocksdb {
+// Returns the smallest non-zero log number reported by
+// GetMinLogContainingPrepSection() across the mutable and immutable
+// memtables of every column family that has not been dropped.
+// Returns 0 when 2PC is disabled or when no memtable reports such a log.
+uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
+  if (!allow_2pc()) {
+    return 0;
+  }
+
+  // 0 doubles as "nothing found yet".
+  uint64_t oldest = 0;
+
+  // Two phase transactions that were committed but whose memtable has not
+  // been flushed yet still pin the log holding their prepare section.
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    if (!cfd->IsDropped()) {
+      // Immutable memtables first, then the active one.
+      auto imm_log = cfd->imm()->GetMinLogContainingPrepSection();
+      const bool imm_improves = (oldest == 0) || (imm_log < oldest);
+      if (imm_log > 0 && imm_improves) {
+        oldest = imm_log;
+      }
+
+      auto mem_log = cfd->mem()->GetMinLogContainingPrepSection();
+      const bool mem_improves = (oldest == 0) || (mem_log < oldest);
+      if (mem_log > 0 && mem_improves) {
+        oldest = mem_log;
+      }
+    }
+  }
+
+  return oldest;
+}
+
+// Bumps the completion counter kept for `log` in
+// prepared_section_completed_. The log must be non-zero and must already
+// have an entry in that map (both are asserted).
+void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
+  assert(log != 0);
+  std::lock_guard<std::mutex> guard(prep_heap_mutex_);
+  const auto entry = prepared_section_completed_.find(log);
+  assert(entry != prepared_section_completed_.end());
+  ++entry->second;
+}
+
+// Pushes `log` (must be non-zero) onto min_log_with_prep_ and makes sure
+// prepared_section_completed_ has an entry for it, starting at 0. An
+// existing entry keeps its current count.
+void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
+  assert(log != 0);
+  std::lock_guard<std::mutex> guard(prep_heap_mutex_);
+  min_log_with_prep_.push(log);
+  if (prepared_section_completed_.count(log) == 0) {
+    prepared_section_completed_[log] = 0;
+  }
+}
+
+// Returns the log number at the top of min_log_with_prep_ once every entry
+// already marked as completed has been discarded, or 0 when 2PC is disabled
+// or the heap ends up empty.
+uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
+  if (!allow_2pc()) {
+    return 0;
+  }
+
+  std::lock_guard<std::mutex> guard(prep_heap_mutex_);
+
+  // The heap tracks transactions that have been prepared (written to WAL)
+  // but not yet committed. Entries are removed lazily: a positive counter in
+  // prepared_section_completed_ means the heap top is a stale entry.
+  while (!min_log_with_prep_.empty()) {
+    const uint64_t candidate = min_log_with_prep_.top();
+    const auto completed = prepared_section_completed_.find(candidate);
+    const bool stale = completed != prepared_section_completed_.end() &&
+                       completed->second > 0;
+    if (!stale) {
+      // found a valid value
+      return candidate;
+    }
+
+    // Consume one completion mark together with the heap entry it cancels,
+    // then look at the next smallest log.
+    completed->second -= 1;
+    min_log_with_prep_.pop();
+  }
+
+  return 0;
+}
+
+// Returns the smallest log number that must be kept. This is
+// versions_->MinLogNumber(), lowered further under 2PC by logs that still
+// hold prepared sections.
+uint64_t DBImpl::MinLogNumberToKeep() {
+  uint64_t keep_from = versions_->MinLogNumber();
+  if (!allow_2pc()) {
+    return keep_from;
+  }
+
+  // With 2PC, logs containing prepared sections of outstanding transactions
+  // must be considered as well.
+  //
+  // The prep heap has to be consulted BEFORE the memtables, because a log
+  // referenced by the heap could transition to being referenced by a
+  // memtable under us.
+  //
+  // TODO(horuff): iterating over all column families under db mutex.
+  // should find a more optimal solution
+  const auto prep_heap_min = FindMinLogContainingOutstandingPrep();
+  if (prep_heap_min != 0 && prep_heap_min < keep_from) {
+    keep_from = prep_heap_min;
+  }
+
+  const auto memtable_min = FindMinPrepLogReferencedByMemTable();
+  if (memtable_min != 0 && memtable_min < keep_from) {
+    keep_from = memtable_min;
+  }
+
+  return keep_from;
+}
+
+// Gathers into *job_context everything PurgeObsoleteFiles() needs in order
+// to decide which files can be removed:
+// * The list of live files is returned in 'sst_live'.
+// * Obsolete table and manifest files reported by VersionSet are returned
+//   in 'sst_delete_files' and 'manifest_delete_files' (always, not only
+//   when the full scan is skipped).
+// * If it's doing the full scan, every file found in the db_paths, and in
+//   wal_dir / db_log_dir when those differ from the DB directory, is
+//   returned in 'full_scan_candidate_files'.
+// * WAL files numbered below MinLogNumberToKeep() are moved to the recycle
+//   list or to 'log_delete_files'.
+//
+// no_full_scan = true -- never do the full scan using GetChildren()
+// force = false -- don't force the full scan, except every
+//   mutable_db_options_.delete_obsolete_files_period_micros
+// force = true -- force the full scan
+//
+// REQUIRES: mutex_ is held.
+// Returns without touching *job_context when file deletion is disabled.
+void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
+                               bool no_full_scan) {
+  mutex_.AssertHeld();
+
+  // if deletion is disabled, do nothing
+  if (disable_delete_obsolete_files_ > 0) {
+    return;
+  }
+
+  bool doing_the_full_scan = false;
+
+  // logic for figuring out if we're doing the full scan
+  if (no_full_scan) {
+    doing_the_full_scan = false;
+  } else if (force ||
+             mutable_db_options_.delete_obsolete_files_period_micros == 0) {
+    doing_the_full_scan = true;
+  } else {
+    const uint64_t now_micros = env_->NowMicros();
+    if ((delete_obsolete_files_last_run_ +
+         mutable_db_options_.delete_obsolete_files_period_micros) <
+        now_micros) {
+      doing_the_full_scan = true;
+      delete_obsolete_files_last_run_ = now_micros;
+    }
+  }
+
+  // don't delete files that might be currently written to from compaction
+  // threads
+  // Since job_context->min_pending_output is set, until file scan finishes,
+  // mutex_ cannot be released. Otherwise, we might see no min_pending_output
+  // here but later find newer generated unfinalized files while scanning.
+  if (!pending_outputs_.empty()) {
+    job_context->min_pending_output = *pending_outputs_.begin();
+  } else {
+    // delete all of them
+    job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
+  }
+
+  // Get obsolete files. This function will also update the list of
+  // pending files in VersionSet().
+  versions_->GetObsoleteFiles(&job_context->sst_delete_files,
+                              &job_context->manifest_delete_files,
+                              job_context->min_pending_output);
+
+  // store the current filenum, lognum, etc
+  job_context->manifest_file_number = versions_->manifest_file_number();
+  job_context->pending_manifest_file_number =
+      versions_->pending_manifest_file_number();
+  job_context->log_number = MinLogNumberToKeep();
+
+  job_context->prev_log_number = versions_->prev_log_number();
+
+  versions_->AddLiveFiles(&job_context->sst_live);
+  if (doing_the_full_scan) {
+    for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
+         path_id++) {
+      // set of all files in the directory. We'll exclude files that are still
+      // alive in the subsequent processings.
+      std::vector<std::string> files;
+      env_->GetChildren(immutable_db_options_.db_paths[path_id].path,
+                        &files);  // Ignore errors
+      // Bind by const reference: the name is only read to build the
+      // candidate entry, so copying every directory entry is wasted work.
+      for (const std::string& file : files) {
+        // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
+        job_context->full_scan_candidate_files.emplace_back(
+            "/" + file, static_cast<uint32_t>(path_id));
+      }
+    }
+
+    // Add log files in wal_dir
+    if (immutable_db_options_.wal_dir != dbname_) {
+      std::vector<std::string> log_files;
+      env_->GetChildren(immutable_db_options_.wal_dir,
+                        &log_files);  // Ignore errors
+      for (const std::string& log_file : log_files) {
+        job_context->full_scan_candidate_files.emplace_back(log_file, 0);
+      }
+    }
+    // Add info log files in db_log_dir
+    if (!immutable_db_options_.db_log_dir.empty() &&
+        immutable_db_options_.db_log_dir != dbname_) {
+      std::vector<std::string> info_log_files;
+      // Ignore errors
+      env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
+      for (const std::string& log_file : info_log_files) {
+        job_context->full_scan_candidate_files.emplace_back(log_file, 0);
+      }
+    }
+  }
+
+  // logs_ is empty when called during recovery, in which case there can't yet
+  // be any tracked obsolete logs
+  if (!alive_log_files_.empty() && !logs_.empty()) {
+    uint64_t min_log_number = job_context->log_number;
+    size_t num_alive_log_files = alive_log_files_.size();
+    // find newly obsoleted log files
+    while (alive_log_files_.begin()->number < min_log_number) {
+      auto& earliest = *alive_log_files_.begin();
+      if (immutable_db_options_.recycle_log_file_num >
+          log_recycle_files.size()) {
+        ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                       "adding log %" PRIu64 " to recycle list\n",
+                       earliest.number);
+        log_recycle_files.push_back(earliest.number);
+      } else {
+        job_context->log_delete_files.push_back(earliest.number);
+      }
+      // Snapshot the totals once, before the first log is accounted for.
+      if (job_context->size_log_to_delete == 0) {
+        job_context->prev_total_log_size = total_log_size_;
+        job_context->num_alive_log_files = num_alive_log_files;
+      }
+      job_context->size_log_to_delete += earliest.size;
+      total_log_size_ -= earliest.size;
+      // `earliest` refers to the front element; it must not be used after
+      // the pop below.
+      if (concurrent_prepare_) {
+        log_write_mutex_.Lock();
+      }
+      alive_log_files_.pop_front();
+      if (concurrent_prepare_) {
+        log_write_mutex_.Unlock();
+      }
+      // Current log should always stay alive since it can't have
+      // number < MinLogNumber().
+      assert(alive_log_files_.size());
+    }
+    while (!logs_.empty() && logs_.front().number < min_log_number) {
+      auto& log = logs_.front();
+      if (log.getting_synced) {
+        log_sync_cv_.Wait();
+        // logs_ could have changed while we were waiting.
+        continue;
+      }
+      logs_to_free_.push_back(log.ReleaseWriter());
+      {
+        InstrumentedMutexLock wl(&log_write_mutex_);
+        logs_.pop_front();
+      }
+    }
+    // Current log cannot be obsolete.
+    assert(!logs_.empty());
+  }
+
+  // We're just cleaning up for DB::Write().
+  assert(job_context->logs_to_free.empty());
+  job_context->logs_to_free = logs_to_free_;
+  job_context->log_recycle_files.assign(log_recycle_files.begin(),
+                                        log_recycle_files.end());
+  logs_to_free_.clear();
+}
+
+namespace {
+// Strict ordering used to sort candidate files before de-duplication:
+// descending by file_name, with ties broken by descending path_id.
+bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
+                          const JobContext::CandidateFileInfo& second) {
+  if (first.file_name != second.file_name) {
+    return first.file_name > second.file_name;
+  }
+  return first.path_id > second.path_id;
+}
+}  // namespace
+
+// Deletes a single obsolete file and logs the outcome.
+// Table files go through DeleteSSTFile(); every other type is removed with
+// env_->DeleteFile(). The incoming file_deletion_status value is
+// overwritten by the result of the deletion. For table files the result is
+// also passed to EventHelpers::LogAndNotifyTableFileDeletion().
+void DBImpl::DeleteObsoleteFileImpl(Status file_deletion_status, int job_id,
+                                    const std::string& fname, FileType type,
+                                    uint64_t number, uint32_t path_id) {
+  const bool is_table_file = (type == kTableFile);
+
+  file_deletion_status =
+      is_table_file ? DeleteSSTFile(&immutable_db_options_, fname, path_id)
+                    : env_->DeleteFile(fname);
+
+  if (!file_deletion_status.ok()) {
+    // A file that is already gone is only worth an INFO line; any other
+    // failure is reported as an error.
+    if (env_->FileExists(fname).IsNotFound()) {
+      ROCKS_LOG_INFO(
+          immutable_db_options_.info_log,
+          "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
+          " -- %s\n",
+          job_id, fname.c_str(), type, number,
+          file_deletion_status.ToString().c_str());
+    } else {
+      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
+                      "[JOB %d] Failed to delete %s type=%d #%" PRIu64
+                      " -- %s\n",
+                      job_id, fname.c_str(), type, number,
+                      file_deletion_status.ToString().c_str());
+    }
+  } else {
+    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
+                    "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
+                    fname.c_str(), type, number,
+                    file_deletion_status.ToString().c_str());
+  }
+
+  if (is_table_file) {
+    EventHelpers::LogAndNotifyTableFileDeletion(
+        &event_logger_, job_id, number, fname, file_deletion_status, GetName(),
+        immutable_db_options_.listeners);
+  }
+}
+
+// Diffs the candidate files collected in `state` against the live files and
+// possibly removes those that are no longer needed. Also removes all the
+// files in sst_delete_files, log_delete_files and manifest_delete_files,
+// and trims old info log files down to keep_log_file_num.
+//
+// state          -- job context previously filled by FindObsoleteFiles();
+//                   if its manifest_file_number is 0 nothing is done.
+// schedule_only  -- when true, deletions are queued with
+//                   SchedulePendingPurge() (taking mutex_ for each one)
+//                   instead of being performed here.
+//
+// It is not necessary to hold the mutex when invoking this method.
+void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
+  // we'd better have sth to delete
+  assert(state.HaveSomethingToDelete());
+
+  // this checks if FindObsoleteFiles() was run before. If not, don't do
+  // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
+  // run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
+  if (state.manifest_file_number == 0) {
+    return;
+  }
+
+  // Now, convert live list to an unordered map, WITHOUT mutex held;
+  // set is slow.
+  std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
+  for (const FileDescriptor& fd : state.sst_live) {
+    sst_live_map[fd.GetNumber()] = &fd;
+  }
+  std::unordered_set<uint64_t> log_recycle_files_set(
+      state.log_recycle_files.begin(), state.log_recycle_files.end());
+
+  // Work on a private copy: `state` is const and more candidates are
+  // appended below.
+  auto candidate_files = state.full_scan_candidate_files;
+  candidate_files.reserve(
+      candidate_files.size() + state.sst_delete_files.size() +
+      state.log_delete_files.size() + state.manifest_delete_files.size());
+  // We may ignore the dbname when generating the file names.
+  const char* kDumbDbName = "";
+  for (auto file : state.sst_delete_files) {
+    candidate_files.emplace_back(
+        MakeTableFileName(kDumbDbName, file->fd.GetNumber()),
+        file->fd.GetPathId());
+    if (file->table_reader_handle) {
+      table_cache_->Release(file->table_reader_handle);
+    }
+    delete file;
+  }
+
+  for (auto file_num : state.log_delete_files) {
+    if (file_num > 0) {
+      candidate_files.emplace_back(LogFileName(kDumbDbName, file_num), 0);
+    }
+  }
+  for (const auto& filename : state.manifest_delete_files) {
+    candidate_files.emplace_back(filename, 0);
+  }
+
+  // dedup candidate_files so we don't try to delete the same
+  // file twice
+  std::sort(candidate_files.begin(), candidate_files.end(),
+            CompareCandidateFile);
+  candidate_files.erase(
+      std::unique(candidate_files.begin(), candidate_files.end()),
+      candidate_files.end());
+
+  if (state.prev_total_log_size > 0) {
+    ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                   "[JOB %d] Try to delete WAL files size %" PRIu64
+                   ", prev total WAL file size %" PRIu64
+                   ", number of live WAL files %" ROCKSDB_PRIszt ".\n",
+                   state.job_id, state.size_log_to_delete,
+                   state.prev_total_log_size, state.num_alive_log_files);
+  }
+
+  std::vector<std::string> old_info_log_files;
+  InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
+                                dbname_);
+  for (const auto& candidate_file : candidate_files) {
+    // candidate_files is not modified inside this loop, so a reference to
+    // the name is enough; no per-candidate string copy is needed.
+    const std::string& to_delete = candidate_file.file_name;
+    uint32_t path_id = candidate_file.path_id;
+    uint64_t number;
+    FileType type;
+    // Ignore file if we cannot recognize it.
+    if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
+      continue;
+    }
+
+    bool keep = true;
+    switch (type) {
+      case kLogFile:
+        keep = ((number >= state.log_number) ||
+                (number == state.prev_log_number) ||
+                (log_recycle_files_set.find(number) !=
+                 log_recycle_files_set.end()));
+        break;
+      case kDescriptorFile:
+        // Keep my manifest file, and any newer incarnations'
+        // (can happen during manifest roll)
+        keep = (number >= state.manifest_file_number);
+        break;
+      case kTableFile:
+        // If the second condition is not there, this makes
+        // DontDeletePendingOutputs fail
+        keep = (sst_live_map.find(number) != sst_live_map.end()) ||
+               number >= state.min_pending_output;
+        break;
+      case kTempFile:
+        // Any temp files that are currently being written to must
+        // be recorded in pending_outputs_, which is inserted into "live".
+        // Also, SetCurrentFile creates a temp file when writing out new
+        // manifest, which is equal to state.pending_manifest_file_number. We
+        // should not delete that file
+        //
+        // TODO(yhchiang): carefully modify the third condition to safely
+        //                 remove the temp options files.
+        keep = (sst_live_map.find(number) != sst_live_map.end()) ||
+               (number == state.pending_manifest_file_number) ||
+               (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
+        break;
+      case kInfoLogFile:
+        // Info logs are never deleted in this loop; old ones (number != 0)
+        // are collected and trimmed after it.
+        keep = true;
+        if (number != 0) {
+          old_info_log_files.push_back(to_delete);
+        }
+        break;
+      case kCurrentFile:
+      case kDBLockFile:
+      case kIdentityFile:
+      case kMetaDatabase:
+      case kOptionsFile:
+      case kBlobFile:
+        keep = true;
+        break;
+    }
+
+    if (keep) {
+      continue;
+    }
+
+    std::string fname;
+    if (type == kTableFile) {
+      // evict from cache
+      TableCache::Evict(table_cache_.get(), number);
+      fname = TableFileName(immutable_db_options_.db_paths, number, path_id);
+    } else {
+      fname = ((type == kLogFile) ? immutable_db_options_.wal_dir : dbname_) +
+              "/" + to_delete;
+    }
+
+#ifndef ROCKSDB_LITE
+    // With a WAL TTL or size limit configured, obsolete WAL files are
+    // handed to the WAL manager for archiving instead of being deleted.
+    if (type == kLogFile && (immutable_db_options_.wal_ttl_seconds > 0 ||
+                             immutable_db_options_.wal_size_limit_mb > 0)) {
+      wal_manager_.ArchiveWALFile(fname, number);
+      continue;
+    }
+#endif  // !ROCKSDB_LITE
+
+    Status file_deletion_status;
+    if (schedule_only) {
+      InstrumentedMutexLock guard_lock(&mutex_);
+      SchedulePendingPurge(fname, type, number, path_id, state.job_id);
+    } else {
+      DeleteObsoleteFileImpl(file_deletion_status, state.job_id, fname, type,
+                             number, path_id);
+    }
+  }
+
+  // Delete old info log files.
+  size_t old_info_log_file_count = old_info_log_files.size();
+  if (old_info_log_file_count != 0 &&
+      old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
+    std::sort(old_info_log_files.begin(), old_info_log_files.end());
+    size_t end =
+        old_info_log_file_count - immutable_db_options_.keep_log_file_num;
+    // The bound is inclusive, so end + 1 of the sorted old logs are removed.
+    // The index has the same type as the bound it is compared against.
+    for (size_t i = 0; i <= end; i++) {
+      std::string& to_delete = old_info_log_files.at(i);
+      std::string full_path_to_delete =
+          (immutable_db_options_.db_log_dir.empty()
+               ? dbname_
+               : immutable_db_options_.db_log_dir) +
+          "/" + to_delete;
+      ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                     "[JOB %d] Delete info log file %s\n", state.job_id,
+                     full_path_to_delete.c_str());
+      Status s = env_->DeleteFile(full_path_to_delete);
+      if (!s.ok()) {
+        if (env_->FileExists(full_path_to_delete).IsNotFound()) {
+          ROCKS_LOG_INFO(
+              immutable_db_options_.info_log,
+              "[JOB %d] Tried to delete non-existing info log file %s FAILED "
+              "-- %s\n",
+              state.job_id, to_delete.c_str(), s.ToString().c_str());
+        } else {
+          ROCKS_LOG_ERROR(immutable_db_options_.info_log,
+                          "[JOB %d] Delete info log file %s FAILED -- %s\n",
+                          state.job_id, to_delete.c_str(),
+                          s.ToString().c_str());
+        }
+      }
+    }
+  }
+#ifndef ROCKSDB_LITE
+  wal_manager_.PurgeObsoleteWALFiles();
+#endif  // !ROCKSDB_LITE
+  LogFlush(immutable_db_options_.info_log);
+}
+
+// Runs a forced FindObsoleteFiles() pass under a fresh job id and purges
+// whatever it found.
+// REQUIRES: mutex_ is held on entry. The mutex is released around the purge
+// and re-acquired before returning.
+void DBImpl::DeleteObsoleteFiles() {
+  mutex_.AssertHeld();
+
+  JobContext ctx(next_job_id_.fetch_add(1));
+  FindObsoleteFiles(&ctx, true);
+
+  // The purge itself runs without the DB mutex.
+  mutex_.Unlock();
+  const bool has_work = ctx.HaveSomethingToDelete();
+  if (has_work) {
+    PurgeObsoleteFiles(ctx);
+  }
+  ctx.Clean();
+  mutex_.Lock();
+}
+} // namespace rocksdb
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_open.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_open.cc b/thirdparty/rocksdb/db/db_impl_open.cc
new file mode 100644
index 0000000..bc94b60
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_open.cc
@@ -0,0 +1,1129 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "db/db_impl.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+#include <inttypes.h>
+
+#include "db/builder.h"
+#include "options/options_helper.h"
+#include "rocksdb/wal_filter.h"
+#include "table/block_based_table_factory.h"
+#include "util/rate_limiter.h"
+#include "util/sst_file_manager_impl.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+// Sanitizes a combined Options object: the DBOptions part and the
+// ColumnFamilyOptions part of `src` are each run through their own
+// SanitizeOptions() overload and the two results are recombined.
+Options SanitizeOptions(const std::string& dbname, const Options& src) {
+  const DBOptions sanitized_db = SanitizeOptions(dbname, DBOptions(src));
+  // The column family overload takes the immutable view of the DB options
+  // that were just sanitized.
+  const ImmutableDBOptions immutable_view(sanitized_db);
+  const auto sanitized_cf =
+      SanitizeOptions(immutable_view, ColumnFamilyOptions(src));
+  return Options(sanitized_db, sanitized_cf);
+}
+
+// Returns a copy of `src` in which unset or inconsistent fields have been
+// replaced by usable values.
+//
+// dbname -- fallback for wal_dir and db_paths; also passed to
+//           CreateLoggerFromOptions() when `src` carries no info_log.
+//
+// Side effect: calls IncBackgroundThreadsIfNeeded() on result.env for the
+// LOW and HIGH priority pools, using the limits from
+// DBImpl::GetBGJobLimits().
+DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
+  DBOptions result(src);
+
+  // max_open_files == -1 means "infinite" open files, so only finite values
+  // are clipped.
+  if (result.max_open_files != -1) {
+    int max_max_open_files = port::GetMaxOpenFiles();
+    if (max_max_open_files == -1) {
+      max_max_open_files = 0x400000;
+    }
+    ClipToRange(&result.max_open_files, 20, max_max_open_files);
+  }
+
+  if (result.info_log == nullptr) {
+    Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
+    if (!s.ok()) {
+      // No place suitable for logging
+      result.info_log = nullptr;
+    }
+  }
+
+  if (!result.write_buffer_manager) {
+    result.write_buffer_manager.reset(
+        new WriteBufferManager(result.db_write_buffer_size));
+  }
+  auto bg_job_limits = DBImpl::GetBGJobLimits(result.max_background_flushes,
+                                              result.max_background_compactions,
+                                              result.max_background_jobs,
+                                              true /* parallelize_compactions */);
+  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
+                                           Env::Priority::LOW);
+  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
+                                           Env::Priority::HIGH);
+
+  if (result.rate_limiter.get() != nullptr) {
+    if (result.bytes_per_sync == 0) {
+      result.bytes_per_sync = 1024 * 1024;
+    }
+  }
+
+  // An unset delayed_write_rate takes the rate limiter's rate when there is
+  // one, and 16MB/s otherwise (or if the limiter reports 0).
+  if (result.delayed_write_rate == 0) {
+    if (result.rate_limiter.get() != nullptr) {
+      result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
+    }
+    if (result.delayed_write_rate == 0) {
+      result.delayed_write_rate = 16 * 1024 * 1024;
+    }
+  }
+
+  // Log recycling is switched off when a WAL TTL or size limit is set.
+  // recycle_log_file_num is a count, so it is reset with 0, not `false`.
+  if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
+    result.recycle_log_file_num = 0;
+  }
+
+  if (result.recycle_log_file_num &&
+      (result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
+       result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
+    // kPointInTimeRecovery is indistinguishable from
+    // kTolerateCorruptedTailRecords in recycle mode since we define
+    // the "end" of the log as the first corrupt record we encounter.
+    // kAbsoluteConsistency doesn't make sense because even a clean
+    // shutdown leaves old junk at the end of the log file.
+    result.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
+  }
+
+  if (result.wal_dir.empty()) {
+    // Use dbname as default
+    result.wal_dir = dbname;
+  }
+  // Strip one trailing '/'. wal_dir can still be empty here when dbname is
+  // empty too, and back() on an empty string is undefined behavior.
+  if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
+    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
+  }
+
+  if (result.db_paths.size() == 0) {
+    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
+  }
+
+  if (result.use_direct_io_for_flush_and_compaction &&
+      result.compaction_readahead_size == 0) {
+    TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
+    result.compaction_readahead_size = 1024 * 1024 * 2;
+  }
+
+  if (result.compaction_readahead_size > 0 ||
+      result.use_direct_io_for_flush_and_compaction) {
+    result.new_table_reader_for_compaction_inputs = true;
+  }
+
+  // Force flush on DB open if 2PC is enabled, since with 2PC we have no
+  // guarantee that consecutive log files have consecutive sequence id, which
+  // make recovery complicated.
+  if (result.allow_2pc) {
+    result.avoid_flush_during_recovery = false;
+  }
+
+  return result;
+}
+
+namespace {
+
+// Lets the table factory of every column family sanitize/validate its
+// options against `db_opts`. Returns the first non-OK status, or OK.
+Status SanitizeOptionsByTable(
+    const DBOptions& db_opts,
+    const std::vector<ColumnFamilyDescriptor>& column_families) {
+  Status s;
+  // Iterate by const reference: copying a whole ColumnFamilyDescriptor per
+  // iteration is unnecessary since the descriptor is only read.
+  for (const auto& cf : column_families) {
+    s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+  return Status::OK();
+}
+
+// Checks option combinations that cannot be supported and returns the first
+// problem found (NotSupported / InvalidArgument, or whatever the
+// compression and concurrent-write checks return). Returns OK otherwise.
+static Status ValidateOptions(
+    const DBOptions& db_options,
+    const std::vector<ColumnFamilyDescriptor>& column_families) {
+  Status s;
+
+  for (auto& cfd : column_families) {
+    s = CheckCompressionSupported(cfd.options);
+    if (s.ok() && db_options.allow_concurrent_memtable_write) {
+      s = CheckConcurrentWritesSupported(cfd.options);
+    }
+    if (!s.ok()) {
+      return s;
+    }
+    if (db_options.db_paths.size() > 1) {
+      if ((cfd.options.compaction_style != kCompactionStyleUniversal) &&
+          (cfd.options.compaction_style != kCompactionStyleLevel)) {
+        return Status::NotSupported(
+            "More than one DB paths are only supported in "
+            "universal and level compaction styles. ");
+      }
+    }
+    if (cfd.options.compaction_options_fifo.ttl > 0) {
+      if (db_options.max_open_files != -1) {
+        return Status::NotSupported(
+            "FIFO Compaction with TTL is only supported when files are always "
+            "kept open (set max_open_files = -1). ");
+      }
+      // Compare the factory names by content. Going through std::string
+      // avoids comparing two C-string pointers by address, which would only
+      // match when both names happen to share storage.
+      if (std::string(cfd.options.table_factory->Name()) !=
+          BlockBasedTableFactory().Name()) {
+        return Status::NotSupported(
+            "FIFO Compaction with TTL is only supported in "
+            "Block-Based Table format. ");
+      }
+    }
+  }
+
+  if (db_options.db_paths.size() > 4) {
+    return Status::NotSupported(
+        "More than four DB paths are not supported yet. ");
+  }
+
+  if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
+    // Protect against assert in PosixMMapReadableFile constructor
+    return Status::NotSupported(
+        "If memory mapped reads (allow_mmap_reads) are enabled "
+        "then direct I/O reads (use_direct_reads) must be disabled. ");
+  }
+
+  if (db_options.allow_mmap_writes &&
+      db_options.use_direct_io_for_flush_and_compaction) {
+    return Status::NotSupported(
+        "If memory mapped writes (allow_mmap_writes) are enabled "
+        "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
+        "be disabled. ");
+  }
+
+  if (db_options.keep_log_file_num == 0) {
+    return Status::InvalidArgument("keep_log_file_num must be greater than 0");
+  }
+
+  return Status::OK();
+}
+}  // namespace
+// Initializes an empty database: writes manifest file number 1 containing a
+// single VersionEdit (log number 0, next file 2, last sequence 0), syncs it
+// and then points CURRENT at it. If writing or syncing the manifest fails
+// the manifest file is deleted again and the error is returned.
+Status DBImpl::NewDB() {
+  VersionEdit initial_edit;
+  initial_edit.SetLogNumber(0);
+  initial_edit.SetNextFile(2);
+  initial_edit.SetLastSequence(0);
+
+  Status status;
+
+  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
+  const std::string manifest = DescriptorFileName(dbname_, 1);
+  {
+    // The writer lives in this scope only, so it is destroyed before
+    // CURRENT is written or the manifest is removed below.
+    unique_ptr<WritableFile> manifest_file;
+    EnvOptions manifest_env_options =
+        env_->OptimizeForManifestWrite(env_options_);
+    status =
+        NewWritableFile(env_, manifest, &manifest_file, manifest_env_options);
+    if (!status.ok()) {
+      return status;
+    }
+    manifest_file->SetPreallocationBlockSize(
+        immutable_db_options_.manifest_preallocation_size);
+    unique_ptr<WritableFileWriter> writer(new WritableFileWriter(
+        std::move(manifest_file), manifest_env_options));
+    log::Writer manifest_log(std::move(writer), 0, false);
+
+    std::string encoded_edit;
+    initial_edit.EncodeTo(&encoded_edit);
+    status = manifest_log.AddRecord(encoded_edit);
+    if (status.ok()) {
+      status =
+          SyncManifest(env_, &immutable_db_options_, manifest_log.file());
+    }
+  }
+
+  if (!status.ok()) {
+    // Best-effort cleanup; the result of the delete is not checked.
+    env_->DeleteFile(manifest);
+    return status;
+  }
+
+  // Make "CURRENT" file that points to the new manifest file.
+  status = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir());
+  return status;
+}
+
+// Makes sure `dirname` exists and then opens it into *directory.
+// Returns the CreateDirIfMissing() error if the directory cannot be
+// created, otherwise the result of NewDirectory().
+Status DBImpl::Directories::CreateAndNewDirectory(
+    Env* env, const std::string& dirname,
+    std::unique_ptr<Directory>* directory) const {
+  // CreateDirIfMissing() is used because the directory may already exist
+  // (when reopening a DB), and that must not be an error. Its result still
+  // has to be checked: otherwise a failure here would only surface later as
+  // an obscure message about the lock file not existing. A real-world case
+  // is an env whose CreateDirIfMissing() doesn't create intermediate
+  // directories, e.g. dbname_ is "dir/db" but "dir" doesn't exist.
+  const Status created = env->CreateDirIfMissing(dirname);
+  if (created.ok()) {
+    return env->NewDirectory(dirname, directory);
+  }
+  return created;
+}
+
+// Creates (if missing) and opens the DB directory, the WAL directory when
+// it is set and differs from `dbname`, and one directory per data path.
+// data_dirs_ ends up with exactly one entry per element of `data_paths`;
+// a path equal to `dbname` is stored as nullptr.
+// Returns the first error encountered, or OK.
+Status DBImpl::Directories::SetDirectories(
+    Env* env, const std::string& dbname, const std::string& wal_dir,
+    const std::vector<DbPath>& data_paths) {
+  Status status = CreateAndNewDirectory(env, dbname, &db_dir_);
+  if (!status.ok()) {
+    return status;
+  }
+
+  const bool has_separate_wal_dir = !wal_dir.empty() && dbname != wal_dir;
+  if (has_separate_wal_dir) {
+    status = CreateAndNewDirectory(env, wal_dir, &wal_dir_);
+    if (!status.ok()) {
+      return status;
+    }
+  }
+
+  data_dirs_.clear();
+  for (size_t i = 0; i < data_paths.size(); ++i) {
+    const std::string& data_path = data_paths[i].path;
+    if (data_path != dbname) {
+      std::unique_ptr<Directory> opened_dir;
+      status = CreateAndNewDirectory(env, data_path, &opened_dir);
+      if (!status.ok()) {
+        return status;
+      }
+      data_dirs_.emplace_back(opened_dir.release());
+    } else {
+      // Same directory as the DB itself: no separate handle is kept.
+      data_dirs_.emplace_back(nullptr);
+    }
+  }
+  assert(data_dirs_.size() == data_paths.size());
+  return Status::OK();
+}
+
+// Brings the DB to the state recorded on disk.
+//
+// column_families              -- passed through to versions_->Recover().
+// read_only                    -- when false, the DB directories are set
+//                                 up, the LOCK file is taken and the
+//                                 CURRENT / IDENTITY files are created if
+//                                 missing; when true all of that is skipped.
+// error_if_log_file_exist      -- fail with Corruption if wal_dir contains
+//                                 any log file.
+// error_if_data_exists_in_logs -- fail with Corruption if any log file in
+//                                 wal_dir has a non-zero size.
+//
+// After the manifest is recovered, all log files found in wal_dir are
+// replayed in ascending number order. max_total_in_memory_state_ is
+// recomputed before returning.
+//
+// REQUIRES: mutex_ is held and db_lock_ is not yet set.
+Status DBImpl::Recover(
+    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
+    bool error_if_log_file_exist, bool error_if_data_exists_in_logs) {
+  mutex_.AssertHeld();
+
+  bool is_new_db = false;
+  assert(db_lock_ == nullptr);
+  if (!read_only) {
+    Status s = directories_.SetDirectories(env_, dbname_,
+                                           immutable_db_options_.wal_dir,
+                                           immutable_db_options_.db_paths);
+    if (!s.ok()) {
+      return s;
+    }
+
+    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
+    if (!s.ok()) {
+      return s;
+    }
+
+    s = env_->FileExists(CurrentFileName(dbname_));
+    if (s.IsNotFound()) {
+      if (immutable_db_options_.create_if_missing) {
+        s = NewDB();
+        is_new_db = true;
+        if (!s.ok()) {
+          return s;
+        }
+      } else {
+        return Status::InvalidArgument(
+            dbname_, "does not exist (create_if_missing is false)");
+      }
+    } else if (s.ok()) {
+      if (immutable_db_options_.error_if_exists) {
+        return Status::InvalidArgument(
+            dbname_, "exists (error_if_exists is true)");
+      }
+    } else {
+      // Unexpected error reading file
+      assert(s.IsIOError());
+      return s;
+    }
+    // Check for the IDENTITY file and create it if not there
+    s = env_->FileExists(IdentityFileName(dbname_));
+    if (s.IsNotFound()) {
+      s = SetIdentityFile(env_, dbname_);
+      if (!s.ok()) {
+        return s;
+      }
+    } else if (!s.ok()) {
+      assert(s.IsIOError());
+      return s;
+    }
+  }
+
+  Status s = versions_->Recover(column_families, read_only);
+  if (immutable_db_options_.paranoid_checks && s.ok()) {
+    s = CheckConsistency();
+  }
+  if (s.ok()) {
+    SequenceNumber next_sequence(kMaxSequenceNumber);
+    default_cf_handle_ = new ColumnFamilyHandleImpl(
+        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
+    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
+    single_column_family_mode_ =
+        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
+
+    // Recover from all newer log files than the ones named in the
+    // descriptor (new log files may have been added by the previous
+    // incarnation without registering them in the descriptor).
+    //
+    // Note that prev_log_number() is no longer used, but we pay
+    // attention to it in case we are recovering a database
+    // produced by an older version of rocksdb.
+    std::vector<std::string> filenames;
+    s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
+    if (!s.ok()) {
+      return s;
+    }
+
+    std::vector<uint64_t> logs;
+    for (size_t i = 0; i < filenames.size(); i++) {
+      uint64_t number;
+      FileType type;
+      if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
+        if (is_new_db) {
+          return Status::Corruption(
+              "While creating a new Db, wal_dir contains "
+              "existing log file: ",
+              filenames[i]);
+        } else {
+          logs.push_back(number);
+        }
+      }
+    }
+
+    if (logs.size() > 0) {
+      if (error_if_log_file_exist) {
+        // The two literals are concatenated, so the first one needs its
+        // trailing space to keep the option name and "flag" apart.
+        return Status::Corruption(
+            "The db was opened in readonly mode with error_if_log_file_exist "
+            "flag but a log file already exists");
+      } else if (error_if_data_exists_in_logs) {
+        for (auto& log : logs) {
+          std::string fname = LogFileName(immutable_db_options_.wal_dir, log);
+          uint64_t bytes;
+          // A failing GetFileSize() is not treated as an error here; `s` is
+          // overwritten by RecoverLogFiles() below.
+          s = env_->GetFileSize(fname, &bytes);
+          if (s.ok()) {
+            if (bytes > 0) {
+              return Status::Corruption(
+                  "error_if_data_exists_in_logs is set but there are data "
+                  "in log files.");
+            }
+          }
+        }
+      }
+    }
+
+    if (!logs.empty()) {
+      // Recover in the order in which the logs were generated
+      std::sort(logs.begin(), logs.end());
+      s = RecoverLogFiles(logs, &next_sequence, read_only);
+      if (!s.ok()) {
+        // Clear memtables if recovery failed
+        for (auto cfd : *versions_->GetColumnFamilySet()) {
+          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
+                                 kMaxSequenceNumber);
+        }
+      }
+    }
+  }
+
+  // Initial value
+  max_total_in_memory_state_ = 0;
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
+    max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
+                                  mutable_cf_options->max_write_buffer_number;
+  }
+
+  return s;
+}
+
+/* Replays the WAL files named by log_numbers into the column families'
+ * memtables, advancing *next_sequence as write batches are inserted. Unless
+ * read_only is set, memtables may be written out as level-0 tables through
+ * WriteLevel0TableForRecovery() and each column family's VersionEdit is
+ * persisted with LogAndApply(). How corruption is handled depends on
+ * paranoid_checks and wal_recovery_mode. Returns the first error that is not
+ * tolerated, otherwise OK.
+ * REQUIRES: log_numbers are sorted in ascending order; mutex_ is held.
+ * NOTE(review): log_numbers.back() is read when !read_only, so the vector is
+ * presumably expected to be non-empty - confirm against callers. */
+Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
+ SequenceNumber* next_sequence, bool read_only) {
+ struct LogReporter : public log::Reader::Reporter {
+ Env* env;
+ Logger* info_log;
+ const char* fname;
+ Status* status; // nullptr when corruption is only logged, never recorded
+ virtual void Corruption(size_t bytes, const Status& s) override {
+ ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
+ (this->status == nullptr ? "(ignoring error) " : ""),
+ fname, static_cast<int>(bytes), s.ToString().c_str());
+ if (this->status != nullptr && this->status->ok()) {
+ *this->status = s;
+ }
+ }
+ };
+
+ mutex_.AssertHeld();
+ Status status;
+ std::unordered_map<int, VersionEdit> version_edits;
+ // no need to refcount because iteration is under mutex
+ for (auto cfd : *versions_->GetColumnFamilySet()) {
+ VersionEdit edit;
+ edit.SetColumnFamily(cfd->GetID());
+ version_edits.insert({cfd->GetID(), edit});
+ }
+ int job_id = next_job_id_.fetch_add(1);
+ {
+ auto stream = event_logger_.Log();
+ stream << "job" << job_id << "event"
+ << "recovery_started";
+ stream << "log_files";
+ stream.StartArray();
+ for (auto log_number : log_numbers) {
+ stream << log_number;
+ }
+ stream.EndArray();
+ }
+
+#ifndef ROCKSDB_LITE
+ if (immutable_db_options_.wal_filter != nullptr) {
+ std::map<std::string, uint32_t> cf_name_id_map;
+ std::map<uint32_t, uint64_t> cf_lognumber_map;
+ for (auto cfd : *versions_->GetColumnFamilySet()) {
+ cf_name_id_map.insert(
+ std::make_pair(cfd->GetName(), cfd->GetID()));
+ cf_lognumber_map.insert(
+ std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
+ }
+
+ immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
+ cf_name_id_map);
+ }
+#endif
+
+ bool stop_replay_by_wal_filter = false; // set when the filter returns kStopReplay
+ bool stop_replay_for_corruption = false; // set in point-in-time mode after an error
+ bool flushed = false; // set once any memtable has been written to level 0
+ for (auto log_number : log_numbers) {
+ // The previous incarnation may not have written any MANIFEST
+ // records after allocating this log number. So we manually
+ // update the file number allocation counter in VersionSet.
+ versions_->MarkFileNumberUsedDuringRecovery(log_number);
+ // Open the log file
+ std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
+
+ ROCKS_LOG_INFO(immutable_db_options_.info_log,
+ "Recovering log #%" PRIu64 " mode %d", log_number,
+ immutable_db_options_.wal_recovery_mode);
+ auto logFileDropped = [this, &fname]() {
+ uint64_t bytes;
+ if (env_->GetFileSize(fname, &bytes).ok()) {
+ auto info_log = immutable_db_options_.info_log.get();
+ ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
+ static_cast<int>(bytes)); // NOTE(review): size is narrowed to int for logging
+ }
+ };
+ if (stop_replay_by_wal_filter) {
+ logFileDropped();
+ continue;
+ }
+
+ unique_ptr<SequentialFileReader> file_reader;
+ {
+ unique_ptr<SequentialFile> file;
+ status = env_->NewSequentialFile(fname, &file,
+ env_->OptimizeForLogRead(env_options_));
+ if (!status.ok()) {
+ MaybeIgnoreError(&status);
+ if (!status.ok()) {
+ return status;
+ } else {
+ // Fail with one log file, but that's ok.
+ // Try next one.
+ continue;
+ }
+ }
+ file_reader.reset(new SequentialFileReader(std::move(file)));
+ }
+
+ // Create the log reader.
+ LogReporter reporter;
+ reporter.env = env_;
+ reporter.info_log = immutable_db_options_.info_log.get();
+ reporter.fname = fname.c_str();
+ if (!immutable_db_options_.paranoid_checks ||
+ immutable_db_options_.wal_recovery_mode ==
+ WALRecoveryMode::kSkipAnyCorruptedRecords) {
+ reporter.status = nullptr;
+ } else {
+ reporter.status = &status;
+ }
+ // We intentionally make log::Reader do checksumming even if
+ // paranoid_checks==false so that corruptions cause entire commits
+ // to be skipped instead of propagating bad information (like overly
+ // large sequence numbers).
+ log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
+ &reporter, true /*checksum*/, 0 /*initial_offset*/,
+ log_number);
+
+ // Read all the records and add them to a memtable. wal_recovery_mode is
+ // handed to ReadRecord() so the reader can apply the configured mode.
+ std::string scratch;
+ Slice record;
+ WriteBatch batch;
+
+ while (!stop_replay_by_wal_filter &&
+ reader.ReadRecord(&record, &scratch,
+ immutable_db_options_.wal_recovery_mode) &&
+ status.ok()) {
+ if (record.size() < WriteBatchInternal::kHeader) {
+ reporter.Corruption(record.size(),
+ Status::Corruption("log record too small"));
+ continue;
+ }
+ WriteBatchInternal::SetContents(&batch, record);
+ SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
+
+ if (immutable_db_options_.wal_recovery_mode ==
+ WALRecoveryMode::kPointInTimeRecovery) {
+ // In point-in-time recovery mode, if sequence id of log files are
+ // consecutive, we continue recovery despite corruption. This could
+ // happen when we open and write to a corrupted DB, where sequence id
+ // will start from the last sequence id we recovered.
+ if (sequence == *next_sequence) {
+ stop_replay_for_corruption = false;
+ }
+ if (stop_replay_for_corruption) {
+ logFileDropped();
+ break;
+ }
+ }
+
+#ifndef ROCKSDB_LITE
+ if (immutable_db_options_.wal_filter != nullptr) {
+ WriteBatch new_batch;
+ bool batch_changed = false;
+
+ WalFilter::WalProcessingOption wal_processing_option =
+ immutable_db_options_.wal_filter->LogRecordFound(
+ log_number, fname, batch, &new_batch, &batch_changed);
+
+ switch (wal_processing_option) {
+ case WalFilter::WalProcessingOption::kContinueProcessing:
+ // do nothing, proceed normally
+ break;
+ case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
+ // skip current record
+ continue;
+ case WalFilter::WalProcessingOption::kStopReplay:
+ // skip current record and stop replay
+ stop_replay_by_wal_filter = true;
+ continue;
+ case WalFilter::WalProcessingOption::kCorruptedRecord: {
+ status =
+ Status::Corruption("Corruption reported by Wal Filter ",
+ immutable_db_options_.wal_filter->Name());
+ MaybeIgnoreError(&status);
+ if (!status.ok()) {
+ reporter.Corruption(record.size(), status);
+ continue;
+ }
+ break;
+ }
+ default: {
+ assert(false); // unhandled case
+ status = Status::NotSupported(
+ "Unknown WalProcessingOption returned"
+ " by Wal Filter ",
+ immutable_db_options_.wal_filter->Name());
+ MaybeIgnoreError(&status);
+ if (!status.ok()) {
+ return status;
+ } else {
+ // Ignore the error with current record processing.
+ continue;
+ }
+ }
+ }
+
+ if (batch_changed) {
+ // Make sure that the count in the new batch is
+ // within the original count.
+ int new_count = WriteBatchInternal::Count(&new_batch);
+ int original_count = WriteBatchInternal::Count(&batch);
+ if (new_count > original_count) {
+ ROCKS_LOG_FATAL(
+ immutable_db_options_.info_log,
+ "Recovering log #%" PRIu64
+ " mode %d log filter %s returned "
+ "more records (%d) than original (%d) which is not allowed. "
+ "Aborting recovery.",
+ log_number, immutable_db_options_.wal_recovery_mode,
+ immutable_db_options_.wal_filter->Name(), new_count,
+ original_count);
+ status = Status::NotSupported(
+ "More than original # of records "
+ "returned by Wal Filter ",
+ immutable_db_options_.wal_filter->Name());
+ return status;
+ }
+ // Set the same sequence number in the new_batch
+ // as the original batch.
+ WriteBatchInternal::SetSequence(&new_batch,
+ WriteBatchInternal::Sequence(&batch));
+ batch = new_batch;
+ }
+ }
+#endif // ROCKSDB_LITE
+
+ // If column family was not found, it might mean that the WAL write
+ // batch references to the column family that was dropped after the
+ // insert. We don't want to fail the whole write batch in that case --
+ // we just ignore the update.
+ // That's why we set ignore missing column families to true
+ bool has_valid_writes = false;
+ status = WriteBatchInternal::InsertInto(
+ &batch, column_family_memtables_.get(), &flush_scheduler_, true,
+ log_number, this, false /* concurrent_memtable_writes */,
+ next_sequence, &has_valid_writes);
+ MaybeIgnoreError(&status);
+ if (!status.ok()) {
+ // We are treating this as a failure while reading since we read valid
+ // blocks that do not form coherent data
+ reporter.Corruption(record.size(), status);
+ continue;
+ }
+
+ if (has_valid_writes && !read_only) {
+ // we can do this because this is called before client has access to the
+ // DB and there is only a single thread operating on DB
+ ColumnFamilyData* cfd;
+
+ while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
+ cfd->Unref();
+ // If this asserts, it means that InsertInto failed in
+ // filtering updates to already-flushed column families
+ assert(cfd->GetLogNumber() <= log_number);
+ auto iter = version_edits.find(cfd->GetID());
+ assert(iter != version_edits.end());
+ VersionEdit* edit = &iter->second;
+ status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
+ if (!status.ok()) {
+ // Reflect errors immediately so that conditions like full
+ // file-systems cause the DB::Open() to fail.
+ return status;
+ }
+ flushed = true;
+
+ cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
+ *next_sequence);
+ }
+ }
+ }
+
+ if (!status.ok()) {
+ if (immutable_db_options_.wal_recovery_mode ==
+ WALRecoveryMode::kSkipAnyCorruptedRecords) {
+ // We should ignore all errors unconditionally
+ status = Status::OK();
+ } else if (immutable_db_options_.wal_recovery_mode ==
+ WALRecoveryMode::kPointInTimeRecovery) {
+ // We should ignore the error but not continue replaying
+ status = Status::OK();
+ stop_replay_for_corruption = true;
+ ROCKS_LOG_INFO(immutable_db_options_.info_log,
+ "Point in time recovered to log #%" PRIu64
+ " seq #%" PRIu64,
+ log_number, *next_sequence);
+ } else {
+ assert(immutable_db_options_.wal_recovery_mode ==
+ WALRecoveryMode::kTolerateCorruptedTailRecords ||
+ immutable_db_options_.wal_recovery_mode ==
+ WALRecoveryMode::kAbsoluteConsistency);
+ return status;
+ }
+ }
+
+ flush_scheduler_.Clear();
+ auto last_sequence = *next_sequence - 1;
+ if ((*next_sequence != kMaxSequenceNumber) &&
+ (versions_->LastSequence() <= last_sequence)) {
+ versions_->SetLastToBeWrittenSequence(last_sequence);
+ versions_->SetLastSequence(last_sequence);
+ }
+ }
+
+ // True if there's any data in the WALs; if not, we can skip re-processing
+ // them later
+ bool data_seen = false;
+ if (!read_only) {
+ // no need to refcount since client still doesn't have access
+ // to the DB and can not drop column families while we iterate
+ auto max_log_number = log_numbers.back();
+ for (auto cfd : *versions_->GetColumnFamilySet()) {
+ auto iter = version_edits.find(cfd->GetID());
+ assert(iter != version_edits.end());
+ VersionEdit* edit = &iter->second;
+
+ if (cfd->GetLogNumber() > max_log_number) {
+ // Column family cfd has already flushed the data
+ // from all logs. Memtable has to be empty because
+ // we filter the updates based on log_number
+ // (in WriteBatch::InsertInto)
+ assert(cfd->mem()->GetFirstSequenceNumber() == 0);
+ assert(edit->NumEntries() == 0);
+ continue;
+ }
+
+ // flush the final memtable (if non-empty)
+ if (cfd->mem()->GetFirstSequenceNumber() != 0) {
+ // If flush happened in the middle of recovery (e.g. due to memtable
+ // being full), we flush at the end. Otherwise we'll need to record
+ // where we were on last flush, which makes the logic complicated.
+ if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
+ status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
+ if (!status.ok()) {
+ // Recovery failed
+ break;
+ }
+ flushed = true;
+
+ cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
+ versions_->LastSequence());
+ }
+ data_seen = true;
+ }
+
+ // write MANIFEST with update
+ // writing log_number in the manifest means that any log file
+ // with number strictly less than (log_number + 1) is already
+ // recovered and should be ignored on next reincarnation.
+ // Since we already recovered max_log_number, we want all logs
+ // with numbers `<= max_log_number` (includes this one) to be ignored
+ if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
+ edit->SetLogNumber(max_log_number + 1);
+ }
+ // we must mark the next log number as used, even though it's
+ // not actually used. that is because VersionSet assumes
+ // VersionSet::next_file_number_ always to be strictly greater than any
+ // log number
+ versions_->MarkFileNumberUsedDuringRecovery(max_log_number + 1);
+ status = versions_->LogAndApply(
+ cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
+ if (!status.ok()) {
+ // Recovery failed
+ break;
+ }
+ }
+ }
+
+ if (data_seen && !flushed) {
+ // Mark these as alive so they'll be considered for deletion later by
+ // FindObsoleteFiles()
+ if (concurrent_prepare_) {
+ log_write_mutex_.Lock();
+ }
+ for (auto log_number : log_numbers) {
+ alive_log_files_.push_back(LogFileNumberSize(log_number));
+ }
+ if (concurrent_prepare_) {
+ log_write_mutex_.Unlock();
+ }
+ }
+
+ event_logger_.Log() << "job" << job_id << "event"
+ << "recovery_finished";
+
+ return status;
+}
+
+// Writes the contents of `mem` (belonging to column family `cfd`) to a new
+// table file during WAL recovery and, when the resulting file is non-empty,
+// registers it at level 0 in `edit`.
+//
+// `job_id` is passed through to BuildTable() for event logging.
+// REQUIRES: mutex_ is held on entry. It is released while the table is being
+// built and re-acquired before the function returns.
+// Returns the status produced by BuildTable().
+Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
+                                           MemTable* mem, VersionEdit* edit) {
+  mutex_.AssertHeld();
+  const uint64_t start_micros = env_->NowMicros();
+  FileMetaData meta;
+  // Keep the new file number registered as a pending output until the edit
+  // below has been prepared.
+  auto pending_outputs_inserted_elem =
+      CaptureCurrentFileNumberInPendingOutputs();
+  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
+  ReadOptions ro;
+  ro.total_order_seek = true;
+  Arena arena;
+  Status s;
+  {
+    ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
+    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
+                    "[%s] [WriteLevel0TableForRecovery]"
+                    " Level-0 table #%" PRIu64 ": started",
+                    cfd->GetName().c_str(), meta.fd.GetNumber());
+
+    // Copy the latest mutable cf options while the mutex is still locked so
+    // that they can be used safely after it is released below.
+    const MutableCFOptions mutable_cf_options =
+        *cfd->GetLatestMutableCFOptions();
+    const bool paranoid_file_checks = mutable_cf_options.paranoid_file_checks;
+
+    int64_t _current_time = 0;
+    env_->GetCurrentTime(&_current_time);  // ignore error
+    const uint64_t current_time = static_cast<uint64_t>(_current_time);
+
+    {
+      mutex_.Unlock();
+
+      SequenceNumber earliest_write_conflict_snapshot;
+      std::vector<SequenceNumber> snapshot_seqs =
+          snapshots_.GetAll(&earliest_write_conflict_snapshot);
+
+      EnvOptions optimized_env_options =
+          env_->OptimizeForCompactionTableWrite(env_options_,
+                                                immutable_db_options_);
+      s = BuildTable(
+          dbname_, env_, *cfd->ioptions(), mutable_cf_options,
+          optimized_env_options, cfd->table_cache(), iter.get(),
+          std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
+          &meta, cfd->internal_comparator(),
+          cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
+          snapshot_seqs, earliest_write_conflict_snapshot,
+          GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
+          cfd->ioptions()->compression_opts, paranoid_file_checks,
+          cfd->internal_stats(), TableFileCreationReason::kRecovery,
+          &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
+          -1 /* level */, current_time);
+      LogFlush(immutable_db_options_.info_log);
+      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
+                      "[%s] [WriteLevel0TableForRecovery]"
+                      " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
+                      cfd->GetName().c_str(), meta.fd.GetNumber(),
+                      meta.fd.GetFileSize(), s.ToString().c_str());
+      mutex_.Lock();
+    }
+  }
+  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
+
+  // Note that if file_size is zero, the file has been deleted and
+  // should not be added to the manifest.
+  int level = 0;
+  if (s.ok() && meta.fd.GetFileSize() > 0) {
+    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
+                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
+                  meta.smallest_seqno, meta.largest_seqno,
+                  meta.marked_for_compaction);
+  }
+
+  // Account for the time spent and bytes written, whether or not the file
+  // ended up being added to the edit.
+  InternalStats::CompactionStats stats(1);
+  stats.micros = env_->NowMicros() - start_micros;
+  stats.bytes_written = meta.fd.GetFileSize();
+  stats.num_output_files = 1;
+  cfd->internal_stats()->AddCompactionStats(level, stats);
+  cfd->internal_stats()->AddCFStats(
+      InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize());
+  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
+  return s;
+}
+
+// Convenience overload: opens `dbname` with only the default column family,
+// using `options` for both the DB-wide and the column-family settings.
+// On success the handle created for the default column family is released
+// again before returning.
+Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
+  DBOptions db_opts(options);
+  ColumnFamilyOptions default_cf_opts(options);
+
+  std::vector<ColumnFamilyDescriptor> cf_descriptors;
+  cf_descriptors.emplace_back(kDefaultColumnFamilyName, default_cf_opts);
+
+  std::vector<ColumnFamilyHandle*> cf_handles;
+  Status open_status =
+      DB::Open(db_opts, dbname, cf_descriptors, &cf_handles, dbptr);
+  if (!open_status.ok()) {
+    return open_status;
+  }
+
+  assert(cf_handles.size() == 1);
+  // Deleting the handle is safe because DBImpl always keeps its own
+  // reference to the default column family.
+  delete cf_handles[0];
+  return open_status;
+}
+
+// Opens the database at `dbname` with the given column families.
+//
+// The options are sanitized and validated, the WAL and data directories are
+// created if missing, the existing state is recovered, a fresh WAL file is
+// created, and one handle per requested column family is appended to
+// `*handles` (creating missing families when
+// db_options.create_missing_column_families is set).
+//
+// On success `*dbptr` points to the new DBImpl. On failure past the option
+// checks, every handle created so far is deleted, `*handles` is cleared and
+// `*dbptr` is left as nullptr.
+Status DB::Open(const DBOptions& db_options, const std::string& dbname,
+                const std::vector<ColumnFamilyDescriptor>& column_families,
+                std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
+  Status s = SanitizeOptionsByTable(db_options, column_families);
+  if (!s.ok()) {
+    return s;
+  }
+
+  s = ValidateOptions(db_options, column_families);
+  if (!s.ok()) {
+    return s;
+  }
+
+  *dbptr = nullptr;
+  handles->clear();
+
+  // The largest write buffer among the requested column families is used
+  // below to size the WAL preallocation block.
+  size_t max_write_buffer_size = 0;
+  for (const auto& cf : column_families) {
+    max_write_buffer_size =
+        std::max(max_write_buffer_size, cf.options.write_buffer_size);
+  }
+
+  DBImpl* impl = new DBImpl(db_options, dbname);
+  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
+  if (s.ok()) {
+    for (const auto& db_path : impl->immutable_db_options_.db_paths) {
+      s = impl->env_->CreateDirIfMissing(db_path.path);
+      if (!s.ok()) {
+        break;
+      }
+    }
+  }
+
+  if (!s.ok()) {
+    delete impl;
+    return s;
+  }
+
+  s = impl->CreateArchivalDirectory();
+  if (!s.ok()) {
+    delete impl;
+    return s;
+  }
+  impl->mutex_.Lock();
+  // Handles create_if_missing, error_if_exists
+  s = impl->Recover(column_families);
+  if (s.ok()) {
+    // Recovery succeeded: start a brand new WAL file for this incarnation.
+    uint64_t new_log_number = impl->versions_->NewFileNumber();
+    unique_ptr<WritableFile> lfile;
+    EnvOptions soptions(db_options);
+    EnvOptions opt_env_options =
+        impl->immutable_db_options_.env->OptimizeForLogWrite(
+            soptions, BuildDBOptions(impl->immutable_db_options_,
+                                     impl->mutable_db_options_));
+    s = NewWritableFile(
+        impl->immutable_db_options_.env,
+        LogFileName(impl->immutable_db_options_.wal_dir, new_log_number),
+        &lfile, opt_env_options);
+    if (s.ok()) {
+      lfile->SetPreallocationBlockSize(
+          impl->GetWalPreallocateBlockSize(max_write_buffer_size));
+      {
+        InstrumentedMutexLock wl(&impl->log_write_mutex_);
+        impl->logfile_number_ = new_log_number;
+        unique_ptr<WritableFileWriter> file_writer(
+            new WritableFileWriter(std::move(lfile), opt_env_options));
+        impl->logs_.emplace_back(
+            new_log_number,
+            new log::Writer(
+                std::move(file_writer), new_log_number,
+                impl->immutable_db_options_.recycle_log_file_num > 0));
+      }
+
+      // set column family handles
+      for (const auto& cf : column_families) {
+        auto cfd =
+            impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
+        if (cfd != nullptr) {
+          handles->push_back(
+              new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
+          impl->NewThreadStatusCfInfo(cfd);
+        } else {
+          if (db_options.create_missing_column_families) {
+            // missing column family, create it; the mutex is released
+            // around the CreateColumnFamily() call
+            ColumnFamilyHandle* handle;
+            impl->mutex_.Unlock();
+            s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
+            impl->mutex_.Lock();
+            if (s.ok()) {
+              handles->push_back(handle);
+            } else {
+              break;
+            }
+          } else {
+            s = Status::InvalidArgument("Column family not found: ", cf.name);
+            break;
+          }
+        }
+      }
+    }
+    if (s.ok()) {
+      for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
+        delete impl->InstallSuperVersionAndScheduleWork(
+            cfd, nullptr, *cfd->GetLatestMutableCFOptions());
+      }
+      if (impl->concurrent_prepare_) {
+        impl->log_write_mutex_.Lock();
+      }
+      impl->alive_log_files_.push_back(
+          DBImpl::LogFileNumberSize(impl->logfile_number_));
+      if (impl->concurrent_prepare_) {
+        impl->log_write_mutex_.Unlock();
+      }
+      impl->DeleteObsoleteFiles();
+      s = impl->directories_.GetDbDir()->Fsync();
+    }
+  }
+
+  if (s.ok()) {
+    // Per-column-family sanity checks on the recovered state.
+    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
+      if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
+        auto* vstorage = cfd->current()->storage_info();
+        for (int i = 1; i < vstorage->num_levels(); ++i) {
+          int num_files = vstorage->NumLevelFiles(i);
+          if (num_files > 0) {
+            s = Status::InvalidArgument(
+                "Not all files are at level 0. Cannot "
+                "open with FIFO compaction style.");
+            break;
+          }
+        }
+      }
+      if (!cfd->mem()->IsSnapshotSupported()) {
+        impl->is_snapshot_supported_ = false;
+      }
+      if (cfd->ioptions()->merge_operator != nullptr &&
+          !cfd->mem()->IsMergeOperatorSupported()) {
+        // The two-Slice Status form is not a printf-style formatter here, so
+        // the column family name is spliced into the message explicitly.
+        s = Status::InvalidArgument(
+            "The memtable of column family " + cfd->GetName() +
+            " does not support merge operator but its "
+            "options.merge_operator is non-null");
+      }
+      if (!s.ok()) {
+        break;
+      }
+    }
+  }
+  TEST_SYNC_POINT("DBImpl::Open:Opened");
+  Status persist_options_status;
+  if (s.ok()) {
+    // Persist RocksDB Options before scheduling the compaction.
+    // The WriteOptionsFile() will release and lock the mutex internally.
+    persist_options_status = impl->WriteOptionsFile(
+        false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
+
+    *dbptr = impl;
+    impl->opened_successfully_ = true;
+    impl->MaybeScheduleFlushOrCompaction();
+  }
+  impl->mutex_.Unlock();
+
+#ifndef ROCKSDB_LITE
+  auto sfm = static_cast<SstFileManagerImpl*>(
+      impl->immutable_db_options_.sst_file_manager.get());
+  if (s.ok() && sfm) {
+    // Notify SstFileManager about all sst files that already exist in
+    // db_paths[0] when the DB is opened. The directory listing is best
+    // effort: its status is not checked.
+    auto& db_path = impl->immutable_db_options_.db_paths[0];
+    std::vector<std::string> existing_files;
+    impl->immutable_db_options_.env->GetChildren(db_path.path, &existing_files);
+    for (auto& file_name : existing_files) {
+      uint64_t file_number;
+      FileType file_type;
+      std::string file_path = db_path.path + "/" + file_name;
+      if (ParseFileName(file_name, &file_number, &file_type) &&
+          file_type == kTableFile) {
+        sfm->OnAddFile(file_path);
+      }
+    }
+  }
+#endif  // !ROCKSDB_LITE
+
+  if (s.ok()) {
+    ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl);
+    LogFlush(impl->immutable_db_options_.info_log);
+    if (!persist_options_status.ok()) {
+      s = Status::IOError(
+          "DB::Open() failed --- Unable to persist Options file",
+          persist_options_status.ToString());
+    }
+  }
+  if (!s.ok()) {
+    // Undo everything handed out so far so the caller never sees a
+    // half-opened database.
+    for (auto* h : *handles) {
+      delete h;
+    }
+    handles->clear();
+    delete impl;
+    *dbptr = nullptr;
+  }
+  return s;
+}
+} // namespace rocksdb
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_readonly.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_readonly.cc b/thirdparty/rocksdb/db/db_impl_readonly.cc
new file mode 100644
index 0000000..d69eecb
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_readonly.cc
@@ -0,0 +1,197 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/db_impl_readonly.h"
+
+#include "db/compacted_db_impl.h"
+#include "db/db_impl.h"
+#include "db/db_iter.h"
+#include "db/merge_context.h"
+#include "db/range_del_aggregator.h"
+#include "monitoring/perf_context_imp.h"
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+
+// Constructs the read-only variant on top of DBImpl and immediately notes
+// the mode in the info log.
+DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
+                               const std::string& dbname)
+    : DBImpl(db_options, dbname) {
+  const auto& logger = immutable_db_options_.info_log;
+  ROCKS_LOG_INFO(logger, "Opening the db in read only mode");
+  LogFlush(logger);
+}
+
+// Nothing beyond what DBImpl already releases needs to be cleaned up here.
+DBImplReadOnly::~DBImplReadOnly() = default;
+
+// Implementations of the DB interface
+// Looks up `key` in `column_family` as of the last sequence number known to
+// the version set. The mutable memtable of the current super version is
+// consulted first; only when it has no answer is the current version (the
+// table files) searched. The value is returned through `pinnable_val`, which
+// must not be null.
+Status DBImplReadOnly::Get(const ReadOptions& read_options,
+                           ColumnFamilyHandle* column_family, const Slice& key,
+                           PinnableSlice* pinnable_val) {
+  assert(pinnable_val != nullptr);
+  Status result;
+  SequenceNumber read_seq = versions_->LastSequence();
+  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+  SuperVersion* sv = cfd->GetSuperVersion();
+  MergeContext merge_ctx;
+  RangeDelAggregator tombstones(cfd->internal_comparator(), read_seq);
+  LookupKey lookup(key, read_seq);
+
+  const bool found_in_memtable =
+      sv->mem->Get(lookup, pinnable_val->GetSelf(), &result, &merge_ctx,
+                   &tombstones, read_options);
+  if (!found_in_memtable) {
+    PERF_TIMER_GUARD(get_from_output_files_time);
+    sv->current->Get(read_options, lookup, pinnable_val, &result, &merge_ctx,
+                     &tombstones);
+  } else {
+    pinnable_val->PinSelf();
+  }
+  return result;
+}
+
+// Creates an iterator over `column_family`. It reads at the sequence number
+// of read_options.snapshot when one is supplied, otherwise at the last
+// sequence number known to the version set. A reference is taken on the
+// column family's current super version and handed to the internal iterator.
+Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
+                                      ColumnFamilyHandle* column_family) {
+  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+  SuperVersion* sv = cfd->GetSuperVersion()->Ref();
+
+  SequenceNumber read_seq = versions_->LastSequence();
+  if (read_options.snapshot != nullptr) {
+    read_seq =
+        reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
+  }
+
+  auto* result = NewArenaWrappedDbIterator(
+      env_, read_options, *cfd->ioptions(), read_seq,
+      sv->mutable_cf_options.max_sequential_skip_in_iterations,
+      sv->version_number);
+  auto* inner = NewInternalIterator(read_options, cfd, sv, result->GetArena(),
+                                    result->GetRangeDelAggregator());
+  result->SetIterUnderDBIter(inner);
+  return result;
+}
+
+// Creates one iterator per entry of `column_families`, in the same order,
+// and stores them in `*iterators` (which is cleared first). All iterators
+// read at the sequence number of read_options.snapshot when one is supplied,
+// otherwise at the last sequence number known to the version set.
+// Returns InvalidArgument when `iterators` is null, OK otherwise.
+Status DBImplReadOnly::NewIterators(
+    const ReadOptions& read_options,
+    const std::vector<ColumnFamilyHandle*>& column_families,
+    std::vector<Iterator*>* iterators) {
+  if (iterators == nullptr) {
+    return Status::InvalidArgument("iterators not allowed to be nullptr");
+  }
+  iterators->clear();
+  iterators->reserve(column_families.size());
+
+  // The read sequence does not depend on the column family, so it is
+  // resolved once up front.
+  SequenceNumber read_seq = versions_->LastSequence();
+  if (read_options.snapshot != nullptr) {
+    read_seq =
+        reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
+  }
+
+  for (ColumnFamilyHandle* handle : column_families) {
+    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
+    auto* super_version = cfd->GetSuperVersion()->Ref();
+    auto* wrapped = NewArenaWrappedDbIterator(
+        env_, read_options, *cfd->ioptions(), read_seq,
+        super_version->mutable_cf_options.max_sequential_skip_in_iterations,
+        super_version->version_number);
+    auto* inner =
+        NewInternalIterator(read_options, cfd, super_version,
+                            wrapped->GetArena(),
+                            wrapped->GetRangeDelAggregator());
+    wrapped->SetIterUnderDBIter(inner);
+    iterators->push_back(wrapped);
+  }
+
+  return Status::OK();
+}
+
+// Convenience overload: opens `dbname` read-only with only the default
+// column family, using `options` for both the DB-wide and the column-family
+// settings.
+//
+// An attempt is first made to open the database as a fully compacted DB; if
+// that succeeds its result is returned directly (that path does not receive
+// `error_if_log_file_exist`). Otherwise the multi-column-family overload is
+// used, and `error_if_log_file_exist` is forwarded to it so the caller's
+// request is honoured.
+Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
+                           DB** dbptr, bool error_if_log_file_exist) {
+  *dbptr = nullptr;
+
+  // Try to first open DB as fully compacted DB
+  Status s;
+  s = CompactedDBImpl::Open(options, dbname, dbptr);
+  if (s.ok()) {
+    return s;
+  }
+
+  DBOptions db_options(options);
+  ColumnFamilyOptions cf_options(options);
+  std::vector<ColumnFamilyDescriptor> column_families;
+  column_families.push_back(
+      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+  std::vector<ColumnFamilyHandle*> handles;
+
+  s = DB::OpenForReadOnly(db_options, dbname, column_families, &handles, dbptr,
+                          error_if_log_file_exist);
+  if (s.ok()) {
+    assert(handles.size() == 1);
+    // i can delete the handle since DBImpl is always holding a
+    // reference to default column family
+    delete handles[0];
+  }
+  return s;
+}
+
+// Opens the database at `dbname` in read-only mode with the given column
+// families.
+//
+// The existing state is recovered with the read-only flag set and
+// `error_if_log_file_exist` passed through to Recover(). One handle per
+// requested column family is appended to `*handles`; a requested family that
+// does not exist yields InvalidArgument.
+//
+// On success `*dbptr` points to the new DBImplReadOnly. On failure all
+// handles created so far are deleted, `*handles` is cleared and `*dbptr`
+// stays nullptr.
+Status DB::OpenForReadOnly(
+    const DBOptions& db_options, const std::string& dbname,
+    const std::vector<ColumnFamilyDescriptor>& column_families,
+    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
+    bool error_if_log_file_exist) {
+  *dbptr = nullptr;
+  handles->clear();
+
+  DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
+  impl->mutex_.Lock();
+  Status s = impl->Recover(column_families, true /* read only */,
+                           error_if_log_file_exist);
+  if (s.ok()) {
+    // set column family handles; descriptors are only read, so iterate by
+    // reference instead of copying each one
+    for (const auto& cf : column_families) {
+      auto cfd =
+          impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
+      if (cfd == nullptr) {
+        s = Status::InvalidArgument("Column family not found: ", cf.name);
+        break;
+      }
+      handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
+    }
+  }
+  if (s.ok()) {
+    // Install a super version for every column family; whatever
+    // InstallSuperVersion() hands back is deleted.
+    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
+      delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
+    }
+  }
+  impl->mutex_.Unlock();
+  if (s.ok()) {
+    *dbptr = impl;
+    for (auto* h : *handles) {
+      impl->NewThreadStatusCfInfo(
+          reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
+    }
+  } else {
+    for (auto h : *handles) {
+      delete h;
+    }
+    handles->clear();
+    delete impl;
+  }
+  return s;
+}
+
+#else // !ROCKSDB_LITE
+
+Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
+ DB** dbptr, bool error_if_log_file_exist) { // ROCKSDB_LITE stub: arguments are unused
+ return Status::NotSupported("Not supported in ROCKSDB_LITE."); // always fails; *dbptr is not written
+}
+
+Status DB::OpenForReadOnly(
+ const DBOptions& db_options, const std::string& dbname,
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
+ bool error_if_log_file_exist) { // ROCKSDB_LITE stub: arguments are unused
+ return Status::NotSupported("Not supported in ROCKSDB_LITE."); // always fails; outputs are not written
+}
+#endif // !ROCKSDB_LITE
+
+} // namespace rocksdb
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_readonly.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_readonly.h b/thirdparty/rocksdb/db/db_impl_readonly.h
new file mode 100644
index 0000000..9bdc95c
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_readonly.h
@@ -0,0 +1,123 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include "db/db_impl.h"
+#include <vector>
+#include <string>
+
+namespace rocksdb {
+
+// DBImpl specialization used by the read-only open path.
+//
+// The read entry points (Get, NewIterator, NewIterators) are overridden here
+// and defined out of line. Every mutating or maintenance entry point that is
+// overridden inline below does no work and returns
+// Status::NotSupported("Not supported operation in read only mode.").
+class DBImplReadOnly : public DBImpl {
+ public:
+  DBImplReadOnly(const DBOptions& options, const std::string& dbname);
+  virtual ~DBImplReadOnly();
+
+  // Implementations of the DB interface
+  using DB::Get;
+  virtual Status Get(const ReadOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     PinnableSlice* value) override;
+
+  // TODO: Implement ReadOnly MultiGet?
+
+  using DBImpl::NewIterator;
+  virtual Iterator* NewIterator(const ReadOptions&,
+                                ColumnFamilyHandle* column_family) override;
+
+  virtual Status NewIterators(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>& column_families,
+      std::vector<Iterator*>* iterators) override;
+
+  // ---- Write operations: rejected in read-only mode ----
+
+  using DBImpl::Put;
+  virtual Status Put(const WriteOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     const Slice& value) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+  using DBImpl::Merge;
+  virtual Status Merge(const WriteOptions& options,
+                       ColumnFamilyHandle* column_family, const Slice& key,
+                       const Slice& value) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+  using DBImpl::Delete;
+  virtual Status Delete(const WriteOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const Slice& key) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+  using DBImpl::SingleDelete;
+  virtual Status SingleDelete(const WriteOptions& options,
+                              ColumnFamilyHandle* column_family,
+                              const Slice& key) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+  virtual Status Write(const WriteOptions& options,
+                       WriteBatch* updates) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  // ---- Compaction: rejected in read-only mode ----
+
+  using DBImpl::CompactRange;
+  virtual Status CompactRange(const CompactRangeOptions& options,
+                              ColumnFamilyHandle* column_family,
+                              const Slice* begin, const Slice* end) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  using DBImpl::CompactFiles;
+  virtual Status CompactFiles(
+      const CompactionOptions& compact_options,
+      ColumnFamilyHandle* column_family,
+      const std::vector<std::string>& input_file_names,
+      const int output_level, const int output_path_id = -1) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  // ---- File management, flushing and ingestion: rejected as well ----
+
+  virtual Status DisableFileDeletions() override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  virtual Status EnableFileDeletions(bool force) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+  virtual Status GetLiveFiles(std::vector<std::string>&,
+                              uint64_t* manifest_file_size,
+                              bool flush_memtable = true) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  using DBImpl::Flush;
+  virtual Status Flush(const FlushOptions& options,
+                       ColumnFamilyHandle* column_family) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  using DBImpl::SyncWAL;
+  virtual Status SyncWAL() override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+  using DB::IngestExternalFile;
+  virtual Status IngestExternalFile(
+      ColumnFamilyHandle* column_family,
+      const std::vector<std::string>& external_files,
+      const IngestExternalFileOptions& ingestion_options) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
+ private:
+  friend class DB;
+
+  // No copying allowed. The copy operations are explicitly deleted rather
+  // than declared-but-undefined: because DB is a friend, an accidental copy
+  // made from DB code would otherwise pass access checking and only fail at
+  // link time. Deleting them turns any such copy into a compile-time error.
+  DBImplReadOnly(const DBImplReadOnly&) = delete;
+  void operator=(const DBImplReadOnly&) = delete;
+};
+}
+
+#endif // !ROCKSDB_LITE
|