nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject nifi-minifi-cpp git commit: MINIFICPP-374: Commit Linux power management service that enables the threadpools to reduce the number of threads and throttle active tasks using linux powr constructs.
Date Thu, 18 Jan 2018 16:54:16 GMT
Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 7f252d2a9 -> 254877fa5


MINIFICPP-374: Commit Linux power management service that enables the threadpools to reduce the number of threads
and throttle active tasks using linux powr constructs.

MINIFICPP-374: Add readme entry

This closes #242.

Signed-off-by: Bin Qiu <benqiu2016@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/254877fa
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/254877fa
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/254877fa

Branch: refs/heads/master
Commit: 254877fa51bb983b94c07e68badc6d952b47bdb7
Parents: 7f252d2
Author: Marc Parisi <phrocker@apache.org>
Authored: Sat Jan 13 12:19:11 2018 -0500
Committer: Bin Qiu <benqiu2016@gmail.com>
Committed: Thu Jan 18 08:53:24 2018 -0800

----------------------------------------------------------------------
 README.md                                       |  15 ++
 libminifi/include/FlowController.h              |   6 +-
 libminifi/include/SchedulingAgent.h             |  77 ++++++-
 libminifi/include/ThreadedSchedulingAgent.h     |  46 -----
 .../controllers/LinuxPowerManagementService.h   | 147 ++++++++++++++
 .../controllers/ThreadManagementService.h       | 128 ++++++++++++
 .../core/controller/ControllerServiceProvider.h |   4 +-
 .../StandardControllerServiceProvider.h         |  19 +-
 .../include/sitetosite/SiteToSiteFactory.h      |   3 +-
 libminifi/include/utils/ThreadPool.h            | 175 +++++++++++++---
 libminifi/src/FlowController.cpp                |  27 ++-
 libminifi/src/SchedulingAgent.cpp               |  35 ++--
 libminifi/src/capi/api.cpp                      |   1 -
 .../controllers/LinuxPowerManagementService.cpp | 202 +++++++++++++++++++
 libminifi/src/utils/ByteArrayCallback.cpp       |   2 -
 main/CMakeLists.txt                             |   8 +-
 16 files changed, 773 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 1682017..2c6fd41 100644
--- a/README.md
+++ b/README.md
@@ -609,6 +609,21 @@ Additionally, a unique hexadecimal uid.minifi.device.segment should be assigned
 	  	class: ControllerServiceClass
 	  	Properties:
 
