nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [3/5] nifi-minifi-cpp git commit: MINIFI-226: Add controller services capabilities along with unit tests Fix test failures Update Travis YML Update readme to link to MiNiFi licensing information
Date Thu, 18 May 2017 13:07:53 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/controllers/SSLContextService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp
new file mode 100644
index 0000000..51a7cc4
--- /dev/null
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -0,0 +1,226 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/SSLContextService.h"
+#include <openssl/err.h>
+#include <openssl/ssl.h>
+#include <string>
+#include <memory>
+#include <set>
+#include "core/Property.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+void SSLContextService::initialize() {
+  if (initialized_)
+    return;
+
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+
+  ControllerService::initialize();
+
+  initializeTLS();
+}
+
+std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
+  SSL_library_init();
+  const SSL_METHOD *method;
+
+  OpenSSL_add_all_algorithms();
+  SSL_load_error_strings();
+  method = TLSv1_2_client_method();
+  SSL_CTX *ctx = SSL_CTX_new(method);
+
+  if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM)
+      <= 0) {
+    logger_->log_error("Could not create load certificate, error : %s",
+                       std::strerror(errno));
+    return nullptr;
+  }
+  if (!IsNullOrEmpty(passphrase_)) {
+    SSL_CTX_set_default_passwd_cb_userdata(ctx, &passphrase_);
+    SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
+  }
+
+  int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(),
+                                         SSL_FILETYPE_PEM);
+  if (retp != 1) {
+    logger_->log_error("Could not create load private key,%i on %s error : %s",
+                       retp, private_key_, std::strerror(errno));
+    return nullptr;
+  }
+
+  if (!SSL_CTX_check_private_key(ctx)) {
+    logger_->log_error(
+        "Private key does not match the public certificate, error : %s",
+        std::strerror(errno));
+    return nullptr;
+  }
+
+  retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0);
+  if (retp == 0) {
+    logger_->log_error("Can not load CA certificate, Exiting, error : %s",
+                       std::strerror(errno));
+  }
+  return std::unique_ptr<SSLContext>(new SSLContext(ctx));
+}
+
+const std::string &SSLContextService::getCertificateFile() {
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  return certificate;
+}
+
+const std::string &SSLContextService::getPassphrase() {
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  return passphrase_;
+}
+
+const std::string &SSLContextService::getPassphraseFile() {
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  return passphrase_file_;
+}
+
+const std::string &SSLContextService::getPrivateKeyFile() {
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  return private_key_;
+}
+
+const std::string &SSLContextService::getCACertificate() {
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  return ca_certificate_;
+}
+
+void SSLContextService::onEnable() {
+  valid_ = true;
+  core::Property property("Client Certificate", "Client Certificate");
+  core::Property privKey("Private Key", "Private Key file");
+  core::Property passphrase_prop(
+      "Passphrase", "Client passphrase. Either a file or unencrypted text");
+  core::Property caCert("CA Certificate", "CA certificate file");
+  std::string default_dir;
+  if (nullptr != configuration_)
+  configuration_->get(Configure::nifi_default_directory, default_dir);
+
+  logger_->log_trace("onEnable()");
+
+  if (getProperty(property.getName(), certificate)
+      && getProperty(privKey.getName(), private_key_)) {
+    logger_->log_error(
+        "Certificate and Private Key PEM file not configured, error: %s.",
+        std::strerror(errno));
+
+    std::ifstream cert_file(certificate);
+    std::ifstream priv_file(private_key_);
+    if (!cert_file.good()) {
+      logger_->log_info("%s not good", certificate);
+      std::string test_cert = default_dir + certificate;
+      std::ifstream cert_file_test(test_cert);
+      if (cert_file_test.good()) {
+        certificate = test_cert;
+        logger_->log_debug("%s now good", certificate);
+      } else {
+        logger_->log_debug("%s still not good", test_cert);
+        valid_ = false;
+      }
+      cert_file_test.close();
+    }
+
+    if (!priv_file.good()) {
+      std::string test_priv = default_dir + private_key_;
+      std::ifstream private_file_test(test_priv);
+      if (private_file_test.good()) {
+        private_key_ = test_priv;
+      } else {
+        valid_ = false;
+      }
+      private_file_test.close();
+    }
+    cert_file.close();
+    priv_file.close();
+
+  } else {
+    logger_->log_debug("Certificate empty");
+  }
+  if (!getProperty(passphrase_prop.getName(), passphrase_)) {
+    logger_->log_debug("No pass phrase for %s", certificate);
+  } else {
+    std::ifstream passphrase_file(passphrase_);
+    if (passphrase_file.good()) {
+      passphrase_file_ = passphrase_;
+      // we should read it from the file
+      passphrase_.assign((std::istreambuf_iterator<char>(passphrase_file)),
+                         std::istreambuf_iterator<char>());
+    } else {
+      std::string test_passphrase = default_dir + passphrase_;
+      std::ifstream passphrase_file_test(test_passphrase);
+      if (passphrase_file_test.good()) {
+        passphrase_ = test_passphrase;
+        passphrase_file_ = test_passphrase;
+        passphrase_.assign(
+            (std::istreambuf_iterator<char>(passphrase_file_test)),
+            std::istreambuf_iterator<char>());
+      } else {
+        valid_ = false;
+      }
+      passphrase_file_test.close();
+    }
+    passphrase_file.close();
+  }
+  // load CA certificates
+  if (!getProperty(caCert.getName(), ca_certificate_)) {
+    logger_->log_error("Can not load CA certificate.");
+  } else {
+    std::ifstream cert_file(ca_certificate_);
+    if (!cert_file.good()) {
+      std::string test_ca_cert = default_dir + ca_certificate_;
+      std::ifstream ca_cert_file_file_test(test_ca_cert);
+      if (ca_cert_file_file_test.good()) {
+        ca_certificate_ = test_ca_cert;
+      } else {
+        valid_ = false;
+      }
+      ca_cert_file_file_test.close();
+    }
+    cert_file.close();
+  }
+}
+
+void SSLContextService::initializeTLS() {
+  core::Property property("Client Certificate", "Client Certificate");
+  core::Property privKey("Private Key", "Private Key file");
+  core::Property passphrase_prop(
+      "Passphrase", "Client passphrase. Either a file or unencrypted text");
+  core::Property caCert("CA Certificate", "CA certificate file");
+  std::set<core::Property> supportedProperties;
+  supportedProperties.insert(property);
+  supportedProperties.insert(privKey);
+  supportedProperties.insert(passphrase_prop);
+  supportedProperties.insert(caCert);
+  setSupportedProperties(supportedProperties);
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/ClassLoader.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp
new file mode 100644
index 0000000..77c4a85
--- /dev/null
+++ b/libminifi/src/core/ClassLoader.cpp
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ClassLoader.h"
+#include <sys/mman.h>
+#include <memory>
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+ClassLoader &ClassLoader::getDefaultClassLoader() {
+  static ClassLoader ret;
+  // populate ret
+  return ret;
+}
+uint16_t ClassLoader::registerResource(const std::string &resource) {
+  void* resource_ptr = dlopen(resource.c_str(), RTLD_LAZY);
+  if (!resource_ptr) {
+    logger_->log_error("Cannot load library: %s", dlerror());
+    return RESOURCE_FAILURE;
+  } else {
+    std::lock_guard<std::mutex> lock(internal_mutex_);
+    dl_handles_.push_back(resource_ptr);
+  }
+
+  // reset errors
+  dlerror();
+
+  // load the symbols
+  createFactory* create_factory_func = reinterpret_cast<createFactory*>(dlsym(
+      resource_ptr, "createFactory"));
+  const char* dlsym_error = dlerror();
+  if (dlsym_error) {
+    logger_->log_error("Cannot load library: %s", dlsym_error);
+    return RESOURCE_FAILURE;
+  }
+
+  ObjectFactory *factory = create_factory_func();
+
+  std::lock_guard<std::mutex> lock(internal_mutex_);
+
+  loaded_factories_[factory->getClassName()] = std::unique_ptr<ObjectFactory>(
+      factory);
+
+  return RESOURCE_SUCCESS;
+}
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index fa5ff7d..cf2089e 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -43,6 +43,20 @@ ConfigurableComponent::ConfigurableComponent(
 ConfigurableComponent::~ConfigurableComponent() {
 }
 
+bool ConfigurableComponent::getProperty(const std::string &name,
+                                        Property &prop) {
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
+
+  auto &&it = properties_.find(name);
+
+  if (it != properties_.end()) {
+    prop = it->second;
+    return true;
+  } else {
+    return false;
+  }
+}
+
 /**
  * Get property using the provided name.
  * @param name property name.
@@ -54,12 +68,11 @@ bool ConfigurableComponent::getProperty(const std::string name,
   std::lock_guard<std::mutex> lock(configuration_mutex_);
 
   auto &&it = properties_.find(name);
-
   if (it != properties_.end()) {
     Property item = it->second;
     value = item.getValue();
-    my_logger_->log_info("Processor %s property name %s value %s", name.c_str(),
-                         item.getName().c_str(), value.c_str());
+    my_logger_->log_info("Processor %s property name %s value %s", name,
+                         item.getName(), value);
     return true;
   } else {
     return false;
@@ -92,6 +105,29 @@ bool ConfigurableComponent::setProperty(const std::string name,
  * Sets the property using the provided name
  * @param property name
  * @param value property value.
+ * @return result of setting property.
+ */
+bool ConfigurableComponent::updateProperty(const std::string &name,
+                                           const std::string &value) {
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
+  auto &&it = properties_.find(name);
+
+  if (it != properties_.end()) {
+    Property item = it->second;
+    item.addValue(value);
+    properties_[item.getName()] = item;
+    my_logger_->log_info("Component %s property name %s value %s", name.c_str(),
+                         item.getName().c_str(), value.c_str());
+    return true;
+  } else {
+    return false;
+  }
+}
+
+/**
+ * Sets the property using the provided name
+ * @param property name
+ * @param value property value.
  * @return whether property was set or not
  */
 bool ConfigurableComponent::setProperty(Property &prop, std::string value) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/ConfigurationFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
index 4ccead2..6aa42e3 100644
--- a/libminifi/src/core/ConfigurationFactory.cpp
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -21,6 +21,7 @@
 #include <memory>
 #include <algorithm>
 #include <set>
+#include "core/Core.h"
 #include "core/ConfigurationFactory.h"
 #include "core/FlowConfiguration.h"
 #include "io/StreamFactory.h"
@@ -53,17 +54,20 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
     if (class_name_lc == "flowconfiguration") {
       // load the base configuration.
       return std::unique_ptr<core::FlowConfiguration>(
-          new core::FlowConfiguration(repo, flow_file_repo, stream_factory, path));
+          new core::FlowConfiguration(repo, flow_file_repo, stream_factory,
+                                      configure, path));
 
     } else if (class_name_lc == "yamlconfiguration") {
       // only load if the class is defined.
       return std::unique_ptr<core::FlowConfiguration>(
-          instantiate<core::YamlConfiguration>(repo, flow_file_repo, stream_factory, path));
+          instantiate<core::YamlConfiguration>(repo, flow_file_repo,
+                                               stream_factory, configure, path));
 
     } else {
       if (fail_safe) {
         return std::unique_ptr<core::FlowConfiguration>(
-            new core::FlowConfiguration(repo, flow_file_repo, stream_factory, path));
+            new core::FlowConfiguration(repo, flow_file_repo, stream_factory,
+                                        configure, path));
       } else {
         throw std::runtime_error(
             "Support for the provided configuration class could not be found");
@@ -72,7 +76,8 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
   } catch (const std::runtime_error &r) {
     if (fail_safe) {
       return std::unique_ptr<core::FlowConfiguration>(
-          new core::FlowConfiguration(repo, flow_file_repo, stream_factory, path));
+          new core::FlowConfiguration(repo, flow_file_repo, stream_factory,
+                                      configure, path));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp
index e234a06..58ae6d4 100644
--- a/libminifi/src/core/Connectable.cpp
+++ b/libminifi/src/core/Connectable.cpp
@@ -148,9 +148,9 @@ std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(
     std::string relationship) {
   std::set<std::shared_ptr<Connectable>> empty;
 
-  auto &&it = _outGoingConnections.find(relationship);
-  if (it != _outGoingConnections.end()) {
-    return _outGoingConnections[relationship];
+  auto &&it = out_going_connections_.find(relationship);
+  if (it != out_going_connections_.end()) {
+    return out_going_connections_[relationship];
   } else {
     return empty;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 90058d2..1ed9176 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -19,6 +19,7 @@
 #include "core/FlowConfiguration.h"
 #include <memory>
 #include <string>
+#include "core/ClassLoader.h"
 
 namespace org {
 namespace apache {
@@ -31,54 +32,12 @@ FlowConfiguration::~FlowConfiguration() {
 
 std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(
     std::string name, uuid_t uuid) {
-  std::shared_ptr<core::Processor> processor = nullptr;
-  if (name
-      == org::apache::nifi::minifi::processors::GenerateFlowFile::ProcessorName) {
-    processor = std::make_shared<
-        org::apache::nifi::minifi::processors::GenerateFlowFile>(name, uuid);
-  } else if (name
-      == org::apache::nifi::minifi::processors::LogAttribute::ProcessorName) {
-    processor = std::make_shared<
-        org::apache::nifi::minifi::processors::LogAttribute>(name, uuid);
-  } else if (name
-      == org::apache::nifi::minifi::processors::GetFile::ProcessorName) {
-    processor =
-        std::make_shared<org::apache::nifi::minifi::processors::GetFile>(name,
-                                                                         uuid);
-  } else if (name
-      == org::apache::nifi::minifi::processors::PutFile::ProcessorName) {
-    processor =
-        std::make_shared<org::apache::nifi::minifi::processors::PutFile>(name,
-                                                                         uuid);
-  } else if (name
-      == org::apache::nifi::minifi::processors::TailFile::ProcessorName) {
-    processor =
-        std::make_shared<org::apache::nifi::minifi::processors::TailFile>(name,
-                                                                          uuid);
-  } else if (name
-      == org::apache::nifi::minifi::processors::ListenSyslog::ProcessorName) {
-    processor = std::make_shared<
-        org::apache::nifi::minifi::processors::ListenSyslog>(name, uuid);
-  } else if (name
-        == org::apache::nifi::minifi::processors::ListenHTTP::ProcessorName) {
-      processor = std::make_shared<
-          org::apache::nifi::minifi::processors::ListenHTTP>(name, uuid);
-  } else if (name
-          == org::apache::nifi::minifi::processors::InvokeHTTP::ProcessorName) {
-        processor = std::make_shared<
-            org::apache::nifi::minifi::processors::InvokeHTTP>(name, uuid);
-  } else if (name
-      == org::apache::nifi::minifi::processors::ExecuteProcess::ProcessorName) {
-    processor = std::make_shared<
-        org::apache::nifi::minifi::processors::ExecuteProcess>(name, uuid);
-  } else if (name
-      == org::apache::nifi::minifi::processors::AppendHostInfo::ProcessorName) {
-    processor = std::make_shared<
-        org::apache::nifi::minifi::processors::AppendHostInfo>(name, uuid);
-  } else {
+  auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(name, uuid);
+  if (nullptr == ptr) {
     logger_->log_error("No Processor defined for %s", name.c_str());
-    return nullptr;
   }
+  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<
+      core::Processor>(ptr);
 
   // initialize the processor
   processor->initialize();
@@ -89,8 +48,10 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(
 std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask() {
   std::shared_ptr<core::Processor> processor = nullptr;
 
-  processor = std::make_shared<
-        org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(stream_factory_);
+  processor =
+      std::make_shared<
+          org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
+          stream_factory_);
   // initialize the processor
   processor->initialize();
 
@@ -114,6 +75,15 @@ std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(
   return std::make_shared<minifi::Connection>(flow_file_repo_, name, uuid);
 }
 
+std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(
+    const std::string &class_name, const std::string &name, uuid_t uuid) {
+  std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode =
+      service_provider_->createControllerService(class_name, name, true);
+  if (nullptr != controllerServicesNode)
+    controllerServicesNode->setUUID(uuid);
+  return controllerServicesNode;
+}
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 1a6e729..1b8ec3a 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -202,6 +202,22 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
   return ret;
 }
 
+void ProcessGroup::addControllerService(
+    const std::string &nodeId,
+    std::shared_ptr<core::controller::ControllerServiceNode> &node) {
+  controller_service_map_.put(nodeId, node);
+}
+
+/**
+ * Find controllerservice node will search child groups until the nodeId is found.
+ * @param node node identifier
+ * @return controller service node, if it exists.
+ */
+std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findControllerService(
+    const std::string &nodeId) {
+  return controller_service_map_.getControllerServiceNode(nodeId);
+}
+
 std::shared_ptr<Processor> ProcessGroup::findProcessor(
     const std::string &processorName) {
   std::shared_ptr<Processor> ret = NULL;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index e124992..7464af2 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -114,15 +114,15 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
   if (my_uuid == source_uuid) {
     std::string relationship = connection->getRelationship().getName();
     // Connection is source from the current processor
-    auto &&it = _outGoingConnections.find(relationship);
-    if (it != _outGoingConnections.end()) {
+    auto &&it = out_going_connections_.find(relationship);
+    if (it != out_going_connections_.end()) {
       // We already has connection for this relationship
       std::set<std::shared_ptr<Connectable>> existedConnection = it->second;
       if (existedConnection.find(connection) == existedConnection.end()) {
         // We do not have the same connection for this relationship yet
         existedConnection.insert(connection);
         connection->setSource(shared_from_this());
-        _outGoingConnections[relationship] = existedConnection;
+        out_going_connections_[relationship] = existedConnection;
         logger_->log_info(
             "Add connection %s into Processor %s outgoing connection for relationship %s",
             connection->getName().c_str(), name_.c_str(), relationship.c_str());
@@ -133,7 +133,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
       std::set<std::shared_ptr<Connectable>> newConnection;
       newConnection.insert(connection);
       connection->setSource(shared_from_this());
-      _outGoingConnections[relationship] = newConnection;
+      out_going_connections_[relationship] = newConnection;
       logger_->log_info(
           "Add connection %s into Processor %s outgoing connection for relationship %s",
           connection->getName().c_str(), name_.c_str(), relationship.c_str());
@@ -178,13 +178,13 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
   if (uuid_compare(uuid_, srcUUID) == 0) {
     std::string relationship = connection->getRelationship().getName();
     // Connection is source from the current processor
-    auto &&it = _outGoingConnections.find(relationship);
-    if (it == _outGoingConnections.end()) {
+    auto &&it = out_going_connections_.find(relationship);
+    if (it == out_going_connections_.end()) {
       return;
     } else {
-      if (_outGoingConnections[relationship].find(connection)
-          != _outGoingConnections[relationship].end()) {
-        _outGoingConnections[relationship].erase(connection);
+      if (out_going_connections_[relationship].find(connection)
+          != out_going_connections_[relationship].end()) {
+        out_going_connections_[relationship].erase(connection);
         connection->setSource(NULL);
         logger_->log_info(
             "Remove connection %s into Processor %s outgoing connection for relationship %s",
@@ -259,7 +259,7 @@ bool Processor::flowFilesQueued() {
 bool Processor::flowFilesOutGoingFull() {
   std::lock_guard<std::mutex> lock(mutex_);
 
-  for (auto &&connection : _outGoingConnections) {
+  for (auto &&connection : out_going_connections_) {
     // We already has connection for this relationship
     std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
     for (const auto conn : existedConnection) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/Property.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Property.cpp b/libminifi/src/core/Property.cpp
index aa002af..dd3a9cb 100644
--- a/libminifi/src/core/Property.cpp
+++ b/libminifi/src/core/Property.cpp
@@ -18,6 +18,7 @@
 
 #include "core/Property.h"
 #include <string>
+#include <vector>
 namespace org {
 namespace apache {
 namespace nifi {
@@ -34,11 +35,27 @@ std::string Property::getDescription() {
 }
 // Get value for the property
 std::string Property::getValue() const {
-  return value_;
+  if (!values_.empty())
+    return values_.front();
+  else
+    return "";
+}
+
+std::vector<std::string> &Property::getValues() {
+  return values_;
 }
 // Set value for the property
 void Property::setValue(std::string value) {
-  value_ = value;
+  if (!isCollection) {
+    values_.clear();
+    values_.push_back(std::string(value.c_str()));
+  } else {
+    values_.push_back(std::string(value.c_str()));
+  }
+}
+
+void Property::addValue(const std::string &value) {
+  values_.push_back(std::string(value.c_str()));
 }
 // Compare
 bool Property::operator <(const Property & right) const {
@@ -47,7 +64,8 @@ bool Property::operator <(const Property & right) const {
 
 const Property &Property::operator=(const Property &other) {
   name_ = other.name_;
-  value_ = other.value_;
+  values_ = other.values_;
+  isCollection = other.isCollection;
   return *this;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/controller/ControllerServiceNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/ControllerServiceNode.cpp b/libminifi/src/core/controller/ControllerServiceNode.cpp
new file mode 100644
index 0000000..12e3653
--- /dev/null
+++ b/libminifi/src/core/controller/ControllerServiceNode.cpp
@@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/controller/ControllerServiceNode.h"
+#include <memory>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace controller {
+
+std::shared_ptr<ControllerService> &ControllerServiceNode::getControllerServiceImplementation() {
+  return controller_service_;
+}
+
+std::vector<std::shared_ptr<ControllerServiceNode> > &ControllerServiceNode::getLinkedControllerServices() {
+  return linked_controller_services_;
+}
+
+std::vector<std::shared_ptr<ConfigurableComponent> > &ControllerServiceNode::getLinkedComponents() {
+  return linked_components_;
+}
+
+
+
+} /* namespace controller */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/controller/ControllerServiceProvider.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/ControllerServiceProvider.cpp b/libminifi/src/core/controller/ControllerServiceProvider.cpp
new file mode 100644
index 0000000..da5c6a1
--- /dev/null
+++ b/libminifi/src/core/controller/ControllerServiceProvider.cpp
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/controller/ControllerServiceProvider.h"
+#include <memory>
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace controller {
+
+/**
+ * @param identifier of controller service
+ * @return the ControllerService that is registered with the given
+ * identifier
+ */
+std::shared_ptr<ControllerService> ControllerServiceProvider::getControllerService(
+    const std::string &identifier) {
+  auto service = controller_map_->getControllerServiceNode(identifier);
+  if (service != nullptr) {
+    return service->getControllerServiceImplementation();
+  } else {
+    return nullptr;
+  }
+}
+
+} /* namespace controller */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/controller/StandardControllerServiceNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
new file mode 100644
index 0000000..26804f6
--- /dev/null
+++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/controller/StandardControllerServiceNode.h"
+#include <memory>
+#include <mutex>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace controller {
+std::shared_ptr<core::ProcessGroup> &StandardControllerServiceNode::getProcessGroup() {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return process_group_;
+}
+
+void StandardControllerServiceNode::setProcessGroup(
+    std::shared_ptr<ProcessGroup> &processGroup) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  process_group_ = processGroup;
+}
+
+bool StandardControllerServiceNode::enable() {
+  Property property("Linked Services", "Referenced Controller Services");
+  controller_service_->setState(ENABLED);
+  logger_->log_trace("Enabling CSN %s", getName());
+  if (getProperty(property.getName(), property)) {
+    active = true;
+    for (auto linked_service : property.getValues()) {
+      std::shared_ptr<ControllerServiceNode> csNode = provider
+          ->getControllerServiceNode(linked_service);
+      if (nullptr != csNode) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        linked_controller_services_.push_back(csNode);
+      }
+    }
+  } else {
+  }
+  std::shared_ptr<ControllerService> impl =
+      getControllerServiceImplementation();
+  if (nullptr != impl) {
+    impl->onEnable();
+  }
+  return true;
+}
+
+} /* namespace controller */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index b7c8246..c2c4950 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -21,6 +21,8 @@
 #include <string>
 #include <vector>
 #include <set>
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+#include "io/validation.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -31,13 +33,14 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(
     YAML::Node rootFlowNode) {
   uuid_t uuid;
 
-  checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+  checkRequiredField(&rootFlowNode, "name",
+  CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
   std::string flowName = rootFlowNode["name"].as<std::string>();
   std::string id = getOrGenerateId(&rootFlowNode);
   uuid_parse(id.c_str(), uuid);
 
-  logger_->log_debug(
-      "parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
+  logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id,
+                     flowName);
   std::unique_ptr<core::ProcessGroup> group =
       FlowConfiguration::createRootProcessGroup(flowName, uuid);
 
@@ -172,13 +175,16 @@ void YamlConfiguration::parseProcessorNodeYaml(
 
         if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
           processor->setSchedulingStrategy(core::TIMER_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
+          logger_->log_debug("setting scheduling strategy as %s",
+                             procCfg.schedulingStrategy);
         } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
           processor->setSchedulingStrategy(core::EVENT_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
+          logger_->log_debug("setting scheduling strategy as %s",
+                             procCfg.schedulingStrategy);
         } else {
           processor->setSchedulingStrategy(core::CRON_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
+          logger_->log_debug("setting scheduling strategy as %s",
+                             procCfg.schedulingStrategy);
         }
 
         int64_t maxConcurrentTasks;
@@ -203,11 +209,14 @@ void YamlConfiguration::parseProcessorNodeYaml(
 
         parentGroup->addProcessor(processor);
       }
+    } else {
+      throw new std::invalid_argument(
+          "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
     }
   } else {
     throw new std::invalid_argument(
         "Cannot instantiate a MiNiFi instance without a defined "
-            "Processors configuration node.");
+        "Processors configuration node.");
   }
 }
 
@@ -323,14 +332,16 @@ void YamlConfiguration::parseProvenanceReportingYaml(
   std::shared_ptr<core::Processor> processor = nullptr;
   processor = createProvenanceReportTask();
   std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask =
-      std::static_pointer_cast < core::reporting::SiteToSiteProvenanceReportingTask
-      > (processor);
+      std::static_pointer_cast<
+          core::reporting::SiteToSiteProvenanceReportingTask>(processor);
 
   YAML::Node node = reportNode->as<YAML::Node>();
 
-  checkRequiredField(&node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+  checkRequiredField(&node, "scheduling strategy",
+  CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
-  checkRequiredField(&node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+  checkRequiredField(&node, "scheduling period",
+  CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
   checkRequiredField(&node, "host", CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto hostStr = node["host"].as<std::string>();
@@ -346,21 +357,21 @@ void YamlConfiguration::parseProvenanceReportingYaml(
   processor->setScheduledState(core::RUNNING);
 
   core::TimeUnit unit;
-  if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) &&
-      core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
-    logger_->log_debug(
-        "ProvenanceReportingTask schedulingPeriod %d ns",
-        schedulingPeriod);
+  if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit)
+      && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
+                                             schedulingPeriod)) {
+    logger_->log_debug("ProvenanceReportingTask schedulingPeriod %d ns",
+                       schedulingPeriod);
     processor->setSchedulingPeriodNano(schedulingPeriod);
   }
 
   if (schedulingStrategyStr == "TIMER_DRIVEN") {
-     processor->setSchedulingStrategy(core::TIMER_DRIVEN);
-     logger_->log_debug(
-         "ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr);
+    processor->setSchedulingStrategy(core::TIMER_DRIVEN);
+    logger_->log_debug("ProvenanceReportingTask scheduling strategy %s",
+                       schedulingStrategyStr);
   } else {
     throw std::invalid_argument(
-        "Invalid scheduling strategy " +  schedulingStrategyStr);
+        "Invalid scheduling strategy " + schedulingStrategyStr);
   }
 
   reportTask->setHost(hostStr);
@@ -370,6 +381,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(
     logger_->log_debug("ProvenanceReportingTask port %d", (uint16_t) lvalue);
     reportTask->setPort((uint16_t) lvalue);
   }
+
   logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
   uuid_parse(portUUIDStr.c_str(), port_uuid);
   reportTask->setPortUUID(port_uuid);
@@ -378,9 +390,62 @@ void YamlConfiguration::parseProvenanceReportingYaml(
   }
 }
 
-void YamlConfiguration::parseConnectionYaml(
-    YAML::Node *connectionsNode,
-    core::ProcessGroup *parent) {
+void YamlConfiguration::parseControllerServices(
+    YAML::Node *controllerServicesNode) {
+  if (!IsNullOrEmpty(controllerServicesNode)) {
+    if (controllerServicesNode->IsSequence()) {
+      for (auto iter : *controllerServicesNode) {
+        YAML::Node controllerServiceNode = iter.as<YAML::Node>();
+        try {
+          checkRequiredField(&controllerServiceNode, "name",
+                             CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+          checkRequiredField(&controllerServiceNode, "id",
+                             CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+          checkRequiredField(&controllerServiceNode, "class",
+                             CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+
+          auto name = controllerServiceNode["name"].as<std::string>();
+          auto id = controllerServiceNode["id"].as<std::string>();
+          auto type = controllerServiceNode["class"].as<std::string>();
+
+          uuid_t uuid;
+          uuid_parse(id.c_str(), uuid);
+          auto controller_service_node = createControllerService(type, name,
+                                                                 uuid);
+          if (nullptr != controller_service_node) {
+            logger_->log_debug(
+                "Created Controller Service with UUID %s and name %s", id,
+                name);
+            controller_service_node->initialize();
+            YAML::Node propertiesNode = controllerServiceNode["Properties"];
+            // we should propogate propertiets to the node and to the implementation
+            parsePropertiesNodeYaml(
+                &propertiesNode,
+                std::static_pointer_cast<core::ConfigurableComponent>(
+                    controller_service_node));
+            if (controller_service_node->getControllerServiceImplementation()
+                != nullptr) {
+              parsePropertiesNodeYaml(
+                  &propertiesNode,
+                  std::static_pointer_cast<core::ConfigurableComponent>(
+                      controller_service_node
+                          ->getControllerServiceImplementation()));
+            }
+          }
+          controller_services_->put(id, controller_service_node);
+          controller_services_->put(name, controller_service_node);
+        } catch (YAML::InvalidNode &in) {
+          throw Exception(
+              ExceptionType::GENERAL_EXCEPTION,
+              "Name, id, and class must be specified for controller services");
+        }
+      }
+    }
+  }
+}
+
+void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
+                                            core::ProcessGroup *parent) {
   if (!parent) {
     logger_->log_error("parseProcessNode: no parent group was provided");
     return;
@@ -400,15 +465,15 @@ void YamlConfiguration::parseConnectionYaml(
         std::string id = getOrGenerateId(&connectionNode);
         uuid_parse(id.c_str(), uuid);
         connection = this->createConnection(name, uuid);
-        logger_->log_debug(
-            "Created connection with UUID %s and name %s", id, name);
-
+        logger_->log_debug("Created connection with UUID %s and name %s", id,
+                           name);
 
         // Configure connection source
         checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY);
         auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
         core::Relationship relationship(rawRelationship, "");
-        logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
+        logger_->log_debug("parseConnection: relationship => [%s]",
+                           rawRelationship);
         if (connection) {
           connection->setRelationship(relationship);
         }
@@ -462,7 +527,7 @@ void YamlConfiguration::parseConnectionYaml(
           std::string connectionDestProcName = connectionNode["destination name"].as<std::string>();
           uuid_t tmpUUID;
           if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) &&
-              NULL != parent->findProcessor(tmpUUID)) {
+          NULL != parent->findProcessor(tmpUUID)) {
             // the destination name is a remote port id, so use that as the dest id
             uuid_copy(destUUID, tmpUUID);
             logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for "
@@ -508,13 +573,18 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
   YAML::Node inputPortsObj = portNode->as<YAML::Node>();
 
   // Check for required fields
-  checkRequiredField(&inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+  checkRequiredField(&inputPortsObj, "name",
+  CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
   auto nameStr = inputPortsObj["name"].as<std::string>();
-  checkRequiredField(&inputPortsObj, "id", "The field 'id' is required for "
-      "the port named '" + nameStr + "' in the YAML Config. If this port "
-      "is an input port for a NiFi Remote Process Group, the port "
-      "id should match the corresponding id specified in the NiFi configuration. "
-      "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
+  checkRequiredField(
+      &inputPortsObj,
+      "id",
+      "The field 'id' is required for "
+          "the port named '" + nameStr
+          + "' in the YAML Config. If this port "
+              "is an input port for a NiFi Remote Process Group, the port "
+              "id should match the corresponding id specified in the NiFi configuration. "
+              "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
   auto portId = inputPortsObj["id"].as<std::string>();
   uuid_parse(portId.c_str(), uuid);
 
@@ -530,7 +600,9 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
   // handle port properties
   YAML::Node nodeVal = portNode->as<YAML::Node>();
   YAML::Node propertiesNode = nodeVal["Properties"];
-  parsePropertiesNodeYaml(&propertiesNode, processor);
+  parsePropertiesNodeYaml(
+      &propertiesNode,
+      std::static_pointer_cast<core::ConfigurableComponent>(processor));
 
   // add processor to parent
   parent->addProcessor(processor);
@@ -548,28 +620,51 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
 }
 
 void YamlConfiguration::parsePropertiesNodeYaml(
-    YAML::Node *propertiesNode, std::shared_ptr<core::Processor> processor) {
+    YAML::Node *propertiesNode,
+    std::shared_ptr<core::ConfigurableComponent> processor) {
   // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
   for (YAML::const_iterator propsIter = propertiesNode->begin();
       propsIter != propertiesNode->end(); ++propsIter) {
     std::string propertyName = propsIter->first.as<std::string>();
     YAML::Node propertyValueNode = propsIter->second;
     if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) {
-      std::string rawValueString = propertyValueNode.as<std::string>();
-      if (!processor->setProperty(propertyName, rawValueString)) {
-        logger_->log_warn(
-            "Received property %s with value %s but it is not one of the properties for %s",
-            propertyName,
-            rawValueString,
-            processor->getName());
+      if (propertyValueNode.IsSequence()) {
+        for (auto iter : propertyValueNode) {
+          if (iter.IsDefined()) {
+            YAML::Node nodeVal = iter.as<YAML::Node>();
+            YAML::Node propertiesNode = nodeVal["value"];
+            // must insert the sequence in differently.
+            std::string rawValueString = propertiesNode.as<std::string>();
+            logger_->log_info("Found %s=%s", propertyName, rawValueString);
+            if (!processor->updateProperty(propertyName, rawValueString)) {
+              std::shared_ptr<core::Connectable> proc =
+                  std::dynamic_pointer_cast<core::Connectable>(processor);
+              if (proc != 0) {
+                logger_->log_warn(
+                    "Received property %s with value %s but is not one of the properties for %s",
+                    propertyName, rawValueString, proc->getName());
+              }
+            }
+          }
+        }
+      } else {
+        std::string rawValueString = propertyValueNode.as<std::string>();
+        if (!processor->setProperty(propertyName, rawValueString)) {
+          std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<
+              core::Connectable>(processor);
+          if (proc != 0) {
+            logger_->log_warn(
+                "Received property %s with value %s but is not one of the properties for %s",
+                propertyName, rawValueString, proc->getName());
+          }
+        }
       }
     }
   }
 }
 
-std::string YamlConfiguration::getOrGenerateId(
-    YAML::Node *yamlNode,
-    const std::string &idField) {
+std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode,
+                                               const std::string &idField) {
   std::string id;
   YAML::Node node = yamlNode->as<YAML::Node>();
 
@@ -579,7 +674,7 @@ std::string YamlConfiguration::getOrGenerateId(
     } else {
       throw std::invalid_argument(
           "getOrGenerateId: idField is expected to reference YAML::Node "
-              "of YAML::NodeType::Scalar.");
+          "of YAML::NodeType::Scalar.");
     }
   } else {
     uuid_t uuid;
@@ -592,12 +687,10 @@ std::string YamlConfiguration::getOrGenerateId(
   return id;
 }
 
-void YamlConfiguration::checkRequiredField(
-    YAML::Node *yamlNode,
-    const std::string &fieldName,
-    const std::string &yamlSection,
-    const std::string &errorMessage) {
-
+void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode,
+                                           const std::string &fieldName,
+                                           const std::string &yamlSection,
+                                           const std::string &errorMessage) {
   std::string errMsg = errorMessage;
   if (!yamlNode->as<YAML::Node>()[fieldName]) {
     if (errMsg.empty()) {
@@ -605,14 +698,13 @@ void YamlConfiguration::checkRequiredField(
       // invalid YAML config file, using the component name if present
       errMsg =
           yamlNode->as<YAML::Node>()["name"] ?
-          "Unable to parse configuration file for component named '" +
-              yamlNode->as<YAML::Node>()["name"].as<std::string>() +
-              "' as required field '" + fieldName + "' is missing" :
-          "Unable to parse configuration file as required field '" +
-              fieldName + "' is missing";
+              "Unable to parse configuration file for component named '"
+                  + yamlNode->as<YAML::Node>()["name"].as<std::string>()
+                  + "' as required field '" + fieldName + "' is missing" :
+              "Unable to parse configuration file as required field '"
+                  + fieldName + "' is missing";
       if (!yamlSection.empty()) {
-        errMsg += " [in '" + yamlSection +
-            "' section of configuration file]";
+        errMsg += " [in '" + yamlSection + "' section of configuration file]";
       }
     }
     logger_->log_error(errMsg.c_str());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp
index dcabb5d..e39afec 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -43,6 +43,10 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
+
+
+
+
 core::Property GetFile::BatchSize(
     "Batch Size", "The maximum number of files to pull in each iteration",
     "10");

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp
index 17226a8..295560f 100644
--- a/libminifi/src/processors/InvokeHTTP.cpp
+++ b/libminifi/src/processors/InvokeHTTP.cpp
@@ -17,6 +17,7 @@
  */
 
 #include "processors/InvokeHTTP.h"
+#include <regex.h>
 #include <curl/curlbuild.h>
 #include <curl/easy.h>
 #include <uuid/uuid.h>
@@ -42,12 +43,6 @@
 #include "ResourceClaim.h"
 #include "utils/StringUtils.h"
 
-#if  (__GNUC__ >= 4)
-#if (__GNUC_MINOR__ < 9)
-#include <regex.h>
-#endif
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
@@ -156,7 +151,8 @@ core::Relationship InvokeHTTP::RelFailure(
 
 void InvokeHTTP::set_request_method(CURL *curl, const std::string &method) {
   std::string my_method = method;
-  std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper);
+  std::transform(my_method.begin(), my_method.end(), my_method.begin(),
+                 ::toupper);
   if (my_method == "POST") {
     curl_easy_setopt(curl, CURLOPT_POST, 1);
   } else if (my_method == "PUT") {
@@ -280,6 +276,17 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context,
   }
 
   utils::StringUtils::StringToBool(penalize_no_retry, penalize_no_retry_);
+
+  std::string context_name;
+  if (context->getProperty(SSLContext.getName(), context_name)
+      && !IsNullOrEmpty(context_name)) {
+    std::shared_ptr<core::controller::ControllerService> service = context
+        ->getControllerService(context_name);
+    if (nullptr != service) {
+      ssl_context_service_ = std::static_pointer_cast<
+          minifi::controllers::SSLContextService>(service);
+    }
+  }
 }
 
 InvokeHTTP::~InvokeHTTP() {
@@ -288,9 +295,9 @@ InvokeHTTP::~InvokeHTTP() {
 
 inline bool InvokeHTTP::matches(const std::string &value,
                                 const std::string &sregex) {
-#ifdef __GNUC__
-#if (__GNUC__ >= 4)
-#if (__GNUC_MINOR__ < 9)
+  if (sregex == ".*")
+    return true;
+
   regex_t regex;
   int ret = regcomp(&regex, sregex.c_str(), 0);
   if (ret)
@@ -299,24 +306,7 @@ inline bool InvokeHTTP::matches(const std::string &value,
   regfree(&regex);
   if (ret)
     return false;
-#else
-  try {
-    std::regex re(sregex);
 
-    if (!std::regex_match(value, re)) {
-      return false;
-    }
-  } catch (std::regex_error e) {
-    logger_->log_error("Invalid File Filter regex: %s.", e.what());
-    return false;
-  }
-#endif
-#endif
-#else
-  logger_->log_info("Cannot support regex filtering");
-  if (regex == ".*")
-  return true;
-#endif
   return true;
 }
 
@@ -346,6 +336,33 @@ struct curl_slist *InvokeHTTP::build_header_list(
   }
   return list;
 }
+
+bool InvokeHTTP::isSecure(const std::string &url) {
+  if (url.find("https") != std::string::npos) {
+    return true;
+  }
+  return false;
+}
+
+CURLcode InvokeHTTP::configure_ssl_context(CURL *curl, void *ctx, void *param) {
+  minifi::controllers::SSLContextService *ssl_context_service =
+      static_cast<minifi::controllers::SSLContextService*>(param);
+  if (!ssl_context_service->configure_ssl_context(static_cast<SSL_CTX*>(ctx))) {
+    return CURLE_FAILED_INIT;
+  }
+  return CURLE_OK;
+}
+
+void InvokeHTTP::configure_secure_connection(CURL *http_session) {
+  logger_->log_debug("InvokeHTTP -- Using certificate file %s",
+                     ssl_context_service_->getCertificateFile());
+  curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L);
+  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION,
+                   &InvokeHTTP::configure_ssl_context);
+  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA,
+                   static_cast<void*>(ssl_context_service_.get()));
+}
+
 void InvokeHTTP::onTrigger(core::ProcessContext *context,
                            core::ProcessSession *session) {
   std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
@@ -371,6 +388,10 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
   CURL *http_session = curl_easy_init();
   // set the HTTP request method from libCURL
   set_request_method(http_session, method_);
+  if (isSecure(url_) && ssl_context_service_ != nullptr) {
+    configure_secure_connection(http_session);
+  }
+
   curl_easy_setopt(http_session, CURLOPT_URL, url_.c_str());
 
   if (connect_timeout_ > 0) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/FlowFileRecordTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/FlowFileRecordTest.cpp b/libminifi/test/FlowFileRecordTest.cpp
deleted file mode 100644
index 09a3d33..0000000
--- a/libminifi/test/FlowFileRecordTest.cpp
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * @file MiNiFiMain.cpp 
- * MiNiFiMain implementation 
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-
-#include "FlowFileRecord.h"
-
-int main(int argc, char **argv)
-{
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/HttpGetIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/HttpGetIntegrationTest.cpp b/libminifi/test/HttpGetIntegrationTest.cpp
deleted file mode 100644
index 90505b4..0000000
--- a/libminifi/test/HttpGetIntegrationTest.cpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <cassert>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <sys/stat.h>
-#include "utils/StringUtils.h"
-#include "../include/core/Core.h"
-#include "../include/core/logging/LogAppenders.h"
-#include "../include/core/logging/BaseLogger.h"
-#include "../include/core/logging/Logger.h"
-#include "../include/core/ProcessGroup.h"
-#include "../include/core/yaml/YamlConfiguration.h"
-#include "../include/FlowController.h"
-#include "../include/properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "../include/io/StreamFactory.h"
-
-std::string test_file_location;
-
-void waitToVerifyProcessor() {
-  std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-int main(int argc, char **argv) {
-
-  if (argc > 1) {
-    test_file_location = argv[1];
-  }
-  mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
-  std::ostringstream oss;
-  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
-      logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
-                                                                         0));
-  std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
-  logger->updateLogger(std::move(outputLogger));
-  logger->setLogLevel("debug");
-
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-
-  std::shared_ptr<core::Repository> test_repo =
-      std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
-      TestFlowRepository>();
-
-  configuration->set(minifi::Configure::nifi_flow_configuration_file,
-                     test_file_location);
-
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<
-      core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, stream_factory, test_file_location));
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      minifi::FlowController>(test_repo, test_flow_repo, std::make_shared<minifi::Configure>(), std::move(yaml_ptr),
-  DEFAULT_ROOT_GROUP_NAME,
-                              true);
-
-  core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, test_file_location);
-
-  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
-      test_file_location);
-  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(
-      ptr.get());
-  ptr.release();
-
-  controller->load();
-  controller->start();
-  waitToVerifyProcessor();
-
-  controller->waitUnload(60000);
-  std::string logs = oss.str();
-  assert(logs.find("key:filename value:") != std::string::npos);
-  assert(
-      logs.find(
-          "key:invokehttp.request.url value:https://curl.haxx.se/libcurl/c/httpput.html")
-          != std::string::npos);
-  assert(logs.find("Size:8970 Offset:0") != std::string::npos);
-  assert(
-      logs.find("key:invokehttp.status.code value:200") != std::string::npos);
-  std::string stringtofind = "Resource Claim created ./content_repository/";
-
-  size_t loc = logs.find(stringtofind);
-  while (loc > 0) {
-    std::string id = logs.substr(loc + stringtofind.size(), 36);
-
-    loc = logs.find(stringtofind, loc+1);
-    std::string path = "content_repository/" + id;
-    unlink(path.c_str());
-
-    if ( loc == std::string::npos)
-      break;
-  }
-  rmdir("./content_repository");
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/HttpPostIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/HttpPostIntegrationTest.cpp b/libminifi/test/HttpPostIntegrationTest.cpp
deleted file mode 100644
index 73d21e6..0000000
--- a/libminifi/test/HttpPostIntegrationTest.cpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <cassert>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <sys/stat.h>
-#include "utils/StringUtils.h"
-#include "../include/core/Core.h"
-#include "../include/core/logging/LogAppenders.h"
-#include "../include/core/logging/BaseLogger.h"
-#include "../include/core/logging/Logger.h"
-#include "../include/core/ProcessGroup.h"
-#include "../include/core/yaml/YamlConfiguration.h"
-#include "../include/FlowController.h"
-#include "../include/properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "../include/io/StreamFactory.h"
-#include "../include/properties/Configure.h"
-
-std::string test_file_location;
-
-void waitToVerifyProcessor() {
-  std::this_thread::sleep_for(std::chrono::seconds(2));
-}
-
-int main(int argc, char **argv) {
-
-  if (argc > 1) {
-    test_file_location = argv[1];
-  }
-  mkdir("/tmp/aljr39/",S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
-  std::ofstream myfile;
-  myfile.open ("/tmp/aljr39/example.txt");
-  myfile << "Hello world" << std::endl;
-  myfile.close();
-  mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
-  std::ostringstream oss;
-  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
-      logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
-                                                                         0));
-  std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
-  logger->updateLogger(std::move(outputLogger));
-  logger->setLogLevel("debug");
-
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-
-  std::shared_ptr<core::Repository> test_repo =
-      std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
-      TestFlowRepository>();
-
-  configuration->set(minifi::Configure::nifi_flow_configuration_file,
-                     test_file_location);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
-
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<
-      core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, stream_factory, test_file_location));
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
-  DEFAULT_ROOT_GROUP_NAME,
-                              true);
-
-  core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, test_file_location);
-
-  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
-      test_file_location);
-  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(
-      ptr.get());
-  ptr.release();
-
-  controller->load();
-  controller->start();
-  waitToVerifyProcessor();
-
-  controller->waitUnload(60000);
-  std::string logs = oss.str();
-  assert(logs.find("curl performed") != std::string::npos);
-  assert(logs.find("Import offset 0 length 12") != std::string::npos);
-
-  std::string stringtofind = "Resource Claim created ./content_repository/";
-
-  size_t loc = logs.find(stringtofind);
-  while (loc > 0 && loc != std::string::npos) {
-    std::string id = logs.substr(loc + stringtofind.size(), 36);
-    loc = logs.find(stringtofind, loc+1);
-    std::string path = "content_repository/" + id;
-    unlink(path.c_str());
-    if ( loc == std::string::npos)
-      break;
-  }
-
-  rmdir("./content_repository");
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/ProcessorTests.cpp b/libminifi/test/ProcessorTests.cpp
deleted file mode 100644
index dfdcf47..0000000
--- a/libminifi/test/ProcessorTests.cpp
+++ /dev/null
@@ -1,408 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
-#include <uuid/uuid.h>
-#include <fstream>
-#include "unit/ProvenanceTestHelper.h"
-#include "TestBase.h"
-#include "core/logging/LogAppenders.h"
-#include "core/logging/BaseLogger.h"
-#include "processors/GetFile.h"
-#include "core/Core.h"
-#include "core/FlowFile.h"
-#include "core/Processor.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
-
-
-
-TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-        org::apache::nifi::minifi::processors::GetFile>("processorname");
-  REQUIRE(processor->getName() == "processorname");
-}
-
-TEST_CASE("Test Find file", "[getfileCreate2]") {
-
-  TestController testController;
-
-  testController.enableDebug();
-
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
-  std::shared_ptr<core::Processor> processorReport =
-      std::make_shared<
-          org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<org::apache::nifi::minifi::Configure>()));
-
-  std::shared_ptr<core::Repository> test_repo =
-      std::make_shared<TestRepository>();
-
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      TestFlowController>(test_repo, test_repo);
-
-  char format[] = "/tmp/gt.XXXXXX";
-  char *dir = testController.createTempDirectory(format);
-
-  uuid_t processoruuid;
-  REQUIRE(true == processor->getUUID(processoruuid));
-
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(test_repo, "getfileCreate2Connection");
-  connection->setRelationship(core::Relationship("success", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor);
-  connection->setDestination(processor);
-
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(processoruuid);
-
-  processor->addConnection(connection);
-  REQUIRE(dir != NULL);
-
-  core::ProcessorNode node(processor);
-
-  core::ProcessContext context(node, test_repo);
-  core::ProcessSessionFactory factory(&context);
-  context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
-  core::ProcessSession session(&context);
-
-  processor->onSchedule(&context, &factory);
-  REQUIRE(processor->getName() == "getfileCreate2");
-
-  std::shared_ptr<core::FlowFile> record;
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(&context, &session);
-
-  provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
-  record = session.get();
-  REQUIRE(record == nullptr);
-  REQUIRE(records.size() == 0);
-
-  std::fstream file;
-  std::stringstream ss;
-  ss << dir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << "tempFile";
-  file.close();
-
-  processor->incrementActiveTasks();
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(&context, &session);
-  unlink(ss.str().c_str());
-  reporter = session.getProvenanceReporter();
-
-  REQUIRE(processor->getName() == "getfileCreate2");
-
-  records = reporter->getEvents();
-
-  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
-    REQUIRE(provEventRecord->getComponentType() == processor->getName());
-  }
-  session.commit();
-  std::shared_ptr<core::FlowFile> ffr = session.get();
-
-  ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-  REQUIRE(2 == repo->getRepoMap().size());
-
-  for (auto entry : repo->getRepoMap()) {
-    provenance::ProvenanceEventRecord newRecord;
-    newRecord.DeSerialize((uint8_t*) entry.second.data(),
-                          entry.second.length());
-
-    bool found = false;
-    for (auto provRec : records) {
-      if (provRec->getEventId() == newRecord.getEventId()) {
-        REQUIRE(provRec->getEventId() == newRecord.getEventId());
-        REQUIRE(provRec->getComponentId() == newRecord.getComponentId());
-        REQUIRE(provRec->getComponentType() == newRecord.getComponentType());
-        REQUIRE(provRec->getDetails() == newRecord.getDetails());
-        REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration());
-        found = true;
-        break;
-      }
-    }
-    if (!found)
-      throw std::runtime_error("Did not find record");
-
-  }
-
-  core::ProcessorNode nodeReport(processorReport);
-  core::ProcessContext contextReport(nodeReport, test_repo);
-  core::ProcessSessionFactory factoryReport(&contextReport);
-  core::ProcessSession sessionReport(&contextReport);
-  processorReport->onSchedule(&contextReport, &factoryReport);
-  std::shared_ptr<
-      org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport =
-      std::static_pointer_cast<
-          org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
-          processorReport);
-  taskReport->setBatchSize(1);
-  std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> recordsReport;
-  processorReport->incrementActiveTasks();
-  processorReport->setScheduledState(core::ScheduledState::RUNNING);
-  std::string jsonStr;
-  repo->getProvenanceRecord(recordsReport, 1);
-  taskReport->getJsonReport(&contextReport, &sessionReport, recordsReport,
-                            jsonStr);
-  REQUIRE(recordsReport.size() == 1);
-  REQUIRE(
-      taskReport->getName()
-          == std::string(
-              org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName));
-  REQUIRE(
-      jsonStr.find("\"componentType\" : \"getfileCreate2\"")
-          != std::string::npos);
-}
-
-TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
-
-  TestController testController;
-
-  testController.enableDebug();
-
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
-  std::shared_ptr<core::Repository> test_repo =
-      std::make_shared<TestRepository>();
-
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      TestFlowController>(test_repo, test_repo);
-
-  char format[] = "/tmp/gt.XXXXXX";
-  char *dir = testController.createTempDirectory(format);
-
-  uuid_t processoruuid;
-  REQUIRE(true == processor->getUUID(processoruuid));
-
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(test_repo, "getfileCreate2Connection");
-  connection->setRelationship(core::Relationship("success", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor);
-  connection->setDestination(processor);
-
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(processoruuid);
-
-  processor->addConnection(connection);
-  REQUIRE(dir != NULL);
-
-  core::ProcessorNode node(processor);
-  core::ProcessContext context(node, test_repo);
-  core::ProcessSessionFactory factory(&context);
-  context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
-  // replicate 10 threads
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onSchedule(&context, &factory);
-
-  int prev = 0;
-  for (int i = 0; i < 10; i++) {
-
-    core::ProcessSession session(&context);
-    REQUIRE(processor->getName() == "getfileCreate2");
-
-    std::shared_ptr<core::FlowFile> record;
-
-    processor->onTrigger(&context, &session);
-
-    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-    std::set<provenance::ProvenanceEventRecord*> records =
-        reporter->getEvents();
-    record = session.get();
-    REQUIRE(record == nullptr);
-    REQUIRE(records.size() == 0);
-
-    std::fstream file;
-    std::stringstream ss;
-    ss << dir << "/" << "tstFile.ext";
-    file.open(ss.str(), std::ios::out);
-    file << "tempFile";
-    file.close();
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    processor->onTrigger(&context, &session);
-    unlink(ss.str().c_str());
-    reporter = session.getProvenanceReporter();
-
-    REQUIRE(processor->getName() == "getfileCreate2");
-
-    records = reporter->getEvents();
-
-    for (provenance::ProvenanceEventRecord *provEventRecord : records) {
-      REQUIRE(provEventRecord->getComponentType() == processor->getName());
-    }
-    session.commit();
-    std::shared_ptr<core::FlowFile> ffr = session.get();
-
-    REQUIRE((repo->getRepoMap().size() % 2) == 0);
-    REQUIRE(repo->getRepoMap().size() == (prev + 2));
-    prev += 2;
-
-  }
-
-}
-
-TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
-  std::ostringstream oss;
-  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
-      logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
-                                                                         0));
-  std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
-  logger->updateLogger(std::move(outputLogger));
-
-  TestController testController;
-
-  testController.enableDebug();
-
-  std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
-
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
-  std::shared_ptr<core::Processor> logAttribute = std::make_shared<
-      org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
-  char format[] = "/tmp/gt.XXXXXX";
-  char *dir = testController.createTempDirectory(format);
-
-  uuid_t processoruuid;
-  REQUIRE(true == processor->getUUID(processoruuid));
-
-  uuid_t logattribute_uuid;
-  REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
-
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(repo, "getfileCreate2Connection");
-  connection->setRelationship(core::Relationship("success", "description"));
-
-  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
-      minifi::Connection>(repo, "logattribute");
-  connection2->setRelationship(core::Relationship("success", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor);
-
-  // link the connections so that we can test results at the end for this
-  connection->setDestination(logAttribute);
-
-  connection2->setSource(logAttribute);
-
-  connection2->setSourceUUID(logattribute_uuid);
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(logattribute_uuid);
-
-  processor->addConnection(connection);
-  logAttribute->addConnection(connection);
-  logAttribute->addConnection(connection2);
-  REQUIRE(dir != NULL);
-
-  core::ProcessorNode node(processor);
-  core::ProcessorNode node2(logAttribute);
-
-  core::ProcessContext context(node, repo);
-  core::ProcessContext context2(node2, repo);
-  context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
-  core::ProcessSession session(&context);
-  core::ProcessSession session2(&context2);
-
-  REQUIRE(processor->getName() == "getfileCreate2");
-
-  std::shared_ptr<core::FlowFile> record;
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-
-  core::ProcessSessionFactory factory(&context);
-  processor->onSchedule(&context, &factory);
-  processor->onTrigger(&context, &session);
-
-  logAttribute->incrementActiveTasks();
-  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
-  core::ProcessSessionFactory factory2(&context2);
-  logAttribute->onSchedule(&context2, &factory2);
-  logAttribute->onTrigger(&context2, &session2);
-
-  provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
-  record = session.get();
-  REQUIRE(record == nullptr);
-  REQUIRE(records.size() == 0);
-
-  std::fstream file;
-  std::stringstream ss;
-  ss << dir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << "tempFile";
-  file.close();
-
-  processor->incrementActiveTasks();
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(&context, &session);
-  unlink(ss.str().c_str());
-  reporter = session.getProvenanceReporter();
-
-  records = reporter->getEvents();
-  session.commit();
-  oss.str("");
-  oss.clear();
-
-  logAttribute->incrementActiveTasks();
-  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
-  logAttribute->onTrigger(&context2, &session2);
-
-  //session2.commit();
-  records = reporter->getEvents();
-
-  std::string log_attribute_output = oss.str();
-  REQUIRE(
-      log_attribute_output.find("key:absolute.path value:" + ss.str())
-          != std::string::npos);
-  REQUIRE(log_attribute_output.find("Size:8 Offset:0") != std::string::npos);
-  REQUIRE(
-      log_attribute_output.find("key:path value:" + std::string(dir))
-          != std::string::npos);
-
-  outputLogger = std::unique_ptr<logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::NullAppender());
-  logger->updateLogger(std::move(outputLogger));
-
-}
-
-int fileSize(const char *add) {
-  std::ifstream mySource;
-  mySource.open(add, std::ios_base::binary);
-  mySource.seekg(0, std::ios_base::end);
-  int size = mySource.tellg();
-  mySource.close();
-  return size;
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/SocketTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/SocketTests.cpp b/libminifi/test/SocketTests.cpp
deleted file mode 100644
index 2e5013b..0000000
--- a/libminifi/test/SocketTests.cpp
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
-
-#include "TestBase.h"
-#include "io/ClientSocket.h"
-
-using namespace org::apache::nifi::minifi::io;
-TEST_CASE("TestSocket", "[TestSocket1]") {
-
-  Socket socket(std::make_shared<SocketContext>(std::make_shared<minifi::Configure>()), "localhost", 8183);
-  REQUIRE(-1 == socket.initialize());
-  REQUIRE("localhost" == socket.getHostname());
-  socket.closeStream();
-
-}
-
-TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") {
-
-  Socket socket(std::make_shared<SocketContext>(std::make_shared<minifi::Configure>()), "localhost", 8183);
-  REQUIRE(-1 == socket.initialize());
-
-  socket.writeData(0, 0);
-
-  std::vector<uint8_t> buffer;
-  buffer.push_back('a');
-
-  REQUIRE(-1 == socket.writeData(buffer, 1));
-
-  socket.closeStream();
-
-}
-
-TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") {
-
-  std::vector<uint8_t> buffer;
-  buffer.push_back('a');
-  
-  std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>());
-
-  Socket server(socket_context, "localhost", 9183, 1);
-
-  REQUIRE(-1 != server.initialize());
-
-  Socket client(socket_context, "localhost", 9183);
-
-  REQUIRE(-1 != client.initialize());
-
-  REQUIRE(1 == client.writeData(buffer, 1));
-
-  std::vector<uint8_t> readBuffer;
-  readBuffer.resize(1);
-
-  REQUIRE(1 == server.readData(readBuffer, 1));
-
-  REQUIRE(readBuffer == buffer);
-
-  server.closeStream();
-
-  client.closeStream();
-
-}
-
-TEST_CASE("TestGetHostName", "[TestSocket4]") {
-
-  REQUIRE(Socket::getMyHostName().length() > 0);
-
-}
-
-TEST_CASE("TestWriteEndian64", "[TestSocket4]") {
-
-  std::vector<uint8_t> buffer;
-  buffer.push_back('a');
-  
-  std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>());
-
-  Socket server(socket_context, "localhost", 9183, 1);
-
-  REQUIRE(-1 != server.initialize());
-
-  Socket client(socket_context, "localhost", 9183);
-
-  REQUIRE(-1 != client.initialize());
-
-  uint64_t negative_one = -1;
-  REQUIRE(8 == client.write(negative_one));
-
-  uint64_t negative_two = 0;
-  REQUIRE(8 == server.read(negative_two));
-
-  REQUIRE(negative_two == negative_one);
-
-  server.closeStream();
-
-  client.closeStream();
-
-}
-
-TEST_CASE("TestWriteEndian32", "[TestSocket5]") {
-
-  std::vector<uint8_t> buffer;
-  buffer.push_back('a');
-
-  std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>());
-  
-  Socket server(socket_context, "localhost", 9183, 1);
-
-  REQUIRE(-1 != server.initialize());
-
-  Socket client(socket_context, "localhost", 9183);
-
-  REQUIRE(-1 != client.initialize());
-
-  {
-    uint32_t negative_one = -1;
-    REQUIRE(4 == client.write(negative_one));
-
-    uint32_t negative_two = 0;
-    REQUIRE(4 == server.read(negative_two));
-
-    REQUIRE(negative_two == negative_one);
-  }
-
-  {
-    uint16_t negative_one = -1;
-    REQUIRE(2 == client.write(negative_one));
-
-    uint16_t negative_two = 0;
-    REQUIRE(2 == server.read(negative_two));
-
-    REQUIRE(negative_two == negative_one);
-  }
-  server.closeStream();
-
-  client.closeStream();
-
-}
-
-TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket6]") {
-
-  std::vector<uint8_t> buffer;
-  buffer.push_back('a');
-
-  std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>());
-  
-  Socket server(socket_context, "localhost", 9183, 1);
-
-  REQUIRE(-1 != server.initialize());
-
-  Socket client(socket_context, "localhost", 9183);
-
-  REQUIRE(-1 != client.initialize());
-
-  REQUIRE(1 == client.writeData(buffer, 1));
-
-  std::vector<uint8_t> readBuffer;
-  readBuffer.resize(1);
-
-  REQUIRE(1 == server.readData(readBuffer, 1));
-
-  REQUIRE(readBuffer == buffer);
-
-  client.closeStream();
-
-  REQUIRE(-1 == client.writeData(buffer, 1));
-
-  server.closeStream();
-
-}


Mime
View raw message