nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeremyd...@apache.org
Subject [49/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:25:29 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/repository/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h
deleted file mode 100644
index f2691ac..0000000
--- a/libminifi/include/core/repository/FlowFileRepository.h
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_
-#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_
-
-#include "leveldb/db.h"
-#include "leveldb/options.h"
-#include "leveldb/slice.h"
-#include "leveldb/status.h"
-#include "core/Repository.h"
-#include "core/Core.h"
-#include "Connection.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "concurrentqueue.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
-
-#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
-#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
-#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
-#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2000) // 2000 msec
-
-/**
- * Flow File repository
- * Design: Extends Repository and implements the run function, using LevelDB as the primary substrate.
- */
-class FlowFileRepository : public core::Repository, public std::enable_shared_from_this<FlowFileRepository> {
- public:
-  // Constructor
-
-  FlowFileRepository(const std::string repo_name = "", std::string directory = FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
-                     int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
-      : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod),
-        logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()),
-        content_repo_(nullptr) {
-    db_ = NULL;
-  }
-
-  // Destructor
-  ~FlowFileRepository() {
-    if (db_)
-      delete db_;
-  }
-
-  virtual void flush();
-
-  // initialize
-  virtual bool initialize(const std::shared_ptr<Configure> &configure) {
-    std::string value;
-
-    if (configure->get(Configure::nifi_flowfile_repository_directory_default, value)) {
-      directory_ = value;
-    }
-    logger_->log_info("NiFi FlowFile Repository Directory %s", directory_.c_str());
-    if (configure->get(Configure::nifi_flowfile_repository_max_storage_size, value)) {
-      Property::StringToInt(value, max_partition_bytes_);
-    }
-    logger_->log_info("NiFi FlowFile Max Partition Bytes %d", max_partition_bytes_);
-    if (configure->get(Configure::nifi_flowfile_repository_max_storage_time, value)) {
-      TimeUnit unit;
-      if (Property::StringToTime(value, max_partition_millis_, unit) && Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) {
-      }
-    }
-    logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", max_partition_millis_);
-    leveldb::Options options;
-    options.create_if_missing = true;
-    leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(), &db_);
-    if (status.ok()) {
-      logger_->log_info("NiFi FlowFile Repository database open %s success", directory_.c_str());
-    } else {
-      logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_.c_str());
-      return false;
-    }
-    return true;
-  }
-
-  virtual void run();
-
-  virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
-
-    // persistent to the DB
-    leveldb::Slice value((const char *) buf, bufLen);
-    leveldb::Status status;
-    repo_size_ += bufLen;
-    status = db_->Put(leveldb::WriteOptions(), key, value);
-    if (status.ok())
-      return true;
-    else
-      return false;
-  }
-  /**
-   * 
-   * Deletes the key
-   * @return status of the delete operation
-   */
-  virtual bool Delete(std::string key) {
-    keys_to_delete.enqueue(key);
-    return true;
-  }
-  /**
-   * Sets the value from the provided key
-   * @return status of the get operation.
-   */
-  virtual bool Get(const std::string &key, std::string &value) {
-    if (db_ == nullptr)
-      return false;
-    leveldb::Status status;
-    status = db_->Get(leveldb::ReadOptions(), key, &value);
-    if (status.ok())
-      return true;
-    else
-      return false;
-  }
-
-  void setConnectionMap(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap) {
-    this->connectionMap = connectionMap;
-  }
-  virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo);
-
-  void start() {
-    if (this->purge_period_ <= 0) {
-      return;
-    }
-    if (running_) {
-      return;
-    }
-    running_ = true;
-    thread_ = std::thread(&FlowFileRepository::run, shared_from_this());
-    logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
-  }
-
- private:
-  moodycamel::ConcurrentQueue<std::string> keys_to_delete;
-  std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
-  std::shared_ptr<core::ContentRepository> content_repo_;
-  leveldb::DB* db_;
-  std::shared_ptr<logging::Logger> logger_;
-};
-
-} /* namespace repository */
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/repository/VolatileContentRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 8507216..623f7be 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -24,6 +24,7 @@
 #include "../ContentRepository.h"
 #include "core/repository/VolatileRepository.h"
 #include "properties/Configure.h"
