nifi-commits mailing list archives

From: jeremyd...@apache.org
Subject: [16/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date: Mon, 09 Oct 2017 16:24:56 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/dbformat.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/dbformat.h b/thirdparty/rocksdb/db/dbformat.h
new file mode 100644
index 0000000..d9fd5f3
--- /dev/null
+++ b/thirdparty/rocksdb/db/dbformat.h
@@ -0,0 +1,596 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <stdio.h>
+#include <string>
+#include <utility>
+#include "rocksdb/comparator.h"
+#include "rocksdb/db.h"
+#include "rocksdb/filter_policy.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/table.h"
+#include "rocksdb/types.h"
+#include "util/coding.h"
+#include "util/logging.h"
+
+namespace rocksdb {
+
+class InternalKey;
+
+// Value types encoded as the last component of internal keys.
+// DO NOT CHANGE THESE ENUM VALUES: they are embedded in the on-disk
+// data structures.
+// The highest bit of the value type needs to be reserved to SST tables
+// for them to do more flexible encoding.
+enum ValueType : unsigned char {
+  kTypeDeletion = 0x0,
+  kTypeValue = 0x1,
+  kTypeMerge = 0x2,
+  kTypeLogData = 0x3,               // WAL only.
+  kTypeColumnFamilyDeletion = 0x4,  // WAL only.
+  kTypeColumnFamilyValue = 0x5,     // WAL only.
+  kTypeColumnFamilyMerge = 0x6,     // WAL only.
+  kTypeSingleDeletion = 0x7,
+  kTypeColumnFamilySingleDeletion = 0x8,  // WAL only.
+  kTypeBeginPrepareXID = 0x9,             // WAL only.
+  kTypeEndPrepareXID = 0xA,               // WAL only.
+  kTypeCommitXID = 0xB,                   // WAL only.
+  kTypeRollbackXID = 0xC,                 // WAL only.
+  kTypeNoop = 0xD,                        // WAL only.
+  kTypeColumnFamilyRangeDeletion = 0xE,   // WAL only.
+  kTypeRangeDeletion = 0xF,               // meta block
+  kMaxValue = 0x7F                        // Not used for storing records.
+};
+
+// Defined in dbformat.cc
+extern const ValueType kValueTypeForSeek;
+extern const ValueType kValueTypeForSeekForPrev;
+
+// Checks whether a type is an inline value type
+// (i.e. a type used in memtable skiplist and sst file datablock).
+inline bool IsValueType(ValueType t) {
+  return t <= kTypeMerge || t == kTypeSingleDeletion;
+}
+
+// Checks whether a type is from a user operation.
+// kTypeRangeDeletion is in the meta block, so this API is separate from the one above.
+inline bool IsExtendedValueType(ValueType t) {
+  return IsValueType(t) || t == kTypeRangeDeletion;
+}
+
+// We leave eight bits empty at the bottom so a type and sequence#
+// can be packed together into 64-bits.
+static const SequenceNumber kMaxSequenceNumber =
+    ((0x1ull << 56) - 1);
+
+static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64;
+
+struct ParsedInternalKey {
+  Slice user_key;
+  SequenceNumber sequence;
+  ValueType type;
+
+  ParsedInternalKey()
+      : sequence(kMaxSequenceNumber)  // Make code analyzer happy
+  {}  // Intentionally left uninitialized (for speed)
+  ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
+      : user_key(u), sequence(seq), type(t) { }
+  std::string DebugString(bool hex = false) const;
+
+  void clear() {
+    user_key.clear();
+    sequence = 0;
+    type = kTypeDeletion;
+  }
+};
+
+// Return the length of the encoding of "key".
+inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) {
+  return key.user_key.size() + 8;
+}
+
+// Pack a sequence number and a ValueType into a uint64_t
+extern uint64_t PackSequenceAndType(uint64_t seq, ValueType t);
+
+// Given the result of PackSequenceAndType, store the sequence number in *seq
+// and the ValueType in *t.
+extern void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t);
+
+// Append the serialization of "key" to *result.
+extern void AppendInternalKey(std::string* result,
+                              const ParsedInternalKey& key);
+// Serialized internal key consists of user key followed by footer.
+// This function appends the footer to *result, assuming that *result already
+// contains the user key at the end.
+extern void AppendInternalKeyFooter(std::string* result, SequenceNumber s,
+                                    ValueType t);
+
+// Attempt to parse an internal key from "internal_key".  On success,
+// stores the parsed data in "*result", and returns true.
+//
+// On error, returns false and leaves "*result" in an undefined state.
+extern bool ParseInternalKey(const Slice& internal_key,
+                             ParsedInternalKey* result);
+
+// Returns the user key portion of an internal key.
+inline Slice ExtractUserKey(const Slice& internal_key) {
+  assert(internal_key.size() >= 8);
+  return Slice(internal_key.data(), internal_key.size() - 8);
+}
+
+inline ValueType ExtractValueType(const Slice& internal_key) {
+  assert(internal_key.size() >= 8);
+  const size_t n = internal_key.size();
+  uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
+  unsigned char c = num & 0xff;
+  return static_cast<ValueType>(c);
+}
+
+// A comparator for internal keys that uses a specified comparator for
+// the user key portion and breaks ties by decreasing sequence number.
+class InternalKeyComparator : public Comparator {
+ private:
+  const Comparator* user_comparator_;
+  std::string name_;
+ public:
+  explicit InternalKeyComparator(const Comparator* c) : user_comparator_(c),
+    name_("rocksdb.InternalKeyComparator:" +
+          std::string(user_comparator_->Name())) {
+  }
+  virtual ~InternalKeyComparator() {}
+
+  virtual const char* Name() const override;
+  virtual int Compare(const Slice& a, const Slice& b) const override;
+  virtual void FindShortestSeparator(std::string* start,
+                                     const Slice& limit) const override;
+  virtual void FindShortSuccessor(std::string* key) const override;
+
+  const Comparator* user_comparator() const { return user_comparator_; }
+
+  int Compare(const InternalKey& a, const InternalKey& b) const;
+  int Compare(const ParsedInternalKey& a, const ParsedInternalKey& b) const;
+  virtual const Comparator* GetRootComparator() const override {
+    return user_comparator_->GetRootComparator();
+  }
+};
+
+// Modules in this directory should keep internal keys wrapped inside
+// the following class instead of plain strings so that we do not
+// incorrectly use string comparisons instead of an InternalKeyComparator.
+class InternalKey {
+ private:
+  std::string rep_;
+ public:
+  InternalKey() { }   // Leave rep_ as empty to indicate it is invalid
+  InternalKey(const Slice& _user_key, SequenceNumber s, ValueType t) {
+    AppendInternalKey(&rep_, ParsedInternalKey(_user_key, s, t));
+  }
+
+  // Sets the internal key to be greater than or equal to all internal keys
+  // with this user key
+  void SetMaxPossibleForUserKey(const Slice& _user_key) {
+    AppendInternalKey(&rep_, ParsedInternalKey(_user_key, kMaxSequenceNumber,
+                                               kValueTypeForSeek));
+  }
+
+  // Sets the internal key to be less than or equal to all internal keys
+  // with this user key
+  void SetMinPossibleForUserKey(const Slice& _user_key) {
+    AppendInternalKey(
+        &rep_, ParsedInternalKey(_user_key, 0, static_cast<ValueType>(0)));
+  }
+
+  bool Valid() const {
+    ParsedInternalKey parsed;
+    return ParseInternalKey(Slice(rep_), &parsed);
+  }
+
+  void DecodeFrom(const Slice& s) { rep_.assign(s.data(), s.size()); }
+  Slice Encode() const {
+    assert(!rep_.empty());
+    return rep_;
+  }
+
+  Slice user_key() const { return ExtractUserKey(rep_); }
+  size_t size() { return rep_.size(); }
+
+  void Set(const Slice& _user_key, SequenceNumber s, ValueType t) {
+    SetFrom(ParsedInternalKey(_user_key, s, t));
+  }
+
+  void SetFrom(const ParsedInternalKey& p) {
+    rep_.clear();
+    AppendInternalKey(&rep_, p);
+  }
+
+  void Clear() { rep_.clear(); }
+
+  // The underlying representation.
+  // Intended only to be used together with ConvertFromUserKey().
+  std::string* rep() { return &rep_; }
+
+  // Assuming that *rep() contains a user key, this method makes internal key
+  // out of it in-place. This saves a memcpy compared to Set()/SetFrom().
+  void ConvertFromUserKey(SequenceNumber s, ValueType t) {
+    AppendInternalKeyFooter(&rep_, s, t);
+  }
+
+  std::string DebugString(bool hex = false) const;
+};
+
+inline int InternalKeyComparator::Compare(
+    const InternalKey& a, const InternalKey& b) const {
+  return Compare(a.Encode(), b.Encode());
+}
+
+inline bool ParseInternalKey(const Slice& internal_key,
+                             ParsedInternalKey* result) {
+  const size_t n = internal_key.size();
+  if (n < 8) return false;
+  uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
+  unsigned char c = num & 0xff;
+  result->sequence = num >> 8;
+  result->type = static_cast<ValueType>(c);
+  assert(result->type <= ValueType::kMaxValue);
+  result->user_key = Slice(internal_key.data(), n - 8);
+  return IsExtendedValueType(result->type);
+}
+
+// Update the sequence number in the internal key.
+// Guarantees not to invalidate ikey.data().
+inline void UpdateInternalKey(std::string* ikey, uint64_t seq, ValueType t) {
+  size_t ikey_sz = ikey->size();
+  assert(ikey_sz >= 8);
+  uint64_t newval = (seq << 8) | t;
+
+  // Note: Since C++11, strings are guaranteed to be stored contiguously and
+  // string::operator[]() is guaranteed not to change ikey.data().
+  EncodeFixed64(&(*ikey)[ikey_sz - 8], newval);
+}
+
+// Get the sequence number from the internal key
+inline uint64_t GetInternalKeySeqno(const Slice& internal_key) {
+  const size_t n = internal_key.size();
+  assert(n >= 8);
+  uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
+  return num >> 8;
+}
+
+
+// A helper class useful for DBImpl::Get()
+class LookupKey {
+ public:
+  // Initialize *this for looking up user_key at a snapshot with
+  // the specified sequence number.
+  LookupKey(const Slice& _user_key, SequenceNumber sequence);
+
+  ~LookupKey();
+
+  // Return a key suitable for lookup in a MemTable.
+  Slice memtable_key() const {
+    return Slice(start_, static_cast<size_t>(end_ - start_));
+  }
+
+  // Return an internal key (suitable for passing to an internal iterator)
+  Slice internal_key() const {
+    return Slice(kstart_, static_cast<size_t>(end_ - kstart_));
+  }
+
+  // Return the user key
+  Slice user_key() const {
+    return Slice(kstart_, static_cast<size_t>(end_ - kstart_ - 8));
+  }
+
+ private:
+  // We construct a char array of the form:
+  //    klength  varint32               <-- start_
+  //    userkey  char[klength]          <-- kstart_
+  //    tag      uint64
+  //                                    <-- end_
+  // The array is a suitable MemTable key.
+  // The suffix starting with "userkey" can be used as an InternalKey.
+  const char* start_;
+  const char* kstart_;
+  const char* end_;
+  char space_[200];      // Avoid allocation for short keys
+
+  // No copying allowed
+  LookupKey(const LookupKey&);
+  void operator=(const LookupKey&);
+};
+
+inline LookupKey::~LookupKey() {
+  if (start_ != space_) delete[] start_;
+}
+
+class IterKey {
+ public:
+  IterKey()
+      : buf_(space_),
+        buf_size_(sizeof(space_)),
+        key_(buf_),
+        key_size_(0),
+        is_user_key_(true) {}
+
+  ~IterKey() { ResetBuffer(); }
+
+  Slice GetInternalKey() const {
+    assert(!IsUserKey());
+    return Slice(key_, key_size_);
+  }
+
+  Slice GetUserKey() const {
+    if (IsUserKey()) {
+      return Slice(key_, key_size_);
+    } else {
+      assert(key_size_ >= 8);
+      return Slice(key_, key_size_ - 8);
+    }
+  }
+
+  size_t Size() const { return key_size_; }
+
+  void Clear() { key_size_ = 0; }
+
+  // Appends "non_shared_data" to the key, keeping the first "shared_len" bytes.
+  // This function is used in Block::Iter::ParseNextKey.
+  // shared_len: bytes in [0, shared_len-1] are retained.
+  // non_shared_data: data to be appended; its length must be >= non_shared_len.
+  void TrimAppend(const size_t shared_len, const char* non_shared_data,
+                  const size_t non_shared_len) {
+    assert(shared_len <= key_size_);
+    size_t total_size = shared_len + non_shared_len;
+
+    if (IsKeyPinned() /* key is not in buf_ */) {
+      // Copy the key from external memory to buf_ (copy shared_len bytes)
+      EnlargeBufferIfNeeded(total_size);
+      memcpy(buf_, key_, shared_len);
+    } else if (total_size > buf_size_) {
+      // Need to allocate space, delete previous space
+      char* p = new char[total_size];
+      memcpy(p, key_, shared_len);
+
+      if (buf_ != space_) {
+        delete[] buf_;
+      }
+
+      buf_ = p;
+      buf_size_ = total_size;
+    }
+
+    memcpy(buf_ + shared_len, non_shared_data, non_shared_len);
+    key_ = buf_;
+    key_size_ = total_size;
+  }
+
+  Slice SetUserKey(const Slice& key, bool copy = true) {
+    is_user_key_ = true;
+    return SetKeyImpl(key, copy);
+  }
+
+  Slice SetInternalKey(const Slice& key, bool copy = true) {
+    is_user_key_ = false;
+    return SetKeyImpl(key, copy);
+  }
+
+  // Copies the content of key, updates the reference to the user key in ikey
+  // and returns a Slice referencing the new copy.
+  Slice SetInternalKey(const Slice& key, ParsedInternalKey* ikey) {
+    size_t key_n = key.size();
+    assert(key_n >= 8);
+    SetInternalKey(key);
+    ikey->user_key = Slice(key_, key_n - 8);
+    return Slice(key_, key_n);
+  }
+
+  // Copy the key into IterKey's own buf_
+  void OwnKey() {
+    assert(IsKeyPinned() == true);
+
+    Reserve(key_size_);
+    memcpy(buf_, key_, key_size_);
+    key_ = buf_;
+  }
+
+  // Update the sequence number in the internal key.  Guarantees not to
+  // invalidate slices to the key (and the user key).
+  void UpdateInternalKey(uint64_t seq, ValueType t) {
+    assert(!IsKeyPinned());
+    assert(key_size_ >= 8);
+    uint64_t newval = (seq << 8) | t;
+    EncodeFixed64(&buf_[key_size_ - 8], newval);
+  }
+
+  bool IsKeyPinned() const { return (key_ != buf_); }
+
+  void SetInternalKey(const Slice& key_prefix, const Slice& user_key,
+                      SequenceNumber s,
+                      ValueType value_type = kValueTypeForSeek) {
+    size_t psize = key_prefix.size();
+    size_t usize = user_key.size();
+    EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t));
+    if (psize > 0) {
+      memcpy(buf_, key_prefix.data(), psize);
+    }
+    memcpy(buf_ + psize, user_key.data(), usize);
+    EncodeFixed64(buf_ + usize + psize, PackSequenceAndType(s, value_type));
+
+    key_ = buf_;
+    key_size_ = psize + usize + sizeof(uint64_t);
+    is_user_key_ = false;
+  }
+
+  void SetInternalKey(const Slice& user_key, SequenceNumber s,
+                      ValueType value_type = kValueTypeForSeek) {
+    SetInternalKey(Slice(), user_key, s, value_type);
+  }
+
+  void Reserve(size_t size) {
+    EnlargeBufferIfNeeded(size);
+    key_size_ = size;
+  }
+
+  void SetInternalKey(const ParsedInternalKey& parsed_key) {
+    SetInternalKey(Slice(), parsed_key);
+  }
+
+  void SetInternalKey(const Slice& key_prefix,
+                      const ParsedInternalKey& parsed_key_suffix) {
+    SetInternalKey(key_prefix, parsed_key_suffix.user_key,
+                   parsed_key_suffix.sequence, parsed_key_suffix.type);
+  }
+
+  void EncodeLengthPrefixedKey(const Slice& key) {
+    auto size = key.size();
+    EnlargeBufferIfNeeded(size + static_cast<size_t>(VarintLength(size)));
+    char* ptr = EncodeVarint32(buf_, static_cast<uint32_t>(size));
+    memcpy(ptr, key.data(), size);
+    key_ = buf_;
+    is_user_key_ = true;
+  }
+
+  bool IsUserKey() const { return is_user_key_; }
+
+ private:
+  char* buf_;
+  size_t buf_size_;
+  const char* key_;
+  size_t key_size_;
+  char space_[32];  // Avoid allocation for short keys
+  bool is_user_key_;
+
+  Slice SetKeyImpl(const Slice& key, bool copy) {
+    size_t size = key.size();
+    if (copy) {
+      // Copy key to buf_
+      EnlargeBufferIfNeeded(size);
+      memcpy(buf_, key.data(), size);
+      key_ = buf_;
+    } else {
+      // Update key_ to point to external memory
+      key_ = key.data();
+    }
+    key_size_ = size;
+    return Slice(key_, key_size_);
+  }
+
+  void ResetBuffer() {
+    if (buf_ != space_) {
+      delete[] buf_;
+      buf_ = space_;
+    }
+    buf_size_ = sizeof(space_);
+    key_size_ = 0;
+  }
+
+  // Enlarge the buffer if needed, based on key_size.
+  // By default, the statically allocated buffer is used. Once a key is
+  // larger than the statically allocated buffer, another buffer is
+  // dynamically allocated and used until an even larger key buffer is
+  // requested. In that case, we reallocate the buffer and delete the old one.
+  void EnlargeBufferIfNeeded(size_t key_size) {
+    // If the requested size fits in the current buffer (by default the
+    // statically allocated one), keep using it
+    if (key_size > buf_size_) {
+      // Need to enlarge the buffer.
+      ResetBuffer();
+      buf_ = new char[key_size];
+      buf_size_ = key_size;
+    }
+  }
+
+  // No copying allowed
+  IterKey(const IterKey&) = delete;
+  void operator=(const IterKey&) = delete;
+};
+
+class InternalKeySliceTransform : public SliceTransform {
+ public:
+  explicit InternalKeySliceTransform(const SliceTransform* transform)
+      : transform_(transform) {}
+
+  virtual const char* Name() const override { return transform_->Name(); }
+
+  virtual Slice Transform(const Slice& src) const override {
+    auto user_key = ExtractUserKey(src);
+    return transform_->Transform(user_key);
+  }
+
+  virtual bool InDomain(const Slice& src) const override {
+    auto user_key = ExtractUserKey(src);
+    return transform_->InDomain(user_key);
+  }
+
+  virtual bool InRange(const Slice& dst) const override {
+    auto user_key = ExtractUserKey(dst);
+    return transform_->InRange(user_key);
+  }
+
+  const SliceTransform* user_prefix_extractor() const { return transform_; }
+
+ private:
+  // Like the comparator, InternalKeySliceTransform does not take care of
+  // deleting transform_
+  const SliceTransform* const transform_;
+};
+
+// Read the key of a record from a write batch.
+// If this record represents the default column family, then cf_record
+// must be passed as false; otherwise it must be passed as true.
+extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key,
+                                       bool cf_record);
+
+// Read record from a write batch piece from input.
+// tag, column_family, key, value and blob are return values. Callers own the
+// Slice they point to.
+// Tag is defined as ValueType.
+// input will be advanced to after the record.
+extern Status ReadRecordFromWriteBatch(Slice* input, char* tag,
+                                       uint32_t* column_family, Slice* key,
+                                       Slice* value, Slice* blob, Slice* xid);
+
+// When a user calls DeleteRange() to delete a range of keys,
+// we store a serialized RangeTombstone in the MemTable and SST.
+// The struct here is an easy-to-understand form;
+// start/end_key_ is the start/end user key of the range to be deleted
+struct RangeTombstone {
+  Slice start_key_;
+  Slice end_key_;
+  SequenceNumber seq_;
+  RangeTombstone() = default;
+  RangeTombstone(Slice sk, Slice ek, SequenceNumber sn)
+      : start_key_(sk), end_key_(ek), seq_(sn) {}
+
+  RangeTombstone(ParsedInternalKey parsed_key, Slice value) {
+    start_key_ = parsed_key.user_key;
+    seq_ = parsed_key.sequence;
+    end_key_ = value;
+  }
+
+  // Be careful using Serialize(); it allocates new memory
+  std::pair<InternalKey, Slice> Serialize() const {
+    auto key = InternalKey(start_key_, seq_, kTypeRangeDeletion);
+    Slice value = end_key_;
+    return std::make_pair(std::move(key), std::move(value));
+  }
+
+  // Be careful using SerializeKey(); it allocates new memory
+  InternalKey SerializeKey() const {
+    return InternalKey(start_key_, seq_, kTypeRangeDeletion);
+  }
+
+  // Be careful using SerializeEndKey(); it allocates new memory
+  InternalKey SerializeEndKey() const {
+    return InternalKey(end_key_, seq_, kTypeRangeDeletion);
+  }
+};
+
+}  // namespace rocksdb
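
The header above relies throughout on the 8-byte internal-key trailer that packs a
56-bit sequence number and a ValueType as (seq << 8) | type in little-endian fixed-64
form (see PackSequenceAndType, ExtractUserKey, GetInternalKeySeqno). The following is a
minimal standalone sketch of that encoding only, not RocksDB code; the AppendFixed64
helper is illustrative and assumes a little-endian host.

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Little-endian fixed-64 append, analogous to EncodeFixed64 in util/coding.h.
static void AppendFixed64(std::string* dst, uint64_t v) {
  char buf[8];
  std::memcpy(buf, &v, sizeof(v));  // assumes a little-endian host
  dst->append(buf, sizeof(buf));
}

int main() {
  const std::string user_key = "foo";
  const uint64_t seq = 42;          // sequence number (56 usable bits)
  const unsigned char type = 0x1;   // kTypeValue

  // Internal key = user key followed by an 8-byte trailer of (seq << 8) | type.
  std::string ikey = user_key;
  AppendFixed64(&ikey, (seq << 8) | type);

  // Decoding mirrors ExtractUserKey / GetInternalKeySeqno / ExtractValueType.
  assert(ikey.size() == user_key.size() + 8);
  uint64_t trailer;
  std::memcpy(&trailer, ikey.data() + ikey.size() - 8, sizeof(trailer));
  assert((trailer >> 8) == seq);
  assert((trailer & 0xff) == type);
  return 0;
}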

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/event_helpers.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/event_helpers.cc b/thirdparty/rocksdb/db/event_helpers.cc
new file mode 100644
index 0000000..1b79acb
--- /dev/null
+++ b/thirdparty/rocksdb/db/event_helpers.cc
@@ -0,0 +1,155 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "db/event_helpers.h"
+
+namespace rocksdb {
+
+namespace {
+template<class T>
+inline T SafeDivide(T a, T b) {
+  return b == 0 ? 0 : a / b;
+}
+}  // namespace
+
+void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
+  *jwriter << "time_micros"
+           << std::chrono::duration_cast<std::chrono::microseconds>(
+                  std::chrono::system_clock::now().time_since_epoch()).count();
+}
+
+#ifndef ROCKSDB_LITE
+void EventHelpers::NotifyTableFileCreationStarted(
+    const std::vector<std::shared_ptr<EventListener>>& listeners,
+    const std::string& db_name, const std::string& cf_name,
+    const std::string& file_path, int job_id, TableFileCreationReason reason) {
+  TableFileCreationBriefInfo info;
+  info.db_name = db_name;
+  info.cf_name = cf_name;
+  info.file_path = file_path;
+  info.job_id = job_id;
+  info.reason = reason;
+  for (auto& listener : listeners) {
+    listener->OnTableFileCreationStarted(info);
+  }
+}
+#endif  // !ROCKSDB_LITE
+
+void EventHelpers::NotifyOnBackgroundError(
+    const std::vector<std::shared_ptr<EventListener>>& listeners,
+    BackgroundErrorReason reason, Status* bg_error,
+    InstrumentedMutex* db_mutex) {
+#ifndef ROCKSDB_LITE
+  if (listeners.size() == 0U) {
+    return;
+  }
+  db_mutex->AssertHeld();
+  // release lock while notifying events
+  db_mutex->Unlock();
+  for (auto& listener : listeners) {
+    listener->OnBackgroundError(reason, bg_error);
+  }
+  db_mutex->Lock();
+#endif  // ROCKSDB_LITE
+}
+
+void EventHelpers::LogAndNotifyTableFileCreationFinished(
+    EventLogger* event_logger,
+    const std::vector<std::shared_ptr<EventListener>>& listeners,
+    const std::string& db_name, const std::string& cf_name,
+    const std::string& file_path, int job_id, const FileDescriptor& fd,
+    const TableProperties& table_properties, TableFileCreationReason reason,
+    const Status& s) {
+  if (s.ok() && event_logger) {
+    JSONWriter jwriter;
+    AppendCurrentTime(&jwriter);
+    jwriter << "cf_name" << cf_name << "job" << job_id << "event"
+            << "table_file_creation"
+            << "file_number" << fd.GetNumber() << "file_size"
+            << fd.GetFileSize();
+
+    // table_properties
+    {
+      jwriter << "table_properties";
+      jwriter.StartObject();
+
+      // basic properties:
+      jwriter << "data_size" << table_properties.data_size << "index_size"
+              << table_properties.index_size << "filter_size"
+              << table_properties.filter_size << "raw_key_size"
+              << table_properties.raw_key_size << "raw_average_key_size"
+              << SafeDivide(table_properties.raw_key_size,
+                            table_properties.num_entries)
+              << "raw_value_size" << table_properties.raw_value_size
+              << "raw_average_value_size"
+              << SafeDivide(table_properties.raw_value_size,
+                            table_properties.num_entries)
+              << "num_data_blocks" << table_properties.num_data_blocks
+              << "num_entries" << table_properties.num_entries
+              << "filter_policy_name" << table_properties.filter_policy_name;
+
+      // user collected properties
+      for (const auto& prop : table_properties.readable_properties) {
+        jwriter << prop.first << prop.second;
+      }
+      jwriter.EndObject();
+    }
+    jwriter.EndObject();
+
+    event_logger->Log(jwriter);
+  }
+
+#ifndef ROCKSDB_LITE
+  if (listeners.size() == 0) {
+    return;
+  }
+  TableFileCreationInfo info;
+  info.db_name = db_name;
+  info.cf_name = cf_name;
+  info.file_path = file_path;
+  info.file_size = fd.file_size;
+  info.job_id = job_id;
+  info.table_properties = table_properties;
+  info.reason = reason;
+  info.status = s;
+  for (auto& listener : listeners) {
+    listener->OnTableFileCreated(info);
+  }
+#endif  // !ROCKSDB_LITE
+}
+
+void EventHelpers::LogAndNotifyTableFileDeletion(
+    EventLogger* event_logger, int job_id,
+    uint64_t file_number, const std::string& file_path,
+    const Status& status, const std::string& dbname,
+    const std::vector<std::shared_ptr<EventListener>>& listeners) {
+
+  JSONWriter jwriter;
+  AppendCurrentTime(&jwriter);
+
+  jwriter << "job" << job_id
+          << "event" << "table_file_deletion"
+          << "file_number" << file_number;
+  if (!status.ok()) {
+    jwriter << "status" << status.ToString();
+  }
+
+  jwriter.EndObject();
+
+  event_logger->Log(jwriter);
+
+#ifndef ROCKSDB_LITE
+  TableFileDeletionInfo info;
+  info.db_name = dbname;
+  info.job_id = job_id;
+  info.file_path = file_path;
+  info.status = status;
+  for (auto& listener : listeners) {
+    listener->OnTableFileDeleted(info);
+  }
+#endif  // !ROCKSDB_LITE
+}
+
+}  // namespace rocksdb
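
The helpers above fan table-file events out to the EventListener instances registered on
the DB. As a rough application-side sketch of how those callbacks are consumed, assuming
the public rocksdb/listener.h API and the usual Options::listeners wiring rather than
anything added by this commit:

#include <memory>
#include "rocksdb/listener.h"
#include "rocksdb/options.h"

class FileEventLogger : public rocksdb::EventListener {
 public:
  // Invoked via EventHelpers::LogAndNotifyTableFileCreationFinished().
  void OnTableFileCreated(const rocksdb::TableFileCreationInfo& info) override {
    // e.g. export info.file_path and info.file_size to application metrics.
  }
  // Invoked via EventHelpers::LogAndNotifyTableFileDeletion().
  void OnTableFileDeleted(const rocksdb::TableFileDeletionInfo& info) override {
    // e.g. log info.file_path together with info.status.
  }
};

// Registration sketch: listeners are shared_ptrs held on the Options object.
// rocksdb::Options options;
// options.listeners.push_back(std::make_shared<FileEventLogger>());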

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/event_helpers.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/event_helpers.h b/thirdparty/rocksdb/db/event_helpers.h
new file mode 100644
index 0000000..674e6c5
--- /dev/null
+++ b/thirdparty/rocksdb/db/event_helpers.h
@@ -0,0 +1,52 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "db/column_family.h"
+#include "db/version_edit.h"
+#include "rocksdb/listener.h"
+#include "rocksdb/table_properties.h"
+#include "util/event_logger.h"
+
+namespace rocksdb {
+
+class EventHelpers {
+ public:
+  static void AppendCurrentTime(JSONWriter* json_writer);
+#ifndef ROCKSDB_LITE
+  static void NotifyTableFileCreationStarted(
+      const std::vector<std::shared_ptr<EventListener>>& listeners,
+      const std::string& db_name, const std::string& cf_name,
+      const std::string& file_path, int job_id, TableFileCreationReason reason);
+#endif  // !ROCKSDB_LITE
+  static void NotifyOnBackgroundError(
+      const std::vector<std::shared_ptr<EventListener>>& listeners,
+      BackgroundErrorReason reason, Status* bg_error,
+      InstrumentedMutex* db_mutex);
+  static void LogAndNotifyTableFileCreationFinished(
+      EventLogger* event_logger,
+      const std::vector<std::shared_ptr<EventListener>>& listeners,
+      const std::string& db_name, const std::string& cf_name,
+      const std::string& file_path, int job_id, const FileDescriptor& fd,
+      const TableProperties& table_properties, TableFileCreationReason reason,
+      const Status& s);
+  static void LogAndNotifyTableFileDeletion(
+      EventLogger* event_logger, int job_id,
+      uint64_t file_number, const std::string& file_path,
+      const Status& status, const std::string& db_name,
+      const std::vector<std::shared_ptr<EventListener>>& listeners);
+
+ private:
+  static void LogAndNotifyTableFileCreation(
+      EventLogger* event_logger,
+      const std::vector<std::shared_ptr<EventListener>>& listeners,
+      const FileDescriptor& fd, const TableFileCreationInfo& info);
+};
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/experimental.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/experimental.cc b/thirdparty/rocksdb/db/experimental.cc
new file mode 100644
index 0000000..effe9d7
--- /dev/null
+++ b/thirdparty/rocksdb/db/experimental.cc
@@ -0,0 +1,49 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/experimental.h"
+
+#include "db/db_impl.h"
+
+namespace rocksdb {
+namespace experimental {
+
+#ifndef ROCKSDB_LITE
+
+Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
+                           const Slice* begin, const Slice* end) {
+  if (db == nullptr) {
+    return Status::InvalidArgument("DB is empty");
+  }
+
+  return db->SuggestCompactRange(column_family, begin, end);
+}
+
+Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, int target_level) {
+  if (db == nullptr) {
+    return Status::InvalidArgument("Didn't recognize DB object");
+  }
+  return db->PromoteL0(column_family, target_level);
+}
+
+#else  // ROCKSDB_LITE
+
+Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
+                           const Slice* begin, const Slice* end) {
+  return Status::NotSupported("Not supported in RocksDB LITE");
+}
+
+Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, int target_level) {
+  return Status::NotSupported("Not supported in RocksDB LITE");
+}
+
+#endif  // ROCKSDB_LITE
+
+Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end) {
+  return SuggestCompactRange(db, db->DefaultColumnFamily(), begin, end);
+}
+
+}  // namespace experimental
+}  // namespace rocksdb
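
The functions above are declared in rocksdb/experimental.h and act on an open DB handle.
A minimal call sketch follows; the key bounds are purely illustrative and the wrapper
name is hypothetical, not part of this commit.

#include "rocksdb/db.h"
#include "rocksdb/experimental.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"

// Suggests compaction of the files overlapping the given range in the default
// column family; returns NotSupported under ROCKSDB_LITE.
rocksdb::Status MarkRangeForCompaction(rocksdb::DB* db) {
  rocksdb::Slice begin("a");  // illustrative bounds
  rocksdb::Slice end("z");
  return rocksdb::experimental::SuggestCompactRange(db, &begin, &end);
}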

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/external_sst_file_ingestion_job.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/external_sst_file_ingestion_job.cc b/thirdparty/rocksdb/db/external_sst_file_ingestion_job.cc
new file mode 100644
index 0000000..58fa354
--- /dev/null
+++ b/thirdparty/rocksdb/db/external_sst_file_ingestion_job.cc
@@ -0,0 +1,665 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "db/external_sst_file_ingestion_job.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <algorithm>
+#include <string>
+#include <vector>
+
+#include "db/version_edit.h"
+#include "table/merging_iterator.h"
+#include "table/scoped_arena_iterator.h"
+#include "table/sst_file_writer_collectors.h"
+#include "table/table_builder.h"
+#include "util/file_reader_writer.h"
+#include "util/file_util.h"
+#include "util/stop_watch.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+Status ExternalSstFileIngestionJob::Prepare(
+    const std::vector<std::string>& external_files_paths) {
+  Status status;
+
+  // Read the information of files we are ingesting
+  for (const std::string& file_path : external_files_paths) {
+    IngestedFileInfo file_to_ingest;
+    status = GetIngestedFileInfo(file_path, &file_to_ingest);
+    if (!status.ok()) {
+      return status;
+    }
+    files_to_ingest_.push_back(file_to_ingest);
+  }
+
+  for (const IngestedFileInfo& f : files_to_ingest_) {
+    if (f.cf_id !=
+            TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
+        f.cf_id != cfd_->GetID()) {
+      return Status::InvalidArgument(
+          "External file column family id dont match");
+    }
+  }
+
+  const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
+  auto num_files = files_to_ingest_.size();
+  if (num_files == 0) {
+    return Status::InvalidArgument("The list of files is empty");
+  } else if (num_files > 1) {
+    // Verify that the passed files don't have overlapping ranges
+    autovector<const IngestedFileInfo*> sorted_files;
+    for (size_t i = 0; i < num_files; i++) {
+      sorted_files.push_back(&files_to_ingest_[i]);
+    }
+
+    std::sort(
+        sorted_files.begin(), sorted_files.end(),
+        [&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
+          return ucmp->Compare(info1->smallest_user_key,
+                               info2->smallest_user_key) < 0;
+        });
+
+    for (size_t i = 0; i < num_files - 1; i++) {
+      if (ucmp->Compare(sorted_files[i]->largest_user_key,
+                        sorted_files[i + 1]->smallest_user_key) >= 0) {
+        return Status::NotSupported("Files have overlapping ranges");
+      }
+    }
+  }
+
+  for (IngestedFileInfo& f : files_to_ingest_) {
+    if (f.num_entries == 0) {
+      return Status::InvalidArgument("File contain no entries");
+    }
+
+    if (!f.smallest_internal_key().Valid() ||
+        !f.largest_internal_key().Valid()) {
+      return Status::Corruption("Generated table have corrupted keys");
+    }
+  }
+
+  // Copy/Move external files into DB
+  for (IngestedFileInfo& f : files_to_ingest_) {
+    f.fd = FileDescriptor(versions_->NewFileNumber(), 0, f.file_size);
+
+    const std::string path_outside_db = f.external_file_path;
+    const std::string path_inside_db =
+        TableFileName(db_options_.db_paths, f.fd.GetNumber(), f.fd.GetPathId());
+
+    if (ingestion_options_.move_files) {
+      status = env_->LinkFile(path_outside_db, path_inside_db);
+      if (status.IsNotSupported()) {
+        // Original file is on a different FS, use copy instead of hard linking
+        status = CopyFile(env_, path_outside_db, path_inside_db, 0,
+                          db_options_.use_fsync);
+      }
+    } else {
+      status = CopyFile(env_, path_outside_db, path_inside_db, 0,
+                        db_options_.use_fsync);
+    }
+    TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
+    if (!status.ok()) {
+      break;
+    }
+    f.internal_file_path = path_inside_db;
+  }
+
+  if (!status.ok()) {
+    // We failed, remove all files that we copied into the db
+    for (IngestedFileInfo& f : files_to_ingest_) {
+      if (f.internal_file_path == "") {
+        break;
+      }
+      Status s = env_->DeleteFile(f.internal_file_path);
+      if (!s.ok()) {
+        ROCKS_LOG_WARN(db_options_.info_log,
+                       "AddFile() clean up for file %s failed : %s",
+                       f.internal_file_path.c_str(), s.ToString().c_str());
+      }
+    }
+  }
+
+  return status;
+}
+
+Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed) {
+  SuperVersion* super_version = cfd_->GetSuperVersion();
+  Status status =
+      IngestedFilesOverlapWithMemtables(super_version, flush_needed);
+
+  if (status.ok() && *flush_needed &&
+      !ingestion_options_.allow_blocking_flush) {
+    status = Status::InvalidArgument("External file requires flush");
+  }
+  return status;
+}
+
+// REQUIRES: we have become the only writer by entering both write_thread_ and
+// nonmem_write_thread_
+Status ExternalSstFileIngestionJob::Run() {
+  Status status;
+#ifndef NDEBUG
+  // We should never run the job with a memtable that is overlapping
+  // with the files we are ingesting
+  bool need_flush = false;
+  status = NeedsFlush(&need_flush);
+  assert(status.ok() && need_flush == false);
+#endif
+
+  bool consumed_seqno = false;
+  bool force_global_seqno = false;
+
+  if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
+    // We need to assign a global sequence number to all the files, even
+    // if they don't overlap with any ranges, since we have snapshots
+    force_global_seqno = true;
+  }
+  // It is safe to use this instead of LastToBeWrittenSequence since we are
+  // the only active writer, and hence they are equal
+  const SequenceNumber last_seqno = versions_->LastSequence();
+  SuperVersion* super_version = cfd_->GetSuperVersion();
+  edit_.SetColumnFamily(cfd_->GetID());
+  // The levels that the files will be ingested into
+
+  for (IngestedFileInfo& f : files_to_ingest_) {
+    SequenceNumber assigned_seqno = 0;
+    if (ingestion_options_.ingest_behind) {
+      status = CheckLevelForIngestedBehindFile(&f);
+    } else {
+      status = AssignLevelAndSeqnoForIngestedFile(
+         super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
+         &f, &assigned_seqno);
+    }
+    if (!status.ok()) {
+      return status;
+    }
+    status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
+    TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
+                             &assigned_seqno);
+    if (assigned_seqno == last_seqno + 1) {
+      consumed_seqno = true;
+    }
+    if (!status.ok()) {
+      return status;
+    }
+    edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
+                  f.fd.GetFileSize(), f.smallest_internal_key(),
+                  f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno,
+                  false);
+  }
+
+  if (consumed_seqno) {
+    versions_->SetLastToBeWrittenSequence(last_seqno + 1);
+    versions_->SetLastSequence(last_seqno + 1);
+  }
+
+  return status;
+}
+
+void ExternalSstFileIngestionJob::UpdateStats() {
+  // Update internal stats for new ingested files
+  uint64_t total_keys = 0;
+  uint64_t total_l0_files = 0;
+  uint64_t total_time = env_->NowMicros() - job_start_time_;
+  for (IngestedFileInfo& f : files_to_ingest_) {
+    InternalStats::CompactionStats stats(1);
+    stats.micros = total_time;
+    stats.bytes_written = f.fd.GetFileSize();
+    stats.num_output_files = 1;
+    cfd_->internal_stats()->AddCompactionStats(f.picked_level, stats);
+    cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,
+                                       f.fd.GetFileSize());
+    total_keys += f.num_entries;
+    if (f.picked_level == 0) {
+      total_l0_files += 1;
+    }
+    ROCKS_LOG_INFO(
+        db_options_.info_log,
+        "[AddFile] External SST file %s was ingested in L%d with path %s "
+        "(global_seqno=%" PRIu64 ")\n",
+        f.external_file_path.c_str(), f.picked_level,
+        f.internal_file_path.c_str(), f.assigned_seqno);
+  }
+  cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL,
+                                     total_keys);
+  cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL,
+                                     files_to_ingest_.size());
+  cfd_->internal_stats()->AddCFStats(
+      InternalStats::INGESTED_LEVEL0_NUM_FILES_TOTAL, total_l0_files);
+}
+
+void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
+  if (!status.ok()) {
+    // We failed to add the files to the database
+    // remove all the files we copied
+    for (IngestedFileInfo& f : files_to_ingest_) {
+      Status s = env_->DeleteFile(f.internal_file_path);
+      if (!s.ok()) {
+        ROCKS_LOG_WARN(db_options_.info_log,
+                       "AddFile() clean up for file %s failed : %s",
+                       f.internal_file_path.c_str(), s.ToString().c_str());
+      }
+    }
+  } else if (status.ok() && ingestion_options_.move_files) {
+    // The files were moved and added successfully, remove original file links
+    for (IngestedFileInfo& f : files_to_ingest_) {
+      Status s = env_->DeleteFile(f.external_file_path);
+      if (!s.ok()) {
+        ROCKS_LOG_WARN(
+            db_options_.info_log,
+            "%s was added to DB successfully but failed to remove original "
+            "file link : %s",
+            f.external_file_path.c_str(), s.ToString().c_str());
+      }
+    }
+  }
+}
+
+Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
+    const std::string& external_file, IngestedFileInfo* file_to_ingest) {
+  file_to_ingest->external_file_path = external_file;
+
+  // Get external file size
+  Status status = env_->GetFileSize(external_file, &file_to_ingest->file_size);
+  if (!status.ok()) {
+    return status;
+  }
+
+  // Create TableReader for external file
+  std::unique_ptr<TableReader> table_reader;
+  std::unique_ptr<RandomAccessFile> sst_file;
+  std::unique_ptr<RandomAccessFileReader> sst_file_reader;
+
+  status = env_->NewRandomAccessFile(external_file, &sst_file, env_options_);
+  if (!status.ok()) {
+    return status;
+  }
+  sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file),
+                                                   external_file));
+
+  status = cfd_->ioptions()->table_factory->NewTableReader(
+      TableReaderOptions(*cfd_->ioptions(), env_options_,
+                         cfd_->internal_comparator()),
+      std::move(sst_file_reader), file_to_ingest->file_size, &table_reader);
+  if (!status.ok()) {
+    return status;
+  }
+
+  // Get the external file properties
+  auto props = table_reader->GetTableProperties();
+  const auto& uprops = props->user_collected_properties;
+
+  // Get table version
+  auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion);
+  if (version_iter == uprops.end()) {
+    return Status::Corruption("External file version not found");
+  }
+  file_to_ingest->version = DecodeFixed32(version_iter->second.c_str());
+
+  auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno);
+  if (file_to_ingest->version == 2) {
+    // Version 2 implies that we have a global sequence number
+    if (seqno_iter == uprops.end()) {
+      return Status::Corruption(
+          "External file global sequence number not found");
+    }
+
+    // Set the global sequence number
+    file_to_ingest->original_seqno = DecodeFixed64(seqno_iter->second.c_str());
+    file_to_ingest->global_seqno_offset = props->properties_offsets.at(
+        ExternalSstFilePropertyNames::kGlobalSeqno);
+
+    if (file_to_ingest->global_seqno_offset == 0) {
+      return Status::Corruption("Was not able to find file global seqno field");
+    }
+  } else if (file_to_ingest->version == 1) {
+    // SST file V1 should not have global seqno field
+    assert(seqno_iter == uprops.end());
+    file_to_ingest->original_seqno = 0;
+    if (ingestion_options_.allow_blocking_flush ||
+            ingestion_options_.allow_global_seqno) {
+      return Status::InvalidArgument(
+            "External SST file V1 does not support global seqno");
+    }
+  } else {
+    return Status::InvalidArgument("External file version is not supported");
+  }
+  // Get number of entries in table
+  file_to_ingest->num_entries = props->num_entries;
+
+  ParsedInternalKey key;
+  ReadOptions ro;
+  // While reading the external file we could cache the blocks we read in the
+  // block cache; if we later change the global seqno of this file, the cache
+  // would then hold blocks whose keys carry the wrong seqno.
+  // We need to disable fill_cache so that we read from the file without
+  // updating the block cache.
+  ro.fill_cache = false;
+  std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(ro));
+
+  // Get first (smallest) key from file
+  iter->SeekToFirst();
+  if (!ParseInternalKey(iter->key(), &key)) {
+    return Status::Corruption("external file have corrupted keys");
+  }
+  if (key.sequence != 0) {
+    return Status::Corruption("external file have non zero sequence number");
+  }
+  file_to_ingest->smallest_user_key = key.user_key.ToString();
+
+  // Get last (largest) key from file
+  iter->SeekToLast();
+  if (!ParseInternalKey(iter->key(), &key)) {
+    return Status::Corruption("external file have corrupted keys");
+  }
+  if (key.sequence != 0) {
+    return Status::Corruption("external file have non zero sequence number");
+  }
+  file_to_ingest->largest_user_key = key.user_key.ToString();
+
+  file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
+
+  file_to_ingest->table_properties = *props;
+
+  return status;
+}
+
+Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
+    SuperVersion* sv, bool* overlap) {
+  // Create an InternalIterator over all memtables
+  Arena arena;
+  ReadOptions ro;
+  ro.total_order_seek = true;
+  MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(), &arena);
+  merge_iter_builder.AddIterator(sv->mem->NewIterator(ro, &arena));
+  sv->imm->AddIterators(ro, &merge_iter_builder);
+  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
+
+  std::vector<InternalIterator*> memtable_range_del_iters;
+  auto* active_range_del_iter = sv->mem->NewRangeTombstoneIterator(ro);
+  if (active_range_del_iter != nullptr) {
+    memtable_range_del_iters.push_back(active_range_del_iter);
+  }
+  sv->imm->AddRangeTombstoneIterators(ro, &memtable_range_del_iters);
+  std::unique_ptr<InternalIterator> memtable_range_del_iter(NewMergingIterator(
+      &cfd_->internal_comparator(),
+      memtable_range_del_iters.empty() ? nullptr : &memtable_range_del_iters[0],
+      static_cast<int>(memtable_range_del_iters.size())));
+
+  Status status;
+  *overlap = false;
+  for (IngestedFileInfo& f : files_to_ingest_) {
+    status =
+        IngestedFileOverlapWithIteratorRange(&f, memtable_iter.get(), overlap);
+    if (!status.ok() || *overlap == true) {
+      break;
+    }
+    status = IngestedFileOverlapWithRangeDeletions(
+        &f, memtable_range_del_iter.get(), overlap);
+    if (!status.ok() || *overlap == true) {
+      break;
+    }
+  }
+
+  return status;
+}
+
+Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
+    SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
+    IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) {
+  Status status;
+  *assigned_seqno = 0;
+  const SequenceNumber last_seqno = versions_->LastSequence();
+  if (force_global_seqno) {
+    *assigned_seqno = last_seqno + 1;
+    if (compaction_style == kCompactionStyleUniversal) {
+      file_to_ingest->picked_level = 0;
+      return status;
+    }
+  }
+
+  bool overlap_with_db = false;
+  Arena arena;
+  ReadOptions ro;
+  ro.total_order_seek = true;
+  int target_level = 0;
+  auto* vstorage = cfd_->current()->storage_info();
+
+  for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
+    if (lvl > 0 && lvl < vstorage->base_level()) {
+      continue;
+    }
+
+    if (vstorage->NumLevelFiles(lvl) > 0) {
+      bool overlap_with_level = false;
+      status = IngestedFileOverlapWithLevel(sv, file_to_ingest, lvl,
+        &overlap_with_level);
+      if (!status.ok()) {
+        return status;
+      }
+      if (overlap_with_level) {
+        // We must use L0 or any level higher than `lvl` to be able to overwrite
+        // the keys that we overlap with in this level. We also need to assign
+        // this file a seqno to overwrite the existing keys in level `lvl`.
+        overlap_with_db = true;
+        break;
+      }
+
+      if (compaction_style == kCompactionStyleUniversal && lvl != 0) {
+        const std::vector<FileMetaData*>& level_files =
+            vstorage->LevelFiles(lvl);
+        const SequenceNumber level_largest_seqno =
+            (*max_element(level_files.begin(), level_files.end(),
+                          [](FileMetaData* f1, FileMetaData* f2) {
+                            return f1->largest_seqno < f2->largest_seqno;
+                          }))
+                ->largest_seqno;
+        if (level_largest_seqno != 0) {
+          *assigned_seqno = level_largest_seqno;
+        } else {
+          continue;
+        }
+      }
+    } else if (compaction_style == kCompactionStyleUniversal) {
+      continue;
+    }
+
+    // We don't overlap with any keys in this level, but we still need to check
+    // whether our file can fit in it
+    if (IngestedFileFitInLevel(file_to_ingest, lvl)) {
+      target_level = lvl;
+    }
+  }
+  TEST_SYNC_POINT_CALLBACK(
+      "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
+      &overlap_with_db);
+  file_to_ingest->picked_level = target_level;
+  if (overlap_with_db && *assigned_seqno == 0) {
+    *assigned_seqno = last_seqno + 1;
+  }
+  return status;
+}
+
+Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
+    IngestedFileInfo* file_to_ingest) {
+  auto* vstorage = cfd_->current()->storage_info();
+  // First, check whether the new file fits in the bottommost level
+  int bottom_lvl = cfd_->NumberLevels() - 1;
+  if(!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) {
+    return Status::InvalidArgument(
+      "Can't ingest_behind file as it doesn't fit "
+      "at the bottommost level!");
+  }
+
+  // Second, check whether, despite allow_ingest_behind=true, we still have
+  // files with seqno 0 at some upper level
+  for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) {
+    for (auto file : vstorage->LevelFiles(lvl)) {
+      if (file->smallest_seqno == 0) {
+        return Status::InvalidArgument(
+          "Can't ingest_behind file as despite allow_ingest_behind=true "
+          "there are files with 0 seqno in database at upper levels!");
+      }
+    }
+  }
+
+  file_to_ingest->picked_level = bottom_lvl;
+  return Status::OK();
+}
+
+Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
+    IngestedFileInfo* file_to_ingest, SequenceNumber seqno) {
+  if (file_to_ingest->original_seqno == seqno) {
+    // This file already has the correct global seqno
+    return Status::OK();
+  } else if (!ingestion_options_.allow_global_seqno) {
+    return Status::InvalidArgument("Global seqno is required, but disabled");
+  } else if (file_to_ingest->global_seqno_offset == 0) {
+    return Status::InvalidArgument(
+        "Trying to set global seqno for a file that dont have a global seqno "
+        "field");
+  }
+
+  std::unique_ptr<RandomRWFile> rwfile;
+  Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path,
+                                        &rwfile, env_options_);
+  if (!status.ok()) {
+    return status;
+  }
+
+  // Write the new seqno in the global sequence number field in the file
+  std::string seqno_val;
+  PutFixed64(&seqno_val, seqno);
+  status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val);
+  if (status.ok()) {
+    file_to_ingest->assigned_seqno = seqno;
+  }
+  return status;
+}
+
+Status ExternalSstFileIngestionJob::IngestedFileOverlapWithIteratorRange(
+    const IngestedFileInfo* file_to_ingest, InternalIterator* iter,
+    bool* overlap) {
+  auto* vstorage = cfd_->current()->storage_info();
+  auto* ucmp = vstorage->InternalComparator()->user_comparator();
+  InternalKey range_start(file_to_ingest->smallest_user_key, kMaxSequenceNumber,
+                          kValueTypeForSeek);
+  iter->Seek(range_start.Encode());
+  if (!iter->status().ok()) {
+    return iter->status();
+  }
+
+  *overlap = false;
+  if (iter->Valid()) {
+    ParsedInternalKey seek_result;
+    if (!ParseInternalKey(iter->key(), &seek_result)) {
+      return Status::Corruption("DB have corrupted keys");
+    }
+
+    if (ucmp->Compare(seek_result.user_key, file_to_ingest->largest_user_key) <=
+        0) {
+      *overlap = true;
+    }
+  }
+
+  return iter->status();
+}
+
+Status ExternalSstFileIngestionJob::IngestedFileOverlapWithRangeDeletions(
+    const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
+    bool* overlap) {
+  auto* vstorage = cfd_->current()->storage_info();
+  auto* ucmp = vstorage->InternalComparator()->user_comparator();
+
+  *overlap = false;
+  if (range_del_iter != nullptr) {
+    for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
+         range_del_iter->Next()) {
+      ParsedInternalKey parsed_key;
+      if (!ParseInternalKey(range_del_iter->key(), &parsed_key)) {
+        return Status::Corruption("corrupted range deletion key: " +
+                                  range_del_iter->key().ToString());
+      }
+      RangeTombstone range_del(parsed_key, range_del_iter->value());
+      if (ucmp->Compare(range_del.start_key_,
+                        file_to_ingest->largest_user_key) <= 0 &&
+          ucmp->Compare(file_to_ingest->smallest_user_key,
+                        range_del.end_key_) <= 0) {
+        *overlap = true;
+        break;
+      }
+    }
+  }
+  return Status::OK();
+}
+
+bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
+    const IngestedFileInfo* file_to_ingest, int level) {
+  if (level == 0) {
+    // Files can always fit in L0
+    return true;
+  }
+
+  auto* vstorage = cfd_->current()->storage_info();
+  Slice file_smallest_user_key(file_to_ingest->smallest_user_key);
+  Slice file_largest_user_key(file_to_ingest->largest_user_key);
+
+  if (vstorage->OverlapInLevel(level, &file_smallest_user_key,
+                               &file_largest_user_key)) {
+    // The file overlaps with other files in this level; we cannot
+    // add it to this level
+    return false;
+  }
+  if (cfd_->RangeOverlapWithCompaction(file_smallest_user_key,
+                                       file_largest_user_key, level)) {
+    // The file overlaps with a running compaction's output that will be stored
+    // in this level; we cannot add this file to this level
+    return false;
+  }
+
+  // The file did not overlap with the level's files or our compaction output
+  return true;
+}
+
+Status ExternalSstFileIngestionJob::IngestedFileOverlapWithLevel(
+    SuperVersion* sv, IngestedFileInfo* file_to_ingest, int lvl,
+    bool* overlap_with_level) {
+  Arena arena;
+  ReadOptions ro;
+  ro.total_order_seek = true;
+  MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(),
+                                          &arena);
+  sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder, lvl,
+                                    nullptr /* range_del_agg */);
+  ScopedArenaIterator level_iter(merge_iter_builder.Finish());
+
+  std::vector<InternalIterator*> level_range_del_iters;
+  sv->current->AddRangeDelIteratorsForLevel(ro, env_options_, lvl,
+                                            &level_range_del_iters);
+  std::unique_ptr<InternalIterator> level_range_del_iter(NewMergingIterator(
+      &cfd_->internal_comparator(),
+      level_range_del_iters.empty() ? nullptr : &level_range_del_iters[0],
+      static_cast<int>(level_range_del_iters.size())));
+
+  Status status = IngestedFileOverlapWithIteratorRange(
+      file_to_ingest, level_iter.get(), overlap_with_level);
+  if (status.ok() && *overlap_with_level == false) {
+    status = IngestedFileOverlapWithRangeDeletions(
+        file_to_ingest, level_range_del_iter.get(), overlap_with_level);
+  }
+  return status;
+}
+
+}  // namespace rocksdb
+
+#endif  // !ROCKSDB_LITE
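
For context, this job is driven by the public DB::IngestExternalFile() entry point, and
the option fields it checks (move_files, snapshot_consistency, allow_global_seqno,
allow_blocking_flush) come from IngestExternalFileOptions. Below is a hedged caller-side
sketch with a hypothetical wrapper name and placeholder paths, assuming the public
rocksdb/db.h API rather than anything added by this commit.

#include <string>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"

// Ingests pre-built SST files into the default column family of an open DB.
rocksdb::Status IngestFiles(rocksdb::DB* db,
                            const std::vector<std::string>& sst_paths) {
  rocksdb::IngestExternalFileOptions opts;
  opts.move_files = true;            // hard-link instead of copy when possible
  opts.snapshot_consistency = true;  // forces a global seqno while snapshots exist
  opts.allow_global_seqno = true;    // needed when files overlap existing data
  opts.allow_blocking_flush = true;  // flush memtables that overlap the files
  return db->IngestExternalFile(sst_paths, opts);
}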

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/external_sst_file_ingestion_job.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/external_sst_file_ingestion_job.h b/thirdparty/rocksdb/db/external_sst_file_ingestion_job.h
new file mode 100644
index 0000000..2d0fade
--- /dev/null
+++ b/thirdparty/rocksdb/db/external_sst_file_ingestion_job.h
@@ -0,0 +1,171 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "db/column_family.h"
+#include "db/dbformat.h"
+#include "db/internal_stats.h"
+#include "db/snapshot_impl.h"
+#include "options/db_options.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/sst_file_writer.h"
+#include "util/autovector.h"
+
+namespace rocksdb {
+
+struct IngestedFileInfo {
+  // External file path
+  std::string external_file_path;
+  // Smallest user key in external file
+  std::string smallest_user_key;
+  // Largest user key in external file
+  std::string largest_user_key;
+  // Sequence number for keys in external file
+  SequenceNumber original_seqno;
+  // Offset of the global sequence number field in the file; it will
+  // be zero if version is 1 (global seqno is not supported)
+  size_t global_seqno_offset;
+  // External file size
+  uint64_t file_size;
+  // Total number of keys in external file
+  uint64_t num_entries;
+  // Id of column family this file should be ingested into
+  uint32_t cf_id;
+  // TableProperties read from external file
+  TableProperties table_properties;
+  // Version of external file
+  int version;
+
+  // FileDescriptor for the file inside the DB
+  FileDescriptor fd;
+  // File path that we picked for the file inside the DB
+  std::string internal_file_path = "";
+  // Global sequence number that we picked for the file inside the DB
+  SequenceNumber assigned_seqno = 0;
+  // Level inside the DB we picked for the external file.
+  int picked_level = 0;
+
+  InternalKey smallest_internal_key() const {
+    return InternalKey(smallest_user_key, assigned_seqno,
+                       ValueType::kTypeValue);
+  }
+
+  InternalKey largest_internal_key() const {
+    return InternalKey(largest_user_key, assigned_seqno, ValueType::kTypeValue);
+  }
+};
+
+class ExternalSstFileIngestionJob {
+ public:
+  ExternalSstFileIngestionJob(
+      Env* env, VersionSet* versions, ColumnFamilyData* cfd,
+      const ImmutableDBOptions& db_options, const EnvOptions& env_options,
+      SnapshotList* db_snapshots,
+      const IngestExternalFileOptions& ingestion_options)
+      : env_(env),
+        versions_(versions),
+        cfd_(cfd),
+        db_options_(db_options),
+        env_options_(env_options),
+        db_snapshots_(db_snapshots),
+        ingestion_options_(ingestion_options),
+        job_start_time_(env_->NowMicros()) {}
+
+  // Prepare the job by copying external files into the DB.
+  Status Prepare(const std::vector<std::string>& external_files_paths);
+
+  // Check if we need to flush the memtable before running the ingestion job.
+  // This will be true if the files we are ingesting overlap with any
+  // key range in the memtable.
+  // REQUIRES: Mutex held
+  Status NeedsFlush(bool* flush_needed);
+
+  // Will execute the ingestion job and prepare edit() to be applied.
+  // REQUIRES: Mutex held
+  Status Run();
+
+  // Update column family stats.
+  // REQUIRES: Mutex held
+  void UpdateStats();
+
+  // Cleanup after successful/failed job
+  void Cleanup(const Status& status);
+
+  VersionEdit* edit() { return &edit_; }
+
+  const autovector<IngestedFileInfo>& files_to_ingest() const {
+    return files_to_ingest_;
+  }
+
+ private:
+  // Open the external file and populate `file_to_ingest` with all the
+  // external information we need to ingest this file.
+  Status GetIngestedFileInfo(const std::string& external_file,
+                             IngestedFileInfo* file_to_ingest);
+
+  // Check if the files we are ingesting overlap with any memtable.
+  // REQUIRES: Mutex held
+  Status IngestedFilesOverlapWithMemtables(SuperVersion* sv, bool* overlap);
+
+  // Assign `file_to_ingest` the appropriate sequence number and the lowest
+  // possible level that it can be ingested into according to compaction_style.
+  // REQUIRES: Mutex held
+  Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv,
+                                            bool force_global_seqno,
+                                            CompactionStyle compaction_style,
+                                            IngestedFileInfo* file_to_ingest,
+                                            SequenceNumber* assigned_seqno);
+
+  // A file that we want to ingest behind always goes to the lowest level;
+  // we just check that it fits in that level, that the DB allows ingest_behind,
+  // and that we don't have 0 seqnums at the upper levels.
+  // REQUIRES: Mutex held
+  Status CheckLevelForIngestedBehindFile(IngestedFileInfo* file_to_ingest);
+
+  // Set the file global sequence number to `seqno`
+  Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest,
+                                          SequenceNumber seqno);
+
+  // Check if the key range of `file_to_ingest` overlaps with the range
+  // that `iter` represents
+  // REQUIRES: Mutex held
+  Status IngestedFileOverlapWithIteratorRange(
+      const IngestedFileInfo* file_to_ingest, InternalIterator* iter,
+      bool* overlap);
+
+  // Check if `file_to_ingest` key range overlaps with any range deletions
+  // specified by `iter`.
+  // REQUIRES: Mutex held
+  Status IngestedFileOverlapWithRangeDeletions(
+      const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
+      bool* overlap);
+
+  // Check if the key range of `file_to_ingest` overlaps with level `lvl`
+  // REQUIRES: Mutex held
+  Status IngestedFileOverlapWithLevel(SuperVersion* sv,
+                                      IngestedFileInfo* file_to_ingest,
+                                      int lvl, bool* overlap_with_level);
+
+  // Check if `file_to_ingest` can fit in level `level`
+  // REQUIRES: Mutex held
+  bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest,
+                              int level);
+
+  Env* env_;
+  VersionSet* versions_;
+  ColumnFamilyData* cfd_;
+  const ImmutableDBOptions& db_options_;
+  const EnvOptions& env_options_;
+  SnapshotList* db_snapshots_;
+  autovector<IngestedFileInfo> files_to_ingest_;
+  const IngestExternalFileOptions& ingestion_options_;
+  VersionEdit edit_;
+  uint64_t job_start_time_;
+};
+
+}  // namespace rocksdb
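
The comments in the header above imply a strict call order: Prepare() without the DB mutex, then NeedsFlush() and Run() with the mutex held, then UpdateStats() and finally Cleanup(). The sketch below is a hypothetical illustration of that order only; the flush and LogAndApply steps are elided, and the header paths and the InstrumentedMutex parameter are assumptions about the surrounding internals, not something defined by this commit.

// Hypothetical driver sketch: shows the documented call order for the job.
#include <string>
#include <vector>
#include "db/external_sst_file_ingestion_job.h"
#include "monitoring/instrumented_mutex.h"  // assumed header path

namespace rocksdb {

Status RunIngestionJob(ExternalSstFileIngestionJob* job,
                       const std::vector<std::string>& files,
                       InstrumentedMutex* db_mutex) {
  // 1. Copy/verify the external files; the DB mutex is not required yet.
  Status s = job->Prepare(files);
  if (!s.ok()) {
    job->Cleanup(s);
    return s;
  }
  db_mutex->Lock();
  // 2. Decide whether the memtable must be flushed first (REQUIRES: mutex).
  bool flush_needed = false;
  s = job->NeedsFlush(&flush_needed);
  if (s.ok() && flush_needed) {
    // ... flush the memtable before ingesting (elided) ...
  }
  // 3. Assign levels/seqnos and fill job->edit() (REQUIRES: mutex).
  if (s.ok()) {
    s = job->Run();
  }
  if (s.ok()) {
    // ... apply job->edit() to the VersionSet via LogAndApply (elided) ...
    job->UpdateStats();
  }
  db_mutex->Unlock();
  // 4. Remove copied files on failure and release temporary state.
  job->Cleanup(s);
  return s;
}

}  // namespace rocksdb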

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/file_indexer.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/file_indexer.cc b/thirdparty/rocksdb/db/file_indexer.cc
new file mode 100644
index 0000000..abfa7cf
--- /dev/null
+++ b/thirdparty/rocksdb/db/file_indexer.cc
@@ -0,0 +1,214 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/file_indexer.h"
+#include <algorithm>
+#include <functional>
+#include "db/version_edit.h"
+#include "rocksdb/comparator.h"
+
+namespace rocksdb {
+
+FileIndexer::FileIndexer(const Comparator* ucmp)
+    : num_levels_(0), ucmp_(ucmp), level_rb_(nullptr) {}
+
+size_t FileIndexer::NumLevelIndex() const { return next_level_index_.size(); }
+
+size_t FileIndexer::LevelIndexSize(size_t level) const {
+  if (level >= next_level_index_.size()) {
+    return 0;
+  }
+  return next_level_index_[level].num_index;
+}
+
+void FileIndexer::GetNextLevelIndex(const size_t level, const size_t file_index,
+                                    const int cmp_smallest,
+                                    const int cmp_largest, int32_t* left_bound,
+                                    int32_t* right_bound) const {
+  assert(level > 0);
+
+  // Last level, no hint
+  if (level == num_levels_ - 1) {
+    *left_bound = 0;
+    *right_bound = -1;
+    return;
+  }
+
+  assert(level < num_levels_ - 1);
+  assert(static_cast<int32_t>(file_index) <= level_rb_[level]);
+
+  const IndexUnit* index_units = next_level_index_[level].index_units;
+  const auto& index = index_units[file_index];
+
+  if (cmp_smallest < 0) {
+    *left_bound = (level > 0 && file_index > 0)
+                      ? index_units[file_index - 1].largest_lb
+                      : 0;
+    *right_bound = index.smallest_rb;
+  } else if (cmp_smallest == 0) {
+    *left_bound = index.smallest_lb;
+    *right_bound = index.smallest_rb;
+  } else if (cmp_smallest > 0 && cmp_largest < 0) {
+    *left_bound = index.smallest_lb;
+    *right_bound = index.largest_rb;
+  } else if (cmp_largest == 0) {
+    *left_bound = index.largest_lb;
+    *right_bound = index.largest_rb;
+  } else if (cmp_largest > 0) {
+    *left_bound = index.largest_lb;
+    *right_bound = level_rb_[level + 1];
+  } else {
+    assert(false);
+  }
+
+  assert(*left_bound >= 0);
+  assert(*left_bound <= *right_bound + 1);
+  assert(*right_bound <= level_rb_[level + 1]);
+}
+
+void FileIndexer::UpdateIndex(Arena* arena, const size_t num_levels,
+                              std::vector<FileMetaData*>* const files) {
+  if (files == nullptr) {
+    return;
+  }
+  if (num_levels == 0) {  // uint32_t underflow (0 - 1) would cause bad behavior
+    num_levels_ = num_levels;
+    return;
+  }
+  assert(level_rb_ == nullptr);  // level_rb_ should be initialized here
+
+  num_levels_ = num_levels;
+  next_level_index_.resize(num_levels);
+
+  char* mem = arena->AllocateAligned(num_levels_ * sizeof(int32_t));
+  level_rb_ = new (mem) int32_t[num_levels_];
+  for (size_t i = 0; i < num_levels_; i++) {
+    level_rb_[i] = -1;
+  }
+
+  // L1 - Ln-1
+  for (size_t level = 1; level < num_levels_ - 1; ++level) {
+    const auto& upper_files = files[level];
+    const int32_t upper_size = static_cast<int32_t>(upper_files.size());
+    const auto& lower_files = files[level + 1];
+    level_rb_[level] = static_cast<int32_t>(upper_files.size()) - 1;
+    if (upper_size == 0) {
+      continue;
+    }
+    IndexLevel& index_level = next_level_index_[level];
+    index_level.num_index = upper_size;
+    mem = arena->AllocateAligned(upper_size * sizeof(IndexUnit));
+    index_level.index_units = new (mem) IndexUnit[upper_size];
+
+    CalculateLB(
+        upper_files, lower_files, &index_level,
+        [this](const FileMetaData * a, const FileMetaData * b)->int {
+          return ucmp_->Compare(a->smallest.user_key(), b->largest.user_key());
+        },
+        [](IndexUnit* index, int32_t f_idx) { index->smallest_lb = f_idx; });
+    CalculateLB(
+        upper_files, lower_files, &index_level,
+        [this](const FileMetaData * a, const FileMetaData * b)->int {
+          return ucmp_->Compare(a->largest.user_key(), b->largest.user_key());
+        },
+        [](IndexUnit* index, int32_t f_idx) { index->largest_lb = f_idx; });
+    CalculateRB(
+        upper_files, lower_files, &index_level,
+        [this](const FileMetaData * a, const FileMetaData * b)->int {
+          return ucmp_->Compare(a->smallest.user_key(), b->smallest.user_key());
+        },
+        [](IndexUnit* index, int32_t f_idx) { index->smallest_rb = f_idx; });
+    CalculateRB(
+        upper_files, lower_files, &index_level,
+        [this](const FileMetaData * a, const FileMetaData * b)->int {
+          return ucmp_->Compare(a->largest.user_key(), b->smallest.user_key());
+        },
+        [](IndexUnit* index, int32_t f_idx) { index->largest_rb = f_idx; });
+  }
+
+  level_rb_[num_levels_ - 1] =
+      static_cast<int32_t>(files[num_levels_ - 1].size()) - 1;
+}
+
+void FileIndexer::CalculateLB(
+    const std::vector<FileMetaData*>& upper_files,
+    const std::vector<FileMetaData*>& lower_files, IndexLevel* index_level,
+    std::function<int(const FileMetaData*, const FileMetaData*)> cmp_op,
+    std::function<void(IndexUnit*, int32_t)> set_index) {
+  const int32_t upper_size = static_cast<int32_t>(upper_files.size());
+  const int32_t lower_size = static_cast<int32_t>(lower_files.size());
+  int32_t upper_idx = 0;
+  int32_t lower_idx = 0;
+
+  IndexUnit* index = index_level->index_units;
+  while (upper_idx < upper_size && lower_idx < lower_size) {
+    int cmp = cmp_op(upper_files[upper_idx], lower_files[lower_idx]);
+
+    if (cmp == 0) {
+      set_index(&index[upper_idx], lower_idx);
+      ++upper_idx;
+      ++lower_idx;
+    } else if (cmp > 0) {
+      // The lower-level file's largest key is smaller, so a key won't hit in
+      // that file. Move to the next lower file
+      ++lower_idx;
+    } else {
+      // Lower level's file becomes larger, update the index, and
+      // move to the next upper file
+      set_index(&index[upper_idx], lower_idx);
+      ++upper_idx;
+    }
+  }
+
+  while (upper_idx < upper_size) {
+    // Lower files are exhausted, which means the remaining upper files are
+    // greater than any lower file. Set the index to the lower level's size.
+    set_index(&index[upper_idx], lower_size);
+    ++upper_idx;
+  }
+}
+
+void FileIndexer::CalculateRB(
+    const std::vector<FileMetaData*>& upper_files,
+    const std::vector<FileMetaData*>& lower_files, IndexLevel* index_level,
+    std::function<int(const FileMetaData*, const FileMetaData*)> cmp_op,
+    std::function<void(IndexUnit*, int32_t)> set_index) {
+  const int32_t upper_size = static_cast<int32_t>(upper_files.size());
+  const int32_t lower_size = static_cast<int32_t>(lower_files.size());
+  int32_t upper_idx = upper_size - 1;
+  int32_t lower_idx = lower_size - 1;
+
+  IndexUnit* index = index_level->index_units;
+  while (upper_idx >= 0 && lower_idx >= 0) {
+    int cmp = cmp_op(upper_files[upper_idx], lower_files[lower_idx]);
+
+    if (cmp == 0) {
+      set_index(&index[upper_idx], lower_idx);
+      --upper_idx;
+      --lower_idx;
+    } else if (cmp < 0) {
+      // The lower-level file's smallest key is larger, so a key won't hit in
+      // that file. Move to the next lower file.
+      --lower_idx;
+    } else {
+      // Lower level's file becomes smaller, update the index, and move to
+      // the next upper file
+      set_index(&index[upper_idx], lower_idx);
+      --upper_idx;
+    }
+  }
+  while (upper_idx >= 0) {
+    // Lower files are exhausted, which means the remaining upper files are
+    // smaller than any lower file. Set the index to -1.
+    set_index(&index[upper_idx], -1);
+    --upper_idx;
+  }
+}
+
+}  // namespace rocksdb
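
CalculateLB() and CalculateRB() above are two-pointer merges over two sorted file lists. The standalone snippet below (plain ints in place of file keys; not RocksDB code) illustrates the left-bound half of that idea: for every upper-level boundary value it records the index of the first lower-level value that is not smaller, mirroring the cmp <= 0 branch of CalculateLB.

#include <cstdio>
#include <vector>

int main() {
  std::vector<int> upper = {50, 60};      // e.g. boundary keys of upper-level files
  std::vector<int> lower = {40, 55, 80};  // e.g. boundary keys of lower-level files
  std::vector<int> left_bound(upper.size(), 0);

  size_t ui = 0, li = 0;
  while (ui < upper.size() && li < lower.size()) {
    if (upper[ui] <= lower[li]) {
      // First lower-level value that is not smaller: record it as the bound.
      left_bound[ui] = static_cast<int>(li);
      ++ui;
    } else {
      // This lower-level value is too small; advance to the next lower value.
      ++li;
    }
  }
  while (ui < upper.size()) {
    // Lower values are exhausted; the bound points past the end.
    left_bound[ui++] = static_cast<int>(lower.size());
  }

  for (size_t i = 0; i < upper.size(); ++i) {
    std::printf("upper %d -> left bound %d\n", upper[i], left_bound[i]);
  }
  return 0;
}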

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/file_indexer.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/file_indexer.h b/thirdparty/rocksdb/db/file_indexer.h
new file mode 100644
index 0000000..1bef3aa
--- /dev/null
+++ b/thirdparty/rocksdb/db/file_indexer.h
@@ -0,0 +1,142 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <limits>
+#include <vector>
+#include "port/port.h"
+#include "util/arena.h"
+#include "util/autovector.h"
+
+namespace rocksdb {
+
+class Comparator;
+struct FileMetaData;
+struct FdWithKeyRange;
+struct FileLevel;
+
+// The file tree structure in Version is prebuilt and the range of each file
+// is known. On Version::Get(), it uses binary search to find a potential file
+// and then checks whether a target key can be found in the file by comparing
+// the key to the file's smallest and largest keys. The results of these
+// comparisons can be reused beyond checking whether a key falls into a file's
+// range. With some pre-calculated knowledge, each key comparison that has
+// already been done can serve as a hint to narrow down further searches: if a
+// key compares smaller than a file's smallest or largest key, that comparison
+// can be used to find the right bound of the next binary search. Similarly,
+// if a key compares larger than a file's smallest or largest key, it can be
+// used to find the left bound of the next binary search.
+// With these hints, the range of the binary search can be greatly reduced,
+// especially for the bottom levels, given that one file most likely overlaps
+// with only N files from the level below (where N is
+// max_bytes_for_level_multiplier). So on level L, we will only look at ~N
+// files instead of the N^L files of the naive approach.
+class FileIndexer {
+ public:
+  explicit FileIndexer(const Comparator* ucmp);
+
+  size_t NumLevelIndex() const;
+
+  size_t LevelIndexSize(size_t level) const;
+
+  // Return a file index range in the next level to search for a key, based on
+  // the smallest- and largest-key comparisons for the current file specified
+  // by level and file_index. When *left_bound < *right_bound, both bounds
+  // should be valid indices that fit within the next level's vector size.
+  void GetNextLevelIndex(const size_t level, const size_t file_index,
+                         const int cmp_smallest, const int cmp_largest,
+                         int32_t* left_bound, int32_t* right_bound) const;
+
+  void UpdateIndex(Arena* arena, const size_t num_levels,
+                   std::vector<FileMetaData*>* const files);
+
+  enum {
+    // MSVC version 1800 still does not have constexpr for ::max()
+    kLevelMaxIndex = rocksdb::port::kMaxInt32
+  };
+
+ private:
+  size_t num_levels_;
+  const Comparator* ucmp_;
+
+  struct IndexUnit {
+    IndexUnit()
+      : smallest_lb(0), largest_lb(0), smallest_rb(-1), largest_rb(-1) {}
+    // During file search, a key is compared against smallest and largest
+    // from a FileMetaData. It can have 3 possible outcomes:
+    // (1) key is smaller than smallest, implying it is also smaller than
+    //     largest. Precalculated index based on "smallest < smallest" can
+    //     be used to provide right bound.
+    // (2) key is in between smallest and largest.
+    //     Precalculated index based on "smallest > largest" can be used to
+    //     provide left bound.
+    //     Precalculated index based on "largest < smallest" can be used to
+    //     provide right bound.
+    // (3) key is larger than largest, implying it is also larger than smallest.
+    //     Precalculated index based on "largest > largest" can be used to
+    //     provide left bound.
+    //
+    // As a result, we will need to do:
+    // Compare smallest (<=) and largest keys from upper level file with
+    // smallest key from lower level to get a right bound.
+    // Compare smallest (>=) and largest keys from upper level file with
+    // largest key from lower level to get a left bound.
+    //
+    // Example:
+    //    level 1:              [50 - 60]
+    //    level 2:        [1 - 40], [45 - 55], [58 - 80]
+    // A key of 35 compares less than 50, so the 3rd file on level 2 can be
+    // skipped according to rule (1). LB = 0, RB = 1.
+    // A key of 53 sits between 50 and 60. The 1st file on level 2 can be
+    // skipped according to rule (2), but the 3rd file cannot be skipped
+    // because 60 is greater than 58. LB = 1, RB = 2.
+    // A key of 70 compares larger than 60, so the 1st and 2nd files can be
+    // skipped according to rule (3). LB = 2, RB = 2.
+    //
+    // Points to the left-most file in the lower level that may contain a key
+    // which compares greater than the upper-level FileMetaData's smallest key
+    int32_t smallest_lb;
+    // Points to the left-most file in the lower level that may contain a key
+    // which compares greater than the upper-level FileMetaData's largest key
+    int32_t largest_lb;
+    // Points to the right-most file in the lower level that may contain a key
+    // which compares smaller than the upper-level FileMetaData's smallest key
+    int32_t smallest_rb;
+    // Points to the right-most file in the lower level that may contain a key
+    // which compares smaller than the upper-level FileMetaData's largest key
+    int32_t largest_rb;
+  };
+
+  // Data structure to store IndexUnits in a whole level
+  struct IndexLevel {
+    size_t num_index;
+    IndexUnit* index_units;
+
+    IndexLevel() : num_index(0), index_units(nullptr) {}
+  };
+
+  void CalculateLB(
+      const std::vector<FileMetaData*>& upper_files,
+      const std::vector<FileMetaData*>& lower_files, IndexLevel* index_level,
+      std::function<int(const FileMetaData*, const FileMetaData*)> cmp_op,
+      std::function<void(IndexUnit*, int32_t)> set_index);
+
+  void CalculateRB(
+      const std::vector<FileMetaData*>& upper_files,
+      const std::vector<FileMetaData*>& lower_files, IndexLevel* index_level,
+      std::function<int(const FileMetaData*, const FileMetaData*)> cmp_op,
+      std::function<void(IndexUnit*, int32_t)> set_index);
+
+  autovector<IndexLevel> next_level_index_;
+  int32_t* level_rb_;
+};
+
+}  // namespace rocksdb
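
To make the IndexUnit example above concrete, the standalone snippet below (not RocksDB code) hard-codes the bounds that would be precomputed for the single level-1 file [50 - 60] against the level-2 files [1 - 40], [45 - 55], [58 - 80], and reproduces the LB/RB values quoted in the comment for the keys 35, 53 and 70. It is a simplification of the case analysis in GetNextLevelIndex.

#include <cstdio>
#include <utility>

// Returns the inclusive range of level-2 file indices to search for `key`,
// using hard-coded precomputed bounds for the level-1 file [50, 60].
std::pair<int, int> NextLevelBounds(int key) {
  const int smallest_lb = 1;  // first L2 file whose largest key >= 50: [45 - 55]
  const int largest_lb = 2;   // first L2 file whose largest key >= 60: [58 - 80]
  const int smallest_rb = 1;  // last L2 file whose smallest key <= 50: [45 - 55]
  const int largest_rb = 2;   // last L2 file whose smallest key <= 60: [58 - 80]

  if (key < 50) return {0, smallest_rb};            // rule (1): LB = 0, RB = 1
  if (key <= 60) return {smallest_lb, largest_rb};  // rule (2): LB = 1, RB = 2
  return {largest_lb, 2};                           // rule (3): LB = 2, RB = 2
}

int main() {
  const int keys[] = {35, 53, 70};
  for (int key : keys) {
    std::pair<int, int> b = NextLevelBounds(key);
    std::printf("key %d -> search L2 files [%d, %d]\n", key, b.first, b.second);
  }
  return 0;
}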

