nifi-commits mailing list archives

From jeremyd...@apache.org
Subject [12/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:24:52 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/memtable.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/memtable.h b/thirdparty/rocksdb/db/memtable.h
new file mode 100644
index 0000000..fe9feaf
--- /dev/null
+++ b/thirdparty/rocksdb/db/memtable.h
@@ -0,0 +1,427 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <atomic>
+#include <deque>
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+#include "db/dbformat.h"
+#include "db/range_del_aggregator.h"
+#include "db/version_edit.h"
+#include "monitoring/instrumented_mutex.h"
+#include "options/cf_options.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/memtablerep.h"
+#include "util/allocator.h"
+#include "util/concurrent_arena.h"
+#include "util/dynamic_bloom.h"
+#include "util/hash.h"
+
+namespace rocksdb {
+
+class Mutex;
+class MemTableIterator;
+class MergeContext;
+class InternalIterator;
+
+struct MemTableOptions {
+  explicit MemTableOptions(
+      const ImmutableCFOptions& ioptions,
+      const MutableCFOptions& mutable_cf_options);
+  size_t write_buffer_size;
+  size_t arena_block_size;
+  uint32_t memtable_prefix_bloom_bits;
+  size_t memtable_huge_page_size;
+  bool inplace_update_support;
+  size_t inplace_update_num_locks;
+  UpdateStatus (*inplace_callback)(char* existing_value,
+                                   uint32_t* existing_value_size,
+                                   Slice delta_value,
+                                   std::string* merged_value);
+  size_t max_successive_merges;
+  Statistics* statistics;
+  MergeOperator* merge_operator;
+  Logger* info_log;
+};
+
+// Batched counters to be updated when inserting keys in one write batch.
+// In the post-process phase of the write batch, these can be updated together.
+// Only used in concurrent memtable insert case.
+struct MemTablePostProcessInfo {
+  uint64_t data_size = 0;
+  uint64_t num_entries = 0;
+  uint64_t num_deletes = 0;
+};
+
+// Note:  Many of the methods in this class have comments indicating that
+// external synchronization is required as these methods are not thread-safe.
+// It is up to higher layers of code to decide how to prevent concurrent
+// invocation of these methods.  This is usually done by acquiring either
+// the db mutex or the single writer thread.
+//
+// Some of these methods are documented to only require external
+// synchronization if this memtable is immutable.  Calling MarkImmutable() is
+// not sufficient to guarantee immutability.  It is up to higher layers of
+// code to determine if this MemTable can still be modified by other threads.
+// E.g.: The SuperVersion stores a pointer to the current MemTable (that can
+// be modified) and a separate list of the MemTables that can no longer be
+// written to (aka the 'immutable memtables').
+class MemTable {
+ public:
+  struct KeyComparator : public MemTableRep::KeyComparator {
+    const InternalKeyComparator comparator;
+    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
+    virtual int operator()(const char* prefix_len_key1,
+                           const char* prefix_len_key2) const override;
+    virtual int operator()(const char* prefix_len_key,
+                           const Slice& key) const override;
+  };
+
+  // MemTables are reference counted.  The initial reference count
+  // is zero and the caller must call Ref() at least once.
+  //
+  // earliest_seq should be the current SequenceNumber in the db such that any
+  // key inserted into this memtable will have an equal or larger seq number.
+  // (When a db is first created, the earliest sequence number will be 0).
+  // If the earliest sequence number is not known, kMaxSequenceNumber may be
+  // used, but this may prevent some transactions from succeeding until the
+  // first key is inserted into the memtable.
+  explicit MemTable(const InternalKeyComparator& comparator,
+                    const ImmutableCFOptions& ioptions,
+                    const MutableCFOptions& mutable_cf_options,
+                    WriteBufferManager* write_buffer_manager,
+                    SequenceNumber earliest_seq, uint32_t column_family_id);
+
+  // Do not delete this MemTable unless Unref() indicates it is not in use.
+  ~MemTable();
+
+  // Increase reference count.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  void Ref() { ++refs_; }
+
+  // Drop reference count.
+  // If the refcount goes to zero return this memtable, otherwise return null.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  MemTable* Unref() {
+    --refs_;
+    assert(refs_ >= 0);
+    if (refs_ <= 0) {
+      return this;
+    }
+    return nullptr;
+  }
+
+  // Returns an estimate of the number of bytes of data in use by this
+  // data structure.
+  //
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable (unless this Memtable is immutable).
+  size_t ApproximateMemoryUsage();
+
+  // This method heuristically determines if the memtable should continue to
+  // host more data.
+  bool ShouldScheduleFlush() const {
+    return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
+  }
+
+  // Returns true if a flush should be scheduled and the caller should
+  // be the one to schedule it
+  bool MarkFlushScheduled() {
+    auto before = FLUSH_REQUESTED;
+    return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
+                                                std::memory_order_relaxed,
+                                                std::memory_order_relaxed);
+  }
+
+  // Return an iterator that yields the contents of the memtable.
+  //
+  // The caller must ensure that the underlying MemTable remains live
+  // while the returned iterator is live.  The keys returned by this
+  // iterator are internal keys encoded by AppendInternalKey in the
+  // db/dbformat.{h,cc} module.
+  //
+  // By default, it returns an iterator for prefix seek if prefix_extractor
+  // is configured in Options.
+  // arena: If not null, the arena needs to be used to allocate the Iterator.
+  //        Calling ~Iterator of the iterator will destroy all the states but
+  //        those allocated in arena.
+  InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);
+
+  InternalIterator* NewRangeTombstoneIterator(const ReadOptions& read_options);
+
+  // Add an entry into memtable that maps key to value at the
+  // specified sequence number and with the specified type.
+  // Typically value will be empty if type==kTypeDeletion.
+  //
+  // REQUIRES: if allow_concurrent = false, external synchronization to prevent
+  // simultaneous operations on the same MemTable.
+  void Add(SequenceNumber seq, ValueType type, const Slice& key,
+           const Slice& value, bool allow_concurrent = false,
+           MemTablePostProcessInfo* post_process_info = nullptr);
+
+  // If memtable contains a value for key, store it in *value and return true.
+  // If memtable contains a deletion for key, store a NotFound() error
+  // in *status and return true.
+  // If memtable contains a Merge operation as the most recent entry for a key,
+  //   and the merge process does not stop (not reaching a value or delete),
+  //   prepend the current merge operand to *operands,
+  //   store MergeInProgress in *s, and return false.
+  // Else, return false.
+  // If any operation was found, its most recent sequence number
+  // will be stored in *seq on success (regardless of whether true/false is
+  // returned).  Otherwise, *seq will be set to kMaxSequenceNumber.
+  // On success, *s may be set to OK, NotFound, or MergeInProgress.  Any other
+  // status returned indicates a corruption or other unexpected error.
+  bool Get(const LookupKey& key, std::string* value, Status* s,
+           MergeContext* merge_context, RangeDelAggregator* range_del_agg,
+           SequenceNumber* seq, const ReadOptions& read_opts);
+
+  bool Get(const LookupKey& key, std::string* value, Status* s,
+           MergeContext* merge_context, RangeDelAggregator* range_del_agg,
+           const ReadOptions& read_opts) {
+    SequenceNumber seq;
+    return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts);
+  }
+
+  // Attempts to update the new_value inplace, else does normal Add
+  // Pseudocode
+  //   if key exists in current memtable && prev_value is of type kTypeValue
+  //     if new sizeof(new_value) <= sizeof(prev_value)
+  //       update inplace
+  //     else add(key, new_value)
+  //   else add(key, new_value)
+  //
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  void Update(SequenceNumber seq,
+              const Slice& key,
+              const Slice& value);
+
+  // If prev_value for key exists, attempts to update it inplace.
+  // else returns false
+  // Pseudocode
+  //   if key exists in current memtable && prev_value is of type kTypeValue
+  //     new_value = delta(prev_value)
+  //     if sizeof(new_value) <= sizeof(prev_value)
+  //       update inplace
+  //     else add(key, new_value)
+  //   else return false
+  //
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  bool UpdateCallback(SequenceNumber seq,
+                      const Slice& key,
+                      const Slice& delta);
+
+  // Returns the number of successive merge entries starting from the newest
+  // entry for the key up to the last non-merge entry or last entry for the
+  // key in the memtable.
+  size_t CountSuccessiveMergeEntries(const LookupKey& key);
+
+  // Update counters and flush status after inserting a whole write batch
+  // Used in concurrent memtable inserts.
+  void BatchPostProcess(const MemTablePostProcessInfo& update_counters) {
+    num_entries_.fetch_add(update_counters.num_entries,
+                           std::memory_order_relaxed);
+    data_size_.fetch_add(update_counters.data_size, std::memory_order_relaxed);
+    if (update_counters.num_deletes != 0) {
+      num_deletes_.fetch_add(update_counters.num_deletes,
+                             std::memory_order_relaxed);
+    }
+    UpdateFlushState();
+  }
+
+  // Get total number of entries in the mem table.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable (unless this Memtable is immutable).
+  uint64_t num_entries() const {
+    return num_entries_.load(std::memory_order_relaxed);
+  }
+
+  // Get total number of deletes in the mem table.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable (unless this Memtable is immutable).
+  uint64_t num_deletes() const {
+    return num_deletes_.load(std::memory_order_relaxed);
+  }
+
+  // Returns the edits area that is needed for flushing the memtable
+  VersionEdit* GetEdits() { return &edit_; }
+
+  // Returns true if no entry has been inserted into the memtable.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable (unless this Memtable is immutable).
+  bool IsEmpty() const { return first_seqno_ == 0; }
+
+  // Returns the sequence number of the first element that was inserted
+  // into the memtable.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable (unless this Memtable is immutable).
+  SequenceNumber GetFirstSequenceNumber() {
+    return first_seqno_.load(std::memory_order_relaxed);
+  }
+
+  // Returns the sequence number that is guaranteed to be smaller than or equal
+  // to the sequence number of any key that could be inserted into this
+  // memtable. It can then be assumed that any write with a larger (or equal)
+  // sequence number will be present in this memtable or a later memtable.
+  //
+  // If the earliest sequence number could not be determined,
+  // kMaxSequenceNumber will be returned.
+  SequenceNumber GetEarliestSequenceNumber() {
+    return earliest_seqno_.load(std::memory_order_relaxed);
+  }
+
+  // DB's latest sequence ID when the memtable is created. This number
+  // may be updated to a more recent one before any key is inserted.
+  SequenceNumber GetCreationSeq() const { return creation_seq_; }
+
+  void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }
+
+  // Returns the next active logfile number when this memtable is about to
+  // be flushed to storage
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  uint64_t GetNextLogNumber() { return mem_next_logfile_number_; }
+
+  // Sets the next active logfile number when this memtable is about to
+  // be flushed to storage
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
+
+  // If this memtable contains data from a committed
+  // two-phase transaction, we must take note of the
+  // log which contains that data so we can know
+  // when to release that log.
+  void RefLogContainingPrepSection(uint64_t log);
+  uint64_t GetMinLogContainingPrepSection();
+
+  // Notify the underlying storage that no more items will be added.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  // After MarkImmutable() is called, you should not attempt to
+  // write anything to this MemTable.  (I.e., do not call Add() or Update().)
+  void MarkImmutable() {
+    table_->MarkReadOnly();
+    mem_tracker_.DoneAllocating();
+  }
+
+  // return true if the current MemTableRep supports merge operator.
+  bool IsMergeOperatorSupported() const {
+    return table_->IsMergeOperatorSupported();
+  }
+
+  // return true if the current MemTableRep supports snapshots.
+  // Inplace update prevents snapshots.
+  bool IsSnapshotSupported() const {
+    return table_->IsSnapshotSupported() && !moptions_.inplace_update_support;
+  }
+
+  struct MemTableStats {
+    uint64_t size;
+    uint64_t count;
+  };
+
+  MemTableStats ApproximateStats(const Slice& start_ikey,
+                                 const Slice& end_ikey);
+
+  // Get the lock associated with the key
+  port::RWMutex* GetLock(const Slice& key);
+
+  const InternalKeyComparator& GetInternalKeyComparator() const {
+    return comparator_.comparator;
+  }
+
+  const MemTableOptions* GetMemTableOptions() const { return &moptions_; }
+
+ private:
+  enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
+
+  friend class MemTableIterator;
+  friend class MemTableBackwardIterator;
+  friend class MemTableList;
+
+  KeyComparator comparator_;
+  const MemTableOptions moptions_;
+  int refs_;
+  const size_t kArenaBlockSize;
+  AllocTracker mem_tracker_;
+  ConcurrentArena arena_;
+  unique_ptr<MemTableRep> table_;
+  unique_ptr<MemTableRep> range_del_table_;
+  bool is_range_del_table_empty_;
+
+  // Total data size of all data inserted
+  std::atomic<uint64_t> data_size_;
+  std::atomic<uint64_t> num_entries_;
+  std::atomic<uint64_t> num_deletes_;
+
+  // These are used to manage memtable flushes to storage
+  bool flush_in_progress_; // started the flush
+  bool flush_completed_;   // finished the flush
+  uint64_t file_number_;    // filled up after flush is complete
+
+  // The updates to be applied to the transaction log when this
+  // memtable is flushed to storage.
+  VersionEdit edit_;
+
+  // The sequence number of the kv that was inserted first
+  std::atomic<SequenceNumber> first_seqno_;
+
+  // The db sequence number at the time of creation or kMaxSequenceNumber
+  // if not set.
+  std::atomic<SequenceNumber> earliest_seqno_;
+
+  SequenceNumber creation_seq_;
+
+  // The log files earlier than this number can be deleted.
+  uint64_t mem_next_logfile_number_;
+
+  // the earliest log containing a prepared section
+  // which has been inserted into this memtable.
+  std::atomic<uint64_t> min_prep_log_referenced_;
+
+  // rw locks for inplace updates
+  std::vector<port::RWMutex> locks_;
+
+  const SliceTransform* const prefix_extractor_;
+  std::unique_ptr<DynamicBloom> prefix_bloom_;
+
+  std::atomic<FlushStateEnum> flush_state_;
+
+  Env* env_;
+
+  // Extract sequential insert prefixes.
+  const SliceTransform* insert_with_hint_prefix_extractor_;
+
+  // Insert hints for each prefix.
+  std::unordered_map<Slice, void*, SliceHasher> insert_hints_;
+
+  // Returns a heuristic flush decision
+  bool ShouldFlushNow() const;
+
+  // Updates flush_state_ using ShouldFlushNow()
+  void UpdateFlushState();
+
+  // No copying allowed
+  MemTable(const MemTable&);
+  MemTable& operator=(const MemTable&);
+};
+
+extern const char* EncodeKey(std::string* scratch, const Slice& target);
+
+}  // namespace rocksdb

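A minimal sketch of the reference-counting contract documented in this header may help readers who are new to the vendored RocksDB internals. The comparator, column-family options, and write buffer manager are assumed to already exist in the caller's scope, as they would inside the engine; the function name is purely illustrative.

#include "db/memtable.h"

namespace {

// Sketch only: how a caller is expected to honor MemTable's ref-counting
// contract. Every parameter is a placeholder for an object owned by the
// column family in the real write path.
void MemTableLifecycleSketch(const rocksdb::InternalKeyComparator& cmp,
                             const rocksdb::ImmutableCFOptions& ioptions,
                             const rocksdb::MutableCFOptions& mutable_cf_options,
                             rocksdb::WriteBufferManager* write_buffer_manager) {
  // The refcount starts at zero, so the creator must call Ref() once.
  rocksdb::MemTable* mem = new rocksdb::MemTable(
      cmp, ioptions, mutable_cf_options, write_buffer_manager,
      /*earliest_seq=*/0, /*column_family_id=*/0);
  mem->Ref();

  // Writes require external synchronization (db mutex or the single
  // writer thread), per the comments above.
  mem->Add(/*seq=*/1, rocksdb::kTypeValue, "key", "value");

  // Unref() returns the memtable only when the refcount drops to zero;
  // that is the caller's cue to delete it.
  rocksdb::MemTable* to_delete = mem->Unref();
  if (to_delete != nullptr) {
    delete to_delete;
  }
}

}  // namespace

In the engine itself the same memtable is also referenced by other holders (for example, each MemTableListVersion that includes it), so Unref() typically returns nullptr until the last holder lets go.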
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/memtable_list.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/memtable_list.cc b/thirdparty/rocksdb/db/memtable_list.cc
new file mode 100644
index 0000000..8f710c2
--- /dev/null
+++ b/thirdparty/rocksdb/db/memtable_list.cc
@@ -0,0 +1,482 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+#include "db/memtable_list.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <string>
+#include "db/memtable.h"
+#include "db/version_set.h"
+#include "monitoring/thread_status_util.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "table/merging_iterator.h"
+#include "util/coding.h"
+#include "util/log_buffer.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+class InternalKeyComparator;
+class Mutex;
+class VersionSet;
+
+void MemTableListVersion::AddMemTable(MemTable* m) {
+  memlist_.push_front(m);
+  *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
+}
+
+void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
+                                        MemTable* m) {
+  if (m->Unref()) {
+    to_delete->push_back(m);
+    assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
+    *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage();
+  } else {
+  }
+}
+
+MemTableListVersion::MemTableListVersion(
+    size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
+    : max_write_buffer_number_to_maintain_(
+          old->max_write_buffer_number_to_maintain_),
+      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
+  if (old != nullptr) {
+    memlist_ = old->memlist_;
+    for (auto& m : memlist_) {
+      m->Ref();
+    }
+
+    memlist_history_ = old->memlist_history_;
+    for (auto& m : memlist_history_) {
+      m->Ref();
+    }
+  }
+}
+
+MemTableListVersion::MemTableListVersion(
+    size_t* parent_memtable_list_memory_usage,
+    int max_write_buffer_number_to_maintain)
+    : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
+      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}
+
+void MemTableListVersion::Ref() { ++refs_; }
+
+// called by superversion::clean()
+void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
+  assert(refs_ >= 1);
+  --refs_;
+  if (refs_ == 0) {
+    // a caller passes a nullptr to_delete only when it is confident
+    // that refs_ will not reach zero
+    assert(to_delete != nullptr);
+    for (const auto& m : memlist_) {
+      UnrefMemTable(to_delete, m);
+    }
+    for (const auto& m : memlist_history_) {
+      UnrefMemTable(to_delete, m);
+    }
+    delete this;
+  }
+}
+
+int MemTableList::NumNotFlushed() const {
+  int size = static_cast<int>(current_->memlist_.size());
+  assert(num_flush_not_started_ <= size);
+  return size;
+}
+
+int MemTableList::NumFlushed() const {
+  return static_cast<int>(current_->memlist_history_.size());
+}
+
+// Search all the memtables starting from the most recent one.
+// Return the most recent value found, if any.
+// Operands stores the list of merge operations to apply, so far.
+bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
+                              Status* s, MergeContext* merge_context,
+                              RangeDelAggregator* range_del_agg,
+                              SequenceNumber* seq,
+                              const ReadOptions& read_opts) {
+  return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg,
+                     seq, read_opts);
+}
+
+bool MemTableListVersion::GetFromHistory(const LookupKey& key,
+                                         std::string* value, Status* s,
+                                         MergeContext* merge_context,
+                                         RangeDelAggregator* range_del_agg,
+                                         SequenceNumber* seq,
+                                         const ReadOptions& read_opts) {
+  return GetFromList(&memlist_history_, key, value, s, merge_context,
+                     range_del_agg, seq, read_opts);
+}
+
+bool MemTableListVersion::GetFromList(std::list<MemTable*>* list,
+                                      const LookupKey& key, std::string* value,
+                                      Status* s, MergeContext* merge_context,
+                                      RangeDelAggregator* range_del_agg,
+                                      SequenceNumber* seq,
+                                      const ReadOptions& read_opts) {
+  *seq = kMaxSequenceNumber;
+
+  for (auto& memtable : *list) {
+    SequenceNumber current_seq = kMaxSequenceNumber;
+
+    bool done = memtable->Get(key, value, s, merge_context, range_del_agg,
+                              &current_seq, read_opts);
+    if (*seq == kMaxSequenceNumber) {
+      // Store the most recent sequence number of any operation on this key.
+      // Since we only care about the most recent change, we only need to
+      // return the first operation found when searching memtables in
+      // reverse-chronological order.
+      *seq = current_seq;
+    }
+
+    if (done) {
+      assert(*seq != kMaxSequenceNumber);
+      return true;
+    }
+    if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
+      return false;
+    }
+  }
+  return false;
+}
+
+Status MemTableListVersion::AddRangeTombstoneIterators(
+    const ReadOptions& read_opts, Arena* arena,
+    RangeDelAggregator* range_del_agg) {
+  assert(range_del_agg != nullptr);
+  for (auto& m : memlist_) {
+    std::unique_ptr<InternalIterator> range_del_iter(
+        m->NewRangeTombstoneIterator(read_opts));
+    Status s = range_del_agg->AddTombstones(std::move(range_del_iter));
+    if (!s.ok()) {
+      return s;
+    }
+  }
+  return Status::OK();
+}
+
+Status MemTableListVersion::AddRangeTombstoneIterators(
+    const ReadOptions& read_opts,
+    std::vector<InternalIterator*>* range_del_iters) {
+  for (auto& m : memlist_) {
+    auto* range_del_iter = m->NewRangeTombstoneIterator(read_opts);
+    if (range_del_iter != nullptr) {
+      range_del_iters->push_back(range_del_iter);
+    }
+  }
+  return Status::OK();
+}
+
+void MemTableListVersion::AddIterators(
+    const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
+    Arena* arena) {
+  for (auto& m : memlist_) {
+    iterator_list->push_back(m->NewIterator(options, arena));
+  }
+}
+
+void MemTableListVersion::AddIterators(
+    const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
+  for (auto& m : memlist_) {
+    merge_iter_builder->AddIterator(
+        m->NewIterator(options, merge_iter_builder->GetArena()));
+  }
+}
+
+uint64_t MemTableListVersion::GetTotalNumEntries() const {
+  uint64_t total_num = 0;
+  for (auto& m : memlist_) {
+    total_num += m->num_entries();
+  }
+  return total_num;
+}
+
+MemTable::MemTableStats MemTableListVersion::ApproximateStats(
+    const Slice& start_ikey, const Slice& end_ikey) {
+  MemTable::MemTableStats total_stats = {0, 0};
+  for (auto& m : memlist_) {
+    auto mStats = m->ApproximateStats(start_ikey, end_ikey);
+    total_stats.size += mStats.size;
+    total_stats.count += mStats.count;
+  }
+  return total_stats;
+}
+
+uint64_t MemTableListVersion::GetTotalNumDeletes() const {
+  uint64_t total_num = 0;
+  for (auto& m : memlist_) {
+    total_num += m->num_deletes();
+  }
+  return total_num;
+}
+
+SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
+    bool include_history) const {
+  if (include_history && !memlist_history_.empty()) {
+    return memlist_history_.back()->GetEarliestSequenceNumber();
+  } else if (!memlist_.empty()) {
+    return memlist_.back()->GetEarliestSequenceNumber();
+  } else {
+    return kMaxSequenceNumber;
+  }
+}
+
+// caller is responsible for referencing m
+void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
+  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
+  AddMemTable(m);
+
+  TrimHistory(to_delete);
+}
+
+// Removes m from list of memtables not flushed.  Caller should NOT Unref m.
+void MemTableListVersion::Remove(MemTable* m,
+                                 autovector<MemTable*>* to_delete) {
+  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
+  memlist_.remove(m);
+
+  if (max_write_buffer_number_to_maintain_ > 0) {
+    memlist_history_.push_front(m);
+    TrimHistory(to_delete);
+  } else {
+    UnrefMemTable(to_delete, m);
+  }
+}
+
+// Make sure we don't use up too much space in history
+void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) {
+  while (memlist_.size() + memlist_history_.size() >
+             static_cast<size_t>(max_write_buffer_number_to_maintain_) &&
+         !memlist_history_.empty()) {
+    MemTable* x = memlist_history_.back();
+    memlist_history_.pop_back();
+
+    UnrefMemTable(to_delete, x);
+  }
+}
+
+// Returns true if there is at least one memtable on which flush has
+// not yet started.
+bool MemTableList::IsFlushPending() const {
+  if ((flush_requested_ && num_flush_not_started_ >= 1) ||
+      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
+    assert(imm_flush_needed.load(std::memory_order_relaxed));
+    return true;
+  }
+  return false;
+}
+
+// Returns the memtables that need to be flushed.
+void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
+  AutoThreadOperationStageUpdater stage_updater(
+      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
+  const auto& memlist = current_->memlist_;
+  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
+    MemTable* m = *it;
+    if (!m->flush_in_progress_) {
+      assert(!m->flush_completed_);
+      num_flush_not_started_--;
+      if (num_flush_not_started_ == 0) {
+        imm_flush_needed.store(false, std::memory_order_release);
+      }
+      m->flush_in_progress_ = true;  // flushing will start very soon
+      ret->push_back(m);
+    }
+  }
+  flush_requested_ = false;  // start-flush request is complete
+}
+
+void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
+                                         uint64_t file_number) {
+  AutoThreadOperationStageUpdater stage_updater(
+      ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
+  assert(!mems.empty());
+
+  // If the flush was not successful, then just reset state.
+  // Maybe a subsequent attempt to flush will be successful.
+  for (MemTable* m : mems) {
+    assert(m->flush_in_progress_);
+    assert(m->file_number_ == 0);
+
+    m->flush_in_progress_ = false;
+    m->flush_completed_ = false;
+    m->edit_.Clear();
+    num_flush_not_started_++;
+  }
+  imm_flush_needed.store(true, std::memory_order_release);
+}
+
+// Record a successful flush in the manifest file
+Status MemTableList::InstallMemtableFlushResults(
+    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
+    const autovector<MemTable*>& mems, VersionSet* vset, InstrumentedMutex* mu,
+    uint64_t file_number, autovector<MemTable*>* to_delete,
+    Directory* db_directory, LogBuffer* log_buffer) {
+  AutoThreadOperationStageUpdater stage_updater(
+      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
+  mu->AssertHeld();
+
+  // flush was successful
+  for (size_t i = 0; i < mems.size(); ++i) {
+    // All the edits are associated with the first memtable of this batch.
+    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);
+
+    mems[i]->flush_completed_ = true;
+    mems[i]->file_number_ = file_number;
+  }
+
+  // if some other thread is already committing, then return
+  Status s;
+  if (commit_in_progress_) {
+    TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress");
+    return s;
+  }
+
+  // Only a single thread can be executing this piece of code
+  commit_in_progress_ = true;
+
+  // Retry until all completed flushes are committed. New flushes can finish
+  // while the current thread is writing the manifest with the mutex released.
+  while (s.ok()) {
+    auto& memlist = current_->memlist_;
+    if (memlist.empty() || !memlist.back()->flush_completed_) {
+      break;
+    }
+    // scan all memtables from the earliest, and commit those
+    // (in that order) that have finished flushing. Memtables
+    // are always committed in the order that they were created.
+    uint64_t batch_file_number = 0;
+    size_t batch_count = 0;
+    autovector<VersionEdit*> edit_list;
+    // enumerate from the last (earliest) element to see how many batches have finished
+    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
+      MemTable* m = *it;
+      if (!m->flush_completed_) {
+        break;
+      }
+      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
+        batch_file_number = m->file_number_;
+        ROCKS_LOG_BUFFER(log_buffer,
+                         "[%s] Level-0 commit table #%" PRIu64 " started",
+                         cfd->GetName().c_str(), m->file_number_);
+        edit_list.push_back(&m->edit_);
+      }
+      batch_count++;
+    }
+
+    if (batch_count > 0) {
+      // this can release and reacquire the mutex.
+      s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
+                            db_directory);
+
+      // we will be changing the version in the next code path,
+      // so we better create a new one, since versions are immutable
+      InstallNewVersion();
+
+      // All the later memtables that have the same filenum
+      // are part of the same batch. They can be committed now.
+      uint64_t mem_id = 1;  // how many memtables have been flushed.
+      if (s.ok()) {         // commit new state
+        while (batch_count-- > 0) {
+          MemTable* m = current_->memlist_.back();
+          ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
+                                       ": memtable #%" PRIu64 " done",
+                           cfd->GetName().c_str(), m->file_number_, mem_id);
+          assert(m->file_number_ > 0);
+          current_->Remove(m, to_delete);
+          ++mem_id;
+        }
+      } else {
+        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
+          MemTable* m = *it;
+          // commit failed. setup state so that we can flush again.
+          ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
+                                       ": memtable #%" PRIu64 " failed",
+                           m->file_number_, mem_id);
+          m->flush_completed_ = false;
+          m->flush_in_progress_ = false;
+          m->edit_.Clear();
+          num_flush_not_started_++;
+          m->file_number_ = 0;
+          imm_flush_needed.store(true, std::memory_order_release);
+          ++mem_id;
+        }
+      }
+    }
+  }
+  commit_in_progress_ = false;
+  return s;
+}
+
+// New memtables are inserted at the front of the list.
+void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
+  assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
+  InstallNewVersion();
+  // This method is used to move a mutable memtable into the immutable list.
+  // Since the mutable memtable is already refcounted by the DBImpl,
+  // and we don't unref it when moving it to the immutable list,
+  // we don't have to ref the memtable here. We just take over the
+  // reference from the DBImpl.
+  current_->Add(m, to_delete);
+  m->MarkImmutable();
+  num_flush_not_started_++;
+  if (num_flush_not_started_ == 1) {
+    imm_flush_needed.store(true, std::memory_order_release);
+  }
+}
+
+// Returns an estimate of the number of bytes of data in use.
+size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
+  size_t total_size = 0;
+  for (auto& memtable : current_->memlist_) {
+    total_size += memtable->ApproximateMemoryUsage();
+  }
+  return total_size;
+}
+
+size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }
+
+void MemTableList::InstallNewVersion() {
+  if (current_->refs_ == 1) {
+    // we're the only one using the version, just keep using it
+  } else {
+    // somebody else holds the current version, we need to create new one
+    MemTableListVersion* version = current_;
+    current_ = new MemTableListVersion(&current_memory_usage_, current_);
+    current_->Ref();
+    version->Unref();
+  }
+}
+
+uint64_t MemTableList::GetMinLogContainingPrepSection() {
+  uint64_t min_log = 0;
+
+  for (auto& m : current_->memlist_) {
+    // this memtable has been flushed; it no longer
+    // needs to hold on to its prep section
+    if (m->flush_completed_) {
+      continue;
+    }
+
+    auto log = m->GetMinLogContainingPrepSection();
+
+    if (log > 0 && (min_log == 0 || log < min_log)) {
+      min_log = log;
+    }
+  }
+
+  return min_log;
+}
+
+}  // namespace rocksdb

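Very roughly, the flush hand-off implemented above looks like the following sketch of a hypothetical flush job. It assumes the db mutex is held around the pick and install steps (as DBImpl does) and elides the actual building of the L0 table; all names are illustrative.

#include "db/memtable_list.h"

namespace {

// Sketch only: pick immutable memtables, attempt a flush, then either
// commit the result to the manifest or roll the memtables back.
rocksdb::Status FlushJobSketch(rocksdb::MemTableList* imm,
                               rocksdb::ColumnFamilyData* cfd,
                               const rocksdb::MutableCFOptions& mutable_cf_options,
                               rocksdb::VersionSet* vset,
                               rocksdb::InstrumentedMutex* db_mutex,
                               rocksdb::Directory* db_directory,
                               rocksdb::LogBuffer* log_buffer,
                               uint64_t new_file_number) {
  rocksdb::autovector<rocksdb::MemTable*> mems;
  imm->PickMemtablesToFlush(&mems);  // marks them flush_in_progress_
  if (mems.empty()) {
    return rocksdb::Status::OK();
  }

  // ... build the L0 SST file for `mems` here (with the mutex released) ...
  rocksdb::Status s;  // stands in for the result of writing that file

  rocksdb::autovector<rocksdb::MemTable*> to_delete;
  if (s.ok()) {
    // Commits the memtables in creation order and moves them to history.
    s = imm->InstallMemtableFlushResults(cfd, mutable_cf_options, mems, vset,
                                         db_mutex, new_file_number, &to_delete,
                                         db_directory, log_buffer);
  } else {
    // Puts the memtables back into the "flush not started" state.
    imm->RollbackMemtableFlush(mems, new_file_number);
  }
  for (rocksdb::MemTable* m : to_delete) {
    delete m;  // memtables whose last reference was dropped by the list
  }
  return s;
}

}  // namespace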
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/memtable_list.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/memtable_list.h b/thirdparty/rocksdb/db/memtable_list.h
new file mode 100644
index 0000000..ed475b8
--- /dev/null
+++ b/thirdparty/rocksdb/db/memtable_list.h
@@ -0,0 +1,258 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#include <string>
+#include <list>
+#include <vector>
+#include <set>
+#include <deque>
+
+#include "db/dbformat.h"
+#include "db/memtable.h"
+#include "db/range_del_aggregator.h"
+#include "monitoring/instrumented_mutex.h"
+#include "rocksdb/db.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/types.h"
+#include "util/autovector.h"
+#include "util/filename.h"
+#include "util/log_buffer.h"
+
+namespace rocksdb {
+
+class ColumnFamilyData;
+class InternalKeyComparator;
+class InstrumentedMutex;
+class MergeIteratorBuilder;
+
+// Keeps a list of immutable memtables. The list is immutable
+// if the refcount is bigger than one. It is used as state for the Get() and
+// Iterator code paths.
+//
+// This class is not thread-safe.  External synchronization is required
+// (such as holding the db mutex or being on the write thread).
+class MemTableListVersion {
+ public:
+  explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
+                               MemTableListVersion* old = nullptr);
+  explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
+                               int max_write_buffer_number_to_maintain);
+
+  void Ref();
+  void Unref(autovector<MemTable*>* to_delete = nullptr);
+
+  // Search all the memtables starting from the most recent one.
+  // Return the most recent value found, if any.
+  //
+  // If any operation was found for this key, its most recent sequence number
+  // will be stored in *seq on success (regardless of whether true/false is
+  // returned).  Otherwise, *seq will be set to kMaxSequenceNumber.
+  bool Get(const LookupKey& key, std::string* value, Status* s,
+           MergeContext* merge_context, RangeDelAggregator* range_del_agg,
+           SequenceNumber* seq, const ReadOptions& read_opts);
+
+  bool Get(const LookupKey& key, std::string* value, Status* s,
+           MergeContext* merge_context, RangeDelAggregator* range_del_agg,
+           const ReadOptions& read_opts) {
+    SequenceNumber seq;
+    return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts);
+  }
+
+  // Similar to Get(), but searches the Memtable history of memtables that
+  // have already been flushed.  Should only be used for in-memory-only
+  // queries (such as Transaction validation) as the history may contain
+  // writes that are also present in the SST files.
+  bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
+                      MergeContext* merge_context,
+                      RangeDelAggregator* range_del_agg, SequenceNumber* seq,
+                      const ReadOptions& read_opts);
+  bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
+                      MergeContext* merge_context,
+                      RangeDelAggregator* range_del_agg,
+                      const ReadOptions& read_opts) {
+    SequenceNumber seq;
+    return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq,
+                          read_opts);
+  }
+
+  Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
+                                    RangeDelAggregator* range_del_agg);
+  Status AddRangeTombstoneIterators(
+      const ReadOptions& read_opts,
+      std::vector<InternalIterator*>* range_del_iters);
+
+  void AddIterators(const ReadOptions& options,
+                    std::vector<InternalIterator*>* iterator_list,
+                    Arena* arena);
+
+  void AddIterators(const ReadOptions& options,
+                    MergeIteratorBuilder* merge_iter_builder);
+
+  uint64_t GetTotalNumEntries() const;
+
+  uint64_t GetTotalNumDeletes() const;
+
+  MemTable::MemTableStats ApproximateStats(const Slice& start_ikey,
+                                           const Slice& end_ikey);
+
+  // Returns the value of MemTable::GetEarliestSequenceNumber() on the most
+  // recent MemTable in this list or kMaxSequenceNumber if the list is empty.
+  // If include_history=true, will also search Memtables in MemTableList
+  // History.
+  SequenceNumber GetEarliestSequenceNumber(bool include_history = false) const;
+
+ private:
+  // REQUIRE: m is an immutable memtable
+  void Add(MemTable* m, autovector<MemTable*>* to_delete);
+  // REQUIRE: m is an immutable memtable
+  void Remove(MemTable* m, autovector<MemTable*>* to_delete);
+
+  void TrimHistory(autovector<MemTable*>* to_delete);
+
+  bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
+                   std::string* value, Status* s, MergeContext* merge_context,
+                   RangeDelAggregator* range_del_agg, SequenceNumber* seq,
+                   const ReadOptions& read_opts);
+
+  void AddMemTable(MemTable* m);
+
+  void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m);
+
+  friend class MemTableList;
+
+  // Immutable MemTables that have not yet been flushed.
+  std::list<MemTable*> memlist_;
+
+  // MemTables that have already been flushed
+  // (used during Transaction validation)
+  std::list<MemTable*> memlist_history_;
+
+  // Maximum number of MemTables to keep in memory (including both flushed
+  // and not-yet-flushed tables).
+  const int max_write_buffer_number_to_maintain_;
+
+  int refs_ = 0;
+
+  size_t* parent_memtable_list_memory_usage_;
+};
+
+// This class stores references to all the immutable memtables.
+// The memtables are flushed to L0 as soon as possible and in
+// any order. If there is more than one immutable memtable, their
+// flushes can occur concurrently.  However, they are 'committed'
+// to the manifest in FIFO order to maintain correctness and
+// recoverability from a crash.
+//
+//
+// Other than imm_flush_needed, this class is not thread-safe and requires
+// external synchronization (such as holding the db mutex or being on the
+// write thread.)
+class MemTableList {
+ public:
+  // A list of memtables.
+  explicit MemTableList(int min_write_buffer_number_to_merge,
+                        int max_write_buffer_number_to_maintain)
+      : imm_flush_needed(false),
+        min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
+        current_(new MemTableListVersion(&current_memory_usage_,
+                                         max_write_buffer_number_to_maintain)),
+        num_flush_not_started_(0),
+        commit_in_progress_(false),
+        flush_requested_(false) {
+    current_->Ref();
+    current_memory_usage_ = 0;
+  }
+
+  // Should not delete MemTableList without making sure MemTableList::current()
+  // is Unref()'d.
+  ~MemTableList() {}
+
+  MemTableListVersion* current() { return current_; }
+
+  // so that background threads can quickly check whether there is
+  // anything more to start flushing.
+  std::atomic<bool> imm_flush_needed;
+
+  // Returns the total number of memtables in the list that haven't yet
+  // been flushed and logged.
+  int NumNotFlushed() const;
+
+  // Returns total number of memtables in the list that have been
+  // completely flushed and logged.
+  int NumFlushed() const;
+
+  // Returns true if there is at least one memtable on which flush has
+  // not yet started.
+  bool IsFlushPending() const;
+
+  // Returns the earliest memtables that need to be flushed. The returned
+  // memtables are guaranteed to be in ascending order of creation time.
+  void PickMemtablesToFlush(autovector<MemTable*>* mems);
+
+  // Reset the status of the given memtables back to the pending state so that
+  // they can get picked up again on the next round of flushes.
+  void RollbackMemtableFlush(const autovector<MemTable*>& mems,
+                             uint64_t file_number);
+
+  // Commit a successful flush in the manifest file
+  Status InstallMemtableFlushResults(
+      ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
+      const autovector<MemTable*>& m, VersionSet* vset, InstrumentedMutex* mu,
+      uint64_t file_number, autovector<MemTable*>* to_delete,
+      Directory* db_directory, LogBuffer* log_buffer);
+
+  // New memtables are inserted at the front of the list.
+  // Takes ownership of the reference held on *m by the caller of Add().
+  void Add(MemTable* m, autovector<MemTable*>* to_delete);
+
+  // Returns an estimate of the number of bytes of data in use.
+  size_t ApproximateMemoryUsage();
+
+  // Returns an estimate of the number of bytes of data used by
+  // the unflushed mem-tables.
+  size_t ApproximateUnflushedMemTablesMemoryUsage();
+
+  // Request a flush of all existing memtables to storage.  This will
+  // cause future calls to IsFlushPending() to return true if this list is
+  // non-empty (regardless of the min_write_buffer_number_to_merge
+  // parameter). This flush request will persist until the next time
+  // PickMemtablesToFlush() is called.
+  void FlushRequested() { flush_requested_ = true; }
+
+  bool HasFlushRequested() { return flush_requested_; }
+
+  // Copying allowed
+  // MemTableList(const MemTableList&);
+  // void operator=(const MemTableList&);
+
+  size_t* current_memory_usage() { return &current_memory_usage_; }
+
+  uint64_t GetMinLogContainingPrepSection();
+
+ private:
+  // DB mutex held
+  void InstallNewVersion();
+
+  const int min_write_buffer_number_to_merge_;
+
+  MemTableListVersion* current_;
+
+  // the number of elements that still need flushing
+  int num_flush_not_started_;
+
+  // committing in progress
+  bool commit_in_progress_;
+
+  // Requested a flush of all memtables to storage
+  bool flush_requested_;
+
+  // The current memory usage.
+  size_t current_memory_usage_;
+};
+
+}  // namespace rocksdb

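The write-path side of this interface can be sketched in a few lines: when the active memtable fills up, DBImpl-style code retires it into the immutable list and lets the flush machinery take over. The function below is hypothetical and assumes the caller already holds the db mutex.

#include "db/memtable_list.h"

namespace {

// Sketch only: hand a full (formerly active) memtable over to the
// immutable list declared above.
void SwitchMemtableSketch(rocksdb::MemTableList* imm,
                          rocksdb::MemTable* full_memtable) {
  rocksdb::autovector<rocksdb::MemTable*> to_delete;

  // Add() marks the memtable immutable and does not Ref() it again;
  // it takes over the reference the caller already holds.
  imm->Add(full_memtable, &to_delete);

  if (imm->IsFlushPending()) {
    // A background flush should be scheduled at this point.
  }

  // Trimming the history inside Add() may have released old memtables.
  for (rocksdb::MemTable* m : to_delete) {
    delete m;
  }
}

}  // namespace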
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/merge_context.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/merge_context.h b/thirdparty/rocksdb/db/merge_context.h
new file mode 100644
index 0000000..5e75e09
--- /dev/null
+++ b/thirdparty/rocksdb/db/merge_context.h
@@ -0,0 +1,116 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+#include <string>
+#include <vector>
+#include "db/dbformat.h"
+#include "rocksdb/slice.h"
+
+namespace rocksdb {
+
+const std::vector<Slice> empty_operand_list;
+
+// The merge context for merging a user key.
+// When doing a Get(), the DB will create such an object and pass it when
+// issuing the Get() operation to memtables and version_set. The operands
+// will be fetched from the context when issuing a partial or full merge.
+class MergeContext {
+ public:
+  // Clear all the operands
+  void Clear() {
+    if (operand_list_) {
+      operand_list_->clear();
+      copied_operands_->clear();
+    }
+  }
+
+  // Push a merge operand
+  void PushOperand(const Slice& operand_slice, bool operand_pinned = false) {
+    Initialize();
+    SetDirectionBackward();
+
+    if (operand_pinned) {
+      operand_list_->push_back(operand_slice);
+    } else {
+      // We need to have our own copy of the operand since it's not pinned
+      copied_operands_->emplace_back(
+          new std::string(operand_slice.data(), operand_slice.size()));
+      operand_list_->push_back(*copied_operands_->back());
+    }
+  }
+
+  // Push back a merge operand
+  void PushOperandBack(const Slice& operand_slice,
+                       bool operand_pinned = false) {
+    Initialize();
+    SetDirectionForward();
+
+    if (operand_pinned) {
+      operand_list_->push_back(operand_slice);
+    } else {
+      // We need to have our own copy of the operand since it's not pinned
+      copied_operands_->emplace_back(
+          new std::string(operand_slice.data(), operand_slice.size()));
+      operand_list_->push_back(*copied_operands_->back());
+    }
+  }
+
+  // return total number of operands in the list
+  size_t GetNumOperands() const {
+    if (!operand_list_) {
+      return 0;
+    }
+    return operand_list_->size();
+  }
+
+  // Get the operand at the index.
+  Slice GetOperand(int index) {
+    assert(operand_list_);
+
+    SetDirectionForward();
+    return (*operand_list_)[index];
+  }
+
+  // Return all the operands.
+  const std::vector<Slice>& GetOperands() {
+    if (!operand_list_) {
+      return empty_operand_list;
+    }
+
+    SetDirectionForward();
+    return *operand_list_;
+  }
+
+ private:
+  void Initialize() {
+    if (!operand_list_) {
+      operand_list_.reset(new std::vector<Slice>());
+      copied_operands_.reset(new std::vector<std::unique_ptr<std::string>>());
+    }
+  }
+
+  void SetDirectionForward() {
+    if (operands_reversed_ == true) {
+      std::reverse(operand_list_->begin(), operand_list_->end());
+      operands_reversed_ = false;
+    }
+  }
+
+  void SetDirectionBackward() {
+    if (operands_reversed_ == false) {
+      std::reverse(operand_list_->begin(), operand_list_->end());
+      operands_reversed_ = true;
+    }
+  }
+
+  // List of operands
+  std::unique_ptr<std::vector<Slice>> operand_list_;
+  // Copy of operands that are not pinned.
+  std::unique_ptr<std::vector<std::unique_ptr<std::string>>> copied_operands_;
+  bool operands_reversed_ = true;
+};
+
+} // namespace rocksdb

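Because the direction handling above is easy to misread, here is a minimal sketch of how operands accumulate during a lookup. The "+1"/"+2" operands are made-up counter deltas; a real Get() pushes whatever the memtables and SST files contain, newest entry first.

#include <cassert>

#include "db/merge_context.h"

namespace {

// Sketch only: operands are pushed newest-first while searching, and
// GetOperands() reverses the list so callers iterate oldest-to-newest.
void MergeContextSketch() {
  rocksdb::MergeContext ctx;

  ctx.PushOperand("+2");  // newest operand found; copied since not pinned
  ctx.PushOperand("+1");  // an older operand found later in the search

  const std::vector<rocksdb::Slice>& ops = ctx.GetOperands();
  assert(ops.size() == 2);
  assert(ops[0] == rocksdb::Slice("+1"));  // oldest first after the flip
  assert(ops[1] == rocksdb::Slice("+2"));
}

}  // namespace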
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/merge_helper.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/merge_helper.cc b/thirdparty/rocksdb/db/merge_helper.cc
new file mode 100644
index 0000000..625de27
--- /dev/null
+++ b/thirdparty/rocksdb/db/merge_helper.cc
@@ -0,0 +1,365 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "db/merge_helper.h"
+
+#include <stdio.h>
+#include <string>
+
+#include "db/dbformat.h"
+#include "monitoring/perf_context_imp.h"
+#include "monitoring/statistics.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/db.h"
+#include "rocksdb/merge_operator.h"
+#include "table/internal_iterator.h"
+
+namespace rocksdb {
+
+Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
+                                   const Slice& key, const Slice* value,
+                                   const std::vector<Slice>& operands,
+                                   std::string* result, Logger* logger,
+                                   Statistics* statistics, Env* env,
+                                   Slice* result_operand,
+                                   bool update_num_ops_stats) {
+  assert(merge_operator != nullptr);
+
+  if (operands.size() == 0) {
+    assert(value != nullptr && result != nullptr);
+    result->assign(value->data(), value->size());
+    return Status::OK();
+  }
+
+  if (update_num_ops_stats) {
+    MeasureTime(statistics, READ_NUM_MERGE_OPERANDS,
+                static_cast<uint64_t>(operands.size()));
+  }
+
+  bool success;
+  Slice tmp_result_operand(nullptr, 0);
+  const MergeOperator::MergeOperationInput merge_in(key, value, operands,
+                                                    logger);
+  MergeOperator::MergeOperationOutput merge_out(*result, tmp_result_operand);
+  {
+    // Setup to time the merge
+    StopWatchNano timer(env, statistics != nullptr);
+    PERF_TIMER_GUARD(merge_operator_time_nanos);
+
+    // Do the merge
+    success = merge_operator->FullMergeV2(merge_in, &merge_out);
+
+    if (tmp_result_operand.data()) {
+      // FullMergeV2 result is an existing operand
+      if (result_operand != nullptr) {
+        *result_operand = tmp_result_operand;
+      } else {
+        result->assign(tmp_result_operand.data(), tmp_result_operand.size());
+      }
+    } else if (result_operand) {
+      *result_operand = Slice(nullptr, 0);
+    }
+
+    RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME,
+               statistics ? timer.ElapsedNanos() : 0);
+  }
+
+  if (!success) {
+    RecordTick(statistics, NUMBER_MERGE_FAILURES);
+    return Status::Corruption("Error: Could not perform merge.");
+  }
+
+  return Status::OK();
+}
+
+// PRE:  iter points to the first merge type entry
+// POST: iter points to the first entry beyond the merge process (or the end)
+//       keys_, operands_ are updated to reflect the merge result.
+//       keys_ stores the list of keys encountered while merging.
+//       operands_ stores the list of merge operands encountered while merging.
+//       keys_[i] corresponds to operands_[i] for each i.
+Status MergeHelper::MergeUntil(InternalIterator* iter,
+                               RangeDelAggregator* range_del_agg,
+                               const SequenceNumber stop_before,
+                               const bool at_bottom) {
+  // Get a copy of the internal key, before it's invalidated by iter->Next()
+  // Also maintain the list of merge operands seen.
+  assert(HasOperator());
+  keys_.clear();
+  merge_context_.Clear();
+  has_compaction_filter_skip_until_ = false;
+  assert(user_merge_operator_);
+  bool first_key = true;
+
+  // We need to parse the internal key again as the parsed key is
+  // backed by the internal key!
+  // Assume no internal key corruption as it has been successfully parsed
+  // by the caller.
+  // original_key_is_iter variable is just caching the information:
+  // original_key_is_iter == (iter->key().ToString() == original_key)
+  bool original_key_is_iter = true;
+  std::string original_key = iter->key().ToString();
+  // Important:
+  // orig_ikey is backed by original_key if keys_.empty()
+  // orig_ikey is backed by keys_.back() if !keys_.empty()
+  ParsedInternalKey orig_ikey;
+  ParseInternalKey(original_key, &orig_ikey);
+
+  Status s;
+  bool hit_the_next_user_key = false;
+  for (; iter->Valid(); iter->Next(), original_key_is_iter = false) {
+    if (IsShuttingDown()) {
+      return Status::ShutdownInProgress();
+    }
+
+    ParsedInternalKey ikey;
+    assert(keys_.size() == merge_context_.GetNumOperands());
+
+    if (!ParseInternalKey(iter->key(), &ikey)) {
+      // stop at corrupted key
+      if (assert_valid_internal_key_) {
+        assert(!"Corrupted internal key not expected.");
+        return Status::Corruption("Corrupted internal key not expected.");
+      }
+      break;
+    } else if (first_key) {
+      assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key));
+      first_key = false;
+    } else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) {
+      // hit a different user key, stop right here
+      hit_the_next_user_key = true;
+      break;
+    } else if (stop_before && ikey.sequence <= stop_before) {
+      // hit an entry that's visible by the previous snapshot, can't touch that
+      break;
+    }
+
+    // At this point we are guaranteed that we need to process this key.
+
+    assert(IsValueType(ikey.type));
+    if (ikey.type != kTypeMerge) {
+
+      // hit a put/delete/single delete
+      //   => merge the put value or a nullptr with operands_
+      //   => store result in operands_.back() (and update keys_.back())
+      //   => change the entry type to kTypeValue for keys_.back()
+      // We are done! Success!
+
+      // If there are no operands, just return the Status::OK(). That will cause
+      // the compaction iterator to write out the key we're currently at, which
+      // is the put/delete we just encountered.
+      if (keys_.empty()) {
+        return Status::OK();
+      }
+
+      // TODO(noetzli) If the merge operator returns false, we are currently
+      // (almost) silently dropping the put/delete. That's probably not what we
+      // want. Also if we're in compaction and it's a put, it would be nice to
+      // run compaction filter on it.
+      const Slice val = iter->value();
+      const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr;
+      std::string merge_result;
+      s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr,
+                         merge_context_.GetOperands(), &merge_result, logger_,
+                         stats_, env_);
+
+      // We store the result in keys_.back() and operands_.back()
+      // if nothing went wrong (i.e.: no operand corruption on disk)
+      if (s.ok()) {
+        // The original key encountered
+        original_key = std::move(keys_.back());
+        orig_ikey.type = kTypeValue;
+        UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
+        keys_.clear();
+        merge_context_.Clear();
+        keys_.emplace_front(std::move(original_key));
+        merge_context_.PushOperand(merge_result);
+      }
+
+      // move iter to the next entry
+      iter->Next();
+      return s;
+    } else {
+      // hit a merge
+      //   => if there is a compaction filter, apply it.
+      //   => check for range tombstones covering the operand
+      //   => merge the operand into the front of the operands_ list
+      //      if not filtered
+      //   => then continue because we haven't yet seen a Put/Delete.
+      //
+      // Keep queuing keys and operands until we either meet a Put/Delete
+      // request or can later do a partial merge.
+
+      Slice value_slice = iter->value();
+      // Add an operand to the list if either:
+      // 1) it's included in one of the snapshots -- in that case we *must*
+      //    write it out, no matter what the compaction filter says, or
+      // 2) it's not filtered by the compaction filter.
+      CompactionFilter::Decision filter =
+          ikey.sequence <= latest_snapshot_
+              ? CompactionFilter::Decision::kKeep
+              : FilterMerge(orig_ikey.user_key, value_slice);
+      if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil &&
+          range_del_agg != nullptr &&
+          range_del_agg->ShouldDelete(
+              iter->key(),
+              RangeDelAggregator::RangePositioningMode::kForwardTraversal)) {
+        filter = CompactionFilter::Decision::kRemove;
+      }
+      if (filter == CompactionFilter::Decision::kKeep ||
+          filter == CompactionFilter::Decision::kChangeValue) {
+        if (original_key_is_iter) {
+          // this is just an optimization that saves us one memcpy
+          keys_.push_front(std::move(original_key));
+        } else {
+          keys_.push_front(iter->key().ToString());
+        }
+        if (keys_.size() == 1) {
+          // we need to re-anchor the orig_ikey because it was anchored by
+          // original_key before
+          ParseInternalKey(keys_.back(), &orig_ikey);
+        }
+        if (filter == CompactionFilter::Decision::kKeep) {
+          merge_context_.PushOperand(
+              value_slice, iter->IsValuePinned() /* operand_pinned */);
+        } else {  // kChangeValue
+          // Compaction filter asked us to change the operand from value_slice
+          // to compaction_filter_value_.
+          merge_context_.PushOperand(compaction_filter_value_, false);
+        }
+      } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
+        // Compaction filter asked us to remove this key altogether
+        // (not just this operand), along with some keys following it.
+        keys_.clear();
+        merge_context_.Clear();
+        has_compaction_filter_skip_until_ = true;
+        return Status::OK();
+      }
+    }
+  }
+
+  if (merge_context_.GetNumOperands() == 0) {
+    // we filtered out all the merge operands
+    return Status::OK();
+  }
+
+  // We are sure we have seen this key's entire history if we are at the
+  // last level and exhausted all internal keys of this user key.
+  // NOTE: !iter->Valid() does not necessarily mean we hit the
+  // beginning of a user key, as versions of a user key might be
+  // split into multiple files (even files on the same level)
+  // and some files might not be included in the compaction/merge.
+  //
+  // There are also cases where we have seen the root of history of this
+  // key without being sure of it. Then, we simply miss the opportunity
+  // to combine the keys. Since VersionSet::SetupOtherInputs() always makes
+  // sure that all merge-operands on the same level get compacted together,
+  // this will simply lead to these merge operands moving to the next level.
+  //
+  // So, we only perform the following logic (to merge all operands together
+  // without a Put/Delete) if we are certain that we have seen the end of the
+  // key's history.
+  bool surely_seen_the_beginning = hit_the_next_user_key && at_bottom;
+  if (surely_seen_the_beginning) {
+    // do a final merge with nullptr as the existing value and say
+    // bye to the merge type (it's now converted to a Put)
+    assert(kTypeMerge == orig_ikey.type);
+    assert(merge_context_.GetNumOperands() >= 1);
+    assert(merge_context_.GetNumOperands() == keys_.size());
+    std::string merge_result;
+    s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr,
+                       merge_context_.GetOperands(), &merge_result, logger_,
+                       stats_, env_);
+    if (s.ok()) {
+      // The original key encountered.
+      // We are certain that keys_ is not empty here (see the assertions a
+      // couple of lines above).
+      original_key = std::move(keys_.back());
+      orig_ikey.type = kTypeValue;
+      UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
+      keys_.clear();
+      merge_context_.Clear();
+      keys_.emplace_front(std::move(original_key));
+      merge_context_.PushOperand(merge_result);
+    }
+  } else {
+    // We haven't seen the beginning of the key nor a Put/Delete.
+    // Attempt to use the user's associative merge function to
+    // merge the stacked merge operands into a single operand.
+    s = Status::MergeInProgress();
+    if (merge_context_.GetNumOperands() >= 2) {
+      bool merge_success = false;
+      std::string merge_result;
+      {
+        StopWatchNano timer(env_, stats_ != nullptr);
+        PERF_TIMER_GUARD(merge_operator_time_nanos);
+        merge_success = user_merge_operator_->PartialMergeMulti(
+            orig_ikey.user_key,
+            std::deque<Slice>(merge_context_.GetOperands().begin(),
+                              merge_context_.GetOperands().end()),
+            &merge_result, logger_);
+        RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME,
+                   stats_ ? timer.ElapsedNanosSafe() : 0);
+      }
+      if (merge_success) {
+        // Merging of operands (associative merge) was successful.
+        // Replace operands with the merge result
+        merge_context_.Clear();
+        merge_context_.PushOperand(merge_result);
+        keys_.erase(keys_.begin(), keys_.end() - 1);
+      }
+    }
+  }
+
+  return s;
+}
+
+MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper)
+    : merge_helper_(merge_helper) {
+  it_keys_ = merge_helper_->keys().rend();
+  it_values_ = merge_helper_->values().rend();
+}
+
+void MergeOutputIterator::SeekToFirst() {
+  const auto& keys = merge_helper_->keys();
+  const auto& values = merge_helper_->values();
+  assert(keys.size() == values.size());
+  it_keys_ = keys.rbegin();
+  it_values_ = values.rbegin();
+}
+
+void MergeOutputIterator::Next() {
+  ++it_keys_;
+  ++it_values_;
+}
+
+CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key,
+                                                    const Slice& value_slice) {
+  if (compaction_filter_ == nullptr) {
+    return CompactionFilter::Decision::kKeep;
+  }
+  if (stats_ != nullptr) {
+    filter_timer_.Start();
+  }
+  compaction_filter_value_.clear();
+  compaction_filter_skip_until_.Clear();
+  auto ret = compaction_filter_->FilterV2(
+      level_, user_key, CompactionFilter::ValueType::kMergeOperand, value_slice,
+      &compaction_filter_value_, compaction_filter_skip_until_.rep());
+  if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) {
+    if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(),
+                                  user_key) <= 0) {
+      // Invalid skip_until returned from compaction filter.
+      // Keep the key as per FilterV2 documentation.
+      ret = CompactionFilter::Decision::kKeep;
+    } else {
+      compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
+                                                       kValueTypeForSeek);
+    }
+  }
+  total_filter_time_ += filter_timer_.ElapsedNanosSafe();
+  return ret;
+}
+
+} // namespace rocksdb
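
The following minimal sketch is not part of the commit; it only illustrates how
the output of MergeUntil() is usually drained through the MergeOutputIterator
defined above, assuming MergeUntil() has already returned OK or MergeInProgress.
The collecting function and its name are made up for the example.

#include <string>
#include <utility>
#include <vector>

#include "db/merge_helper.h"

// Copies whatever MergeUntil() left in the helper, in the order the input
// iterator originally produced it (keys().back(), the first key seen, comes
// out first).
std::vector<std::pair<std::string, std::string>> CollectMergeOutput(
    const rocksdb::MergeHelper& merge_helper) {
  std::vector<std::pair<std::string, std::string>> out;
  rocksdb::MergeOutputIterator it(&merge_helper);
  for (it.SeekToFirst(); it.Valid(); it.Next()) {
    out.emplace_back(it.key().ToString(), it.value().ToString());
  }
  return out;
}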

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/merge_helper.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/merge_helper.h b/thirdparty/rocksdb/db/merge_helper.h
new file mode 100644
index 0000000..59da47a
--- /dev/null
+++ b/thirdparty/rocksdb/db/merge_helper.h
@@ -0,0 +1,209 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef MERGE_HELPER_H
+#define MERGE_HELPER_H
+
+#include <deque>
+#include <string>
+#include <vector>
+
+#include "db/dbformat.h"
+#include "db/merge_context.h"
+#include "db/range_del_aggregator.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "util/stop_watch.h"
+
+namespace rocksdb {
+
+class Comparator;
+class Iterator;
+class Logger;
+class MergeOperator;
+class Statistics;
+class InternalIterator;
+
+class MergeHelper {
+ public:
+  MergeHelper(Env* env, const Comparator* user_comparator,
+              const MergeOperator* user_merge_operator,
+              const CompactionFilter* compaction_filter, Logger* logger,
+              bool assert_valid_internal_key, SequenceNumber latest_snapshot,
+              int level = 0, Statistics* stats = nullptr,
+              const std::atomic<bool>* shutting_down = nullptr)
+      : env_(env),
+        user_comparator_(user_comparator),
+        user_merge_operator_(user_merge_operator),
+        compaction_filter_(compaction_filter),
+        shutting_down_(shutting_down),
+        logger_(logger),
+        assert_valid_internal_key_(assert_valid_internal_key),
+        latest_snapshot_(latest_snapshot),
+        level_(level),
+        keys_(),
+        filter_timer_(env_),
+        total_filter_time_(0U),
+        stats_(stats) {
+    assert(user_comparator_ != nullptr);
+  }
+
+  // Wrapper around MergeOperator::FullMergeV2() that records perf statistics.
+  // Result of merge will be written to result if status returned is OK.
+  // If operands is empty, the value will simply be copied to result.
+  // Set `update_num_ops_stats` to true if the merge is on behalf of a user
+  // read, so that statistics about the number of merge operands are updated.
+  // Returns one of the following statuses:
+  // - OK: Entries were successfully merged.
+  // - Corruption: Merge operator reported unsuccessful merge.
+  static Status TimedFullMerge(const MergeOperator* merge_operator,
+                               const Slice& key, const Slice* value,
+                               const std::vector<Slice>& operands,
+                               std::string* result, Logger* logger,
+                               Statistics* statistics, Env* env,
+                               Slice* result_operand = nullptr,
+                               bool update_num_ops_stats = false);
+
+  // Merge entries until we hit
+  //     - a corrupted key
+  //     - a Put/Delete,
+  //     - a different user key,
+  //     - a specific sequence number (snapshot boundary),
+  //     - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
+  //  or - the end of iteration
+  // iter: (IN)  points to the first merge type entry
+  //       (OUT) points to the first entry not included in the merge process
+  // range_del_agg: (IN) filters merge operands covered by range tombstones.
+  // stop_before: (IN) a sequence number that merge should not cross.
+  //                   0 means no restriction
+  // at_bottom:   (IN) true if the iterator covers the bottom level, which means
+  //                   we could reach the start of the history of this user key.
+  //
+  // Returns one of the following statuses:
+  // - OK: Entries were successfully merged.
+  // - MergeInProgress: Put/Delete not encountered, and didn't reach the start
+  //   of key's history. Output consists of merge operands only.
+  // - Corruption: Merge operator reported an unsuccessful merge, or an
+  //   unexpected corrupted key was encountered (the latter only when
+  //   compiling with asserts removed).
+  // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true).
+  //
+  // REQUIRED: The first key in the input is not corrupted.
+  Status MergeUntil(InternalIterator* iter,
+                    RangeDelAggregator* range_del_agg = nullptr,
+                    const SequenceNumber stop_before = 0,
+                    const bool at_bottom = false);
+
+  // Filters a merge operand using the compaction filter specified
+  // in the constructor. Returns the decision that the filter made.
+  // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
+  // optional outputs of compaction filter.
+  CompactionFilter::Decision FilterMerge(const Slice& user_key,
+                                         const Slice& value_slice);
+
+  // Query the merge result
+  // These are valid until the next MergeUntil call
+  // If the merging was successful:
+  //   - keys() contains a single element with the latest sequence number of
+  //     the merges. The type will be Put or Merge. See IMPORTANT 1 note, below.
+  //   - values() contains a single element with the result of merging all the
+  //     operands together
+  //
+  //   IMPORTANT 1: the key type could change after the MergeUntil call.
+  //        Put/Delete + Merge + ... + Merge => Put
+  //        Merge + ... + Merge => Merge
+  //
+  // If the merge operator is not associative, and if a Put/Delete is not found
+  // then the merging will be unsuccessful. In this case:
+  //   - keys() contains the list of internal keys seen in order of iteration.
+  //   - values() contains the list of values (merges) seen in the same order.
+  //              values() is parallel to keys() so that the first entry in
+  //              keys() is the key associated with the first entry in values()
+  //              and so on. These lists will be the same length.
+  //              All of these pairs will be merges over the same user key.
+  //              See IMPORTANT 2 note below.
+  //
+  //   IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
+  //                So keys().back() was the first key seen by iterator.
+  // TODO: Re-style this comment to be like the first one
+  const std::deque<std::string>& keys() const { return keys_; }
+  const std::vector<Slice>& values() const {
+    return merge_context_.GetOperands();
+  }
+  uint64_t TotalFilterTime() const { return total_filter_time_; }
+  bool HasOperator() const { return user_merge_operator_ != nullptr; }
+
+  // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
+  // return true and fill *until with the key to which we should skip.
+  // If true, keys() and values() are empty.
+  bool FilteredUntil(Slice* skip_until) const {
+    if (!has_compaction_filter_skip_until_) {
+      return false;
+    }
+    assert(compaction_filter_ != nullptr);
+    assert(skip_until != nullptr);
+    assert(compaction_filter_skip_until_.Valid());
+    *skip_until = compaction_filter_skip_until_.Encode();
+    return true;
+  }
+
+ private:
+  Env* env_;
+  const Comparator* user_comparator_;
+  const MergeOperator* user_merge_operator_;
+  const CompactionFilter* compaction_filter_;
+  const std::atomic<bool>* shutting_down_;
+  Logger* logger_;
+  bool assert_valid_internal_key_; // enforce no internal key corruption?
+  SequenceNumber latest_snapshot_;
+  int level_;
+
+  // the scratch area that holds the result of MergeUntil
+  // valid up to the next MergeUntil call
+
+  // Keeps track of the sequence of keys seen
+  std::deque<std::string> keys_;
+  // Parallel with keys_; stores the operands
+  mutable MergeContext merge_context_;
+
+  StopWatchNano filter_timer_;
+  uint64_t total_filter_time_;
+  Statistics* stats_;
+
+  bool has_compaction_filter_skip_until_ = false;
+  std::string compaction_filter_value_;
+  InternalKey compaction_filter_skip_until_;
+
+  bool IsShuttingDown() {
+    // This is a best-effort facility, so memory_order_relaxed is sufficient.
+    return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
+  }
+};
+
+// MergeOutputIterator can be used to iterate over the result of a merge.
+class MergeOutputIterator {
+ public:
+  // The MergeOutputIterator is bound to a MergeHelper instance.
+  explicit MergeOutputIterator(const MergeHelper* merge_helper);
+
+  // Seeks to the first record in the output.
+  void SeekToFirst();
+  // Advances to the next record in the output.
+  void Next();
+
+  Slice key() { return Slice(*it_keys_); }
+  Slice value() { return Slice(*it_values_); }
+  bool Valid() { return it_keys_ != merge_helper_->keys().rend(); }
+
+ private:
+  const MergeHelper* merge_helper_;
+  std::deque<std::string>::const_reverse_iterator it_keys_;
+  std::vector<Slice>::const_reverse_iterator it_values_;
+};
+
+} // namespace rocksdb
+
+#endif
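
As a rough illustration of the contract documented above (not part of the
commit), the sketch below shows how a caller might construct a MergeHelper,
drive MergeUntil(), and tell the two non-error outcomes apart. All arguments
are placeholders for values the surrounding compaction code would already own.

#include "db/merge_helper.h"
#include "rocksdb/env.h"

rocksdb::Status RunMerge(rocksdb::InternalIterator* input_iter,
                         rocksdb::RangeDelAggregator* range_del_agg,
                         const rocksdb::Comparator* user_cmp,
                         const rocksdb::MergeOperator* merge_op,
                         rocksdb::Logger* info_log,
                         rocksdb::SequenceNumber latest_snapshot,
                         rocksdb::SequenceNumber prev_snapshot,
                         bool is_bottommost_level) {
  rocksdb::MergeHelper merge(rocksdb::Env::Default(), user_cmp, merge_op,
                             nullptr /* compaction_filter */, info_log,
                             false /* assert_valid_internal_key */,
                             latest_snapshot);
  rocksdb::Status s = merge.MergeUntil(input_iter, range_del_agg,
                                       prev_snapshot /* stop_before */,
                                       is_bottommost_level /* at_bottom */);
  if (s.ok()) {
    // keys()/values() hold one collapsed entry (now a Put), or are empty if
    // everything was filtered out or a lone Put/Delete was hit.
  } else if (s.IsMergeInProgress()) {
    // No Put/Delete reached and history is incomplete: keys()/values() still
    // hold the stacked (possibly partially merged) merge operands.
  }
  return s;
}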

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/merge_operator.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/merge_operator.cc b/thirdparty/rocksdb/db/merge_operator.cc
new file mode 100644
index 0000000..1981e65
--- /dev/null
+++ b/thirdparty/rocksdb/db/merge_operator.cc
@@ -0,0 +1,86 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+/**
+ * Back-end implementation details specific to the Merge Operator.
+ */
+
+#include "rocksdb/merge_operator.h"
+
+namespace rocksdb {
+
+bool MergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
+                                MergeOperationOutput* merge_out) const {
+  // If FullMergeV2 is not implemented, we convert the operand_list to
+  // std::deque<std::string> and pass it to FullMerge
+  std::deque<std::string> operand_list_str;
+  for (auto& op : merge_in.operand_list) {
+    operand_list_str.emplace_back(op.data(), op.size());
+  }
+  return FullMerge(merge_in.key, merge_in.existing_value, operand_list_str,
+                   &merge_out->new_value, merge_in.logger);
+}
+
+// The default implementation of PartialMergeMulti, which invokes
+// PartialMerge multiple times internally and merges two operands at
+// a time.
+bool MergeOperator::PartialMergeMulti(const Slice& key,
+                                      const std::deque<Slice>& operand_list,
+                                      std::string* new_value,
+                                      Logger* logger) const {
+  assert(operand_list.size() >= 2);
+  // Simply loop through the operands
+  Slice temp_slice(operand_list[0]);
+
+  for (size_t i = 1; i < operand_list.size(); ++i) {
+    auto& operand = operand_list[i];
+    std::string temp_value;
+    if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) {
+      return false;
+    }
+    swap(temp_value, *new_value);
+    temp_slice = Slice(*new_value);
+  }
+
+  // The result will be in *new_value. All merges succeeded.
+  return true;
+}
+
+// Given a "real" merge from the library, call the user's
+// associative merge function one-by-one on each of the operands.
+// NOTE: It is assumed that the client's merge-operator will handle any errors.
+bool AssociativeMergeOperator::FullMergeV2(
+    const MergeOperationInput& merge_in,
+    MergeOperationOutput* merge_out) const {
+  // Simply loop through the operands
+  Slice temp_existing;
+  const Slice* existing_value = merge_in.existing_value;
+  for (const auto& operand : merge_in.operand_list) {
+    std::string temp_value;
+    if (!Merge(merge_in.key, existing_value, operand, &temp_value,
+               merge_in.logger)) {
+      return false;
+    }
+    swap(temp_value, merge_out->new_value);
+    temp_existing = Slice(merge_out->new_value);
+    existing_value = &temp_existing;
+  }
+
+  // The result will be in *new_value. All merges succeeded.
+  return true;
+}
+
+// Call the user defined simple merge on the operands;
+// NOTE: It is assumed that the client's merge-operator will handle any errors.
+bool AssociativeMergeOperator::PartialMerge(
+    const Slice& key,
+    const Slice& left_operand,
+    const Slice& right_operand,
+    std::string* new_value,
+    Logger* logger) const {
+  return Merge(key, &left_operand, right_operand, new_value, logger);
+}
+
+} // namespace rocksdb
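
To make the division of labor above concrete, here is a small illustrative
AssociativeMergeOperator (not part of the commit): only Merge() is supplied,
and the default FullMergeV2() and PartialMerge() shown above fold every
operand through it. The operator name and value encoding are invented for the
example.

#include <cstdint>
#include <cstring>
#include <string>

#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"

// Treats values and operands as little-endian uint64 counters and adds them.
class UInt64AddOperator : public rocksdb::AssociativeMergeOperator {
 public:
  bool Merge(const rocksdb::Slice& /*key*/,
             const rocksdb::Slice* existing_value,
             const rocksdb::Slice& value, std::string* new_value,
             rocksdb::Logger* /*logger*/) const override {
    uint64_t base = 0;
    if (existing_value != nullptr && existing_value->size() == sizeof(base)) {
      std::memcpy(&base, existing_value->data(), sizeof(base));
    }
    uint64_t operand = 0;
    if (value.size() == sizeof(operand)) {
      std::memcpy(&operand, value.data(), sizeof(operand));
    }
    base += operand;
    new_value->assign(reinterpret_cast<const char*>(&base), sizeof(base));
    return true;  // returning false would surface to callers as a failed merge
  }

  const char* Name() const override { return "UInt64AddOperator"; }
};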

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/pinned_iterators_manager.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/pinned_iterators_manager.h b/thirdparty/rocksdb/db/pinned_iterators_manager.h
new file mode 100644
index 0000000..7874eef
--- /dev/null
+++ b/thirdparty/rocksdb/db/pinned_iterators_manager.h
@@ -0,0 +1,87 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "table/internal_iterator.h"
+
+namespace rocksdb {
+
+// PinnedIteratorsManager will be notified whenever we need to pin an Iterator
+// and it will be responsible for deleting pinned Iterators when they are
+// not needed anymore.
+class PinnedIteratorsManager : public Cleanable {
+ public:
+  PinnedIteratorsManager() : pinning_enabled(false) {}
+  ~PinnedIteratorsManager() {
+    if (pinning_enabled) {
+      ReleasePinnedData();
+    }
+  }
+
+  // Enable Iterators pinning
+  void StartPinning() {
+    assert(pinning_enabled == false);
+    pinning_enabled = true;
+  }
+
+  // Is pinning enabled?
+  bool PinningEnabled() { return pinning_enabled; }
+
+  // Take ownership of iter and delete it when ReleasePinnedData() is called
+  void PinIterator(InternalIterator* iter, bool arena = false) {
+    if (arena) {
+      PinPtr(iter, &PinnedIteratorsManager::ReleaseArenaInternalIterator);
+    } else {
+      PinPtr(iter, &PinnedIteratorsManager::ReleaseInternalIterator);
+    }
+  }
+
+  typedef void (*ReleaseFunction)(void* arg1);
+  void PinPtr(void* ptr, ReleaseFunction release_func) {
+    assert(pinning_enabled);
+    if (ptr == nullptr) {
+      return;
+    }
+    pinned_ptrs_.emplace_back(ptr, release_func);
+  }
+
+  // Release pinned Iterators
+  inline void ReleasePinnedData() {
+    assert(pinning_enabled == true);
+    pinning_enabled = false;
+
+    // Remove duplicate pointers
+    std::sort(pinned_ptrs_.begin(), pinned_ptrs_.end());
+    auto unique_end = std::unique(pinned_ptrs_.begin(), pinned_ptrs_.end());
+
+    for (auto i = pinned_ptrs_.begin(); i != unique_end; ++i) {
+      void* ptr = i->first;
+      ReleaseFunction release_func = i->second;
+      release_func(ptr);
+    }
+    pinned_ptrs_.clear();
+    // Also do cleanups from the base Cleanable
+    Cleanable::Reset();
+  }
+
+ private:
+  static void ReleaseInternalIterator(void* ptr) {
+    delete reinterpret_cast<InternalIterator*>(ptr);
+  }
+
+  static void ReleaseArenaInternalIterator(void* ptr) {
+    reinterpret_cast<InternalIterator*>(ptr)->~InternalIterator();
+  }
+
+  bool pinning_enabled;
+  std::vector<std::pair<void*, ReleaseFunction>> pinned_ptrs_;
+};
+
+}  // namespace rocksdb
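
A short usage sketch (not part of the commit): the manager is essentially a
deferred-deletion list, and PinPtr() accepts any pointer together with a
matching release function, not just iterators. The string buffer and release
function below are made up for illustration.

#include <string>

#include "db/pinned_iterators_manager.h"

// Frees a std::string that was handed to PinPtr().
static void ReleaseString(void* arg) {
  delete static_cast<std::string*>(arg);
}

void PinningExample() {
  rocksdb::PinnedIteratorsManager pinned_mgr;
  pinned_mgr.StartPinning();

  auto* buf = new std::string("some block contents");
  // Ownership moves to the manager; ReleaseString(buf) runs later.
  pinned_mgr.PinPtr(buf, &ReleaseString);

  // ... hand out Slices into *buf; they stay valid while the pin is held ...

  // Deletes every distinct pinned pointer exactly once and also runs any
  // callbacks registered through the Cleanable base class.
  pinned_mgr.ReleasePinnedData();
}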

