nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [5/8] nifi-minifi-cpp git commit: MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters MINIFI-339: Add GetTCP Processor MINIFI-339: Add listener server MINIFI-339: Update to listener MINIFI-339: Resolve Issue with stack based process
Date Mon, 02 Oct 2017 14:57:18 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index b0fbffa..6485e6c 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -33,11 +33,20 @@
 #include <utility>
 #include <memory>
 #include <string>
+#include "core/state/metrics/QueueMetrics.h"
+#include "core/state/metrics/DeviceInformation.h"
+#include "core/state/metrics/SystemMetrics.h"
+#include "core/state/metrics/ProcessMetrics.h"
+#include "core/state/metrics/RepositoryMetrics.h"
+#include "core/state/ProcessorController.h"
 #include "yaml-cpp/yaml.h"
+#include "c2/C2Agent.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessGroup.h"
 #include "utils/StringUtils.h"
 #include "core/Core.h"
+#include "core/ClassLoader.h"
+#include "SchedulingAgent.h"
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/repository/FlowFileRepository.h"
@@ -58,6 +67,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
       max_timer_driven_threads_(0),
       max_event_driven_threads_(0),
       running_(false),
+      c2_enabled_(true),
       initialized_(false),
       provenance_repo_(provenance_repo),
       flow_file_repo_(flow_file_repo),
@@ -88,6 +98,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
   max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
   running_ = false;
   initialized_ = false;
+  c2_initialized_ = false;
   root_ = nullptr;
 
   protocol_ = new FlowControlProtocol(this, configure);
