nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [7/8] nifi-minifi-cpp git commit: MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters MINIFI-339: Add GetTCP Processor MINIFI-339: Add listener server MINIFI-339: Update to listener MINIFI-339: Resolve Issue with stack based process
Date Mon, 02 Oct 2017 14:57:20 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/protocols/RESTSender.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/RESTSender.h b/libminifi/include/c2/protocols/RESTSender.h
new file mode 100644
index 0000000..749de3e
--- /dev/null
+++ b/libminifi/include/c2/protocols/RESTSender.h
@@ -0,0 +1,80 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
+#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
+
+#include "json/json.h"
+#include "json/writer.h"
+#include <string>
+#include <mutex>
+#include "CivetServer.h"
+#include "../C2Protocol.h"
+#include "RESTProtocol.h"
+#include "../HeartBeatReporter.h"
+#include "controllers/SSLContextService.h"
+#include "utils/ByteInputCallBack.h"
+#include "utils/HTTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
+ *
+ * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
+ * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
+ * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
+ *
+ */
+class RESTSender : public RESTProtocol, public C2Protocol {
+ public:
+
+  explicit RESTSender(std::string name, uuid_t uuid = nullptr);
+
+  virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async);
+
+  virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async);
+
+  virtual void update(const std::shared_ptr<Configure> &configure);
+
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure);
+
+ protected:
+
+  virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig);
+
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+  std::string rest_uri_;
+  std::string ack_uri_;
+};
+
+REGISTER_RESOURCE(RESTSender);
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/controllers/SSLContextService.h
----------------------------------------------------------------------
diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h
index c48d30f..9ceb10c 100644
--- a/libminifi/include/controllers/SSLContextService.h
+++ b/libminifi/include/controllers/SSLContextService.h
@@ -73,6 +73,37 @@ class SSLContextService : public core::controller::ControllerService {
         logger_(logging::LoggerFactory<SSLContextService>::getLogger()) {
   }
 
+  explicit SSLContextService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+      : ControllerService(name, nullptr),
+        initialized_(false),
+        valid_(false),
+        logger_(logging::LoggerFactory<SSLContextService>::getLogger()) {
+    setConfiguration(configuration);
+    initialize();
+    // set the properties based on the configuration
+    core::Property property("Client Certificate", "Client Certificate");
+    core::Property privKey("Private Key", "Private Key file");
+    core::Property passphrase_prop("Passphrase", "Client passphrase. Either a file or unencrypted text");
+    core::Property caCert("CA Certificate", "CA certificate file");
+
+    std::string value;
+    if (configuration_->get(Configure::nifi_security_client_certificate, value)) {
+      setProperty(property.getName(), value);
+    }
+
+    if (configuration_->get(Configure::nifi_security_client_private_key, value)) {
+      setProperty(privKey.getName(), value);
+    }
+
+    if (configuration_->get(Configure::nifi_security_client_pass_phrase, value)) {
+      setProperty(passphrase_prop.getName(), value);
+    }
+
+    if (configuration_->get(Configure::nifi_security_client_ca_certificate, value)) {
+      setProperty(caCert.getName(), value);
+    }
+  }
+
   virtual void initialize();
 
   std::unique_ptr<SSLContext> createSSLContext();
@@ -114,8 +145,7 @@ class SSLContextService : public core::controller::ControllerService {
     if (!IsNullOrEmpty(private_key_)) {
       int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM);
       if (retp != 1) {
-        logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_,
-                           std::strerror(errno));
+        logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_, std::strerror(errno));
         return false;
       }
 
@@ -136,6 +166,8 @@ class SSLContextService : public core::controller::ControllerService {
     return true;
   }
 
+  virtual void onEnable();
+
  protected:
 
   static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) {
@@ -153,8 +185,6 @@ class SSLContextService : public core::controller::ControllerService {
 
   virtual void initializeTLS();
 
-  virtual void onEnable();
-
   std::mutex initialization_mutex_;
   std::atomic<bool> initialized_;
   std::atomic<bool> valid_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ClassLoader.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ClassLoader.h b/libminifi/include/core/ClassLoader.h
index 31292b2..b6cefc5 100644
--- a/libminifi/include/core/ClassLoader.h
+++ b/libminifi/include/core/ClassLoader.h
@@ -62,11 +62,25 @@ class ObjectFactory {
   /**
    * Create a shared pointer to a new processor.
    */
+  virtual Connectable *createRaw(const std::string &name) {
+    return nullptr;
+  }
+
+  /**
+   * Create a shared pointer to a new processor.
+   */
   virtual std::shared_ptr<Connectable> create(const std::string &name, uuid_t uuid) {
     return nullptr;
   }
 
   /**
+   * Create a shared pointer to a new processor.
+   */
+  virtual Connectable* createRaw(const std::string &name, uuid_t uuid) {
+    return nullptr;
+  }
+
+  /**
    * Gets the name of the object.
    * @return class name of processor
    */
@@ -116,6 +130,22 @@ class DefautObjectFactory : public ObjectFactory {
   }
 
   /**
+   * Create a shared pointer to a new processor.
+   */
+  virtual Connectable* createRaw(const std::string &name) {
+    T *ptr = new T(name);
+    return dynamic_cast<Connectable*>(ptr);
+  }
+
+  /**
+   * Create a shared pointer to a new processor.
+   */
+  virtual Connectable* createRaw(const std::string &name, uuid_t uuid) {
+    T *ptr = new T(name, uuid);
+    return dynamic_cast<Connectable*>(ptr);
+  }
+
+  /**
    * Gets the name of the object.
    * @return class name of processor
    */
@@ -203,6 +233,24 @@ class ClassLoader {
   template<class T = Connectable>
   std::shared_ptr<T> instantiate(const std::string &class_name, uuid_t uuid);
 
+  /**
+   * Instantiate object based on class_name
+   * @param class_name class to create
+   * @param uuid uuid of object
+   * @return nullptr or object created from class_name definition.
+   */
+  template<class T = Connectable>
+  T *instantiateRaw(const std::string &class_name, const std::string &name);
+
+  /**
+   * Instantiate object based on class_name
+   * @param class_name class to create
+   * @param uuid uuid of object
+   * @return nullptr or object created from class_name definition.
+   */
+  template<class T = Connectable>
+  T *instantiateRaw(const std::string &class_name, uuid_t uuid);
+
  protected:
 
   std::map<std::string, std::unique_ptr<ObjectFactory>> loaded_factories_;
@@ -239,6 +287,30 @@ std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, uuid_
   }
 }
 
+template<class T>
+T *ClassLoader::instantiateRaw(const std::string &class_name, const std::string &name) {
+  std::lock_guard<std::mutex> lock(internal_mutex_);
+  auto factory_entry = loaded_factories_.find(class_name);
+  if (factory_entry != loaded_factories_.end()) {
+    auto obj = factory_entry->second->createRaw(name);
+    return dynamic_cast<T*>(obj);
+  } else {
+    return nullptr;
+  }
+}
+
+template<class T>
+T *ClassLoader::instantiateRaw(const std::string &class_name, uuid_t uuid) {
+  std::lock_guard<std::mutex> lock(internal_mutex_);
+  auto factory_entry = loaded_factories_.find(class_name);
+  if (factory_entry != loaded_factories_.end()) {
+    auto obj = factory_entry->second->createRaw(class_name, uuid);
+    return dynamic_cast<T*>(obj);
+  } else {
+    return nullptr;
+  }
+}
+
 }/* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ConfigurationFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h
index 61af8cd..b71da7a 100644
--- a/libminifi/include/core/ConfigurationFactory.h
+++ b/libminifi/include/core/ConfigurationFactory.h
@@ -30,18 +30,15 @@ namespace core {
 
 template<typename T>
 typename std::enable_if<!class_operations<T>::value, T*>::type instantiate(const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_file_repo,
-                                                                           const std::shared_ptr<core::ContentRepository> &content_repo,
-                                                                           std::shared_ptr<Configure> configuration,
+                                                                           const std::shared_ptr<core::ContentRepository> &content_repo, std::shared_ptr<Configure> configuration,
                                                                            const std::string path) {
   throw std::runtime_error("Cannot instantiate class");
 }
 
 template<typename T>
 typename std::enable_if<class_operations<T>::value, T*>::type instantiate(const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_file_repo,
-                                                                          const std::shared_ptr<core::ContentRepository> &content_repo,
-                                                                          const std::shared_ptr<io::StreamFactory> &stream_factory,
-                                                                          std::shared_ptr<Configure> configuration,
-                                                                          const std::string path) {
+                                                                          const std::shared_ptr<core::ContentRepository> &content_repo, const std::shared_ptr<io::StreamFactory> &stream_factory,
+                                                                          std::shared_ptr<Configure> configuration, const std::string path) {
   return new T(repo, flow_file_repo, content_repo, stream_factory, configuration, path);
 }
 
@@ -50,10 +47,8 @@ typename std::enable_if<class_operations<T>::value, T*>::type instantiate(const
  * object.
  */
 std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo,
-                                                                 std::shared_ptr<core::ContentRepository> content_repo,
-                                                                 std::shared_ptr<Configure> configure,
-                                                                 std::shared_ptr<io::StreamFactory> stream_factory,
-                                                                 const std::string configuration_class_name, const std::string path = "",
+                                                                 std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
+                                                                 std::shared_ptr<io::StreamFactory> stream_factory, const std::string configuration_class_name, const std::string path = "",
                                                                  bool fail_safe = false);
 
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ContentRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index b544ca0..5558b93 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -49,6 +49,56 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> {
    */
   virtual void stop() = 0;
 
+  /**
+   * Removes an item if it was orphan
+   */
+  virtual bool removeIfOrphaned(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+    std::lock_guard<std::mutex> lock(count_map_mutex_);
+    const std::string str = streamId->getContentFullPath();
+    auto count = count_map_.find(str);
+    if (count != count_map_.end()) {
+      if (count_map_[str] == 0) {
+        remove(streamId);
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  virtual uint32_t getStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+    std::lock_guard<std::mutex> lock(count_map_mutex_);
+    return count_map_[streamId->getContentFullPath()];
+  }
+
+  virtual void incrementStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+    std::lock_guard<std::mutex> lock(count_map_mutex_);
+    const std::string str = streamId->getContentFullPath();
+    auto count = count_map_.find(str);
+    if (count != count_map_.end()) {
+      count_map_[str] = count->second + 1;
+    } else {
+      count_map_[str] = 1;
+    }
+  }
+
+  virtual void decrementStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+    std::lock_guard<std::mutex> lock(count_map_mutex_);
+    const std::string str = streamId->getContentFullPath();
+    auto count = count_map_.find(str);
+    if (count != count_map_.end() && count->second > 0) {
+      count_map_[str] = count->second - 1;
+    } else {
+      count_map_[str] = 0;
+    }
+  }
+
+ protected:
+
+  std::mutex count_map_mutex_;
+
+  std::map<std::string, uint32_t> count_map_;
+
 };
 
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index d9ebc72..ffc567f 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -60,8 +60,7 @@ class FlowConfiguration : public CoreComponent {
    * the flow controller.
    */
   explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo,
-                             std::shared_ptr<io::StreamFactory> stream_factory,
-                             std::shared_ptr<Configure> configuration, const std::string path)
+                             std::shared_ptr<io::StreamFactory> stream_factory, std::shared_ptr<Configure> configuration, const std::string path)
       : CoreComponent(core::getClassName<FlowConfiguration>()),
         flow_file_repo_(flow_file_repo),
         content_repo_(content_repo),