+#include "core/Connectable.h"
 #include "core/logging/LoggerConfiguration.h"
 namespace org {
 namespace apache {
@@ -36,13 +37,14 @@ namespace repository {
  * Purpose: Stages content into a volatile area of memory. Note that   when the maximum number
  * of entries is consumed we will rollback a session to wait for others to be freed.
  */
-class VolatileContentRepository : public core::ContentRepository, public core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>> {
+class VolatileContentRepository : public core::ContentRepository, public virtual core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>> {
  public:
 
   static const char *minimal_locking;
 
   explicit VolatileContentRepository(std::string name = getClassName<VolatileContentRepository>())
       : core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>>(name),
+        core::SerializableComponent(name,0),
         logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()),
         minimize_locking_(true) {
     max_count_ = 15000;
@@ -100,13 +102,14 @@ class VolatileContentRepository : public core::ContentRepository, public core::r
   virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
 
  protected:
+
   virtual void start();
 
   virtual void run();
 
   template<typename T2>
   std::shared_ptr<T2> shared_from_parent() {
-    return std::static_pointer_cast<T2>(shared_from_this());
+    return std::dynamic_pointer_cast<T2>(shared_from_this());
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/repository/VolatileFlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index 0e75580..1a6be6b 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -19,6 +19,7 @@
 #define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEFLOWFILEREPOSITORY_H_
 
 #include "VolatileRepository.h"
+#include "FlowFileRecord.h"
 
 namespace org {
 namespace apache {
@@ -36,7 +37,8 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string> {
   explicit VolatileFlowFileRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
   MAX_REPOSITORY_STORAGE_SIZE,
                                       uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
-      : VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod)
+      : core::SerializableComponent(repo_name, 0),
+        VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod)
 
   {
     purge_required_ = true;
@@ -50,8 +52,11 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string> {
       if (purge_required_ && nullptr != content_repo_) {
         std::lock_guard<std::mutex> lock(purge_mutex_);
         for (auto purgeItem : purge_list_) {
-          std::shared_ptr<minifi::ResourceClaim> newClaim = std::make_shared<minifi::ResourceClaim>(purgeItem, content_repo_, true);
-          content_repo_->remove(newClaim);
+          std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
+          if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(purgeItem.data()), purgeItem.size())) {
+            std::shared_ptr<minifi::ResourceClaim> newClaim = eventRead->getResourceClaim();
+            content_repo_->remove(newClaim);
+          }
         }
         purge_list_.resize(0);
         purge_list_.clear();
@@ -66,6 +71,13 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string> {
 
  protected:
 
+  virtual void emplace(RepoValue<std::string> &old_value) {
+    std::string buffer;
+    old_value.emplace(buffer);
+    std::lock_guard<std::mutex> lock(purge_mutex_);
+    purge_list_.push_back(buffer);
+  }
+
   std::shared_ptr<core::ContentRepository> content_repo_;
 
 };

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/repository/VolatileProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileProvenanceRepository.h b/libminifi/include/core/repository/VolatileProvenanceRepository.h
index 2717510..cabb76f 100644
--- a/libminifi/include/core/repository/VolatileProvenanceRepository.h
+++ b/libminifi/include/core/repository/VolatileProvenanceRepository.h
@@ -36,7 +36,7 @@ class VolatileProvenanceRepository : public VolatileRepository<std::string> {
   explicit VolatileProvenanceRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
   MAX_REPOSITORY_STORAGE_SIZE,
                                         uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
-      : VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod)
+      : core::SerializableComponent(repo_name, 0),VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod)
 
   {
     purge_required_ = false;
@@ -45,6 +45,10 @@ class VolatileProvenanceRepository : public VolatileRepository<std::string> {
   virtual void run() {
     repo_full_ = false;
   }
+ protected:
+  virtual void emplace(RepoValue<std::string> &old_value) {
+    purge_list_.push_back(old_value.getKey());
+  }
  private:
 
 };

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/repository/VolatileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index da6608c..3b5d5e1 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -37,7 +37,7 @@ namespace repository {
 
 /**
  * Flow File repository
- * Design: Extends Repository and implements the run function, using LevelDB as the primary substrate.
+ * Design: Extends Repository and implements the run function, using RocksDB as the primary substrate.
  */
 template<typename T>
 class VolatileRepository : public core::Repository, public std::enable_shared_from_this<VolatileRepository<T>> {
@@ -50,7 +50,8 @@ class VolatileRepository : public core::Repository, public std::enable_shared_fr
   explicit VolatileRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
   MAX_REPOSITORY_STORAGE_SIZE,
                               uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
-      : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod),
+      : core::SerializableComponent(repo_name, 0),
+        Repository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod),
         max_size_(maxPartitionBytes * 0.75),
         current_index_(0),
         max_count_(10000),
@@ -121,6 +122,11 @@ class VolatileRepository : public core::Repository, public std::enable_shared_fr
 
  protected:
 
+  virtual void emplace(RepoValue<T> &old_value) {
+    std::lock_guard<std::mutex> lock(purge_mutex_);
+    purge_list_.push_back(old_value.getKey());
+  }
+
   /**
    * Tests whether or not the current size exceeds the capacity
    * if the new prospectiveSize is inserted.
@@ -208,7 +214,7 @@ bool VolatileRepository<T>::initialize(const std::shared_ptr<Configure> &configu
   }
 
   logger_->log_info("Resizing value_vector_ for %s count is %d", getName(), max_count_);
-  logger_->log_info("Using a maximum size of %u", max_size_);
+  logger_->log_info("Using a maximum size for %s of %u", getName(), max_size_);
   value_vector_.reserve(max_count_);
   for (int i = 0; i < max_count_; i++) {
     value_vector_.emplace_back(new AtomicEntry<T>(&current_size_, &max_size_));
@@ -245,7 +251,7 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) {
     logger_->log_debug("Set repo value at %d out of %d updated %d current_size %d, adding %d to  %d", private_index, max_count_, updated == true, reclaimed_size, size, current_size_.load());
     if (updated && reclaimed_size > 0) {
       std::lock_guard<std::mutex> lock(mutex_);
-      purge_list_.push_back(old_value.getKey());
+      emplace(old_value);
     }
     if (reclaimed_size > 0) {
       /**
@@ -270,11 +276,14 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) {
  */
 template<typename T>
 bool VolatileRepository<T>::Delete(T key) {
+  logger_->log_debug("Delete from volatile");
   for (auto ent : value_vector_) {
     // let the destructor do the cleanup
     RepoValue<T> value;
     if (ent->getValue(key, value)) {
       current_size_ -= value.size();
+      logger_->log_debug("Delete and pushed into purge_list from volatile");
+      emplace(value);
       return true;
     }
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index f71ce9c..0d7bb55 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -51,6 +51,7 @@ class Configure : public Properties {
   static const char *nifi_provenance_repository_directory_default;
   static const char *nifi_provenance_repository_enable;
   static const char *nifi_flowfile_repository_max_storage_time;
+  static const char *nifi_dbcontent_repository_directory_default;
   static const char *nifi_flowfile_repository_max_storage_size;
   static const char *nifi_flowfile_repository_directory_default;
   static const char *nifi_flowfile_repository_enable;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h
deleted file mode 100644
index 53f489f..0000000
--- a/libminifi/include/provenance/ProvenanceRepository.h
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_
-#define LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_
-
-#include "leveldb/db.h"
-#include "leveldb/options.h"
-#include "leveldb/slice.h"
-#include "leveldb/status.h"
-#include "core/Repository.h"
-#include "core/Core.h"
-#include "provenance/Provenance.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "concurrentqueue.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace provenance {
-
-#define PROVENANCE_DIRECTORY "./provenance_repository"
-#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M
-#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute
-#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec
-
-class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> {
- public:
-  // Constructor
-  /*!
-   * Create a new provenance repository
-   */
-  ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
-  MAX_PROVENANCE_STORAGE_SIZE,
-                       uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD)
-      : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod),
-        logger_(logging::LoggerFactory<ProvenanceRepository>::getLogger()) {
-    db_ = NULL;
-  }
-
-  // Destructor
-  virtual ~ProvenanceRepository() {
-    if (db_)
-      delete db_;
-  }
-
-  virtual void flush();
-
-  void start() {
-    if (this->purge_period_ <= 0)
-      return;
-    if (running_)
-      return;
-    running_ = true;
-    thread_ = std::thread(&ProvenanceRepository::run, shared_from_this());
-    logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
-  }
-
-  // initialize
-  virtual bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) {
-    std::string value;
-    if (config->get(Configure::nifi_provenance_repository_directory_default, value)) {
-      directory_ = value;
-    }
-    logger_->log_info("NiFi Provenance Repository Directory %s", directory_.c_str());
-    if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) {
-      core::Property::StringToInt(value, max_partition_bytes_);
-    }
-    logger_->log_info("NiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
-    if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) {
-      core::TimeUnit unit;
-      if (core::Property::StringToTime(value, max_partition_millis_, unit) && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) {
-      }
-    }
-    logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_);
-    leveldb::Options options;
-    options.create_if_missing = true;
-    leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(), &db_);
-    if (status.ok()) {
-      logger_->log_info("NiFi Provenance Repository database open %s success", directory_.c_str());
-    } else {
-      logger_->log_error("NiFi Provenance Repository database open %s fail", directory_.c_str());
-      return false;
-    }
-
-    return true;
-  }
-  // Put
-  virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
-
-    if (repo_full_) {
-      return false;
-    }
-
-    // persistent to the DB
-    leveldb::Slice value((const char *) buf, bufLen);
-    leveldb::Status status;
-    repo_size_ += bufLen;
-    status = db_->Put(leveldb::WriteOptions(), key, value);
-    if (status.ok())
-      return true;
-    else
-      return false;
-  }
-  // Delete
-  virtual bool Delete(std::string key) {
-    keys_to_delete.enqueue(key);
-    return true;
-  }
-  // Get
-  virtual bool Get(const std::string &key, std::string &value) {
-    leveldb::Status status;
-    status = db_->Get(leveldb::ReadOptions(), key, &value);
-    if (status.ok())
-      return true;
-    else
-      return false;
-  }
-
-  // Remove event
-  void removeEvent(ProvenanceEventRecord *event) {
-    Delete(event->getEventId());
-  }
-
-  virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
-    return Put(key, buffer, bufferSize);
-  }
-
-  virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) {
-    leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
-    for (it->SeekToFirst(); it->Valid(); it->Next()) {
-      std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
-      std::string key = it->key().ToString();
-      if (store.size() >= max_size)
-        break;
-      if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) {
-        store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead));
-      }
-    }
-    delete it;
-    return true;
-  }
-
-  virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
-    leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
-    size_t requested_batch = max_size;
-    max_size = 0;
-    for (it->SeekToFirst(); it->Valid(); it->Next()) {
-
-      if (max_size >= requested_batch)
-        break;
-      std::shared_ptr<core::SerializableComponent> eventRead = lambda();
-      std::string key = it->key().ToString();
-      if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) {
-        max_size++;
-        records.push_back(eventRead);
-      }
-
-    }
-    delete it;
-
-    if (max_size > 0) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-  //! get record
-  void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) {
-    leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
-    for (it->SeekToFirst(); it->Valid(); it->Next()) {
-      std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
-      std::string key = it->key().ToString();
-      if (records.size() >= maxSize)
-        break;
-      if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) {
-        records.push_back(eventRead);
-      }
-    }
-    delete it;
-  }
-
-  virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
-    leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
-    max_size = 0;
-    for (it->SeekToFirst(); it->Valid(); it->Next()) {
-      std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
-      std::string key = it->key().ToString();
-
-      if (store.at(max_size)->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) {
-        max_size++;
-      }
-      if (store.size() >= max_size)
-        break;
-    }
-    delete it;
-    if (max_size > 0) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-  //! purge record
-  void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) {
-    for (auto record : records) {
-      Delete(record->getEventId());
-    }
-    flush();
-  }
-  // destroy
-  void destroy() {
-    if (db_) {
-      delete db_;
-      db_ = NULL;
-    }
-  }
-  // Run function for the thread
-  void run();
-
-  // Prevent default copy constructor and assignment operation
-  // Only support pass by reference or pointer
-  ProvenanceRepository(const ProvenanceRepository &parent) = delete;
-  ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;
-
- private:
-  moodycamel::ConcurrentQueue<std::string> keys_to_delete;
-  leveldb::DB* db_;
-  std::shared_ptr<logging::Logger> logger_;
-};
-
-} /* namespace provenance */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-#endif /* LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ */
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 9dcd6c6..db9840e 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -44,6 +44,7 @@ const char *Configure::nifi_provenance_repository_directory_default = "nifi.prov
 const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size";
 const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time";
 const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