@@ -129,11 +140,13 @@ void FlowController::initializePaths(const std::string &adjustedFilename) {
   // Create the content repo directory if needed
   struct stat contentDirStat;
 
-  if (stat(ResourceClaim::default_directory_path, &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode)) {
-    path = realpath(ResourceClaim::default_directory_path, full_path);
+  minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
+
+  if (stat(minifi::default_directory_path.c_str(), &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode)) {
+    path = realpath(minifi::default_directory_path.c_str(), full_path);
     logger_->log_info("FlowController content directory %s", full_path);
   } else {
-    if (mkdir(ResourceClaim::default_directory_path, 0777) == -1) {
+    if (mkdir(minifi::default_directory_path.c_str(), 0777) == -1) {
       logger_->log_error("FlowController content directory creation failed");
       exit(1);
     }
@@ -156,11 +169,11 @@ FlowController::~FlowController() {
   provenance_repo_ = nullptr;
 }
 
-bool FlowController::applyConfiguration(std::string &configurePayload) {
+bool FlowController::applyConfiguration(const std::string &configurePayload) {
   std::unique_ptr<core::ProcessGroup> newRoot;
   try {
     newRoot = std::move(flow_configuration_->getRootFromPayload(configurePayload));
-  } catch (const YAML::Exception& e) {
+  } catch (...) {
     logger_->log_error("Invalid configuration payload");
     return false;
   }
@@ -170,7 +183,7 @@ bool FlowController::applyConfiguration(std::string &configurePayload) {
 
   logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", newRoot->getName().c_str(), newRoot->getVersion());
 
-  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   stop(true);
   waitUnload(30000);
   this->root_ = std::move(newRoot);
@@ -179,8 +192,8 @@ bool FlowController::applyConfiguration(std::string &configurePayload) {
   return start();
 }
 
-void FlowController::stop(bool force) {
-  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
+int16_t FlowController::stop(bool force, uint64_t timeToWait) {
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   if (running_) {
     // immediately indicate that we are not running
     logger_->log_info("Stop Flow Controller");
@@ -193,6 +206,7 @@ void FlowController::stop(bool force) {
     this->event_scheduler_->stop();
     running_ = false;
   }
+  return 0;
 }
 
 /**
@@ -219,13 +233,12 @@ void FlowController::waitUnload(const uint64_t timeToWaitMs) {
 }
 
 void FlowController::unload() {
-  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   if (running_) {
     stop(true);
   }
   if (initialized_) {
     logger_->log_info("Unload Flow Controller");
-    root_ = nullptr;
     initialized_ = false;
     name_ = "";
   }
@@ -234,39 +247,34 @@ void FlowController::unload() {
 }
 
 void FlowController::load() {
-  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   if (running_) {
     stop(true);
   }
   if (!initialized_) {
-    std::string listenerType;
-    // grab the value for configuration
-    if (this->http_configuration_listener_ == nullptr && configuration_->get(Configure::nifi_configuration_listener_type, listenerType)) {
-      if (listenerType == "http") {
-        this->http_configuration_listener_ = std::unique_ptr < minifi::HttpConfigurationListener > (new minifi::HttpConfigurationListener(shared_from_this(), configuration_));
-      }
-    }
-
     logger_->log_info("Initializing timers");
+
     if (nullptr == timer_scheduler_) {
-      timer_scheduler_ = std::make_shared < TimerDrivenSchedulingAgent
-          > (std::static_pointer_cast < core::controller::ControllerServiceProvider > (shared_from_this()), provenance_repo_, flow_file_repo_, content_repo_, configuration_);
+      timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(
+          std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
+          configuration_);
     }
     if (nullptr == event_scheduler_) {
-      event_scheduler_ = std::make_shared < EventDrivenSchedulingAgent
-          > (std::static_pointer_cast < core::controller::ControllerServiceProvider > (shared_from_this()), provenance_repo_, flow_file_repo_, content_repo_, configuration_);
+      event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(
+          std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
+          configuration_);
     }
     logger_->log_info("Load Flow Controller from file %s", configuration_filename_.c_str());
 
-    this->root_ = std::shared_ptr < core::ProcessGroup > (flow_configuration_->getRoot(configuration_filename_));
+    this->root_ = std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_));
 
     logger_->log_info("Loaded root processor Group");
 
     controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
 
-    std::static_pointer_cast < core::controller::StandardControllerServiceProvider > (controller_service_provider_)->setRootGroup(root_);
-    std::static_pointer_cast < core::controller::StandardControllerServiceProvider
-        > (controller_service_provider_)->setSchedulingAgent(std::static_pointer_cast < minifi::SchedulingAgent > (event_scheduler_));
+    std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
+    std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent(
+        std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_));
 
     logger_->log_info("Loaded controller service provider");
     // Load Flow File from Repo
@@ -277,7 +285,7 @@ void FlowController::load() {
 }
 
 void FlowController::reload(std::string yamlFile) {
-  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str());
   stop(true);
   unload();
@@ -303,7 +311,7 @@ void FlowController::loadFlowRepo() {
       this->root_->getConnections(connectionMap);
     }
     logger_->log_debug("Number of connections from connectionMap %d", connectionMap.size());
-    auto rep = std::dynamic_pointer_cast < core::repository::FlowFileRepository > (flow_file_repo_);
+    auto rep = std::dynamic_pointer_cast<core::repository::FlowFileRepository>(flow_file_repo_);
     if (nullptr != rep) {
       rep->setConnectionMap(connectionMap);
     }
@@ -313,27 +321,149 @@ void FlowController::loadFlowRepo() {
   }
 }
 
-bool FlowController::start() {
-  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
+int16_t FlowController::start() {
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   if (!initialized_) {
     logger_->log_error("Can not start Flow Controller because it has not been initialized");
-    return false;
+    return -1;
   } else {
     if (!running_) {
       logger_->log_info("Starting Flow Controller");
       controller_service_provider_->enableAllControllerServices();
       this->timer_scheduler_->start();
       this->event_scheduler_->start();
+
       if (this->root_ != nullptr) {
+        start_time_ = std::chrono::steady_clock::now();
         this->root_->startProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get());
       }
+      initializeC2();
       running_ = true;
       this->protocol_->start();
       this->provenance_repo_->start();
       this->flow_file_repo_->start();
       logger_->log_info("Started Flow Controller");
     }
-    return true;
+    return 0;
+  }
+}
+
+void FlowController::initializeC2() {
+  if (!c2_enabled_) {
+    return;
+  }
+  if (!c2_initialized_) {
+    std::string c2_enable_str;
+
+    if (configuration_->get(Configure::nifi_c2_enable, c2_enable_str)) {
+      bool enable_c2 = true;
+      utils::StringUtils::StringToBool(c2_enable_str, enable_c2);
+      c2_enabled_ = enable_c2;
+      if (!c2_enabled_) {
+        return;
+      }
+    } else {
+      c2_enabled_ = true;
+    }
+    state::StateManager::initialize();
+    std::shared_ptr<c2::C2Agent> agent = std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()), std::dynamic_pointer_cast<FlowController>(shared_from_this()),
+                                                                       configuration_);
+    registerUpdateListener(agent);
+  }
+  if (!c2_enabled_) {
+    return;
+  }
+
+  c2_initialized_ = true;
+  metrics_.clear();
+  component_metrics_.clear();
+  component_metrics_by_id_.clear();
+  std::string class_csv;
+
+  if (root_ != nullptr) {
+    std::shared_ptr<state::metrics::QueueMetrics> queueMetrics = std::make_shared<state::metrics::QueueMetrics>();
+
+    std::map<std::string, std::shared_ptr<Connection>> connections;
+    root_->getConnections(connections);
+    for (auto con : connections) {
+      queueMetrics->addConnection(con.second);
+    }
+    metrics_[queueMetrics->getName()] = queueMetrics;
+
+    std::shared_ptr<state::metrics::RepositoryMetrics> repoMetrics = std::make_shared<state::metrics::RepositoryMetrics>();
+
+    repoMetrics->addRepository(provenance_repo_);
+    repoMetrics->addRepository(flow_file_repo_);
+
+    metrics_[repoMetrics->getName()] = repoMetrics;
+  }
+
+  if (configuration_->get("nifi.flow.metrics.classes", class_csv)) {
+    std::vector<std::string> classes = utils::StringUtils::split(class_csv, ",");
+
+    for (std::string clazz : classes) {
+      auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
+
+      if (nullptr == ptr) {
+        logger_->log_error("No metric defined for %s", clazz.c_str());
+        continue;
+      }
+
+      std::shared_ptr<state::metrics::Metrics> processor = std::static_pointer_cast<state::metrics::Metrics>(ptr);
+
+      std::lock_guard<std::mutex> lock(metrics_mutex_);
+
+      metrics_[processor->getName()] = processor;
+    }
+  }
+
+  // first we should get all component metrics, then
+  // we will build the mapping
+  std::vector<std::shared_ptr<core::Processor>> processors;
+  if (root_ != nullptr) {
+    root_->getAllProcessors(processors);
+    for (const auto &processor : processors) {
+      auto rep = std::dynamic_pointer_cast<state::metrics::MetricsSource>(processor);
+      // we have a metrics source.
+      if (nullptr != rep) {
+        std::vector<std::shared_ptr<state::metrics::Metrics>> metric_vector;
+        rep->getMetrics(metric_vector);
+        for (auto metric : metric_vector) {
+          component_metrics_[metric->getName()] = metric;
+        }
+      }
+    }
+  }
+
+  std::string class_definitions;
+  if (configuration_->get("nifi.flow.metrics.class.definitions", class_definitions)) {
+    std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ",");
+
+    for (std::string metricsClass : classes) {
+      try {
+        int id = std::stoi(metricsClass);
+        std::stringstream option;
+        option << "nifi.flow.metrics.class.definitions." << metricsClass;
+        if (configuration_->get(option.str(), class_definitions)) {
+          std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ",");
+
+          for (std::string clazz : classes) {
+            std::lock_guard<std::mutex> lock(metrics_mutex_);
+            auto ret = component_metrics_[clazz];
+            if (nullptr == ret) {
+              ret = metrics_[clazz];
+            }
+            if (nullptr == ret) {
+              logger_->log_error("No metric defined for %s", clazz.c_str());
+              continue;
+            }
+            component_metrics_by_id_[id].push_back(ret);
+          }
+        }
+      } catch (...) {
+        logger_->log_error("Could not create metrics class %s", metricsClass);
+      }
+    }
   }
 }
 /**
@@ -367,7 +497,7 @@ void FlowController::removeControllerService(const std::shared_ptr<core::control
  * Enables the controller service services
  * @param serviceNode service node which will be disabled, along with linked services.
  */
-void FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<bool> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   return controller_service_provider_->enableControllerService(serviceNode);
 }
 
@@ -382,8 +512,8 @@ void FlowController::enableControllerServices(std::vector<std::shared_ptr<core::
  * Disables controller services
  * @param serviceNode service node which will be disabled, along with linked services.
  */
-void FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-  controller_service_provider_->disableControllerService(serviceNode);
+std::future<bool> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  return controller_service_provider_->disableControllerService(serviceNode);
 }
 
 /**
@@ -474,6 +604,94 @@ void FlowController::enableAllControllerServices() {
   controller_service_provider_->enableAllControllerServices();
 }
 
+int16_t FlowController::applyUpdate(const std::string &configuration) {
+  applyConfiguration(configuration);  // the bool result of applyConfiguration() is discarded here
+  return 0;  // 0 is returned unconditionally, whether or not the payload was applied
+}
+
+int16_t FlowController::clearConnection(const std::string &connection) {
+  if (root_ != nullptr) {  // nothing can be looked up without a root process group
+    logger_->log_info("Attempting to clear connection %s", connection);
+    std::map<std::string, std::shared_ptr<Connection>> connections;
+    root_->getConnections(connections);  // populates the map that is searched for the requested key below
+    auto conn = connections.find(connection);
+    if (conn != connections.end()) {
+      logger_->log_info("Clearing connection %s", connection);
+      conn->second->drain();
+    }
+  }
+  return -1;  // NOTE(review): -1 is returned on every path, even after drain() was called -- confirm intent
+}
+
+int16_t FlowController::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector, uint8_t metricsClass) {
+  // Appends to metric_vector every entry of metrics_ when metricsClass is 0, otherwise the metrics registered under that
+  // id in component_metrics_by_id_. Uses find() so that an unknown id no longer inserts an empty entry. Always returns 0.
+  std::lock_guard<std::mutex> lock(metrics_mutex_);
+  if (metricsClass == 0) {
+    for (const auto &metric : metrics_) {
+      metric_vector.push_back(metric.second);
+    }
+  } else {
+    const auto entry = component_metrics_by_id_.find(metricsClass);
+    if (entry != component_metrics_by_id_.end()) {
+      metric_vector.insert(metric_vector.end(), entry->second.begin(), entry->second.end());
+    }
+  }
+  return 0;
+}
+
+std::vector<std::shared_ptr<state::StateController>> FlowController::getAllComponents() {
+  // The controller itself is always the first entry; each timer- or event-driven processor of the root
+  // group follows, wrapped in a ProcessorController that is given the corresponding scheduler member.
+  std::vector<std::shared_ptr<state::StateController>> components;
+  components.push_back(shared_from_this());
+  if (root_ == nullptr) {
+    return components;
+  }
+  std::vector<std::shared_ptr<core::Processor>> flow_processors;
+  root_->getAllProcessors(flow_processors);
+  for (auto &current : flow_processors) {
+    const auto strategy = current->getSchedulingStrategy();
+    if (strategy == core::SchedulingStrategy::TIMER_DRIVEN) {
+      components.push_back(std::make_shared<state::ProcessorController>(current, timer_scheduler_));
+    } else if (strategy == core::SchedulingStrategy::EVENT_DRIVEN) {
+      components.push_back(std::make_shared<state::ProcessorController>(current, event_scheduler_));
+    }
+    // processors using any other scheduling strategy are not listed
+  }
+  return components;
+}
+std::vector<std::shared_ptr<state::StateController>> FlowController::getComponents(const std::string &name) {
+  // Resolves a component name: "FlowController" yields this controller, any other name is looked up as a
+  // processor of the root group. The result is empty when nothing matches or when root_ is null.
+  std::vector<std::shared_ptr<state::StateController>> vec;
+  if (name == "FlowController") {
+    vec.push_back(shared_from_this());
+  } else {
+    // root_ can be null, so only search it for a processor when it is set (avoids a null dereference)
+    std::shared_ptr<core::Processor> processor = root_ != nullptr ? root_->findProcessor(name) : nullptr;
+    if (processor != nullptr) {
+      switch (processor->getSchedulingStrategy()) {
+        case core::SchedulingStrategy::TIMER_DRIVEN:
+          vec.push_back(std::make_shared<state::ProcessorController>(processor, timer_scheduler_));
+          break;
+        case core::SchedulingStrategy::EVENT_DRIVEN:
+          vec.push_back(std::make_shared<state::ProcessorController>(processor, event_scheduler_));
+          break;
+        default:
+          break;
+      }
+    }
+  }
+  return vec;
+}
+
+uint64_t FlowController::getUptime() {
+  // Milliseconds elapsed on the steady clock since start_time_ was recorded.
+  const auto elapsed = std::chrono::steady_clock::now() - start_time_;
+  return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
+}
+
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index efd6fa7..b97f290 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -61,9 +61,11 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository
 
   snapshot_ = false;
 
-  if (claim_ != nullptr)
+  if (claim_ != nullptr) {
     // Increase the flow file record owned count for the resource claim
     claim_->increaseFlowFileRecordOwnedCount();
+    content_full_fath_ = claim->getContentFullPath();
+  }
 }
 
 FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::shared_ptr<core::FlowFile> &event,
@@ -82,6 +84,7 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository
   event->getUUID(uuid_);
   uuid_connection_ = uuidConnection;
   if (event->getResourceClaim()) {
+    event->getResourceClaim()->increaseFlowFileRecordOwnedCount();
     content_full_fath_ = event->getResourceClaim()->getContentFullPath();
   }
 }
@@ -104,8 +107,9 @@ FlowFileRecord::~FlowFileRecord() {
     claim_->decreaseFlowFileRecordOwnedCount();
     std::string value;
     if (claim_->getFlowFileRecordOwnedCount() <= 0) {
-      logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath().c_str());
-      if (!this->stored || !flow_repository_->Get(uuid_str_, value)) {
+      // we cannot rely on the stored variable here, so we query the flow repository for this record's UUID instead
+      if (flow_repository_ != nullptr && !flow_repository_->Get(uuid_str_, value)) {
+        logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath().c_str());
         content_repo_->remove(claim_);
       }
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/HttpConfigurationListener.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/HttpConfigurationListener.cpp b/libminifi/src/HttpConfigurationListener.cpp
deleted file mode 100644
index 6b3a061..0000000
--- a/libminifi/src/HttpConfigurationListener.cpp
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "HttpConfigurationListener.h"
-#include "FlowController.h"
-#include <curl/easy.h>
-#include <iostream>
-#include <iterator>
-#include <string>
-#include <vector>
-#include <utility>
-
-#include "core/logging/Logger.h"
-#include "core/ProcessContext.h"
-#include "core/Relationship.h"
-#include "io/DataStream.h"
-#include "io/StreamFactory.h"
-#include "utils/StringUtils.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-bool HttpConfigurationListener::pullConfiguration(std::string &configuration) {
-  if (url_.empty())
-    return false;
-
-  bool ret = false;
-
-  std::string fullUrl = url_;
-
-  CURL *http_session = curl_easy_init();
-
-  curl_easy_setopt(http_session, CURLOPT_URL, fullUrl.c_str());
-
-  if (connect_timeout_ > 0) {
-    curl_easy_setopt(http_session, CURLOPT_TIMEOUT, connect_timeout_);
-  }
-
-  if (read_timeout_ > 0) {
-    curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_);
-  }
-
-  if (fullUrl.find("https") != std::string::npos) {
-    securityConfig_.configureSecureConnection(http_session);
-  }
-
-  utils::HTTPRequestResponse content;
-  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
-
-  curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content));
-
-  CURLcode res = curl_easy_perform(http_session);
-
-  if (res == CURLE_OK) {
-    logger_->log_debug("HttpConfigurationListener -- curl successful to %s", fullUrl.c_str());
-
-    std::string response_body(content.data.begin(), content.data.end());
-    int64_t http_code = 0;
-    curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, &http_code);
-    char *content_type;
-    /* ask for the content-type */
-    curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type);
-
-    bool isSuccess = ((int32_t) (http_code / 100)) == 2 && res != CURLE_ABORTED_BY_CALLBACK;
-    bool body_empty = IsNullOrEmpty(content.data);
-
-    if (isSuccess && !body_empty) {
-      configuration = std::move(response_body);
-      logger_->log_debug("config %s", configuration.c_str());
-      ret = true;
-    } else {
-      logger_->log_error("Cannot output body to content");
-    }
-  } else {
-    logger_->log_error("HttpConfigurationListener -- curl_easy_perform() failed %s\n", curl_easy_strerror(res));
-  }
-  curl_easy_cleanup(http_session);
-
-  return ret;
-}
-
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/Properties.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp
index 076cefc..abebfbb 100644
--- a/libminifi/src/Properties.cpp
+++ b/libminifi/src/Properties.cpp
@@ -34,7 +34,7 @@ Properties::Properties()
 
 // Get the config value
 bool Properties::get(std::string key, std::string &value) {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
   auto it = properties_.find(key);
 
   if (it != properties_.end()) {
@@ -46,7 +46,7 @@ bool Properties::get(std::string key, std::string &value) {
 }
 
 int Properties::getInt(const std::string &key, int default_value) {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
   auto it = properties_.find(key);
 
   if (it != properties_.end()) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index bcc3d49..03121a8 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -29,7 +29,7 @@
 #include <deque>
 #include <iostream>
 #include <set>
-
+#include <vector>
 #include <string>
 #include <type_traits>
 #include <utility>
@@ -43,14 +43,18 @@
 #include "core/Property.h"
 #include "core/Relationship.h"
 #include "Site2SitePeer.h"
+#include "utils/HTTPClient.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 
+const char *RemoteProcessorGroupPort::RPG_SSL_CONTEXT_SERVICE_NAME = "RemoteProcessorGroupPortSSLContextService";
+
 const char *RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort");
 core::Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", "");
+core::Property RemoteProcessorGroupPort::SSLContext("SSL Context Service", "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.", "");
 core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "");
 core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies remote NiFi Port UUID.", "");
 core::Relationship RemoteProcessorGroupPort::relation;
@@ -71,16 +75,20 @@ std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtoc
         minifi::Site2SitePeerStatus peer;
         nextProtocol->setPortId(protocol_uuid_);
         {
-          std::lock_guard < std::mutex > lock(site2site_peer_mutex_);
+          std::lock_guard<std::mutex> lock(site2site_peer_mutex_);
           peer = site2site_peer_status_list_[this->site2site_peer_index_];
           site2site_peer_index_++;
           if (site2site_peer_index_ >= site2site_peer_status_list_.size()) {
             site2site_peer_index_ = 0;
           }
         }
+        logger_->log_info("creating new protocol with %s and %d", peer.host_, peer.port_);
         std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(peer.host_, peer.port_));
         std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), peer.host_, peer.port_));
         nextProtocol->setPeer(std::move(peer_));
+      } else {
+        logger_->log_info("Refreshing the peer list since there are none configured.");
+        refreshPeerList();
       }
     }
   }