@@ -102,8 +101,7 @@ class FlowConfiguration : public CoreComponent {
     return getRoot(config_path_);
   }
 
-  virtual std::unique_ptr<core::ProcessGroup> getRootFromPayload(
-                                                                 std::string &yamlConfigPayload) {
+  virtual std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &yamlConfigPayload) {
     return nullptr;
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ProcessContext.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index f6aaf5e..1158c71 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -48,49 +48,49 @@ class ProcessContext : public controller::ControllerServiceLookup {
   /*!
    * Create a new process context associated with the processor/controller service/state manager
    */
-  ProcessContext(ProcessorNode &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, std::shared_ptr<core::Repository> repo,
-		 std::shared_ptr<core::Repository> flow_repo,
-                 std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>())
+  ProcessContext(const std::shared_ptr<ProcessorNode> &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, const std::shared_ptr<core::Repository> &repo,
+                 const std::shared_ptr<core::Repository> &flow_repo, const std::shared_ptr<core::ContentRepository> &content_repo = std::make_shared<core::repository::FileSystemRepository>())
       : processor_node_(processor),
         controller_service_provider_(controller_service_provider),
         logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
-        content_repo_(content_repo), flow_repo_(flow_repo) {
+        content_repo_(content_repo),
+        flow_repo_(flow_repo) {
     repo_ = repo;
   }
   // Destructor
   virtual ~ProcessContext() {
   }
   // Get Processor associated with the Process Context
-  ProcessorNode &getProcessorNode() {
+  std::shared_ptr<ProcessorNode> getProcessorNode() {
     return processor_node_;
   }
-  bool getProperty(std::string name, std::string &value) {
-    return processor_node_.getProperty(name, value);
+  bool getProperty(const std::string &name, std::string &value) {
+    return processor_node_->getProperty(name, value);
   }
   // Sets the property value using the property's string name
-  bool setProperty(std::string name, std::string value) {
-    return processor_node_.setProperty(name, value);
+  bool setProperty(const std::string &name, std::string value) {
+    return processor_node_->setProperty(name, value);
   }
   // Sets the property value using the Property object
   bool setProperty(Property prop, std::string value) {
-    return processor_node_.setProperty(prop, value);
+    return processor_node_->setProperty(prop, value);
   }
   // Whether the relationship is supported
   bool isSupportedRelationship(Relationship relationship) {
-    return processor_node_.isSupportedRelationship(relationship);
+    return processor_node_->isSupportedRelationship(relationship);
   }
 
   // Check whether the relationship is auto terminated
   bool isAutoTerminated(Relationship relationship) {
-    return processor_node_.isAutoTerminated(relationship);
+    return processor_node_->isAutoTerminated(relationship);
   }
   // Get ProcessContext Maximum Concurrent Tasks
   uint8_t getMaxConcurrentTasks(void) {
-    return processor_node_.getMaxConcurrentTasks();
+    return processor_node_->getMaxConcurrentTasks();
   }
   // Yield based on the yield period
   void yield() {
-    processor_node_.yield();
+    processor_node_->yield();
   }
 
   std::shared_ptr<core::Repository> getProvenanceRepository() {
@@ -104,7 +104,7 @@ class ProcessContext : public controller::ControllerServiceLookup {
   std::shared_ptr<core::ContentRepository> getContentRepository() {
     return content_repo_;
   }
-  
+
   std::shared_ptr<core::Repository> getFlowFileRepository() {
     return flow_repo_;
   }
@@ -122,7 +122,7 @@ class ProcessContext : public controller::ControllerServiceLookup {
    * identifier
    */
   std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) {
-    return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_.getUUIDStr());
+    return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr());
   }
 
   /**
@@ -167,7 +167,7 @@ class ProcessContext : public controller::ControllerServiceLookup {
   // repository shared pointer.
   std::shared_ptr<core::ContentRepository> content_repo_;
   // Processor
-  ProcessorNode processor_node_;
+  std::shared_ptr<ProcessorNode> processor_node_;
   // Logger
   std::shared_ptr<logging::Logger> logger_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index a0e51e3..907fdfc 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -144,6 +144,8 @@ class ProcessGroup {
   std::shared_ptr<Processor> findProcessor(uuid_t uuid);
   // findProcessor based on name
   std::shared_ptr<Processor> findProcessor(const std::string &processorName);
+
+  void getAllProcessors(std::vector<std::shared_ptr<Processor>> &processor_vec);
   /**
    * Add controller service
    * @param nodeId node identifier

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 3a3b143..5b78022 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -47,12 +47,12 @@ class ProcessSession {
   /*!
    * Create a new process session
    */
-  ProcessSession(ProcessContext *processContext = NULL)
+  ProcessSession(std::shared_ptr<ProcessContext> processContext = nullptr)
       : process_context_(processContext),
         logger_(logging::LoggerFactory<ProcessSession>::getLogger()) {
-    logger_->log_trace("ProcessSession created for %s", process_context_->getProcessorNode().getName());
+    logger_->log_trace("ProcessSession created for %s", process_context_->getProcessorNode()->getName());
     auto repo = processContext->getProvenanceRepository();
-    provenance_report_ = new provenance::ProvenanceReporter(repo, process_context_->getProcessorNode().getUUIDStr(), process_context_->getProcessorNode().getName());
+    provenance_report_ = new provenance::ProvenanceReporter(repo, process_context_->getProcessorNode()->getName(), process_context_->getProcessorNode()->getName());
   }
 
 // Destructor
@@ -125,8 +125,9 @@ class ProcessSession {
   void import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
   bool keepSource = true,
               uint64_t offset = 0);
-  void import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows,
-  bool keepSource, uint64_t offset, char inputDelimiter);
+  void import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows,
+  bool keepSource,
+              uint64_t offset, char inputDelimiter);
 
 // Prevent default copy constructor and assignment operation
 // Only support pass by reference or pointer
@@ -151,7 +152,7 @@ class ProcessSession {
 // Clone the flow file during transfer to multiple connections for a relationship
   std::shared_ptr<core::FlowFile> cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent);
   // ProcessContext
-  ProcessContext *process_context_;
+  std::shared_ptr<ProcessContext> process_context_;
   // Logger
   std::shared_ptr<logging::Logger> logger_;
   // Provenance Report

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ProcessSessionFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSessionFactory.h b/libminifi/include/core/ProcessSessionFactory.h
index e0ebe18..dc4a9f5 100644
--- a/libminifi/include/core/ProcessSessionFactory.h
+++ b/libminifi/include/core/ProcessSessionFactory.h
@@ -38,12 +38,12 @@ class ProcessSessionFactory {
   /*!
    * Create a new process session factory
    */
-  explicit ProcessSessionFactory(ProcessContext *processContext)
+  explicit ProcessSessionFactory(std::shared_ptr<ProcessContext> processContext)
       : process_context_(processContext) {
   }
 
   // Create the session
-  std::unique_ptr<ProcessSession> createSession();
+  std::shared_ptr<ProcessSession> createSession();
 
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
@@ -52,7 +52,7 @@ class ProcessSessionFactory {
 
  private:
   // ProcessContext
-  ProcessContext *process_context_;
+  std::shared_ptr<ProcessContext> process_context_;
 
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 0853c11..99b1043 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -72,6 +72,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   Processor(std::string name, uuid_t uuid = NULL);
   // Destructor
   virtual ~Processor() {
+    notifyStop();
   }
 
   bool isRunning();
@@ -153,7 +154,8 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   }
   // decrement Active Task Counts
   void decrementActiveTask(void) {
-    active_tasks_--;
+    if (active_tasks_ > 0)
+      active_tasks_--;
   }
   void clearActiveTask(void) {
     active_tasks_ = 0;
@@ -183,7 +185,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
     if (yield_expiration_ > curTime)
       return (yield_expiration_ - curTime);
     else
-      return 0;;
+      return 0;
   }
   // Whether flow file queued in incoming connection
   bool flowFilesQueued();
@@ -203,6 +205,9 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   // Get the Next RoundRobin incoming connection
   std::shared_ptr<Connection> getNextIncomingConnection();
   // On Trigger
+
+  void onTrigger(std::shared_ptr<ProcessContext> context, std::shared_ptr<ProcessSessionFactory> sessionFactory);
+
   void onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory);
 
   virtual bool canEdit() {
@@ -212,19 +217,35 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
  public:
 
   // OnTrigger method, implemented by NiFi Processor Designer
+  virtual void onTrigger(std::shared_ptr<ProcessContext> context, std::shared_ptr<ProcessSession> session) {
+    onTrigger(context.get(), session.get());
+  }
   virtual void onTrigger(ProcessContext *context, ProcessSession *session) = 0;
   // Initialize, overridden by NiFi Process Designer
   virtual void initialize() {
   }
   // Scheduled event hook, overridden by NiFi Process Designer
+  virtual void onSchedule(std::shared_ptr<ProcessContext> context, std::shared_ptr<ProcessSessionFactory> sessionFactory) {
+    onSchedule(context.get(), sessionFactory.get());
+  }
   virtual void onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory) {
   }
 
   // Check all incoming connections for work
   bool isWorkAvailable();
 
+  void setStreamFactory(std::shared_ptr<minifi::io::StreamFactory> stream_factory) {
+    stream_factory_ = stream_factory;
+  }
+
  protected:
 
+  virtual void notifyStop() {
+
+  }
+
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory_;
+
   // Processor state
   std::atomic<ScheduledState> state_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ProcessorNode.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
index 9bf1f52..f420728 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -35,10 +35,12 @@ namespace core {
  */
 class ProcessorNode : public ConfigurableComponent, public Connectable {
  public:
-  explicit ProcessorNode(const std::shared_ptr<Connectable> processor);
+  explicit ProcessorNode(const std::shared_ptr<Connectable> &processor);
 
   explicit ProcessorNode(const ProcessorNode &other);
 
+  explicit ProcessorNode(const ProcessorNode &&other);
+
   /**
    * Get property using the provided name.
    * @param name property name.
@@ -59,7 +61,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
    * @param value value passed in by reference
    * @return result of getting property.
    */
-  bool getProperty(const std::string name, std::string &value) {
+  bool getProperty(const std::string &name, std::string &value) {
     const std::shared_ptr<ConfigurableComponent> processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
     if (nullptr != processor_cast)
       return processor_cast->getProperty(name, value);
@@ -73,7 +75,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
    * @param value property value.
    * @return result of setting property.
    */
-  bool setProperty(const std::string name, std::string value) {
+  bool setProperty(const std::string &name, std::string value) {
     const std::shared_ptr<ConfigurableComponent> processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
     bool ret = ConfigurableComponent::setProperty(name, value);
     if (nullptr != processor_cast)
@@ -137,7 +139,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
    * Set name.
    * @param name
    */
-  void setName(const std::string name) {
+  void setName(const std::string &name) {
     Connectable::setName(name);
     processor_->setName(name);
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/Property.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h
index 278e2ec..9eee1b5 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -162,11 +162,16 @@ class Property {
     }
 
     std::string unit(pEnd);
+    std::transform(unit.begin(), unit.end(), unit.begin(), ::tolower);
 
     if (unit == "sec" || unit == "s" || unit == "second" || unit == "seconds" || unit == "secs") {
       timeunit = SECOND;
       output = ival;
       return true;
+    } else if (unit == "msec" || unit == "ms" || unit == "millisecond" || unit == "milliseconds" || unit == "msecs") {
+      timeunit = MILLISECOND;
+      output = ival;
+      return true;
     } else if (unit == "min" || unit == "m" || unit == "mins" || unit == "minute" || unit == "minutes") {
       timeunit = MINUTE;
       output = ival;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/Relationship.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Relationship.h b/libminifi/include/core/Relationship.h
index 416ede6..d0157d9 100644
--- a/libminifi/include/core/Relationship.h
+++ b/libminifi/include/core/Relationship.h
@@ -59,7 +59,7 @@ class Relationship {
       : name_(UNDEFINED_RELATIONSHIP) {
   }
   // Destructor
-  virtual ~Relationship() {
+  ~Relationship() {
   }
   // Get Name for the relationship
   std::string getName() const {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/Repository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index f1b47ae..cdc81c5 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -59,10 +59,11 @@ class Repository : public core::SerializableComponent {
    * Constructor for the repository
    */
   Repository(std::string repo_name = "Repository", std::string directory = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
-                 MAX_REPOSITORY_STORAGE_SIZE,
+  MAX_REPOSITORY_STORAGE_SIZE,
              uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
       : core::SerializableComponent(repo_name),
         thread_(),
+        repo_size_(0),
         logger_(logging::LoggerFactory<Repository>::getLogger()) {
     directory_ = directory;
     max_partition_millis_ = maxPartitionMillis;
@@ -77,6 +78,8 @@ class Repository : public core::SerializableComponent {
     stop();
   }
 
+  virtual void flush();
+
   // initialize
   virtual bool initialize(const std::shared_ptr<Configure> &configure) {
     return true;
@@ -193,6 +196,8 @@ class Repository : public core::SerializableComponent {
 
   }
 
+  virtual uint64_t getRepoSize();
+
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
   Repository(const Repository &parent) = delete;
@@ -216,7 +221,7 @@ class Repository : public core::SerializableComponent {
   // whether stop accepting provenace event
   std::atomic<bool> repo_full_;
   // repoSize
-  uint64_t repoSize();
+
   // size of the directory
   std::atomic<uint64_t> repo_size_;
   // Run function for the thread

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/StreamManager.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/StreamManager.h b/libminifi/include/core/StreamManager.h
index 468526d..b6d0f0a 100644
--- a/libminifi/include/core/StreamManager.h
+++ b/libminifi/include/core/StreamManager.h
@@ -70,6 +70,19 @@ class StreamManager {
    */
   virtual bool remove(const std::shared_ptr<T> &streamId) = 0;
 
+  /**
+   * Removes an item if it was orphan
+   */
+  virtual bool removeIfOrphaned(const std::shared_ptr<T> &streamId) = 0;
+
+  virtual uint32_t getStreamCount(const std::shared_ptr<T> &streamId) = 0;
+
+  virtual void incrementStreamCount(const std::shared_ptr<T> &streamId) = 0;
+
+  virtual void decrementStreamCount(const std::shared_ptr<T> &streamId) = 0;
+
+  virtual bool exists(const std::shared_ptr<T> &streamId) = 0;
+
 };
 
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/controller/ControllerServiceProvider.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index 148f970..bf02080 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -18,6 +18,7 @@
 #ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
 #define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
 
+#include <future>
 #include <vector>
 #include "core/Core.h"
 #include "ControllerServiceLookup.h"
@@ -95,7 +96,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
    * Enables the provided controller service
    * @param serviceNode controller service node.
    */
-  virtual void enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
+  virtual std::future<bool> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
 
   /**
    * Enables the provided controller service nodes
@@ -107,7 +108,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
    * Disables the provided controller service node
    * @param serviceNode controller service node.
    */
-  virtual void disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
+  virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
 
   /**
    * Gets a list of all controller services.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/controller/StandardControllerServiceProvider.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index f474ed2..4cb33a7 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -42,8 +42,7 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
  public:
 
   explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, std::shared_ptr<ProcessGroup> root_group, std::shared_ptr<Configure> configuration,
-                                             std::shared_ptr<minifi::SchedulingAgent> agent,
-                                             ClassLoader &loader = ClassLoader::getDefaultClassLoader())
+                                             std::shared_ptr<minifi::SchedulingAgent> agent, ClassLoader &loader = ClassLoader::getDefaultClassLoader())
       : ControllerServiceProvider(services),
         root_group_(root_group),
         agent_(agent),
@@ -90,8 +89,7 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
     }
 
     std::shared_ptr<ControllerServiceNode> new_service_node = std::make_shared<StandardControllerServiceNode>(new_controller_service,
-                                                                                                              std::static_pointer_cast<ControllerServiceProvider>(shared_from_this()),
-                                                                                                              id,
+                                                                                                              std::static_pointer_cast<ControllerServiceProvider>(shared_from_this()), id,
                                                                                                               configuration_);
 
     controller_map_->put(id, new_service_node);
@@ -99,9 +97,12 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
 
   }
 
-  void enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+  std::future<bool> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
     if (serviceNode->canEnable()) {
-      agent_->enableControllerService(serviceNode);
+      return agent_->enableControllerService(serviceNode);
+    } else {
+      std::future<bool> no_run = std::async(std::launch::async, []() {return false;});
+      return no_run;
     }
   }
 
@@ -124,9 +125,12 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
     }
   }
 
-  void disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+  std::future<bool> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
     if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) {
-      agent_->disableControllerService(serviceNode);
+      return agent_->disableControllerService(serviceNode);
+    } else {
+      std::future<bool> no_run = std::async(std::launch::async, []() {return false;});
+      return no_run;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/logging/Logger.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/logging/Logger.h b/libminifi/include/core/logging/Logger.h
index 8b80006..d7d52b6 100644
--- a/libminifi/include/core/logging/Logger.h
+++ b/libminifi/include/core/logging/Logger.h
@@ -115,7 +115,7 @@ class Logger {
   std::shared_ptr<spdlog::logger> delegate_;
 
   std::mutex mutex_;
-   private:
+ private:
   template<typename ... Args>
   inline void log(spdlog::level::level_enum level, const char * const format, const Args& ... args) {
     std::lock_guard<std::mutex> lock(mutex_);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/logging/LoggerConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h
index 787fec5..fbcdaad 100644
--- a/libminifi/include/core/logging/LoggerConfiguration.h
+++ b/libminifi/include/core/logging/LoggerConfiguration.h
@@ -31,7 +31,6 @@
 #include "core/logging/Logger.h"
 #include "properties/Properties.h"
 
-
 namespace org {
 namespace apache {
 namespace nifi {
@@ -78,7 +77,7 @@ class LoggerProperties : public Properties {
 
   static const char* appender_prefix;
   static const char* logger_prefix;
-   private:
+ private:
   std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sinks_;
 };
 
@@ -102,11 +101,11 @@ class LoggerConfiguration {
    */
   std::shared_ptr<Logger> getLogger(const std::string &name);
   static const char *spdlog_default_pattern;
-   protected:
+ protected:
   static std::shared_ptr<internal::LoggerNamespace> initialize_namespaces(const std::shared_ptr<LoggerProperties> &logger_properties);
   static std::shared_ptr<spdlog::logger> get_logger(std::shared_ptr<Logger> logger, const std::shared_ptr<internal::LoggerNamespace> &root_namespace, const std::string &name,
                                                     std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present = false);
-   private:
+ private:
   static std::shared_ptr<internal::LoggerNamespace> create_default_root();
 
   class LoggerImpl : public Logger {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
index 2bd4099..0e4ba81 100644
--- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
+++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
@@ -53,7 +53,7 @@ class SiteToSiteProvenanceReportingTask : public minifi::RemoteProcessorGroupPor
     batch_size_ = 100;
   }
   //! Destructor
-  virtual ~SiteToSiteProvenanceReportingTask() {
+  ~SiteToSiteProvenanceReportingTask() {
   }
   //! Report Task Name
   static constexpr char const* ReportTaskName = "SiteToSiteProvenanceReportingTask";

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/AtomicRepoEntries.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/AtomicRepoEntries.h b/libminifi/include/core/repository/AtomicRepoEntries.h
index c681060..e96f19b 100644
--- a/libminifi/include/core/repository/AtomicRepoEntries.h
+++ b/libminifi/include/core/repository/AtomicRepoEntries.h
@@ -202,8 +202,7 @@ class AtomicEntry {
   bool setRepoValue(RepoValue<T> &new_value, RepoValue<T> &old_value, size_t &prev_size) {
     // delete the underlying pointer
     bool lock = false;
-    if (!write_pending_.compare_exchange_weak(lock, true))
-    {
+    if (!write_pending_.compare_exchange_weak(lock, true)) {
       return false;
     }
     if (has_value_) {
@@ -215,19 +214,17 @@ class AtomicEntry {
     try_unlock();
     return true;
   }
-  
-  
-  AtomicEntry<T> *takeOwnership()
-  {
-      bool lock = false;
-      if (!write_pending_.compare_exchange_weak(lock, true) )
-	return nullptr;
-      
-      ref_count_++;
-      
-      try_unlock();
-      
-      return this;
+
+  AtomicEntry<T> *takeOwnership() {
+    bool lock = false;
+    if (!write_pending_.compare_exchange_weak(lock, true))
+      return nullptr;
+
+    ref_count_++;
+
+    try_unlock();
+
+    return this;
   }
   /**
    * A test and set operation, which is used to allow a function to test
@@ -238,33 +235,29 @@ class AtomicEntry {
   bool testAndSetKey(const T str, std::function<bool(T)> releaseTest = nullptr, std::function<void(T)> reclaimer = nullptr, std::function<bool(T, T)> comparator = nullptr) {
     bool lock = false;
 
-    if (!write_pending_.compare_exchange_weak(lock, true) )
+    if (!write_pending_.compare_exchange_weak(lock, true))
       return false;
 
     if (has_value_) {
       // we either don't have a release test or we cannot release this
       // entity
-      if (releaseTest != nullptr && reclaimer != nullptr && releaseTest(value_.getKey()))
-                                                                        {
+      if (releaseTest != nullptr && reclaimer != nullptr && releaseTest(value_.getKey())) {
         reclaimer(value_.getKey());
-      }
-      else if (free_required && ref_count_ == 0)
-      {
-	size_t bufferSize = value_.getBufferSize();
-	value_.clearBuffer();
-	has_value_ = false;
-	if (accumulated_repo_size_ != nullptr) {
-	  *accumulated_repo_size_ -= bufferSize;
-	}
-	free_required = false;  
-      }
-      else {
+      } else if (free_required && ref_count_ == 0) {
+        size_t bufferSize = value_.getBufferSize();
+        value_.clearBuffer();
+        has_value_ = false;
+        if (accumulated_repo_size_ != nullptr) {
+          *accumulated_repo_size_ -= bufferSize;
+        }
+        free_required = false;
+      } else {
         try_unlock();
         return false;
       }
 
     }
-    ref_count_=1;
+    ref_count_ = 1;
     value_.setKey(str, comparator);
     has_value_ = true;
     try_unlock();
@@ -308,27 +301,25 @@ class AtomicEntry {
     try_unlock();
     return true;
   }
-  
-  void decrementOwnership(){
+
+  void decrementOwnership() {
     try_lock();
     if (!has_value_) {
       try_unlock();
       return;
     }
-    if (ref_count_ > 0){
+    if (ref_count_ > 0) {
       ref_count_--;
     }
-    if (ref_count_ == 0 && free_required)
-    {
+    if (ref_count_ == 0 && free_required) {
       size_t bufferSize = value_.getBufferSize();
       value_.clearBuffer();
       has_value_ = false;
       if (accumulated_repo_size_ != nullptr) {
-	*accumulated_repo_size_ -= bufferSize;
+        *accumulated_repo_size_ -= bufferSize;
       }
       free_required = false;
-    }
-    else{
+    } else {
     }
     try_unlock();
   }
@@ -382,15 +373,14 @@ class AtomicEntry {
     try_unlock();
     return ref;
   }
-  
-  size_t getLength()
-  {
+
+  size_t getLength() {
     size_t size = 0;
-     try_lock();
-     size = value_.getBufferSize();
-     try_unlock();
-     return size;
-     
+    try_lock();
+    size = value_.getBufferSize();
+    try_unlock();
+    return size;
+
   }
 
   /**
@@ -407,11 +397,10 @@ class AtomicEntry {
       try_unlock();
       return false;
     }
-    if (ref_count_ > 0)
-    {
-       free_required = true;
-       try_unlock();
-       return true;
+    if (ref_count_ > 0) {
+      free_required = true;
+      try_unlock();
+      return true;
     }
     size_t bufferSize = value_.getBufferSize();
     value_.clearBuffer();
@@ -460,7 +449,7 @@ class AtomicEntry {
    */
   inline void try_lock() {
     bool lock = false;
-    while (!write_pending_.compare_exchange_weak(lock, true,std::memory_order_acquire)) {
+    while (!write_pending_.compare_exchange_weak(lock, true, std::memory_order_acquire)) {
       lock = false;
       // attempt again
     }
@@ -471,7 +460,7 @@ class AtomicEntry {
    */
   inline void try_unlock() {
     bool lock = true;
-    while (!write_pending_.compare_exchange_weak(lock, false,std::memory_order_acquire)) {
+    while (!write_pending_.compare_exchange_weak(lock, false, std::memory_order_acquire)) {
       lock = true;
       // attempt again
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/FileSystemRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h
index 84bf01e..4a5ad5e 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -47,6 +47,8 @@ class FileSystemRepository : public core::ContentRepository, public core::CoreCo
 
   virtual void stop();
 
+  bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);
+
   virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim);
 
   virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h
index 28b9c05..f2691ac 100644
--- a/libminifi/include/core/repository/FlowFileRepository.h
+++ b/libminifi/include/core/repository/FlowFileRepository.h
@@ -26,6 +26,7 @@
 #include "core/Core.h"
 #include "Connection.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "concurrentqueue.h"
 
 namespace org {
 namespace apache {
@@ -61,6 +62,8 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
       delete db_;
   }
 
+  virtual void flush();
+
   // initialize
   virtual bool initialize(const std::shared_ptr<Configure> &configure) {
     std::string value;
@@ -98,7 +101,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
     // persistent to the DB
     leveldb::Slice value((const char *) buf, bufLen);
     leveldb::Status status;
-    repo_size_+=bufLen;
+    repo_size_ += bufLen;
     status = db_->Put(leveldb::WriteOptions(), key, value);
     if (status.ok())
       return true;
@@ -111,20 +114,16 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
    * @return status of the delete operation
    */
   virtual bool Delete(std::string key) {
-    leveldb::Status status;
-    status = db_->Delete(leveldb::WriteOptions(), key);
-    if (status.ok())
-    {
-      return true;
-    }
-    else
-      return false;
+    keys_to_delete.enqueue(key);
+    return true;
   }
   /**
    * Sets the value from the provided key
    * @return status of the get operation.
    */
   virtual bool Get(const std::string &key, std::string &value) {
+    if (db_ == nullptr)
+      return false;
     leveldb::Status status;
     status = db_->Get(leveldb::ReadOptions(), key, &value);
     if (status.ok())
@@ -139,12 +138,10 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
   virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo);
 
   void start() {
-    if (this->purge_period_ <= 0)
-    {
+    if (this->purge_period_ <= 0) {
       return;
     }
-    if (running_)
-    {
+    if (running_) {
       return;
     }
     running_ = true;
@@ -153,6 +150,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
   }
 
  private:
+  moodycamel::ConcurrentQueue<std::string> keys_to_delete;
   std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
   std::shared_ptr<core::ContentRepository> content_repo_;
   leveldb::DB* db_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/VolatileContentRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 306a812..8507216 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -50,8 +50,7 @@ class VolatileContentRepository : public core::ContentRepository, public core::r
   virtual ~VolatileContentRepository() {
     if (!minimize_locking_) {
       std::lock_guard<std::mutex> lock(map_mutex_);
-      for (const auto &item : master_list_)
-      {
+      for (const auto &item : master_list_) {
         delete item.second;
       }
       master_list_.clear();
@@ -84,6 +83,8 @@ class VolatileContentRepository : public core::ContentRepository, public core::r
    */
   virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);
 
+  virtual bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);
+
   /**
    * Closes the claim.
    * @return whether or not the claim is associated with content stored in volatile memory.
@@ -104,8 +105,7 @@ class VolatileContentRepository : public core::ContentRepository, public core::r
   virtual void run();
 
   template<typename T2>
-  std::shared_ptr<T2> shared_from_parent()
-  {
+  std::shared_ptr<T2> shared_from_parent() {
     return std::static_pointer_cast<T2>(shared_from_this());
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/VolatileFlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index 059c1de..0e75580 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -31,8 +31,7 @@ namespace repository {
  * Volatile flow file repository. keeps a running counter of the current location, freeing
  * those which we no longer hold.
  */
-class VolatileFlowFileRepository : public VolatileRepository<std::string>
-{
+class VolatileFlowFileRepository : public VolatileRepository<std::string> {
  public:
   explicit VolatileFlowFileRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
   MAX_REPOSITORY_STORAGE_SIZE,
@@ -48,11 +47,9 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string>
     repo_full_ = false;
     while (running_) {
       std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
-      if (purge_required_ && nullptr != content_repo_)
-          {
+      if (purge_required_ && nullptr != content_repo_) {
         std::lock_guard<std::mutex> lock(purge_mutex_);
-        for (auto purgeItem : purge_list_)
-        {
+        for (auto purgeItem : purge_list_) {
           std::shared_ptr<minifi::ResourceClaim> newClaim = std::make_shared<minifi::ResourceClaim>(purgeItem, content_repo_, true);
           content_repo_->remove(newClaim);
         }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/VolatileProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileProvenanceRepository.h b/libminifi/include/core/repository/VolatileProvenanceRepository.h
index 7397751..2717510 100644
--- a/libminifi/include/core/repository/VolatileProvenanceRepository.h
+++ b/libminifi/include/core/repository/VolatileProvenanceRepository.h
@@ -30,8 +30,7 @@ namespace repository {
 /**
  * Volatile provenance repository.
  */
-class VolatileProvenanceRepository : public VolatileRepository<std::string>
-{
+class VolatileProvenanceRepository : public VolatileRepository<std::string> {
 
  public:
   explicit VolatileProvenanceRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/VolatileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index 958d91a..da6608c 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -240,11 +240,10 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) {
         continue;
       }
     }
-    
+
     updated = value_vector_.at(private_index)->setRepoValue(new_value, old_value, reclaimed_size);
-    logger_->log_debug("Set repo value at %d out of %d updated %d current_size %d, adding %d to  %d", private_index, max_count_,updated==true,reclaimed_size,size, current_size_.load());
-    if (updated && reclaimed_size > 0)
-    {
+    logger_->log_debug("Set repo value at %d out of %d updated %d current_size %d, adding %d to  %d", private_index, max_count_, updated == true, reclaimed_size, size, current_size_.load());
+    if (updated && reclaimed_size > 0) {
       std::lock_guard<std::mutex> lock(mutex_);
       purge_list_.push_back(old_value.getKey());
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/ProcessorController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h
new file mode 100644
index 0000000..de7c673
--- /dev/null
+++ b/libminifi/include/core/state/ProcessorController.h
@@ -0,0 +1,73 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_PROCESSORCONTROLLER_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_PROCESSORCONTROLLER_H_
+
+#include <memory>
+#include "core/Processor.h"
+#include "SchedulingAgent.h"
+#include "UpdateController.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+
+/**
+ * Purpose, Justification, & Design: ProcessController is the state control mechanism for processors.
+ * This is coupled with the scheduler. Since scheduling agents run processors, we must ensure state
+ * is set in the processor to enable it to run, after which the scheduling agent will then be allowed
+ * to run the aforementioned processor.
+ */
+class ProcessorController : public StateController {
+ public:
+
+  ProcessorController(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<SchedulingAgent> &scheduler);
+
+  virtual ~ProcessorController();
+
+  std::string getComponentName() {
+    return processor_->getName();
+  }
+  /**
+   * Start the client
+   */
+  virtual int16_t start();
+  /**
+   * Stop the client
+   */
+  virtual int16_t stop(bool force, uint64_t timeToWait = 0);
+
+  virtual bool isRunning();
+
+  virtual int16_t pause();
+
+ protected:
+  std::shared_ptr<core::Processor> processor_;
+  std::shared_ptr<SchedulingAgent> scheduler_;
+};
+
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_PROCESSORCONTROLLER_H_ */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/StateManager.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/StateManager.h b/libminifi/include/core/state/StateManager.h
new file mode 100644
index 0000000..412b02e
--- /dev/null
+++ b/libminifi/include/core/state/StateManager.h
@@ -0,0 +1,126 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_STATE_MANAGER_H
+#define LIBMINIFI_INCLUDE_STATE_MANAGER_H
+
+#include <map>
+#include <atomic>
+#include <algorithm>
+
+#include "core/state/metrics/MetricsBase.h"
+#include "core/state/metrics/MetricsListener.h"
+#include "UpdateController.h"
+#include "io/validation.h"
+#include "utils/ThreadPool.h"
+#include "core/Core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+
+/**
+ * State manager is meant to be used by implementing agents of this library. It represents the source and sink for metrics,
+ * the sink for external updates, and encapsulates the thread pool that runs the listeners for various update operations
+ * that can be performed.
+ */
+class StateManager : public metrics::MetricsReporter, public metrics::MetricsSink, public StateMonitor, public std::enable_shared_from_this<StateManager> {
+ public:
+
+  StateManager()
+      : metrics_listener_(nullptr) {
+
+  }
+
+  virtual ~StateManager() {
+
+  }
+
+  /**
+   * Initializes the thread pools.
+   */
+  void initialize();
+
+  /**
+   * State management operations.
+   */
+  /**
+   * Stop this controllable.
+   * @param force force stopping
+   * @param timeToWait time to wait before giving up.
+   * @return status of stopping this controller.
+   */
+  virtual int16_t stop(bool force, uint64_t timeToWait = 0);
+
+  /**
+   * Updates the given flow controller.
+   */
+  int16_t update(const std::shared_ptr<Update> &updateController);
+
+  /**
+   * Passes metrics to the update controllers if they are a metrics sink.
+   * @param metrics metric to pass through
+   */
+  int16_t setMetrics(const std::shared_ptr<metrics::Metrics> &metrics);
+
+  /**
+   * Metrics operations
+   */
+  virtual int16_t getMetrics(std::vector<std::shared_ptr<metrics::Metrics>> &metric_vector, uint16_t metricsClass);
+
+ protected:
+
+  /**
+   * Function to apply updates for a given  update controller.
+   * @param updateController update controller mechanism.
+   */
+  virtual int16_t applyUpdate(const std::shared_ptr<Update> &updateController) = 0;
+
+  /**
+   * Registers and update controller
+   * @param updateController update controller to add.
+   */
+  bool registerUpdateListener(const std::shared_ptr<UpdateController> &updateController);
+
+  /**
+   * Base metrics function will employ the default metrics listener.
+   */
+  virtual bool startMetrics();
+
+ private:
+
+  std::timed_mutex mutex_;
+
+  std::map<std::string, std::shared_ptr<metrics::Metrics>> metrics_maps_;
+
+  std::vector<std::shared_ptr<UpdateController> > updateControllers;
+
+  std::unique_ptr<state::metrics::MetricsListener> metrics_listener_;
+
+  utils::ThreadPool<Update> listener_thread_pool_;
+
+};
+
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_CONTROLLABLE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/UpdateController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
new file mode 100644
index 0000000..9d4d2f6
--- /dev/null
+++ b/libminifi/include/core/state/UpdateController.h
@@ -0,0 +1,252 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_UPDATECONTROLLER_H_
+#define LIBMINIFI_INCLUDE_UPDATECONTROLLER_H_
+
+#include <string>
+#include "utils/ThreadPool.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+
+enum class UpdateState {
+  INITIATE,
+  FULLY_APPLIED,
+  READ_COMPLETE,
+  PARTIALLY_APPLIED,
+  NOT_APPLIED,
+  SET_ERROR,
+  READ_ERROR,
+  NESTED  // multiple updates embedded into one
+
+};
+
+/**
+ * Represents the status of an update operation.
+ *
+ */
+class UpdateStatus {
+ public:
+  UpdateStatus(UpdateState state, int16_t reason = 0);
+
+  UpdateStatus(const UpdateStatus &other);
+
+  UpdateStatus(const UpdateStatus &&other);
+
+  UpdateState getState() const;
+
+  std::string getError() const;
+
+  int16_t getReadonCode() const;
+
+  UpdateStatus &operator=(const UpdateStatus &&other);
+
+  UpdateStatus &operator=(const UpdateStatus &other);
+ private:
+  UpdateState state_;
+  std::string error_;
+  int16_t reason_;
+};
+
+class Update {
+ public:
+
+  Update(UpdateStatus status)
+      : status_(status) {
+
+  }
+
+  Update(const Update &other)
+      : status_(other.status_) {
+
+  }
+
+  Update(const Update &&other)
+      : status_(std::move(other.status_)) {
+
+  }
+
+  virtual ~Update() {
+
+  }
+
+  virtual bool validate() {
+    return true;
+  }
+
+  const UpdateStatus &getStatus() const {
+    return status_;
+  }
+
+  Update &operator=(const Update &&other) {
+    status_ = std::move(other.status_);
+    return *this;
+  }
+
+  Update &operator=(const Update &other) {
+    status_ = other.status_;
+    return *this;
+  }
+
+ protected:
+  UpdateStatus status_;
+};
+
+/**
+ * Justification and Purpose: Update Runner reflects the post execution functors that determine if
+ * a given function that is running within a thread pool worker needs to end.
+ *
+ * Design: Simply implements isFinished and isCancelled, which it receives by way of the AfterExecute
+ * class.
+ */
+class UpdateRunner : public utils::AfterExecute<Update> {
+ public:
+  explicit UpdateRunner(std::atomic<bool> &running)
+      : running_(&running) {
+  }
+
+  explicit UpdateRunner(UpdateRunner && other)
+      : running_(std::move(other.running_)) {
+
+  }
+
+  ~UpdateRunner() {
+
+  }
+
+  virtual bool isFinished(const Update &result) {
+    if ((result.getStatus().getState() == UpdateState::FULLY_APPLIED || result.getStatus().getState() == UpdateState::READ_COMPLETE) && *running_) {
+      return false;
+    } else {
+      return true;
+    }
+  }
+  virtual bool isCancelled(const Update &result) {
+    return !*running_;
+  }
+
+ protected:
+
+  std::atomic<bool> *running_;
+
+};
+
+class StateController {
+ public:
+
+  virtual ~StateController() {
+
+  }
+
+  virtual std::string getComponentName() = 0;
+  /**
+   * Start the client
+   */
+  virtual int16_t start() = 0;
+  /**
+   * Stop the client
+   */
+  virtual int16_t stop(bool force, uint64_t timeToWait = 0) = 0;
+
+  virtual bool isRunning() = 0;
+
+  virtual int16_t pause() = 0;
+};
+
+/**
+ * Justification and Purpose: Update sink is an abstract class with most functions being purely virtual.
+ * This is meant to reflect the operational sink for the state of the client. Functions, below, represent
+ * a small and tight interface for operations that can be performed from external controllers.
+ *
+ */
+class StateMonitor : public StateController {
+ public:
+  virtual ~StateMonitor() {
+
+  }
+
+  std::atomic<bool> &isStateMonitorRunning() {
+    return controller_running_;
+  }
+
+  virtual std::vector<std::shared_ptr<StateController>> getComponents(const std::string &name) = 0;
+
+  virtual std::vector<std::shared_ptr<StateController>> getAllComponents() = 0;
+  /**
+   * Operational controllers
+   */
+
+  /**
+   * Drain repositories
+   */
+  virtual int16_t drainRepositories() = 0;
+
+  /**
+   * Clear connection for the agent.
+   */
+  virtual int16_t clearConnection(const std::string &connection) = 0;
+
+  /**
+   * Apply an update with the provided string.
+   *
+   * < 0 is an error code
+   * 0 is success
+   */
+  virtual int16_t applyUpdate(const std::string &configuration) = 0;
+
+  /**
+   * Apply an update that the agent must decode. This is useful for certain operations
+   * that can't be encapsulated within these definitions.
+   */
+  virtual int16_t applyUpdate(const std::shared_ptr<Update> &updateController) = 0;
+
+  /**
+   * Returns uptime for this module.
+   * @return uptime for the current state monitor.
+   */
+  virtual uint64_t getUptime() = 0;
+
+ protected:
+  std::atomic<bool> controller_running_;
+};
+
+/**
+ * Asks: what is being updated, what can be updated without a restart
+ * what requires a restart, etc.
+ */
+class UpdateController {
+ public:
+
+  virtual std::vector<std::function<Update()>> getFunctions() = 0;
+
+  virtual ~UpdateController() {
+
+  }
+
+};
+
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_UPDATECONTROLLER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/DeviceInformation.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/metrics/DeviceInformation.h b/libminifi/include/core/state/metrics/DeviceInformation.h
new file mode 100644
index 0000000..9579877
--- /dev/null
+++ b/libminifi/include/core/state/metrics/DeviceInformation.h
@@ -0,0 +1,319 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_DEVICEINFORMATION_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_DEVICEINFORMATION_H_
+
+#include "core/Resource.h"
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <functional>
+#include <sys/ioctl.h>
+#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) 
+#include <net/if_dl.h>
+#include <net/if_types.h>
+#endif
+#include <ifaddrs.h>
+#include <net/if.h> 
+#include <unistd.h>
+#include <netinet/in.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <ifaddrs.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sstream>
+#include <map>
+#include "MetricsBase.h"
+#include "Connection.h"
+#include "io/ClientSocket.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace metrics {
+
+class Device {
+ public:
+  Device() {
+    initialize();
+  }
+  void initialize() {
+    struct sockaddr_in servAddr;
+
+    addrinfo hints = { sizeof(addrinfo) };
+    memset(&hints, 0, sizeof hints);  // make sure the struct is empty
+    hints.ai_family = AF_UNSPEC;
+    hints.ai_socktype = SOCK_STREAM;
+    hints.ai_flags = AI_CANONNAME;
+    hints.ai_protocol = 0; /* any protocol */
+
+    char hostname[1024];
+    hostname[1023] = '\0';
+    gethostname(hostname, 1023);
+
+    std::ifstream device_id_file(".device_id");
+    if (device_id_file) {
+      std::string line;
+      while (device_id_file) {
+        if (std::getline(device_id_file, line))
+          device_id_ += line;
+      }
+      device_id_file.close();
+    } else {
+      device_id_ = getDeviceId();
+
+      std::ofstream outputFile(".device_id");
+      if (outputFile) {
+        outputFile.write(device_id_.c_str(), device_id_.length());
+      }
+      outputFile.close();
+    }
+
+    canonical_hostname_ = hostname;
+
+    std::stringstream ips;
+    for (auto ip : getIpAddresses()) {
+      if (ip.find("127") == 0 || ip.find("192") == 0)
+        continue;
+      ip_ = ip;
+      break;
+    }
+
+  }
+
+  std::string canonical_hostname_;
+  std::string ip_;
+  std::string device_id_;
+ protected:
+
+  std::vector<std::string> getIpAddresses() {
+    std::vector<std::string> ips;
+    struct ifaddrs *ifaddr, *ifa;
+    if (getifaddrs(&ifaddr) == -1) {
+      perror("getifaddrs");
+      exit(EXIT_FAILURE);
+    }
+
+    for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
+      if ((strcmp("lo", ifa->ifa_name) == 0) || !(ifa->ifa_flags & (IFF_RUNNING)))
+        continue;
+      if ((ifa->ifa_addr != NULL) && (ifa->ifa_addr->sa_family == AF_INET)) {
+        ips.push_back(inet_ntoa(((struct sockaddr_in *) ifa->ifa_addr)->sin_addr));
+      }
+    }
+
+    freeifaddrs(ifaddr);
+    return ips;
+  }
+
+#if __linux__
+  std::string getDeviceId() {
+
+    std::hash<std::string> hash_fn;
+    std::string macs;
+    struct ifaddrs *ifaddr, *ifa;
+    int family, s, n;
+    char host[NI_MAXHOST];
+
+    if (getifaddrs(&ifaddr) == -1) {
+      exit(EXIT_FAILURE);
+    }
+
+    /* Walk through linked list, maintaining head pointer so we
+     can free list later */
+    for (ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) {
+      if (ifa->ifa_addr == NULL)
+      continue;
+
+      family = ifa->ifa_addr->sa_family;
+
+      /* Display interface name and family (including symbolic
+       form of the latter for the common families) */
+
+      /* For an AF_INET* interface address, display the address */
+
+      if (family == AF_INET || family == AF_INET6) {
+        s = getnameinfo(ifa->ifa_addr,
+            (family == AF_INET) ? sizeof(struct sockaddr_in) :
+            sizeof(struct sockaddr_in6),
+            host, NI_MAXHOST,
+            NULL, 0, NI_NUMERICHOST);
+        if (s != 0) {
+          printf("getnameinfo() failed: %s\n", gai_strerror(s));
+          exit(EXIT_FAILURE);
+        }
+
+      }
+    }
+
+    freeifaddrs(ifaddr);
+
+    int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_IP);
+    struct ifreq ifr;
+    struct ifconf ifc;
+    char buf[1024];
+    int success = 0;
+    ifc.ifc_len = sizeof(buf);
+    ifc.ifc_buf = buf;
+    if (ioctl(sock, SIOCGIFCONF, &ifc) == -1) { /* handle error */}
+
+    struct ifreq* it = ifc.ifc_req;
+    const struct ifreq* const end = it + (ifc.ifc_len / sizeof(struct ifreq));
+
+    for (; it != end; ++it) {
+      strcpy(ifr.ifr_name, it->ifr_name);
+      if (ioctl(sock, SIOCGIFFLAGS, &ifr) == 0) {
+        if (! (ifr.ifr_flags & IFF_LOOPBACK)) {  // don't count loopback
+          if (ioctl(sock, SIOCGIFHWADDR, &ifr) == 0) {
+            unsigned char mac[6];
+
+            memcpy(mac, ifr.ifr_hwaddr.sa_data, 6);
+
+            char mac_add[13];
+            snprintf(mac_add,13,"%02X%02X%02X%02X%02X%02X",mac[0], mac[1], mac[2], mac[3], mac[4], mac[5] );
+
+            macs+= mac_add;
+          }
+        }
+
+      }
+      else { /* handle error */}
+    }
+
+    close(sock);
+
+    return std::to_string(hash_fn(macs));
+  }
+#elif( defined(__unix__) || defined(__APPLE__) || defined(__MACH__) || defined(BSD))  // should work on bsd variants as well
+  std::string getDeviceId() {
+    ifaddrs* iflist;
+    bool found = false;
+    std::hash<std::string> hash_fn;
+    std::set<std::string> macs;
+
+    if (getifaddrs(&iflist) == 0) {
+      for (ifaddrs* cur = iflist; cur; cur = cur->ifa_next) {
+        if (cur->ifa_addr && (cur->ifa_addr->sa_family == AF_LINK) && ((sockaddr_dl*) cur->ifa_addr)->sdl_alen) {
+          sockaddr_dl* sdl = (sockaddr_dl*) cur->ifa_addr;
+
+          if (sdl->sdl_type != IFT_ETHER) {
+            continue;
+          } else {
+          }
+          char mac[32];
+          memcpy(mac, LLADDR(sdl), sdl->sdl_alen);
+          char mac_add[13];
+          snprintf(mac_add, 13, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
+          ///macs += mac_add;
+          macs.insert(mac_add);
+        } else if (cur->ifa_addr->sa_family == AF_INET) {
+          struct sockaddr_in* sockInfoIP = (struct sockaddr_in*) cur->ifa_addr;
+        }
+      }
+
+      freeifaddrs(iflist);
+    }
+    std::string macstr;
+    for (auto &mac : macs) {
+      macstr += mac;
+    }
+    return macstr.length() > 0 ? std::to_string(hash_fn(macstr)) : "8675309";
+  }
+#else
+  std::string getDeviceId() {
+    return "NaD";
+  }
+#endif
+
+  // connection information
+  int32_t socket_file_descriptor_;
+
+  addrinfo *addr_info_;
+};
+
+/**
+ * Justification and Purpose: Provides Device Information
+ */
+class DeviceInformation : public DeviceMetric {
+ public:
+
+  DeviceInformation(std::string name, uuid_t uuid)
+      : DeviceMetric(name, uuid) {
+    static Device device;
+    hostname_ = device.canonical_hostname_;
+    ip_ = device.ip_;
+    device_id_ = device.device_id_;
+  }
+
+  DeviceInformation(const std::string &name)
+      : DeviceMetric(name, 0) {
+    static Device device;
+    hostname_ = device.canonical_hostname_;
+    ip_ = device.ip_;
+    device_id_ = device.device_id_;
+  }
+
+  std::string getName() {
+    return "NetworkInfo";
+  }
+
+  std::vector<MetricResponse> serialize() {
+    std::vector<MetricResponse> serialized;
+
+    MetricResponse hostname;
+    hostname.name = "hostname";
+    hostname.value = hostname_;
+
+    MetricResponse ip;
+    ip.name = "ip";
+    ip.value = ip_;
+
+    serialized.push_back(hostname);
+    serialized.push_back(ip);
+
+    MetricResponse device_id;
+    device_id.name = "deviceid";
+    device_id.value = device_id_;
+
+    serialized.push_back(device_id);
+
+    return serialized;
+  }
+
+ protected:
+
+  std::string hostname_;
+  std::string ip_;
+  std::string device_id_;
+};
+
+REGISTER_RESOURCE(DeviceInformation);
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_DEVICEINFORMATION_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/MetricsBase.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/metrics/MetricsBase.h b/libminifi/include/core/state/metrics/MetricsBase.h
new file mode 100644
index 0000000..cc96298
--- /dev/null
+++ b/libminifi/include/core/state/metrics/MetricsBase.h
@@ -0,0 +1,161 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_METRICS_METRICSBASE_H_
+#define LIBMINIFI_INCLUDE_METRICS_METRICSBASE_H_
+
+#include <vector>
+#include <memory>
+#include <string>
+#include "core/Core.h"
+#include "core/Connectable.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace metrics {
+
+struct MetricResponse {
+  std::string name;
+  std::string value;
+  std::vector<MetricResponse> children;
+  MetricResponse &operator=(const MetricResponse &other) {
+    name = other.name;
+    value = other.value;
+    children = other.children;
+    return *this;
+  }
+};
+
+/**
+ * Purpose: Defines a metric. Serialization is intended to be thread safe.
+ */
+class Metrics : public core::Connectable {
+ public:
+  Metrics()
+      : core::Connectable("metric", 0) {
+  }
+
+  Metrics(std::string name, uuid_t uuid)
+      : core::Connectable(name, uuid) {
+  }
+  virtual ~Metrics() {
+
+  }
+  virtual std::string getName() = 0;
+
+  virtual std::vector<MetricResponse> serialize() = 0;
+
+  virtual void yield() {
+  }
+  virtual bool isRunning() {
+    return true;
+  }
+  virtual bool isWorkAvailable() {
+    return true;
+  }
+
+};
+
+/**
+ * Purpose: Defines a metric that
+ */
+class DeviceMetric : public Metrics {
+ public:
+  DeviceMetric(std::string name, uuid_t uuid)
+      : Metrics(name, uuid) {
+  }
+};
+
+/**
+ * Purpose: Retrieves Metrics from the defined class. The current Metric, which is a consumable for any reader of Metrics must have the ability to set metrics.
+ *
+ */
+class MetricsSource {
+ public:
+
+  MetricsSource() {
+
+  }
+
+  virtual ~MetricsSource() {
+  }
+
+  /**
+   * Retrieves all metrics from this source.
+   * @param metric_vector -- metrics will be placed in this vector.
+   * @return result of the get operation.
+   *  0 Success
+   *  1 No error condition, but cannot obtain lock in timely manner.
+   *  -1 failure
+   */
+  virtual int16_t getMetrics(std::vector<std::shared_ptr<Metrics>> &metric_vector) = 0;
+
+};
+
+class MetricsReporter {
+ public:
+
+  MetricsReporter() {
+
+  }
+
+  virtual ~MetricsReporter() {
+  }
+
+  /**
+   * Retrieves all metrics from this source.
+   * @param metric_vector -- metrics will be placed in this vector.
+   * @return result of the get operation.
+   *  0 Success
+   *  1 No error condition, but cannot obtain lock in timely manner.
+   *  -1 failure
+   */
+  virtual int16_t getMetrics(std::vector<std::shared_ptr<Metrics>> &metric_vector, uint8_t metricsClass) = 0;
+
+};
+
+/**
+ * Purpose: Sink interface for all metrics. The current Metric, which is a consumable for any reader of Metrics must have the ability to set metrics.
+ *
+ */
+class MetricsSink {
+ public:
+
+  virtual ~MetricsSink() {
+  }
+  /**
+   * Setter for metrics in this sink.
+   * @param metrics metrics to insert into the current sink.
+   * @return result of the set operation.
+   *  0 Success
+   *  1 No error condition, but cannot obtain lock in timely manner.
+   *  -1 failure
+   */
+  virtual int16_t setMetrics(const std::shared_ptr<Metrics> &metrics) = 0;
+};
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_METRICS_METRICSBASE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/MetricsListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/metrics/MetricsListener.h b/libminifi/include/core/state/metrics/MetricsListener.h
new file mode 100644
index 0000000..34df476
--- /dev/null
+++ b/libminifi/include/core/state/metrics/MetricsListener.h
@@ -0,0 +1,128 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_METRICS_H_
+#define LIBMINIFI_INCLUDE_C2_METRICS_H_
+
+#include <vector>
+
+#include "MetricsBase.h"
+#include "core/state/UpdateController.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace metrics {
+
+/**
+ * Purpose: Class that will represent the metrics updates, which can be performed asynchronously.
+ */
+class MetricsUpdate : public Update {
+ public:
+  MetricsUpdate(UpdateStatus status)
+      : Update(status) {
+
+  }
+  virtual bool validate() {
+    return true;
+  }
+};
+
+class MetricsWatcher : public utils::AfterExecute<Update> {
+ public:
+  explicit MetricsWatcher(std::atomic<bool> *running)
+      : running_(running) {
+  }
+
+  explicit MetricsWatcher(MetricsWatcher && other)
+      : running_(std::move(other.running_)) {
+
+  }
+
+  ~MetricsWatcher() {
+
+  }
+
+  virtual bool isFinished(const Update &result) {
+    if (result.getStatus().getState() == UpdateState::READ_COMPLETE && running_) {
+      return false;
+    } else {
+      return true;
+    }
+  }
+  virtual bool isCancelled(const UpdateStatus &result) {
+    return false;
+  }
+
+ protected:
+  std::atomic<bool> *running_;
+
+};
+
+class MetricsListener {
+ public:
+  MetricsListener(const std::shared_ptr<metrics::MetricsReporter> &source, const std::shared_ptr<metrics::MetricsSink> &sink)
+      : running_(true),
+        sink_(sink),
+        source_(source) {
+
+    function_ = [&]() {
+      while(running_) {
+        std::vector<std::shared_ptr<metrics::Metrics>> metric_vector;
+        // simple pass through for the metrics
+        if (nullptr != source_ && nullptr != sink_) {
+          source_->getMetrics(metric_vector,0);
+          for(auto metric : metric_vector) {
+            sink_->setMetrics(metric);
+          }
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+      }
+      return MetricsUpdate(UpdateState::READ_COMPLETE);
+    };
+  }
+
+  void stop() {
+    running_ = false;
+  }
+
+  std::function<Update()> &getFunction() {
+    return function_;
+  }
+
+  std::future<Update> &getFuture() {
+    return future_;
+  }
+
+ private:
+  std::function<Update()> function_;
+  std::future<Update> future_;
+  std::atomic<bool> running_;
+  std::shared_ptr<metrics::MetricsReporter> source_;
+  std::shared_ptr<metrics::MetricsSink> sink_;
+};
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_METRICS_H_ */


Mime
View raw message