+### Linux Power Manager Controller Service
+  The linux power manager controller service can be configured to monitor the battery level and status ( discharging or charging ) via the following configuration.
+  Simply provide the capacity path and status path along with your threshold for the trigger and low battery alarm and you can monitor your battery and throttle
+  the threadpools within MiNiFi C++. Note that the name is identified must be ThreadPoolManager.
+
+   Controller Services:
+    - name: ThreadPoolManager
+      id: 2438e3c8-015a-1000-79ca-83af40ec1888
+      class: LinuxPowerManagerService
+      Properties:
+          Battery Capacity Path: /path/to/battery/capacity
+          Battery Status Path: /path/to/battery/status
+          Trigger Threshold: 90
+          Low Battery Threshold: 50
+          Wait Period: 500 ms
 
 ### Running
 After completing a [build](#building), the application can be run by issuing the following from :

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 6957c4e..2087f81 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -217,7 +217,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
    * Enables the controller service services
    * @param serviceNode service node which will be disabled, along with linked services.
    */
-  virtual std::future<bool> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
 
   /**
    * Enables controller services
@@ -229,13 +229,15 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
    * Disables controller services
    * @param serviceNode service node which will be disabled, along with linked services.
    */
-  virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
 
   /**
    * Gets all controller services.
    */
   virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> getAllControllerServices();
 
+  virtual std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier);
+
   /**
    * Gets controller service node specified by <code>id</code>
    * @param id service identifier

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 71aceb8..682f6ec 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -44,6 +44,67 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<uint64_t> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(0),
+        run_monitor_(run_monitor) {
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other)
+      : AfterExecute(std::move(other)),
+        run_monitor_(std::move(other.run_monitor_)) {
+    current_wait_.store(other.current_wait_.load());
+  }
+  virtual bool isFinished(const uint64_t &result) {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const uint64_t &result) {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual int64_t wait_time() {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<uint64_t> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public TimerAwareMonitor {
+ public:
+  SingleRunMonitor(std::atomic<bool> *run_monitor)
+      : TimerAwareMonitor(run_monitor) {
+  }
+  explicit SingleRunMonitor(TimerAwareMonitor &&other)
+      : TimerAwareMonitor(std::move(other)){
+  }
+  virtual bool isFinished(const uint64_t &result) {
+    if (result == 0) {
+      return true;
+    } else {
+      current_wait_.store(result);
+      if (*run_monitor_) {
+        return false;
+      }
+      return true;
+    }
+  }
+};
+
 // SchedulingAgent Class
 class SchedulingAgent {
  public:
@@ -62,9 +123,9 @@ class SchedulingAgent {
     running_ = false;
     repo_ = repo;
     flow_repo_ = flow_repo;
-    utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true);
-    component_lifecycle_thread_pool_ = std::move(pool);
-    component_lifecycle_thread_pool_.start();
+    auto pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true, controller_service_provider);
+    thread_pool_ = std::move(pool);
+    thread_pool_.start();
   }
   // Destructor
   virtual ~SchedulingAgent() {
@@ -79,17 +140,17 @@ class SchedulingAgent {
   // start
   void start() {
     running_ = true;
-    component_lifecycle_thread_pool_.start();
+    thread_pool_.start();
   }
   // stop
   virtual void stop() {
     running_ = false;
-    component_lifecycle_thread_pool_.shutdown();
+    thread_pool_.shutdown();
   }
 
  public:
-  virtual std::future<bool> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
-  virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
   // schedule, overwritten by different DrivenSchedulingAgent
   virtual void schedule(std::shared_ptr<core::Processor> processor) = 0;
   // unschedule, overwritten by different DrivenSchedulingAgent
@@ -115,7 +176,7 @@ class SchedulingAgent {
 
   std::shared_ptr<core::ContentRepository> content_repo_;
   // thread pool for components.
-  utils::ThreadPool<bool> component_lifecycle_thread_pool_;
+  utils::ThreadPool<uint64_t> thread_pool_;
   // controller service provider reference
   std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index b01d740..ba0998a 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -33,45 +33,6 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-/**
- * Uses the wait time for a given worker to determine if it is eligible to run
- */
-class TimerAwareMonitor : public utils::AfterExecute<uint64_t> {
- public:
-  TimerAwareMonitor(std::atomic<bool> *run_monitor)
-      : current_wait_(0),
-        run_monitor_(run_monitor) {
-  }
-  explicit TimerAwareMonitor(TimerAwareMonitor &&other)
-      : AfterExecute(std::move(other)),
-        run_monitor_(std::move(other.run_monitor_)) {
-    current_wait_.store(other.current_wait_.load());
-  }
-  virtual bool isFinished(const uint64_t &result) {
-    current_wait_.store(result);
-    if (*run_monitor_) {
-      return false;
-    }
-    return true;
-  }
-  virtual bool isCancelled(const uint64_t &result) {
-    if (*run_monitor_) {
-      return false;
-    }
-    return true;
-  }
-  /**
-   * Time to wait before re-running this task if necessary
-   * @return milliseconds since epoch after which we are eligible to re-run this task.
-   */
-  virtual int64_t wait_time() {
-    return current_wait_.load();
-  }
- private:
-
-  std::atomic<uint64_t> current_wait_;
-  std::atomic<bool> *run_monitor_;
-};
 
 /**
  * An abstract scheduling agent which creates and manages a pool of threads for
@@ -87,11 +48,6 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
                           std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
       : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration),
         logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
-
-    utils::ThreadPool<uint64_t> pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true);
-    thread_pool_ = std::move(pool);
-    thread_pool_.start();
-
   }
   // Destructor
   virtual ~ThreadedSchedulingAgent() {
@@ -108,8 +64,6 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   virtual void unschedule(std::shared_ptr<core::Processor> processor);
 
   virtual void stop();
- protected:
-  utils::ThreadPool<uint64_t> thread_pool_;
 
  protected:
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/controllers/LinuxPowerManagementService.h
----------------------------------------------------------------------
diff --git a/libminifi/include/controllers/LinuxPowerManagementService.h b/libminifi/include/controllers/LinuxPowerManagementService.h
new file mode 100644
index 0000000..2b8bf0d
--- /dev/null
+++ b/libminifi/include/controllers/LinuxPowerManagementService.h
@@ -0,0 +1,147 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_LINUXPOWERMANAGEMENTSERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_LINUXPOWERMANAGEMENTSERVICE_H_
+
+#include <iostream>
+#include <memory>
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "io/validation.h"
+#include "core/controller/ControllerService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "ThreadManagementService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Purpose: Linux power management service uses a path for the battery level
+ * and the status ( charging/discharging )
+ */
+class LinuxPowerManagerService : public ThreadManagementService {
+ public:
+  explicit LinuxPowerManagerService(const std::string &name, const std::string &id)
+      : ThreadManagementService(name, id),
+        enabled_(false),
+        battery_level_(0),
+        wait_period_(0),
+        last_time_(0),
+        trigger_(0),
+        low_battery_trigger_(0),
+        logger_(logging::LoggerFactory<LinuxPowerManagerService>::getLogger()) {
+  }
+
+  explicit LinuxPowerManagerService(const std::string &name, uuid_t uuid = 0)
+      : ThreadManagementService(name, uuid),
+        enabled_(false),
+        battery_level_(0),
+        wait_period_(0),
+        last_time_(0),
+        trigger_(0),
+        low_battery_trigger_(0),
+        logger_(logging::LoggerFactory<LinuxPowerManagerService>::getLogger()) {
+  }
+
+  explicit LinuxPowerManagerService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+      : LinuxPowerManagerService(name, nullptr) {
+    setConfiguration(configuration);
+    initialize();
+  }
+
+  static core::Property BatteryCapacityPath;
+  static core::Property BatteryStatusPath;
+  static core::Property BatteryStatusDischargeKeyword;
+  static core::Property TriggerThreshold;
+  static core::Property LowBatteryThreshold;
+  static core::Property WaitPeriod;
+
+  /**
+   * Helps to determine if the number of tasks will increase the pools above their threshold.
+   * @param new_tasks tasks to be added.
+   * @return true if above max, false otherwise.
+   */
+  virtual bool isAboveMax(const int new_tasks);
+
+  /**
+   * Returns the max number of threads allowed by all pools
+   * @return max threads.
+   */
+  virtual uint16_t getMaxThreads();
+
+  /**
+   * Function based on cooperative multitasking that will tell a caller whether or not the number of threads should be reduced.
+   * @return true if threading impacts QOS.
+   */
+  virtual bool shouldReduce();
+
+  /**
+   * Function to indicate to this controller service that we've reduced threads in a threadpool
+   */
+  virtual void reduce();
+
+  /**
+   * Function to help callers identify if they can increase threads.
+   * @return true if QOS won't be breached.
+   */
+  virtual bool canIncrease();
+
+  void initialize();
+
+  void yield();
+
+  bool isRunning();
+
+  bool isWorkAvailable();
+
+  virtual void onEnable();
+
+ protected:
+
+  std::vector<std::pair<std::string, std::string>> paths_;
+
+  bool enabled_;
+
+  std::atomic<int> battery_level_;
+
+  std::atomic<uint64_t> wait_period_;
+
+  std::atomic<uint64_t> last_time_;
+
+  int trigger_;
+
+  int low_battery_trigger_;
+
+  std::string status_keyword_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(LinuxPowerManagerService);
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_LINUXPOWERMANAGEMENTSERVICE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/controllers/ThreadManagementService.h
----------------------------------------------------------------------
diff --git a/libminifi/include/controllers/ThreadManagementService.h b/libminifi/include/controllers/ThreadManagementService.h
new file mode 100644
index 0000000..b297834
--- /dev/null
+++ b/libminifi/include/controllers/ThreadManagementService.h
@@ -0,0 +1,128 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_THREADMANAGEMENTSERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_THREADMANAGEMENTSERVICE_H_
+#include <iostream>
+#include <memory>
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "io/validation.h"
+#include "core/controller/ControllerService.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Purpose: Thread management service provides a contextual awareness across
+ * thread pools that enables us to deliver QOS to an agent.
+ */
+class ThreadManagementService : public core::controller::ControllerService {
+ public:
+  explicit ThreadManagementService(const std::string &name, const std::string &id)
+      : ControllerService(name, id),
+        logger_(logging::LoggerFactory<ThreadManagementService>::getLogger()) {
+  }
+
+  explicit ThreadManagementService(const std::string &name, uuid_t uuid = 0)
+      : ControllerService(name, uuid),
+        logger_(logging::LoggerFactory<ThreadManagementService>::getLogger()) {
+  }
+
+  explicit ThreadManagementService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+      : ControllerService(name, nullptr),
+        logger_(logging::LoggerFactory<ThreadManagementService>::getLogger()) {
+
+  }
+
+  /**
+   * Helps to determine if the number of tasks will increase the pools above their threshold.
+   * @param new_tasks tasks to be added.
+   * @return true if above max, false otherwise.
+   */
+  virtual bool isAboveMax(const int new_tasks) = 0;
+
+  /**
+   * Returns the max number of threads allowed by all pools
+   * @return max threads.
+   */
+  virtual uint16_t getMaxThreads() = 0;
+
+  /**
+   * Function based on cooperative multitasking that will tell a caller whether or not the number of threads should be reduced.
+   * @return true if threading impacts QOS.
+   */
+  virtual bool shouldReduce() = 0;
+
+  /**
+   * Function to indicate to this controller service that we've reduced threads in a threadpool
+   */
+  virtual void reduce() = 0;
+
+  /**
+   * Registration function to tabulate total threads.
+   * @param threads threads from a thread pool.
+   */
+  virtual void registerThreadCount(const int threads) {
+    thread_count_ += threads;
+  }
+
+  /**
+   * Function to help callers identify if they can increase threads.
+   * @return true if QOS won't be breached.
+   */
+  virtual bool canIncrease() = 0;
+
+  virtual void initialize() {
+    ControllerService::initialize();
+  }
+
+  void yield() {
+
+  }
+
+  bool isRunning() {
+    return getState() == core::controller::ControllerServiceState::ENABLED;
+  }
+
+  bool isWorkAvailable() {
+    return false;
+  }
+
+  virtual void onEnable() {
+
+  }
+
+ protected:
+
+  std::atomic<int> thread_count_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_THREADMANAGEMENTSERVICE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/core/controller/ControllerServiceProvider.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index bf02080..0499d35 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -96,7 +96,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
    * Enables the provided controller service
    * @param serviceNode controller service node.
    */
-  virtual std::future<bool> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
+  virtual std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
 
   /**
    * Enables the provided controller service nodes
@@ -108,7 +108,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
    * Disables the provided controller service node
    * @param serviceNode controller service node.
    */
-  virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
+  virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
 
   /**
    * Gets a list of all controller services.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/core/controller/StandardControllerServiceProvider.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index ff75488..a6ca684 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -79,8 +79,7 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
     agent_ = agent;
   }
 
-  std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id,
-  bool firstTimeAdded) {
+  std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id, bool firstTimeAdded) {
 
     std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id);
 
@@ -97,11 +96,15 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
 
   }
 
-  std::future<bool> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+  std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
     if (serviceNode->canEnable()) {
       return agent_->enableControllerService(serviceNode);
     } else {
-      std::future<bool> no_run = std::async(std::launch::async, []() {return false;});
+
+      std::future<uint64_t> no_run = std::async(std::launch::async, []() {
+        uint64_t ret = 0;
+        return ret;
+      });
       return no_run;
     }
   }
@@ -125,11 +128,14 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
     }
   }
 
-  std::future<bool> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+  std::future<uint64_t> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
     if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) {
       return agent_->disableControllerService(serviceNode);
     } else {
-      std::future<bool> no_run = std::async(std::launch::async, []() {return false;});
+      std::future<uint64_t> no_run = std::async(std::launch::async, []() {
+        uint64_t ret = 0;
+        return ret;
+      });
       return no_run;
     }
   }
@@ -193,7 +199,6 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
 
   std::shared_ptr<Configure> configuration_;
 
-
  private:
   std::shared_ptr<logging::Logger> logger_;
 };

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/sitetosite/SiteToSiteFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h
index 648347d..35e12b9 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -72,8 +72,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf
         auto ptr = std::unique_ptr<SiteToSiteClient>(static_cast<SiteToSiteClient*>(http_protocol));
         auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort()));
         char idStr[37];
-            uuid_unparse_lower(uuid, idStr);
-            std::cout << "sending " << idStr << std::endl;
+        uuid_unparse_lower(uuid, idStr);
         ptr->setPortId(uuid);
         ptr->setPeer(std::move(peer));
         return ptr;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 4f80829..f04e319 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -26,7 +26,13 @@
 #include <queue>
 #include <future>
 #include <thread>
+#include <functional>
+
+#include "capi/expect.h"
+#include "controllers/ThreadManagementService.h"
 #include "concurrentqueue.h"
+#include "core/controller/ControllerService.h"
+#include "core/controller/ControllerServiceProvider.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -181,6 +187,21 @@ std::shared_ptr<std::promise<T>> Worker<T>::getPromise() {
   return promise;
 }
 
+class WorkerThread {
+ public:
+  explicit WorkerThread(std::thread thread)
+      : is_running_(false),
+        thread_(std::move(thread)) {
+
+  }
+  WorkerThread()
+      : is_running_(false) {
+
+  }
+  std::atomic<bool> is_running_;
+  std::thread thread_;
+};
+
 /**
  * Thread pool
  * Purpose: Provides a thread pool with basic functionality similar to
@@ -191,20 +212,29 @@ template<typename T>
 class ThreadPool {
  public:
 
-  ThreadPool(int max_worker_threads = 2, bool daemon_threads = false)
+  ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, const std::shared_ptr<core::controller::ControllerServiceProvider> &controller_service_provider = nullptr)
       : daemon_threads_(daemon_threads),
+        thread_reduction_count_(0),
         max_worker_threads_(max_worker_threads),
-        running_(false) {
+        adjust_threads_(false),
+        running_(false),
+        controller_service_provider_(controller_service_provider) {
     current_workers_ = 0;
+    thread_manager_ = nullptr;
   }
 
   ThreadPool(const ThreadPool<T> &&other)
       : daemon_threads_(std::move(other.daemon_threads_)),
+        thread_reduction_count_(0),
         max_worker_threads_(std::move(other.max_worker_threads_)),
-        running_(false) {
+        adjust_threads_(false),
+        running_(false),
+        controller_service_provider_(std::move(other.controller_service_provider_)),
+        thread_manager_(std::move(other.thread_manager_)) {
     current_workers_ = 0;
   }
-  virtual ~ThreadPool() {
+
+  ~ThreadPool() {
     shutdown();
   }
 
@@ -270,10 +300,16 @@ class ThreadPool {
     max_worker_threads_ = std::move(other.max_worker_threads_);
     daemon_threads_ = std::move(other.daemon_threads_);
     current_workers_ = 0;
+    thread_reduction_count_ = 0;
 
     thread_queue_ = std::move(other.thread_queue_);
     worker_queue_ = std::move(other.worker_queue_);
 
+    controller_service_provider_ = std::move(other.controller_service_provider_);
+    thread_manager_ = std::move(other.thread_manager_);
+
+    adjust_threads_ = false;
+
     if (!running_) {
       start();
     }
@@ -282,6 +318,12 @@ class ThreadPool {
 
  protected:
 
+  std::thread createThread(std::function<void()> &&functor) {
+    return std::thread([ functor ]() mutable {
+      functor();
+    });
+  }
+
   /**
    * Drain will notify tasks to stop following notification
    */
@@ -292,37 +334,51 @@ class ThreadPool {
   }
 // determines if threads are detached
   bool daemon_threads_;
-  // max worker threads
+  std::atomic<int> thread_reduction_count_;
+// max worker threads
   int max_worker_threads_;
-  // current worker tasks.
+// current worker tasks.
   std::atomic<int> current_workers_;
-  // thread queue
-  std::vector<std::thread> thread_queue_;
-  // manager thread
+// thread queue
+  std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
+// manager thread
   std::thread manager_thread_;
-  // atomic running boolean
+// conditional that's used to adjust the threads
+  std::atomic<bool> adjust_threads_;
+// atomic running boolean
   std::atomic<bool> running_;
-  // worker queue of worker objects
+// controller service provider
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_;
+// integrated power manager
+  std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
+  // thread queue for the recently deceased threads.
+  moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
+// worker queue of worker objects
   moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
   std::priority_queue<Worker<T>, std::vector<Worker<T>>, WorkerComparator<T>> worker_priority_queue_;
-  // notification for available work
+// notification for available work
   std::condition_variable tasks_available_;
-  // map to identify if a task should be
+// map to identify if a task should be
   std::map<std::string, bool> task_status_;
-  // manager mutex
+// manager mutex
   std::recursive_mutex manager_mutex_;
-  // work queue mutex
+// work queue mutex
   std::mutex worker_queue_mutex_;
 
   /**
    * Call for the manager to start worker threads
    */
-  void startWorkers();
+  void manageWorkers();
+
+  /**
+   * Function to adjust the workers up and down.
+   */
+  void adjustWorkers(int count);
 
   /**
    * Runs worker tasks
    */
-  void run_tasks();
+  void run_tasks(std::shared_ptr<WorkerThread> thread);
 };
 
 template<typename T>
@@ -341,32 +397,79 @@ bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
 }
 
 template<typename T>