@@ -103,67 +111,72 @@ void RemoteProcessorGroupPort::initialize() {
   std::set<core::Property> properties;
   properties.insert(hostName);
   properties.insert(port);
+  properties.insert(SSLContext);
   properties.insert(portUUID);
   setSupportedProperties(properties);
 // Set the supported relationships
   std::set<core::Relationship> relationships;
   relationships.insert(relation);
   setSupportedRelationships(relationships);
-  curl_global_init(CURL_GLOBAL_DEFAULT);
-  {
-    std::lock_guard < std::mutex > lock(site2site_peer_mutex_);
-    if (!url_.empty()) {
-      refreshPeerList();
-      if (site2site_peer_status_list_.size() > 0)
+  std::lock_guard<std::mutex> lock(site2site_peer_mutex_);
+  if (!url_.empty()) {
+    refreshPeerList();
+    if (site2site_peer_status_list_.size() > 0)
+      site2site_peer_index_ = 0;
+  }
+  // populate the site2site protocol for load balancing between them
+  if (site2site_peer_status_list_.size() > 0) {
+    int count = site2site_peer_status_list_.size();
+    if (max_concurrent_tasks_ > count)
+      count = max_concurrent_tasks_;
+    for (int i = 0; i < count; i++) {
+      std::unique_ptr<Site2SiteClientProtocol> nextProtocol = nullptr;
+      nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(new Site2SiteClientProtocol(nullptr));
+      nextProtocol->setPortId(protocol_uuid_);
+      minifi::Site2SitePeerStatus peer = site2site_peer_status_list_[this->site2site_peer_index_];
+      site2site_peer_index_++;
+      if (site2site_peer_index_ >= site2site_peer_status_list_.size()) {
         site2site_peer_index_ = 0;
-    }
-    // populate the site2site protocol for load balancing between them
-    if (site2site_peer_status_list_.size() > 0) {
-      int count = site2site_peer_status_list_.size();
-      if (max_concurrent_tasks_ > count)
-        count = max_concurrent_tasks_;
-      for (int i = 0; i < count; i++) {
-        std::unique_ptr<Site2SiteClientProtocol> nextProtocol = nullptr;
-        nextProtocol = std::unique_ptr < Site2SiteClientProtocol > (new Site2SiteClientProtocol(nullptr));
-        nextProtocol->setPortId(protocol_uuid_);
-        minifi::Site2SitePeerStatus peer = site2site_peer_status_list_[this->site2site_peer_index_];
-        site2site_peer_index_++;
-        if (site2site_peer_index_ >= site2site_peer_status_list_.size()) {
-          site2site_peer_index_ = 0;
-        }
-        std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr < org::apache::nifi::minifi::io::DataStream > (stream_factory_->createSocket(peer.host_, peer.port_));
-        std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer > (new Site2SitePeer(std::move(str), peer.host_, peer.port_));
-        nextProtocol->setPeer(std::move(peer_));
-        returnProtocol(std::move(nextProtocol));
       }
+      std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(peer.host_, peer.port_));
+      std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), peer.host_, peer.port_));
+      nextProtocol->setPeer(std::move(peer_));
+      returnProtocol(std::move(nextProtocol));
     }
   }
 }
 
 void RemoteProcessorGroupPort::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
   std::string value;
-
-  int64_t lvalue;
-
   if (context->getProperty(portUUID.getName(), value)) {
     uuid_parse(value.c_str(), protocol_uuid_);
   }
+  std::string context_name;
+  if (!context->getProperty(SSLContext.getName(), context_name) || IsNullOrEmpty(context_name)) {
+    context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
+  }
+  std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name);
+  if (nullptr != service) {
+    ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+  }
 }
 
 void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  if (!transmitting_)
+  if (!transmitting_) {
     return;
+  }
 
   std::string value;
 
-  int64_t lvalue;
-
-  if (context->getProperty(hostName.getName(), value) && !value.empty()) {
-    host_ = value;
-  }
+  if (url_.empty()) {
+    if (context->getProperty(hostName.getName(), value) && !value.empty()) {
+      host_ = value;
+    }
 
-  if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) {
-    port_ = static_cast<int> (lvalue);
+    int64_t lvalue;
+    if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) {
+      port_ = static_cast<int>(lvalue);
+    }
   }
 
   if (context->getProperty(portUUID.getName(), value) && !value.empty()) {
@@ -175,13 +188,15 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr
     protocol_ = getNextProtocol();
 
     if (!protocol_) {
+      logger_->log_info("no protocol");
       context->yield();
       return;
     }
+    logger_->log_info("got protocol");
     if (!protocol_->bootstrap()) {
-      // bootstrap the client protocol if needeed
+      // bootstrap the client protocol if needed
       context->yield();
-      std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor > (context->getProcessorNode().getProcessor());
+      std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode()->getProcessor());
       logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec());
 
       return;
