nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [4/5] nifi-minifi-cpp git commit: MINIFI-226: Add controller services capabilities along with unit tests Fix test failures Update Travis YML Update readme to link to MiNiFi licensing information
Date Thu, 18 May 2017 13:07:54 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/controller/ControllerServiceProvider.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
new file mode 100644
index 0000000..a749475
--- /dev/null
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -0,0 +1,306 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
+#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
+
+#include <vector>
+#include "core/Core.h"
+#include "ControllerServiceLookup.h"
+#include "core/ConfigurableComponent.h"
+#include "ControllerServiceNode.h"
+#include "ControllerServiceMap.h"
+#include "core/ClassLoader.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace controller {
+
+class ControllerServiceProvider : public CoreComponent,
+    public ConfigurableComponent, public ControllerServiceLookup {
+ public:
+
+  explicit ControllerServiceProvider(const std::string &name)
+      : CoreComponent(name),
+        ConfigurableComponent(logging::Logger::getLogger()) {
+    controller_map_ = std::make_shared<ControllerServiceMap>();
+  }
+
+  explicit ControllerServiceProvider(
+      std::shared_ptr<ControllerServiceMap> services)
+      : CoreComponent(core::getClassName<ControllerServiceProvider>()),
+        ConfigurableComponent(logging::Logger::getLogger()),
+        controller_map_(services) {
+  }
+
+  explicit ControllerServiceProvider(
+      const std::string &name, std::shared_ptr<ControllerServiceMap> services)
+      : CoreComponent(name),
+        ConfigurableComponent(logging::Logger::getLogger()),
+        controller_map_(services) {
+  }
+
+  explicit ControllerServiceProvider(const ControllerServiceProvider &&other)
+      : CoreComponent(std::move(other)),
+        ConfigurableComponent(std::move(other)),
+        controller_map_(std::move(other.controller_map_)) {
+  }
+
+  virtual ~ControllerServiceProvider() {
+  }
+
+  /**
+   * Creates a controller service node wrapping the controller service
+   *
+   * @param type service type.
+   * @param id controller service identifier.
+   * @return shared pointer to the controller service node.
+   */
+  virtual std::shared_ptr<ControllerServiceNode> createControllerService(
+      const std::string &type, const std::string &id,
+      bool firstTimeAdded) = 0;
+  /**
+   * Gets a controller service node wrapping the controller service
+   *
+   * @param type service type.
+   * @param id controller service identifier.
+   * @return shared pointer to the controller service node.
+   */
+  virtual std::shared_ptr<ControllerServiceNode> getControllerServiceNode(
+      const std::string &id) {
+    return controller_map_->getControllerServiceNode(id);
+  }
+
+  /**
+   * Removes a controller service.
+   * @param serviceNode controller service node.
+   */
+  virtual void removeControllerService(
+      const std::shared_ptr<ControllerServiceNode> &serviceNode) {
+    controller_map_->removeControllerService(serviceNode);
+  }
+
+  /**
+   * Enables the provided controller service
+   * @param serviceNode controller service node.
+   */
+  virtual void enableControllerService(
+      std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
+
+  /**
+   * Enables the provided controller service nodes
+   * @param serviceNode controller service node.
+   */
+  virtual void enableControllerServices(
+      std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) = 0;
+
+  /**
+   * Disables the provided controller service node
+   * @param serviceNode controller service node.
+   */
+  virtual void disableControllerService(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
+
+  /**
+   * Gets a list of all controller services.
+   */
+  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> getAllControllerServices() {
+    return controller_map_->getAllControllerServices();
+  }
+
+  /**
+   * Verifies that referencing components can be stopped for the controller service
+   */
+  virtual void verifyCanStopReferencingComponents(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
+
+  /**
+   *  Unschedules referencing components.
+   */
+  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
+
+  /**
+   * Verifies referencing components for <code>serviceNode</code> can be disabled.
+   * @param serviceNode shared pointer to a controller service node.
+   */
+  virtual void verifyCanDisableReferencingServices(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
+
+  /**
+   * Disables referencing components for <code>serviceNode</code> can be disabled.
+   * @param serviceNode shared pointer to a controller service node.
+   */
+  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+    return std::vector<std::shared_ptr<core::controller::ControllerServiceNode>>();
+  }
+
+  /**
+   * Verifies referencing components for <code>serviceNode</code> can be enabled.
+   * @param serviceNode shared pointer to a controller service node.
+   */
+  virtual void verifyCanEnableReferencingServices(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references =
+        findLinkedComponents(serviceNode);
+    for (auto ref : references) {
+      ref->canEnable();
+    }
+  }
+
+  /**
+   * Enables referencing components for <code>serviceNode</code> can be Enabled.
+   * @param serviceNode shared pointer to a controller service node.
+   */
+  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
+
+  /**
+   * Schedules the service node and referencing components.
+   * @param serviceNode shared pointer to a controller service node.
+   */
+  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
+
+  /**
+   * Returns a controller service for the service identifier and componentID
+   * @param service Identifier service identifier.
+   */
+  virtual std::shared_ptr<ControllerService> getControllerServiceForComponent(
+      const std::string &serviceIdentifier, const std::string &componentId) {
+    std::shared_ptr<ControllerService> node = getControllerService(
+        serviceIdentifier);
+    return node;
+  }
+
+  /**
+   * Gets the controller service for the provided identifier
+   * @param identifier service identifier.
+   */
+  virtual std::shared_ptr<ControllerService> getControllerService(
+      const std::string &identifier);
+
+  /**
+   * Determines if Controller service is enabled.
+   * @param identifier service identifier.
+   */
+  virtual bool isControllerServiceEnabled(const std::string &identifier) {
+    std::shared_ptr<ControllerServiceNode> node = getControllerServiceNode(
+        identifier);
+    if (nullptr != node) {
+      return linkedServicesAre(ENABLED, node);
+    } else
+      return false;
+  }
+
+  /**
+   * Determines if Controller service is being enabled.
+   * @param identifier service identifier.
+   */
+  virtual bool isControllerServiceEnabling(const std::string &identifier) {
+    std::shared_ptr<ControllerServiceNode> node = getControllerServiceNode(
+        identifier);
+    if (nullptr != node) {
+      return linkedServicesAre(ENABLING, node);
+    } else
+      return false;
+  }
+
+  virtual const std::string getControllerServiceName(
+      const std::string &identifier) {
+    std::shared_ptr<ControllerService> node = getControllerService(identifier);
+    if (nullptr != node) {
+      return node->getName();
+    } else
+      return "";
+  }
+
+  virtual void enableAllControllerServices() = 0;
+
+ protected:
+
+  /**
+   * verifies that linked services match the provided state.
+   */
+  inline bool linkedServicesAre(
+      ControllerServiceState state,
+      const std::shared_ptr<ControllerServiceNode> &node) {
+    if (node->getControllerServiceImplementation()->getState() == state) {
+      for (auto child_service : node->getLinkedControllerServices()) {
+        if (child_service->getControllerServiceImplementation()->getState()
+            != state) {
+          return false;
+        }
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  bool canEdit() {
+    return true;
+  }
+
+  /**
+   * Finds linked components
+   * @param referenceNode reference node from whcih we will find linked references.
+   */
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> findLinkedComponents(
+      std::shared_ptr<core::controller::ControllerServiceNode> &referenceNode) {
+
+    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references;
+
+    for (std::shared_ptr<core::controller::ControllerServiceNode> linked_node : referenceNode
+        ->getLinkedControllerServices()) {
+      references.push_back(linked_node);
+      std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> linked_references =
+          findLinkedComponents(linked_node);
+
+      auto removal_predicate =
+          [&linked_references](std::shared_ptr<core::controller::ControllerServiceNode> key) ->bool
+          {
+            return std::find(linked_references.begin(), linked_references.end(), key) != linked_references.end();
+          };
+
+      references.erase(
+          std::remove_if(references.begin(), references.end(),
+                         removal_predicate),
+          references.end());
+
+      references.insert(std::end(references), linked_references.begin(),
+                        linked_references.end());
+    }
+    return references;
+  }
+
+  std::shared_ptr<ControllerServiceMap> controller_map_;
+
+};
+
+} /* namespace controller */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/controller/StandardControllerServiceNode.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/controller/StandardControllerServiceNode.h b/libminifi/include/core/controller/StandardControllerServiceNode.h
new file mode 100644
index 0000000..f599217
--- /dev/null
+++ b/libminifi/include/core/controller/StandardControllerServiceNode.h
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICENODE_H_
+#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICENODE_H_
+
+#include "core/Core.h"
+#include "ControllerServiceNode.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace controller {
+
+class StandardControllerServiceNode : public ControllerServiceNode {
+ public:
+
+  explicit StandardControllerServiceNode(
+      std::shared_ptr<ControllerService> service,
+      std::shared_ptr<ControllerServiceProvider> provider,
+      const std::string &id, std::shared_ptr<Configure> configuration)
+      : ControllerServiceNode(service, id, configuration),
+        provider(provider) {
+  }
+
+  explicit StandardControllerServiceNode(
+      std::shared_ptr<ControllerService> service, const std::string &id,
+      std::shared_ptr<Configure> configuration)
+      : ControllerServiceNode(service, id, configuration),
+        provider(nullptr) {
+  }
+
+  std::shared_ptr<core::ProcessGroup> &getProcessGroup();
+
+  void setProcessGroup(std::shared_ptr<ProcessGroup> &processGroup);
+
+  StandardControllerServiceNode(const StandardControllerServiceNode &other) = delete;
+  StandardControllerServiceNode &operator=(
+      const StandardControllerServiceNode &parent) = delete;
+
+  /**
+   * Initializes the controller service node.
+   */
+  virtual void initialize() {
+    ControllerServiceNode::initialize();
+    active = false;
+  }
+
+  bool canEnable() {
+    if (!active.load()) {
+      for (auto linked_service : linked_controller_services_) {
+        if (!linked_service->canEnable()) {
+          return false;
+        }
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  bool enable();
+
+  bool disable() {
+    controller_service_->setState(DISABLED);
+    active = false;
+    return true;
+  }
+
+ protected:
+
+  // controller service provider.
+  std::shared_ptr<ControllerServiceProvider> provider;
+
+  // process group.
+  std::shared_ptr<core::ProcessGroup> process_group_;
+
+  std::mutex mutex_;
+
+};
+
+} /* namespace controller */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICENODE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/controller/StandardControllerServiceProvider.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
new file mode 100644
index 0000000..ba8af56
--- /dev/null
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -0,0 +1,229 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDStandardControllerServiceProvider_H_
+#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDStandardControllerServiceProvider_H_
+
+#include <iostream>
+#include <memory>
+#include <vector>
+#include "core/ProcessGroup.h"
+#include "SchedulingAgent.h"
+#include "core/ClassLoader.h"
+#include "ControllerService.h"
+#include "ControllerServiceMap.h"
+#include "ControllerServiceNode.h"
+#include "StandardControllerServiceNode.h"
+#include "ControllerServiceProvider.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace controller {
+
+class StandardControllerServiceProvider : public ControllerServiceProvider,
+    public std::enable_shared_from_this<StandardControllerServiceProvider> {
+ public:
+
+  explicit StandardControllerServiceProvider(
+      std::shared_ptr<ControllerServiceMap> services,
+      std::shared_ptr<ProcessGroup> root_group, std::shared_ptr<Configure> configuration,
+      std::shared_ptr<minifi::SchedulingAgent> agent, ClassLoader &loader =
+          ClassLoader::getDefaultClassLoader())
+      : ControllerServiceProvider(services),
+        root_group_(root_group),
+        agent_(agent),
+        extension_loader_(loader),
+        configuration_(configuration) {
+  }
+
+  explicit StandardControllerServiceProvider(
+      std::shared_ptr<ControllerServiceMap> services,
+      std::shared_ptr<ProcessGroup> root_group, std::shared_ptr<Configure> configuration,
+      ClassLoader &loader = ClassLoader::getDefaultClassLoader())
+      : ControllerServiceProvider(services),
+        root_group_(root_group),
+        agent_(0),
+        extension_loader_(loader),
+        configuration_(configuration) {
+  }
+
+  explicit StandardControllerServiceProvider(
+      const StandardControllerServiceProvider && other)
+      : ControllerServiceProvider(std::move(other)),
+        root_group_(std::move(other.root_group_)),
+        agent_(std::move(other.agent_)),
+        extension_loader_(other.extension_loader_),
+        configuration_(other.configuration_) {
+
+  }
+
+  void setRootGroup(std::shared_ptr<ProcessGroup> rg) {
+    root_group_ = rg;
+  }
+
+  void setSchedulingAgent(std::shared_ptr<minifi::SchedulingAgent> agent) {
+    agent_ = agent;
+  }
+
+  std::shared_ptr<ControllerServiceNode> createControllerService(
+      const std::string &type, const std::string &id,
+      bool firstTimeAdded) {
+
+    std::shared_ptr<ControllerService> new_controller_service =
+        extension_loader_.instantiate<ControllerService>(type, id);
+
+    if (nullptr == new_controller_service) {
+      return nullptr;
+    }
+
+    std::shared_ptr<ControllerServiceNode> new_service_node = std::make_shared<
+        StandardControllerServiceNode>(
+        new_controller_service,
+        std::static_pointer_cast<ControllerServiceProvider>(shared_from_this()),
+        id, configuration_);
+
+    controller_map_->put(id, new_service_node);
+    return new_service_node;
+
+  }
+
+
+  void enableControllerService(
+      std::shared_ptr<ControllerServiceNode> &serviceNode) {
+    if (serviceNode->canEnable()) {
+      agent_->enableControllerService(serviceNode);
+    }
+  }
+
+
+  virtual void enableAllControllerServices() {
+    logger_->log_info("Enabling %d controller services",
+                      controller_map_->getAllControllerServices().size());
+    for (auto service : controller_map_->getAllControllerServices()) {
+
+      if (service->canEnable()) {
+        logger_->log_info("Enabling %s", service->getName());
+        agent_->enableControllerService(service);
+      } else {
+        logger_->log_info("Could not enable %s", service->getName());
+      }
+    }
+  }
+
+
+  void enableControllerServices(
+      std::vector<std::shared_ptr<ControllerServiceNode>> serviceNodes) {
+    for (auto node : serviceNodes) {
+      enableControllerService(node);
+    }
+  }
+
+
+  void disableControllerService(
+      std::shared_ptr<ControllerServiceNode> &serviceNode) {
+    if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) {
+      agent_->disableControllerService(serviceNode);
+    }
+  }
+
+
+  void verifyCanStopReferencingComponents(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  }
+
+
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references =
+        findLinkedComponents(serviceNode);
+    for (auto ref : references) {
+      agent_->disableControllerService(ref);
+    }
+    return references;
+  }
+
+
+  void verifyCanDisableReferencingServices(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references =
+        findLinkedComponents(serviceNode);
+    for (auto ref : references) {
+      if (!ref->canEnable()) {
+        logger_->log_info("Cannot disable %s", ref->getName());
+      }
+    }
+  }
+
+
+  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references =
+        findLinkedComponents(serviceNode);
+    for (auto ref : references) {
+      agent_->disableControllerService(ref);
+    }
+
+    return references;
+  }
+
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references =
+        findLinkedComponents(serviceNode);
+    for (auto ref : references) {
+      agent_->enableControllerService(ref);
+    }
+    return references;
+  }
+
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(
+      std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references =
+        findLinkedComponents(serviceNode);
+    for (auto ref : references) {
+      agent_->enableControllerService(ref);
+    }
+    return references;
+  }
+
+ protected:
+
+  bool canEdit() {
+    return false;
+  }
+
+  std::shared_ptr<minifi::SchedulingAgent> agent_;
+
+  ClassLoader &extension_loader_;
+
+  std::shared_ptr<Configure> configuration_;
+
+  std::shared_ptr<ProcessGroup> root_group_;
+
+};
+
+} /* namespace controller */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDStandardControllerServiceProvider_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/yaml/YamlConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 793cdb9..6226a4e 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -20,6 +20,7 @@
 
 #include "core/ProcessorConfig.h"
 #include "yaml-cpp/yaml.h"
+#include "processors/LoadProcessors.h"
 #include "../FlowConfiguration.h"
 #include "Site2SiteClientProtocol.h"
 #include <string>
@@ -36,18 +37,21 @@ namespace core {
 #define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
 #define CONFIG_YAML_PROCESSORS_KEY "Processors"
 #define CONFIG_YAML_CONNECTIONS_KEY "Connections"
+#define CONFIG_YAML_CONTROLLER_SERVICES_KEY "Controller Services"
 #define CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY "Remote Processing Groups"
 #define CONFIG_YAML_PROVENANCE_REPORT_KEY "Provenance Reporting"
 
 class YamlConfiguration : public FlowConfiguration {
 
  public:
-  YamlConfiguration(std::shared_ptr<core::Repository> repo,
+  explicit YamlConfiguration(std::shared_ptr<core::Repository> repo,
                     std::shared_ptr<core::Repository> flow_file_repo,
                     std::shared_ptr<io::StreamFactory> stream_factory,
+                    std::shared_ptr<Configure> configuration,
                     const std::string path = DEFAULT_FLOW_YAML_FILE_NAME)
-      : FlowConfiguration(repo, flow_file_repo, stream_factory, path) {
-       stream_factory_ = stream_factory;
+      : FlowConfiguration(repo, flow_file_repo, stream_factory, configuration,
+                          path) {
+    stream_factory_ = stream_factory;
     if (IsNullOrEmpty(config_path_)) {
       config_path_ = DEFAULT_FLOW_YAML_FILE_NAME;
     }
@@ -67,7 +71,8 @@ class YamlConfiguration : public FlowConfiguration {
    * @return              the root ProcessGroup node of the flow
    *                        configuration tree
    */
-  std::unique_ptr<core::ProcessGroup> getRoot(const std::string &yamlConfigStr) {
+  std::unique_ptr<core::ProcessGroup> getRoot(
+      const std::string &yamlConfigStr) {
     YAML::Node rootYamlNode = YAML::LoadFile(yamlConfigStr);
     return getRoot(&rootYamlNode);
   }
@@ -106,9 +111,14 @@ class YamlConfiguration : public FlowConfiguration {
     YAML::Node flowControllerNode = rootYaml[CONFIG_YAML_FLOW_CONTROLLER_KEY];
     YAML::Node processorsNode = rootYaml[CONFIG_YAML_PROCESSORS_KEY];
     YAML::Node connectionsNode = rootYaml[CONFIG_YAML_CONNECTIONS_KEY];
-    YAML::Node remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
-    YAML::Node provenanceReportNode = rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY];
+    YAML::Node controllerServiceNode =
+        rootYaml[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
+    YAML::Node remoteProcessingGroupsNode =
+        rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
+    YAML::Node provenanceReportNode =
+        rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY];
 
+    parseControllerServices(&controllerServiceNode);
     // Create the root process group
     core::ProcessGroup * root = parseRootProcessGroupYaml(flowControllerNode);
     parseProcessorNodeYaml(processorsNode, root);
@@ -116,6 +126,15 @@ class YamlConfiguration : public FlowConfiguration {
     parseConnectionYaml(&connectionsNode, root);
     parseProvenanceReportingYaml(&provenanceReportNode, root);
 
+    // set the controller services into the root group.
+    for (auto controller_service : controller_services_
+        ->getAllControllerServices()) {
+      root->addControllerService(controller_service->getName(),
+                                 controller_service);
+      root->addControllerService(controller_service->getUUIDStr(),
+                                 controller_service);
+    }
+
     return std::unique_ptr<core::ProcessGroup>(root);
   }
 
@@ -147,7 +166,6 @@ class YamlConfiguration : public FlowConfiguration {
   void parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent,
                      TransferDirection direction);
 
-
   /**
    * Parses the root level YAML node for the flow configuration and
    * returns a ProcessGroup containing the tree of flow configuration
@@ -158,6 +176,17 @@ class YamlConfiguration : public FlowConfiguration {
    */
   core::ProcessGroup *parseRootProcessGroupYaml(YAML::Node rootNode);
 
+  // Process Property YAML
+  void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node,
+                                  std::shared_ptr<core::Processor> processor);
+  /**
+   * Parse controller services
+   * @param controllerServicesNode controller services YAML node.
+   * @param parent parent process group.
+   */
+  void parseControllerServices(YAML::Node *controllerServicesNode);
+  // Process connection YAML
+
   /**
    * Parses the Connections section of a configuration YAML.
    * The resulting Connections are added to the parent ProcessGroup.
@@ -200,8 +229,9 @@ class YamlConfiguration : public FlowConfiguration {
    * @param propertiesNode the YAML::Node containing the properties
    * @param processor      the Processor to which to add the resulting properties
    */
-  void parsePropertiesNodeYaml(YAML::Node *propertiesNode,
-                               std::shared_ptr<core::Processor> processor);
+  void parsePropertiesNodeYaml(
+      YAML::Node *propertiesNode,
+      std::shared_ptr<core::ConfigurableComponent> processor);
 
   /**
    * A helper function for parsing or generating optional id fields.
@@ -219,7 +249,8 @@ class YamlConfiguration : public FlowConfiguration {
    *                   is optional and defaults to 'id'
    * @return         the parsed or generated UUID string
    */
-  std::string getOrGenerateId(YAML::Node *yamlNode, const std::string &idField = "id");
+  std::string getOrGenerateId(YAML::Node *yamlNode, const std::string &idField =
+                                  "id");
 
   /**
    * This is a helper function for verifying the existence of a required
@@ -239,8 +270,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @throws std::invalid_argument if the required field 'fieldName' is
    *                               not present in 'yamlNode'
    */
-  void checkRequiredField(YAML::Node *yamlNode,
-                          const std::string &fieldName,
+  void checkRequiredField(YAML::Node *yamlNode, const std::string &fieldName,
                           const std::string &yamlSection = "",
                           const std::string &errorMessage = "");
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/io/StreamFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/StreamFactory.h b/libminifi/include/io/StreamFactory.h
index db4625a..3faee45 100644
--- a/libminifi/include/io/StreamFactory.h
+++ b/libminifi/include/io/StreamFactory.h
@@ -30,7 +30,11 @@ namespace io {
 
 class AbstractStreamFactory {
  public:
-  virtual std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) = 0;
+  virtual ~AbstractStreamFactory() {
+  }
+
+  virtual std::unique_ptr<Socket> createSocket(const std::string &host,
+                                               const uint16_t port) = 0;
 };
 
 /**

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/io/TLSSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/TLSSocket.h b/libminifi/include/io/TLSSocket.h
new file mode 100644
index 0000000..011a012
--- /dev/null
+++ b/libminifi/include/io/TLSSocket.h
@@ -0,0 +1,197 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_
+#define LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_
+
+#include <cstdint>
+#include "ClientSocket.h"
+#include <atomic>
+#include <mutex>
+
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+#define TLS_ERROR_CONTEXT 1
+#define TLS_ERROR_PEM_MISSING 2
+#define TLS_ERROR_CERT_MISSING 3
+#define TLS_ERROR_KEY_ERROR 4
+#define TLS_ERROR_CERT_ERROR 5
+
+class TLSContext {
+
+ public:
+
+  /**
+   * Build an instance, creating a memory fence, which
+   * allows us to avoid locking. This is tantamount to double checked locking.
+   * @returns new TLSContext;
+   */
+  static TLSContext *getInstance() {
+    TLSContext* atomic_context = context_instance.load(
+        std::memory_order_relaxed);
+    std::atomic_thread_fence(std::memory_order_acquire);
+    if (atomic_context == nullptr) {
+      std::lock_guard<std::mutex> lock(context_mutex);
+      atomic_context = context_instance.load(std::memory_order_relaxed);
+      if (atomic_context == nullptr) {
+        atomic_context = new TLSContext();
+        atomic_context->initialize();
+        std::atomic_thread_fence(std::memory_order_release);
+        context_instance.store(atomic_context, std::memory_order_relaxed);
+      }
+    }
+    return atomic_context;
+  }
+
+  virtual ~TLSContext() {
+    if (0 != ctx)
+      SSL_CTX_free(ctx);
+  }
+
+  SSL_CTX *getContext() {
+    return ctx;
+  }
+
+  short getError() {
+    return error_value;
+  }
+
+  short initialize();
+
+ private:
+
+  static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) {
+    std::string passphrase;
+
+    if (Configure::getConfigure()->get(
+        Configure::nifi_security_client_pass_phrase, passphrase)) {
+
+      std::ifstream file(passphrase.c_str(), std::ifstream::in);
+      if (!file.good()) {
+        memset(buf, 0x00, size);
+        return 0;
+      }
+
+      std::string password;
+      password.assign((std::istreambuf_iterator<char>(file)),
+                      std::istreambuf_iterator<char>());
+      file.close();
+      memset(buf, 0x00, size);
+      memcpy(buf, password.c_str(), password.length() - 1);
+
+      return password.length() - 1;
+    }
+    return 0;
+  }
+
+  TLSContext();
+
+  std::shared_ptr<logging::Logger> logger_;
+  Configure *configuration;
+  SSL_CTX *ctx;
+
+  short error_value;
+
+  static std::atomic<TLSContext*> context_instance;
+  static std::mutex context_mutex;
+};
+
+class TLSSocket : public Socket {
+ public:
+
+  /**
+   * Constructor that accepts host name, port and listeners. With this
+   * contructor we will be creating a server socket
+   * @param hostname our host name
+   * @param port connecting port
+   * @param listeners number of listeners in the queue
+   */
+  explicit TLSSocket(const std::string &hostname, const uint16_t port,
+                     const uint16_t listeners);
+
+  /**
+   * Constructor that creates a client socket.
+   * @param hostname hostname we are connecting to.
+   * @param port port we are connecting to.
+   */
+  explicit TLSSocket(const std::string &hostname, const uint16_t port);
+
+  /**
+   * Move constructor.
+   */
+  explicit TLSSocket(const TLSSocket &&);
+
+  virtual ~TLSSocket();
+
+  /**
+   * Initializes the socket
+   * @return result of the creation operation.
+   */
+  short initialize();
+
+  /**
+   * Attempt to select the socket file descriptor
+   * @param msec timeout interval to wait
+   * @returns file descriptor
+   */
+  virtual short select_descriptor(const uint16_t msec);
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen buffer to write
+   *
+   */
+  int writeData(std::vector<uint8_t> &buf, int buflen);
+
+  /**
+   * Write value to the stream using uint8_t ptr
+   * @param buf incoming buffer
+   * @param buflen buffer to write
+   *
+   */
+  int writeData(uint8_t *value, int size);
+
+ protected:
+
+  SSL* ssl;
+
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/io/validation.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/validation.h b/libminifi/include/io/validation.h
index 9dd1b8a..a5e1bc5 100644
--- a/libminifi/include/io/validation.h
+++ b/libminifi/include/io/validation.h
@@ -56,7 +56,7 @@ static auto IsNullOrEmpty(
 template<typename T>
 static auto IsNullOrEmpty(
     T *object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type {
-  return (0 == object || object->size() == 0);
+  return (nullptr == object || object->size() == 0);
 }
 
 /**
@@ -65,8 +65,18 @@ static auto IsNullOrEmpty(
 template<typename T>
 static auto IsNullOrEmpty(
     T *object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type {
-  return (0 == object);
+  return (nullptr == object);
 }
+
+/**
+ * Determines if the variable is null or ::size() == 0
+ */
+template<typename T>
+static auto IsNullOrEmpty(
+    std::shared_ptr<T> object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type {
+  return (nullptr == object || nullptr == object.get());
+}
+
 /**
  * Determines if the variable is null or strlen(str) == 0
  */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/AppendHostInfo.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/AppendHostInfo.h b/libminifi/include/processors/AppendHostInfo.h
index a16dff3..d33c717 100644
--- a/libminifi/include/processors/AppendHostInfo.h
+++ b/libminifi/include/processors/AppendHostInfo.h
@@ -25,6 +25,7 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
+#include "core/Resource.h"
 
 namespace org {
 namespace apache {
@@ -70,6 +71,9 @@ class AppendHostInfo : public core::Processor {
   std::shared_ptr<logging::Logger> logger_;
 };
 
+REGISTER_RESOURCE(AppendHostInfo);
+
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/ExecuteProcess.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ExecuteProcess.h b/libminifi/include/processors/ExecuteProcess.h
index f74f489..dbf2d15 100644
--- a/libminifi/include/processors/ExecuteProcess.h
+++ b/libminifi/include/processors/ExecuteProcess.h
@@ -35,6 +35,7 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
+#include "core/Resource.h"
 
 namespace org {
 namespace apache {
@@ -115,6 +116,8 @@ class ExecuteProcess : public core::Processor {
   pid_t _pid;
 };
 
+REGISTER_RESOURCE(ExecuteProcess);
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/GenerateFlowFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GenerateFlowFile.h b/libminifi/include/processors/GenerateFlowFile.h
index d15a02c..2f24e64 100644
--- a/libminifi/include/processors/GenerateFlowFile.h
+++ b/libminifi/include/processors/GenerateFlowFile.h
@@ -24,6 +24,7 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
+#include "core/Resource.h"
 
 namespace org {
 namespace apache {
@@ -89,6 +90,8 @@ class GenerateFlowFile : public core::Processor {
   uint64_t _dataSize;
 };
 
+REGISTER_RESOURCE(GenerateFlowFile);
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/GetFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h
index 5345404..e25e7db 100644
--- a/libminifi/include/processors/GetFile.h
+++ b/libminifi/include/processors/GetFile.h
@@ -23,6 +23,7 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
+#include "core/Resource.h"
 
 namespace org {
 namespace apache {
@@ -134,6 +135,8 @@ class GetFile : public core::Processor {
 
 };
 
+REGISTER_RESOURCE(GetFile);
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/InvokeHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/InvokeHTTP.h b/libminifi/include/processors/InvokeHTTP.h
index 789b3b5..ca4fef6 100644
--- a/libminifi/include/processors/InvokeHTTP.h
+++ b/libminifi/include/processors/InvokeHTTP.h
@@ -28,6 +28,8 @@
 #include "core/ProcessSession.h"
 #include "core/Core.h"
 #include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
 #include "utils/ByteInputCallBack.h"
 
 namespace org {
@@ -109,7 +111,8 @@ class InvokeHTTP : public core::Processor {
         connect_timeout_(20000),
         penalize_no_retry_(false),
         read_timeout_(20000),
-        always_output_response_(false) {
+        always_output_response_(false),
+        ssl_context_service_(nullptr) {
     curl_global_init(CURL_GLOBAL_DEFAULT);
   }
   // Destructor
@@ -161,6 +164,23 @@ class InvokeHTTP : public core::Processor {
  protected:
 
   /**
+   * Configures the SSL Context. Relies on the Context service and OpenSSL's installation
+   */
+  static CURLcode configure_ssl_context(CURL *curl, void *ctx, void *param);
+
+  /**
+   * Determines if a secure connection is required
+   * @param url url we will be connecting to
+   * @returns true if secure connection is allowed/required
+   */
+  bool isSecure(const std::string &url);
+
+  /**
+   * Configures a secure connection
+   */
+  void configure_secure_connection(CURL *http_session);
+
+  /**
    * Generate a transaction ID
    * @return transaction ID string.
    */
@@ -190,13 +210,17 @@ class InvokeHTTP : public core::Processor {
   void route(std::shared_ptr<FlowFileRecord> &request,
              std::shared_ptr<FlowFileRecord> &response,
              core::ProcessSession *session, core::ProcessContext *context,
-             bool isSuccess, int statusCode);
+             bool isSuccess,
+             int statusCode);
   /**
    * Determine if we should emit a new flowfile based on our activity
    * @param method method type
    * @return result of the evaluation.
    */
   bool emitFlowFile(const std::string &method);
+
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
+
   CURLcode res;
 
   // http method
@@ -219,6 +243,8 @@ class InvokeHTTP : public core::Processor {
   bool penalize_no_retry_;
 };
 
+REGISTER_RESOURCE(InvokeHTTP)
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/ListenHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h
index 69432be..586a699 100644
--- a/libminifi/include/processors/ListenHTTP.h
+++ b/libminifi/include/processors/ListenHTTP.h
@@ -29,6 +29,7 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
+#include "core/Resource.h"
 
 namespace org {
 namespace apache {
@@ -105,8 +106,6 @@ class ListenHTTP : public core::Processor {
     const struct mg_request_info *_reqInfo;
   };
 
- protected:
-
  private:
   // Logger
   std::shared_ptr<logging::Logger> _logger;
@@ -115,6 +114,9 @@ class ListenHTTP : public core::Processor {
   std::unique_ptr<Handler> _handler;
 };
 
+
+REGISTER_RESOURCE(ListenHTTP);
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/ListenSyslog.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h
index cbbdf41..4e642e8 100644
--- a/libminifi/include/processors/ListenSyslog.h
+++ b/libminifi/include/processors/ListenSyslog.h
@@ -36,6 +36,7 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
+#include "core/Resource.h"
 
 namespace org {
 namespace apache {
@@ -206,6 +207,8 @@ class ListenSyslog : public core::Processor {
   char _buffer[2048];
 };
 
+REGISTER_RESOURCE(ListenSyslog);
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/LoadProcessors.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LoadProcessors.h b/libminifi/include/processors/LoadProcessors.h
new file mode 100644
index 0000000..7a16773
--- /dev/null
+++ b/libminifi/include/processors/LoadProcessors.h
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_PROCESSORS_LOADPROCESSORS_H_
+#define LIBMINIFI_INCLUDE_PROCESSORS_LOADPROCESSORS_H_
+
+#include "core/Core.h"
+#include "core/Resource.h"
+
+#include "AppendHostInfo.h"
+#include "ExecuteProcess.h"
+#include "GenerateFlowFile.h"
+#include "GetFile.h"
+#include "ListenHTTP.h"
+#include "LogAttribute.h"
+#include "PutFile.h"
+#include "TailFile.h"
+
+
+#endif /* LIBMINIFI_INCLUDE_PROCESSORS_LOADPROCESSORS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/LogAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h
index dcc802d..56864c7 100644
--- a/libminifi/include/processors/LogAttribute.h
+++ b/libminifi/include/processors/LogAttribute.h
@@ -24,6 +24,7 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
+#include "core/Resource.h"
 
 namespace org {
 namespace apache {
@@ -120,6 +121,8 @@ class LogAttribute : public core::Processor {
   std::shared_ptr<logging::Logger> logger_;
 };
 
+REGISTER_RESOURCE(LogAttribute);
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h
index cc5dfca..a51f6b5 100644
--- a/libminifi/include/processors/PutFile.h
+++ b/libminifi/include/processors/PutFile.h
@@ -24,6 +24,7 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
+#include "core/Resource.h"
 
 namespace org {
 namespace apache {
@@ -103,6 +104,8 @@ class PutFile : public core::Processor {
                const std::string &tmpFile, const std::string &destFile);
 };
 
+REGISTER_RESOURCE(PutFile);
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h
index c6349a0..ac7db5a 100644
--- a/libminifi/include/processors/TailFile.h
+++ b/libminifi/include/processors/TailFile.h
@@ -24,6 +24,7 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
+#include "core/Resource.h"
 
 namespace org {
 namespace apache {
@@ -73,8 +74,9 @@ class TailFile : public core::Processor {
   std::string _stateFile;
   // State related to the tailed file
   std::string _currentTailFileName;
-  uint64_t _currentTailFilePosition;
+  // determine if state is recovered;
   bool _stateRecovered;
+  uint64_t _currentTailFilePosition;
   uint64_t _currentTailFileCreatedTime;
   static const int BUFFER_SIZE = 512;
 
@@ -89,6 +91,8 @@ class TailFile : public core::Processor {
 
 };
 
+REGISTER_RESOURCE(TailFile);
+
 // Matched File Item for Roll over check
 typedef struct {
   std::string fileName;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index cb1b412..4119edd 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -28,6 +28,7 @@
 #include <iostream>
 #include <fstream>
 #include "core/Core.h"
+#include "utils/StringUtils.h"
 #include "core/logging/Logger.h"
 
 namespace org {
@@ -38,7 +39,9 @@ namespace minifi {
 class Configure {
  public:
   // nifi.flow.configuration.file
+  static const char *nifi_default_directory;
   static const char *nifi_flow_configuration_file;
+  static const char *nifi_flow_engine_threads;
   static const char *nifi_administrative_yield_duration;
   static const char *nifi_bored_yield_duration;
   static const char *nifi_graceful_shutdown_seconds;
@@ -80,7 +83,7 @@ class Configure {
   // Set the config value
   void set(std::string key, std::string value) {
     std::lock_guard<std::mutex> lock(mutex_);
-    properties_[key] = value;
+    properties_[key] = std::string(value.c_str());
   }
   // Check whether the config value existed
   bool has(std::string key) {
@@ -89,6 +92,12 @@ class Configure {
   }
   // Get the config value
   bool get(std::string key, std::string &value);
+
+  /**
+   * Returns the configuration value or an empty string.
+   * @return value corresponding to key or empty value.
+   */
+  int getInt(const std::string &key, int default_value);
   // Parse one line in configure file like key=value
   void parseConfigureFileLine(char *buf);
   // Load Configure File

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 7508900..e3c15d8 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -35,58 +35,54 @@ namespace utils {
  * purpose: Provides a wrapper for the functor
  * and returns a future based on the template argument.
  */
-template< typename T>
-class Worker{
-public:
-  explicit Worker(std::function<T()> &task) : task(task)
-  {
+template<typename T>
+class Worker {
+ public:
+  explicit Worker(std::function<T()> &task)
+      : task(task) {
     promise = std::make_shared<std::promise<T>>();
   }
 
   /**
    * Move constructor for worker tasks
    */
-  Worker(Worker &&other) : task (std::move(other.task)),
-						promise(other.promise)
-  {
+  Worker(Worker &&other)
+      : task(std::move(other.task)),
+        promise(other.promise) {
   }
 
-
   /**
    * Runs the task and takes the output from the funtor
    * setting the result into the promise
    */
-  void run()
-  {
+  void run() {
     T result = task();
     promise->set_value(result);
   }
 
-   Worker<T>(const Worker<T>&) = delete;
-    Worker<T>& operator = (const Worker<T>&) = delete;
+  Worker<T>(const Worker<T>&) = delete;
+  Worker<T>& operator =(const Worker<T>&) = delete;
 
-  Worker<T>& operator = (Worker<T>&&) ;
+  Worker<T>& operator =(Worker<T> &&);
 
   std::shared_ptr<std::promise<T>> getPromise();
 
-private:
-   std::function<T()> task;
-   std::shared_ptr<std::promise<T>> promise;
+ private:
+  std::function<T()> task;
+  std::shared_ptr<std::promise<T>> promise;
 };
 
-template< typename T>
-Worker<T>&  Worker<T>::operator = (Worker<T>&& other)
-{
-    task = std::move(other.task);
-    promise = other.promise;
-    return *this;
+template<typename T>
+Worker<T>& Worker<T>::operator =(Worker<T> && other) {
+  task = std::move(other.task);
+  promise = other.promise;
+  return *this;
 }
 
-
 template<typename T>
-std::shared_ptr<std::promise<T>> Worker<T>::getPromise(){
-    return promise;
-  }
+std::shared_ptr<std::promise<T>> Worker<T>::getPromise() {
+  return promise;
+}
 
 /**
  * Thread pool
@@ -95,171 +91,190 @@ std::shared_ptr<std::promise<T>> Worker<T>::getPromise(){
  * Design: Locked control over a manager thread that controls the worker threads
  */
 template<typename T>
-class ThreadPool
-    {
-    public:
-        ThreadPool(int max_worker_threads, bool daemon_threads=false) : max_worker_threads_(max_worker_threads)
-	,daemon_threads_(daemon_threads), running_(false){
-	  current_workers_ = 0;
-	}
-        virtual ~ThreadPool(){
-	  shutdown();
-	}
-
-	/**
-	 * Execute accepts a worker task and returns
-	 * a future
-	 * @param task this thread pool will subsume ownership of
-	 * the worker task
-	 * @return future with the impending result.
-	 */
-        std::future<T> execute(Worker<T> &&task);
-	/**
-	 * Starts the Thread Pool
-	 */
-        void start();
-	/**
-	 * Shutdown the thread pool and clear any
-	 * currently running activities
-	 */
-	void shutdown();
-	/**
-	 * Set the max concurrent tasks. When this is done
-	 * we must start and restart the thread pool if
-	 * the number of tasks is less than the currently configured number
-	 */
-	void setMaxConcurrentTasks(uint16_t max)
-	{
-	  std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
-	  if (running_)
-	  {
-	    shutdown();
-	  }
-	  max_worker_threads_= max;
-	  if (!running_)
-	    start();
-	}
-
-    protected:
-
-	/**
-	* Drain will notify tasks to stop following notification
-	*/
-	void drain()
-	{
-	  while(current_workers_ > 0)
-	  {
-	    tasks_available_.notify_one();
-	  }
-	}
-	// determines if threads are detached
-	bool daemon_threads_;
-	// max worker threads
-        int max_worker_threads_;
-	// current worker tasks.
-	std::atomic<int> current_workers_;
-	// thread queue
-        std::vector<std::thread> thread_queue_;
-	// manager thread
-        std::thread manager_thread_;
-	// atomic running boolean
-	std::atomic<bool> running_;
-	// worker queue of worker objects
-        std::queue<Worker<T>> worker_queue_;
-	// notification for available work
-        std::condition_variable tasks_available_;
-	// manager mutex
-	std::recursive_mutex manager_mutex_;
-	// work queue mutex
-        std::mutex worker_queue_mutex_;
-
-	/**
-	 * Call for the manager to start worker threads
-	 */
-	void startWorkers();
-
-	/**
-	 * Runs worker tasks
-	 */
-        void run_tasks();
-    };
+class ThreadPool {
+ public:
+
+  ThreadPool(int max_worker_threads = 8, bool daemon_threads = false)
+      : max_worker_threads_(max_worker_threads),
+        daemon_threads_(daemon_threads),
+        running_(false) {
+    current_workers_ = 0;
+  }
+
+  ThreadPool(const ThreadPool<T> &&other)
+      : max_worker_threads_(std::move(other.max_worker_threads_)),
+        daemon_threads_(std::move(other.daemon_threads_)),
+        running_(false) {
+    current_workers_ = 0;
+  }
+  virtual ~ThreadPool() {
+    shutdown();
+  }
+
+  /**
+   * Execute accepts a worker task and returns
+   * a future
+   * @param task this thread pool will subsume ownership of
+   * the worker task
+   * @return future with the impending result.
+   */
+  std::future<T> execute(Worker<T> &&task);
+  /**
+   * Starts the Thread Pool
+   */
+  void start();
+  /**
+   * Shutdown the thread pool and clear any
+   * currently running activities
+   */
+  void shutdown();
+  /**
+   * Set the max concurrent tasks. When this is done
+   * we must start and restart the thread pool if
+   * the number of tasks is less than the currently configured number
+   */
+  void setMaxConcurrentTasks(uint16_t max) {
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+    if (running_) {
+      shutdown();
+    }
+    max_worker_threads_ = max;
+    if (!running_)
+      start();
+  }
+
+  ThreadPool<T> operator=(const ThreadPool<T> &other) = delete;
+  ThreadPool(const ThreadPool<T> &other) = delete;
+
+  ThreadPool<T> &operator=(ThreadPool<T> &&other) {
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+    if (other.running_) {
+      other.shutdown();
+    }
+    if (running_) {
+      shutdown();
+    }
+    max_worker_threads_ = std::move(other.max_worker_threads_);
+    daemon_threads_ = std::move(other.daemon_threads_);
+    current_workers_ = 0;
+
+    thread_queue_ = std::move(other.thread_queue_);
+    worker_queue_ = std::move(other.worker_queue_);
+    if (!running_) {
+      start();
+    }
+    return *this;
+  }
+
+ protected:
+
+  /**
+   * Drain will notify tasks to stop following notification
+   */
+  void drain() {
+    while (current_workers_ > 0) {
+      tasks_available_.notify_one();
+    }
+  }
+// determines if threads are detached
+  bool daemon_threads_;
+// max worker threads
+  int max_worker_threads_;
+// current worker tasks.
+  std::atomic<int> current_workers_;
+// thread queue
+  std::vector<std::thread> thread_queue_;
+// manager thread
+  std::thread manager_thread_;
+// atomic running boolean
+  std::atomic<bool> running_;
+// worker queue of worker objects
+  std::queue<Worker<T>> worker_queue_;
+// notification for available work
+  std::condition_variable tasks_available_;
+// manager mutex
+  std::recursive_mutex manager_mutex_;
+// work queue mutex
+  std::mutex worker_queue_mutex_;
+
+  /**
+   * Call for the manager to start worker threads
+   */
+  void startWorkers();
+
+  /**
+   * Runs worker tasks
+   */
+  void run_tasks();
+}
+;
 
 template<typename T>
-std::future<T> ThreadPool<T>::execute(Worker<T> &&task){
+std::future<T> ThreadPool<T>::execute(Worker<T> &&task) {
 
   std::unique_lock<std::mutex> lock(worker_queue_mutex_);
   bool wasEmpty = worker_queue_.empty();
   std::future<T> future = task.getPromise()->get_future();
   worker_queue_.push(std::move(task));
-  if (wasEmpty)
-  {
-      tasks_available_.notify_one();
+  if (wasEmpty) {
+    tasks_available_.notify_one();
   }
   return future;
 }
 
-template< typename T>
-void  ThreadPool<T>::startWorkers(){
-    for (int i = 0; i < max_worker_threads_; i++)
-    {
-      thread_queue_.push_back( std::thread(&ThreadPool::run_tasks, this));
-      current_workers_++;
-    }
+template<typename T>
+void ThreadPool<T>::startWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    thread_queue_.push_back(std::thread(&ThreadPool::run_tasks, this));
+    current_workers_++;
+  }
 
-     if (daemon_threads_)
-     {
-	for (auto &thread : thread_queue_){
-	    thread.detach();
-	}
-     }
-    for (auto &thread : thread_queue_)
-    {
-	if (thread.joinable())
-	  thread.join();
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread.detach();
     }
+  }
+  for (auto &thread : thread_queue_) {
+    if (thread.joinable())
+      thread.join();
+  }
 }
-template< typename T>
-void  ThreadPool<T>::run_tasks()
-{
-  while (running_.load())
-    {
-	std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-	if (worker_queue_.empty())
-	{
+template<typename T>
+void ThreadPool<T>::run_tasks() {
+  while (running_.load()) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    if (worker_queue_.empty()) {
 
-	    tasks_available_.wait(lock);
-	}
+      tasks_available_.wait(lock);
+    }
 
-	if (!running_.load())
-	  break;
+    if (!running_.load())
+      break;
 
-	if (worker_queue_.empty())
-	  continue;
-	Worker<T> task = std::move(worker_queue_.front());
-	worker_queue_.pop();
-	task.run();
-    }
-    current_workers_--;
+    if (worker_queue_.empty())
+      continue;
+    Worker<T> task = std::move(worker_queue_.front());
+    worker_queue_.pop();
+    task.run();
+  }
+  current_workers_--;
 
 }
-template< typename T>
- void ThreadPool<T>::start()
-{
+template<typename T>
+void ThreadPool<T>::start() {
   std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
-  if (!running_)
-  {
+  if (!running_) {
     running_ = true;
     manager_thread_ = std::thread(&ThreadPool::startWorkers, this);
 
   }
 }
 
-template< typename T>
-void ThreadPool<T>::shutdown(){
+template<typename T>
+void ThreadPool<T>::shutdown() {
 
   std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
-  if (running_.load())
-  {
+  if (running_.load()) {
 
     running_.store(false);
 
@@ -267,21 +282,19 @@ void ThreadPool<T>::shutdown(){
     if (manager_thread_.joinable())
       manager_thread_.join();
     {
-    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-    thread_queue_.clear();
-    current_workers_ = 0;
-    while(!worker_queue_.empty())
-      worker_queue_.pop();
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      thread_queue_.clear();
+      current_workers_ = 0;
+      while (!worker_queue_.empty())
+        worker_queue_.pop();
     }
   }
 }
 
-
 } /* namespace utils */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
 
-
 #endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 95562c3..f35e88a 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 #include "properties/Configure.h"
+#include <cstdlib>
 #include <string>
 #include "utils/StringUtils.h"
 #include "core/Core.h"
@@ -25,8 +26,10 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
+const char *Configure::nifi_default_directory = "nifi.default.directory";
 const char *Configure::nifi_flow_configuration_file =
     "nifi.flow.configuration.file";
+const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads";
 const char *Configure::nifi_administrative_yield_duration =
     "nifi.administrative.yield.duration";
 const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
@@ -82,6 +85,17 @@ bool Configure::get(std::string key, std::string &value) {
   }
 }
 
+int Configure::getInt(const std::string &key, int default_value) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  auto it = properties_.find(key);
+
+  if (it != properties_.end()) {
+    return std::atol(it->second.c_str());
+  } else {
+    return default_value;
+  }
+}
+
 // Parse one line in configure file like key=value
 void Configure::parseConfigureFileLine(char *buf) {
   char *line = buf;
@@ -124,8 +138,7 @@ void Configure::loadConfigureFile(const char *fileName) {
   if (fileName) {
     // perform a naive determination if this is a relative path
     if (fileName[0] != '/') {
-      adjustedFilename = adjustedFilename + getHome() + "/"
-          + fileName;
+      adjustedFilename = adjustedFilename + getHome() + "/" + fileName;
     } else {
       adjustedFilename += fileName;
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/EventDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index cbb60ea..fa2171b 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -44,10 +44,10 @@ void EventDrivenSchedulingAgent::run(
       // Honor the yield
       std::this_thread::sleep_for(
           std::chrono::milliseconds(processor->getYieldTime()));
-    } else if (shouldYield && this->_boredYieldDuration > 0) {
+    } else if (shouldYield && this->bored_yield_duration_ > 0) {
       // No work to do or need to apply back pressure
       std::this_thread::sleep_for(
-          std::chrono::milliseconds(this->_boredYieldDuration));
+          std::chrono::milliseconds(this->bored_yield_duration_));
     }
 
     // Block until work is available

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 6785a9d..5f6e014 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -37,6 +37,7 @@
 #include "core/ProcessGroup.h"
 #include "utils/StringUtils.h"
 #include "core/Core.h"
+#include "core/controller/ControllerServiceProvider.h"
 #include "core/repository/FlowFileRepository.h"
 
 namespace org {
@@ -52,7 +53,8 @@ FlowController::FlowController(
     std::shared_ptr<Configure> configure,
     std::unique_ptr<core::FlowConfiguration> flow_configuration,
     const std::string name, bool headless_mode)
-    : CoreComponent(core::getClassName<FlowController>()),
+    : core::controller::ControllerServiceProvider(
+          core::getClassName<FlowController>()),
       root_(nullptr),
       max_timer_driven_threads_(0),
       max_event_driven_threads_(0),
@@ -61,14 +63,20 @@ FlowController::FlowController(
       provenance_repo_(provenance_repo),
       flow_file_repo_(flow_file_repo),
       protocol_(0),
-      _timerScheduler(provenance_repo_, configure),
-      _eventScheduler(provenance_repo_, configure),
-      flow_configuration_(std::move(flow_configuration)) {
+      controller_service_map_(
+          std::make_shared<core::controller::ControllerServiceMap>()),
+      timer_scheduler_(nullptr),
+      event_scheduler_(nullptr),
+      controller_service_provider_(nullptr),
+      flow_configuration_(std::move(flow_configuration)),
+      configuration_(configure) {
   if (provenance_repo == nullptr)
     throw std::runtime_error("Provenance Repo should not be null");
   if (flow_file_repo == nullptr)
     throw std::runtime_error("Flow Repo should not be null");
-
+  if (IsNullOrEmpty(configuration_)) {
+    throw std::runtime_error("Must supply a configuration.");
+  }
   uuid_generate(uuid_);
   setUUID(uuid_);
 
@@ -80,14 +88,14 @@ FlowController::FlowController(
   max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
   running_ = false;
   initialized_ = false;
-  root_ = NULL;
+  root_ = nullptr;
 
   protocol_ = new FlowControlProtocol(this, configure);
 
   if (!headless_mode) {
     std::string rawConfigFileString;
     configure->get(Configure::nifi_flow_configuration_file,
-                    rawConfigFileString);
+                   rawConfigFileString);
 
     if (!rawConfigFileString.empty()) {
       configuration_filename_ = rawConfigFileString;
@@ -162,15 +170,15 @@ void FlowController::stop(bool force) {
     running_ = false;
 
     logger_->log_info("Stop Flow Controller");
-    this->_timerScheduler.stop();
-    this->_eventScheduler.stop();
+    this->timer_scheduler_->stop();
+    this->event_scheduler_->stop();
     this->flow_file_repo_->stop();
     this->provenance_repo_->stop();
     // Wait for sometime for thread stop
     std::this_thread::sleep_for(std::chrono::milliseconds(1000));
     if (this->root_)
-      this->root_->stopProcessing(&this->_timerScheduler,
-                                  &this->_eventScheduler);
+      this->root_->stopProcessing(this->timer_scheduler_.get(),
+                                  this->event_scheduler_.get());
   }
 }
 
@@ -221,14 +229,40 @@ void FlowController::load() {
     stop(true);
   }
   if (!initialized_) {
+    logger_->log_info("Initializing timers");
+    if (nullptr == timer_scheduler_) {
+      timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(
+          std::static_pointer_cast<core::controller::ControllerServiceProvider>(
+              shared_from_this()),
+          provenance_repo_, configuration_);
+    }
+    if (nullptr == event_scheduler_) {
+      event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(
+          std::static_pointer_cast<core::controller::ControllerServiceProvider>(
+              shared_from_this()),
+          provenance_repo_, configuration_);
+    }
     logger_->log_info("Load Flow Controller from file %s",
                       configuration_filename_.c_str());
 
-    this->root_ = flow_configuration_->getRoot(configuration_filename_);
+    this->root_ = std::shared_ptr<core::ProcessGroup>(
+        flow_configuration_->getRoot(configuration_filename_));
+
+    logger_->log_info("Loaded root processor Group");
+
+    controller_service_provider_ = flow_configuration_
+        ->getControllerServiceProvider();
 
+    std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(
+        controller_service_provider_)->setRootGroup(root_);
+    std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(
+        controller_service_provider_)->setSchedulingAgent(
+        std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_));
+
+    logger_->log_info("Loaded controller service provider");
     // Load Flow File from Repo
     loadFlowRepo();
-
+    logger_->log_info("Loaded flow repository");
     initialized_ = true;
   }
 }
@@ -255,15 +289,20 @@ void FlowController::reload(std::string yamlFile) {
 }
 
 void FlowController::loadFlowRepo() {
-  if (this->flow_file_repo_) {
+  if (this->flow_file_repo_ != nullptr) {
+    logger_->log_debug("Getting connection map");
     std::map<std::string, std::shared_ptr<Connection>> connectionMap;
     if (this->root_ != nullptr) {
       this->root_->getConnections(connectionMap);
     }
+    logger_->log_debug("Number of connections from connectionMap %d",
+                       connectionMap.size());
     auto rep = std::static_pointer_cast<core::repository::FlowFileRepository>(
         flow_file_repo_);
     rep->setConnectionMap(connectionMap);
     flow_file_repo_->loadComponent();
+  } else {
+    logger_->log_debug("Flow file repository is not set");
   }
 }
 
@@ -276,11 +315,12 @@ bool FlowController::start() {
   } else {
     if (!running_) {
       logger_->log_info("Starting Flow Controller");
-      this->_timerScheduler.start();
-      this->_eventScheduler.start();
+      controller_service_provider_->enableAllControllerServices();
+      this->timer_scheduler_->start();
+      this->event_scheduler_->start();
       if (this->root_ != nullptr) {
-        this->root_->startProcessing(&this->_timerScheduler,
-                                     &this->_eventScheduler);
+        this->root_->startProcessing(this->timer_scheduler_.get(),
+                                     this->event_scheduler_.get());
       }
       running_ = true;
       this->protocol_->start();
@@ -291,6 +331,163 @@ bool FlowController::start() {
     return true;
   }
 }
+/**
+ * Controller Service functions
+ *
+ */
+
+/**
+ * Creates a controller service through the controller service provider impl.
+ * @param type class name
+ * @param id service identifier
+ * @param firstTimeAdded first time this CS was added
+ */
+std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createControllerService(
+    const std::string &type, const std::string &id,
+    bool firstTimeAdded) {
+  return controller_service_provider_->createControllerService(type, id,
+                                                               firstTimeAdded);
+}
+
+/**
+ * controller service provider
+ */
+/**
+ * removes controller service
+ * @param serviceNode service node to be removed.
+ */
+
+void FlowController::removeControllerService(
+    const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  controller_map_->removeControllerService(serviceNode);
+}
+
+/**
+ * Enables the controller service services
+ * @param serviceNode service node which will be disabled, along with linked services.
+ */
+void FlowController::enableControllerService(
+    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  return controller_service_provider_->enableControllerService(serviceNode);
+}
+
+/**
+ * Enables controller services
+ * @param serviceNoden vector of service nodes which will be enabled, along with linked services.
+ */
+void FlowController::enableControllerServices(
+    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) {
+}
+
+/**
+ * Disables controller services
+ * @param serviceNode service node which will be disabled, along with linked services.
+ */
+void FlowController::disableControllerService(
+    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  controller_service_provider_->disableControllerService(serviceNode);
+}
+
+/**
+ * Gets all controller services.
+ */
+std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::getAllControllerServices() {
+  return controller_service_provider_->getAllControllerServices();
+}
+
+/**
+ * Gets controller service node specified by <code>id</code>
+ * @param id service identifier
+ * @return shared pointer to the controller service node or nullptr if it does not exist.
+ */
+std::shared_ptr<core::controller::ControllerServiceNode> FlowController::getControllerServiceNode(
+    const std::string &id) {
+  return controller_service_provider_->getControllerServiceNode(id);
+}
+
+void FlowController::verifyCanStopReferencingComponents(
+    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+}
+
+/**
+ * Unschedules referencing components.
+ */
+std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::unscheduleReferencingComponents(
+    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  return controller_service_provider_->unscheduleReferencingComponents(
+      serviceNode);
+}
+
+/**
+ * Verify can disable referencing components
+ * @param serviceNode service node whose referenced components will be scheduled.
+ */
+void FlowController::verifyCanDisableReferencingServices(
+    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  controller_service_provider_->verifyCanDisableReferencingServices(
+      serviceNode);
+}
+
+/**
+ * Disables referencing components
+ * @param serviceNode service node whose referenced components will be scheduled.
+ */
+std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::disableReferencingServices(
+    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  return controller_service_provider_->disableReferencingServices(serviceNode);
+}
+
+/**
+ * Verify can enable referencing components
+ * @param serviceNode service node whose referenced components will be scheduled.
+ */
+void FlowController::verifyCanEnableReferencingServices(
+    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  controller_service_provider_->verifyCanEnableReferencingServices(serviceNode);
+}
+
+/**
+ * Determines if the controller service specified by identifier is enabled.
+ */
+bool FlowController::isControllerServiceEnabled(const std::string &identifier) {
+  return controller_service_provider_->isControllerServiceEnabled(identifier);
+}
+
+/**
+ * Enables referencing components
+ * @param serviceNode service node whose referenced components will be scheduled.
+ */
+std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::enableReferencingServices(
+    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  return controller_service_provider_->enableReferencingServices(serviceNode);
+}
+
+/**
+ * Schedules referencing components
+ * @param serviceNode service node whose referenced components will be scheduled.
+ */
+std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::scheduleReferencingComponents(
+    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+  return controller_service_provider_->scheduleReferencingComponents(
+      serviceNode);
+}
+
+/**
+ * Returns controller service components referenced by serviceIdentifier from the embedded
+ * controller service provider;
+ */
+std::shared_ptr<core::controller::ControllerService> FlowController::getControllerServiceForComponent(
+    const std::string &serviceIdentifier, const std::string &componentId) {
+  return controller_service_provider_->getControllerServiceForComponent(
+      serviceIdentifier, componentId);
+}
+
+/**
+ * Enables all controller services for the provider.
+ */
+void FlowController::enableAllControllerServices() {
+  controller_service_provider_->enableAllControllerServices();
+}
 
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index d69ba00..fc979fd 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -20,6 +20,7 @@
 #include "SchedulingAgent.h"
 #include <chrono>
 #include <thread>
+#include <utility>
 #include <memory>
 #include <iostream>
 #include "Exception.h"
@@ -39,6 +40,36 @@ bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) {
     return false;
 }
 
+void SchedulingAgent::enableControllerService(
+    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+
+  logger_->log_trace("Enabling CSN in SchedulingAgent %s",
+                     serviceNode->getName());
+  // reference the enable function from serviceNode
+  std::function<bool()> f_ex = [serviceNode] {
+    return serviceNode->enable();
+  };
+  // create a functor that will be submitted to the thread pool.
+  utils::Worker<bool> functor(f_ex);
+  // move the functor into the thread pool. While a future is returned
+  // we aren't terribly concerned with the result.
+  component_lifecycle_thread_pool_.execute(std::move(functor));
+}
+
+void SchedulingAgent::disableControllerService(
+    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+
+  // reference the disable function from serviceNode
+  std::function<bool()> f_ex = [serviceNode] {
+    return serviceNode->disable();
+  };
+  // create a functor that will be submitted to the thread pool.
+  utils::Worker<bool> functor(f_ex);
+  // move the functor into the thread pool. While a future is returned
+  // we aren't terribly concerned with the result.
+  component_lifecycle_thread_pool_.execute(std::move(functor));
+}
+
 bool SchedulingAgent::hasTooMuchOutGoing(
     std::shared_ptr<core::Processor> processor) {
   return processor->flowFilesOutGoingFull();
@@ -71,11 +102,11 @@ bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor,
     processor->decrementActiveTask();
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
-    processor->yield(_administrativeYieldDuration);
+    processor->yield(admin_yield_duration_);
     processor->decrementActiveTask();
   } catch (...) {
     logger_->log_debug("Caught Exception during SchedulingAgent::onTrigger");
-    processor->yield(_administrativeYieldDuration);
+    processor->yield(admin_yield_duration_);
     processor->decrementActiveTask();
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 65e7531..7e9bb03 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -39,30 +39,28 @@ void ThreadedSchedulingAgent::schedule(
     std::shared_ptr<core::Processor> processor) {
   std::lock_guard<std::mutex> lock(mutex_);
 
-  _administrativeYieldDuration = 0;
+  admin_yield_duration_ = 0;
   std::string yieldValue;
 
   if (configure_->get(Configure::nifi_administrative_yield_duration,
                       yieldValue)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(yieldValue, _administrativeYieldDuration,
-                                     unit)
-        && core::Property::ConvertTimeUnitToMS(_administrativeYieldDuration,
-                                               unit,
-                                               _administrativeYieldDuration)) {
+    if (core::Property::StringToTime(yieldValue, admin_yield_duration_, unit)
+        && core::Property::ConvertTimeUnitToMS(admin_yield_duration_, unit,
+                                               admin_yield_duration_)) {
       logger_->log_debug("nifi_administrative_yield_duration: [%d] ms",
-                         _administrativeYieldDuration);
+                         admin_yield_duration_);
     }
   }
 
-  _boredYieldDuration = 0;
+  bored_yield_duration_ = 0;
   if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(yieldValue, _boredYieldDuration, unit)
-        && core::Property::ConvertTimeUnitToMS(_boredYieldDuration, unit,
-                                               _boredYieldDuration)) {
+    if (core::Property::StringToTime(yieldValue, bored_yield_duration_, unit)
+        && core::Property::ConvertTimeUnitToMS(bored_yield_duration_, unit,
+                                               bored_yield_duration_)) {
       logger_->log_debug("nifi_bored_yield_duration: [%d] ms",
-                         _boredYieldDuration);
+                         bored_yield_duration_);
     }
   }
 
@@ -82,8 +80,8 @@ void ThreadedSchedulingAgent::schedule(
   }
 
   core::ProcessorNode processor_node(processor);
-  auto processContext = std::make_shared<core::ProcessContext>(processor_node,
-                                                               repo_);
+  auto processContext = std::make_shared<core::ProcessContext>(
+      processor_node, controller_service_provider_, repo_);
   auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(
       processContext.get());
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 8d10658..8610e64 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -40,10 +40,10 @@ void TimerDrivenSchedulingAgent::run(
       // Honor the yield
       std::this_thread::sleep_for(
           std::chrono::milliseconds(processor->getYieldTime()));
-    } else if (shouldYield && this->_boredYieldDuration > 0) {
+    } else if (shouldYield && this->bored_yield_duration_ > 0) {
       // No work to do or need to apply back pressure
       std::this_thread::sleep_for(
-          std::chrono::milliseconds(this->_boredYieldDuration));
+          std::chrono::milliseconds(this->bored_yield_duration_));
     }
     std::this_thread::sleep_for(
         std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));


Mime
View raw message