+const char *Configure::nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default";
 const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure";
 const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
 const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate";

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 6485e6c..28294c9 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -49,7 +49,7 @@
 #include "SchedulingAgent.h"
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "core/repository/FlowFileRepository.h"
+#include "core/Connectable.h"
 
 namespace org {
 namespace apache {
@@ -306,15 +306,12 @@ void FlowController::reload(std::string yamlFile) {
 void FlowController::loadFlowRepo() {
   if (this->flow_file_repo_ != nullptr) {
     logger_->log_debug("Getting connection map");
-    std::map<std::string, std::shared_ptr<Connection>> connectionMap;
+    std::map<std::string, std::shared_ptr<core::Connectable>> connectionMap;
     if (this->root_ != nullptr) {
       this->root_->getConnections(connectionMap);
     }
     logger_->log_debug("Number of connections from connectionMap %d", connectionMap.size());
-    auto rep = std::dynamic_pointer_cast<core::repository::FlowFileRepository>(flow_file_repo_);
-    if (nullptr != rep) {
-      rep->setConnectionMap(connectionMap);
-    }
+    flow_file_repo_->setConnectionMap(connectionMap);
     flow_file_repo_->loadComponent(content_repo_);
   } else {
     logger_->log_debug("Flow file repository is not set");

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index b97f290..c89a00b 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -77,7 +77,7 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository
   entry_date_ = event->getEntryDate();
   lineage_start_date_ = event->getlineageStartDate();
   lineage_Identifiers_ = event->getlineageIdentifiers();
-  uuid_str_ = event->getUUIDStr();
+  uuidStr_ = event->getUUIDStr();
   attributes_ = event->getAttributes();
   size_ = event->getSize();
   offset_ = event->getOffset();
@@ -99,16 +99,16 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository
 
 FlowFileRecord::~FlowFileRecord() {
   if (!snapshot_)
-    logger_->log_debug("Delete FlowFile UUID %s", uuid_str_.c_str());
+    logger_->log_debug("Delete FlowFile UUID %s", uuidStr_.c_str());
   else
-    logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuid_str_.c_str());
+    logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_.c_str());
   if (claim_) {
     // Decrease the flow file record owned count for the resource claim
     claim_->decreaseFlowFileRecordOwnedCount();
     std::string value;
     if (claim_->getFlowFileRecordOwnedCount() <= 0) {
       // we cannot rely on the stored variable here since we
-      if (flow_repository_ != nullptr && !flow_repository_->Get(uuid_str_, value)) {
+      if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) {
         logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath().c_str());
         content_repo_->remove(claim_);
       }
@@ -181,9 +181,9 @@ bool FlowFileRecord::DeSerialize(std::string key) {
   ret = DeSerialize(stream);
 
   if (ret) {
-    logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %s success", uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
+    logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %s success", uuidStr_.c_str(), stream.getSize(), uuid_connection_.c_str());
   } else {
-    logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %d fail", uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
+    logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %d fail", uuidStr_.c_str(), stream.getSize(), uuid_connection_.c_str());
   }
 
   return ret;
@@ -209,7 +209,7 @@ bool FlowFileRecord::Serialize() {
     return false;
   }
 
-  ret = writeUTF(this->uuid_str_, &outStream);
+  ret = writeUTF(this->uuidStr_, &outStream);
   if (ret <= 0) {
     return false;
   }
@@ -251,11 +251,11 @@ bool FlowFileRecord::Serialize() {
     return false;
   }
 
-  if (flow_repository_->Put(uuid_str_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
-    logger_->log_debug("NiFi FlowFile Store event %s size %d success", uuid_str_.c_str(), outStream.getSize());
+  if (flow_repository_->Put(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
+    logger_->log_debug("NiFi FlowFile Store event %s size %d success", uuidStr_.c_str(), outStream.getSize());
     return true;
   } else {
-    logger_->log_error("NiFi FlowFile Store event %s size %d fail", uuid_str_.c_str(), outStream.getSize());
+    logger_->log_error("NiFi FlowFile Store event %s size %d fail", uuidStr_.c_str(), outStream.getSize());
     return false;
   }
 
@@ -282,7 +282,7 @@ bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
     return false;
   }
 
-  ret = readUTF(this->uuid_str_, &outStream);
+  ret = readUTF(this->uuidStr_, &outStream);
   if (ret <= 0) {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 16a9778..68b7520 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -29,12 +29,11 @@ namespace minifi {
 namespace core {
 
 std::vector<std::string> FlowConfiguration::statics_sl_funcs_;
+std::mutex FlowConfiguration::atomic_initialization_;
 
 FlowConfiguration::~FlowConfiguration() {
 }
 
-
-
 std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, uuid_t uuid) {
   auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(name, uuid);
   if (nullptr == ptr) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/FlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index d08ea4b..f5d8754 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -33,7 +33,8 @@ std::shared_ptr<utils::IdGenerator> FlowFile::id_generator_ = utils::IdGenerator
 std::shared_ptr<logging::Logger> FlowFile::logger_ = logging::LoggerFactory<FlowFile>::getLogger();
 
 FlowFile::FlowFile()
-    : size_(0),
+    : Connectable("FlowFile", 0),
+      size_(0),
       id_(0),
       stored(false),
       offset_(0),
@@ -47,14 +48,6 @@ FlowFile::FlowFile()
   entry_date_ = getTimeMillis();
   event_time_ = entry_date_;
   lineage_start_date_ = entry_date_;
-
-  char uuidStr[37] = { 0 };
-
-  // Generate the global UUID for the flow record
-  id_generator_->generate(uuid_);
-
-  uuid_unparse_lower(uuid_, uuidStr);
-  uuid_str_ = uuidStr;
 }
 
 FlowFile::~FlowFile() {
@@ -74,7 +67,7 @@ FlowFile& FlowFile::operator=(const FlowFile& other) {
   claim_ = other.claim_;
   if (claim_ != nullptr)
     this->claim_->increaseFlowFileRecordOwnedCount();
-  uuid_str_ = other.uuid_str_;
+  uuidStr_ = other.uuidStr_;
   connection_ = other.connection_;
   original_connection_ = other.original_connection_;
   return *this;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index a537f1a..5915089 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -259,6 +259,16 @@ void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connecti
   }
 }
 
+void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap) {
+  for (auto connection : connections_) {
+    connectionMap[connection->getUUIDStr()] = connection;
+    connectionMap[connection->getName()] = connection;
+  }
+  for (auto processGroup : child_process_groups_) {
+    processGroup->getConnections(connectionMap);
+  }
+}
+
 void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
index be21b16..575f694 100644
--- a/libminifi/src/core/Repository.cpp
+++ b/libminifi/src/core/Repository.cpp
@@ -20,7 +20,6 @@
 #include <cstdint>
 #include <vector>
 
-#include "../../include/core/repository/FlowFileRepository.h"
 #include "io/DataStream.h"
 #include "io/Serializable.h"
 #include "core/Relationship.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
index b25e87c..d753a90 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -17,52 +17,45 @@
 #include "core/RepositoryFactory.h"
 #include <memory>
 #include <string>
+#include <utility>
 #include <algorithm>
 #include "core/ContentRepository.h"
-#include "core/repository/FileSystemRepository.h"
 #include "core/repository/VolatileContentRepository.h"
 #include "core/Repository.h"
-#ifdef LEVELDB_SUPPORT
-#include "core/repository/FlowFileRepository.h"
-#include "provenance/ProvenanceRepository.h"
-#endif
-
-#include "core/repository/VolatileProvenanceRepository.h"
+#include "core/ClassLoader.h"
+#include "core/repository/FileSystemRepository.h"
 #include "core/repository/VolatileFlowFileRepository.h"
+#include "core/repository/VolatileProvenanceRepository.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
-#ifndef LEVELDB_SUPPORT
-namespace provenance {
-class ProvenanceRepository;
-}
-#endif
 namespace core {
 
-#ifndef LEVELDB_SUPPORT
-class FlowFileRepository;
-#endif
-
 std::shared_ptr<core::Repository> createRepository(const std::string configuration_class_name, bool fail_safe, const std::string repo_name) {
   std::shared_ptr<core::Repository> return_obj = nullptr;
   std::string class_name_lc = configuration_class_name;
   std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
   try {
     std::shared_ptr<core::Repository> return_obj = nullptr;
-    if (class_name_lc == "flowfilerepository") {
-      return_obj = instantiate<core::repository::FlowFileRepository>(repo_name);
-    } else if (class_name_lc == "provenancerepository") {
-      return_obj = instantiate<provenance::ProvenanceRepository>(repo_name);
-    } else if (class_name_lc == "volatileflowfilerepository") {
+
+    auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate<core::Repository>(class_name_lc, class_name_lc);
+    if (nullptr != ptr) {
+      return_obj = ptr;
+    }
+
+    if (return_obj) {
+      return return_obj;
+    }
+    // if the desired repos don't exist, we can try doing string matches and reoly on volatile repositories
+    if (class_name_lc == "flowfilerepository" || class_name_lc == "volatileflowfilerepository") {
       return_obj = instantiate<repository::VolatileFlowFileRepository>(repo_name);
-    } else if (class_name_lc == "volatileprovenancefilerepository") {
+    } else if (class_name_lc == "provenancerepository" || class_name_lc == "volatileprovenancefilerepository") {
       return_obj = instantiate<repository::VolatileProvenanceRepository>(repo_name);
     } else if (class_name_lc == "nooprepository") {
       return_obj = instantiate<core::Repository>(repo_name);
     }
-
     if (return_obj) {
       return return_obj;
     }
@@ -86,23 +79,27 @@ std::shared_ptr<core::ContentRepository> createContentRepository(const std::stri
   std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
   try {
     std::shared_ptr<core::ContentRepository> return_obj = nullptr;
-    if (class_name_lc == "volatilecontentrepository") {
-      return_obj = instantiate<core::repository::VolatileContentRepository>(repo_name);
-    } else {
-      return_obj = instantiate<core::repository::FileSystemRepository>(repo_name);
-    }
 
+    auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate<core::ContentRepository>(class_name_lc, class_name_lc);
+    if (nullptr != ptr) {
+      return_obj = ptr;
+    }
     if (return_obj) {
       return return_obj;
     }
+    if (class_name_lc == "volatilecontentrepository") {
+      return std::make_shared<core::repository::VolatileContentRepository>(repo_name);
+    } else if (class_name_lc == "filesystemrepository") {
+      return std::make_shared<core::repository::FileSystemRepository>(repo_name);
+    }
     if (fail_safe) {
-      return std::make_shared<core::repository::FileSystemRepository>("fail_safe");
+      return std::make_shared<core::repository::VolatileContentRepository>("fail_safe");
     } else {
       throw std::runtime_error("Support for the provided configuration class could not be found");
     }
   } catch (const std::runtime_error &r) {
     if (fail_safe) {
-      return std::make_shared<core::repository::FileSystemRepository>("fail_safe");
+      return std::make_shared<core::repository::VolatileContentRepository>("fail_safe");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/repository/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp
deleted file mode 100644
index 3ed7fbf..0000000
--- a/libminifi/src/core/repository/FlowFileRepository.cpp
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "core/repository/FlowFileRepository.h"
-#include "leveldb/write_batch.h"
-#include <memory>
-#include <string>
-#include <utility>
-#include <vector>
-#include "FlowFileRecord.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
-
-void FlowFileRepository::flush() {
-  leveldb::WriteBatch batch;
-  std::string key;
-  std::string value;
-  leveldb::ReadOptions options;
-
-  std::vector<std::shared_ptr<FlowFileRecord>> purgeList;
-
-  uint64_t decrement_total = 0;
-  while (keys_to_delete.size_approx() > 0) {
-    if (keys_to_delete.try_dequeue(key)) {
-      db_->Get(options, key, &value);
-      decrement_total += value.size();
-      std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
-      if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(value.data()), value.size())) {
-        purgeList.push_back(eventRead);
-      }
-      logger_->log_info("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
-      batch.Delete(key);
-    }
-  }
-  if (db_->Write(leveldb::WriteOptions(), &batch).ok()) {
-    logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
-    if (decrement_total > repo_size_.load()) {
-      repo_size_ = 0;
-    } else {
-      repo_size_ -= decrement_total;
-    }
-  }
-
-  if (nullptr != content_repo_) {
-    for (const auto &ffr : purgeList) {
-      auto claim = ffr->getResourceClaim();
-      if (claim != nullptr) {
-        content_repo_->removeIfOrphaned(claim);
-      }
-    }
-  }
-}
-
-void FlowFileRepository::run() {
-  // threshold for purge
-  uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
-
-  while (running_) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
-    uint64_t curTime = getTimeMillis();
-
-    flush();
-
-    uint64_t size = getRepoSize();
-
-    if (size > max_partition_bytes_)
-      repo_full_ = true;
-    else
-      repo_full_ = false;
-  }
-}
-
-void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
-  content_repo_ = content_repo;
-  std::vector<std::pair<std::string, uint64_t>> purgeList;
-  leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
-
-  repo_size_ = 0;
-  for (it->SeekToFirst(); it->Valid(); it->Next()) {
-    std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
-    std::string key = it->key().ToString();
-    repo_size_ += it->value().size();
-    if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
-      logger_->log_info("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
-      auto search = connectionMap.find(eventRead->getConnectionUuid());
-      if (search != connectionMap.end()) {
-        // we find the connection for the persistent flowfile, create the flowfile and enqueue that
-        std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
-        eventRead->setStoredToRepository(true);
-        search->second->put(eventRead);
-      } else {
-        logger_->log_info("Could not find connectinon for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
-        if (eventRead->getContentFullPath().length() > 0) {
-          if (nullptr != eventRead->getResourceClaim()) {
-            content_repo_->remove(eventRead->getResourceClaim());
-          }
-        }
-        purgeList.push_back(std::make_pair(key, it->value().size()));
-      }
-    } else {
-      purgeList.push_back(std::make_pair(key, it->value().size()));
-    }
-  }
-
-  delete it;
-  for (auto eventId : purgeList) {
-    logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str());
-    if (Delete(eventId.first)) {
-      repo_size_ -= eventId.second;
-    }
-  }
-
-  return;
-}
-
-} /* namespace repository */
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 65f1cf9..d3e696b 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -83,11 +83,11 @@ void VolatileContentRepository::start() {
   thread_ = std::thread(&VolatileContentRepository::run, shared_from_parent<VolatileContentRepository>());
   thread_.detach();
   running_ = true;
-  logger_->log_info("%s Repository Monitor Thread Start", name_);
+  logger_->log_info("%s Repository Monitor Thread Start", getName());
 }
 
 std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) {
-  logger_->log_debug("enter write");
+  logger_->log_debug("enter write for %s", claim->getContentFullPath());
   {
     std::lock_guard<std::mutex> lock(map_mutex_);
     auto claim_check = master_list_.find(claim->getContentFullPath());
@@ -133,7 +133,7 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar
 }
 
 bool VolatileContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &claim) {
-  logger_->log_debug("enter exists");
+  logger_->log_debug("enter exists for %s", claim->getContentFullPath());
   int size = 0;
   {
     std::lock_guard<std::mutex> lock(map_mutex_);
@@ -150,7 +150,7 @@ bool VolatileContentRepository::exists(const std::shared_ptr<minifi::ResourceCla
 }
 
 std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
-  logger_->log_debug("enter read");
+  logger_->log_debug("enter read for %s", claim->getContentFullPath());
   int size = 0;
   {
     std::lock_guard<std::mutex> lock(map_mutex_);
@@ -175,19 +175,28 @@ bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceCla
     if (ent != master_list_.end()) {
       // if we cannot remove the entry we will let the owner's destructor
       // decrement the reference count and free it
+      master_list_.erase(claim->getContentFullPath());
       if (ent->second->freeValue(claim)) {
         logger_->log_debug("removed %s", claim->getContentFullPath());
+        logger_->log_debug("Remove for %s, reduced to %d", claim->getContentFullPath(), current_size_.load());
         return true;
+      } else {
+        logger_->log_debug("free failed for %s", claim->getContentFullPath());
       }
-      master_list_.erase(claim->getContentFullPath());
+    } else {
+      logger_->log_debug("Could not remove for %s, size is %d", claim->getContentFullPath(), current_size_.load());
     }
   } else {
     std::lock_guard<std::mutex> lock(map_mutex_);
+    auto size = master_list_[claim->getContentFullPath()]->getLength();
     delete master_list_[claim->getContentFullPath()];
     master_list_.erase(claim->getContentFullPath());
+    current_size_ -= size;
+    logger_->log_debug("Remove for %s, reduced to %d", claim->getContentFullPath(), current_size_.load());
     return true;
   }
 
+  logger_->log_debug("Remove for %s, reduced to %d", claim->getContentFullPath(), current_size_.load());
   logger_->log_debug("could not remove %s", claim->getContentFullPath());
   return false;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/provenance/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp
deleted file mode 100644
index 665837c..0000000
--- a/libminifi/src/provenance/ProvenanceRepository.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "provenance/ProvenanceRepository.h"
-#include "leveldb/write_batch.h"
-#include <string>
-#include <vector>
-#include "provenance/Provenance.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace provenance {
-
-void ProvenanceRepository::flush() {
-  leveldb::WriteBatch batch;
-  std::string key;
-  std::string value;
-  leveldb::ReadOptions options;
-  uint64_t decrement_total = 0;
-  while (keys_to_delete.size_approx() > 0) {
-    if (keys_to_delete.try_dequeue(key)) {
-      db_->Get(options, key, &value);
-      decrement_total += value.size();
-      batch.Delete(key);
-      logger_->log_info("Removing %s", key);
-    }
-  }
-  if (db_->Write(leveldb::WriteOptions(), &batch).ok()) {
-    logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
-    if (decrement_total > repo_size_.load()) {
-      repo_size_ = 0;
-    } else {
-      repo_size_ -= decrement_total;
-    }
-  }
-}
-
-void ProvenanceRepository::run() {
-  while (running_) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
-    uint64_t curTime = getTimeMillis();
-    // threshold for purge
-    uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
-
-    uint64_t size = getRepoSize();
-
-    if (size >= purgeThreshold) {
-      leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
-      for (it->SeekToFirst(); it->Valid(); it->Next()) {
-        ProvenanceEventRecord eventRead;
-        std::string key = it->key().ToString();
-        uint64_t eventTime = eventRead.getEventTime(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size());
-        if (eventTime > 0) {
-          if ((curTime - eventTime) > max_partition_millis_)
-            Delete(key);
-        } else {
-          logger_->log_debug("NiFi Provenance retrieve event %s fail", key.c_str());
-          Delete(key);
-        }
-      }
-      delete it;
-    }
-    flush();
-    size = getRepoSize();
-    if (size > max_partition_bytes_)
-      repo_full_ = true;
-    else
-      repo_full_ = false;
-  }
-}
-} /* namespace provenance */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/curl-tests/C2NullConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2NullConfiguration.cpp b/libminifi/test/curl-tests/C2NullConfiguration.cpp
index a2a3906..de8d3b8 100644
--- a/libminifi/test/curl-tests/C2NullConfiguration.cpp
+++ b/libminifi/test/curl-tests/C2NullConfiguration.cpp
@@ -98,8 +98,8 @@ class VerifyC2Server : public IntegrationBase {
     parse_http_components(url, port, scheme, path);
     std::cout << "path is " << path << std::endl;
     configuration->set("c2.agent.protocol.class", "null");
-    configuration->set("c2.rest.url", "null");
-    configuration->set("c2.rest.url.ack", "null");
+    configuration->set("c2.rest.url", "");
+    configuration->set("c2.rest.url.ack", "");
     configuration->set("c2.agent.heartbeat.reporter.classes", "null");
     configuration->set("c2.rest.listener.port", "null");
     configuration->set("c2.agent.heartbeat.period", "null");

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/nodefs/NoLevelDB.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/nodefs/NoLevelDB.cpp b/libminifi/test/nodefs/NoLevelDB.cpp
deleted file mode 100644
index 6856287..0000000
--- a/libminifi/test/nodefs/NoLevelDB.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "../TestBase.h"
-#include <memory>
-#include "core/Core.h"
-#include "core/RepositoryFactory.h"
-
-TEST_CASE("NoLevelDBTest1", "[NoLevelDBTest]") {
-  std::shared_ptr<core::Repository> prov_repo = core::createRepository("provenancerepository", true);
-  REQUIRE(nullptr != prov_repo);
-}
-
-TEST_CASE("NoLevelDBTest2", "[NoLevelDBTest]") {
-  std::shared_ptr<core::Repository> prov_repo = core::createRepository("flowfilerepository", true);
-  REQUIRE(nullptr != prov_repo);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/nodefs/NoRocksDB.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/nodefs/NoRocksDB.cpp b/libminifi/test/nodefs/NoRocksDB.cpp
new file mode 100644
index 0000000..472334f
--- /dev/null
+++ b/libminifi/test/nodefs/NoRocksDB.cpp
@@ -0,0 +1,32 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include <memory>
+#include "core/Core.h"
+#include "core/RepositoryFactory.h"
+
+TEST_CASE("NoRocksDBTest1", "[NoRocksDBTest]") {
+  std::shared_ptr<core::Repository> prov_repo = core::createRepository("provenancerepository", true);
+  REQUIRE(nullptr != prov_repo);
+}
+
+TEST_CASE("NoRocksDBTest2", "[NoRocksDBTest]") {
+  std::shared_ptr<core::Repository> prov_repo = core::createRepository("flowfilerepository", true);
+  REQUIRE(nullptr != prov_repo);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
new file mode 100644
index 0000000..d92c038
--- /dev/null
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -0,0 +1,246 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
+#include "../TestBase.h"
+#include <memory>
+#include <string>
+#include "../unit/ProvenanceTestHelper.h"
+#include "provenance/Provenance.h"
+#include "FlowFileRecord.h"
+#include "core/Core.h"
+#include "DatabaseContentRepository.h"
+#include "properties/Configure.h"
+
+TEST_CASE("Write Claim", "[TestDBCR1]") {
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+
+  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(true == content_repo->initialize(configuration));
+
+
+  auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir);
+  auto stream = content_repo->write(claim);
+
+  stream->writeUTF("well hello there");
+
+  stream->closeStream();
+
+  content_repo->stop();
+  // reclaim the memory
+  content_repo = nullptr;
+
+  content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+
+  configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(true == content_repo->initialize(configuration));
+
+  auto read_stream = content_repo->read(claim);
+
+  std::string readstr;
+  read_stream->readUTF(readstr);
+
+  REQUIRE(readstr == "well hello there");
+
+  // should not be able to write to the read stream
+  // -1 will indicate that we were not able to write any data
+
+  REQUIRE(read_stream->writeUTF("other value") == -1);
+}
+
+TEST_CASE("Delete Claim", "[TestDBCR2]") {
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+
+  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(true == content_repo->initialize(configuration));
+
+
+  auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir);
+  auto stream = content_repo->write(claim);
+
+  stream->writeUTF("well hello there");
+
+  stream->closeStream();
+
+  content_repo->stop();
+
+  // reclaim the memory
+  content_repo = nullptr;
+
+  content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+
+  configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(true == content_repo->initialize(configuration));
+
+  content_repo->remove(claim);
+
+  auto read_stream = content_repo->read(claim);
+
+  std::string readstr;
+
+  // -1 tell us we have an invalid stream
+  REQUIRE(read_stream->readUTF(readstr) == -1);
+}
+
+TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+
+  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(true == content_repo->initialize(configuration));
+
+  auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir);
+  auto stream = content_repo->write(claim);
+
+  // we're writing nothing to the stream.
+
+  stream->closeStream();
+
+  content_repo->stop();
+
+  // reclaim the memory
+  content_repo = nullptr;
+
+  content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+
+  configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(true == content_repo->initialize(configuration));
+
+  auto read_stream = content_repo->read(claim);
+
+  std::string readstr;
+
+  // -1 tell us we have an invalid stream
+  REQUIRE(read_stream->readUTF(readstr) == -1);
+}
+
+TEST_CASE("Test Null Claim", "[TestDBCR4]") {
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+
+  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(true == content_repo->initialize(configuration));
+
+
+  auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir);
+  auto stream = content_repo->write(nullptr);
+
+  REQUIRE(stream == nullptr);
+
+  auto read_stream = content_repo->write(nullptr);
+
+  REQUIRE(read_stream == nullptr);
+}
+
+TEST_CASE("Delete Null Claim", "[TestDBCR5]") {
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+
+  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(true == content_repo->initialize(configuration));
+
+  auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir);
+  auto stream = content_repo->write(claim);
+
+  stream->writeUTF("well hello there");
+
+  stream->closeStream();
+
+  content_repo->stop();
+
+  // reclaim the memory
+  content_repo = nullptr;
+
+  content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+
+  configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(true == content_repo->initialize(configuration));
+
+  REQUIRE(false == content_repo->remove(nullptr));
+
+  auto read_stream = content_repo->read(claim);
+
+  std::string readstr;
+
+  // -1 tell us we have an invalid stream
+  read_stream->readUTF(readstr);
+
+  REQUIRE(readstr == "well hello there");
+}
+
+TEST_CASE("Delete NonExistent Claim", "[TestDBCR5]") {
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+
+  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(true == content_repo->initialize(configuration));
+
+  auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir);
+  auto claim2 = std::make_shared<minifi::ResourceClaim>(content_repo, dir);
+  auto stream = content_repo->write(claim);
+
+  stream->writeUTF("well hello there");
+
+  stream->closeStream();
+
+  content_repo->stop();
+
+  // reclaim the memory
+  content_repo = nullptr;
+
+  content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+
+  configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(true == content_repo->initialize(configuration));
+
+  // we won't complain if it does not exist
+  REQUIRE(true == content_repo->remove(claim2));
+
+  auto read_stream = content_repo->read(claim);
+
+  std::string readstr;
+
+  // -1 tell us we have an invalid stream
+  read_stream->readUTF(readstr);
+
+  REQUIRE(readstr == "well hello there");
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
new file mode 100644
index 0000000..ffc5d98
--- /dev/null
+++ b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
@@ -0,0 +1,165 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
+#include "../TestBase.h"
+#include <utility>
+#include <memory>
+#include <string>
+#include <map>
+#include "../unit/ProvenanceTestHelper.h"
+#include "provenance/Provenance.h"
+#include "FlowFileRecord.h"
+#include "core/Core.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "FlowFileRepository.h"
+#include "core/repository/VolatileProvenanceRepository.h"
+
+TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") {
+  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah");
+  REQUIRE(record1.getAttributes().size() == 0);
+  REQUIRE(record1.getAlternateIdentifierUri().length() == 0);
+}
+
+TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
+  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
+
+  std::string eventId = record1.getEventId();
+
+  std::string smileyface = ":)";
+  record1.setDetails(smileyface);
+
+  uint64_t sample = 65555;
+  std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>();
+  record1.setEventDuration(sample);
+
+  record1.Serialize(testRepository);
+  provenance::ProvenanceEventRecord record2;
+  record2.setEventId(eventId);
+  REQUIRE(record2.DeSerialize(testRepository) == true);
+  REQUIRE(record2.getEventId() == record1.getEventId());
+  REQUIRE(record2.getComponentId() == record1.getComponentId());
+  REQUIRE(record2.getComponentType() == record1.getComponentType());
+  REQUIRE(record2.getDetails() == record1.getDetails());
+  REQUIRE(record2.getDetails() == smileyface);
+  REQUIRE(record2.getEventDuration() == sample);
+}
+
+TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") {
+  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  std::string eventId = record1.getEventId();
+  std::map<std::string, std::string> attributes;
+  attributes.insert(std::pair<std::string, std::string>("potato", "potatoe"));
+  attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe"));
+  std::shared_ptr<core::repository::FlowFileRepository> frepo = std::make_shared<core::repository::FlowFileRepository>("ff", "./content_repository", 0, 0, 0);
+  std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, content_repo, attributes);
+
+  record1.addChildFlowFile(ffr1);
+
+  uint64_t sample = 65555;
+  std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>();
+  record1.setEventDuration(sample);
+
+  record1.Serialize(testRepository);
+  provenance::ProvenanceEventRecord record2;
+  record2.setEventId(eventId);
+  REQUIRE(record2.DeSerialize(testRepository) == true);
+  REQUIRE(record1.getChildrenUuids().size() == 1);
+  REQUIRE(record2.getChildrenUuids().size() == 1);
+  std::string childId = record2.getChildrenUuids().at(0);
+  REQUIRE(childId == ffr1->getUUIDStr());
+  record2.removeChildUuid(childId);
+  REQUIRE(record2.getChildrenUuids().size() == 0);
+}
+
+TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
+  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
+
+  std::string eventId = record1.getEventId();
+
+  std::string smileyface = ":)";
+  record1.setDetails(smileyface);
+
+  uint64_t sample = 65555;
+
+  std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
+  testRepository->initialize(0);
+  record1.setEventDuration(sample);
+
+  record1.Serialize(testRepository);
+  provenance::ProvenanceEventRecord record2;
+  record2.setEventId(eventId);
+  REQUIRE(record2.DeSerialize(testRepository) == true);
+  REQUIRE(record2.getEventId() == record1.getEventId());
+  REQUIRE(record2.getComponentId() == record1.getComponentId());
+  REQUIRE(record2.getComponentType() == record1.getComponentType());
+  REQUIRE(record2.getDetails() == record1.getDetails());
+  REQUIRE(record2.getDetails() == smileyface);
+  REQUIRE(record2.getEventDuration() == sample);
+}
+
+TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") {
+  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  std::string eventId = record1.getEventId();
+  std::map<std::string, std::string> attributes;
+  attributes.insert(std::pair<std::string, std::string>("potato", "potatoe"));
+  attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe"));
+  std::shared_ptr<core::Repository> frepo = std::make_shared<core::repository::VolatileProvenanceRepository>();
+  frepo->initialize(0);
+  std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, content_repo, attributes);
+
+  record1.addChildFlowFile(ffr1);
+
+  uint64_t sample = 65555;
+  std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
+  testRepository->initialize(0);
+  record1.setEventDuration(sample);
+
+  record1.Serialize(testRepository);
+  provenance::ProvenanceEventRecord record2;
+  record2.setEventId(eventId);
+  REQUIRE(record2.DeSerialize(testRepository) == true);
+  REQUIRE(record1.getChildrenUuids().size() == 1);
+  REQUIRE(record2.getChildrenUuids().size() == 1);
+  std::string childId = record2.getChildrenUuids().at(0);
+  REQUIRE(childId == ffr1->getUUIDStr());
+  record2.removeChildUuid(childId);
+  REQUIRE(record2.getChildrenUuids().size() == 0);
+}
+
+TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
+  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
+
+  std::string eventId = record1.getEventId();
+
+  std::string smileyface = ":)";
+  record1.setDetails(smileyface);
+
+  uint64_t sample = 65555;
+
+  std::shared_ptr<core::Repository> testRepository = std::make_shared<core::Repository>();
+  testRepository->initialize(0);
+  record1.setEventDuration(sample);
+
+  REQUIRE(record1.Serialize(testRepository) == true);
+  provenance::ProvenanceEventRecord record2;
+  record2.setEventId(eventId);
+  REQUIRE(record2.DeSerialize(testRepository) == false);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/rocksdb-tests/RepoTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
new file mode 100644
index 0000000..e966aba
--- /dev/null
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
+#include "../TestBase.h"
+#include <memory>
+#include <string>
+#include <map>
+#include "../unit/ProvenanceTestHelper.h"
+#include "provenance/Provenance.h"
+#include "FlowFileRecord.h"
+#include "core/Core.h"
+#include "FlowFileRepository.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "properties/Configure.h"
+
+TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1);
+
+  repository->initialize(std::make_shared<minifi::Configure>());
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  minifi::FlowFileRecord record(repository, content_repo);
+
+  record.addAttribute("keyA", "");
+
+  REQUIRE(true == record.Serialize());
+
+  repository->stop();
+}
+
+TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1);
+
+  repository->initialize(std::make_shared<minifi::Configure>());
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  minifi::FlowFileRecord record(repository, content_repo);
+
+  record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
+
+  record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd");
+
+  REQUIRE(true == record.Serialize());
+
+  repository->stop();
+}
+
+TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1);
+
+  repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  minifi::FlowFileRecord record(repository, content_repo);
+
+  minifi::FlowFileRecord record2(repository, content_repo);
+
+  std::string uuid = record.getUUIDStr();
+
+  record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
+
+  record.addAttribute("keyB", "");
+
+  record.addAttribute("", "");
+
+  record.updateAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd2");
+
+  record.addAttribute("", "sdgsdg");
+
+  REQUIRE(true == record.Serialize());
+
+  repository->stop();
+
+  record2.DeSerialize(uuid);
+
+  std::string value;
+  REQUIRE(true == record2.getAttribute("", value));
+
+  REQUIRE("hasdgasdgjsdgasgdsgsadaskgasd2" == value);
+
+  REQUIRE(false == record2.getAttribute("key", value));
+  REQUIRE(true == record2.getAttribute("keyA", value));
+  REQUIRE("hasdgasdgjsdgasgdsgsadaskgasd" == value);
+
+  REQUIRE(true == record2.getAttribute("keyB", value));
+  REQUIRE("" == value);
+}
+
+TEST_CASE("Test Delete Content ", "[TestFFR4]") {
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
+
+  char *dir = testController.createTempDirectory(format);
+
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1);
+
+  std::map<std::string, std::string> attributes;
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+
+  repository->initialize(std::make_shared<minifi::Configure>());
+
+  repository->loadComponent(content_repo);
+
+  std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ss.str(), content_repo);
+
+  minifi::FlowFileRecord record(repository, content_repo, attributes, claim);
+
+  record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
+
+  record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd");
+
+  REQUIRE(true == record.Serialize());
+
+  claim->decreaseFlowFileRecordOwnedCount();
+
+  claim->decreaseFlowFileRecordOwnedCount();
+
+  repository->Delete(record.getUUIDStr());
+
+  repository->flush();
+
+  repository->stop();
+
+  std::ifstream fileopen(ss.str());
+  REQUIRE(false == fileopen.good());
+
+  LogTestController::getInstance().reset();
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 2583a09..d3bccd0 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -28,7 +28,6 @@
 #include <vector>
 #include "core/repository/VolatileContentRepository.h"
 #include "../../include/core/Processor.h"
-#include "../../include/core/repository/FlowFileRepository.h"
 #include "../../include/Connection.h"
 #include "../../include/FlowController.h"
 #include "../../include/properties/Configure.h"
@@ -40,7 +39,7 @@
 class TestRepository : public core::Repository {
  public:
   TestRepository()
-      : Repository("repo_name", "./dir", 1000, 100, 0) {
+      : Repository("repo_name", "./dir", 1000, 100, 0), core::SerializableComponent("repo_name",0) {
   }
   // initialize
   bool initialize() {
@@ -145,10 +144,10 @@ class TestRepository : public core::Repository {
   std::map<std::string, std::string> repositoryResults;
 };
 
-class TestFlowRepository : public core::repository::FlowFileRepository {
+class TestFlowRepository : public core::Repository {
  public:
   TestFlowRepository()
-      : core::repository::FlowFileRepository("ff", "./dir", 1000, 100, 0) {
+      : core::Repository("ff", "./dir", 1000, 100, 0), core::SerializableComponent("ff",0) {
   }
   // initialize
   bool initialize() {


Mime
View raw message