@@ -202,14 +217,11 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr
     context->yield();
     session->rollback();
   }
-
-
-  throw std::exception();
 }
 
 void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
   if (this->host_.empty() || this->port_ == -1 || this->protocol_.empty())
-      return;
+    return;
 
   std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/controller";
 
@@ -221,52 +233,29 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
 
   if (!rest_user_name_.empty()) {
     std::string loginUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/access/token";
-    token = utils::get_token(loginUrl, this->rest_user_name_, this->rest_password_, this->securityConfig_);
-    logger_->log_debug("Token from NiFi REST Api endpoint %s", token);
+    utils::HTTPClient client(loginUrl, ssl_service);
+    client.setVerbose();
+    token = utils::get_token(client, this->rest_user_name_, this->rest_password_);
+    logger_->log_debug("Token from NiFi REST Api endpoint %s,  %s", loginUrl, token);
     if (token.empty())
-        return;
+      return;
   }
 
-  CURL *http_session = curl_easy_init();
+  utils::HTTPClient client(fullUrl.c_str(), ssl_service);
 
-  if (fullUrl.find("https") != std::string::npos) {
-    this->securityConfig_.configureSecureConnection(http_session);
-  }
+  client.initialize("GET");
 
   struct curl_slist *list = NULL;
   if (!token.empty()) {
     std::string header = "Authorization: " + token;
     list = curl_slist_append(list, header.c_str());
-    curl_easy_setopt(http_session, CURLOPT_HTTPHEADER, list);
+    client.setHeaders(list);
   }
 
-  curl_easy_setopt(http_session, CURLOPT_URL, fullUrl.c_str());
-
-  utils::HTTPRequestResponse content;
-  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION,
-      &utils::HTTPRequestResponse::recieve_write);
-
-  curl_easy_setopt(http_session, CURLOPT_WRITEDATA,
-      static_cast<void*>(&content));
-
-  CURLcode res = curl_easy_perform(http_session);
-  if (list)
-    curl_slist_free_all(list);
-
-  if (res == CURLE_OK) {
-    std::string response_body(content.data.begin(), content.data.end());
-    int64_t http_code = 0;
-    curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, &http_code);
-    char *content_type;
-    /* ask for the content-type */
-    curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type);
-
-    bool isSuccess = ((int32_t) (http_code / 100)) == 2
-        && res != CURLE_ABORTED_BY_CALLBACK;
-    bool body_empty = IsNullOrEmpty(content.data);
-
-    if (isSuccess && !body_empty) {
-      std::string controller = std::move(response_body);
+  if (client.submit() && client.getResponseCode() == 200) {
+    const std::vector<char> &response_body = client.getResponseBody();
+    if (!response_body.empty()) {
+      std::string controller = std::string(response_body.begin(), response_body.end());
       logger_->log_debug("controller config %s", controller.c_str());
       Json::Value value;
       Json::Reader reader;
@@ -284,14 +273,11 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
         logger_->log_info("process group remote site2site port %d, is secure %d", site2site_port_, site2site_secure_);
       }
     } else {
-      logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %d from %s", http_code, fullUrl);
+      logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %d from %s", client.getResponseCode(), fullUrl);
     }
   } else {
-    logger_->log_error(
-        "ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed %s\n",
-        curl_easy_strerror(res));
+    logger_->log_error("ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed \n");
   }
-  curl_easy_cleanup(http_session);
 }
 
 void RemoteProcessorGroupPort::refreshPeerList() {
@@ -301,17 +287,16 @@ void RemoteProcessorGroupPort::refreshPeerList() {
 
   this->site2site_peer_status_list_.clear();
 
-  std::unique_ptr < Site2SiteClientProtocol> protocol;
-  protocol = std::unique_ptr < Site2SiteClientProtocol
-      > (new Site2SiteClientProtocol(nullptr));
+  std::unique_ptr<Site2SiteClientProtocol> protocol;
+  protocol = std::unique_ptr<Site2SiteClientProtocol>(new Site2SiteClientProtocol(nullptr));
   protocol->setPortId(protocol_uuid_);
-  std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
-      std::unique_ptr < org::apache::nifi::minifi::io::DataStream
-          > (stream_factory_->createSocket(host_, site2site_port_));
-  std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer
-      > (new Site2SitePeer(std::move(str), host_, site2site_port_));
+  std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(host_, site2site_port_));
+  std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), host_, site2site_port_));
   protocol->setPeer(std::move(peer_));
   protocol->getPeerList(site2site_peer_status_list_);
+
+  if (site2site_peer_status_list_.size() > 0)
+    site2site_peer_index_ = 0;
 }
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
index e7d4557..783a108 100644
--- a/libminifi/src/ResourceClaim.cpp
+++ b/libminifi/src/ResourceClaim.cpp
@@ -35,11 +35,14 @@ namespace minifi {
 
 utils::NonRepeatingStringGenerator ResourceClaim::non_repeating_string_generator_;
 
-char *ResourceClaim::default_directory_path = const_cast<char*>(DEFAULT_CONTENT_DIRECTORY);
+std::string default_directory_path = "";
+
+void setDefaultDirectory(std::string path) {
+  default_directory_path = path;
+}
 
 ResourceClaim::ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, const std::string contentDirectory)
-    : _flowFileRecordOwnedCount(0),
-      claim_manager_(claim_manager),
+    : claim_manager_(claim_manager),
       deleted_(false),
       logger_(logging::LoggerFactory<ResourceClaim>::getLogger()) {
   // Create the full content path for the content

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index e228ba5..4a227b5 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -39,10 +39,10 @@ bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) {
     return false;
 }
 
-void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-  logger_->log_trace("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
+std::future<bool> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
   // reference the enable function from serviceNode
-  std::function < bool() > f_ex = [serviceNode] {
+  std::function< bool()> f_ex = [serviceNode] {
     return serviceNode->enable();
   };
   // create a functor that will be submitted to the thread pool.
@@ -51,11 +51,14 @@ void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::
   // we aren't terribly concerned with the result.
   std::future<bool> future;
   component_lifecycle_thread_pool_.execute(std::move(functor), future);
+  future.wait();
+  return future;
 }
 
-void SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<bool> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  logger_->log_info("Disabling CSN in SchedulingAgent %s", serviceNode->getName());
   // reference the disable function from serviceNode
-  std::function < bool() > f_ex = [serviceNode] {
+  std::function< bool()> f_ex = [serviceNode] {
     return serviceNode->disable();
   };
   // create a functor that will be submitted to the thread pool.
@@ -64,15 +67,19 @@ void SchedulingAgent::disableControllerService(std::shared_ptr<core::controller:
   // we aren't terribly concerned with the result.
   std::future<bool> future;
   component_lifecycle_thread_pool_.execute(std::move(functor), future);
+  future.wait();
+  return future;
 }
 
 bool SchedulingAgent::hasTooMuchOutGoing(std::shared_ptr<core::Processor> processor) {
   return processor->flowFilesOutGoingFull();
 }
 
-bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
-  if (processor->isYield())
+bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, std::shared_ptr<core::ProcessContext> processContext, std::shared_ptr<core::ProcessSessionFactory> sessionFactory) {
+  if (processor->isYield()) {
+    logger_->log_debug("Not running %s since it must yield", processor->getName());
     return false;
+  }
 
   // No need to yield, reset yield expiration to 0
   processor->clearYield();
@@ -89,6 +96,7 @@ bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, core
 
   processor->incrementActiveTasks();
   try {
+    logger_->log_debug("Triggering %s", processor->getName());
     processor->onTrigger(processContext, sessionFactory);
     processor->decrementActiveTask();
   } catch (Exception &exception) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp
index 024bd35..8b8b646 100644
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -101,32 +101,32 @@ bool Site2SiteClientProtocol::initiateResourceNegotiation() {
   }
   logger_->log_info("status code is %i", statusCode);
   switch (statusCode) {
-  case RESOURCE_OK:
-    logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
-    return true;
-  case DIFFERENT_RESOURCE_VERSION:
-    uint32_t serverVersion;
-    ret = peer_->read(serverVersion);
-    if (ret <= 0) {
-      return false;
-    }
-    logger_->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion);
-    for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
-      if (serverVersion >= _supportedVersion[i]) {
-        _currentVersion = _supportedVersion[i];
-        _currentVersionIndex = i;
-        return initiateResourceNegotiation();
+    case RESOURCE_OK:
+      logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
+      return true;
+    case DIFFERENT_RESOURCE_VERSION:
+      uint32_t serverVersion;
+      ret = peer_->read(serverVersion);
+      if (ret <= 0) {
+        return false;
       }
-    }
-    ret = -1;
-    return false;
-  case NEGOTIATED_ABORT:
-    logger_->log_info("Site2Site Negotiate protocol response ABORT");
-    ret = -1;
-    return false;
-  default:
-    logger_->log_info("Negotiate protocol response unknown code %d", statusCode);
-    return true;
+      logger_->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion);
+      for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
+        if (serverVersion >= _supportedVersion[i]) {
+          _currentVersion = _supportedVersion[i];
+          _currentVersionIndex = i;
+          return initiateResourceNegotiation();
+        }
+      }
+      ret = -1;
+      return false;
+    case NEGOTIATED_ABORT:
+      logger_->log_info("Site2Site Negotiate protocol response ABORT");
+      ret = -1;
+      return false;
+    default:
+      logger_->log_info("Negotiate protocol response unknown code %d", statusCode);
+      return true;
   }
 
   return true;