-void ThreadPool<T>::startWorkers() {
+void ThreadPool<T>::manageWorkers() {
   for (int i = 0; i < max_worker_threads_; i++) {
-    thread_queue_.push_back(std::move(std::thread(&ThreadPool::run_tasks, this)));
+    auto worker_thread = std::make_shared<WorkerThread>();
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
     current_workers_++;
   }
 
   if (daemon_threads_) {
     for (auto &thread : thread_queue_) {
-      thread.detach();
+      thread->thread_.detach();
     }
   }
-  for (auto &thread : thread_queue_) {
-    if (thread.joinable())
-      thread.join();
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(1) * 500;
+      {
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ - current_workers_ > 0) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::shared_ptr<WorkerThread> thread_ref;
+        while (deceased_thread_queue_.try_dequeue(thread_ref)) {
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          if (thread_ref->thread_.joinable())
+            thread_ref->thread_.join();
+          thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
+        }
+      }
+      std::this_thread::sleep_for(waitperiod);
+    }
+  } else {
+    for (auto &thread : thread_queue_) {
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
   }
 }
 template<typename T>
-void ThreadPool<T>::run_tasks() {
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
   auto waitperiod = std::chrono::milliseconds(1) * 100;
   uint64_t wait_decay_ = 0;
+  uint64_t yield_backoff = 10;  // start at 10 ms
   while (running_.load()) {
 
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
     // if we exceed 500ms of wait due to not being able to run any tasks and there are tasks available, meaning
     // they are eligible to run per the fact that the thread pool isn't shut down and the tasks are in a runnable state
     // BUT they've been continually timesliced, we will lower the wait decay to 100ms and continue incrementing from
-    // there. This ensures we don't have arbitrarily long sleep cycles. 
+    // there. This ensures we don't have arbitrarily long sleep cycles.
     if (wait_decay_ > 500000000L) {
       wait_decay_ = 100000000L;
     }
@@ -378,6 +481,17 @@ void ThreadPool<T>::run_tasks() {
     if (wait_decay_ > 2000) {
       std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
     }
+
+    if (current_workers_ < max_worker_threads_) {
+      // we are in a reduced state. due to thread management
+      // let's institute a backoff up to 500ms
+      if (yield_backoff < 500) {
+        yield_backoff += 10;
+      }
+      std::this_thread::sleep_for(std::chrono::milliseconds(yield_backoff));
+    } else {
+      yield_backoff = 10;
+    }
     Worker<T> task;
     if (!worker_queue_.try_dequeue(task)) {
       std::unique_lock<std::mutex> lock(worker_queue_mutex_);
@@ -407,7 +521,7 @@ void ThreadPool<T>::run_tasks() {
       auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
       // if our differential is < 10% of the wait time we will not put the task into a wait state
       // since requeuing will break the time slice contract.
-      if ((double)task.getTimeSlice() > ms && ((double)(task.getTimeSlice() - ms)) > (wt * .10)) {
+      if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) {
         wait_to_run = true;
       }
     }
@@ -442,14 +556,19 @@ void ThreadPool<T>::run_tasks() {
     }
   }
   current_workers_--;
-
 }
 template<typename T>
 void ThreadPool<T>::start() {
+  if (nullptr != controller_service_provider_) {
+    auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
+    thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
+  } else {
+    thread_manager_ = nullptr;
+  }
   std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
   if (!running_) {
     running_ = true;
-    manager_thread_ = std::move(std::thread(&ThreadPool::startWorkers, this));
+    manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
     if (worker_queue_.size_approx() > 0) {
       tasks_available_.notify_all();
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 03452b6..4d2e1b7 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -252,8 +252,16 @@ void FlowController::load() {
     stop(true);
   }
   if (!initialized_) {
+    logger_->log_info("Load Flow Controller from file %s", configuration_filename_.c_str());
+
+    this->root_ = std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_));
+
+    logger_->log_info("Loaded root processor Group");
+
     logger_->log_info("Initializing timers");
 
+    controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
+
     if (nullptr == timer_scheduler_) {
       timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(
           std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
@@ -264,13 +272,6 @@ void FlowController::load() {
           std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
           configuration_);
     }
-    logger_->log_info("Load Flow Controller from file %s", configuration_filename_.c_str());
-
-    this->root_ = std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_));
-
-    logger_->log_info("Loaded root processor Group");
-
-    controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
 
     std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
     std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent(
@@ -496,7 +497,7 @@ void FlowController::removeControllerService(const std::shared_ptr<core::control
  * Enables the controller service services
  * @param serviceNode service node which will be disabled, along with linked services.
  */
-std::future<bool> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<uint64_t> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   return controller_service_provider_->enableControllerService(serviceNode);
 }
 
@@ -511,7 +512,7 @@ void FlowController::enableControllerServices(std::vector<std::shared_ptr<core::
  * Disables controller services
  * @param serviceNode service node which will be disabled, along with linked services.
  */
-std::future<bool> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<uint64_t> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   return controller_service_provider_->disableControllerService(serviceNode);
 }
 
@@ -523,6 +524,14 @@ std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowContro
 }
 
 /**
+ * Gets the controller service for <code>identifier</code>
+ * @param identifier service identifier
+ * @return shared pointer to teh controller service implementation or nullptr if it does not exist.
+ */
+std::shared_ptr<core::controller::ControllerService> FlowController::getControllerService(const std::string &identifier) {
+  return controller_service_provider_->getControllerService(identifier);
+}
+/**
  * Gets controller service node specified by <code>id</code>
  * @param id service identifier
  * @return shared pointer to the controller service node or nullptr if it does not exist.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 2451a08..ca30316 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -39,34 +39,41 @@ bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) {
     return false;
 }
 
-std::future<bool> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<uint64_t> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
   // reference the enable function from serviceNode
-  std::function< bool()> f_ex = [serviceNode] {
-    return serviceNode->enable();
+  std::function< uint64_t()> f_ex = [serviceNode] {
+    serviceNode->enable();
+    return 0;
   };
-  // create a functor that will be submitted to the thread pool.
-  utils::Worker<bool> functor(f_ex, serviceNode->getUUIDStr());
+
+  // only need to run this once.
+  std::unique_ptr<SingleRunMonitor> monitor = std::unique_ptr<SingleRunMonitor>(new SingleRunMonitor(&running_));
+  utils::Worker<uint64_t> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
   // move the functor into the thread pool. While a future is returned
   // we aren't terribly concerned with the result.
-  std::future<bool> future;
-  component_lifecycle_thread_pool_.execute(std::move(functor), future);
+  std::future<uint64_t> future;
+  thread_pool_.execute(std::move(functor), future);
   future.wait();
   return future;
 }
 
-std::future<bool> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<uint64_t> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   logger_->log_info("Disabling CSN in SchedulingAgent %s", serviceNode->getName());
   // reference the disable function from serviceNode
-  std::function< bool()> f_ex = [serviceNode] {
-    return serviceNode->disable();
+  std::function< uint64_t()> f_ex = [serviceNode] {
+    serviceNode->disable();
+    return 0;
   };
-  // create a functor that will be submitted to the thread pool.
-  utils::Worker<bool> functor(f_ex, serviceNode->getUUIDStr());
+
+  // only need to run this once.
+  std::unique_ptr<SingleRunMonitor> monitor = std::unique_ptr<SingleRunMonitor>(new SingleRunMonitor(&running_));
+  utils::Worker<uint64_t> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
+
   // move the functor into the thread pool. While a future is returned
   // we aren't terribly concerned with the result.
-  std::future<bool> future;
-  component_lifecycle_thread_pool_.execute(std::move(functor), future);
+  std::future<uint64_t> future;
+  thread_pool_.execute(std::move(functor), future);
   future.wait();
   return future;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
index 0fffcb8..5e8f3d8 100644
--- a/libminifi/src/capi/api.cpp
+++ b/libminifi/src/capi/api.cpp
@@ -231,7 +231,6 @@ flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) {
     claim->increaseFlowFileRecordOwnedCount();
     auto path = claim->getContentFullPath();
     auto ffr = create_flowfile(path.c_str());
-    std::cout << "dang created " << path << " " << ff->getSize() << std::endl;
     return ffr;
   } else {
     return nullptr;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/src/controllers/LinuxPowerManagementService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/LinuxPowerManagementService.cpp b/libminifi/src/controllers/LinuxPowerManagementService.cpp
new file mode 100644
index 0000000..836c9d3
--- /dev/null
+++ b/libminifi/src/controllers/LinuxPowerManagementService.cpp
@@ -0,0 +1,202 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "controllers/LinuxPowerManagementService.h"
+#include <utility>
+#include <limits>
+#include <string>
+#include <vector>
+#include <set>
+#include "utils/StringUtils.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+core::Property LinuxPowerManagerService::BatteryCapacityPath("Battery Capacity Path", "Path to the battery level");
+core::Property LinuxPowerManagerService::BatteryStatusPath("Battery Status Path", "Path to the battery status ( Discharging/Battery )");
+core::Property LinuxPowerManagerService::BatteryStatusDischargeKeyword("Battery Status Discharge", "Keyword to identify if battery is discharging");
+core::Property LinuxPowerManagerService::TriggerThreshold("Trigger Threshold", "Battery threshold before which we consider a slow reduction");
+core::Property LinuxPowerManagerService::LowBatteryThreshold("Low Battery Threshold", "Battery threshold before which we will aggressively reduce");
+core::Property LinuxPowerManagerService::WaitPeriod("Wait Period", "Decay between checking threshold and determining if a reduction is needed");
+
+bool LinuxPowerManagerService::isAboveMax(int new_tasks) {
+  return false;
+}
+
+uint16_t LinuxPowerManagerService::getMaxThreads() {
+  return std::numeric_limits<uint16_t>::max();
+}
+
+bool LinuxPowerManagerService::canIncrease() {
+  for (const auto path_pair : paths_) {
+    auto capacity = path_pair.first;
+    auto status = path_pair.second;
+
+    std::ifstream status_file(status);
+    std::string status_str;
+    std::getline(status_file, status_str);
+    status_file.close();
+
+    if (!utils::StringUtils::equalsIgnoreCase(status_keyword_, status_str)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+void LinuxPowerManagerService::reduce() {
+  auto curr_time = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
+  last_time_ = curr_time;
+}
+
+/**
+ * We expect that the wait period has been
+ */
+bool LinuxPowerManagerService::shouldReduce() {
+  if (!enabled_) {
+    logger_->log_trace("LPM not enabled");
+    return false;
+  }
+
+  bool overConsume = false;
+
+  std::vector<bool> batteryAlert;
+
+  auto prev_level = battery_level_.load();
+
+  bool all_discharging = paths_.size() > 0 ? true : false;
+
+  int battery_sum = 0;
+  for (const auto path_pair : paths_) {
+    auto capacity = path_pair.first;
+    auto status = path_pair.second;
+
+    std::ifstream capacity_file(capacity);
+    std::string capacity_str;
+    std::getline(capacity_file, capacity_str);
+    capacity_file.close();
+    int battery_level = std::stoi(capacity_str);
+    battery_sum += battery_level;
+
+    std::ifstream status_file(status);
+    std::string status_str;
+    std::getline(status_file, status_str);
+    status_file.close();
+
+    if (!utils::StringUtils::equalsIgnoreCase(status_keyword_, status_str)) {
+      all_discharging &= false;
+    }
+  }
+
+  // average
+  battery_level_ = battery_sum / paths_.size();
+
+  // only reduce if we're still going down OR we've triggered the low battery threshold
+  if (battery_level_ < trigger_ && (battery_level_ < prev_level || battery_level_ < low_battery_trigger_)) {
+    if (all_discharging) {
+      // return true and wait until
+      if (last_time_ == 0) {
+        overConsume = true;
+        last_time_ = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
+      } else {
+        auto curr_time = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
+        if (curr_time - last_time_ > wait_period_) {
+          overConsume = true;
+          logger_->log_trace("All banks are discharging, suggesting reduction");
+        } else {
+          logging::LOG_DEBUG(logger_) << "dischaging but can't reduce due to time " << curr_time << " " << last_time_ << "  " << wait_period_;
+        }
+      }
+    }
+
+  } else {
+    logger_->log_trace("%d level is not below trigger of %d", battery_level_.load(), trigger_);
+  }
+
+  return overConsume;
+}
+
+void LinuxPowerManagerService::initialize() {
+  ThreadManagementService::initialize();
+  std::set<core::Property> supportedProperties;
+  supportedProperties.insert(BatteryCapacityPath);
+  supportedProperties.insert(BatteryStatusPath);
+  supportedProperties.insert(TriggerThreshold);
+  supportedProperties.insert(LowBatteryThreshold);
+  supportedProperties.insert(WaitPeriod);
+  setSupportedProperties(supportedProperties);
+}
+
+void LinuxPowerManagerService::yield() {
+}
+
+bool LinuxPowerManagerService::isRunning() {
+  return getState() == core::controller::ControllerServiceState::ENABLED;
+}
+
+bool LinuxPowerManagerService::isWorkAvailable() {
+  return false;
+}
+
+void LinuxPowerManagerService::onEnable() {
+  if (nullptr == configuration_) {
+    logger_->log_trace("Cannot enable Linux Power Manager");
+    return;
+  }
+  std::string trigger, wait;
+  status_keyword_ = "Discharging";
+  core::Property capacityPaths;
+  core::Property statusPaths;
+
+  if (getProperty(TriggerThreshold.getName(), trigger) && getProperty(WaitPeriod.getName(), wait)) {
+    core::TimeUnit unit;
+    int64_t wait_time;
+    if (core::Property::StringToTime(wait, wait_time, unit) && core::Property::ConvertTimeUnitToMS(wait_time, unit, wait_time)) {
+      wait_period_ = wait_time;
+    }
+
+    getProperty(BatteryStatusDischargeKeyword.getName(), status_keyword_);
+
+    trigger_ = std::stoi(trigger);
+
+    if (getProperty(LowBatteryThreshold.getName(), trigger)) {
+      low_battery_trigger_ = std::stoi(trigger);
+    } else {
+      low_battery_trigger_ = 0;
+    }
+    getProperty(BatteryCapacityPath.getName(), capacityPaths);
+    getProperty(BatteryStatusPath.getName(), statusPaths);
+    if (capacityPaths.getValues().size() == statusPaths.getValues().size()) {
+      for (size_t i = 0; i < capacityPaths.getValues().size(); i++) {
+        paths_.push_back(std::make_pair(capacityPaths.getValues().at(i), statusPaths.getValues().at(i)));
+      }
+    } else {
+      logger_->log_error("BatteryCapacityPath and BatteryStatusPath mis-configuration");
+    }
+    enabled_ = true;
+    logger_->log_trace("Enabled enable ");
+  } else {
+    logger_->log_trace("Could not enable ");
+  }
+}
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/src/utils/ByteArrayCallback.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index d2fe2e5..19d815f 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -68,8 +68,6 @@ void ByteOutputCallback::write(char *data, size_t size) {
     std::unique_lock<std::recursive_mutex> lock(vector_lock_);
     spinner_.wait(lock, [&] {
       return read_started_ || !is_alive_;});
-
-    std::cout << "unlock" << std::endl;
     if (!is_alive_)
       return;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/main/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index 3a51f14..174853e 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -52,7 +52,13 @@ find_package(OpenSSL REQUIRED)
 include_directories(${OPENSSL_INCLUDE_DIR})
 
 # Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and rocksdb
-target_link_libraries(minifiexe core-minifi)
+#target_link_libraries(minifiexe core-minifi)
+
+if (APPLE)
+	target_link_libraries (minifiexe -Wl,-all_load core-minifi)
+else ()
+	target_link_libraries (minifiexe -Wl,--whole-archive core-minifi -Wl,--no-whole-archive)
+endif ()
 
 if (APPLE)
 	target_link_libraries (minifiexe -Wl,-all_load minifi)


Mime
View raw message