nifi-commits mailing list archives

From jeremyd...@apache.org
Subject [13/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:24:53 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/log_reader.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/log_reader.cc b/thirdparty/rocksdb/db/log_reader.cc
new file mode 100644
index 0000000..cae5d8e
--- /dev/null
+++ b/thirdparty/rocksdb/db/log_reader.cc
@@ -0,0 +1,432 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/log_reader.h"
+
+#include <stdio.h>
+#include "rocksdb/env.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+#include "util/file_reader_writer.h"
+
+namespace rocksdb {
+namespace log {
+
+Reader::Reporter::~Reporter() {
+}
+
+Reader::Reader(std::shared_ptr<Logger> info_log,
+               unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
+               bool checksum, uint64_t initial_offset, uint64_t log_num)
+    : info_log_(info_log),
+      file_(std::move(_file)),
+      reporter_(reporter),
+      checksum_(checksum),
+      backing_store_(new char[kBlockSize]),
+      buffer_(),
+      eof_(false),
+      read_error_(false),
+      eof_offset_(0),
+      last_record_offset_(0),
+      end_of_buffer_offset_(0),
+      initial_offset_(initial_offset),
+      log_number_(log_num),
+      recycled_(false) {}
+
+Reader::~Reader() {
+  delete[] backing_store_;
+}
+
+bool Reader::SkipToInitialBlock() {
+  size_t initial_offset_in_block = initial_offset_ % kBlockSize;
+  uint64_t block_start_location = initial_offset_ - initial_offset_in_block;
+
+  // Don't search a block if we'd be in the trailer
+  if (initial_offset_in_block > kBlockSize - 6) {
+    block_start_location += kBlockSize;
+  }
+
+  end_of_buffer_offset_ = block_start_location;
+
+  // Skip to start of first block that can contain the initial record
+  if (block_start_location > 0) {
+    Status skip_status = file_->Skip(block_start_location);
+    if (!skip_status.ok()) {
+      ReportDrop(static_cast<size_t>(block_start_location), skip_status);
+      return false;
+    }
+  }
+
+  return true;
+}
+
+// For kAbsoluteConsistency, on clean shutdown we don't expect any error
+// in the log files.  For other modes, we can ignore only incomplete records
+// in the last log file, which are presumably due to a write in progress
+// during restart (or from log recycling).
+//
+// TODO krad: Evaluate if we need to move to a more strict mode where we
+// restrict the inconsistency to only the last log
+bool Reader::ReadRecord(Slice* record, std::string* scratch,
+                        WALRecoveryMode wal_recovery_mode) {
+  if (last_record_offset_ < initial_offset_) {
+    if (!SkipToInitialBlock()) {
+      return false;
+    }
+  }
+
+  scratch->clear();
+  record->clear();
+  bool in_fragmented_record = false;
+  // Record offset of the logical record that we're reading
+  // 0 is a dummy value to make compilers happy
+  uint64_t prospective_record_offset = 0;
+
+  Slice fragment;
+  while (true) {
+    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
+    size_t drop_size = 0;
+    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
+    switch (record_type) {
+      case kFullType:
+      case kRecyclableFullType:
+        if (in_fragmented_record && !scratch->empty()) {
+          // Handle bug in earlier versions of log::Writer where
+          // it could emit an empty kFirstType record at the tail end
+          // of a block followed by a kFullType or kFirstType record
+          // at the beginning of the next block.
+          ReportCorruption(scratch->size(), "partial record without end(1)");
+        }
+        prospective_record_offset = physical_record_offset;
+        scratch->clear();
+        *record = fragment;
+        last_record_offset_ = prospective_record_offset;
+        return true;
+
+      case kFirstType:
+      case kRecyclableFirstType:
+        if (in_fragmented_record && !scratch->empty()) {
+          // Handle bug in earlier versions of log::Writer where
+          // it could emit an empty kFirstType record at the tail end
+          // of a block followed by a kFullType or kFirstType record
+          // at the beginning of the next block.
+          ReportCorruption(scratch->size(), "partial record without end(2)");
+        }
+        prospective_record_offset = physical_record_offset;
+        scratch->assign(fragment.data(), fragment.size());
+        in_fragmented_record = true;
+        break;
+
+      case kMiddleType:
+      case kRecyclableMiddleType:
+        if (!in_fragmented_record) {
+          ReportCorruption(fragment.size(),
+                           "missing start of fragmented record(1)");
+        } else {
+          scratch->append(fragment.data(), fragment.size());
+        }
+        break;
+
+      case kLastType:
+      case kRecyclableLastType:
+        if (!in_fragmented_record) {
+          ReportCorruption(fragment.size(),
+                           "missing start of fragmented record(2)");
+        } else {
+          scratch->append(fragment.data(), fragment.size());
+          *record = Slice(*scratch);
+          last_record_offset_ = prospective_record_offset;
+          return true;
+        }
+        break;
+
+      case kBadHeader:
+        if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
+          // in clean shutdown we don't expect any error in the log files
+          ReportCorruption(drop_size, "truncated header");
+        }
+      // fall-thru
+
+      case kEof:
+        if (in_fragmented_record) {
+          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
+            // in clean shutdown we don't expect any error in the log files
+            ReportCorruption(scratch->size(), "error reading trailing data");
+          }
+          // This can be caused by the writer dying immediately after
+          //  writing a physical record but before completing the next; don't
+          //  treat it as a corruption, just ignore the entire logical record.
+          scratch->clear();
+        }
+        return false;
+
+      case kOldRecord:
+        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
+          // Treat a record from a previous instance of the log as EOF.
+          if (in_fragmented_record) {
+            if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
+              // in clean shutdown we don't expect any error in the log files
+              ReportCorruption(scratch->size(), "error reading trailing data");
+            }
+            // This can be caused by the writer dying immediately after
+            //  writing a physical record but before completing the next; don't
+            //  treat it as a corruption, just ignore the entire logical record.
+            scratch->clear();
+          }
+          return false;
+        }
+      // fall-thru
+
+      case kBadRecord:
+        if (in_fragmented_record) {
+          ReportCorruption(scratch->size(), "error in middle of record");
+          in_fragmented_record = false;
+          scratch->clear();
+        }
+        break;
+
+      case kBadRecordLen:
+      case kBadRecordChecksum:
+        if (recycled_ &&
+            wal_recovery_mode ==
+                WALRecoveryMode::kTolerateCorruptedTailRecords) {
+          scratch->clear();
+          return false;
+        }
+        if (record_type == kBadRecordLen) {
+          ReportCorruption(drop_size, "bad record length");
+        } else {
+          ReportCorruption(drop_size, "checksum mismatch");
+        }
+        if (in_fragmented_record) {
+          ReportCorruption(scratch->size(), "error in middle of record");
+          in_fragmented_record = false;
+          scratch->clear();
+        }
+        break;
+
+      default: {
+        char buf[40];
+        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
+        ReportCorruption(
+            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
+            buf);
+        in_fragmented_record = false;
+        scratch->clear();
+        break;
+      }
+    }
+  }
+  return false;
+}
+
+uint64_t Reader::LastRecordOffset() {
+  return last_record_offset_;
+}
+
+void Reader::UnmarkEOF() {
+  if (read_error_) {
+    return;
+  }
+
+  eof_ = false;
+
+  if (eof_offset_ == 0) {
+    return;
+  }
+
+  // If the EOF was in the middle of a block (a partial block was read) we have
+  // to read the rest of the block as ReadPhysicalRecord can only read full
+  // blocks and expects the file position indicator to be aligned to the start
+  // of a block.
+  //
+  //      consumed_bytes + buffer_size() + remaining == kBlockSize
+
+  size_t consumed_bytes = eof_offset_ - buffer_.size();
+  size_t remaining = kBlockSize - eof_offset_;
+
+  // backing_store_ is used to concatenate what is left in buffer_ and
+  // the remainder of the block. If buffer_ already uses backing_store_,
+  // we just append the new data.
+  if (buffer_.data() != backing_store_ + consumed_bytes) {
+    // Buffer_ does not use backing_store_ for storage.
+    // Copy what is left in buffer_ to backing_store_.
+    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
+  }
+
+  Slice read_buffer;
+  Status status = file_->Read(remaining, &read_buffer,
+    backing_store_ + eof_offset_);
+
+  size_t added = read_buffer.size();
+  end_of_buffer_offset_ += added;
+
+  if (!status.ok()) {
+    if (added > 0) {
+      ReportDrop(added, status);
+    }
+
+    read_error_ = true;
+    return;
+  }
+
+  if (read_buffer.data() != backing_store_ + eof_offset_) {
+    // Read did not write to backing_store_
+    memmove(backing_store_ + eof_offset_, read_buffer.data(),
+      read_buffer.size());
+  }
+
+  buffer_ = Slice(backing_store_ + consumed_bytes,
+    eof_offset_ + added - consumed_bytes);
+
+  if (added < remaining) {
+    eof_ = true;
+    eof_offset_ += added;
+  } else {
+    eof_offset_ = 0;
+  }
+}
+
+void Reader::ReportCorruption(size_t bytes, const char* reason) {
+  ReportDrop(bytes, Status::Corruption(reason));
+}
+
+void Reader::ReportDrop(size_t bytes, const Status& reason) {
+  if (reporter_ != nullptr &&
+      end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
+    reporter_->Corruption(bytes, reason);
+  }
+}
+
+bool Reader::ReadMore(size_t* drop_size, int *error) {
+  if (!eof_ && !read_error_) {
+    // Last read was a full read, so this is a trailer to skip
+    buffer_.clear();
+    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
+    end_of_buffer_offset_ += buffer_.size();
+    if (!status.ok()) {
+      buffer_.clear();
+      ReportDrop(kBlockSize, status);
+      read_error_ = true;
+      *error = kEof;
+      return false;
+    } else if (buffer_.size() < (size_t)kBlockSize) {
+      eof_ = true;
+      eof_offset_ = buffer_.size();
+    }
+    return true;
+  } else {
+    // Note that if buffer_ is non-empty, we have a truncated header at the
+    //  end of the file, which can be caused by the writer crashing in the
+    //  middle of writing the header. Unless explicitly requested we don't
+    //  consider this an error, just report EOF.
+    if (buffer_.size()) {
+      *drop_size = buffer_.size();
+      buffer_.clear();
+      *error = kBadHeader;
+      return false;
+    }
+    buffer_.clear();
+    *error = kEof;
+    return false;
+  }
+}
+
+unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
+  while (true) {
+    // We need at least the minimum header size
+    if (buffer_.size() < (size_t)kHeaderSize) {
+      int r;
+      if (!ReadMore(drop_size, &r)) {
+        return r;
+      }
+      continue;
+    }
+
+    // Parse the header
+    const char* header = buffer_.data();
+    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
+    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
+    const unsigned int type = header[6];
+    const uint32_t length = a | (b << 8);
+    int header_size = kHeaderSize;
+    if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
+      if (end_of_buffer_offset_ - buffer_.size() == 0) {
+        recycled_ = true;
+      }
+      header_size = kRecyclableHeaderSize;
+      // We need enough for the larger header
+      if (buffer_.size() < (size_t)kRecyclableHeaderSize) {
+        int r;
+        if (!ReadMore(drop_size, &r)) {
+          return r;
+        }
+        continue;
+      }
+      const uint32_t log_num = DecodeFixed32(header + 7);
+      if (log_num != log_number_) {
+        return kOldRecord;
+      }
+    }
+    if (header_size + length > buffer_.size()) {
+      *drop_size = buffer_.size();
+      buffer_.clear();
+      if (!eof_) {
+        return kBadRecordLen;
+      }
+      // If the end of the file has been reached without reading |length| bytes
+      // of payload, assume the writer died in the middle of writing the record.
+      // Don't report a corruption unless requested.
+      if (*drop_size) {
+        return kBadHeader;
+      }
+      return kEof;
+    }
+
+    if (type == kZeroType && length == 0) {
+      // Skip zero length record without reporting any drops since
+      // such records are produced by the mmap based writing code in
+      // env_posix.cc that preallocates file regions.
+      // NOTE: this should never happen in DBs written by new RocksDB versions,
+      // since we turn off mmap writes to manifest and log files
+      buffer_.clear();
+      return kBadRecord;
+    }
+
+    // Check crc
+    if (checksum_) {
+      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
+      uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
+      if (actual_crc != expected_crc) {
+        // Drop the rest of the buffer since "length" itself may have
+        // been corrupted and if we trust it, we could find some
+        // fragment of a real log record that just happens to look
+        // like a valid log record.
+        *drop_size = buffer_.size();
+        buffer_.clear();
+        return kBadRecordChecksum;
+      }
+    }
+
+    buffer_.remove_prefix(header_size + length);
+
+    // Skip physical record that started before initial_offset_
+    if (end_of_buffer_offset_ - buffer_.size() - header_size - length <
+        initial_offset_) {
+      result->clear();
+      return kBadRecord;
+    }
+
+    *result = Slice(header + header_size, length);
+    return type;
+  }
+}
+
+}  // namespace log
+}  // namespace rocksdb
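
For reference, the legacy record header parsed by ReadPhysicalRecord above is 4 bytes of masked CRC32C, 2 bytes of little-endian payload length and 1 byte of record type. Below is a minimal standalone sketch of that decoding (not part of this commit); the concrete type value 1 for kFullType is assumed from db/log_format.h, which is not included in this message.

#include <cstdint>
#include <cstdio>

struct LegacyHeader {
  uint32_t masked_crc;  // unmask with crc32c::Unmask() before comparing
  uint32_t length;      // payload length, at most 0xffff
  unsigned int type;    // kFullType, kFirstType, kMiddleType, kLastType, ...
};

LegacyHeader DecodeLegacyHeader(const unsigned char* h) {
  LegacyHeader out;
  out.masked_crc = static_cast<uint32_t>(h[0]) |
                   (static_cast<uint32_t>(h[1]) << 8) |
                   (static_cast<uint32_t>(h[2]) << 16) |
                   (static_cast<uint32_t>(h[3]) << 24);
  const uint32_t a = static_cast<uint32_t>(h[4]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(h[5]) & 0xff;
  out.length = a | (b << 8);  // same arithmetic as ReadPhysicalRecord above
  out.type = h[6];
  return out;
}

int main() {
  // A header announcing a 300-byte record of type 1 (assumed to be kFullType).
  const unsigned char header[7] = {0, 0, 0, 0, 0x2c, 0x01, 1};
  LegacyHeader h = DecodeLegacyHeader(header);
  std::printf("length=%u type=%u\n", h.length, h.type);  // length=300 type=1
  return 0;
}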

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/log_reader.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/log_reader.h b/thirdparty/rocksdb/db/log_reader.h
new file mode 100644
index 0000000..c6a471c
--- /dev/null
+++ b/thirdparty/rocksdb/db/log_reader.h
@@ -0,0 +1,160 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <memory>
+#include <stdint.h>
+
+#include "db/log_format.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "rocksdb/options.h"
+
+namespace rocksdb {
+
+class SequentialFileReader;
+class Logger;
+using std::unique_ptr;
+
+namespace log {
+
+/**
+ * Reader is a general purpose log stream reader implementation. The actual job
+ * of reading from the device is implemented by the SequentialFile interface.
+ *
+ * Please see Writer for details on the file and record layout.
+ */
+class Reader {
+ public:
+  // Interface for reporting errors.
+  class Reporter {
+   public:
+    virtual ~Reporter();
+
+    // Some corruption was detected.  "size" is the approximate number
+    // of bytes dropped due to the corruption.
+    virtual void Corruption(size_t bytes, const Status& status) = 0;
+  };
+
+  // Create a reader that will return log records from "*file".
+  // "*file" must remain live while this Reader is in use.
+  //
+  // If "reporter" is non-nullptr, it is notified whenever some data is
+  // dropped due to a detected corruption.  "*reporter" must remain
+  // live while this Reader is in use.
+  //
+  // If "checksum" is true, verify checksums if available.
+  //
+  // The Reader will start reading at the first record located at physical
+  // position >= initial_offset within the file.
+  Reader(std::shared_ptr<Logger> info_log,
+	 unique_ptr<SequentialFileReader>&& file,
+         Reporter* reporter, bool checksum, uint64_t initial_offset,
+         uint64_t log_num);
+
+  ~Reader();
+
+  // Read the next record into *record.  Returns true if read
+  // successfully, false if we hit end of the input.  May use
+  // "*scratch" as temporary storage.  The contents filled in *record
+  // will only be valid until the next mutating operation on this
+  // reader or the next mutation to *scratch.
+  bool ReadRecord(Slice* record, std::string* scratch,
+                  WALRecoveryMode wal_recovery_mode =
+                      WALRecoveryMode::kTolerateCorruptedTailRecords);
+
+  // Returns the physical offset of the last record returned by ReadRecord.
+  //
+  // Undefined before the first call to ReadRecord.
+  uint64_t LastRecordOffset();
+
+  // Returns true if the reader has encountered an EOF condition.
+  bool IsEOF() {
+    return eof_;
+  }
+
+  // When we know more data has been written to the file, we can use this
+  // function to force the reader to look again in the file.
+  // Also aligns the file position indicator to the start of the next block
+  // by reading the rest of the data from the EOF position to the end of the
+  // block that was partially read.
+  void UnmarkEOF();
+
+  SequentialFileReader* file() { return file_.get(); }
+
+ private:
+  std::shared_ptr<Logger> info_log_;
+  const unique_ptr<SequentialFileReader> file_;
+  Reporter* const reporter_;
+  bool const checksum_;
+  char* const backing_store_;
+  Slice buffer_;
+  bool eof_;   // Last Read() indicated EOF by returning < kBlockSize
+  bool read_error_;   // Error occurred while reading from file
+
+  // Offset of the file position indicator within the last block when an
+  // EOF was detected.
+  size_t eof_offset_;
+
+  // Offset of the last record returned by ReadRecord.
+  uint64_t last_record_offset_;
+  // Offset of the first location past the end of buffer_.
+  uint64_t end_of_buffer_offset_;
+
+  // Offset at which to start looking for the first record to return
+  uint64_t const initial_offset_;
+
+  // which log number this is
+  uint64_t const log_number_;
+
+  // Whether this is a recycled log file
+  bool recycled_;
+
+  // Extend record types with the following special values
+  enum {
+    kEof = kMaxRecordType + 1,
+    // Returned whenever we find an invalid physical record.
+    // Currently there are three situations in which this happens:
+    // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
+    // * The record is a 0-length record (No drop is reported)
+    // * The record is below constructor's initial_offset (No drop is reported)
+    kBadRecord = kMaxRecordType + 2,
+    // Returned when we fail to read a valid header.
+    kBadHeader = kMaxRecordType + 3,
+    // Returned when we read an old record from a previous user of the log.
+    kOldRecord = kMaxRecordType + 4,
+    // Returned when we get a bad record length
+    kBadRecordLen = kMaxRecordType + 5,
+    // Returned when we get a bad record checksum
+    kBadRecordChecksum = kMaxRecordType + 6,
+  };
+
+  // Skips all blocks that are completely before "initial_offset_".
+  //
+  // Returns true on success. Handles reporting.
+  bool SkipToInitialBlock();
+
+  // Return type, or one of the preceding special values
+  unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size);
+
+  // Read some more
+  bool ReadMore(size_t* drop_size, int *error);
+
+  // Reports dropped bytes to the reporter.
+  // buffer_ must be updated to remove the dropped bytes prior to invocation.
+  void ReportCorruption(size_t bytes, const char* reason);
+  void ReportDrop(size_t bytes, const Status& reason);
+
+  // No copying allowed
+  Reader(const Reader&);
+  void operator=(const Reader&);
+};
+
+}  // namespace log
+}  // namespace rocksdb
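
A rough usage sketch for this Reader (not part of this commit). The SequentialFileReader construction is an assumption based on the util/file_reader_writer.h include above and may need adjusting to the actual constructor; the Reader and Reporter signatures follow the header.

#include <memory>
#include <string>
#include "db/log_reader.h"
#include "rocksdb/env.h"
#include "util/file_reader_writer.h"

namespace {
// Reporter that simply counts dropped bytes, per the Reporter interface above.
class CountingReporter : public rocksdb::log::Reader::Reporter {
 public:
  void Corruption(size_t bytes, const rocksdb::Status& /*status*/) override {
    dropped_bytes += bytes;
  }
  size_t dropped_bytes = 0;
};
}  // namespace

// Reads every record of one WAL file and returns the number of records seen.
size_t DumpWal(const std::string& fname, uint64_t log_number) {
  rocksdb::Env* env = rocksdb::Env::Default();
  std::unique_ptr<rocksdb::SequentialFile> file;
  if (!env->NewSequentialFile(fname, &file, rocksdb::EnvOptions()).ok()) {
    return 0;
  }
  // Assumed constructor; see util/file_reader_writer.h for the real one.
  std::unique_ptr<rocksdb::SequentialFileReader> file_reader(
      new rocksdb::SequentialFileReader(std::move(file)));
  CountingReporter reporter;
  rocksdb::log::Reader reader(nullptr /* info_log */, std::move(file_reader),
                              &reporter, true /* checksum */,
                              0 /* initial_offset */, log_number);
  std::string scratch;
  rocksdb::Slice record;
  size_t count = 0;
  while (reader.ReadRecord(&record, &scratch)) {
    ++count;  // record stays valid only until the next ReadRecord call
  }
  return count;
}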

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/log_writer.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/log_writer.cc b/thirdparty/rocksdb/db/log_writer.cc
new file mode 100644
index 0000000..b02eec8
--- /dev/null
+++ b/thirdparty/rocksdb/db/log_writer.cc
@@ -0,0 +1,142 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/log_writer.h"
+
+#include <stdint.h>
+#include "rocksdb/env.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+#include "util/file_reader_writer.h"
+
+namespace rocksdb {
+namespace log {
+
+Writer::Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
+               bool recycle_log_files, bool manual_flush)
+    : dest_(std::move(dest)),
+      block_offset_(0),
+      log_number_(log_number),
+      recycle_log_files_(recycle_log_files),
+      manual_flush_(manual_flush) {
+  for (int i = 0; i <= kMaxRecordType; i++) {
+    char t = static_cast<char>(i);
+    type_crc_[i] = crc32c::Value(&t, 1);
+  }
+}
+
+Writer::~Writer() { WriteBuffer(); }
+
+Status Writer::WriteBuffer() { return dest_->Flush(); }
+
+Status Writer::AddRecord(const Slice& slice) {
+  const char* ptr = slice.data();
+  size_t left = slice.size();
+
+  // Header size varies depending on whether we are recycling or not.
+  const int header_size =
+      recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize;
+
+  // Fragment the record if necessary and emit it.  Note that if slice
+  // is empty, we still want to iterate once to emit a single
+  // zero-length record
+  Status s;
+  bool begin = true;
+  do {
+    const int64_t leftover = kBlockSize - block_offset_;
+    assert(leftover >= 0);
+    if (leftover < header_size) {
+      // Switch to a new block
+      if (leftover > 0) {
+        // Fill the trailer (literal below relies on kHeaderSize and
+        // kRecyclableHeaderSize being <= 11)
+        assert(header_size <= 11);
+        dest_->Append(
+            Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", leftover));
+      }
+      block_offset_ = 0;
+    }
+
+    // Invariant: we never leave < header_size bytes in a block.
+    assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size);
+
+    const size_t avail = kBlockSize - block_offset_ - header_size;
+    const size_t fragment_length = (left < avail) ? left : avail;
+
+    RecordType type;
+    const bool end = (left == fragment_length);
+    if (begin && end) {
+      type = recycle_log_files_ ? kRecyclableFullType : kFullType;
+    } else if (begin) {
+      type = recycle_log_files_ ? kRecyclableFirstType : kFirstType;
+    } else if (end) {
+      type = recycle_log_files_ ? kRecyclableLastType : kLastType;
+    } else {
+      type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType;
+    }
+
+    s = EmitPhysicalRecord(type, ptr, fragment_length);
+    ptr += fragment_length;
+    left -= fragment_length;
+    begin = false;
+  } while (s.ok() && left > 0);
+  return s;
+}
+
+Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
+  assert(n <= 0xffff);  // Must fit in two bytes
+
+  size_t header_size;
+  char buf[kRecyclableHeaderSize];
+
+  // Format the header
+  buf[4] = static_cast<char>(n & 0xff);
+  buf[5] = static_cast<char>(n >> 8);
+  buf[6] = static_cast<char>(t);
+
+  uint32_t crc = type_crc_[t];
+  if (t < kRecyclableFullType) {
+    // Legacy record format
+    assert(block_offset_ + kHeaderSize + n <= kBlockSize);
+    header_size = kHeaderSize;
+  } else {
+    // Recyclable record format
+    assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize);
+    header_size = kRecyclableHeaderSize;
+
+    // Only encode low 32-bits of the 64-bit log number.  This means
+    // we will fail to detect an old record if we recycled a log from
+    // ~4 billion logs ago, but that is effectively impossible, and
+    // even if it were we'd be far more likely to see a false positive
+    // on the 32-bit CRC.
+    EncodeFixed32(buf + 7, static_cast<uint32_t>(log_number_));
+    crc = crc32c::Extend(crc, buf + 7, 4);
+  }
+
+  // Compute the crc of the record type and the payload.
+  crc = crc32c::Extend(crc, ptr, n);
+  crc = crc32c::Mask(crc);  // Adjust for storage
+  EncodeFixed32(buf, crc);
+
+  // Write the header and the payload
+  Status s = dest_->Append(Slice(buf, header_size));
+  if (s.ok()) {
+    s = dest_->Append(Slice(ptr, n));
+    if (s.ok()) {
+      if (!manual_flush_) {
+        s = dest_->Flush();
+      }
+    }
+  }
+  block_offset_ += header_size + n;
+  return s;
+}
+
+}  // namespace log
+}  // namespace rocksdb
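
To make the fragmentation loop in AddRecord concrete, here is a standalone sketch (not part of this commit) that reproduces its arithmetic for legacy 7-byte headers; the kBlockSize value of 32768 is assumed from db/log_format.h, which is not in this message.

#include <algorithm>
#include <cstdio>

int main() {
  const size_t kBlockSize = 32768;  // assumed from db/log_format.h
  const size_t kHeaderSize = 7;

  size_t block_offset = 0;  // mirrors Writer::block_offset_
  size_t left = 40000;      // a payload larger than one block
  bool begin = true;

  while (true) {
    size_t leftover = kBlockSize - block_offset;
    if (leftover < kHeaderSize) {
      // Not enough room for a header: the real writer pads the trailer with
      // zeros and switches to a new block.
      block_offset = 0;
    }
    const size_t avail = kBlockSize - block_offset - kHeaderSize;
    const size_t fragment = std::min(left, avail);
    const bool end = (left == fragment);
    const char* type = (begin && end) ? "kFullType"
                       : begin        ? "kFirstType"
                       : end          ? "kLastType"
                                      : "kMiddleType";
    std::printf("%-12s payload=%zu bytes\n", type, fragment);
    block_offset += kHeaderSize + fragment;
    left -= fragment;
    begin = false;
    if (left == 0) break;
  }
  // Prints: kFirstType payload=32761 bytes, then kLastType payload=7239 bytes.
  return 0;
}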

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/log_writer.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/log_writer.h b/thirdparty/rocksdb/db/log_writer.h
new file mode 100644
index 0000000..a3a8799
--- /dev/null
+++ b/thirdparty/rocksdb/db/log_writer.h
@@ -0,0 +1,111 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#pragma once
+
+#include <stdint.h>
+
+#include <memory>
+
+#include "db/log_format.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+
+namespace rocksdb {
+
+class WritableFileWriter;
+
+using std::unique_ptr;
+
+namespace log {
+
+/**
+ * Writer is a general purpose log stream writer. It provides an append-only
+ * abstraction for writing data. The details of how the data is written are
+ * handled by the WritableFile subclass implementation.
+ *
+ * File format:
+ *
+ * File is broken down into variable sized records. The format of each record
+ * is described below.
+ *       +-----+-------------+--+----+----------+------+-- ... ----+
+ * File  | r0  |        r1   |P | r2 |    r3    |  r4  |           |
+ *       +-----+-------------+--+----+----------+------+-- ... ----+
+ *       <--- kBlockSize ------>|<-- kBlockSize ------>|
+ *  rn = variable size records
+ *  P = Padding
+ *
+ * Data is written out in kBlockSize chunks. If the next record does not fit
+ * into the space left, the leftover space will be padded with \0.
+ *
+ * Legacy record format:
+ *
+ * +---------+-----------+-----------+--- ... ---+
+ * |CRC (4B) | Size (2B) | Type (1B) | Payload   |
+ * +---------+-----------+-----------+--- ... ---+
+ *
+ * CRC = 32bit hash computed over the payload using CRC
+ * Size = Length of the payload data
+ * Type = Type of record
+ *        (kZeroType, kFullType, kFirstType, kLastType, kMiddleType )
+ *        The type is used to group a bunch of records together to represent
+ *        records that are larger than kBlockSize
+ * Payload = Byte stream as long as specified by the payload size
+ *
+ * Recyclable record format:
+ *
+ * +---------+-----------+-----------+----------------+--- ... ---+
+ * |CRC (4B) | Size (2B) | Type (1B) | Log number (4B)| Payload   |
+ * +---------+-----------+-----------+----------------+--- ... ---+
+ *
+ * Same as above, with the addition of
+ * Log number = 32bit log file number, so that we can distinguish between
+ * records written by the most recent log writer vs a previous one.
+ */
+class Writer {
+ public:
+  // Create a writer that will append data to "*dest".
+  // "*dest" must be initially empty.
+  // "*dest" must remain live while this Writer is in use.
+  explicit Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
+                  bool recycle_log_files, bool manual_flush = false);
+  ~Writer();
+
+  Status AddRecord(const Slice& slice);
+
+  WritableFileWriter* file() { return dest_.get(); }
+  const WritableFileWriter* file() const { return dest_.get(); }
+
+  uint64_t get_log_number() const { return log_number_; }
+
+  Status WriteBuffer();
+
+ private:
+  unique_ptr<WritableFileWriter> dest_;
+  size_t block_offset_;       // Current offset in block
+  uint64_t log_number_;
+  bool recycle_log_files_;
+
+  // crc32c values for all supported record types.  These are
+  // pre-computed to reduce the overhead of computing the crc of the
+  // record type stored in the header.
+  uint32_t type_crc_[kMaxRecordType + 1];
+
+  Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);
+
+  // If true, it does not flush after each write. Instead it relies on the upper
+  // layer to manually do the flush by calling ::WriteBuffer()
+  bool manual_flush_;
+
+  // No copying allowed
+  Writer(const Writer&);
+  void operator=(const Writer&);
+};
+
+}  // namespace log
+}  // namespace rocksdb
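
And a matching usage sketch for the Writer (not part of this commit). The WritableFileWriter construction is an assumption based on util/file_reader_writer.h; everything else follows the interface above.

#include <memory>
#include <string>
#include "db/log_writer.h"
#include "rocksdb/env.h"
#include "util/file_reader_writer.h"

rocksdb::Status AppendTwoRecords(const std::string& fname, uint64_t log_number) {
  rocksdb::Env* env = rocksdb::Env::Default();
  rocksdb::EnvOptions env_options;
  std::unique_ptr<rocksdb::WritableFile> file;
  rocksdb::Status s = env->NewWritableFile(fname, &file, env_options);
  if (!s.ok()) {
    return s;
  }
  // Assumed constructor; see util/file_reader_writer.h for the real one.
  std::unique_ptr<rocksdb::WritableFileWriter> dest(
      new rocksdb::WritableFileWriter(std::move(file), env_options));
  rocksdb::log::Writer writer(std::move(dest), log_number,
                              false /* recycle_log_files */);
  s = writer.AddRecord(rocksdb::Slice("first record"));
  if (s.ok()) {
    s = writer.AddRecord(rocksdb::Slice("second record"));
  }
  return s;  // each AddRecord already flushed, since manual_flush defaults to false
}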

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/malloc_stats.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/malloc_stats.cc b/thirdparty/rocksdb/db/malloc_stats.cc
new file mode 100644
index 0000000..7acca65
--- /dev/null
+++ b/thirdparty/rocksdb/db/malloc_stats.cc
@@ -0,0 +1,52 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/malloc_stats.h"
+
+#ifndef ROCKSDB_LITE
+#include <memory>
+#include <string.h>
+
+namespace rocksdb {
+
+#ifdef ROCKSDB_JEMALLOC
+#include "jemalloc/jemalloc.h"
+
+typedef struct {
+  char* cur;
+  char* end;
+} MallocStatus;
+
+static void GetJemallocStatus(void* mstat_arg, const char* status) {
+  MallocStatus* mstat = reinterpret_cast<MallocStatus*>(mstat_arg);
+  size_t status_len = status ? strlen(status) : 0;
+  size_t buf_size = (size_t)(mstat->end - mstat->cur);
+  if (!status_len || status_len > buf_size) {
+    return;
+  }
+
+  snprintf(mstat->cur, buf_size, "%s", status);
+  mstat->cur += status_len;
+}
+#endif  // ROCKSDB_JEMALLOC
+
+void DumpMallocStats(std::string* stats) {
+#ifdef ROCKSDB_JEMALLOC
+  MallocStatus mstat;
+  const unsigned int kMallocStatusLen = 1000000;
+  std::unique_ptr<char[]> buf{new char[kMallocStatusLen + 1]};
+  mstat.cur = buf.get();
+  mstat.end = buf.get() + kMallocStatusLen;
+  je_malloc_stats_print(GetJemallocStatus, &mstat, "");
+  stats->append(buf.get());
+#endif  // ROCKSDB_JEMALLOC
+}
+
+}
+#endif  // !ROCKSDB_LITE

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/malloc_stats.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/malloc_stats.h b/thirdparty/rocksdb/db/malloc_stats.h
new file mode 100644
index 0000000..a2f324f
--- /dev/null
+++ b/thirdparty/rocksdb/db/malloc_stats.h
@@ -0,0 +1,22 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <string>
+
+namespace rocksdb {
+
+void DumpMallocStats(std::string*);
+
+}
+
+#endif  // !ROCKSDB_LITE
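
A minimal usage sketch (not part of this commit). DumpMallocStats is only declared in non-LITE builds, only appends data when the library is built with ROCKSDB_JEMALLOC, and is capped at roughly 1 MB by kMallocStatusLen in malloc_stats.cc above.

#include <cstdio>
#include <string>
#include "db/malloc_stats.h"  // internal header; requires a non-LITE build

int main() {
  std::string stats;
  rocksdb::DumpMallocStats(&stats);  // no-op unless built with ROCKSDB_JEMALLOC
  std::printf("%s\n", stats.empty() ? "(no jemalloc stats available)"
                                    : stats.c_str());
  return 0;
}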

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/managed_iterator.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/managed_iterator.cc b/thirdparty/rocksdb/db/managed_iterator.cc
new file mode 100644
index 0000000..c393eb5
--- /dev/null
+++ b/thirdparty/rocksdb/db/managed_iterator.cc
@@ -0,0 +1,262 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "db/managed_iterator.h"
+
+#include <limits>
+#include <string>
+#include <utility>
+
+#include "db/column_family.h"
+#include "db/db_impl.h"
+#include "db/db_iter.h"
+#include "db/dbformat.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "table/merging_iterator.h"
+
+namespace rocksdb {
+
+namespace {
+// Helper class that locks a mutex on construction and unlocks the mutex when
+// the destructor of the MutexLock object is invoked.
+//
+// Typical usage:
+//
+//   void MyClass::MyMethod() {
+//     MILock l(&mu_);       // mu_ is an instance variable
+//     ... some complex code, possibly with multiple return paths ...
+//   }
+
+class MILock {
+ public:
+  explicit MILock(std::mutex* mu, ManagedIterator* mi) : mu_(mu), mi_(mi) {
+    this->mu_->lock();
+  }
+  ~MILock() {
+    this->mu_->unlock();
+  }
+  ManagedIterator* GetManagedIterator() { return mi_; }
+
+ private:
+  std::mutex* const mu_;
+  ManagedIterator* mi_;
+  // No copying allowed
+  MILock(const MILock&) = delete;
+  void operator=(const MILock&) = delete;
+};
+}  // anonymous namespace
+
+//
+// Synchronization between modifiers, releasers, creators
+// If iterator operation, wait till (!in_use), set in_use, do op, reset in_use
+//  if modifying mutable_iter, atomically exchange in_use:
+//  return if in_use set / otherwise set in use,
+//  atomically replace new iter with old, reset in use
+//  The releaser is the new operation and it holds a lock for a very short time
+//  The existing non-const iterator operations are supposed to be single
+//  threaded and hold the lock for the duration of the operation
+//  The existing const iterator operations use the cached key/values
+//  and don't do any locking.
+ManagedIterator::ManagedIterator(DBImpl* db, const ReadOptions& read_options,
+                                 ColumnFamilyData* cfd)
+    : db_(db),
+      read_options_(read_options),
+      cfd_(cfd),
+      svnum_(cfd->GetSuperVersionNumber()),
+      mutable_iter_(nullptr),
+      valid_(false),
+      snapshot_created_(false),
+      release_supported_(true) {
+  read_options_.managed = false;
+  if ((!read_options_.tailing) && (read_options_.snapshot == nullptr)) {
+    assert(nullptr != (read_options_.snapshot = db_->GetSnapshot()));
+    snapshot_created_ = true;
+  }
+  cfh_.SetCFD(cfd);
+  mutable_iter_ = unique_ptr<Iterator>(db->NewIterator(read_options_, &cfh_));
+}
+
+ManagedIterator::~ManagedIterator() {
+  Lock();
+  if (snapshot_created_) {
+    db_->ReleaseSnapshot(read_options_.snapshot);
+    snapshot_created_ = false;
+    read_options_.snapshot = nullptr;
+  }
+  UnLock();
+}
+
+bool ManagedIterator::Valid() const { return valid_; }
+
+void ManagedIterator::SeekToLast() {
+  MILock l(&in_use_, this);
+  if (NeedToRebuild()) {
+    RebuildIterator();
+  }
+  assert(mutable_iter_ != nullptr);
+  mutable_iter_->SeekToLast();
+  if (mutable_iter_->status().ok()) {
+    UpdateCurrent();
+  }
+}
+
+void ManagedIterator::SeekToFirst() {
+  MILock l(&in_use_, this);
+  SeekInternal(Slice(), true);
+}
+
+void ManagedIterator::Seek(const Slice& user_key) {
+  MILock l(&in_use_, this);
+  SeekInternal(user_key, false);
+}
+
+void ManagedIterator::SeekForPrev(const Slice& user_key) {
+  MILock l(&in_use_, this);
+  if (NeedToRebuild()) {
+    RebuildIterator();
+  }
+  assert(mutable_iter_ != nullptr);
+  mutable_iter_->SeekForPrev(user_key);
+  UpdateCurrent();
+}
+
+void ManagedIterator::SeekInternal(const Slice& user_key, bool seek_to_first) {
+  if (NeedToRebuild()) {
+    RebuildIterator();
+  }
+  assert(mutable_iter_ != nullptr);
+  if (seek_to_first) {
+    mutable_iter_->SeekToFirst();
+  } else {
+    mutable_iter_->Seek(user_key);
+  }
+  UpdateCurrent();
+}
+
+void ManagedIterator::Prev() {
+  if (!valid_) {
+    status_ = Status::InvalidArgument("Iterator value invalid");
+    return;
+  }
+  MILock l(&in_use_, this);
+  if (NeedToRebuild()) {
+    std::string current_key = key().ToString();
+    Slice old_key(current_key);
+    RebuildIterator();
+    SeekInternal(old_key, false);
+    UpdateCurrent();
+    if (!valid_) {
+      return;
+    }
+    if (key().compare(old_key) != 0) {
+      valid_ = false;
+      status_ = Status::Incomplete("Cannot do Prev now");
+      return;
+    }
+  }
+  mutable_iter_->Prev();
+  if (mutable_iter_->status().ok()) {
+    UpdateCurrent();
+    status_ = Status::OK();
+  } else {
+    status_ = mutable_iter_->status();
+  }
+}
+
+void ManagedIterator::Next() {
+  if (!valid_) {
+    status_ = Status::InvalidArgument("Iterator value invalid");
+    return;
+  }
+  MILock l(&in_use_, this);
+  if (NeedToRebuild()) {
+    std::string current_key = key().ToString();
+    Slice old_key(current_key.data(), cached_key_.Size());
+    RebuildIterator();
+    SeekInternal(old_key, false);
+    UpdateCurrent();
+    if (!valid_) {
+      return;
+    }
+    if (key().compare(old_key) != 0) {
+      valid_ = false;
+      status_ = Status::Incomplete("Cannot do Next now");
+      return;
+    }
+  }
+  mutable_iter_->Next();
+  UpdateCurrent();
+}
+
+Slice ManagedIterator::key() const {
+  assert(valid_);
+  return cached_key_.GetUserKey();
+}
+
+Slice ManagedIterator::value() const {
+  assert(valid_);
+  return cached_value_.GetUserKey();
+}
+
+Status ManagedIterator::status() const { return status_; }
+
+void ManagedIterator::RebuildIterator() {
+  svnum_ = cfd_->GetSuperVersionNumber();
+  mutable_iter_ = unique_ptr<Iterator>(db_->NewIterator(read_options_, &cfh_));
+}
+
+void ManagedIterator::UpdateCurrent() {
+  assert(mutable_iter_ != nullptr);
+
+  valid_ = mutable_iter_->Valid();
+  if (!valid_) {
+    status_ = mutable_iter_->status();
+    return;
+  }
+
+  status_ = Status::OK();
+  cached_key_.SetUserKey(mutable_iter_->key());
+  cached_value_.SetUserKey(mutable_iter_->value());
+}
+
+void ManagedIterator::ReleaseIter(bool only_old) {
+  if ((mutable_iter_ == nullptr) || (!release_supported_)) {
+    return;
+  }
+  if (svnum_ != cfd_->GetSuperVersionNumber() || !only_old) {
+    if (!TryLock()) {  // Don't release iter if in use
+      return;
+    }
+    mutable_iter_ = nullptr;  // in_use for a very short time
+    UnLock();
+  }
+}
+
+bool ManagedIterator::NeedToRebuild() {
+  if ((mutable_iter_ == nullptr) || (status_.IsIncomplete()) ||
+      (!only_drop_old_ && (svnum_ != cfd_->GetSuperVersionNumber()))) {
+    return true;
+  }
+  return false;
+}
+
+void ManagedIterator::Lock() {
+  in_use_.lock();
+  return;
+}
+
+bool ManagedIterator::TryLock() { return in_use_.try_lock(); }
+
+void ManagedIterator::UnLock() {
+  in_use_.unlock();
+}
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE
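
The synchronization comment above boils down to this: iterator operations hold in_use_ for their whole duration, while the releaser acts only if it can acquire the lock immediately. A generic standalone sketch of that pattern (not part of this commit), mirroring MILock, TryLock and ReleaseIter:

#include <memory>
#include <mutex>

struct Resource { /* stands in for the wrapped Iterator */ };

class ReleasableHolder {
 public:
  ReleasableHolder() : resource_(new Resource()) {}

  // "Operation" path: always waits for exclusive use (like MILock).
  void Operate() {
    std::lock_guard<std::mutex> guard(in_use_);
    // ... use *resource_ for the duration of the operation ...
  }

  // "Releaser" path: frees the resource only when idle (like ReleaseIter).
  bool TryRelease() {
    if (!in_use_.try_lock()) {
      return false;  // in use right now, try again later
    }
    resource_.reset();  // lock held for a very short time, as the comment notes
    in_use_.unlock();
    return true;
  }

 private:
  std::mutex in_use_;
  std::unique_ptr<Resource> resource_;
};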

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/managed_iterator.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/managed_iterator.h b/thirdparty/rocksdb/db/managed_iterator.h
new file mode 100644
index 0000000..8e962f7
--- /dev/null
+++ b/thirdparty/rocksdb/db/managed_iterator.h
@@ -0,0 +1,85 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <mutex>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include "db/column_family.h"
+#include "rocksdb/db.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "util/arena.h"
+
+namespace rocksdb {
+
+class DBImpl;
+struct SuperVersion;
+class ColumnFamilyData;
+
+/**
+ * ManagedIterator is a special type of iterator that supports freeing the
+ * underlying iterator and still being able to access the current key/value
+ * pair.  This is done by copying the key/value pair so that clients can
+ * continue to access the data without getting a SIGSEGV.
+ * The underlying iterator can be freed manually through the call to
+ * ReleaseIter or automatically (as needed on space pressure or age).
+ * The iterator is recreated using the saved original arguments.
+ */
+class ManagedIterator : public Iterator {
+ public:
+  ManagedIterator(DBImpl* db, const ReadOptions& read_options,
+                  ColumnFamilyData* cfd);
+  virtual ~ManagedIterator();
+
+  virtual void SeekToLast() override;
+  virtual void Prev() override;
+  virtual bool Valid() const override;
+  void SeekToFirst() override;
+  virtual void Seek(const Slice& target) override;
+  virtual void SeekForPrev(const Slice& target) override;
+  virtual void Next() override;
+  virtual Slice key() const override;
+  virtual Slice value() const override;
+  virtual Status status() const override;
+  void ReleaseIter(bool only_old);
+  void SetDropOld(bool only_old) {
+    only_drop_old_ = read_options_.tailing || only_old;
+  }
+
+ private:
+  void RebuildIterator();
+  void UpdateCurrent();
+  void SeekInternal(const Slice& user_key, bool seek_to_first);
+  bool NeedToRebuild();
+  void Lock();
+  bool TryLock();
+  void UnLock();
+  DBImpl* const db_;
+  ReadOptions read_options_;
+  ColumnFamilyData* const cfd_;
+  ColumnFamilyHandleInternal cfh_;
+
+  uint64_t svnum_;
+  std::unique_ptr<Iterator> mutable_iter_;
+  // internal iterator status
+  Status status_;
+  bool valid_;
+
+  IterKey cached_key_;
+  IterKey cached_value_;
+
+  bool only_drop_old_ = true;
+  bool snapshot_created_;
+  bool release_supported_;
+  std::mutex in_use_;  // is managed iterator in use
+};
+
+}  // namespace rocksdb
+#endif  // !ROCKSDB_LITE
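
From the client side, a ManagedIterator is requested through ReadOptions::managed, the flag that managed_iterator.cc resets internally. A hypothetical sketch (not part of this commit), with error handling omitted:

#include <memory>
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

void ScanWithManagedIterator(rocksdb::DB* db) {
  rocksdb::ReadOptions read_options;
  read_options.managed = true;  // ask DBImpl for a ManagedIterator
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(read_options));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // key()/value() stay accessible even if the underlying iterator is
    // released and rebuilt behind the scenes, per the class comment above.
    std::string k = it->key().ToString();
    std::string v = it->value().ToString();
    (void)k; (void)v;
  }
}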

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/memtable.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/memtable.cc b/thirdparty/rocksdb/db/memtable.cc
new file mode 100644
index 0000000..efea619
--- /dev/null
+++ b/thirdparty/rocksdb/db/memtable.cc
@@ -0,0 +1,885 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/memtable.h"
+
+#include <memory>
+#include <algorithm>
+
+#include "db/dbformat.h"
+#include "db/merge_context.h"
+#include "db/merge_helper.h"
+#include "db/pinned_iterators_manager.h"
+#include "monitoring/perf_context_imp.h"
+#include "monitoring/statistics.h"
+#include "port/port.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/write_buffer_manager.h"
+#include "table/internal_iterator.h"
+#include "table/iterator_wrapper.h"
+#include "table/merging_iterator.h"
+#include "util/arena.h"
+#include "util/autovector.h"
+#include "util/coding.h"
+#include "util/memory_usage.h"
+#include "util/murmurhash.h"
+#include "util/mutexlock.h"
+
+namespace rocksdb {
+
+MemTableOptions::MemTableOptions(const ImmutableCFOptions& ioptions,
+                                 const MutableCFOptions& mutable_cf_options)
+    : write_buffer_size(mutable_cf_options.write_buffer_size),
+      arena_block_size(mutable_cf_options.arena_block_size),
+      memtable_prefix_bloom_bits(
+          static_cast<uint32_t>(
+              static_cast<double>(mutable_cf_options.write_buffer_size) *
+              mutable_cf_options.memtable_prefix_bloom_size_ratio) *
+          8u),
+      memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size),
+      inplace_update_support(ioptions.inplace_update_support),
+      inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
+      inplace_callback(ioptions.inplace_callback),
+      max_successive_merges(mutable_cf_options.max_successive_merges),
+      statistics(ioptions.statistics),
+      merge_operator(ioptions.merge_operator),
+      info_log(ioptions.info_log) {}
+
+MemTable::MemTable(const InternalKeyComparator& cmp,
+                   const ImmutableCFOptions& ioptions,
+                   const MutableCFOptions& mutable_cf_options,
+                   WriteBufferManager* write_buffer_manager,
+                   SequenceNumber latest_seq, uint32_t column_family_id)
+    : comparator_(cmp),
+      moptions_(ioptions, mutable_cf_options),
+      refs_(0),
+      kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
+      mem_tracker_(write_buffer_manager),
+      arena_(
+          moptions_.arena_block_size,
+          (write_buffer_manager != nullptr && write_buffer_manager->enabled())
+              ? &mem_tracker_
+              : nullptr,
+          mutable_cf_options.memtable_huge_page_size),
+      table_(ioptions.memtable_factory->CreateMemTableRep(
+          comparator_, &arena_, ioptions.prefix_extractor, ioptions.info_log,
+          column_family_id)),
+      range_del_table_(SkipListFactory().CreateMemTableRep(
+          comparator_, &arena_, nullptr /* transform */, ioptions.info_log,
+          column_family_id)),
+      is_range_del_table_empty_(true),
+      data_size_(0),
+      num_entries_(0),
+      num_deletes_(0),
+      flush_in_progress_(false),
+      flush_completed_(false),
+      file_number_(0),
+      first_seqno_(0),
+      earliest_seqno_(latest_seq),
+      creation_seq_(latest_seq),
+      mem_next_logfile_number_(0),
+      min_prep_log_referenced_(0),
+      locks_(moptions_.inplace_update_support
+                 ? moptions_.inplace_update_num_locks
+                 : 0),
+      prefix_extractor_(ioptions.prefix_extractor),
+      flush_state_(FLUSH_NOT_REQUESTED),
+      env_(ioptions.env),
+      insert_with_hint_prefix_extractor_(
+          ioptions.memtable_insert_with_hint_prefix_extractor) {
+  UpdateFlushState();
+  // something went wrong if we need to flush before inserting anything
+  assert(!ShouldScheduleFlush());
+
+  if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) {
+    prefix_bloom_.reset(new DynamicBloom(
+        &arena_, moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality,
+        6 /* hard coded 6 probes */, nullptr, moptions_.memtable_huge_page_size,
+        ioptions.info_log));
+  }
+}
+
+MemTable::~MemTable() {
+  mem_tracker_.FreeMem();
+  assert(refs_ == 0);
+}
+
+size_t MemTable::ApproximateMemoryUsage() {
+  autovector<size_t> usages = {arena_.ApproximateMemoryUsage(),
+                               table_->ApproximateMemoryUsage(),
+                               range_del_table_->ApproximateMemoryUsage(),
+                               rocksdb::ApproximateMemoryUsage(insert_hints_)};
+  size_t total_usage = 0;
+  for (size_t usage : usages) {
+    // If usage + total_usage >= kMaxSizet, return kMaxSizet.
+    // The following variation is to avoid numeric overflow.
+    if (usage >= port::kMaxSizet - total_usage) {
+      return port::kMaxSizet;
+    }
+    total_usage += usage;
+  }
+  // otherwise, return the actual usage
+  return total_usage;
+}
+
+bool MemTable::ShouldFlushNow() const {
+  // In many cases, we cannot allocate arena blocks that exactly match the
+  // buffer size. Thus we have to decide if we should over-allocate or
+  // under-allocate.
+  // This constant variable can be interpreted as: if we still have more than
+  // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over
+  // allocate one more block.
+  const double kAllowOverAllocationRatio = 0.6;
+
+  // If the arena still has room for a new block allocation, we can safely say it
+  // shouldn't flush.
+  auto allocated_memory = table_->ApproximateMemoryUsage() +
+                          range_del_table_->ApproximateMemoryUsage() +
+                          arena_.MemoryAllocatedBytes();
+
+  // if we can still allocate one more block without exceeding the
+  // over-allocation ratio, then we should not flush.
+  if (allocated_memory + kArenaBlockSize <
+      moptions_.write_buffer_size +
+      kArenaBlockSize * kAllowOverAllocationRatio) {
+    return false;
+  }
+
+  // if the user keeps adding entries that exceed moptions.write_buffer_size,
+  // we need to flush earlier even though we still have much available
+  // memory left.
+  if (allocated_memory > moptions_.write_buffer_size +
+      kArenaBlockSize * kAllowOverAllocationRatio) {
+    return true;
+  }
+
+  // In this code path, Arena has already allocated its "last block", which
+  // means the total allocated memory size is either:
+  //  (1) "moderately" over-allocated (no more than `0.6 * arena
+  // block size`). Or,
+  //  (2) the allocated memory is less than write buffer size, but we'll stop
+  // here since if we allocate a new arena block, we'll over allocate too much
+  // more (half of the arena block size) memory.
+  //
+  // In either case, to avoid over-allocating, the last block will stop allocation
+  // when its usage reaches a certain ratio, which we carefully choose "0.75
+  // full" as the stop condition because it addresses the following issue with
+  // great simplicity: What if the next inserted entry's size is
+  // bigger than AllocatedAndUnused()?
+  //
+  // The answer is: if the entry size is also bigger than 0.25 *
+  // kArenaBlockSize, a dedicated block will be allocated for it; otherwise
+  // arena will anyway skip the AllocatedAndUnused() and allocate a new, empty
+  // and regular block. In either case, we *overly* over-allocated.
+  //
+  // Therefore, setting the last block to be at most "0.75 full" avoids both
+  // cases.
+  //
+  // NOTE: the average percentage of wasted space of this approach can be counted
+  // as: "arena block size * 0.25 / write buffer size". Users who specify a small
+  // write buffer size and/or big arena block size may suffer.
+  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
+}
+
+void MemTable::UpdateFlushState() {
+  auto state = flush_state_.load(std::memory_order_relaxed);
+  if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
+    // ignore CAS failure, because that means somebody else requested
+    // a flush
+    flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
+                                         std::memory_order_relaxed,
+                                         std::memory_order_relaxed);
+  }
+}
+
+int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
+                                        const char* prefix_len_key2) const {
+  // Internal keys are encoded as length-prefixed strings.
+  Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
+  Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
+  return comparator.Compare(k1, k2);
+}
+
+int MemTable::KeyComparator::operator()(const char* prefix_len_key,
+                                        const Slice& key)
+    const {
+  // Internal keys are encoded as length-prefixed strings.
+  Slice a = GetLengthPrefixedSlice(prefix_len_key);
+  return comparator.Compare(a, key);
+}
+
+Slice MemTableRep::UserKey(const char* key) const {
+  Slice slice = GetLengthPrefixedSlice(key);
+  return Slice(slice.data(), slice.size() - 8);
+}
+
+KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
+  *buf = allocator_->Allocate(len);
+  return static_cast<KeyHandle>(*buf);
+}
+
+// Encode a suitable internal key target for "target" and return it.
+// Uses *scratch as scratch space, and the returned pointer will point
+// into this scratch space.
+const char* EncodeKey(std::string* scratch, const Slice& target) {
+  scratch->clear();
+  PutVarint32(scratch, static_cast<uint32_t>(target.size()));
+  scratch->append(target.data(), target.size());
+  return scratch->data();
+}
+
+class MemTableIterator : public InternalIterator {
+ public:
+  MemTableIterator(const MemTable& mem, const ReadOptions& read_options,
+                   Arena* arena, bool use_range_del_table = false)
+      : bloom_(nullptr),
+        prefix_extractor_(mem.prefix_extractor_),
+        comparator_(mem.comparator_),
+        valid_(false),
+        arena_mode_(arena != nullptr),
+        value_pinned_(!mem.GetMemTableOptions()->inplace_update_support) {
+    if (use_range_del_table) {
+      iter_ = mem.range_del_table_->GetIterator(arena);
+    } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
+      bloom_ = mem.prefix_bloom_.get();
+      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
+    } else {
+      iter_ = mem.table_->GetIterator(arena);
+    }
+  }
+
+  ~MemTableIterator() {
+#ifndef NDEBUG
+    // Assert that the MemTableIterator is never deleted while
+    // Pinning is Enabled.
+    assert(!pinned_iters_mgr_ ||
+           (pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled()));
+#endif
+    if (arena_mode_) {
+      iter_->~Iterator();
+    } else {
+      delete iter_;
+    }
+  }
+
+#ifndef NDEBUG
+  virtual void SetPinnedItersMgr(
+      PinnedIteratorsManager* pinned_iters_mgr) override {
+    pinned_iters_mgr_ = pinned_iters_mgr;
+  }
+  PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
+#endif
+
+  virtual bool Valid() const override { return valid_; }
+  virtual void Seek(const Slice& k) override {
+    PERF_TIMER_GUARD(seek_on_memtable_time);
+    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
+    if (bloom_ != nullptr) {
+      if (!bloom_->MayContain(
+              prefix_extractor_->Transform(ExtractUserKey(k)))) {
+        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
+        valid_ = false;
+        return;
+      } else {
+        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
+      }
+    }
+    iter_->Seek(k, nullptr);
+    valid_ = iter_->Valid();
+  }
+  virtual void SeekForPrev(const Slice& k) override {
+    PERF_TIMER_GUARD(seek_on_memtable_time);
+    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
+    if (bloom_ != nullptr) {
+      if (!bloom_->MayContain(
+              prefix_extractor_->Transform(ExtractUserKey(k)))) {
+        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
+        valid_ = false;
+        return;
+      } else {
+        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
+      }
+    }
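+    // Emulate a reverse seek: seek forward to the first entry >= k, fall
+    // back to the last entry when nothing qualifies, then step backwards
+    // until key() <= k.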
+    iter_->Seek(k, nullptr);
+    valid_ = iter_->Valid();
+    if (!Valid()) {
+      SeekToLast();
+    }
+    while (Valid() && comparator_.comparator.Compare(k, key()) < 0) {
+      Prev();
+    }
+  }
+  virtual void SeekToFirst() override {
+    iter_->SeekToFirst();
+    valid_ = iter_->Valid();
+  }
+  virtual void SeekToLast() override {
+    iter_->SeekToLast();
+    valid_ = iter_->Valid();
+  }
+  virtual void Next() override {
+    PERF_COUNTER_ADD(next_on_memtable_count, 1);
+    assert(Valid());
+    iter_->Next();
+    valid_ = iter_->Valid();
+  }
+  virtual void Prev() override {
+    PERF_COUNTER_ADD(prev_on_memtable_count, 1);
+    assert(Valid());
+    iter_->Prev();
+    valid_ = iter_->Valid();
+  }
+  virtual Slice key() const override {
+    assert(Valid());
+    return GetLengthPrefixedSlice(iter_->key());
+  }
+  virtual Slice value() const override {
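+    // The value is stored as a second length-prefixed slice immediately
+    // after the internal key in the same entry.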
+    assert(Valid());
+    Slice key_slice = GetLengthPrefixedSlice(iter_->key());
+    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
+  }
+
+  virtual Status status() const override { return Status::OK(); }
+
+  virtual bool IsKeyPinned() const override {
+    // memtable data is always pinned
+    return true;
+  }
+
+  virtual bool IsValuePinned() const override {
+    // memtable value is always pinned, except if we allow inplace update.
+    return value_pinned_;
+  }
+
+ private:
+  DynamicBloom* bloom_;
+  const SliceTransform* const prefix_extractor_;
+  const MemTable::KeyComparator comparator_;
+  MemTableRep::Iterator* iter_;
+  bool valid_;
+  bool arena_mode_;
+  bool value_pinned_;
+
+  // No copying allowed
+  MemTableIterator(const MemTableIterator&);
+  void operator=(const MemTableIterator&);
+};
+
+InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
+                                        Arena* arena) {
+  assert(arena != nullptr);
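+  // Placement-new the iterator into the arena; it is later destroyed with an
+  // explicit destructor call rather than delete (see arena_mode_).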
+  auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
+  return new (mem) MemTableIterator(*this, read_options, arena);
+}
+
+InternalIterator* MemTable::NewRangeTombstoneIterator(
+    const ReadOptions& read_options) {
+  if (read_options.ignore_range_deletions || is_range_del_table_empty_) {
+    return nullptr;
+  }
+  return new MemTableIterator(*this, read_options, nullptr /* arena */,
+                              true /* use_range_del_table */);
+}
+
+port::RWMutex* MemTable::GetLock(const Slice& key) {
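+  // Striped locking for in-place updates: hash the user key onto a fixed
+  // pool of reader-writer locks.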
+  static murmur_hash hash;
+  return &locks_[hash(key) % locks_.size()];
+}
+
+MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
+                                                   const Slice& end_ikey) {
+  uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
+  entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
+  if (entry_count == 0) {
+    return {0, 0};
+  }
+  uint64_t n = num_entries_.load(std::memory_order_relaxed);
+  if (n == 0) {
+    return {0, 0};
+  }
+  if (entry_count > n) {
+    // (range_del_)table_->ApproximateNumEntries() is only an estimate, so it
+    // can exceed the number of entries actually stored.  Cap it at the real
+    // count to limit the inaccuracy.
+    entry_count = n;
+  }
+  uint64_t data_size = data_size_.load(std::memory_order_relaxed);
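+  // Approximate the byte size as (average entry size) * (entries in range).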
+  return {entry_count * (data_size / n), entry_count};
+}
+
+void MemTable::Add(SequenceNumber s, ValueType type,
+                   const Slice& key, /* user key */
+                   const Slice& value, bool allow_concurrent,
+                   MemTablePostProcessInfo* post_process_info) {
+  // Format of an entry is concatenation of:
+  //  key_size     : varint32 of internal_key.size()
+  //  key bytes    : char[internal_key.size()]
+  //  value_size   : varint32 of value.size()
+  //  value bytes  : char[value.size()]
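+  //
+  // For example, a 3-byte user key with a 2-byte value encodes as
+  // 1 (varint of 11) + 11 (user key + 8-byte tag) + 1 (varint of 2) + 2
+  // = 15 bytes.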
+  uint32_t key_size = static_cast<uint32_t>(key.size());
+  uint32_t val_size = static_cast<uint32_t>(value.size());
+  uint32_t internal_key_size = key_size + 8;
+  const uint32_t encoded_len = VarintLength(internal_key_size) +
+                               internal_key_size + VarintLength(val_size) +
+                               val_size;
+  char* buf = nullptr;
+  std::unique_ptr<MemTableRep>& table =
+      type == kTypeRangeDeletion ? range_del_table_ : table_;
+  KeyHandle handle = table->Allocate(encoded_len, &buf);
+
+  char* p = EncodeVarint32(buf, internal_key_size);
+  memcpy(p, key.data(), key_size);
+  Slice key_slice(p, key_size);
+  p += key_size;
+  uint64_t packed = PackSequenceAndType(s, type);
+  EncodeFixed64(p, packed);
+  p += 8;
+  p = EncodeVarint32(p, val_size);
+  memcpy(p, value.data(), val_size);
+  assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
+  if (!allow_concurrent) {
+    // Extract prefix for insert with hint.
+    if (insert_with_hint_prefix_extractor_ != nullptr &&
+        insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
+      Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
+      table->InsertWithHint(handle, &insert_hints_[prefix]);
+    } else {
+      table->Insert(handle);
+    }
+
+    // This is a bit ugly, but it avoids locked instructions when
+    // incrementing an atomic.
+    num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1,
+                       std::memory_order_relaxed);
+    data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
+                     std::memory_order_relaxed);
+    if (type == kTypeDeletion) {
+      num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
+                         std::memory_order_relaxed);
+    }
+
+    if (prefix_bloom_) {
+      assert(prefix_extractor_);
+      prefix_bloom_->Add(prefix_extractor_->Transform(key));
+    }
+
+    // The first sequence number inserted into the memtable
+    assert(first_seqno_ == 0 || s > first_seqno_);
+    if (first_seqno_ == 0) {
+      first_seqno_.store(s, std::memory_order_relaxed);
+
+      if (earliest_seqno_ == kMaxSequenceNumber) {
+        earliest_seqno_.store(GetFirstSequenceNumber(),
+                              std::memory_order_relaxed);
+      }
+      assert(first_seqno_.load() >= earliest_seqno_.load());
+    }
+    assert(post_process_info == nullptr);
+    UpdateFlushState();
+  } else {
+    table->InsertConcurrently(handle);
+
+    assert(post_process_info != nullptr);
+    post_process_info->num_entries++;
+    post_process_info->data_size += encoded_len;
+    if (type == kTypeDeletion) {
+      post_process_info->num_deletes++;
+    }
+
+    if (prefix_bloom_) {
+      assert(prefix_extractor_);
+      prefix_bloom_->AddConcurrently(prefix_extractor_->Transform(key));
+    }
+
+    // atomically update first_seqno_ and earliest_seqno_.
+    uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed);
+    while ((cur_seq_num == 0 || s < cur_seq_num) &&
+           !first_seqno_.compare_exchange_weak(cur_seq_num, s)) {
+    }
+    uint64_t cur_earliest_seqno =
+        earliest_seqno_.load(std::memory_order_relaxed);
+    while (
+        (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) &&
+        !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
+    }
+  }
+  if (is_range_del_table_empty_ && type == kTypeRangeDeletion) {
+    is_range_del_table_empty_ = false;
+  }
+}
+
+// Callback from MemTable::Get()
+namespace {
+
+struct Saver {
+  Status* status;
+  const LookupKey* key;
+  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
+  bool* merge_in_progress;
+  std::string* value;
+  SequenceNumber seq;
+  const MergeOperator* merge_operator;
+  // The merge operations encountered so far.
+  MergeContext* merge_context;
+  RangeDelAggregator* range_del_agg;
+  MemTable* mem;
+  Logger* logger;
+  Statistics* statistics;
+  bool inplace_update_support;
+  Env* env_;
+};
+}  // namespace
+
+static bool SaveValue(void* arg, const char* entry) {
+  Saver* s = reinterpret_cast<Saver*>(arg);
+  MergeContext* merge_context = s->merge_context;
+  RangeDelAggregator* range_del_agg = s->range_del_agg;
+  const MergeOperator* merge_operator = s->merge_operator;
+
+  assert(s != nullptr && merge_context != nullptr && range_del_agg != nullptr);
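+  // SaveValue returns true to ask MemTableRep::Get() to keep scanning older
+  // entries, and false to stop once a final result (value, deletion, error)
+  // has been resolved or a different user key is reached.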
+
+  // entry format is:
+  //    klength  varint32
+  //    userkey  char[klength-8]
+  //    tag      uint64
+  //    vlength  varint32
+  //    value    char[vlength]
+  // Check that it belongs to the same user key.  We do not check the
+  // sequence number since the Seek() call above should have skipped
+  // all entries with overly large sequence numbers.
+  uint32_t key_length;
+  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
+  if (s->mem->GetInternalKeyComparator().user_comparator()->Equal(
+          Slice(key_ptr, key_length - 8), s->key->user_key())) {
+    // Correct user key
+    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
+    ValueType type;
+    UnPackSequenceAndType(tag, &s->seq, &type);
+
+    if ((type == kTypeValue || type == kTypeMerge) &&
+        range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) {
+      type = kTypeRangeDeletion;
+    }
+    switch (type) {
+      case kTypeValue: {
+        if (s->inplace_update_support) {
+          s->mem->GetLock(s->key->user_key())->ReadLock();
+        }
+        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+        *(s->status) = Status::OK();
+        if (*(s->merge_in_progress)) {
+          *(s->status) = MergeHelper::TimedFullMerge(
+              merge_operator, s->key->user_key(), &v,
+              merge_context->GetOperands(), s->value, s->logger, s->statistics,
+              s->env_, nullptr /* result_operand */, true);
+        } else if (s->value != nullptr) {
+          s->value->assign(v.data(), v.size());
+        }
+        if (s->inplace_update_support) {
+          s->mem->GetLock(s->key->user_key())->ReadUnlock();
+        }
+        *(s->found_final_value) = true;
+        return false;
+      }
+      case kTypeDeletion:
+      case kTypeSingleDeletion:
+      case kTypeRangeDeletion: {
+        if (*(s->merge_in_progress)) {
+          *(s->status) = MergeHelper::TimedFullMerge(
+              merge_operator, s->key->user_key(), nullptr,
+              merge_context->GetOperands(), s->value, s->logger, s->statistics,
+              s->env_, nullptr /* result_operand */, true);
+        } else {
+          *(s->status) = Status::NotFound();
+        }
+        *(s->found_final_value) = true;
+        return false;
+      }
+      case kTypeMerge: {
+        if (!merge_operator) {
+          *(s->status) = Status::InvalidArgument(
+              "merge_operator is not properly initialized.");
+          // Normally we continue the loop (return true) when we see a merge
+          // operand.  But in case of an error, we should stop the loop
+          // immediately and pretend we have found the value to stop further
+          // seek.  Otherwise, the later call will override this error status.
+          *(s->found_final_value) = true;
+          return false;
+        }
+        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+        *(s->merge_in_progress) = true;
+        merge_context->PushOperand(
+            v, s->inplace_update_support == false /* operand_pinned */);
+        return true;
+      }
+      default:
+        assert(false);
+        return true;
+    }
+  }
+
+  // s->status may already be Corruption, MergeInProgress or NotFound here.
+  return false;
+}
+
+bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
+                   MergeContext* merge_context,
+                   RangeDelAggregator* range_del_agg, SequenceNumber* seq,
+                   const ReadOptions& read_opts) {
+  // The sequence number is updated synchronously in version_set.h
+  if (IsEmpty()) {
+    // Avoid recording stats, for speed.
+    return false;
+  }
+  PERF_TIMER_GUARD(get_from_memtable_time);
+
+  Slice user_key = key.user_key();
+  bool found_final_value = false;
+  bool merge_in_progress = s->IsMergeInProgress();
+  bool const may_contain =
+      nullptr == prefix_bloom_
+          ? false
+          : prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key));
+  if (prefix_bloom_ && !may_contain) {
+    // Skip the lookup when the prefix bloom filter says the key is absent.
+    PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
+    *seq = kMaxSequenceNumber;
+  } else {
+    if (prefix_bloom_) {
+      PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
+    }
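+    // Collect this memtable's range tombstones so that keys covered by a
+    // range deletion are reported as deleted by SaveValue.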
+    std::unique_ptr<InternalIterator> range_del_iter(
+        NewRangeTombstoneIterator(read_opts));
+    Status status = range_del_agg->AddTombstones(std::move(range_del_iter));
+    if (!status.ok()) {
+      *s = status;
+      return false;
+    }
+    Saver saver;
+    saver.status = s;
+    saver.found_final_value = &found_final_value;
+    saver.merge_in_progress = &merge_in_progress;
+    saver.key = &key;
+    saver.value = value;
+    saver.seq = kMaxSequenceNumber;
+    saver.mem = this;
+    saver.merge_context = merge_context;
+    saver.range_del_agg = range_del_agg;
+    saver.merge_operator = moptions_.merge_operator;
+    saver.logger = moptions_.info_log;
+    saver.inplace_update_support = moptions_.inplace_update_support;
+    saver.statistics = moptions_.statistics;
+    saver.env_ = env_;
+    table_->Get(key, &saver, SaveValue);
+
+    *seq = saver.seq;
+  }
+
+  // No change to value, since we have not yet found a Put/Delete
+  if (!found_final_value && merge_in_progress) {
+    *s = Status::MergeInProgress();
+  }
+  PERF_COUNTER_ADD(get_from_memtable_count, 1);
+  return found_final_value;
+}
+
+void MemTable::Update(SequenceNumber seq,
+                      const Slice& key,
+                      const Slice& value) {
+  LookupKey lkey(key, seq);
+  Slice mem_key = lkey.memtable_key();
+
+  std::unique_ptr<MemTableRep::Iterator> iter(
+      table_->GetDynamicPrefixIterator());
+  iter->Seek(lkey.internal_key(), mem_key.data());
+
+  if (iter->Valid()) {
+    // entry format is:
+    //    key_length  varint32
+    //    userkey  char[klength-8]
+    //    tag      uint64
+    //    vlength  varint32
+    //    value    char[vlength]
+    // Check that it belongs to the same user key.  We do not check the
+    // sequence number since the Seek() call above should have skipped
+    // all entries with overly large sequence numbers.
+    const char* entry = iter->key();
+    uint32_t key_length = 0;
+    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
+    if (comparator_.comparator.user_comparator()->Equal(
+            Slice(key_ptr, key_length - 8), lkey.user_key())) {
+      // Correct user key
+      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
+      ValueType type;
+      SequenceNumber unused;
+      UnPackSequenceAndType(tag, &unused, &type);
+      if (type == kTypeValue) {
+        Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
+        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
+        uint32_t new_size = static_cast<uint32_t>(value.size());
+
+        // Update the value in place only if the new size <= the previous size;
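+        // the new length prefix and value overwrite the old bytes, so the
+        // encoding must not grow.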
+        if (new_size <= prev_size) {
+          char* p =
+              EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size);
+          WriteLock wl(GetLock(lkey.user_key()));
+          memcpy(p, value.data(), value.size());
+          assert((unsigned)((p + value.size()) - entry) ==
+                 (unsigned)(VarintLength(key_length) + key_length +
+                            VarintLength(value.size()) + value.size()));
+          return;
+        }
+      }
+    }
+  }
+
+  // key doesn't exist
+  Add(seq, kTypeValue, key, value);
+}
+
+bool MemTable::UpdateCallback(SequenceNumber seq,
+                              const Slice& key,
+                              const Slice& delta) {
+  LookupKey lkey(key, seq);
+  Slice memkey = lkey.memtable_key();
+
+  std::unique_ptr<MemTableRep::Iterator> iter(
+      table_->GetDynamicPrefixIterator());
+  iter->Seek(lkey.internal_key(), memkey.data());
+
+  if (iter->Valid()) {
+    // entry format is:
+    //    key_length  varint32
+    //    userkey  char[klength-8]
+    //    tag      uint64
+    //    vlength  varint32
+    //    value    char[vlength]
+    // Check that it belongs to the same user key.  We do not check the
+    // sequence number since the Seek() call above should have skipped
+    // all entries with overly large sequence numbers.
+    const char* entry = iter->key();
+    uint32_t key_length = 0;
+    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
+    if (comparator_.comparator.user_comparator()->Equal(
+            Slice(key_ptr, key_length - 8), lkey.user_key())) {
+      // Correct user key
+      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
+      ValueType type;
+      uint64_t unused;
+      UnPackSequenceAndType(tag, &unused, &type);
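+      // Only kTypeValue entries can be patched.  The user-supplied
+      // inplace_callback reports one of three outcomes: UPDATED_INPLACE (the
+      // existing buffer was modified, possibly shrinking), UPDATED (a new
+      // value was produced and is added as a fresh entry), or UPDATE_FAILED
+      // (treated here as a completed no-op).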
+      switch (type) {
+        case kTypeValue: {
+          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
+          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
+
+          char* prev_buffer = const_cast<char*>(prev_value.data());
+          uint32_t new_prev_size = prev_size;
+
+          std::string str_value;
+          WriteLock wl(GetLock(lkey.user_key()));
+          auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
+                                                   delta, &str_value);
+          if (status == UpdateStatus::UPDATED_INPLACE) {
+            // Value already updated by callback.
+            assert(new_prev_size <= prev_size);
+            if (new_prev_size < prev_size) {
+              // re-encode the smaller length prefix in place
+              char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
+                                       new_prev_size);
+              if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
+                // shift the value buffer as well.
+                memcpy(p, prev_buffer, new_prev_size);
+              }
+            }
+            RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
+            UpdateFlushState();
+            return true;
+          } else if (status == UpdateStatus::UPDATED) {
+            Add(seq, kTypeValue, key, Slice(str_value));
+            RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
+            UpdateFlushState();
+            return true;
+          } else if (status == UpdateStatus::UPDATE_FAILED) {
+            // No action required. Return.
+            UpdateFlushState();
+            return true;
+          }
+        }
+        default:
+          break;
+      }
+    }
+  }
+  // If the latest value is not kTypeValue
+  // or key doesn't exist
+  return false;
+}
+
+size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
+  Slice memkey = key.memtable_key();
+
+  // A total ordered iterator is costly for some memtablerep (prefix aware
+  // reps). By passing in the user key, we allow efficient iterator creation.
+  // The iterator only needs to be ordered within the same user key.
+  std::unique_ptr<MemTableRep::Iterator> iter(
+      table_->GetDynamicPrefixIterator());
+  iter->Seek(key.internal_key(), memkey.data());
+
+  size_t num_successive_merges = 0;
+
+  for (; iter->Valid(); iter->Next()) {
+    const char* entry = iter->key();
+    uint32_t key_length = 0;
+    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
+    if (!comparator_.comparator.user_comparator()->Equal(
+            Slice(iter_key_ptr, key_length - 8), key.user_key())) {
+      break;
+    }
+
+    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
+    ValueType type;
+    uint64_t unused;
+    UnPackSequenceAndType(tag, &unused, &type);
+    if (type != kTypeMerge) {
+      break;
+    }
+
+    ++num_successive_merges;
+  }
+
+  return num_successive_merges;
+}
+
+void MemTableRep::Get(const LookupKey& k, void* callback_args,
+                      bool (*callback_func)(void* arg, const char* entry)) {
+  auto iter = GetDynamicPrefixIterator();
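+  // Walk entries starting at the lookup key and invoke the callback on each
+  // one until it asks to stop by returning false.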
+  for (iter->Seek(k.internal_key(), k.memtable_key().data());
+       iter->Valid() && callback_func(callback_args, iter->key());
+       iter->Next()) {
+  }
+}
+
+void MemTable::RefLogContainingPrepSection(uint64_t log) {
+  assert(log > 0);
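+  // Lower min_prep_log_referenced_ to 'log' (0 means unset) with a CAS loop
+  // so a concurrent caller cannot overwrite it with a larger value.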
+  auto cur = min_prep_log_referenced_.load();
+  while ((log < cur || cur == 0) &&
+         !min_prep_log_referenced_.compare_exchange_strong(cur, log)) {
+    cur = min_prep_log_referenced_.load();
+  }
+}
+
+uint64_t MemTable::GetMinLogContainingPrepSection() {
+  return min_prep_log_referenced_.load();
+}
+
+}  // namespace rocksdb