@@ -163,32 +163,32 @@ bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
   }
 
   switch (statusCode) {
-  case RESOURCE_OK:
-    logger_->log_info("Site2Site Codec Negotiate version OK");
-    return true;
-  case DIFFERENT_RESOURCE_VERSION:
-    uint32_t serverVersion;
-    ret = peer_->read(serverVersion);
-    if (ret <= 0) {
-      return false;
-    }
-    logger_->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion);
-    for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
-      if (serverVersion >= _supportedCodecVersion[i]) {
-        _currentCodecVersion = _supportedCodecVersion[i];
-        _currentCodecVersionIndex = i;
-        return initiateCodecResourceNegotiation();
+    case RESOURCE_OK:
+      logger_->log_info("Site2Site Codec Negotiate version OK");
+      return true;
+    case DIFFERENT_RESOURCE_VERSION:
+      uint32_t serverVersion;
+      ret = peer_->read(serverVersion);
+      if (ret <= 0) {
+        return false;
       }
-    }
-    ret = -1;
-    return false;
-  case NEGOTIATED_ABORT:
-    logger_->log_info("Site2Site Codec Negotiate response ABORT");
-    ret = -1;
-    return false;
-  default:
-    logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
-    return true;
+      logger_->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion);
+      for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
+        if (serverVersion >= _supportedCodecVersion[i]) {
+          _currentCodecVersion = _supportedCodecVersion[i];
+          _currentCodecVersionIndex = i;
+          return initiateCodecResourceNegotiation();
+        }
+      }
+      ret = -1;
+      return false;
+    case NEGOTIATED_ABORT:
+      logger_->log_info("Site2Site Codec Negotiate response ABORT");
+      ret = -1;
+      return false;
+    default:
+      logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
+      return true;
   }
 
   return true;
@@ -213,7 +213,7 @@ bool Site2SiteClientProtocol::handShake() {
     return false;
   }
 
-  std::map < std::string, std::string > properties;
+  std::map<std::string, std::string> properties;
   properties[HandShakePropertyStr[GZIP]] = "false";
   properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
   properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut);
@@ -262,20 +262,20 @@ bool Site2SiteClientProtocol::handShake() {
   }
 
   switch (code) {
-  case PROPERTIES_OK:
-    logger_->log_info("Site2Site HandShake Completed");
-    _peerState = HANDSHAKED;
-    return true;
-  case PORT_NOT_IN_VALID_STATE:
-  case UNKNOWN_PORT:
-  case PORTS_DESTINATION_FULL:
-    logger_->log_error("Site2Site HandShake Failed because destination port is either invalid or full");
-    ret = -1;
-    return false;
-  default:
-    logger_->log_info("HandShake Failed because of unknown respond code %d", code);
-    ret = -1;
-    return false;
+    case PROPERTIES_OK:
+      logger_->log_info("Site2Site HandShake Completed");
+      _peerState = HANDSHAKED;
+      return true;
+    case PORT_NOT_IN_VALID_STATE:
+    case UNKNOWN_PORT:
+    case PORTS_DESTINATION_FULL:
+      logger_->log_error("Site2Site HandShake Failed because destination port, %s, is either invalid or full", _portIdStr);
+      ret = -1;
+      return false;
+    default:
+      logger_->log_info("HandShake Failed because of unknown respond code %d", code);
+      ret = -1;
+      return false;
   }
 
   return false;
@@ -523,27 +523,27 @@ Transaction* Site2SiteClientProtocol::createTransaction(std::string &transaction
 
     org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get());
     switch (code) {
-    case MORE_DATA:
-      dataAvailable = true;
-      logger_->log_info("Site2Site peer indicates that data is available");
-      transaction = new Transaction(direction, crcstream);
-      _transactionMap[transaction->getUUIDStr()] = transaction;
-      transactionID = transaction->getUUIDStr();
-      transaction->setDataAvailable(dataAvailable);
-      logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
-      return transaction;
-    case NO_MORE_DATA:
-      dataAvailable = false;
-      logger_->log_info("Site2Site peer indicates that no data is available");
-      transaction = new Transaction(direction, crcstream);
-      _transactionMap[transaction->getUUIDStr()] = transaction;
-      transactionID = transaction->getUUIDStr();
-      transaction->setDataAvailable(dataAvailable);
-      logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
-      return transaction;
-    default:
-      logger_->log_info("Site2Site got unexpected response %d when asking for data", code);
-      return NULL;
+      case MORE_DATA:
+        dataAvailable = true;
+        logger_->log_info("Site2Site peer indicates that data is available");
+        transaction = new Transaction(direction, crcstream);
+        _transactionMap[transaction->getUUIDStr()] = transaction;
+        transactionID = transaction->getUUIDStr();
+        transaction->setDataAvailable(dataAvailable);
+        logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
+        return transaction;
+      case NO_MORE_DATA:
+        dataAvailable = false;
+        logger_->log_info("Site2Site peer indicates that no data is available");
+        transaction = new Transaction(direction, crcstream);
+        _transactionMap[transaction->getUUIDStr()] = transaction;
+        transactionID = transaction->getUUIDStr();
+        transaction->setDataAvailable(dataAvailable);
+        logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
+        return transaction;
+      default:
+        logger_->log_info("Site2Site got unexpected response %d when asking for data", code);
+        return NULL;
     }
   } else {
     ret = writeRequestType(SEND_FLOWFILES);
@@ -661,40 +661,45 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *pac
   return true;
 }
 
-bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session) {
+int16_t Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session) {
   int ret;
   Transaction *transaction = NULL;
 
+  if (flowFile && !flowFile->getResourceClaim()->exists()) {
+    logger_->log_info("Claim %s does not exist for FlowFile %s", flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
+    return -2;
+  }
+
   if (_peerState != READY) {
     bootstrap();
   }
 
   if (_peerState != READY) {
-    return false;
+    return -1;
   }
 
   std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
 
   if (it == _transactionMap.end()) {
-    return false;
+    return -1;
   } else {
     transaction = it->second;
   }
 
   if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
     logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
-    return false;
+    return -1;
   }
 
   if (transaction->getDirection() != SEND) {
     logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
-    return false;
+    return -1;
   }
 
   if (transaction->_transfers > 0) {
     ret = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
     if (ret <= 0) {
-      return false;
+      return -1;
     }
   }
 
@@ -702,7 +707,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet
   uint32_t numAttributes = packet->_attributes.size();
   ret = transaction->getStream().write(numAttributes);
   if (ret != 4) {
-    return false;
+    return -1;
   }
 
   std::map<std::string, std::string>::iterator itAttribute;
@@ -710,11 +715,11 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet
     ret = transaction->getStream().writeUTF(itAttribute->first, true);
 
     if (ret <= 0) {
-      return false;
+      return -1;
     }
     ret = transaction->getStream().writeUTF(itAttribute->second, true);
     if (ret <= 0) {
-      return false;
+      return -1;
     }
     logger_->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), itAttribute->first.c_str(), itAttribute->second.c_str());
   }
@@ -724,13 +729,15 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet
     len = flowFile->getSize();
     ret = transaction->getStream().write(len);
     if (ret != 8) {
-      return false;
+      logger_->log_info("ret != 8");
+      return -1;
     }
     if (flowFile->getSize() > 0) {
       Site2SiteClientProtocol::ReadCallback callback(packet);
       session->read(flowFile, &callback);
       if (flowFile->getSize() != packet->_size) {
-        return false;
+        logger_->log_info("MisMatched sizes %d %d", flowFile->getSize(), packet->_size);
+        return -2;
       }
     }
     if (packet->payload_.length() == 0 && len == 0) {
@@ -744,12 +751,13 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet
 
     ret = transaction->getStream().write(len);
     if (ret != 8) {
-      return false;
+      return -1;
     }
 
     ret = transaction->getStream().writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())), len);
     if (ret != len) {
-      return false;
+      logger_->log_info("ret != len");
+      return -1;
     }
     packet->_size += len;
   }
@@ -759,7 +767,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet
   transaction->_bytes += len;
   logger_->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes);
 
-  return true;
+  return 0;
 }
 
 void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, core::ProcessSession *session) {
@@ -775,7 +783,6 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co
     context->yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
-    return;
   }
 
   // Create the transaction
@@ -786,12 +793,11 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co
     context->yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
-    return;
   }
 
   try {
     while (true) {
-      std::map < std::string, std::string > empty;
+      std::map<std::string, std::string> empty;
       uint64_t startTime = getTimeMillis();
       std::string payload;
       DataPacket packet(this, transaction, empty, payload);
@@ -799,17 +805,15 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co
 
       if (!receive(transactionID, &packet, eof)) {
         throw Exception(SITE2SITE_EXCEPTION, "Receive Failed");
-        return;
       }
       if (eof) {
         // transaction done
         break;
       }
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
 
       if (!flowFile) {
         throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
-        return;
       }
       std::map<std::string, std::string>::iterator it;
       std::string sourceIdentifier;
@@ -824,7 +828,6 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co
         session->write(flowFile, &callback);
         if (flowFile->getSize() != packet._size) {
           throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right");
-          return;
         }
       }
       core::Relationship relation;  // undefined relationship
@@ -840,11 +843,9 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co
 
     if (!confirm(transactionID)) {
       throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
-      return;
     }
     if (!complete(transactionID)) {
       throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed");
-      return;
     }
     logger_->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", transactionID.c_str(), transfers, bytes);
     // we yield the receive if we did not get anything
@@ -962,12 +963,6 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
         } else {
           logger_->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str());
           ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM");
