nifi-commits mailing list archives

From jeremyd...@apache.org
Subject [08/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:24:48 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/version_set.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/version_set.h b/thirdparty/rocksdb/db/version_set.h
new file mode 100644
index 0000000..9fb000c
--- /dev/null
+++ b/thirdparty/rocksdb/db/version_set.h
@@ -0,0 +1,862 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// The representation of a DBImpl consists of a set of Versions.  The
+// newest version is called "current".  Older versions may be kept
+// around to provide a consistent view to live iterators.
+//
+// Each Version keeps track of a set of Table files per level.  The
+// entire set of versions is maintained in a VersionSet.
+//
+// Version,VersionSet are thread-compatible, but require external
+// synchronization on all accesses.
+
+#pragma once
+#include <atomic>
+#include <deque>
+#include <limits>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "db/column_family.h"
+#include "db/compaction.h"
+#include "db/compaction_picker.h"
+#include "db/dbformat.h"
+#include "db/file_indexer.h"
+#include "db/log_reader.h"
+#include "db/range_del_aggregator.h"
+#include "db/table_cache.h"
+#include "db/version_builder.h"
+#include "db/version_edit.h"
+#include "db/write_controller.h"
+#include "monitoring/instrumented_mutex.h"
+#include "options/db_options.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+
+namespace rocksdb {
+
+namespace log {
+class Writer;
+}
+
+class Compaction;
+class InternalIterator;
+class LogBuffer;
+class LookupKey;
+class MemTable;
+class Version;
+class VersionSet;
+class WriteBufferManager;
+class MergeContext;
+class ColumnFamilySet;
+class TableCache;
+class MergeIteratorBuilder;
+
+// Return the smallest index i such that file_level.files[i]->largest >= key.
+// Return file_level.num_files if there is no such file.
+// REQUIRES: "file_level.files" contains a sorted list of
+// non-overlapping files.
+extern int FindFile(const InternalKeyComparator& icmp,
+                    const LevelFilesBrief& file_level, const Slice& key);
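+
+// An illustrative worked example of this contract (hypothetical keys, not
+// taken from the implementation): if file_level holds three files whose
+// largest keys are "b", "d" and "f", then FindFile(icmp, file_level, "c")
+// returns 1 (the first file whose largest key is >= "c"), and
+// FindFile(icmp, file_level, "g") returns file_level.num_files == 3.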
+
+// Returns true iff some file in "files" overlaps the user key range
+// [*smallest,*largest].
+// smallest==nullptr represents a key smaller than all keys in the DB.
+// largest==nullptr represents a key larger than all keys in the DB.
+// REQUIRES: If disjoint_sorted_files, file_level.files[]
+// contains disjoint ranges in sorted order.
+extern bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
+                                  bool disjoint_sorted_files,
+                                  const LevelFilesBrief& file_level,
+                                  const Slice* smallest_user_key,
+                                  const Slice* largest_user_key);
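+
+// An illustrative example of this contract (hypothetical keys): with
+// disjoint sorted files covering user-key ranges ["a","c"] and ["e","g"],
+// the range ["d","d"] overlaps neither file, while ["b","f"] overlaps both.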
+
+// Generate LevelFilesBrief from vector<FdWithKeyRange*>.
+// Copies smallest_key and largest_key data into sequential memory.
+// arena: Arena used to allocate the memory
+extern void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
+                                      const std::vector<FileMetaData*>& files,
+                                      Arena* arena);
+
+class VersionStorageInfo {
+ public:
+  VersionStorageInfo(const InternalKeyComparator* internal_comparator,
+                     const Comparator* user_comparator, int num_levels,
+                     CompactionStyle compaction_style,
+                     VersionStorageInfo* src_vstorage,
+                     bool _force_consistency_checks);
+  ~VersionStorageInfo();
+
+  void Reserve(int level, size_t size) { files_[level].reserve(size); }
+
+  void AddFile(int level, FileMetaData* f, Logger* info_log = nullptr);
+
+  void SetFinalized();
+
+  // Update num_non_empty_levels_.
+  void UpdateNumNonEmptyLevels();
+
+  void GenerateFileIndexer() {
+    file_indexer_.UpdateIndex(&arena_, num_non_empty_levels_, files_);
+  }
+
+  // Update the accumulated stats from a file-meta.
+  void UpdateAccumulatedStats(FileMetaData* file_meta);
+
+  // Decrease the current stats for a to-be-deleted file-meta
+  void RemoveCurrentStats(FileMetaData* file_meta);
+
+  void ComputeCompensatedSizes();
+
+  // Updates internal structures that keep track of compaction scores
+  // We use compaction scores to figure out which compaction to do next
+  // REQUIRES: db_mutex held!!
+  // TODO find a better way to pass compaction_options_fifo.
+  void ComputeCompactionScore(const ImmutableCFOptions& immutable_cf_options,
+                              const MutableCFOptions& mutable_cf_options);
+
+  // Estimate estimated_compaction_needed_bytes_
+  void EstimateCompactionBytesNeeded(
+      const MutableCFOptions& mutable_cf_options);
+
+  // This computes files_marked_for_compaction_ and is called by
+  // ComputeCompactionScore()
+  void ComputeFilesMarkedForCompaction();
+
+  // Generate level_files_brief_ from files_
+  void GenerateLevelFilesBrief();
+  // Sort all files for this version based on their file size and
+  // record results in files_by_compaction_pri_. The largest files are listed
+  // first.
+  void UpdateFilesByCompactionPri(CompactionPri compaction_pri);
+
+  void GenerateLevel0NonOverlapping();
+  bool level0_non_overlapping() const {
+    return level0_non_overlapping_;
+  }
+
+  int MaxInputLevel() const;
+  int MaxOutputLevel(bool allow_ingest_behind) const;
+
+  // Return level number that has idx'th highest score
+  int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; }
+
+  // Return idx'th highest score
+  double CompactionScore(int idx) const { return compaction_score_[idx]; }
+
+  void GetOverlappingInputs(
+      int level, const InternalKey* begin,  // nullptr means before all keys
+      const InternalKey* end,               // nullptr means after all keys
+      std::vector<FileMetaData*>* inputs,
+      int hint_index = -1,        // index of overlap file
+      int* file_index = nullptr,  // return index of overlap file
+      bool expand_range = true)   // if set, returns files which overlap the
+      const;                      // range and overlap each other. If false,
+                                  // then just files intersecting the range
+  void GetCleanInputsWithinInterval(
+      int level, const InternalKey* begin,  // nullptr means before all keys
+      const InternalKey* end,               // nullptr means after all keys
+      std::vector<FileMetaData*>* inputs,
+      int hint_index = -1,        // index of overlap file
+      int* file_index = nullptr)  // return index of overlap file
+      const;
+
+  void GetOverlappingInputsRangeBinarySearch(
+      int level,           // level > 0
+      const Slice& begin,  // nullptr means before all keys
+      const Slice& end,    // nullptr means after all keys
+      std::vector<FileMetaData*>* inputs,
+      int hint_index,                // index of overlap file
+      int* file_index,               // return index of overlap file
+      bool within_interval = false)  // if set, force the inputs within interval
+      const;
+
+  void ExtendFileRangeOverlappingInterval(
+      int level,
+      const Slice& begin,  // nullptr means before all keys
+      const Slice& end,    // nullptr means after all keys
+      unsigned int index,  // start extending from this index
+      int* startIndex,     // return the startIndex of input range
+      int* endIndex)       // return the endIndex of input range
+      const;
+
+  void ExtendFileRangeWithinInterval(
+      int level,
+      const Slice& begin,  // nullptr means before all keys
+      const Slice& end,    // nullptr means after all keys
+      unsigned int index,  // start extending from this index
+      int* startIndex,     // return the startIndex of input range
+      int* endIndex)       // return the endIndex of input range
+      const;
+
+  // Returns true iff some file in the specified level overlaps
+  // some part of [*smallest_user_key,*largest_user_key].
+  // smallest_user_key==NULL represents a key smaller than all keys in the DB.
+  // largest_user_key==NULL represents a key larger than all keys in the DB.
+  bool OverlapInLevel(int level, const Slice* smallest_user_key,
+                      const Slice* largest_user_key);
+
+  // Returns true iff the first or last file in inputs contains
+  // an overlapping user key to the file "just outside" of it (i.e.
+  // just after the last file, or just before the first file)
+  // REQUIRES: "*inputs" is a sorted list of non-overlapping files
+  bool HasOverlappingUserKey(const std::vector<FileMetaData*>* inputs,
+                             int level);
+
+  int num_levels() const { return num_levels_; }
+
+  // REQUIRES: This version has been saved (see VersionSet::SaveTo)
+  int num_non_empty_levels() const {
+    assert(finalized_);
+    return num_non_empty_levels_;
+  }
+
+  // REQUIRES: This version has been finalized.
+  // (CalculateBaseBytes() is called)
+  // This may or may not return the number of level files. It is kept to
+  // preserve backward-compatible behavior in universal compaction.
+  int l0_delay_trigger_count() const { return l0_delay_trigger_count_; }
+
+  void set_l0_delay_trigger_count(int v) { l0_delay_trigger_count_ = v; }
+
+  // REQUIRES: This version has been saved (see VersionSet::SaveTo)
+  int NumLevelFiles(int level) const {
+    assert(finalized_);
+    return static_cast<int>(files_[level].size());
+  }
+
+  // Return the combined file size of all files at the specified level.
+  uint64_t NumLevelBytes(int level) const;
+
+  // REQUIRES: This version has been saved (see VersionSet::SaveTo)
+  const std::vector<FileMetaData*>& LevelFiles(int level) const {
+    return files_[level];
+  }
+
+  const rocksdb::LevelFilesBrief& LevelFilesBrief(int level) const {
+    assert(level < static_cast<int>(level_files_brief_.size()));
+    return level_files_brief_[level];
+  }
+
+  // REQUIRES: This version has been saved (see VersionSet::SaveTo)
+  const std::vector<int>& FilesByCompactionPri(int level) const {
+    assert(finalized_);
+    return files_by_compaction_pri_[level];
+  }
+
+  // REQUIRES: This version has been saved (see VersionSet::SaveTo)
+  // REQUIRES: DB mutex held during access
+  const autovector<std::pair<int, FileMetaData*>>& FilesMarkedForCompaction()
+      const {
+    assert(finalized_);
+    return files_marked_for_compaction_;
+  }
+
+  int base_level() const { return base_level_; }
+
+  // REQUIRES: lock is held
+  // Set the index that is used to offset into files_by_compaction_pri_ to find
+  // the next compaction candidate file.
+  void SetNextCompactionIndex(int level, int index) {
+    next_file_to_compact_by_size_[level] = index;
+  }
+
+  // REQUIRES: lock is held
+  int NextCompactionIndex(int level) const {
+    return next_file_to_compact_by_size_[level];
+  }
+
+  // REQUIRES: This version has been saved (see VersionSet::SaveTo)
+  const FileIndexer& file_indexer() const {
+    assert(finalized_);
+    return file_indexer_;
+  }
+
+  // Only the first few entries of files_by_compaction_pri_ are sorted.
+  // There is no need to sort all the files because it is likely
+  // that on a running system, we need to look at only the first
+  // few largest files because a new version is created every few
+  // seconds/minutes (because of concurrent compactions).
+  static const size_t kNumberFilesToSort = 50;
+
+  // Return a human-readable short (single-line) summary of the number
+  // of files per level.  Uses *scratch as backing store.
+  struct LevelSummaryStorage {
+    char buffer[1000];
+  };
+  struct FileSummaryStorage {
+    char buffer[3000];
+  };
+  const char* LevelSummary(LevelSummaryStorage* scratch) const;
+  // Return a human-readable short (single-line) summary of files
+  // in a specified level.  Uses *scratch as backing store.
+  const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const;
+
+  // Return the maximum overlapping data (in bytes) at next level for any
+  // file at a level >= 1.
+  int64_t MaxNextLevelOverlappingBytes();
+
+  // Return a human readable string that describes this version's contents.
+  std::string DebugString(bool hex = false) const;
+
+  uint64_t GetAverageValueSize() const {
+    if (accumulated_num_non_deletions_ == 0) {
+      return 0;
+    }
+    assert(accumulated_raw_key_size_ + accumulated_raw_value_size_ > 0);
+    assert(accumulated_file_size_ > 0);
+    return accumulated_raw_value_size_ / accumulated_num_non_deletions_ *
+           accumulated_file_size_ /
+           (accumulated_raw_key_size_ + accumulated_raw_value_size_);
+  }
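+
+  // An illustrative worked example of the estimate above (hypothetical
+  // numbers): with accumulated_raw_value_size_ = 800,
+  // accumulated_num_non_deletions_ = 100, accumulated_file_size_ = 500 and
+  // accumulated_raw_key_size_ = 200, the average raw value is 800 / 100 = 8
+  // bytes, scaled by the on-disk ratio 500 / (200 + 800) to give
+  // 8 * 500 / 1000 = 4 bytes.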
+
+  uint64_t GetEstimatedActiveKeys() const;
+
+  double GetEstimatedCompressionRatioAtLevel(int level) const;
+
+  // re-initializes the index that is used to offset into
+  // files_by_compaction_pri_
+  // to find the next compaction candidate file.
+  void ResetNextCompactionIndex(int level) {
+    next_file_to_compact_by_size_[level] = 0;
+  }
+
+  const InternalKeyComparator* InternalComparator() {
+    return internal_comparator_;
+  }
+
+  // Returns maximum total bytes of data on a given level.
+  uint64_t MaxBytesForLevel(int level) const;
+
+  // Must be called after any change to MutableCFOptions.
+  void CalculateBaseBytes(const ImmutableCFOptions& ioptions,
+                          const MutableCFOptions& options);
+
+  // Returns an estimate of the amount of live data in bytes.
+  uint64_t EstimateLiveDataSize() const;
+
+  uint64_t estimated_compaction_needed_bytes() const {
+    return estimated_compaction_needed_bytes_;
+  }
+
+  void TEST_set_estimated_compaction_needed_bytes(uint64_t v) {
+    estimated_compaction_needed_bytes_ = v;
+  }
+
+  bool force_consistency_checks() const { return force_consistency_checks_; }
+
+ private:
+  const InternalKeyComparator* internal_comparator_;
+  const Comparator* user_comparator_;
+  int num_levels_;            // Number of levels
+  int num_non_empty_levels_;  // Number of non-empty levels. Any level at or
+                              // above this count is guaranteed to be empty.
+  // Per-level max bytes
+  std::vector<uint64_t> level_max_bytes_;
+
+  // A brief per-level summary of file metadata
+  autovector<rocksdb::LevelFilesBrief> level_files_brief_;
+  FileIndexer file_indexer_;
+  Arena arena_;  // Used to allocate space for level_files_brief_
+
+  CompactionStyle compaction_style_;
+
+  // List of files per level, files in each level are arranged
+  // in increasing order of keys
+  std::vector<FileMetaData*>* files_;
+
+  // Level that L0 data should be compacted to. All levels < base_level_
+  // should be empty. -1 if level compaction is not used, in which case it
+  // does not apply.
+  int base_level_;
+
+  // A list for the same set of files that are stored in files_,
+  // but files in each level are now sorted based on file
+  // size. The file with the largest size is at the front.
+  // This vector stores the index of the file from files_.
+  std::vector<std::vector<int>> files_by_compaction_pri_;
+
+  // If true, files in L0 have keys with non-overlapping ranges
+  bool level0_non_overlapping_;
+
+  // An index into files_by_compaction_pri_ that specifies the first
+  // file that is not yet compacted
+  std::vector<int> next_file_to_compact_by_size_;
+
+  // Only the first few entries of files_by_compaction_pri_ are sorted.
+  // There is no need to sort all the files because it is likely
+  // that on a running system, we need to look at only the first
+  // few largest files because a new version is created every few
+  // seconds/minutes (because of concurrent compactions).
+  static const size_t number_of_files_to_sort_ = 50;
+
+  // This vector contains the list of files that are marked for compaction
+  // and not currently being compacted. It is protected by the DB mutex and
+  // is calculated in ComputeCompactionScore()
+  autovector<std::pair<int, FileMetaData*>> files_marked_for_compaction_;
+
+  // Level that should be compacted next and its compaction score.
+  // Score < 1 means compaction is not strictly needed.  These fields
+  // are initialized by Finalize().
+  // The most critical level to be compacted is listed first
+  // These are used to pick the best compaction level
+  std::vector<double> compaction_score_;
+  std::vector<int> compaction_level_;
+  int l0_delay_trigger_count_ = 0;  // Count of L0 files used to trigger
+                                    // write slowdown and stop.
+
+  // the following are the sampled temporary stats.
+  // the current accumulated size of sampled files.
+  uint64_t accumulated_file_size_;
+  // the current accumulated size of all raw keys based on the sampled files.
+  uint64_t accumulated_raw_key_size_;
+  // the current accumulated size of all raw values based on the sampled files.
+  uint64_t accumulated_raw_value_size_;
+  // total number of non-deletion entries
+  uint64_t accumulated_num_non_deletions_;
+  // total number of deletion entries
+  uint64_t accumulated_num_deletions_;
+  // current number of non_deletion entries
+  uint64_t current_num_non_deletions_;
+  // current number of deletion entries
+  uint64_t current_num_deletions_;
+  // current number of file samples
+  uint64_t current_num_samples_;
+  // Estimated bytes needed to be compacted until all levels' size is down to
+  // target sizes.
+  uint64_t estimated_compaction_needed_bytes_;
+
+  bool finalized_;
+
+  // If set to true, we will run consistency checks even if RocksDB
+  // is compiled in release mode
+  bool force_consistency_checks_;
+
+  friend class Version;
+  friend class VersionSet;
+  // No copying allowed
+  VersionStorageInfo(const VersionStorageInfo&) = delete;
+  void operator=(const VersionStorageInfo&) = delete;
+};
+
+class Version {
+ public:
+  // Append to *iters a sequence of iterators that will
+  // yield the contents of this Version when merged together.
+  // REQUIRES: This version has been saved (see VersionSet::SaveTo)
+  void AddIterators(const ReadOptions&, const EnvOptions& soptions,
+                    MergeIteratorBuilder* merger_iter_builder,
+                    RangeDelAggregator* range_del_agg);
+
+  void AddIteratorsForLevel(const ReadOptions&, const EnvOptions& soptions,
+                            MergeIteratorBuilder* merger_iter_builder,
+                            int level, RangeDelAggregator* range_del_agg);
+
+  void AddRangeDelIteratorsForLevel(
+      const ReadOptions& read_options, const EnvOptions& soptions, int level,
+      std::vector<InternalIterator*>* range_del_iters);
+
+  // Lookup the value for key.  If found, store it in *val and
+  // return OK.  Else return a non-OK status.
+  // Uses *operands to store merge_operator operations to apply later.
+  //
+  // If the ReadOptions.read_tier is set to do a read-only fetch, then
+  // *value_found will be set to false if it cannot be determined whether
+  // this value exists without doing IO.
+  //
+  // If the key is Deleted, *status will be set to NotFound and
+  //                        *key_exists will be set to true.
+  // If no key was found, *status will be set to NotFound and
+  //                      *key_exists will be set to false.
+  // If seq is non-null, *seq will be set to the sequence number found
+  // for the key if a key was found.
+  //
+  // REQUIRES: lock is not held
+  void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value,
+           Status* status, MergeContext* merge_context,
+           RangeDelAggregator* range_del_agg, bool* value_found = nullptr,
+           bool* key_exists = nullptr, SequenceNumber* seq = nullptr);
+
+  // Loads some stats information from files. Call without mutex held. It needs
+  // to be called before applying the version to the version set.
+  void PrepareApply(const MutableCFOptions& mutable_cf_options,
+                    bool update_stats);
+
+  // Reference count management (so Versions do not disappear out from
+  // under live iterators)
+  void Ref();
+  // Decrease reference count. Delete the object if no reference left
+  // and return true. Otherwise, return false.
+  bool Unref();
+
+  // Add all files listed in the current version to *live.
+  void AddLiveFiles(std::vector<FileDescriptor>* live);
+
+  // Return a human readable string that describes this version's contents.
+  std::string DebugString(bool hex = false, bool print_stats = false) const;
+
+  // Returns the version number of this version
+  uint64_t GetVersionNumber() const { return version_number_; }
+
+  // REQUIRES: lock is held
+  // On success, "tp" will contain the table properties of the file
+  // specified in "file_meta".  If the file name of "file_meta" is
+  // known ahead of time, passing it via a non-null "fname" can save a
+  // file-name conversion.
+  Status GetTableProperties(std::shared_ptr<const TableProperties>* tp,
+                            const FileMetaData* file_meta,
+                            const std::string* fname = nullptr) const;
+
+  // REQUIRES: lock is held
+  // On success, *props will be populated with all SSTables' table properties.
+  // The keys of `props` are the SST file names; the values of `props` are
+  // the tables' properties, represented as shared_ptr.
+  Status GetPropertiesOfAllTables(TablePropertiesCollection* props);
+  Status GetPropertiesOfAllTables(TablePropertiesCollection* props, int level);
+  Status GetPropertiesOfTablesInRange(const Range* range, std::size_t n,
+                                      TablePropertiesCollection* props) const;
+
+  // REQUIRES: lock is held
+  // On success, "tp" will contain the table properties aggregated across
+  // all sst files in this version.
+  Status GetAggregatedTableProperties(
+      std::shared_ptr<const TableProperties>* tp, int level = -1);
+
+  uint64_t GetEstimatedActiveKeys() {
+    return storage_info_.GetEstimatedActiveKeys();
+  }
+
+  size_t GetMemoryUsageByTableReaders();
+
+  ColumnFamilyData* cfd() const { return cfd_; }
+
+  // Return the next Version in the linked list. Used for debug only
+  Version* TEST_Next() const {
+    return next_;
+  }
+
+  int TEST_refs() const { return refs_; }
+
+  VersionStorageInfo* storage_info() { return &storage_info_; }
+
+  VersionSet* version_set() { return vset_; }
+
+  void GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta);
+
+ private:
+  Env* env_;
+  friend class VersionSet;
+
+  const InternalKeyComparator* internal_comparator() const {
+    return storage_info_.internal_comparator_;
+  }
+  const Comparator* user_comparator() const {
+    return storage_info_.user_comparator_;
+  }
+
+  bool PrefixMayMatch(const ReadOptions& read_options,
+                      InternalIterator* level_iter,
+                      const Slice& internal_prefix) const;
+
+  // Returns true if the filter blocks in the specified level will not be
+  // checked during read operations. In certain cases (trivial move or
+  // preload), the filter block may already be cached, but we still do not
+  // access it, so that it eventually expires from the cache.
+  bool IsFilterSkipped(int level, bool is_file_last_in_level = false);
+
+  // The helper function of UpdateAccumulatedStats, which may fill the missing
+  // fields of file_meta from its associated TableProperties.
+  // Returns true if it does initialize FileMetaData.
+  bool MaybeInitializeFileMetaData(FileMetaData* file_meta);
+
+  // Update the accumulated stats associated with the current version.
+  // These accumulated stats will be used in compaction.
+  void UpdateAccumulatedStats(bool update_stats);
+
+  // Sort all files for this version based on their file size and
+  // record results in files_by_compaction_pri_. The largest files are listed
+  // first.
+  void UpdateFilesByCompactionPri();
+
+  ColumnFamilyData* cfd_;  // ColumnFamilyData to which this Version belongs
+  Logger* info_log_;
+  Statistics* db_statistics_;
+  TableCache* table_cache_;
+  const MergeOperator* merge_operator_;
+
+  VersionStorageInfo storage_info_;
+  VersionSet* vset_;            // VersionSet to which this Version belongs
+  Version* next_;               // Next version in linked list
+  Version* prev_;               // Previous version in linked list
+  int refs_;                    // Number of live refs to this version
+
+  // A version number that uniquely represents this version. This is
+  // used for debugging and logging purposes only.
+  uint64_t version_number_;
+
+  Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0);
+
+  ~Version();
+
+  // No copying allowed
+  Version(const Version&);
+  void operator=(const Version&);
+};
+
+class VersionSet {
+ public:
+  VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options,
+             const EnvOptions& env_options, Cache* table_cache,
+             WriteBufferManager* write_buffer_manager,
+             WriteController* write_controller);
+  ~VersionSet();
+
+  // Apply *edit to the current version to form a new descriptor that
+  // is both saved to persistent state and installed as the new
+  // current version.  Will release *mu while actually writing to the file.
+  // column_family_options has to be set if the edit is a column family add
+  // REQUIRES: *mu is held on entry.
+  // REQUIRES: no other thread concurrently calls LogAndApply()
+  Status LogAndApply(
+      ColumnFamilyData* column_family_data,
+      const MutableCFOptions& mutable_cf_options, VersionEdit* edit,
+      InstrumentedMutex* mu, Directory* db_directory = nullptr,
+      bool new_descriptor_log = false,
+      const ColumnFamilyOptions* column_family_options = nullptr) {
+    autovector<VersionEdit*> edit_list;
+    edit_list.push_back(edit);
+    return LogAndApply(column_family_data, mutable_cf_options, edit_list, mu,
+                       db_directory, new_descriptor_log, column_family_options);
+  }
+  // The batch version. If edit_list.size() > 1, caller must ensure that
+  // no edit in the list is a column family add or drop
+  Status LogAndApply(
+      ColumnFamilyData* column_family_data,
+      const MutableCFOptions& mutable_cf_options,
+      const autovector<VersionEdit*>& edit_list, InstrumentedMutex* mu,
+      Directory* db_directory = nullptr, bool new_descriptor_log = false,
+      const ColumnFamilyOptions* column_family_options = nullptr);
+
+  // Recover the last saved descriptor from persistent storage.
+  // If read_only == true, Recover() will not complain if some column families
+  // are not opened
+  Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
+                 bool read_only = false);
+
+  // Reads a manifest file and returns a list of column families in
+  // column_families.
+  static Status ListColumnFamilies(std::vector<std::string>* column_families,
+                                   const std::string& dbname, Env* env);
+
+#ifndef ROCKSDB_LITE
+  // Try to reduce the number of levels. This call is valid when only one
+  // level among the new max level through the old max level contains files.
+  // The call is static, since the number of levels is immutable during
+  // the lifetime of a RocksDB instance. It reduces the number of levels
+  // in a DB by applying changes to the manifest.
+  // For example, if a db currently has 7 levels [0-6], a call to reduce
+  // to 5 [0-4] can only be executed when only one level among [4-6]
+  // contains files.
+  static Status ReduceNumberOfLevels(const std::string& dbname,
+                                     const Options* options,
+                                     const EnvOptions& env_options,
+                                     int new_levels);
+
+  // Print the manifest contents (for debugging)
+  Status DumpManifest(Options& options, std::string& manifestFileName,
+                      bool verbose, bool hex = false, bool json = false);
+
+#endif  // ROCKSDB_LITE
+
+  // Return the current manifest file number
+  uint64_t manifest_file_number() const { return manifest_file_number_; }
+
+  uint64_t options_file_number() const { return options_file_number_; }
+
+  uint64_t pending_manifest_file_number() const {
+    return pending_manifest_file_number_;
+  }
+
+  uint64_t current_next_file_number() const { return next_file_number_.load(); }
+
+  // Allocate and return a new file number
+  uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
+
+  // Return the last sequence number.
+  uint64_t LastSequence() const {
+    return last_sequence_.load(std::memory_order_acquire);
+  }
+
+  // Note: memory_order_acquire must be sufficient.
+  uint64_t LastToBeWrittenSequence() const {
+    return last_to_be_written_sequence_.load(std::memory_order_seq_cst);
+  }
+
+  // Set the last sequence number to s.
+  void SetLastSequence(uint64_t s) {
+    assert(s >= last_sequence_);
+    // Last visible sequence must always be less than or equal to the last
+    // written seq
+    assert(!db_options_->concurrent_prepare ||
+           s <= last_to_be_written_sequence_);
+    last_sequence_.store(s, std::memory_order_release);
+  }
+
+  // Note: memory_order_release must be sufficient
+  void SetLastToBeWrittenSequence(uint64_t s) {
+    assert(s >= last_to_be_written_sequence_);
+    last_to_be_written_sequence_.store(s, std::memory_order_seq_cst);
+  }
+
+  // Note: memory_order_release must be sufficient
+  uint64_t FetchAddLastToBeWrittenSequence(uint64_t s) {
+    return last_to_be_written_sequence_.fetch_add(s, std::memory_order_seq_cst);
+  }
+
+  // Mark the specified file number as used.
+  // REQUIRED: this is only called during single-threaded recovery
+  void MarkFileNumberUsedDuringRecovery(uint64_t number);
+
+  // Return the log file number for the log file that is currently
+  // being compacted, or zero if there is no such log file.
+  uint64_t prev_log_number() const { return prev_log_number_; }
+
+  // Returns the minimum log number such that all
+  // log numbers less than or equal to it can be deleted
+  uint64_t MinLogNumber() const {
+    uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
+    for (auto cfd : *column_family_set_) {
+      // It's safe to ignore dropped column families here:
+      // cfd->IsDropped() becomes true after the drop is persisted in MANIFEST.
+      if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
+        min_log_num = cfd->GetLogNumber();
+      }
+    }
+    return min_log_num;
+  }
+
+  // Create an iterator that reads over the compaction inputs for "*c".
+  // The caller should delete the iterator when no longer needed.
+  InternalIterator* MakeInputIterator(const Compaction* c,
+                                      RangeDelAggregator* range_del_agg);
+
+  // Add all files listed in any live version to *live.
+  void AddLiveFiles(std::vector<FileDescriptor>* live_list);
+
+  // Return the approximate size of data to be scanned for range [start, end)
+  // in levels [start_level, end_level). If end_level == -1 it will search
+  // through all non-empty levels
+  uint64_t ApproximateSize(Version* v, const Slice& start, const Slice& end,
+                           int start_level = 0, int end_level = -1);
+
+  // Return the size of the current manifest file
+  uint64_t manifest_file_size() const { return manifest_file_size_; }
+
+  // verify that the files that we started with for a compaction
+  // still exist in the current version and in the same original level.
+  // This ensures that a concurrent compaction did not erroneously
+  // pick the same files to compact.
+  bool VerifyCompactionFileConsistency(Compaction* c);
+
+  Status GetMetadataForFile(uint64_t number, int* filelevel,
+                            FileMetaData** metadata, ColumnFamilyData** cfd);
+
+  // This function doesn't support leveldb SST filenames
+  void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
+
+  void GetObsoleteFiles(std::vector<FileMetaData*>* files,
+                        std::vector<std::string>* manifest_filenames,
+                        uint64_t min_pending_output);
+
+  ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
+  const EnvOptions& env_options() { return env_options_; }
+
+  static uint64_t GetNumLiveVersions(Version* dummy_versions);
+
+  static uint64_t GetTotalSstFilesSize(Version* dummy_versions);
+
+ private:
+  struct ManifestWriter;
+
+  friend class Version;
+  friend class DBImpl;
+
+  struct LogReporter : public log::Reader::Reporter {
+    Status* status;
+    virtual void Corruption(size_t bytes, const Status& s) override {
+      if (this->status->ok()) *this->status = s;
+    }
+  };
+
+  // ApproximateSize helper
+  uint64_t ApproximateSizeLevel0(Version* v, const LevelFilesBrief& files_brief,
+                                 const Slice& start, const Slice& end);
+
+  uint64_t ApproximateSize(Version* v, const FdWithKeyRange& f,
+                           const Slice& key);
+
+  // Save current contents to *log
+  Status WriteSnapshot(log::Writer* log);
+
+  void AppendVersion(ColumnFamilyData* column_family_data, Version* v);
+
+  ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options,
+                                       VersionEdit* edit);
+
+  std::unique_ptr<ColumnFamilySet> column_family_set_;
+
+  Env* const env_;
+  const std::string dbname_;
+  const ImmutableDBOptions* const db_options_;
+  std::atomic<uint64_t> next_file_number_;
+  uint64_t manifest_file_number_;
+  uint64_t options_file_number_;
+  uint64_t pending_manifest_file_number_;
+  // The last seq visible to reads
+  std::atomic<uint64_t> last_sequence_;
+  // The last seq with which a writer has written/will write.
+  std::atomic<uint64_t> last_to_be_written_sequence_;
+  uint64_t prev_log_number_;  // 0 or backing store for memtable being compacted
+
+  // Opened lazily
+  unique_ptr<log::Writer> descriptor_log_;
+
+  // generates an increasing version number for every new version
+  uint64_t current_version_number_;
+
+  // Queue of writers to the manifest file
+  std::deque<ManifestWriter*> manifest_writers_;
+
+  // Current size of manifest file
+  uint64_t manifest_file_size_;
+
+  std::vector<FileMetaData*> obsolete_files_;
+  std::vector<std::string> obsolete_manifests_;
+
+  // env options for all reads and writes except compactions
+  const EnvOptions& env_options_;
+
+  // env options used for compactions. This is a copy of
+  // env_options_ but with readaheads set to readahead_compactions_.
+  const EnvOptions env_options_compactions_;
+
+  // No copying allowed
+  VersionSet(const VersionSet&);
+  void operator=(const VersionSet&);
+
+  void LogAndApplyCFHelper(VersionEdit* edit);
+  void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, Version* v,
+                         VersionEdit* edit, InstrumentedMutex* mu);
+};
+
+}  // namespace rocksdb
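
The header comment above describes Versions as reference-counted so that
superseded versions stay alive while live iterators still read them. Below is
a minimal standalone sketch of that lifetime pattern; VersionSketch and main()
are hypothetical stand-ins, not the RocksDB classes:

    #include <cassert>

    // Stand-in for the Version lifetime contract: each live reader holds a
    // reference, so an old version survives until its last reader drops it.
    class VersionSketch {
     public:
      void Ref() { ++refs_; }
      // Mirrors Version::Unref(): returns true when the last reference is
      // dropped and the caller should delete the object.
      bool Unref() {
        assert(refs_ >= 1);
        --refs_;
        return refs_ == 0;
      }

     private:
      int refs_ = 0;
    };

    int main() {
      VersionSketch* v = new VersionSketch();
      v->Ref();                // held as the VersionSet's "current"
      v->Ref();                // a live iterator pins the same version
      bool last = v->Unref();  // iterator finishes; version still current
      assert(!last);
      if (v->Unref()) {        // replaced by a newer version; last ref gone
        delete v;
      }
      return 0;
    }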

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/wal_manager.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/wal_manager.cc b/thirdparty/rocksdb/db/wal_manager.cc
new file mode 100644
index 0000000..4a9ecbf
--- /dev/null
+++ b/thirdparty/rocksdb/db/wal_manager.cc
@@ -0,0 +1,476 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/wal_manager.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <algorithm>
+#include <vector>
+#include <memory>
+
+#include "db/log_reader.h"
+#include "db/log_writer.h"
+#include "db/transaction_log_impl.h"
+#include "db/write_batch_internal.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "rocksdb/write_batch.h"
+#include "util/cast_util.h"
+#include "util/coding.h"
+#include "util/file_reader_writer.h"
+#include "util/filename.h"
+#include "util/logging.h"
+#include "util/mutexlock.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+
+Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
+  // First get sorted files in db dir, then get sorted files from archived
+  // dir, to avoid a race condition where a log file is moved to archived
+  // dir in between.
+  Status s;
+  // list wal files in main db dir.
+  VectorLogPtr logs;
+  s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile);
+  if (!s.ok()) {
+    return s;
+  }
+
+  // These two sync points are used in (DBTest,TransactionLogIteratorRace)
+  // to reproduce the race condition where a log file is moved to the
+  // archive dir in between.
+  TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1");
+  TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2");
+
+  files.clear();
+  // list wal files in archive dir.
+  std::string archivedir = ArchivalDirectory(db_options_.wal_dir);
+  Status exists = env_->FileExists(archivedir);
+  if (exists.ok()) {
+    s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
+    if (!s.ok()) {
+      return s;
+    }
+  } else if (!exists.IsNotFound()) {
+    // FileExists() failed for a reason other than NotFound; propagate the
+    // error from the existence check itself (s is still OK at this point).
+    assert(exists.IsIOError());
+    return exists;
+  }
+
+  uint64_t latest_archived_log_number = 0;
+  if (!files.empty()) {
+    latest_archived_log_number = files.back()->LogNumber();
+    ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64,
+                   latest_archived_log_number);
+  }
+
+  files.reserve(files.size() + logs.size());
+  for (auto& log : logs) {
+    if (log->LogNumber() > latest_archived_log_number) {
+      files.push_back(std::move(log));
+    } else {
+      // When the race condition happens, we could see the
+      // same log in both db dir and archived dir. Simply
+      // ignore the one in db dir. Note that, if we read
+      // archived dir first, we would have missed the log file.
+      ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive",
+                     log->PathName().c_str());
+    }
+  }
+
+  return s;
+}
+
+Status WalManager::GetUpdatesSince(
+    SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
+    const TransactionLogIterator::ReadOptions& read_options,
+    VersionSet* version_set) {
+
+  //  Get all sorted WAL files, then binary search among them to find the
+  //  file that contains the requested sequence number.
+
+  std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
+  Status s = GetSortedWalFiles(*wal_files);
+  if (!s.ok()) {
+    return s;
+  }
+
+  s = RetainProbableWalFiles(*wal_files, seq);
+  if (!s.ok()) {
+    return s;
+  }
+  iter->reset(new TransactionLogIteratorImpl(
+      db_options_.wal_dir, &db_options_, read_options, env_options_, seq,
+      std::move(wal_files), version_set));
+  return (*iter)->status();
+}
+
+// 1. Go through all archived files and
+//    a. if ttl is enabled, delete outdated files
+//    b. if archive size limit is enabled, delete empty files and
+//        compute the number of remaining files and the largest file size.
+// 2. If size limit is enabled:
+//    a. compute how many files should be deleted
+//    b. get sorted non-empty archived logs
+//    c. delete what should be deleted
+void WalManager::PurgeObsoleteWALFiles() {
+  bool const ttl_enabled = db_options_.wal_ttl_seconds > 0;
+  bool const size_limit_enabled = db_options_.wal_size_limit_mb > 0;
+  if (!ttl_enabled && !size_limit_enabled) {
+    return;
+  }
+
+  int64_t current_time;
+  Status s = env_->GetCurrentTime(&current_time);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s",
+                    s.ToString().c_str());
+    assert(false);
+    return;
+  }
+  uint64_t const now_seconds = static_cast<uint64_t>(current_time);
+  uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled)
+                                     ? db_options_.wal_ttl_seconds / 2
+                                     : kDefaultIntervalToDeleteObsoleteWAL;
+
+  if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
+    return;
+  }
+
+  purge_wal_files_last_run_ = now_seconds;
+
+  std::string archival_dir = ArchivalDirectory(db_options_.wal_dir);
+  std::vector<std::string> files;
+  s = env_->GetChildren(archival_dir, &files);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s",
+                    s.ToString().c_str());
+    assert(false);
+    return;
+  }
+
+  size_t log_files_num = 0;
+  uint64_t log_file_size = 0;
+
+  for (auto& f : files) {
+    uint64_t number;
+    FileType type;
+    if (ParseFileName(f, &number, &type) && type == kLogFile) {
+      std::string const file_path = archival_dir + "/" + f;
+      if (ttl_enabled) {
+        uint64_t file_m_time;
+        s = env_->GetFileModificationTime(file_path, &file_m_time);
+        if (!s.ok()) {
+          ROCKS_LOG_WARN(db_options_.info_log,
+                         "Can't get file mod time: %s: %s", file_path.c_str(),
+                         s.ToString().c_str());
+          continue;
+        }
+        if (now_seconds - file_m_time > db_options_.wal_ttl_seconds) {
+          s = env_->DeleteFile(file_path);
+          if (!s.ok()) {
+            ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
+                           file_path.c_str(), s.ToString().c_str());
+            continue;
+          } else {
+            MutexLock l(&read_first_record_cache_mutex_);
+            read_first_record_cache_.erase(number);
+          }
+          continue;
+        }
+      }
+
+      if (size_limit_enabled) {
+        uint64_t file_size;
+        s = env_->GetFileSize(file_path, &file_size);
+        if (!s.ok()) {
+          ROCKS_LOG_ERROR(db_options_.info_log,
+                          "Unable to get file size: %s: %s", file_path.c_str(),
+                          s.ToString().c_str());
+          return;
+        } else {
+          if (file_size > 0) {
+            log_file_size = std::max(log_file_size, file_size);
+            ++log_files_num;
+          } else {
+            s = env_->DeleteFile(file_path);
+            if (!s.ok()) {
+              ROCKS_LOG_WARN(db_options_.info_log,
+                             "Unable to delete file: %s: %s", file_path.c_str(),
+                             s.ToString().c_str());
+              continue;
+            } else {
+              MutexLock l(&read_first_record_cache_mutex_);
+              read_first_record_cache_.erase(number);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  if (0 == log_files_num || !size_limit_enabled) {
+    return;
+  }
+
+  size_t const files_keep_num =
+      db_options_.wal_size_limit_mb * 1024 * 1024 / log_file_size;
+  if (log_files_num <= files_keep_num) {
+    return;
+  }
+
+  size_t files_del_num = log_files_num - files_keep_num;
+  VectorLogPtr archived_logs;
+  GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
+
+  if (files_del_num > archived_logs.size()) {
+    ROCKS_LOG_WARN(db_options_.info_log,
+                   "Trying to delete more archived log files than "
+                   "exist. Deleting all");
+    files_del_num = archived_logs.size();
+  }
+
+  for (size_t i = 0; i < files_del_num; ++i) {
+    std::string const file_path = archived_logs[i]->PathName();
+    s = env_->DeleteFile(db_options_.wal_dir + "/" + file_path);
+    if (!s.ok()) {
+      ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
+                     file_path.c_str(), s.ToString().c_str());
+      continue;
+    } else {
+      MutexLock l(&read_first_record_cache_mutex_);
+      read_first_record_cache_.erase(archived_logs[i]->LogNumber());
+    }
+  }
+}
+
+void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
+  auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number);
+  // The sync point below is used in (DBTest,TransactionLogIteratorRace)
+  TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
+  Status s = env_->RenameFile(fname, archived_log_name);
+  // The sync point below is used in (DBTest,TransactionLogIteratorRace)
+  TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
+  ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n",
+                 fname.c_str(), archived_log_name.c_str(),
+                 s.ToString().c_str());
+}
+
+namespace {
+struct CompareLogByPointer {
+  bool operator()(const std::unique_ptr<LogFile>& a,
+                  const std::unique_ptr<LogFile>& b) {
+    LogFileImpl* a_impl = static_cast_with_check<LogFileImpl, LogFile>(a.get());
+    LogFileImpl* b_impl = static_cast_with_check<LogFileImpl, LogFile>(b.get());
+    return *a_impl < *b_impl;
+  }
+};
+}
+
+Status WalManager::GetSortedWalsOfType(const std::string& path,
+                                       VectorLogPtr& log_files,
+                                       WalFileType log_type) {
+  std::vector<std::string> all_files;
+  const Status status = env_->GetChildren(path, &all_files);
+  if (!status.ok()) {
+    return status;
+  }
+  log_files.reserve(all_files.size());
+  for (const auto& f : all_files) {
+    uint64_t number;
+    FileType type;
+    if (ParseFileName(f, &number, &type) && type == kLogFile) {
+      SequenceNumber sequence;
+      Status s = ReadFirstRecord(log_type, number, &sequence);
+      if (!s.ok()) {
+        return s;
+      }
+      if (sequence == 0) {
+        // empty file
+        continue;
+      }
+
+      // These two sync points are used in (DBTest,TransactionLogIteratorRace)
+      // to reproduce the race condition where a log file is moved to the
+      // archive dir in between.
+      TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1");
+      TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2");
+
+      uint64_t size_bytes;
+      s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
+      // re-try in case the alive log file has been moved to archive.
+      std::string archived_file = ArchivedLogFileName(path, number);
+      if (!s.ok() && log_type == kAliveLogFile &&
+          env_->FileExists(archived_file).ok()) {
+        s = env_->GetFileSize(archived_file, &size_bytes);
+        if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
+          // oops, the file just got deleted from archived dir! move on
+          s = Status::OK();
+          continue;
+        }
+      }
+      if (!s.ok()) {
+        return s;
+      }
+
+      log_files.push_back(std::unique_ptr<LogFile>(
+          new LogFileImpl(number, log_type, sequence, size_bytes)));
+    }
+  }
+  CompareLogByPointer compare_log_files;
+  std::sort(log_files.begin(), log_files.end(), compare_log_files);
+  return status;
+}
+
+Status WalManager::RetainProbableWalFiles(VectorLogPtr& all_logs,
+                                          const SequenceNumber target) {
+  int64_t start = 0;  // signed to avoid overflow when target is < first file.
+  int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
+  // Binary Search. avoid opening all files.
+  while (end >= start) {
+    int64_t mid = start + (end - start) / 2;  // Avoid overflow.
+    SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence();
+    if (current_seq_num == target) {
+      end = mid;
+      break;
+    } else if (current_seq_num < target) {
+      start = mid + 1;
+    } else {
+      end = mid - 1;
+    }
+  }
+  // end could be negative.
+  size_t start_index = std::max(static_cast<int64_t>(0), end);
+  // The last wal file is always included
+  all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
+  return Status::OK();
+}
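+
+// An illustrative worked example of the retention logic above (hypothetical
+// values): if all_logs have start sequences {5, 15, 25} and target == 18,
+// the binary search finishes with end == 1, so the files before index 1 are
+// erased and the retained logs begin with the file starting at sequence 15,
+// which may still contain sequence 18.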
+
+Status WalManager::ReadFirstRecord(const WalFileType type,
+                                   const uint64_t number,
+                                   SequenceNumber* sequence) {
+  *sequence = 0;
+  if (type != kAliveLogFile && type != kArchivedLogFile) {
+    ROCKS_LOG_ERROR(db_options_.info_log, "[WalManager] Unknown file type %s",
+                    ToString(type).c_str());
+    return Status::NotSupported(
+        "File Type Not Known " + ToString(type));
+  }
+  {
+    MutexLock l(&read_first_record_cache_mutex_);
+    auto itr = read_first_record_cache_.find(number);
+    if (itr != read_first_record_cache_.end()) {
+      *sequence = itr->second;
+      return Status::OK();
+    }
+  }
+  Status s;
+  if (type == kAliveLogFile) {
+    std::string fname = LogFileName(db_options_.wal_dir, number);
+    s = ReadFirstLine(fname, number, sequence);
+    if (env_->FileExists(fname).ok() && !s.ok()) {
+      // return any error that is not caused by non-existing file
+      return s;
+    }
+  }
+
+  if (type == kArchivedLogFile || !s.ok()) {
+    //  check if the file got moved to archive.
+    std::string archived_file =
+        ArchivedLogFileName(db_options_.wal_dir, number);
+    s = ReadFirstLine(archived_file, number, sequence);
+    // maybe the file was deleted from the archive dir. If that's the case,
+    // return Status::OK(). The caller will identify this as an empty file
+    // because *sequence == 0
+    if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
+      return Status::OK();
+    }
+  }
+
+  if (s.ok() && *sequence != 0) {
+    MutexLock l(&read_first_record_cache_mutex_);
+    read_first_record_cache_.insert({number, *sequence});
+  }
+  return s;
+}
+
+// The function returns status.ok() and sequence == 0 if the file exists but
+// is empty
+Status WalManager::ReadFirstLine(const std::string& fname,
+                                 const uint64_t number,
+                                 SequenceNumber* sequence) {
+  struct LogReporter : public log::Reader::Reporter {
+    Env* env;
+    Logger* info_log;
+    const char* fname;
+
+    Status* status;
+    bool ignore_error;  // true if db_options_.paranoid_checks==false
+    virtual void Corruption(size_t bytes, const Status& s) override {
+      ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
+                     (this->ignore_error ? "(ignoring error) " : ""), fname,
+                     static_cast<int>(bytes), s.ToString().c_str());
+      if (this->status->ok()) {
+        // only keep the first error
+        *this->status = s;
+      }
+    }
+  };
+
+  std::unique_ptr<SequentialFile> file;
+  Status status = env_->NewSequentialFile(
+      fname, &file, env_->OptimizeForLogRead(env_options_));
+  unique_ptr<SequentialFileReader> file_reader(
+      new SequentialFileReader(std::move(file)));
+
+  if (!status.ok()) {
+    return status;
+  }
+
+  LogReporter reporter;
+  reporter.env = env_;
+  reporter.info_log = db_options_.info_log.get();
+  reporter.fname = fname.c_str();
+  reporter.status = &status;
+  reporter.ignore_error = !db_options_.paranoid_checks;
+  log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
+                     true /*checksum*/, 0 /*initial_offset*/, number);
+  std::string scratch;
+  Slice record;
+
+  if (reader.ReadRecord(&record, &scratch) &&
+      (status.ok() || !db_options_.paranoid_checks)) {
+    if (record.size() < WriteBatchInternal::kHeader) {
+      reporter.Corruption(record.size(),
+                          Status::Corruption("log record too small"));
+      // TODO read records till the first non-corrupt entry?
+    } else {
+      WriteBatch batch;
+      WriteBatchInternal::SetContents(&batch, record);
+      *sequence = WriteBatchInternal::Sequence(&batch);
+      return Status::OK();
+    }
+  }
+
+  // ReadRecord returns false on EOF, which means the log file is empty. We
+  // return status.ok() in that case and set the sequence number to 0
+  *sequence = 0;
+  return status;
+}
+
+#endif  // ROCKSDB_LITE
+}  // namespace rocksdb
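
PurgeObsoleteWALFiles() above sizes the archive by dividing the configured
limit by the largest non-empty archived log, then deletes the oldest surplus
files. The following is a self-contained sketch of that arithmetic only, with
assumed values standing in for db_options_ and the directory scan:

    #include <cstddef>
    #include <cstdint>
    #include <iostream>

    int main() {
      // Assumed inputs; the real values come from db_options_ and the
      // archive directory scan in PurgeObsoleteWALFiles().
      uint64_t wal_size_limit_mb = 16;      // db_options_.wal_size_limit_mb
      uint64_t largest_log_size = 4 << 20;  // largest non-empty archived log
      std::size_t log_files_num = 10;       // non-empty archived logs found

      std::size_t files_keep_num = static_cast<std::size_t>(
          wal_size_limit_mb * 1024 * 1024 / largest_log_size);  // == 4
      std::size_t files_del_num =
          log_files_num > files_keep_num ? log_files_num - files_keep_num : 0;
      std::cout << "would delete the " << files_del_num
                << " oldest archived logs\n";  // prints 6
      return 0;
    }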

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/wal_manager.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/wal_manager.h b/thirdparty/rocksdb/db/wal_manager.h
new file mode 100644
index 0000000..aa62d79
--- /dev/null
+++ b/thirdparty/rocksdb/db/wal_manager.h
@@ -0,0 +1,95 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#pragma once
+
+#include <atomic>
+#include <deque>
+#include <limits>
+#include <set>
+#include <utility>
+#include <vector>
+#include <string>
+#include <memory>
+#include <unordered_map>
+
+#include "db/version_set.h"
+#include "options/db_options.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "rocksdb/status.h"
+#include "rocksdb/transaction_log.h"
+#include "rocksdb/types.h"
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+class WalManager {
+ public:
+  WalManager(const ImmutableDBOptions& db_options,
+             const EnvOptions& env_options)
+      : db_options_(db_options),
+        env_options_(env_options),
+        env_(db_options.env),
+        purge_wal_files_last_run_(0) {}
+
+  Status GetSortedWalFiles(VectorLogPtr& files);
+
+  Status GetUpdatesSince(
+      SequenceNumber seq_number, std::unique_ptr<TransactionLogIterator>* iter,
+      const TransactionLogIterator::ReadOptions& read_options,
+      VersionSet* version_set);
+
+  void PurgeObsoleteWALFiles();
+
+  void ArchiveWALFile(const std::string& fname, uint64_t number);
+
+  Status TEST_ReadFirstRecord(const WalFileType type, const uint64_t number,
+                              SequenceNumber* sequence) {
+    return ReadFirstRecord(type, number, sequence);
+  }
+
+  Status TEST_ReadFirstLine(const std::string& fname, const uint64_t number,
+                            SequenceNumber* sequence) {
+    return ReadFirstLine(fname, number, sequence);
+  }
+
+ private:
+  Status GetSortedWalsOfType(const std::string& path, VectorLogPtr& log_files,
+                             WalFileType type);
+  // Requires: all_logs should be sorted with the earliest log file first
+  // Retains all log files in all_logs which contain updates with sequence
+  // numbers greater than or equal to the requested SequenceNumber.
+  Status RetainProbableWalFiles(VectorLogPtr& all_logs,
+                                const SequenceNumber target);
+
+  Status ReadFirstRecord(const WalFileType type, const uint64_t number,
+                         SequenceNumber* sequence);
+
+  Status ReadFirstLine(const std::string& fname, const uint64_t number,
+                       SequenceNumber* sequence);
+
+  // ------- state from DBImpl ------
+  const ImmutableDBOptions& db_options_;
+  const EnvOptions& env_options_;
+  Env* env_;
+
+  // ------- WalManager state -------
+  // cache for ReadFirstRecord() calls
+  std::unordered_map<uint64_t, SequenceNumber> read_first_record_cache_;
+  port::Mutex read_first_record_cache_mutex_;
+
+  // last time when PurgeObsoleteWALFiles ran.
+  uint64_t purge_wal_files_last_run_;
+
+  // obsolete WAL files are checked for deletion at this interval (in
+  // seconds) when the archive size limit is enabled; with only ttl
+  // deletion enabled, the interval is wal_ttl_seconds / 2 instead.
+  static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;
+};
+
+#endif  // ROCKSDB_LITE
+}  // namespace rocksdb
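
WalManager memoizes the first sequence number of each WAL in
read_first_record_cache_, guarded by read_first_record_cache_mutex_, so
repeated lookups avoid re-reading files. The sketch below mirrors that
pattern with standard-library types only (std::mutex stands in for
port::Mutex; the class and method names are illustrative):

    #include <cstdint>
    #include <mutex>
    #include <unordered_map>

    // Illustrative analogue of read_first_record_cache_: maps a WAL number
    // to the sequence number of its first record.
    class FirstRecordCache {
     public:
      // Returns true and fills *sequence on a cache hit.
      bool Lookup(uint64_t log_number, uint64_t* sequence) {
        std::lock_guard<std::mutex> lock(mu_);
        auto it = cache_.find(log_number);
        if (it == cache_.end()) {
          return false;
        }
        *sequence = it->second;
        return true;
      }

      void Insert(uint64_t log_number, uint64_t sequence) {
        std::lock_guard<std::mutex> lock(mu_);
        cache_.emplace(log_number, sequence);
      }

      // Called when the underlying WAL file is deleted or archived away.
      void Erase(uint64_t log_number) {
        std::lock_guard<std::mutex> lock(mu_);
        cache_.erase(log_number);
      }

     private:
      std::mutex mu_;
      std::unordered_map<uint64_t, uint64_t> cache_;
    };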

