nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeremyd...@apache.org
Subject [19/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:24:59 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl_write.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_impl_write.cc b/thirdparty/rocksdb/db/db_impl_write.cc
new file mode 100644
index 0000000..b93dd6f
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_impl_write.cc
@@ -0,0 +1,1240 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "db/db_impl.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+#include <inttypes.h>
+#include "db/event_helpers.h"
+#include "monitoring/perf_context_imp.h"
+#include "options/options_helper.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+// Convenience methods
+Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
+                   const Slice& key, const Slice& val) {
+  return DB::Put(o, column_family, key, val);
+}
+
+Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
+                     const Slice& key, const Slice& val) {
+  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  if (!cfh->cfd()->ioptions()->merge_operator) {
+    return Status::NotSupported("Provide a merge_operator when opening DB");
+  } else {
+    return DB::Merge(o, column_family, key, val);
+  }
+}
+
+Status DBImpl::Delete(const WriteOptions& write_options,
+                      ColumnFamilyHandle* column_family, const Slice& key) {
+  return DB::Delete(write_options, column_family, key);
+}
+
+Status DBImpl::SingleDelete(const WriteOptions& write_options,
+                            ColumnFamilyHandle* column_family,
+                            const Slice& key) {
+  return DB::SingleDelete(write_options, column_family, key);
+}
+
+Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
+  return WriteImpl(write_options, my_batch, nullptr, nullptr);
+}
+
+#ifndef ROCKSDB_LITE
+Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
+                                 WriteBatch* my_batch,
+                                 WriteCallback* callback) {
+  return WriteImpl(write_options, my_batch, callback, nullptr);
+}
+#endif  // ROCKSDB_LITE
+
+Status DBImpl::WriteImpl(const WriteOptions& write_options,
+                         WriteBatch* my_batch, WriteCallback* callback,
+                         uint64_t* log_used, uint64_t log_ref,
+                         bool disable_memtable) {
+  if (my_batch == nullptr) {
+    return Status::Corruption("Batch is nullptr!");
+  }
+  if (concurrent_prepare_ && immutable_db_options_.enable_pipelined_write) {
+    return Status::NotSupported(
+        "pipelined_writes is not compatible with concurrent prepares");
+  }
+
+  Status status;
+  if (write_options.low_pri) {
+    status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
+    if (!status.ok()) {
+      return status;
+    }
+  }
+
+  if (concurrent_prepare_ && disable_memtable) {
+    return WriteImplWALOnly(write_options, my_batch, callback, log_used,
+                            log_ref);
+  }
+
+  if (immutable_db_options_.enable_pipelined_write) {
+    return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
+                              log_ref, disable_memtable);
+  }
+
+  PERF_TIMER_GUARD(write_pre_and_post_process_time);
+  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
+                        disable_memtable);
+
+  if (!write_options.disableWAL) {
+    RecordTick(stats_, WRITE_WITH_WAL);
+  }
+
+  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
+
+  write_thread_.JoinBatchGroup(&w);
+  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
+    // we are a non-leader in a parallel group
+    PERF_TIMER_GUARD(write_memtable_time);
+
+    if (w.ShouldWriteToMemtable()) {
+      ColumnFamilyMemTablesImpl column_family_memtables(
+          versions_->GetColumnFamilySet());
+      w.status = WriteBatchInternal::InsertInto(
+          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
+          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
+          true /*concurrent_memtable_writes*/);
+    }
+
+    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
+      // we're responsible for exit batch group
+      auto last_sequence = w.write_group->last_sequence;
+      versions_->SetLastSequence(last_sequence);
+      MemTableInsertStatusCheck(w.status);
+      write_thread_.ExitAsBatchGroupFollower(&w);
+    }
+    assert(w.state == WriteThread::STATE_COMPLETED);
+    // STATE_COMPLETED conditional below handles exit
+
+    status = w.FinalStatus();
+  }
+  if (w.state == WriteThread::STATE_COMPLETED) {
+    if (log_used != nullptr) {
+      *log_used = w.log_used;
+    }
+    // write is complete and leader has updated sequence
+    return w.FinalStatus();
+  }
+  // else we are the leader of the write batch group
+  assert(w.state == WriteThread::STATE_GROUP_LEADER);
+
+  // Once reaches this point, the current writer "w" will try to do its write
+  // job.  It may also pick up some of the remaining writers in the "writers_"
+  // when it finds suitable, and finish them in the same write batch.
+  // This is how a write job could be done by the other writer.
+  WriteContext write_context;
+  WriteThread::WriteGroup write_group;
+  bool in_parallel_group = false;
+  uint64_t last_sequence = kMaxSequenceNumber;
+  if (!concurrent_prepare_) {
+    last_sequence = versions_->LastSequence();
+  }
+
+  mutex_.Lock();
+
+  bool need_log_sync = !write_options.disableWAL && write_options.sync;
+  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
+  if (!concurrent_prepare_ || !disable_memtable) {
+    // With concurrent writes we do preprocess only in the write thread that
+    // also does write to memtable to avoid sync issue on shared data structure
+    // with the other thread
+    status = PreprocessWrite(write_options, &need_log_sync, &write_context);
+  }
+  log::Writer* log_writer = logs_.back().writer;
+
+  mutex_.Unlock();
+
+  // Add to log and apply to memtable.  We can release the lock
+  // during this phase since &w is currently responsible for logging
+  // and protects against concurrent loggers and concurrent writes
+  // into memtables
+
+  last_batch_group_size_ =
+      write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
+
+  if (status.ok()) {
+    // Rules for when we can update the memtable concurrently
+    // 1. supported by memtable
+    // 2. Puts are not okay if inplace_update_support
+    // 3. Merges are not okay
+    //
+    // Rules 1..2 are enforced by checking the options
+    // during startup (CheckConcurrentWritesSupported), so if
+    // options.allow_concurrent_memtable_write is true then they can be
+    // assumed to be true.  Rule 3 is checked for each batch.  We could
+    // relax rules 2 if we could prevent write batches from referring
+    // more than once to a particular key.
+    bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
+                    write_group.size > 1;
+    int total_count = 0;
+    uint64_t total_byte_size = 0;
+    for (auto* writer : write_group) {
+      if (writer->CheckCallback(this)) {
+        if (writer->ShouldWriteToMemtable()) {
+          total_count += WriteBatchInternal::Count(writer->batch);
+          parallel = parallel && !writer->batch->HasMerge();
+        }
+
+        total_byte_size = WriteBatchInternal::AppendedByteSize(
+            total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
+      }
+    }
+
+    const bool concurrent_update = concurrent_prepare_;
+    // Update stats while we are an exclusive group leader, so we know
+    // that nobody else can be writing to these particular stats.
+    // We're optimistic, updating the stats before we successfully
+    // commit.  That lets us release our leader status early.
+    auto stats = default_cf_internal_stats_;
+    stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count,
+                      concurrent_update);
+    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
+    stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size,
+                      concurrent_update);
+    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
+    stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update);
+    RecordTick(stats_, WRITE_DONE_BY_SELF);
+    auto write_done_by_other = write_group.size - 1;
+    if (write_done_by_other > 0) {
+      stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other,
+                        concurrent_update);
+      RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
+    }
+    MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
+
+    if (write_options.disableWAL) {
+      has_unpersisted_data_.store(true, std::memory_order_relaxed);
+    }
+
+    PERF_TIMER_STOP(write_pre_and_post_process_time);
+
+    if (!concurrent_prepare_) {
+      if (status.ok() && !write_options.disableWAL) {
+        PERF_TIMER_GUARD(write_wal_time);
+        status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
+                            need_log_dir_sync, last_sequence + 1);
+      }
+    } else {
+      if (status.ok() && !write_options.disableWAL) {
+        PERF_TIMER_GUARD(write_wal_time);
+        // LastToBeWrittenSequence is increased inside WriteToWAL under
+        // wal_write_mutex_ to ensure ordered events in WAL
+        status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
+                                      total_count);
+      } else {
+        // Otherwise we inc seq number for memtable writes
+        last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count);
+      }
+    }
+    assert(last_sequence != kMaxSequenceNumber);
+    const SequenceNumber current_sequence = last_sequence + 1;
+    last_sequence += total_count;
+
+    if (status.ok()) {
+      PERF_TIMER_GUARD(write_memtable_time);
+
+      if (!parallel) {
+        w.status = WriteBatchInternal::InsertInto(
+            write_group, current_sequence, column_family_memtables_.get(),
+            &flush_scheduler_, write_options.ignore_missing_column_families,
+            0 /*recovery_log_number*/, this);
+      } else {
+        SequenceNumber next_sequence = current_sequence;
+        for (auto* writer : write_group) {
+          if (writer->ShouldWriteToMemtable()) {
+            writer->sequence = next_sequence;
+            next_sequence += WriteBatchInternal::Count(writer->batch);
+          }
+        }
+        write_group.last_sequence = last_sequence;
+        write_group.running.store(static_cast<uint32_t>(write_group.size),
+                                  std::memory_order_relaxed);
+        write_thread_.LaunchParallelMemTableWriters(&write_group);
+        in_parallel_group = true;
+
+        // Each parallel follower is doing each own writes. The leader should
+        // also do its own.
+        if (w.ShouldWriteToMemtable()) {
+          ColumnFamilyMemTablesImpl column_family_memtables(
+              versions_->GetColumnFamilySet());
+          assert(w.sequence == current_sequence);
+          w.status = WriteBatchInternal::InsertInto(
+              &w, w.sequence, &column_family_memtables, &flush_scheduler_,
+              write_options.ignore_missing_column_families, 0 /*log_number*/,
+              this, true /*concurrent_memtable_writes*/);
+        }
+      }
+    }
+  }
+  PERF_TIMER_START(write_pre_and_post_process_time);
+
+  if (!w.CallbackFailed()) {
+    WriteCallbackStatusCheck(status);
+  }
+
+  if (need_log_sync) {
+    mutex_.Lock();
+    MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
+    mutex_.Unlock();
+    // Requesting sync with concurrent_prepare_ is expected to be very rare. We
+    // hance provide a simple implementation that is not necessarily efficient.
+    if (concurrent_prepare_) {
+      if (manual_wal_flush_) {
+        status = FlushWAL(true);
+      } else {
+        status = SyncWAL();
+      }
+    }
+  }
+
+  bool should_exit_batch_group = true;
+  if (in_parallel_group) {
+    // CompleteParallelWorker returns true if this thread should
+    // handle exit, false means somebody else did
+    should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
+  }
+  if (should_exit_batch_group) {
+    if (status.ok()) {
+      versions_->SetLastSequence(last_sequence);
+    }
+    MemTableInsertStatusCheck(w.status);
+    write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
+  }
+
+  if (status.ok()) {
+    status = w.FinalStatus();
+  }
+  return status;
+}
+
+Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
+                                  WriteBatch* my_batch, WriteCallback* callback,
+                                  uint64_t* log_used, uint64_t log_ref,
+                                  bool disable_memtable) {
+  PERF_TIMER_GUARD(write_pre_and_post_process_time);
+  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
+
+  WriteContext write_context;
+
+  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
+                        disable_memtable);
+  write_thread_.JoinBatchGroup(&w);
+  if (w.state == WriteThread::STATE_GROUP_LEADER) {
+    WriteThread::WriteGroup wal_write_group;
+    if (w.callback && !w.callback->AllowWriteBatching()) {
+      write_thread_.WaitForMemTableWriters();
+    }
+    mutex_.Lock();
+    bool need_log_sync = !write_options.disableWAL && write_options.sync;
+    bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
+    w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
+    log::Writer* log_writer = logs_.back().writer;
+    mutex_.Unlock();
+
+    // This can set non-OK status if callback fail.
+    last_batch_group_size_ =
+        write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
+    const SequenceNumber current_sequence =
+        write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
+    size_t total_count = 0;
+    size_t total_byte_size = 0;
+
+    if (w.status.ok()) {
+      SequenceNumber next_sequence = current_sequence;
+      for (auto writer : wal_write_group) {
+        if (writer->CheckCallback(this)) {
+          if (writer->ShouldWriteToMemtable()) {
+            writer->sequence = next_sequence;
+            size_t count = WriteBatchInternal::Count(writer->batch);
+            next_sequence += count;
+            total_count += count;
+          }
+          total_byte_size = WriteBatchInternal::AppendedByteSize(
+              total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
+        }
+      }
+      if (w.disable_wal) {
+        has_unpersisted_data_.store(true, std::memory_order_relaxed);
+      }
+      write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
+    }
+
+    auto stats = default_cf_internal_stats_;
+    stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
+    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
+    stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
+    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
+
+    PERF_TIMER_STOP(write_pre_and_post_process_time);
+
+    if (w.ShouldWriteToWAL()) {
+      PERF_TIMER_GUARD(write_wal_time);
+      stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
+      RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
+      if (wal_write_group.size > 1) {
+        stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
+                          wal_write_group.size - 1);
+        RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
+      }
+      w.status = WriteToWAL(wal_write_group, log_writer, log_used,
+                            need_log_sync, need_log_dir_sync, current_sequence);
+    }
+
+    if (!w.CallbackFailed()) {
+      WriteCallbackStatusCheck(w.status);
+    }
+
+    if (need_log_sync) {
+      mutex_.Lock();
+      MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
+      mutex_.Unlock();
+    }
+
+    write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
+  }
+
+  WriteThread::WriteGroup memtable_write_group;
+  if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
+    PERF_TIMER_GUARD(write_memtable_time);
+    assert(w.status.ok());
+    write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
+    if (memtable_write_group.size > 1 &&
+        immutable_db_options_.allow_concurrent_memtable_write) {
+      write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
+    } else {
+      memtable_write_group.status = WriteBatchInternal::InsertInto(
+          memtable_write_group, w.sequence, column_family_memtables_.get(),
+          &flush_scheduler_, write_options.ignore_missing_column_families,
+          0 /*log_number*/, this);
+      versions_->SetLastSequence(memtable_write_group.last_sequence);
+      write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
+    }
+  }
+
+  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
+    assert(w.ShouldWriteToMemtable());
+    ColumnFamilyMemTablesImpl column_family_memtables(
+        versions_->GetColumnFamilySet());
+    w.status = WriteBatchInternal::InsertInto(
+        &w, w.sequence, &column_family_memtables, &flush_scheduler_,
+        write_options.ignore_missing_column_families, 0 /*log_number*/, this,
+        true /*concurrent_memtable_writes*/);
+    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
+      MemTableInsertStatusCheck(w.status);
+      versions_->SetLastSequence(w.write_group->last_sequence);
+      write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
+    }
+  }
+
+  assert(w.state == WriteThread::STATE_COMPLETED);
+  return w.FinalStatus();
+}
+
+Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
+                                WriteBatch* my_batch, WriteCallback* callback,
+                                uint64_t* log_used, uint64_t log_ref) {
+  Status status;
+  PERF_TIMER_GUARD(write_pre_and_post_process_time);
+  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
+                        true /* disable_memtable */);
+  if (write_options.disableWAL) {
+    return status;
+  }
+  RecordTick(stats_, WRITE_WITH_WAL);
+  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
+
+  nonmem_write_thread_.JoinBatchGroup(&w);
+  assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
+  if (w.state == WriteThread::STATE_COMPLETED) {
+    if (log_used != nullptr) {
+      *log_used = w.log_used;
+    }
+    return w.FinalStatus();
+  }
+  // else we are the leader of the write batch group
+  assert(w.state == WriteThread::STATE_GROUP_LEADER);
+  WriteContext write_context;
+  WriteThread::WriteGroup write_group;
+  uint64_t last_sequence;
+  nonmem_write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
+  // Note: no need to update last_batch_group_size_ here since the batch writes
+  // to WAL only
+
+  uint64_t total_byte_size = 0;
+  for (auto* writer : write_group) {
+    if (writer->CheckCallback(this)) {
+      total_byte_size = WriteBatchInternal::AppendedByteSize(
+          total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
+    }
+  }
+
+  const bool concurrent_update = true;
+  // Update stats while we are an exclusive group leader, so we know
+  // that nobody else can be writing to these particular stats.
+  // We're optimistic, updating the stats before we successfully
+  // commit.  That lets us release our leader status early.
+  auto stats = default_cf_internal_stats_;
+  stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size,
+                    concurrent_update);
+  RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
+  stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update);
+  RecordTick(stats_, WRITE_DONE_BY_SELF);
+  auto write_done_by_other = write_group.size - 1;
+  if (write_done_by_other > 0) {
+    stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other,
+                      concurrent_update);
+    RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
+  }
+  MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
+
+  PERF_TIMER_STOP(write_pre_and_post_process_time);
+
+  PERF_TIMER_GUARD(write_wal_time);
+  // LastToBeWrittenSequence is increased inside WriteToWAL under
+  // wal_write_mutex_ to ensure ordered events in WAL
+  status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
+                                0 /*total_count*/);
+  if (status.ok() && write_options.sync) {
+    // Requesting sync with concurrent_prepare_ is expected to be very rare. We
+    // hance provide a simple implementation that is not necessarily efficient.
+    if (manual_wal_flush_) {
+      status = FlushWAL(true);
+    } else {
+      status = SyncWAL();
+    }
+  }
+  PERF_TIMER_START(write_pre_and_post_process_time);
+
+  if (!w.CallbackFailed()) {
+    WriteCallbackStatusCheck(status);
+  }
+  nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
+  if (status.ok()) {
+    status = w.FinalStatus();
+  }
+  return status;
+}
+
+void DBImpl::WriteCallbackStatusCheck(const Status& status) {
+  // Is setting bg_error_ enough here?  This will at least stop
+  // compaction and fail any further writes.
+  if (immutable_db_options_.paranoid_checks && !status.ok() &&
+      !status.IsBusy() && !status.IsIncomplete()) {
+    mutex_.Lock();
+    if (bg_error_.ok()) {
+      Status new_bg_error = status;
+      // may temporarily unlock and lock the mutex.
+      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                            BackgroundErrorReason::kWriteCallback,
+                                            &new_bg_error, &mutex_);
+      if (!new_bg_error.ok()) {
+        bg_error_ = new_bg_error;  // stop compaction & fail any further writes
+      }
+    }
+    mutex_.Unlock();
+  }
+}
+
+void DBImpl::MemTableInsertStatusCheck(const Status& status) {
+  // A non-OK status here indicates that the state implied by the
+  // WAL has diverged from the in-memory state.  This could be
+  // because of a corrupt write_batch (very bad), or because the
+  // client specified an invalid column family and didn't specify
+  // ignore_missing_column_families.
+  if (!status.ok()) {
+    mutex_.Lock();
+    assert(bg_error_.ok());
+    Status new_bg_error = status;
+    // may temporarily unlock and lock the mutex.
+    EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                          BackgroundErrorReason::kMemTable,
+                                          &new_bg_error, &mutex_);
+    if (!new_bg_error.ok()) {
+      bg_error_ = new_bg_error;  // stop compaction & fail any further writes
+    }
+    mutex_.Unlock();
+  }
+}
+
+Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
+                               bool* need_log_sync,
+                               WriteContext* write_context) {
+  mutex_.AssertHeld();
+  assert(write_context != nullptr && need_log_sync != nullptr);
+  Status status;
+
+  assert(!single_column_family_mode_ ||
+         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
+  if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
+               total_log_size_ > GetMaxTotalWalSize())) {
+    status = HandleWALFull(write_context);
+  }
+
+  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
+    // Before a new memtable is added in SwitchMemtable(),
+    // write_buffer_manager_->ShouldFlush() will keep returning true. If another
+    // thread is writing to another DB with the same write buffer, they may also
+    // be flushed. We may end up with flushing much more DBs than needed. It's
+    // suboptimal but still correct.
+    status = HandleWriteBufferFull(write_context);
+  }
+
+  if (UNLIKELY(status.ok() && !bg_error_.ok())) {
+    return bg_error_;
+  }
+
+  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
+    status = ScheduleFlushes(write_context);
+  }
+
+  if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
+                               write_controller_.NeedsDelay()))) {
+    PERF_TIMER_GUARD(write_delay_time);
+    // We don't know size of curent batch so that we always use the size
+    // for previous one. It might create a fairness issue that expiration
+    // might happen for smaller writes but larger writes can go through.
+    // Can optimize it if it is an issue.
+    status = DelayWrite(last_batch_group_size_, write_options);
+  }
+
+  if (status.ok() && *need_log_sync) {
+    // Wait until the parallel syncs are finished. Any sync process has to sync
+    // the front log too so it is enough to check the status of front()
+    // We do a while loop since log_sync_cv_ is signalled when any sync is
+    // finished
+    // Note: there does not seem to be a reason to wait for parallel sync at
+    // this early step but it is not important since parallel sync (SyncWAL) and
+    // need_log_sync are usually not used together.
+    while (logs_.front().getting_synced) {
+      log_sync_cv_.Wait();
+    }
+    for (auto& log : logs_) {
+      assert(!log.getting_synced);
+      // This is just to prevent the logs to be synced by a parallel SyncWAL
+      // call. We will do the actual syncing later after we will write to the
+      // WAL.
+      // Note: there does not seem to be a reason to set this early before we
+      // actually write to the WAL
+      log.getting_synced = true;
+    }
+  } else {
+    *need_log_sync = false;
+  }
+
+  return status;
+}
+
+WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
+                               WriteBatch* tmp_batch, size_t* write_with_wal) {
+  assert(write_with_wal != nullptr);
+  assert(tmp_batch != nullptr);
+  WriteBatch* merged_batch = nullptr;
+  *write_with_wal = 0;
+  auto* leader = write_group.leader;
+  if (write_group.size == 1 && leader->ShouldWriteToWAL() &&
+      leader->batch->GetWalTerminationPoint().is_cleared()) {
+    // we simply write the first WriteBatch to WAL if the group only
+    // contains one batch, that batch should be written to the WAL,
+    // and the batch is not wanting to be truncated
+    merged_batch = leader->batch;
+    *write_with_wal = 1;
+  } else {
+    // WAL needs all of the batches flattened into a single batch.
+    // We could avoid copying here with an iov-like AddRecord
+    // interface
+    merged_batch = tmp_batch;
+    for (auto writer : write_group) {
+      if (writer->ShouldWriteToWAL()) {
+        WriteBatchInternal::Append(merged_batch, writer->batch,
+                                   /*WAL_only*/ true);
+        (*write_with_wal)++;
+      }
+    }
+  }
+  return merged_batch;
+}
+
+// When concurrent_prepare_ is disabled, this function is called from the only
+// write thread. Otherwise this must be called holding log_write_mutex_.
+Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
+                          log::Writer* log_writer, uint64_t* log_used,
+                          uint64_t* log_size) {
+  assert(log_size != nullptr);
+  Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
+  *log_size = log_entry.size();
+  Status status = log_writer->AddRecord(log_entry);
+  if (log_used != nullptr) {
+    *log_used = logfile_number_;
+  }
+  total_log_size_ += log_entry.size();
+  // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here
+  // since alive_log_files_ might be modified concurrently
+  alive_log_files_.back().AddSize(log_entry.size());
+  log_empty_ = false;
+  return status;
+}
+
+Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
+                          log::Writer* log_writer, uint64_t* log_used,
+                          bool need_log_sync, bool need_log_dir_sync,
+                          SequenceNumber sequence) {
+  Status status;
+
+  size_t write_with_wal = 0;
+  WriteBatch* merged_batch =
+      MergeBatch(write_group, &tmp_batch_, &write_with_wal);
+  if (merged_batch == write_group.leader->batch) {
+    write_group.leader->log_used = logfile_number_;
+  } else if (write_with_wal > 1) {
+    for (auto writer : write_group) {
+      writer->log_used = logfile_number_;
+    }
+  }
+
+  WriteBatchInternal::SetSequence(merged_batch, sequence);
+
+  uint64_t log_size;
+  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
+
+  if (status.ok() && need_log_sync) {
+    StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
+    // It's safe to access logs_ with unlocked mutex_ here because:
+    //  - we've set getting_synced=true for all logs,
+    //    so other threads won't pop from logs_ while we're here,
+    //  - only writer thread can push to logs_, and we're in
+    //    writer thread, so no one will push to logs_,
+    //  - as long as other threads don't modify it, it's safe to read
+    //    from std::deque from multiple threads concurrently.
+    for (auto& log : logs_) {
+      status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
+      if (!status.ok()) {
+        break;
+      }
+    }
+    if (status.ok() && need_log_dir_sync) {
+      // We only sync WAL directory the first time WAL syncing is
+      // requested, so that in case users never turn on WAL sync,
+      // we can avoid the disk I/O in the write code path.
+      status = directories_.GetWalDir()->Fsync();
+    }
+  }
+
+  if (merged_batch == &tmp_batch_) {
+    tmp_batch_.Clear();
+  }
+  if (status.ok()) {
+    auto stats = default_cf_internal_stats_;
+    if (need_log_sync) {
+      stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1);
+      RecordTick(stats_, WAL_FILE_SYNCED);
+    }
+    stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
+    RecordTick(stats_, WAL_FILE_BYTES, log_size);
+    stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal);
+    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
+  }
+  return status;
+}
+
+Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
+                                    uint64_t* log_used,
+                                    SequenceNumber* last_sequence,
+                                    int total_count) {
+  Status status;
+
+  WriteBatch tmp_batch;
+  size_t write_with_wal = 0;
+  WriteBatch* merged_batch =
+      MergeBatch(write_group, &tmp_batch, &write_with_wal);
+
+  // We need to lock log_write_mutex_ since logs_ and alive_log_files might be
+  // pushed back concurrently
+  log_write_mutex_.Lock();
+  if (merged_batch == write_group.leader->batch) {
+    write_group.leader->log_used = logfile_number_;
+  } else if (write_with_wal > 1) {
+    for (auto writer : write_group) {
+      writer->log_used = logfile_number_;
+    }
+  }
+  *last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count);
+  auto sequence = *last_sequence + 1;
+  WriteBatchInternal::SetSequence(merged_batch, sequence);
+
+  log::Writer* log_writer = logs_.back().writer;
+  uint64_t log_size;
+  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
+  log_write_mutex_.Unlock();
+
+  if (status.ok()) {
+    const bool concurrent = true;
+    auto stats = default_cf_internal_stats_;
+    stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size, concurrent);
+    RecordTick(stats_, WAL_FILE_BYTES, log_size);
+    stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal,
+                      concurrent);
+    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
+  }
+  return status;
+}
+
+Status DBImpl::HandleWALFull(WriteContext* write_context) {
+  mutex_.AssertHeld();
+  assert(write_context != nullptr);
+  Status status;
+
+  if (alive_log_files_.begin()->getting_flushed) {
+    return status;
+  }
+
+  auto oldest_alive_log = alive_log_files_.begin()->number;
+  auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep();
+
+  if (allow_2pc() &&
+      oldest_log_with_uncommited_prep > 0 &&
+      oldest_log_with_uncommited_prep <= oldest_alive_log) {
+    if (unable_to_flush_oldest_log_) {
+        // we already attempted to flush all column families dependent on
+        // the oldest alive log but the log still contained uncommited transactions.
+        // the oldest alive log STILL contains uncommited transaction so there
+        // is still nothing that we can do.
+        return status;
+    } else {
+      ROCKS_LOG_WARN(
+          immutable_db_options_.info_log,
+          "Unable to release oldest log due to uncommited transaction");
+      unable_to_flush_oldest_log_ = true;
+    }
+  } else {
+    // we only mark this log as getting flushed if we have successfully
+    // flushed all data in this log. If this log contains outstanding prepared
+    // transactions then we cannot flush this log until those transactions are commited.
+    unable_to_flush_oldest_log_ = false;
+    alive_log_files_.begin()->getting_flushed = true;
+  }
+
+  ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                 "Flushing all column families with data in WAL number %" PRIu64
+                 ". Total log size is %" PRIu64
+                 " while max_total_wal_size is %" PRIu64,
+                 oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
+  // no need to refcount because drop is happening in write thread, so can't
+  // happen while we're in the write thread
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    if (cfd->IsDropped()) {
+      continue;
+    }
+    if (cfd->OldestLogToKeep() <= oldest_alive_log) {
+      status = SwitchMemtable(cfd, write_context);
+      if (!status.ok()) {
+        break;
+      }
+      cfd->imm()->FlushRequested();
+      SchedulePendingFlush(cfd);
+    }
+  }
+  MaybeScheduleFlushOrCompaction();
+  return status;
+}
+
+Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
+  mutex_.AssertHeld();
+  assert(write_context != nullptr);
+  Status status;
+
+  // Before a new memtable is added in SwitchMemtable(),
+  // write_buffer_manager_->ShouldFlush() will keep returning true. If another
+  // thread is writing to another DB with the same write buffer, they may also
+  // be flushed. We may end up with flushing much more DBs than needed. It's
+  // suboptimal but still correct.
+  ROCKS_LOG_INFO(
+      immutable_db_options_.info_log,
+      "Flushing column family with largest mem table size. Write buffer is "
+      "using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
+      write_buffer_manager_->memory_usage(),
+      write_buffer_manager_->buffer_size());
+  // no need to refcount because drop is happening in write thread, so can't
+  // happen while we're in the write thread
+  ColumnFamilyData* cfd_picked = nullptr;
+  SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
+
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    if (cfd->IsDropped()) {
+      continue;
+    }
+    if (!cfd->mem()->IsEmpty()) {
+      // We only consider active mem table, hoping immutable memtable is
+      // already in the process of flushing.
+      uint64_t seq = cfd->mem()->GetCreationSeq();
+      if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
+        cfd_picked = cfd;
+        seq_num_for_cf_picked = seq;
+      }
+    }
+  }
+  if (cfd_picked != nullptr) {
+    status = SwitchMemtable(cfd_picked, write_context);
+    if (status.ok()) {
+      cfd_picked->imm()->FlushRequested();
+      SchedulePendingFlush(cfd_picked);
+      MaybeScheduleFlushOrCompaction();
+    }
+  }
+  return status;
+}
+
+uint64_t DBImpl::GetMaxTotalWalSize() const {
+  mutex_.AssertHeld();
+  return mutable_db_options_.max_total_wal_size == 0
+             ? 4 * max_total_in_memory_state_
+             : mutable_db_options_.max_total_wal_size;
+}
+
+// REQUIRES: mutex_ is held
+// REQUIRES: this thread is currently at the front of the writer queue
+Status DBImpl::DelayWrite(uint64_t num_bytes,
+                          const WriteOptions& write_options) {
+  uint64_t time_delayed = 0;
+  bool delayed = false;
+  {
+    StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
+    uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
+    if (delay > 0) {
+      if (write_options.no_slowdown) {
+        return Status::Incomplete();
+      }
+      TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
+
+      mutex_.Unlock();
+      // We will delay the write until we have slept for delay ms or
+      // we don't need a delay anymore
+      const uint64_t kDelayInterval = 1000;
+      uint64_t stall_end = sw.start_time() + delay;
+      while (write_controller_.NeedsDelay()) {
+        if (env_->NowMicros() >= stall_end) {
+          // We already delayed this write `delay` microseconds
+          break;
+        }
+
+        delayed = true;
+        // Sleep for 0.001 seconds
+        env_->SleepForMicroseconds(kDelayInterval);
+      }
+      mutex_.Lock();
+    }
+
+    while (bg_error_.ok() && write_controller_.IsStopped()) {
+      if (write_options.no_slowdown) {
+        return Status::Incomplete();
+      }
+      delayed = true;
+      TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
+      bg_cv_.Wait();
+    }
+  }
+  assert(!delayed || !write_options.no_slowdown);
+  if (delayed) {
+    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_STALL_MICROS,
+                                           time_delayed);
+    RecordTick(stats_, STALL_MICROS, time_delayed);
+  }
+
+  return bg_error_;
+}
+
+Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
+                                            WriteBatch* my_batch) {
+  assert(write_options.low_pri);
+  // This is called outside the DB mutex. Although it is safe to make the call,
+  // the consistency condition is not guaranteed to hold. It's OK to live with
+  // it in this case.
+  // If we need to speed compaction, it means the compaction is left behind
+  // and we start to limit low pri writes to a limit.
+  if (write_controller_.NeedSpeedupCompaction()) {
+    if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
+      // For 2PC, we only rate limit prepare, not commit.
+      return Status::OK();
+    }
+    if (write_options.no_slowdown) {
+      return Status::Incomplete();
+    } else {
+      assert(my_batch != nullptr);
+      // Rate limit those writes. The reason that we don't completely wait
+      // is that in case the write is heavy, low pri writes may never have
+      // a chance to run. Now we guarantee we are still slowly making
+      // progress.
+      write_controller_.low_pri_rate_limiter()->Request(
+          my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */,
+          RateLimiter::OpType::kWrite);
+    }
+  }
+  return Status::OK();
+}
+
+Status DBImpl::ScheduleFlushes(WriteContext* context) {
+  ColumnFamilyData* cfd;
+  while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
+    auto status = SwitchMemtable(cfd, context);
+    if (cfd->Unref()) {
+      delete cfd;
+    }
+    if (!status.ok()) {
+      return status;
+    }
+  }
+  return Status::OK();
+}
+
+#ifndef ROCKSDB_LITE
+void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* cfd,
+                                    const MemTableInfo& mem_table_info) {
+  if (immutable_db_options_.listeners.size() == 0U) {
+    return;
+  }
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    return;
+  }
+
+  for (auto listener : immutable_db_options_.listeners) {
+    listener->OnMemTableSealed(mem_table_info);
+  }
+}
+#endif  // ROCKSDB_LITE
+
+// REQUIRES: mutex_ is held
+// REQUIRES: this thread is currently at the front of the writer queue
+Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
+  mutex_.AssertHeld();
+  WriteThread::Writer nonmem_w;
+  if (concurrent_prepare_) {
+    // SwitchMemtable is a rare event. To simply the reasoning, we make sure
+    // that there is no concurrent thread writing to WAL.
+    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
+  }
+
+  unique_ptr<WritableFile> lfile;
+  log::Writer* new_log = nullptr;
+  MemTable* new_mem = nullptr;
+
+  // In case of pipelined write is enabled, wait for all pending memtable
+  // writers.
+  if (immutable_db_options_.enable_pipelined_write) {
+    write_thread_.WaitForMemTableWriters();
+  }
+
+  // Attempt to switch to a new memtable and trigger flush of old.
+  // Do this without holding the dbmutex lock.
+  assert(versions_->prev_log_number() == 0);
+  if (concurrent_prepare_) {
+    log_write_mutex_.Lock();
+  }
+  bool creating_new_log = !log_empty_;
+  if (concurrent_prepare_) {
+    log_write_mutex_.Unlock();
+  }
+  uint64_t recycle_log_number = 0;
+  if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
+      !log_recycle_files.empty()) {
+    recycle_log_number = log_recycle_files.front();
+    log_recycle_files.pop_front();
+  }
+  uint64_t new_log_number =
+      creating_new_log ? versions_->NewFileNumber() : logfile_number_;
+  SuperVersion* new_superversion = nullptr;
+  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
+
+  // Set current_memtble_info for memtable sealed callback
+#ifndef ROCKSDB_LITE
+  MemTableInfo memtable_info;
+  memtable_info.cf_name = cfd->GetName();
+  memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
+  memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
+  memtable_info.num_entries = cfd->mem()->num_entries();
+  memtable_info.num_deletes = cfd->mem()->num_deletes();
+#endif  // ROCKSDB_LITE
+  // Log this later after lock release. It may be outdated, e.g., if background
+  // flush happens before logging, but that should be ok.
+  int num_imm_unflushed = cfd->imm()->NumNotFlushed();
+  DBOptions db_options =
+      BuildDBOptions(immutable_db_options_, mutable_db_options_);
+  const auto preallocate_block_size =
+    GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
+  mutex_.Unlock();
+  Status s;
+  {
+    if (creating_new_log) {
+      EnvOptions opt_env_opt =
+          env_->OptimizeForLogWrite(env_options_, db_options);
+      if (recycle_log_number) {
+        ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                       "reusing log %" PRIu64 " from recycle list\n",
+                       recycle_log_number);
+        s = env_->ReuseWritableFile(
+            LogFileName(immutable_db_options_.wal_dir, new_log_number),
+            LogFileName(immutable_db_options_.wal_dir, recycle_log_number),
+            &lfile, opt_env_opt);
+      } else {
+        s = NewWritableFile(
+            env_, LogFileName(immutable_db_options_.wal_dir, new_log_number),
+            &lfile, opt_env_opt);
+      }
+      if (s.ok()) {
+        // Our final size should be less than write_buffer_size
+        // (compression, etc) but err on the side of caution.
+
+        // use preallocate_block_size instead
+        // of calling GetWalPreallocateBlockSize()
+        lfile->SetPreallocationBlockSize(preallocate_block_size);
+        unique_ptr<WritableFileWriter> file_writer(
+            new WritableFileWriter(std::move(lfile), opt_env_opt));
+        new_log = new log::Writer(
+            std::move(file_writer), new_log_number,
+            immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_);
+      }
+    }
+
+    if (s.ok()) {
+      SequenceNumber seq = versions_->LastSequence();
+      new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
+      new_superversion = new SuperVersion();
+    }
+
+#ifndef ROCKSDB_LITE
+    // PLEASE NOTE: We assume that there are no failable operations
+    // after lock is acquired below since we are already notifying
+    // client about mem table becoming immutable.
+    NotifyOnMemTableSealed(cfd, memtable_info);
+#endif //ROCKSDB_LITE
+  }
+  ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                 "[%s] New memtable created with log file: #%" PRIu64
+                 ". Immutable memtables: %d.\n",
+                 cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
+  mutex_.Lock();
+  if (!s.ok()) {
+    // how do we fail if we're not creating new log?
+    assert(creating_new_log);
+    assert(!new_mem);
+    assert(!new_log);
+    if (concurrent_prepare_) {
+      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
+    }
+    return s;
+  }
+  if (creating_new_log) {
+    log_write_mutex_.Lock();
+    logfile_number_ = new_log_number;
+    assert(new_log != nullptr);
+    log_empty_ = true;
+    log_dir_synced_ = false;
+    if (!logs_.empty()) {
+      // Alway flush the buffer of the last log before switching to a new one
+      log::Writer* cur_log_writer = logs_.back().writer;
+      cur_log_writer->WriteBuffer();
+    }
+    logs_.emplace_back(logfile_number_, new_log);
+    alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
+    log_write_mutex_.Unlock();
+  }
+  for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
+    // all this is just optimization to delete logs that
+    // are no longer needed -- if CF is empty, that means it
+    // doesn't need that particular log to stay alive, so we just
+    // advance the log number. no need to persist this in the manifest
+    if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
+        loop_cfd->imm()->NumNotFlushed() == 0) {
+      if (creating_new_log) {
+        loop_cfd->SetLogNumber(logfile_number_);
+      }
+      loop_cfd->mem()->SetCreationSeq(versions_->LastSequence());
+    }
+  }
+
+  cfd->mem()->SetNextLogNumber(logfile_number_);
+  cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
+  new_mem->Ref();
+  cfd->SetMemtable(new_mem);
+  context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork(
+      cfd, new_superversion, mutable_cf_options));
+  if (concurrent_prepare_) {
+    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
+  }
+  return s;
+}
+
+size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
+  mutex_.AssertHeld();
+  size_t bsize = write_buffer_size / 10 + write_buffer_size;
+  // Some users might set very high write_buffer_size and rely on
+  // max_total_wal_size or other parameters to control the WAL size.
+  if (mutable_db_options_.max_total_wal_size > 0) {
+    bsize = std::min<size_t>(bsize, mutable_db_options_.max_total_wal_size);
+  }
+  if (immutable_db_options_.db_write_buffer_size > 0) {
+    bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
+  }
+  if (immutable_db_options_.write_buffer_manager &&
+      immutable_db_options_.write_buffer_manager->enabled()) {
+    bsize = std::min<size_t>(
+        bsize, immutable_db_options_.write_buffer_manager->buffer_size());
+  }
+
+  return bsize;
+}
+
+// Default implementations of convenience methods that subclasses of DB
+// can call if they wish
+Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
+               const Slice& key, const Slice& value) {
+  // Pre-allocate size of write batch conservatively.
+  // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
+  // and we allocate 11 extra bytes for key length, as well as value length.
+  WriteBatch batch(key.size() + value.size() + 24);
+  batch.Put(column_family, key, value);
+  return Write(opt, &batch);
+}
+
+Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
+                  const Slice& key) {
+  WriteBatch batch;
+  batch.Delete(column_family, key);
+  return Write(opt, &batch);
+}
+
+Status DB::SingleDelete(const WriteOptions& opt,
+                        ColumnFamilyHandle* column_family, const Slice& key) {
+  WriteBatch batch;
+  batch.SingleDelete(column_family, key);
+  return Write(opt, &batch);
+}
+
+Status DB::DeleteRange(const WriteOptions& opt,
+                       ColumnFamilyHandle* column_family,
+                       const Slice& begin_key, const Slice& end_key) {
+  WriteBatch batch;
+  batch.DeleteRange(column_family, begin_key, end_key);
+  return Write(opt, &batch);
+}
+
+Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
+                 const Slice& key, const Slice& value) {
+  WriteBatch batch;
+  batch.Merge(column_family, key, value);
+  return Write(opt, &batch);
+}
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_info_dumper.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_info_dumper.cc b/thirdparty/rocksdb/db/db_info_dumper.cc
new file mode 100644
index 0000000..1668a16
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_info_dumper.cc
@@ -0,0 +1,127 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "db/db_info_dumper.h"
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <string>
+#include <algorithm>
+#include <vector>
+
+#include "rocksdb/env.h"
+#include "util/filename.h"
+
+namespace rocksdb {
+
+void DumpDBFileSummary(const ImmutableDBOptions& options,
+                       const std::string& dbname) {
+  if (options.info_log == nullptr) {
+    return;
+  }
+
+  auto* env = options.env;
+  uint64_t number = 0;
+  FileType type = kInfoLogFile;
+
+  std::vector<std::string> files;
+  uint64_t file_num = 0;
+  uint64_t file_size;
+  std::string file_info, wal_info;
+
+  Header(options.info_log, "DB SUMMARY\n");
+  // Get files in dbname dir
+  if (!env->GetChildren(dbname, &files).ok()) {
+    Error(options.info_log,
+          "Error when reading %s dir\n", dbname.c_str());
+  }
+  std::sort(files.begin(), files.end());
+  for (std::string file : files) {
+    if (!ParseFileName(file, &number, &type)) {
+      continue;
+    }
+    switch (type) {
+      case kCurrentFile:
+        Header(options.info_log, "CURRENT file:  %s\n", file.c_str());
+        break;
+      case kIdentityFile:
+        Header(options.info_log, "IDENTITY file:  %s\n", file.c_str());
+        break;
+      case kDescriptorFile:
+        env->GetFileSize(dbname + "/" + file, &file_size);
+        Header(options.info_log, "MANIFEST file:  %s size: %" PRIu64 " Bytes\n",
+               file.c_str(), file_size);
+        break;
+      case kLogFile:
+        env->GetFileSize(dbname + "/" + file, &file_size);
+        char str[16];
+        snprintf(str, sizeof(str), "%" PRIu64, file_size);
+        wal_info.append(file).append(" size: ").
+            append(str).append(" ; ");
+        break;
+      case kTableFile:
+        if (++file_num < 10) {
+          file_info.append(file).append(" ");
+        }
+        break;
+      default:
+        break;
+    }
+  }
+
+  // Get sst files in db_path dir
+  for (auto& db_path : options.db_paths) {
+    if (dbname.compare(db_path.path) != 0) {
+      if (!env->GetChildren(db_path.path, &files).ok()) {
+        Error(options.info_log,
+            "Error when reading %s dir\n",
+            db_path.path.c_str());
+        continue;
+      }
+      std::sort(files.begin(), files.end());
+      for (std::string file : files) {
+        if (ParseFileName(file, &number, &type)) {
+          if (type == kTableFile && ++file_num < 10) {
+            file_info.append(file).append(" ");
+          }
+        }
+      }
+    }
+    Header(options.info_log,
+           "SST files in %s dir, Total Num: %" PRIu64 ", files: %s\n",
+           db_path.path.c_str(), file_num, file_info.c_str());
+    file_num = 0;
+    file_info.clear();
+  }
+
+  // Get wal file in wal_dir
+  if (dbname.compare(options.wal_dir) != 0) {
+    if (!env->GetChildren(options.wal_dir, &files).ok()) {
+      Error(options.info_log,
+          "Error when reading %s dir\n",
+          options.wal_dir.c_str());
+      return;
+    }
+    wal_info.clear();
+    for (std::string file : files) {
+      if (ParseFileName(file, &number, &type)) {
+        if (type == kLogFile) {
+          env->GetFileSize(options.wal_dir + "/" + file, &file_size);
+          char str[16];
+          snprintf(str, sizeof(str), "%" PRIu64, file_size);
+          wal_info.append(file).append(" size: ").
+              append(str).append(" ; ");
+        }
+      }
+    }
+  }
+  Header(options.info_log, "Write Ahead Log file in %s: %s\n",
+         options.wal_dir.c_str(), wal_info.c_str());
+}
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_info_dumper.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_info_dumper.h b/thirdparty/rocksdb/db/db_info_dumper.h
new file mode 100644
index 0000000..acff8f1
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_info_dumper.h
@@ -0,0 +1,14 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#include <string>
+
+#include "options/db_options.h"
+
+namespace rocksdb {
+void DumpDBFileSummary(const ImmutableDBOptions& options,
+                       const std::string& dbname);
+}  // namespace rocksdb


Mime
View raw message