-          /*
-           ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
-           if (ret <= 0)
-           return false;
-           transaction->_state = TRANSACTION_CONFIRMED;
-           return true; */
           return false;
         }
       }
@@ -1103,7 +1098,7 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) {
 }
 
 void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast < FlowFileRecord > (session->get());
+  std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast<FlowFileRecord>(session->get());
 
   Transaction *transaction = NULL;
 
@@ -1119,7 +1114,6 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c
     context->yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
-    return;
   }
 
   // Create the transaction
@@ -1130,7 +1124,6 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c
     context->yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
-    return;
   }
 
   bool continueTransaction = true;
@@ -1142,22 +1135,25 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c
       std::string payload;
       DataPacket packet(this, transaction, flow->getAttributes(), payload);
 
-      if (!send(transactionID, &packet, flow, session)) {
+      int16_t resp = send(transactionID, &packet, flow, session);
+      if (resp == -1) {
         throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
-        return;
       }
+
       logger_->log_info("Site2Site transaction %s send flow record %s", transactionID.c_str(), flow->getUUIDStr().c_str());
-      uint64_t endTime = getTimeMillis();
-      std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
-      std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName();
-      session->getProvenanceReporter()->send(flow, transitUri, details, endTime - startTime, false);
+      if (resp == 0) {
+        uint64_t endTime = getTimeMillis();
+        std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
+        std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName();
+        session->getProvenanceReporter()->send(flow, transitUri, details, endTime - startTime, false);
+      }
       session->remove(flow);
 
       uint64_t transferNanos = getTimeNano() - startSendingNanos;
       if (transferNanos > _batchSendNanos)
         break;
 
-      flow = std::static_pointer_cast < FlowFileRecord > (session->get());
+      flow = std::static_pointer_cast<FlowFileRecord>(session->get());
 
       if (!flow) {
         continueTransaction = false;
@@ -1168,13 +1164,11 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c
       std::stringstream ss;
       ss << "Confirm Failed for " << transactionID;
       throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str());
-      return;
     }
     if (!complete(transactionID)) {
       std::stringstream ss;
       ss << "Complete Failed for " << transactionID;
       throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str());
-      return;
     }
     logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes);
   } catch (std::exception &exception) {
@@ -1212,7 +1206,6 @@ void Site2SiteClientProtocol::transferString(core::ProcessContext *context, core
     context->yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
-    return;
   }
 
   // Create the transaction
@@ -1223,25 +1216,22 @@ void Site2SiteClientProtocol::transferString(core::ProcessContext *context, core
     context->yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
-    return;
   }
 
   try {
     DataPacket packet(this, transaction, attributes, payload);
 
-    if (!send(transactionID, &packet, nullptr, session)) {
+    int16_t resp = send(transactionID, &packet, nullptr, session);
+    if (resp == -1) {
       throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
-      return;
     }
     logger_->log_info("Site2Site transaction %s send bytes length %d", transactionID.c_str(), payload.length());
 
     if (!confirm(transactionID)) {
       throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
-      return;
     }
     if (!complete(transactionID)) {
       throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
-      return;
     }
     logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes);
   } catch (std::exception &exception) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 82d4dfd..d74a74a 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -37,7 +37,7 @@ namespace nifi {
 namespace minifi {
 
 void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processor) {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
 
   admin_yield_duration_ = 0;
   std::string yieldValue;
@@ -67,20 +67,24 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
     return;
   }
 
-  core::ProcessorNode processor_node(processor);
-  auto processContext = std::make_shared < core::ProcessContext > (processor_node, controller_service_provider_, repo_, flow_repo_, content_repo_);
-  auto sessionFactory = std::make_shared < core::ProcessSessionFactory > (processContext.get());
+  std::shared_ptr<core::ProcessorNode> processor_node = std::make_shared<core::ProcessorNode>(processor);
 
-  processor->onSchedule(processContext.get(), sessionFactory.get());
+  auto processContext = std::make_shared<core::ProcessContext>(processor_node, controller_service_provider_, repo_, flow_repo_, content_repo_);
+  auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext);
+
+  processor->onSchedule(processContext, sessionFactory);
 
   std::vector<std::thread *> threads;
 
   ThreadedSchedulingAgent *agent = this;
   for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) {
     // reference the disable function from serviceNode
+    processor->incrementActiveTasks();
+
     std::function<uint64_t()> f_ex = [agent, processor, processContext, sessionFactory] () {
-      return agent->run(processor, processContext.get(), sessionFactory.get());
+      return agent->run(processor, processContext, sessionFactory);
     };
+
     // create a functor that will be submitted to the thread pool.
     std::unique_ptr<TimerAwareMonitor> monitor = std::unique_ptr<TimerAwareMonitor>(new TimerAwareMonitor(&running_));
     utils::Worker<uint64_t> functor(f_ex, processor->getUUIDStr(), std::move(monitor));
@@ -99,7 +103,7 @@ void ThreadedSchedulingAgent::stop() {
 }
 
 void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
   logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str());
 
   if (processor->getScheduledState() != core::RUNNING) {
@@ -110,6 +114,8 @@ void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> proces
   thread_pool_.stopTasks(processor->getUUIDStr());
 
   processor->clearActiveTask();
+
+  processor->setScheduledState(core::STOPPED);
 }
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index c3aaa69..13a3439 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -29,8 +29,9 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-uint64_t TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
-  while (this->running_) {
+uint64_t TimerDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+                                         const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  while (this->running_ && processor->isRunning()) {
     bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
     if (processor->isYield()) {
       // Honor the yield
@@ -41,7 +42,7 @@ uint64_t TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> proces
     }
     return processor->getSchedulingPeriodNano() / 1000000;
   }
-  return 0;
+  return processor->getSchedulingPeriodNano() / 1000000;
 }
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
new file mode 100644
index 0000000..d1c71e6
--- /dev/null
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -0,0 +1,485 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/C2Agent.h"
+#include <unistd.h>
+#include <csignal>
+#include <utility>
+#include <vector>
+#include <map>
+#include <string>
+#include <memory>
+#include "core/state/UpdateController.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Builds the agent and registers its two periodic callbacks.
+ *
+ * The heartbeat period starts at 3000 (compared against elapsed milliseconds
+ * below) and at most max_c2_responses (5) queued requests are sent per
+ * producer pass. configure() is invoked with reconfigure == false so the
+ * protocol implementation is instantiated here.
+ *
+ * @param controller controller service provider handed to the protocol and reporters.
+ * @param updateSink state monitor that is queried and acted upon.
+ * @param configuration agent configuration.
+ */
+C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+                 const std::shared_ptr<Configure> &configuration)
+    : controller_(controller),
+      update_sink_(updateSink),
+      configuration_(configuration),
+      heart_beat_period_(3000),
+      max_c2_responses(5),
+      logger_(logging::LoggerFactory<C2Agent>::getLogger()) {
+  running_configuration = std::make_shared<Configure>();
+
+  last_run_ = std::chrono::steady_clock::now();
+
+  configure(configuration, false);
+
+  // Producer: flushes queued requests to the C2 server, then heartbeats when
+  // the heartbeat period has elapsed. Only members are used, so capture this
+  // explicitly instead of every local by reference.
+  c2_producer_ = [this]() {
+    auto now = std::chrono::steady_clock::now();
+    auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_run_).count();
+
+    // place priority on messages to send to the c2 server
+    {
+      // unique_lock releases the mutex even if consumePayload() throws; the
+      // time-point constructor performs the same try_lock_until() as before.
+      std::unique_lock<decltype(request_mutex)> request_lock(request_mutex, now + std::chrono::seconds(1));
+      if (request_lock.owns_lock() && requests.size() > 0) {
+        int count = 0;
+        do {
+          const C2Payload payload(std::move(requests.back()));
+          requests.pop_back();
+          C2Payload && response = protocol_.load()->consumePayload(payload);
+          enqueue_c2_server_response(std::move(response));
+        } while (requests.size() > 0 && ++count < max_c2_responses);
+      }
+    }
+
+    if (time_since > heart_beat_period_) {
+      last_run_ = now;
+      performHeartBeat();
+    }
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false));
+  };
+
+  functions_.push_back(c2_producer_);
+
+  // Consumer: takes one queued server response per pass and acts on it.
+  c2_consumer_ = [this]() {
+    auto now = std::chrono::steady_clock::now();
+    // Scoped lock so an exception from extractPayload() cannot leave
+    // queue_mutex held.
+    std::unique_lock<decltype(queue_mutex)> queue_lock(queue_mutex, now + std::chrono::seconds(1));
+    if (queue_lock.owns_lock() && responses.size() > 0) {
+      const C2Payload payload(std::move(responses.back()));
+      responses.pop_back();
+      extractPayload(std::move(payload));
+    }
+    return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false));
+  };
+
+  functions_.push_back(c2_consumer_);
+}
+
+/**
+ * Applies configuration to the agent.
+ *
+ * On first configuration (reconfigure == false) the protocol class named by
+ * "c2.agent.protocol.class" is instantiated, falling back to "RESTSender",
+ * and initialized. On reconfiguration the existing protocol is asked to
+ * update itself instead. In both cases the heartbeat period and the list of
+ * heartbeat reporters are (re)read from the supplied configuration.
+ *
+ * @param configure configuration to read from.
+ * @param reconfigure false for the initial setup, true for later updates.
+ */
+void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconfigure) {
+  std::string clazz, heartbeat_period;
+
+  if (!reconfigure) {
+    if (!configure->get("c2.agent.protocol.class", clazz)) {
+      clazz = "RESTSender";
+    }
+    logger_->log_info("Class is %s", clazz);
+    auto protocol = core::ClassLoader::getDefaultClassLoader().instantiateRaw(clazz, clazz);
+
+    if (protocol == nullptr) {
+      protocol = core::ClassLoader::getDefaultClassLoader().instantiateRaw("RESTSender", "RESTSender");
+      logger_->log_info("Class is RESTSender");
+    }
+    C2Protocol *old_protocol = protocol_.exchange(dynamic_cast<C2Protocol*>(protocol));
+
+    protocol_.load()->initialize(controller_, configuration_);
+
+    // NOTE(review): reconfigure is always false inside this branch, so this
+    // delete never runs and the previous protocol pointer is never freed
+    // here. Left as-is because the constructor's initializer list does not
+    // set protocol_; confirm its initial value before changing this.
+    if (reconfigure && old_protocol != nullptr) {
+      delete old_protocol;
+    }
+  } else {
+    protocol_.load()->update(configure);
+  }
+
+  if (configure->get("c2.agent.heartbeat.period", heartbeat_period)) {
+    try {
+      heart_beat_period_ = std::stoi(heartbeat_period);
+    } catch (const std::logic_error &) {
+      // std::stoi reports non-numeric input with std::invalid_argument and
+      // overflow with std::out_of_range; both derive from std::logic_error
+      // and both fall back to the default period.
+      heart_beat_period_ = 3000;
+    }
+  } else {
+    if (!reconfigure)
+      heart_beat_period_ = 3000;
+  }
+
+  std::string heartbeat_reporters;
+  if (configure->get("c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) {
+    std::vector<std::string> reporters = utils::StringUtils::split(heartbeat_reporters, ",");
+    std::lock_guard<std::mutex> lock(heartbeat_mutex);
+    for (const auto &reporter : reporters) {
+      auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(reporter, reporter);
+      if (heartbeat_reporter_obj == nullptr) {
+        logger_->log_debug("Could not instantiate %s", reporter);
+      } else {
+        std::shared_ptr<HeartBeatReporter> shp_reporter = std::static_pointer_cast<HeartBeatReporter>(heartbeat_reporter_obj);
+        shp_reporter->initialize(controller_, configuration_);
+        heartbeat_protocols_.push_back(shp_reporter);
+      }
+    }
+  }
+}
+
+/**
+ * Assembles a heartbeat payload and sends it.
+ *
+ * The payload carries, when available, the pending metrics, the device
+ * information, the enabled/disabled state of every component and the overall
+ * running state and uptime. It is passed to the protocol, whose response is
+ * queued for the consumer, and then handed to every heartbeat reporter.
+ */
+void C2Agent::performHeartBeat() {
+  C2Payload payload(Operation::HEARTBEAT);
+
+  logger_->log_trace("Performing heartbeat");
+
+  std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_copy;
+  // setMetrics() writes device_information_ under metrics_mutex_, so take the
+  // snapshot under the same lock rather than iterating the live map.
+  decltype(device_information_) device_info_copy;
+  {
+    std::lock_guard<std::timed_mutex> lock(metrics_mutex_);
+    if (metrics_map_.size() > 0) {
+      metrics_copy = std::move(metrics_map_);
+      // a moved-from map is only guaranteed to be valid; make it empty explicitly.
+      metrics_map_.clear();
+    }
+    device_info_copy = device_information_;
+  }
+
+  if (metrics_copy.size() > 0) {
+    C2Payload metrics(Operation::HEARTBEAT);
+    metrics.setLabel("metrics");
+
+    for (const auto &metric : metrics_copy) {
+      // serialize once and reuse the result for both the emptiness check and the payload.
+      const auto serialized = metric.second->serialize();
+      if (serialized.size() == 0)
+        continue;
+      C2Payload child_metric_payload(Operation::HEARTBEAT);
+      child_metric_payload.setLabel(metric.first);
+      serializeMetrics(child_metric_payload, metric.first, serialized);
+      metrics.addPayload(std::move(child_metric_payload));
+    }
+    payload.addPayload(std::move(metrics));
+  }
+
+  if (device_info_copy.size() > 0) {
+    C2Payload deviceInfo(Operation::HEARTBEAT);
+    deviceInfo.setLabel("DeviceInfo");
+
+    for (const auto &metric : device_info_copy) {
+      C2Payload child_metric_payload(Operation::HEARTBEAT);
+      child_metric_payload.setLabel(metric.first);
+      serializeMetrics(child_metric_payload, metric.first, metric.second->serialize());
+      deviceInfo.addPayload(std::move(child_metric_payload));
+    }
+    payload.addPayload(std::move(deviceInfo));
+  }
+
+  std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getAllComponents();
+
+  if (!components.empty()) {
+    C2ContentResponse component_payload(Operation::HEARTBEAT);
+    component_payload.name = "Components";
+
+    for (auto &component : components) {
+      if (component->isRunning()) {
+        component_payload.operation_arguments[component->getComponentName()] = "enabled";
+      } else {
+        component_payload.operation_arguments[component->getComponentName()] = "disabled";
+      }
+    }
+    payload.addContent(std::move(component_payload));
+  }
+
+  C2ContentResponse state(Operation::HEARTBEAT);
+  state.name = "state";
+  if (update_sink_->isRunning()) {
+    state.operation_arguments["running"] = "true";
+  } else {
+    state.operation_arguments["running"] = "false";
+  }
+  state.operation_arguments["uptime"] = std::to_string(update_sink_->getUptime());
+
+  payload.addContent(std::move(state));
+
+  C2Payload && response = protocol_.load()->consumePayload(payload);
+
+  enqueue_c2_server_response(std::move(response));
+
+  std::lock_guard<std::mutex> lock(heartbeat_mutex);
+
+  for (const auto &reporter : heartbeat_protocols_) {
+    reporter->heartbeat(payload);
+  }
+}
+
+/**
+ * Recursively copies metric responses into a payload.
+ *
+ * A metric with children becomes a nested payload labelled with the metric's
+ * name; a metric without children becomes a content entry named after the
+ * parent (name) that maps the metric's name to its value.
+ *
+ * @param metric_payload payload that receives the entries.
+ * @param name name given to leaf content entries at this level.
+ * @param metrics metric responses to serialize.
+ */
+void C2Agent::serializeMetrics(C2Payload &metric_payload, const std::string &name, const std::vector<state::metrics::MetricResponse> &metrics) {
+  for (const auto &entry : metrics) {
+    if (entry.children.size() == 0) {
+      // leaf: a single name/value pair attached directly to this payload
+      C2ContentResponse leaf(metric_payload.getOperation());
+      leaf.name = name;
+      leaf.operation_arguments[entry.name] = entry.value;
+      metric_payload.addContent(std::move(leaf));
+      continue;
+    }
+
+    // branch: descend into the children under a payload of their own
+    C2Payload nested(metric_payload.getOperation());
+    nested.setLabel(entry.name);
+    serializeMetrics(nested, entry.name, entry.children);
+    metric_payload.addPayload(std::move(nested));
+  }
+}
+
+/**
+ * Dispatches a response payload according to its update state.
+ *
+ * A NESTED payload is unpacked and each nested payload is processed in turn.
+ * For READ_COMPLETE every content entry is passed to
+ * handle_c2_server_response(); all other states are only logged.
+ *
+ * @param resp payload to process.
+ */
+void C2Agent::extractPayload(const C2Payload &&resp) {
+  const auto update_state = resp.getStatus().getState();
+
+  if (update_state == state::UpdateState::NESTED) {
+    for (const auto &nested : resp.getNestedPayloads()) {
+      extractPayload(std::move(nested));
+    }
+    return;
+  }
+
+  switch (update_state) {
+    case state::UpdateState::INITIATE:
+      logger_->log_debug("Received initiation event from protocol");
+      break;
+    case state::UpdateState::READ_COMPLETE:
+      logger_->log_trace("Received Ack from Server");
+      // we have a heartbeat response.
+      for (const auto &server_response : resp.getContent()) {
+        handle_c2_server_response(server_response);
+      }
+      break;
+    case state::UpdateState::FULLY_APPLIED:
+      logger_->log_debug("Received fully applied event from protocol");
+      break;
+    case state::UpdateState::PARTIALLY_APPLIED:
+      logger_->log_debug("Received partially applied event from protocol");
+      break;
+    case state::UpdateState::NOT_APPLIED:
+      logger_->log_debug("Received not applied event from protocol");
+      break;
+    case state::UpdateState::SET_ERROR:
+    case state::UpdateState::READ_ERROR:
+      // both error states share one log message
+      logger_->log_debug("Received error event from protocol");
+      break;
+    case state::UpdateState::NESTED:  // already handled above; shares the default message
+    default:
+      logger_->log_debug("Received nested event from protocol");
+      break;
+  }
+}
+
+/**
+ * Lvalue overload: processes a response payload without logging.
+ *
+ * Nested payloads are processed recursively; for READ_COMPLETE every content
+ * entry is passed to handle_c2_server_response(). Other states are ignored.
+ *
+ * @param resp payload to process.
+ */
+void C2Agent::extractPayload(const C2Payload &resp) {
+  const auto update_state = resp.getStatus().getState();
+
+  if (update_state == state::UpdateState::NESTED) {
+    for (const auto &nested : resp.getNestedPayloads()) {
+      extractPayload(nested);
+    }
+    return;
+  }
+
+  if (update_state != state::UpdateState::READ_COMPLETE) {
+    return;
+  }
+
+  // we have a heartbeat response.
+  for (const auto &server_response : resp.getContent()) {
+    handle_c2_server_response(server_response);
+  }
+}
+
+/**
+ * Executes a single operation requested by the C2 server.
+ *
+ * CLEAR clears the named connections or drains the repositories, UPDATE and
+ * DESCRIBE are delegated to handle_update() / handle_describe(), RESTART
+ * stops the sink, sends the acknowledgement directly through the protocol and
+ * exits the process, and START / STOP are applied to the components matching
+ * resp.name. Other operations are ignored.
+ *
+ * @param resp content entry describing the requested operation.
+ */
+void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
+  switch (resp.op) {
+    case Operation::CLEAR:
+      // we've been told to clear something
+      if (resp.name == "connection") {
+        for (const auto &connection : resp.operation_arguments) {
+          // log the connection being cleared; resp.name is always "connection" here
+          logger_->log_debug("Clearing connection %s", connection.second);
+          update_sink_->clearConnection(connection.second);
+        }
+        C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+        enqueue_c2_response(std::move(response));
+      } else if (resp.name == "repositories") {
+        update_sink_->drainRepositories();
+        C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+        enqueue_c2_response(std::move(response));
+      } else {
+        logger_->log_debug("Clearing unknown %s", resp.name);
+      }
+
+      break;
+    case Operation::UPDATE: {
+      handle_update(resp);
+      C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+      enqueue_c2_response(std::move(response));
+    }
+      break;
+
+    case Operation::DESCRIBE:
+      handle_describe(resp);
+      break;
+    case Operation::RESTART: {
+      update_sink_->stop(true);
+      // the acknowledgement is sent synchronously because the process exits
+      // immediately afterwards and the request queue would never be flushed.
+      C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+      C2Payload && ret = protocol_.load()->consumePayload(std::move(response));
+      exit(1);
+    }
+      break;
+    case Operation::START:
+    case Operation::STOP: {
+      if (resp.name == "C2" || resp.name == "c2") {
+        raise(SIGTERM);
+      }
+
+      std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getComponents(resp.name);
+
+      // stop or start all referenced components, logging the action actually taken.
+      const bool stopping = resp.op == Operation::STOP;
+      for (auto &component : components) {
+        if (stopping) {
+          logger_->log_debug("Stopping component %s", component->getComponentName());
+          component->stop(true);
+        } else {
+          logger_->log_debug("Starting component %s", component->getComponentName());
+          component->start();
+        }
+      }
+
+      // only requests that carry an identifier are acknowledged
+      if (resp.ident.length() > 0) {
+        C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+        enqueue_c2_response(std::move(response));
+      }
+    }
+      break;
+    default:
+      // do nothing
+      break;
+  }
+}
+
+/**
+ * Answers a DESCRIBE request. Unlike other operations, the requested
+ * information is carried inside the acknowledgement itself.
+ *
+ * "configuration" returns every configured key with its value. "metrics"
+ * returns the serialized metrics of the requested class (argument
+ * "metricsClass", 0 when absent or not numeric) when the update sink is a
+ * metrics reporter. Every path except "configuration" finishes with a plain
+ * acknowledgement.
+ *
+ * @param resp content entry describing what should be described.
+ */
+void C2Agent::handle_describe(const C2ContentResponse &resp) {
+  if (resp.name == "configuration") {
+    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+    response.setLabel("configuration_options");
+    C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
+    options.setLabel("configuration_options");
+
+    std::string value;
+    for (const auto &key : configuration_->getConfiguredKeys()) {
+      C2ContentResponse option(Operation::ACKNOWLEDGE);
+      option.name = key;
+      // keys whose value cannot be read are left out of the answer
+      if (configuration_->get(key, value)) {
+        option.operation_arguments[key] = value;
+        options.addContent(std::move(option));
+      }
+    }
+
+    response.addPayload(std::move(options));
+    enqueue_c2_response(std::move(response));
+    return;
+  }
+
+  if (resp.name == "metrics") {
+    auto reporter = std::dynamic_pointer_cast<state::metrics::MetricsReporter>(update_sink_);
+
+    if (reporter != nullptr) {
+      uint8_t metric_class_id = 0;
+      auto metricsClass = resp.operation_arguments.find("metricsClass");
+      if (metricsClass != resp.operation_arguments.end()) {
+        // we have a class
+        try {
+          metric_class_id = std::stoi(metricsClass->second);
+        } catch (...) {
+          logger_->log_error("Could not convert %s into an integer", metricsClass->second);
+        }
+      }
+
+      std::vector<std::shared_ptr<state::metrics::Metrics>> metrics_vec;
+      reporter->getMetrics(metrics_vec, metric_class_id);
+
+      C2Payload metrics_response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+      metrics_response.setLabel("metrics");
+      for (const auto &metric : metrics_vec) {
+        serializeMetrics(metrics_response, metric->getName(), metric->serialize());
+      }
+      enqueue_c2_response(std::move(metrics_response));
+    }
+    // no return here: the metrics path also sends the plain acknowledgement below
+  }
+
+  C2Payload ack(Operation::ACKNOWLEDGE, resp.ident, false, true);
+  enqueue_c2_response(std::move(ack));
+}
+
+/**
+ * Applies an UPDATE request.
+ *
+ * For "configuration" the new configuration is either fetched through the
+ * protocol from the "location" argument or taken inline from
+ * "configuration_data", then applied to the update sink; an acknowledgement
+ * is queued here only when a fetched update applies with result 0. For "c2"
+ * the agent's own running configuration is replaced by the supplied arguments
+ * and the agent is reconfigured.
+ *
+ * @param resp content entry describing the update.
+ */
+void C2Agent::handle_update(const C2ContentResponse &resp) {
+  // we've been told to update something
+  if (resp.name == "c2") {
+    // prior configuration options were already in place. thus
+    // we clear the map so that we don't go through replacing
+    // unnecessary objects.
+    running_configuration->clear();
+
+    for (const auto &entry : resp.operation_arguments) {
+      running_configuration->set(entry.first, entry.second);
+    }
+
+    if (resp.operation_arguments.size() > 0) {
+      configure(running_configuration);
+    }
+    return;
+  }
+
+  if (resp.name != "configuration") {
+    return;
+  }
+
+  const auto location = resp.operation_arguments.find("location");
+  if (location == resp.operation_arguments.end()) {
+    // no location given: fall back to configuration text sent inline
+    const auto inline_config = resp.operation_arguments.find("configuration_data");
+    if (inline_config != resp.operation_arguments.end()) {
+      update_sink_->applyUpdate(inline_config->second);
+    }
+    return;
+  }
+
+  // just get the raw data.
+  C2Payload payload(Operation::UPDATE, false, true);
+  C2Payload &&fetched = protocol_.load()->consumePayload(location->second, payload, RECEIVE, false);
+
+  if (update_sink_->applyUpdate(fetched.getRawData()) == 0) {
+    C2Payload ack(Operation::ACKNOWLEDGE, resp.ident, false, true);
+    enqueue_c2_response(std::move(ack));
+  }
+}
+
+/**
+ * Stores a metric for inclusion in a later heartbeat.
+ *
+ * Device metrics are kept in device_information_, all others in metrics_map_,
+ * keyed by the metric's name.
+ *
+ * @param metric metric to store.
+ * @return 0 when stored; -1 when metric is null or the metrics mutex could
+ *         not be acquired within one second.
+ */
+int16_t C2Agent::setMetrics(const std::shared_ptr<state::metrics::Metrics> &metric) {
+  if (metric == nullptr) {
+    // nothing to store; metric->getName() below would dereference null
+    return -1;
+  }
+
+  auto now = std::chrono::steady_clock::now();
+  bool is_device_metric = std::dynamic_pointer_cast<state::metrics::DeviceMetric>(metric) != nullptr;
+
+  // the time-point constructor calls try_lock_until(); the lock is released
+  // on every exit path, including an exception from the map insertion.
+  std::unique_lock<std::timed_mutex> lock(metrics_mutex_, now + std::chrono::seconds(1));
+  if (!lock.owns_lock()) {
+    return -1;
+  }
+
+  if (is_device_metric) {
+    device_information_[metric->getName()] = metric;
+  } else {
+    metrics_map_[metric->getName()] = metric;
+  }
+  return 0;
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */


Mime
View raw message