nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [8/8] nifi-minifi-cpp git commit: MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters MINIFI-339: Add GetTCP Processor MINIFI-339: Add listener server MINIFI-339: Update to listener MINIFI-339: Resolve Issue with stack based process
Date Mon, 02 Oct 2017 14:57:21 GMT
MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters
MINIFI-339: Add GetTCP Processor
MINIFI-339: Add listener server
MINIFI-339: Update to listener
MINIFI-339: Resolve Issue with stack based processor nodes losing scope
MINIFI-369: Update ListenHTTP processor to allow transfer encoding
MINIFI-339: Update rest receiver instantiation and fix issue found in GetFile
MINIFI-339: Rename content to operational arguments
MINIFI-371: remove virtual destructors when not needed
MINIFI-339: Fixing issues with GetTCP
MINIFI-378: Resolve issues with shutdown. Took the approach to call notifyStop at the destructor to avoid larger changes
MINIFI-339: Allow C2 to be disabled

This closes #134.

Signed-off-by: Aldrin Piri <aldrin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/9f161a27
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/9f161a27
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/9f161a27

Branch: refs/heads/master
Commit: 9f161a27e5fa0ea9aac4d59d1c23edbad3d77858
Parents: f5832fa
Author: Marc Parisi <phrocker@apache.org>
Authored: Wed Jun 21 10:47:35 2017 -0400
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Mon Oct 2 10:56:07 2017 -0400

----------------------------------------------------------------------
 CMakeLists.txt                                  |   2 +
 README.md                                       |  24 +
 cmake/BuildTests.cmake                          |  23 +-
 libminifi/CMakeLists.txt                        |   4 +-
 libminifi/include/ConfigurationListener.h       | 117 -----
 libminifi/include/Connection.h                  |   6 +-
 libminifi/include/EventDrivenSchedulingAgent.h  |   2 +-
 libminifi/include/FlowController.h              |  92 +++-
 libminifi/include/FlowFileRecord.h              |   1 -
 libminifi/include/HttpConfigurationListener.h   |  76 ---
 libminifi/include/RemoteProcessorGroupPort.h    |  22 +-
 libminifi/include/ResourceClaim.h               |  29 +-
 libminifi/include/SchedulingAgent.h             |  17 +-
 libminifi/include/Site2SiteClientProtocol.h     | 115 ++---
 libminifi/include/Site2SitePeer.h               |   2 +-
 libminifi/include/ThreadedSchedulingAgent.h     |  14 +-
 libminifi/include/TimerDrivenSchedulingAgent.h  |  10 +-
 libminifi/include/c2/C2Agent.h                  | 200 ++++++++
 libminifi/include/c2/C2Payload.h                | 192 ++++++++
 libminifi/include/c2/C2Protocol.h               | 119 +++++
 libminifi/include/c2/HeartBeatReporter.h        | 101 ++++
 libminifi/include/c2/protocols/Protocols.h      |  24 +
 libminifi/include/c2/protocols/RESTProtocol.h   |  74 +++
 libminifi/include/c2/protocols/RESTReceiver.h   | 110 +++++
 libminifi/include/c2/protocols/RESTSender.h     |  80 +++
 .../include/controllers/SSLContextService.h     |  38 +-
 libminifi/include/core/ClassLoader.h            |  72 +++
 libminifi/include/core/ConfigurationFactory.h   |  15 +-
 libminifi/include/core/ContentRepository.h      |  50 ++
 libminifi/include/core/FlowConfiguration.h      |   6 +-
 libminifi/include/core/ProcessContext.h         |  34 +-
 libminifi/include/core/ProcessGroup.h           |   2 +
 libminifi/include/core/ProcessSession.h         |  13 +-
 libminifi/include/core/ProcessSessionFactory.h  |   6 +-
 libminifi/include/core/Processor.h              |  25 +-
 libminifi/include/core/ProcessorNode.h          |  10 +-
 libminifi/include/core/Property.h               |   5 +
 libminifi/include/core/Relationship.h           |   2 +-
 libminifi/include/core/Repository.h             |   9 +-
 libminifi/include/core/StreamManager.h          |  13 +
 .../core/controller/ControllerServiceProvider.h |   5 +-
 .../StandardControllerServiceProvider.h         |  20 +-
 libminifi/include/core/logging/Logger.h         |   2 +-
 .../include/core/logging/LoggerConfiguration.h  |   7 +-
 .../SiteToSiteProvenanceReportingTask.h         |   2 +-
 .../include/core/repository/AtomicRepoEntries.h |  97 ++--
 .../core/repository/FileSystemRepository.h      |   2 +
 .../core/repository/FlowFileRepository.h        |  24 +-
 .../core/repository/VolatileContentRepository.h |   8 +-
 .../repository/VolatileFlowFileRepository.h     |   9 +-
 .../repository/VolatileProvenanceRepository.h   |   3 +-
 .../core/repository/VolatileRepository.h        |   7 +-
 .../include/core/state/ProcessorController.h    |  73 +++
 libminifi/include/core/state/StateManager.h     | 126 +++++
 libminifi/include/core/state/UpdateController.h | 252 ++++++++++
 .../core/state/metrics/DeviceInformation.h      | 319 ++++++++++++
 .../include/core/state/metrics/MetricsBase.h    | 161 ++++++
 .../core/state/metrics/MetricsListener.h        | 128 +++++
 .../include/core/state/metrics/ProcessMetrics.h | 102 ++++
 .../include/core/state/metrics/QueueMetrics.h   | 106 ++++
 .../core/state/metrics/RepositoryMetrics.h      | 101 ++++
 .../include/core/state/metrics/SystemMetrics.h  | 109 +++++
 libminifi/include/core/yaml/YamlConfiguration.h |   4 +-
 libminifi/include/io/AtomicEntryStream.h        |  18 +-
 libminifi/include/io/BaseStream.h               |   2 +-
 libminifi/include/io/CRCStream.h                |   2 +-
 libminifi/include/io/ClientSocket.h             |  30 +-
 libminifi/include/io/EndianCheck.h              |   2 +-
 libminifi/include/io/FileStream.h               |   2 +-
 libminifi/include/processors/GetFile.h          |  64 ++-
 libminifi/include/processors/GetTCP.h           | 288 +++++++++++
 libminifi/include/processors/InvokeHTTP.h       |  55 +--
 libminifi/include/processors/LoadProcessors.h   |   1 +
 libminifi/include/processors/PutFile.h          |   3 +-
 libminifi/include/properties/Configure.h        |   3 +-
 libminifi/include/properties/Properties.h       |   8 +
 libminifi/include/provenance/Provenance.h       |  19 +-
 .../include/provenance/ProvenanceRepository.h   |  21 +-
 libminifi/include/utils/ByteInputCallBack.h     |  11 +-
 libminifi/include/utils/HTTPClient.h            | 301 ++++++++++++
 libminifi/include/utils/HTTPUtils.h             | 304 ------------
 libminifi/include/utils/Id.h                    |   2 +-
 libminifi/include/utils/StringUtils.h           |   4 +-
 libminifi/include/utils/ThreadPool.h            |  53 +-
 libminifi/src/CPPLINT.cfg                       |   3 +
 libminifi/src/ConfigurationListener.cpp         |  87 ----
 libminifi/src/Configure.cpp                     |   1 +
 libminifi/src/Connection.cpp                    |  22 +-
 libminifi/src/EventDrivenSchedulingAgent.cpp    |   3 +-
 libminifi/src/FlowController.cpp                | 290 +++++++++--
 libminifi/src/FlowFileRecord.cpp                |  10 +-
 libminifi/src/HttpConfigurationListener.cpp     | 102 ----
 libminifi/src/Properties.cpp                    |   4 +-
 libminifi/src/RemoteProcessorGroupPort.cpp      | 169 +++----
 libminifi/src/ResourceClaim.cpp                 |   9 +-
 libminifi/src/SchedulingAgent.cpp               |  22 +-
 libminifi/src/Site2SiteClientProtocol.cpp       | 256 +++++-----
 libminifi/src/ThreadedSchedulingAgent.cpp       |  20 +-
 libminifi/src/TimerDrivenSchedulingAgent.cpp    |   7 +-
 libminifi/src/c2/C2Agent.cpp                    | 485 +++++++++++++++++++
 libminifi/src/c2/C2Payload.cpp                  | 219 +++++++++
 libminifi/src/c2/protocols/RESTProtocol.cpp     | 177 +++++++
 libminifi/src/c2/protocols/RESTReceiver.cpp     | 148 ++++++
 libminifi/src/c2/protocols/RESTSender.cpp       | 144 ++++++
 libminifi/src/controllers/SSLContextService.cpp |  19 +-
 libminifi/src/core/ClassLoader.cpp              |   6 +-
 libminifi/src/core/ConfigurableComponent.cpp    |  14 +-
 libminifi/src/core/ConfigurationFactory.cpp     |   8 +-
 libminifi/src/core/Connectable.cpp              |  12 +-
 libminifi/src/core/FlowConfiguration.cpp        |   9 +-
 libminifi/src/core/FlowFile.cpp                 |   1 +
 libminifi/src/core/ProcessGroup.cpp             |  38 +-
 libminifi/src/core/ProcessSession.cpp           |  71 ++-
 libminifi/src/core/ProcessSessionFactory.cpp    |   4 +-
 libminifi/src/core/Processor.cpp                |  39 +-
 libminifi/src/core/ProcessorNode.cpp            |   8 +-
 libminifi/src/core/Repository.cpp               |   5 +-
 libminifi/src/core/RepositoryFactory.cpp        |   8 +-
 .../StandardControllerServiceNode.cpp           |   6 +-
 .../src/core/logging/LoggerConfiguration.cpp    |  16 +-
 .../SiteToSiteProvenanceReportingTask.cpp       |   6 +-
 .../core/repository/FileSystemRepository.cpp    |   5 +
 .../src/core/repository/FlowFileRepository.cpp  |  90 ++--
 .../repository/VolatileContentRepository.cpp    |  17 +
 .../src/core/state/ProcessorController.cpp      |  64 +++
 libminifi/src/core/state/StateManager.cpp       | 137 ++++++
 libminifi/src/core/state/UpdateController.cpp   |  76 +++
 libminifi/src/io/ClientSocket.cpp               |  49 +-
 libminifi/src/io/FileStream.cpp                 |   1 +
 libminifi/src/io/StreamFactory.cpp              |   6 +-
 libminifi/src/processors/ExecuteProcess.cpp     |   6 +-
 libminifi/src/processors/GenerateFlowFile.cpp   |   4 +-
 libminifi/src/processors/GetFile.cpp            |  21 +-
 libminifi/src/processors/GetTCP.cpp             | 289 +++++++++++
 libminifi/src/processors/InvokeHTTP.cpp         | 190 +++-----
 libminifi/src/processors/ListenHTTP.cpp         |  10 +-
 libminifi/src/processors/ListenSyslog.cpp       |   2 +-
 libminifi/src/processors/LogAttribute.cpp       |   1 +
 libminifi/src/processors/TailFile.cpp           |   4 +-
 libminifi/src/provenance/Provenance.cpp         |   6 +-
 .../src/provenance/ProvenanceRepository.cpp     |  36 +-
 libminifi/src/utils/HttpClient.cpp              | 214 ++++++++
 libminifi/src/utils/Id.cpp                      |  19 +-
 libminifi/test/.device_id                       |   1 +
 libminifi/test/TestBase.cpp                     |  16 +-
 libminifi/test/TestBase.h                       |  14 +-
 libminifi/test/TestServer.h                     |  54 +--
 .../test/integration/C2NullConfiguration.cpp    | 135 ++++++
 libminifi/test/integration/C2UpdateTest.cpp     | 185 +++++++
 .../integration/C2VerifyHeartbeatAndStop.cpp    | 153 ++++++
 .../test/integration/C2VerifyServeResults.cpp   | 131 +++++
 .../ControllerServiceIntegrationTests.cpp       |  33 +-
 libminifi/test/integration/GetFileNoData.cpp    | 184 +++++++
 .../HttpConfigurationListenerTest.cpp           | 131 -----
 .../test/integration/HttpGetIntegrationTest.cpp |  45 +-
 .../integration/HttpPostIntegrationTest.cpp     | 127 +++--
 libminifi/test/integration/IntegrationBase.h    | 177 +++++++
 .../integration/ProvenanceReportingTest.cpp     |   1 +
 .../test/integration/Site2SiteRestTest.cpp      | 148 ------
 .../test/integration/SiteToSiteRestTest.cpp     | 144 ++++++
 .../test/integration/TestExecuteProcess.cpp     |   9 +-
 libminifi/test/integration/ThreadPoolAdjust.cpp | 109 +++++
 libminifi/test/resources/TestHTTPGet.yml        |  10 +-
 libminifi/test/resources/TestHTTPGetSecure.yml  |  13 +-
 libminifi/test/resources/TestHTTPPost.yml       |  59 ++-
 .../resources/TestHTTPPostChunkedEncoding.yml   |  97 ++++
 .../test/resources/TestSite2SiteRestSecure.yml  |  58 +++
 libminifi/test/resources/cn.crt.key.pem         |  52 ++
 libminifi/test/resources/nifi-cert-key.pem      |  47 ++
 libminifi/test/unit/C2MetricsTests.cpp          | 230 +++++++++
 libminifi/test/unit/GetTCPTests.cpp             | 419 ++++++++++++++++
 libminifi/test/unit/InvokeHTTPTests.cpp         |  99 ++--
 libminifi/test/unit/LoggerTests.cpp             |   6 +
 libminifi/test/unit/ProcessorTests.cpp          |  40 +-
 libminifi/test/unit/PropertyTests.cpp           |  13 +
 libminifi/test/unit/ProvenanceTestHelper.h      |  27 +-
 libminifi/test/unit/RepoTests.cpp               |  54 +++
 libminifi/test/unit/SchedulingAgentTests.cpp    |  36 ++
 libminifi/test/unit/Site2SiteTests.cpp          |   2 +-
 libminifi/test/unit/TailFileTests.cpp           |  80 ++-
 libminifi/test/unit/ThreadPoolTests.cpp         |  55 +++
 libminifi/test/unit/YamlConfigurationTests.cpp  |   3 +-
 main/CMakeLists.txt                             |   7 +-
 183 files changed, 9290 insertions(+), 2225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 292bc8d..a81127c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -35,6 +35,8 @@ ENDIF(POLICY CMP0048)
 include(CheckCXXCompilerFlag)
 CHECK_CXX_COMPILER_FLAG("-std=c++11 " COMPILER_SUPPORTS_CXX11)
 CHECK_CXX_COMPILER_FLAG("-std=c++0x " COMPILER_SUPPORTS_CXX0X)
+SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ")
+SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS}")
 if(COMPILER_SUPPORTS_CXX11)
     set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
 elseif(COMPILER_SUPPORTS_CXX0X)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index db10ab4..1c879c5 100644
--- a/README.md
+++ b/README.md
@@ -317,6 +317,30 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
 
     if you do not want to enable client certificate base authorization
     nifi.security.need.ClientAuth=false
+    
+### Command and Control Configuration
+For more more insight into the API used within the C2 agent, please visit:
+https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal
+
+    in minifi.properties
+
+    #Disable/Enable C2
+    nifi.c2.enable=true
+
+	#specify metrics classes
+	nifi.flow.metrics.classes=DeviceInformation,SystemInformation,ProcessMetrics
+	
+	#specify C2 protocol
+	c2.agent.protocol.class=RESTSender
+	
+	#control c2 heartbeat interval in millisecocnds
+	c2.agent.heartbeat.period=3000
+	
+	# enable reporter classes
+	c2.agent.heartbeat.reporter.class=RESTReciver
+	
+	
+	     
 
 ### Configuring Volatile and NO-OP Repositories
 Each of the repositories can be configured to be volatile ( state kept in memory and flushed

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 29603bf..69c5e7d 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -40,10 +40,14 @@ function(createTests testName)
     target_include_directories(${testName} PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS})
     target_include_directories(${testName} PRIVATE BEFORE "include")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/")
+    target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/c2/protocols")
+    target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/c2")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/controller")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/repository")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/yaml")
+    target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/statemanagement")
+    target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/statemanagement/metrics")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/io")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/utils")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/processors")
@@ -88,12 +92,25 @@ add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegra
 
 add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 
-add_test(NAME HttpConfigurationListenerTest COMMAND HttpConfigurationListenerTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 
 add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml"  "${TEST_RESOURCES}/")
 
-add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" )
+add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/")
 
-add_test(NAME Site2SiteRestTest COMMAND Site2SiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/")
+add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
+
+add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
+
+add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+
+add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+
+add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+
+add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8082/nifi-api/controller")
+
+## removed due to travis issues with our certs
+#add_test(NAME SiteToSiteRestTestSecure COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRestSecure.yml" "${TEST_RESOURCES}/" "https://localhost:8082/nifi-api/controller")
 
 add_test(NAME TestExecuteProcess COMMAND TestExecuteProcess )

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 5e63a30..539256b 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -62,7 +62,7 @@ include_directories(../thirdparty/jsoncpp/include)
 include_directories(../thirdparty/concurrentqueue/)
 include_directories(include)
 
-file(GLOB SOURCES  "src/core/logging/*.cpp"  "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/processors/*.cpp" "src/utils/*.cpp" "src/*.cpp")
+file(GLOB SOURCES  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/processors/*.cpp" "src/utils/*.cpp" "src/*.cpp")
 
 file(GLOB SPD_SOURCES "../thirdparty/spdlog-20170710/include/spdlog/*")
 
@@ -110,3 +110,5 @@ endif (OPENSSL_FOUND)
 endif ()
 
 add_subdirectory(src/utils)
+
+set_property(TARGET minifi PROPERTY INTERPROCEDURAL_OPTIMIZATION True)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/ConfigurationListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ConfigurationListener.h b/libminifi/include/ConfigurationListener.h
deleted file mode 100644
index 856ea95..0000000
--- a/libminifi/include/ConfigurationListener.h
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * ConfigurationListener class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __CONFIGURATION_LISTENER__
-#define __CONFIGURATION_LISTENER__
-
-#include <memory>
-#include <atomic>
-#include <cstdint>
-#include <cstring>
-#include <iostream>
-#include <string>
-#include <thread>
-
-#include "yaml-cpp/yaml.h"
-#include "core/Core.h"
-#include "core/Property.h"
-#include "properties/Configure.h"
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-// Forwarder declaration
-class FlowController;
-// ConfigurationListener Class
-class ConfigurationListener {
-public:
-
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  ConfigurationListener(std::shared_ptr<FlowController> controller,
-      std::shared_ptr<Configure> configure, std::string type) :
-      connect_timeout_(20000), read_timeout_(20000), type_(type), configure_(
-          configure), controller_(controller) {
-    logger_ = logging::LoggerFactory<ConfigurationListener>::getLogger();
-    running_ = false;
-
-  }
-  // Destructor
-  virtual ~ConfigurationListener() {
-    stop();
-  }
-
-  // Start the thread
-  void start();
-  // Stop the thread
-  void stop();
-  // whether the thread is enable
-  bool isRunning() {
-    return running_;
-  }
-  // pull the new configuration from the remote host
-  virtual bool pullConfiguration(std::string &configuration) {
-    return false;
-  }
-
-protected:
-
-  // Run function for the thread
-  void run();
-
-  // Run function for the thread
-  void threadExecutor() {
-    run();
-  }
-
-  // Mutex for protection
-  std::mutex mutex_;
-  // thread
-  std::thread thread_;
-  // whether the thread is running
-  std::atomic<bool> running_;
-
-  // url
-  std::string url_;
-  // connection timeout
-  int64_t connect_timeout_;
-  // read timeout.
-  int64_t read_timeout_;
-  // pull interval
-  int64_t pull_interval_;
-  // type (http/rest)
-  std::string type_;
-  // last applied configuration
-  std::string lastAppliedConfiguration;
-
-  std::shared_ptr<Configure> configure_;
-  std::shared_ptr<logging::Logger> logger_;
-  std::shared_ptr<FlowController> controller_;
-};
-
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/Connection.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index ff32baf..c92a626 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -47,8 +47,8 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
   /*
    * Create a new processor
    */
-  explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid = NULL,
-                      uuid_t srcUUID = NULL,
+  explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid = NULL, uuid_t srcUUID =
+                          NULL,
                       uuid_t destUUID = NULL);
   // Destructor
   virtual ~Connection() {
@@ -130,7 +130,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
   }
   // Get queue data size
   uint64_t getQueueDataSize() {
-    return max_data_queue_size_;
+    return queued_data_size_;
   }
   // Put the flow file into queue
   void put(std::shared_ptr<core::FlowFile> flow);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/EventDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index ca9f021..b434de5 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -46,7 +46,7 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
   virtual ~EventDrivenSchedulingAgent() {
   }
   // Run function for the thread
-  uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+  uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
 
  private:
   // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index d9a0452..e79999f 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -44,11 +44,10 @@
 #include "TimerDrivenSchedulingAgent.h"
 #include "EventDrivenSchedulingAgent.h"
 #include "FlowControlProtocol.h"
-#include "ConfigurationListener.h"
-#include "HttpConfigurationListener.h"
-
 #include "core/Property.h"
 #include "utils/Id.h"
+#include "core/state/metrics/MetricsBase.h"
+#include "core/state/StateManager.h"
 
 namespace org {
 namespace apache {
@@ -62,7 +61,7 @@ namespace minifi {
  * Flow Controller class. Generally used by FlowController factory
  * as a singleton.
  */
-class FlowController : public core::controller::ControllerServiceProvider, public std::enable_shared_from_this<FlowController> {
+class FlowController : public core::controller::ControllerServiceProvider, public state::StateManager {
  public:
   static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10;
   static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5;
@@ -71,20 +70,16 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
    * Flow controller constructor
    */
   explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
-                          std::unique_ptr<core::FlowConfiguration> flow_configuration,
-                          std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode);
+                          std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode);
 
   explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
-                          std::unique_ptr<core::FlowConfiguration> flow_configuration,
-                          std::shared_ptr<core::ContentRepository> content_repo)
-      : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), content_repo, DEFAULT_ROOT_GROUP_NAME, false)
-  {
+                          std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo)
+      : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), content_repo, DEFAULT_ROOT_GROUP_NAME, false) {
   }
 
   explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
                           std::unique_ptr<core::FlowConfiguration> flow_configuration)
-      : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), std::make_shared<core::repository::FileSystemRepository>(), DEFAULT_ROOT_GROUP_NAME, false)
-  {
+      : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), std::make_shared<core::repository::FileSystemRepository>(), DEFAULT_ROOT_GROUP_NAME, false) {
     content_repo_->initialize(configure);
   }
 
@@ -124,14 +119,33 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
   virtual bool isRunning() {
     return running_.load();
   }
+
   // Whether the Flow Controller has already been initialized (loaded flow XML)
   virtual bool isInitialized() {
     return initialized_.load();
   }
   // Start to run the Flow Controller which internally start the root process group and all its children
-  virtual bool start();
+  virtual int16_t start();
+  virtual int16_t pause() {
+    return -1;
+  }
   // Unload the current flow YAML, clean the root process group and all its children
-  virtual void stop(bool force);
+  virtual int16_t stop(bool force, uint64_t timeToWait = 0);
+  virtual int16_t applyUpdate(const std::string &configuration);
+  virtual int16_t drainRepositories() {
+
+    return -1;
+  }
+
+  virtual std::vector<std::shared_ptr<state::StateController>> getComponents(const std::string &name);
+
+  virtual std::vector<std::shared_ptr<state::StateController>> getAllComponents();
+
+  virtual int16_t clearConnection(const std::string &connection);
+
+  virtual int16_t applyUpdate(const std::shared_ptr<state::Update> &updateController) {
+    return -1;
+  }
   // Asynchronous function trigger unloading and wait for a period of time
   virtual void waitUnload(const uint64_t timeToWaitMs);
   // Unload the current flow xml, clean the root process group and all its children
@@ -158,7 +172,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
   // first it will validate the payload with the current root node config for flowController
   // like FlowController id/name is the same and new version is greater than the current version
   // after that, it will apply the configuration
-  bool applyConfiguration(std::string &configurePayload);
+  bool applyConfiguration(const std::string &configurePayload);
 
   // get name
   std::string getName() {
@@ -168,6 +182,10 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
       return "";
   }
 
+  virtual std::string getComponentName() {
+    return "FlowController";
+  }
+
   // get version
   int getVersion() {
     if (root_ != nullptr)
@@ -199,7 +217,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
    * Enables the controller service services
    * @param serviceNode service node which will be disabled, along with linked services.
    */
-  virtual void enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<bool> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
 
   /**
    * Enables controller services
@@ -211,7 +229,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
    * Disables controller services
    * @param serviceNode service node which will be disabled, along with linked services.
    */
-  virtual void disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
 
   /**
    * Gets all controller services.
@@ -278,11 +296,25 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
    */
   virtual void enableAllControllerServices();
 
+  /**
+   * Retrieves all metrics from this source.
+   * @param metric_vector -- metrics will be placed in this vector.
+   * @return result of the get operation.
+   *  0 Success
+   *  1 No error condition, but cannot obtain lock in timely manner.
+   *  -1 failure
+   */
+  virtual int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector, uint8_t metricsClass);
+
+  virtual uint64_t getUptime();
+
  protected:
 
   // function to load the flow file repo.
   void loadFlowRepo();
 
+  void initializeC2();
+
   /**
    * Initializes flow controller paths.
    */
@@ -304,8 +336,12 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
   // FlowFile Repo
   // Whether it is running
   std::atomic<bool> running_;
+
   // conifiguration filename
   std::string configuration_filename_;
+
+  std::atomic<bool> c2_initialized_;
+  std::atomic<bool> c2_enabled_;
   // Whether it has already been initialized (load the flow XML already)
   std::atomic<bool> initialized_;
   // Provenance Repo
@@ -336,15 +372,29 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
   // flow configuration object.
   std::unique_ptr<core::FlowConfiguration> flow_configuration_;
 
- private:
+  // metrics information
+
+  std::chrono::steady_clock::time_point start_time_;
+
+  std::mutex metrics_mutex_;
+  // metrics cache
+  std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_;
+
+  // metrics cache
+  std::map<std::string, std::shared_ptr<state::metrics::Metrics>> component_metrics_;
+
+  std::map<uint8_t, std::vector<std::shared_ptr<state::metrics::Metrics>>>component_metrics_by_id_;
+  // metrics last run
+  std::chrono::steady_clock::time_point last_metrics_capture_;
+
+private:
   std::shared_ptr<logging::Logger> logger_;
-  // http configuration listener object.
-  std::unique_ptr<HttpConfigurationListener> http_configuration_listener_;
   std::string serial_number_;
   static std::shared_ptr<utils::IdGenerator> id_generator_;
 };
 
-} /* namespace minifi */
+}
+/* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/FlowFileRecord.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index d6e5f2e..9299b8a 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -94,7 +94,6 @@ class OutputStreamCallback {
   virtual ~OutputStreamCallback() {
 
   }
-  //virtual void process(std::ofstream *stream) = 0;
   virtual int64_t process(std::shared_ptr<io::BaseStream> stream) = 0;
 
 };

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/HttpConfigurationListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/HttpConfigurationListener.h b/libminifi/include/HttpConfigurationListener.h
deleted file mode 100644
index 7e3291e..0000000
--- a/libminifi/include/HttpConfigurationListener.h
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * HttpConfigurationListener class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __HTTP_CONFIGURATION_LISTENER__
-#define __HTTP_CONFIGURATION_LISTENER__
-
-#include <curl/curl.h>
-#include "core/Core.h"
-#include "core/Property.h"
-#include "ConfigurationListener.h"
-#include "utils/HTTPUtils.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-// HttpConfigurationListener Class
-class HttpConfigurationListener: public ConfigurationListener {
-public:
-
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  HttpConfigurationListener(std::shared_ptr<FlowController> controller,
-      std::shared_ptr<Configure> configure) :
-      minifi::ConfigurationListener(controller, configure, "http"),
-      securityConfig_(configure) {
-      std::string value;
-
-      if (configure->get(Configure::nifi_configuration_listener_http_url, value)) {
-        url_ = value;
-        logger_->log_info("Http configuration listener URL %s", url_.c_str());
-      } else {
-        url_ = "";
-      }
-
-      curl_global_init(CURL_GLOBAL_DEFAULT);
-      this->start();
-  }
-
-  bool pullConfiguration(std::string &configuration);
-
-  // Destructor
-  virtual ~HttpConfigurationListener() {
-    this->stop();
-    curl_global_cleanup();
-  }
-
-protected:
-  minifi::utils::HTTPSecurityConfiguration securityConfig_;
-
-};
-
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index d484fb9..cefce45 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -23,13 +23,14 @@
 #include <mutex>
 #include <memory>
 #include <stack>
-#include "utils/HTTPUtils.h"
+#include "utils/HTTPClient.h"
 #include "concurrentqueue.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "Site2SiteClientProtocol.h"
 #include "io/StreamFactory.h"
+#include "controllers/SSLContextService.h"
 #include "core/logging/LoggerConfiguration.h"
 
 namespace org {
@@ -43,14 +44,15 @@ class RemoteProcessorGroupPort : public core::Processor {
   /*!
    * Create a new processor
    */
-  RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string name, std::string url, std::shared_ptr<Configure> configure, uuid_t uuid = nullptr)
+  RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string name, std::string url, const std::shared_ptr<Configure> &configure, uuid_t uuid = nullptr)
       : core::Processor(name, uuid),
         configure_(configure),
         direction_(SEND),
         transmitting_(false),
+        timeout_(0),
         logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()),
         url_(url),
-        securityConfig_(configure) {
+        ssl_service(nullptr) {
     stream_factory_ = stream_factory;
     if (uuid != nullptr) {
       uuid_copy(protocol_uuid_, uuid);
@@ -71,11 +73,12 @@ class RemoteProcessorGroupPort : public core::Processor {
   static const char *ProcessorName;
   // Supported Properties
   static core::Property hostName;
+  static core::Property SSLContext;
   static core::Property port;
   static core::Property portUUID;
   // Supported Relationships
   static core::Relationship relation;
-   public:
+ public:
   void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
   // OnTrigger method, implemented by NiFi RemoteProcessorGroupPort
   virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
@@ -102,8 +105,7 @@ class RemoteProcessorGroupPort : public core::Processor {
     if (port_ == -1) {
       if (protocol_.find("https") != std::string::npos) {
         port_ = 443;
-      }
-      else if (protocol_.find("http") != std::string::npos) {
+      } else if (protocol_.find("http") != std::string::npos) {
         port_ = 80;
       }
     }
@@ -142,15 +144,17 @@ class RemoteProcessorGroupPort : public core::Processor {
   std::string url_;
 
   // Remote Site2Site Info
-  int site2site_port_;
-  bool site2site_secure_;
+  int site2site_port_;bool site2site_secure_;
   std::vector<minifi::Site2SitePeerStatus> site2site_peer_status_list_;
   std::atomic<int> site2site_peer_index_;
   std::mutex site2site_peer_mutex_;
   std::string rest_user_name_;
   std::string rest_password_;
-  minifi::utils::HTTPSecurityConfiguration securityConfig_;
 
+  std::shared_ptr<controllers::SSLContextService> ssl_service;
+
+ private:
+  static const char* RPG_SSL_CONTEXT_SERVICE_NAME;
 };
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/ResourceClaim.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h
index 19a67fa..b788416 100644
--- a/libminifi/include/ResourceClaim.h
+++ b/libminifi/include/ResourceClaim.h
@@ -41,12 +41,15 @@ namespace minifi {
 // Default content directory
 #define DEFAULT_CONTENT_DIRECTORY "./content_repository"
 
+extern std::string default_directory_path;
+
+extern void setDefaultDirectory(std::string);
+
 // ResourceClaim Class
 class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
 
  public:
 
-  static char *default_directory_path;
   // Constructor
   /*!
    * Create a new resource claim
@@ -55,23 +58,20 @@ class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
 
   ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted = false);
   // Destructor
-  virtual ~ResourceClaim() {
+  ~ResourceClaim() {
   }
   // increaseFlowFileRecordOwnedCount
   void increaseFlowFileRecordOwnedCount() {
-    ++_flowFileRecordOwnedCount;
+    claim_manager_->incrementStreamCount(shared_from_this());
   }
   // decreaseFlowFileRecordOwenedCount
   void decreaseFlowFileRecordOwnedCount() {
-
-    if (_flowFileRecordOwnedCount > 0) {
-      _flowFileRecordOwnedCount--;
-    }
+    claim_manager_->decrementStreamCount(shared_from_this());
 
   }
   // getFlowFileRecordOwenedCount
   uint64_t getFlowFileRecordOwnedCount() {
-    return _flowFileRecordOwnedCount;
+    return claim_manager_->getStreamCount(shared_from_this());
   }
   // Get the content full path
   std::string getContentFullPath() {
@@ -83,13 +83,19 @@ class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
   }
 
   void deleteClaim() {
-    if (!deleted_)
-    {
+    if (!deleted_) {
       deleted_ = true;
     }
 
   }
 
+  bool exists() {
+    if (claim_manager_ == nullptr) {
+      return false;
+    }
+    return claim_manager_->exists(shared_from_this());
+  }
+
   friend std::ostream& operator<<(std::ostream& stream, const ResourceClaim& claim) {
     stream << claim._contentFullPath;
     return stream;
@@ -104,9 +110,6 @@ class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
   // Full path to the content
   std::string _contentFullPath;
 
-  // How many FlowFileRecord Own this cliam
-  std::atomic<uint64_t> _flowFileRecordOwnedCount;
-
   std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager_;
 
  private:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 130c088..569c4ee 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -52,10 +52,8 @@ class SchedulingAgent {
   /*!
    * Create a new scheduling agent.
    */
-  SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
-		  std::shared_ptr<core::Repository> flow_repo,
-                  std::shared_ptr<core::ContentRepository> content_repo,
-                  std::shared_ptr<Configure> configuration)
+  SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
+                  std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
       : configure_(configuration),
         admin_yield_duration_(0),
         bored_yield_duration_(0),
@@ -65,7 +63,7 @@ class SchedulingAgent {
     running_ = false;
     repo_ = repo;
     flow_repo_ = flow_repo;
-    utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true);
+    utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true);
     component_lifecycle_thread_pool_ = std::move(pool);
     component_lifecycle_thread_pool_.start();
   }
@@ -74,7 +72,7 @@ class SchedulingAgent {
 
   }
   // onTrigger, return whether the yield is need
-  bool onTrigger(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+  bool onTrigger(std::shared_ptr<core::Processor> processor, std::shared_ptr<core::ProcessContext> processContext, std::shared_ptr<core::ProcessSessionFactory> sessionFactory);
   // Whether agent has work to do
   bool hasWorkToDo(std::shared_ptr<core::Processor> processor);
   // Whether the outgoing need to be backpressure
@@ -82,6 +80,7 @@ class SchedulingAgent {
   // start
   void start() {
     running_ = true;
+    component_lifecycle_thread_pool_.start();
   }
   // stop
   virtual void stop() {
@@ -90,8 +89,8 @@ class SchedulingAgent {
   }
 
  public:
-  virtual void enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
-  virtual void disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<bool> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
   // schedule, overwritten by different DrivenSchedulingAgent
   virtual void schedule(std::shared_ptr<core::Processor> processor) = 0;
   // unschedule, overwritten by different DrivenSchedulingAgent
@@ -112,7 +111,7 @@ class SchedulingAgent {
   std::shared_ptr<Configure> configure_;
 
   std::shared_ptr<core::Repository> repo_;
-  
+
   std::shared_ptr<core::Repository> flow_repo_;
 
   std::shared_ptr<core::ContentRepository> content_repo_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h
index dcb551a..df974fb 100644
--- a/libminifi/include/Site2SiteClientProtocol.h
+++ b/libminifi/include/Site2SiteClientProtocol.h
@@ -174,29 +174,13 @@ typedef struct {
 } RespondCodeContext;
 
 // Respond Code Context
-static RespondCodeContext respondCodeContext[] = {
-    { RESERVED, "Reserved for Future Use", false },
-    { PROPERTIES_OK, "Properties OK", false },
-    { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true },
-    { ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true },
-    { MISSING_PROPERTY, "Missing Property", true },
-    { CONTINUE_TRANSACTION, "Continue Transaction", false },
-    { FINISH_TRANSACTION, "Finish Transaction", false },
-    { CONFIRM_TRANSACTION, "Confirm Transaction", true },
-    { TRANSACTION_FINISHED, "Transaction Finished", false },
-    { TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction Finished But Destination is Full", false },
-    { CANCEL_TRANSACTION, "Cancel Transaction", true },
-    { BAD_CHECKSUM, "Bad Checksum", false },
-    { MORE_DATA, "More Data Exists", false },
-    { NO_MORE_DATA, "No More Data Exists", false },
-    { UNKNOWN_PORT, "Unknown Port", false },
-    { PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true },
-    { PORTS_DESTINATION_FULL, "Port's Destination is Full", false },
-    { UNAUTHORIZED, "User Not Authorized", true },
-    { ABORT, "Abort", true },
-    { UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false },
-    { END_OF_STREAM, "End of Stream", false }
-};
+static RespondCodeContext respondCodeContext[] = { { RESERVED, "Reserved for Future Use", false }, { PROPERTIES_OK, "Properties OK", false }, { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true },
+    { ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true }, { MISSING_PROPERTY, "Missing Property", true }, { CONTINUE_TRANSACTION, "Continue Transaction", false }, { FINISH_TRANSACTION,
+        "Finish Transaction", false }, { CONFIRM_TRANSACTION, "Confirm Transaction", true }, { TRANSACTION_FINISHED, "Transaction Finished", false }, { TRANSACTION_FINISHED_BUT_DESTINATION_FULL,
+        "Transaction Finished But Destination is Full", false }, { CANCEL_TRANSACTION, "Cancel Transaction", true }, { BAD_CHECKSUM, "Bad Checksum", false }, { MORE_DATA, "More Data Exists", false },
+    { NO_MORE_DATA, "No More Data Exists", false }, { UNKNOWN_PORT, "Unknown Port", false }, { PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true }, { PORTS_DESTINATION_FULL,
+        "Port's Destination is Full", false }, { UNAUTHORIZED, "User Not Authorized", true }, { ABORT, "Abort", true }, { UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, {
+        END_OF_STREAM, "End of Stream", false } };
 
 // Respond Code Sequence Pattern
 static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R';
@@ -246,47 +230,47 @@ typedef enum {
 
 // HandShakeProperty Str
 static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = {
-    /**
-     * Boolean value indicating whether or not the contents of a FlowFile should
-     * be GZipped when transferred.
-     */
-    "GZIP",
-    /**
-     * The unique identifier of the port to communicate with
-     */
-    "PORT_IDENTIFIER",
-    /**
-     * Indicates the number of milliseconds after the request was made that the
-     * client will wait for a response. If no response has been received by the
-     * time this value expires, the server can move on without attempting to
-     * service the request because the client will have already disconnected.
-     */
-    "REQUEST_EXPIRATION_MILLIS",
-    /**
-     * The preferred number of FlowFiles that the server should send to the
-     * client when pulling data. This property was introduced in version 5 of
-     * the protocol.
-     */
-    "BATCH_COUNT",
-    /**
-     * The preferred number of bytes that the server should send to the client
-     * when pulling data. This property was introduced in version 5 of the
-     * protocol.
-     */
-    "BATCH_SIZE",
-    /**
-     * The preferred amount of time that the server should send data to the
-     * client when pulling data. This property was introduced in version 5 of
-     * the protocol. Value is in milliseconds.
-     */
-    "BATCH_DURATION" };
+/**
+ * Boolean value indicating whether or not the contents of a FlowFile should
+ * be GZipped when transferred.
+ */
+"GZIP",
+/**
+ * The unique identifier of the port to communicate with
+ */
+"PORT_IDENTIFIER",
+/**
+ * Indicates the number of milliseconds after the request was made that the
+ * client will wait for a response. If no response has been received by the
+ * time this value expires, the server can move on without attempting to
+ * service the request because the client will have already disconnected.
+ */
+"REQUEST_EXPIRATION_MILLIS",
+/**
+ * The preferred number of FlowFiles that the server should send to the
+ * client when pulling data. This property was introduced in version 5 of
+ * the protocol.
+ */
+"BATCH_COUNT",
+/**
+ * The preferred number of bytes that the server should send to the client
+ * when pulling data. This property was introduced in version 5 of the
+ * protocol.
+ */
+"BATCH_SIZE",
+/**
+ * The preferred amount of time that the server should send data to the
+ * client when pulling data. This property was introduced in version 5 of
+ * the protocol. Value is in milliseconds.
+ */
+"BATCH_DURATION" };
 
 class Site2SiteClientProtocol;
 
 // Transaction Class
 class Transaction {
   friend class Site2SiteClientProtocol;
-   public:
+ public:
   // Constructor
   /*!
    * Create a new transaction
@@ -389,13 +373,12 @@ class DataPacket {
 };
 
 /**
-  * Site2Site Peer
-  */
- typedef struct Site2SitePeerStatus {
-   std::string host_;
-   int port_;
-   bool isSecure_;
- } Site2SitePeerStatus;
+ * Site2Site Peer
+ */
+typedef struct Site2SitePeerStatus {
+  std::string host_;
+  int port_;bool isSecure_;
+} Site2SitePeerStatus;
 
 // Site2SiteClientProtocol Class
 class Site2SiteClientProtocol {
@@ -525,7 +508,7 @@ class Site2SiteClientProtocol {
   bool receive(std::string transactionID, DataPacket *packet, bool &eof);
   // Send the data packet from the transaction
   // Return false when any error occurs
-  bool send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session);
+  int16_t send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session);
   // Confirm the data that was sent or received by comparing CRC32's of the data sent and the data received.
   bool confirm(std::string transactionID);
   // Cancel the transaction

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/Site2SitePeer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SitePeer.h b/libminifi/include/Site2SitePeer.h
index 65a5479..59f5cfe 100644
--- a/libminifi/include/Site2SitePeer.h
+++ b/libminifi/include/Site2SitePeer.h
@@ -78,7 +78,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream {
     _url = std::move(ss._url);
   }
   // Destructor
-  virtual ~Site2SitePeer() {
+  ~Site2SitePeer() {
     Close();
   }
   // Set Processor yield period in MilliSecond

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index 27b8b3a..d9fc7be 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -84,14 +84,12 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   /*!
    * Create a new threaded scheduling agent.
    */
-  ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
-                          std::shared_ptr<core::Repository> flow_repo,
-                          std::shared_ptr<core::ContentRepository> content_repo,
-                          std::shared_ptr<Configure> configuration)
+  ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
+                          std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
       : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration),
         logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
 
-    utils::ThreadPool<uint64_t> pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true);
+    utils::ThreadPool<uint64_t> pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true);
     thread_pool_ = std::move(pool);
     thread_pool_.start();
 
@@ -101,7 +99,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   }
 
   // Run function for the thread
-  virtual uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0;
+  virtual uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+                       const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) = 0;
 
  public:
   // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
@@ -110,12 +109,11 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   virtual void unschedule(std::shared_ptr<core::Processor> processor);
 
   virtual void stop();
-   protected:
+ protected:
   utils::ThreadPool<uint64_t> thread_pool_;
 
  protected:
 
-
  private:
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 1502c47..8398b3a 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -38,10 +38,9 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
    * Create a new processor
    */
   TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
-                             std::shared_ptr<core::Repository> flow_repo,
-                             std::shared_ptr<core::ContentRepository> content_repo,
-                             std::shared_ptr<Configure> configure)
-      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure) {
+                             std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure),
+        logger_(logging::LoggerFactory<TimerDrivenSchedulingAgent>::getLogger()) {
   }
   //  Destructor
   virtual ~TimerDrivenSchedulingAgent() {
@@ -49,7 +48,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
   /**
    * Run function that accepts the processor, context and session factory.
    */
-  uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+  uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
 
  private:
   // Prevent default copy constructor and assignment operation
@@ -57,6 +56,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
   TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
   TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent);
 
+  std::shared_ptr<logging::Logger> logger_;
 };
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/C2Agent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
new file mode 100644
index 0000000..8c2e45d
--- /dev/null
+++ b/libminifi/include/c2/C2Agent.h
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2AGENT_H_
+#define LIBMINIFI_INCLUDE_C2_C2AGENT_H_
+
+#include <utility>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include "core/state/UpdateController.h"
+#include "core/state/metrics/MetricsBase.h"
+#include "C2Payload.h"
+#include "C2Protocol.h"
+#include "io/validation.h"
+#include "protocols/Protocols.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose and Justification: C2 agent will be the mechanism that will abstract the protocol to do the work.
+ *
+ * The protocol represents a transformation layer into the objects seen in C2Payload. That transformation may
+ * be minimal or extreme, depending on the protocol itself.
+ *
+ * Metrics Classes defined here:
+ *
+ *   0 HeartBeat --  RESERVED
+ *   1-255 Defined by the configuration file.
+ */
+class C2Agent : public state::UpdateController, public state::metrics::MetricsSink, public std::enable_shared_from_this<C2Agent> {
+ public:
+
+  C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
+
+  virtual ~C2Agent() {
+
+  }
+
+  /**
+   * Sends the heartbeat to ths server. Will include metrics
+   * in the payload if they exist.
+   */
+  void performHeartBeat();
+
+  virtual std::vector<std::function<state::Update()>> getFunctions() {
+    return functions_;
+  }
+
+  /**
+   * Sets the metric within this sink
+   * @param metric metric to set
+   * @param return 0 on success, -1 on failure.
+   */
+  virtual int16_t setMetrics(const std::shared_ptr<state::metrics::Metrics> &metric);
+
+ protected:
+
+  /**
+   * Configure the C2 agent
+   */
+  void configure(const std::shared_ptr<Configure> &configure, bool reconfigure = true);
+
+  /**
+   * Serializes metrics into a payload.
+   * @parem parent_paylaod parent payload into which we insert the newly generated payload.
+   * @param name name of this metric
+   * @param metrics metrics to include.
+   */
+  void serializeMetrics(C2Payload &parent_payload, const std::string &name, const std::vector<state::metrics::MetricResponse> &metrics);
+
+  /**
+   * Extract the payload
+   * @param resp payload to be moved into the function.
+   */
+  void extractPayload(const C2Payload &&resp);
+
+  /**
+   * Extract the payload
+   * @param payload reference.
+   */
+  void extractPayload(const C2Payload &resp);
+
+  /**
+   * Enqueues a C2 server response for us to evaluate and parse.
+   */
+  void enqueue_c2_server_response(C2Payload &&resp) {
+    std::lock_guard<std::timed_mutex> lock(queue_mutex);
+    responses.push_back(std::move(resp));
+  }
+
+  /**
+   * Enqueues a c2 payload for a response to the C2 server.
+   */
+  void enqueue_c2_response(C2Payload &&resp) {
+    std::lock_guard<std::timed_mutex> lock(request_mutex);
+    requests.push_back(std::move(resp));
+  }
+
+  /**
+   * Handles a C2 event requested by the server.
+   * @param resp c2 server response.
+   */
+  void handle_c2_server_response(const C2ContentResponse &resp);
+
+  /**
+   * Handles an update request
+   * @param C2ContentResponse response
+   */
+  void handle_update(const C2ContentResponse &resp);
+
+  /**
+   * Handles a description request
+   */
+  void handle_describe(const C2ContentResponse &resp);
+
+  std::timed_mutex metrics_mutex_;
+  std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_map_;
+
+  /**
+   * Device information stored in the metrics format
+   */
+  std::map<std::string, std::shared_ptr<state::metrics::Metrics>> device_information_;
+  // queue mutex
+  std::timed_mutex queue_mutex;
+
+  // queue mutex
+  std::timed_mutex request_mutex;
+
+  // responses for the the C2 agent.
+  std::vector<C2Payload> responses;
+
+  // requests that originate from the C2 server.
+  std::vector<C2Payload> requests;
+
+  // heart beat period.
+  int64_t heart_beat_period_;
+
+  // maximum number of queued messages to send to the c2 server
+  int16_t max_c2_responses;
+
+  // time point the last time we performed a heartbeat.
+  std::chrono::steady_clock::time_point last_run_;
+
+  // function that performs the heartbeat
+  std::function<state::Update()> c2_producer_;
+
+  // function that acts upon the
+  std::function<state::Update()> c2_consumer_;
+
+  // reference to the update sink, against which we will execute updates.
+  std::shared_ptr<state::StateMonitor> update_sink_;
+
+  // functions that will be used for the udpate controller.
+  std::vector<std::function<state::Update()>> functions_;
+
+  // controller service provider refernece.
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+
+  std::shared_ptr<Configure> configuration_;
+
+  std::shared_ptr<Configure> running_configuration;
+
+  std::mutex heartbeat_mutex;
+
+  std::vector<std::shared_ptr<HeartBeatReporter>> heartbeat_protocols_;
+
+  std::atomic<C2Protocol*> protocol_;
+
+  std::shared_ptr<logging::Logger> logger_;
+}
+;
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2AGENT_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/C2Payload.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
new file mode 100644
index 0000000..ca14584
--- /dev/null
+++ b/libminifi/include/c2/C2Payload.h
@@ -0,0 +1,192 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_
+#define LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_
+
+#include <memory>
+#include <string>
+#include <map>
+#include "core/state/UpdateController.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+enum Operation {
+  ACKNOWLEDGE,
+  START,
+  STOP,
+  RESTART,
+  DESCRIBE,
+  HEARTBEAT,
+  UPDATE,
+  VALIDATE,
+  CLEAR
+};
+
+enum Direction {
+  TRANSMIT,
+  RECEIVE
+};
+
+class C2ContentResponse {
+ public:
+  C2ContentResponse(Operation op);
+
+  C2ContentResponse(const C2ContentResponse &other);
+
+  C2ContentResponse(const C2ContentResponse &&other);
+
+  C2ContentResponse & operator=(const C2ContentResponse &&other);
+
+  C2ContentResponse & operator=(const C2ContentResponse &other);
+
+  Operation op;
+  // determines if the operation is required
+  bool required;
+  // identifier
+  std::string ident;
+  // delay before running
+  uint32_t delay;
+  // max time before this response will no longer be honored.
+  uint64_t ttl;
+  // name applied to commands
+  std::string name;
+  // commands that correspond with the operation.
+  std::map<std::string, std::string> operation_arguments;
+//  std::vector<std::string> content;
+};
+
+/**
+ * C2Payload is an update for the state manager.
+ * Note that the payload can either consist of other payloads or
+ * have content directly within it, represented by C2ContentResponse objects, above.
+ *
+ * Payloads can also contain raw data, which can be binary data.
+ */
+class C2Payload : public state::Update {
+ public:
+  virtual ~C2Payload() {
+
+  }
+
+  C2Payload(Operation op, std::string identifier, bool resp = false, bool isRaw = false);
+
+  C2Payload(Operation op, bool resp = false, bool isRaw = false);
+
+  C2Payload(Operation op, state::UpdateState state, bool resp = false, bool isRaw = false);
+
+  C2Payload(const C2Payload &other);
+
+  C2Payload(const C2Payload &&other);
+
+  void setIdentifier(const std::string &ident);
+
+  std::string getIdentifier() const;
+
+  void setLabel(const std::string label) {
+    label_ = label;
+  }
+
+  std::string getLabel() const {
+    return label_;
+  }
+
+  /**
+   * Gets the operation for this payload. May be nested or a single operation.
+   */
+  Operation getOperation() const;
+
+  /**
+   * Validate the payload, if necessary and/or possible.
+   */
+  virtual bool validate();
+
+  /**
+   * Get content responses from this payload.
+   */
+  const std::vector<C2ContentResponse> &getContent() const;
+
+  /**
+   * Add a content response to this payload.
+   */
+  void addContent(const C2ContentResponse &&content);
+
+  /**
+   * Determines if this object contains raw data.
+   */
+  bool isRaw() const;
+
+  /**
+   * Sets raw data within this object.
+   */
+  void setRawData(const std::string &data);
+
+  /**
+   * Sets raw data from a vector within this object.
+   */
+  void setRawData(const std::vector<char> &data);
+
+  /**
+   * Returns raw data.
+   */
+  std::string getRawData() const;
+
+  /**
+   * Add a nested payload.
+   * @param payload payload to move into this object.
+   */
+  void addPayload(const C2Payload &&payload);
+  /**
+   * Get nested payloads.
+   */
+  const std::vector<C2Payload> &getNestedPayloads() const;
+
+  C2Payload &operator=(const C2Payload &&other);
+  C2Payload &operator=(const C2Payload &other);
+
+ protected:
+
+  // identifier for this payload.
+  std::string ident_;
+
+  std::string label_;
+
+  std::vector<C2Payload> payloads_;
+
+  std::vector<C2ContentResponse> content_;
+
+  Operation op_;
+
+  bool raw_;
+
+  std::string raw_data_;
+
+  bool isResponse;
+
+};
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/C2Protocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Protocol.h b/libminifi/include/c2/C2Protocol.h
new file mode 100644
index 0000000..683a486
--- /dev/null
+++ b/libminifi/include/c2/C2Protocol.h
@@ -0,0 +1,119 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2PROTOCOL_H_
+#define LIBMINIFI_INCLUDE_C2_C2PROTOCOL_H_
+
+#include "C2Payload.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "properties/Configure.h"
+#include "core/Connectable.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Defines a protocol to perform state management of the minifi agent.
+ */
+class C2Protocol : public core::Connectable {
+ public:
+
+  C2Protocol(std::string name, uuid_t uuid)
+      : core::Connectable(name, uuid),
+        running_(true) {
+
+  }
+
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+    controller_ = controller;
+    configuration_ = configure;
+  }
+  virtual ~C2Protocol() {
+
+  }
+
+  /**
+   * Update the configuration.
+   */
+  virtual void update(const std::shared_ptr<Configure> &configure)=0;
+
+  /**
+   * Send a C2 payload to the provided URI. The direction indicates to the protocol whether or not this a transmit or receive operation.
+   * Depending on the protocol this may mean different things.
+   *
+   * @param url url.
+   * @param operation payload to perform and/or send
+   * @param direction direction of the C2 operation.
+   * @param async whether or not this is an asynchronous operation
+   * @return payload from the response or a response to come back in the face of an asynchronous operation.
+   */
+  virtual C2Payload consumePayload(const std::string &url, const C2Payload &operation, Direction direction = TRANSMIT, bool async = false) = 0;
+
+  /**
+   * Send a C2 payload . The direction indicates to the protocol whether or not this a transmit or receive operation.
+   * Depending on the protocol this may mean different things.
+   *
+   * @param operation payload to perform and/or send
+   * @param direction direction of the C2 operation.
+   * @param async whether or not this is an asynchronous operation
+   * @return payload from the response or a response to come back in the face of an asynchronous operation.
+   */
+  virtual C2Payload consumePayload(const C2Payload &operation, Direction direction = TRANSMIT, bool async = false) = 0;
+
+  /**
+   * Determines if we are connected and operating
+   */
+  virtual bool isRunning() {
+    return running_.load();
+  }
+
+  /**
+   * Block until work is available on any input connection, or the given duration elapses
+   * @param timeoutMs timeout in milliseconds
+   */
+  void waitForWork(uint64_t timeoutMs);
+
+  virtual void yield() {
+
+  }
+
+  /**
+   * Determines if work is available by this connectable
+   * @return boolean if work is available.
+   */
+  virtual bool isWorkAvailable() {
+    return true;
+  }
+
+ protected:
+
+  std::atomic<bool> running_;
+
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+
+  std::shared_ptr<Configure> configuration_;
+};
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2PROTOCOL_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/HeartBeatReporter.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/HeartBeatReporter.h b/libminifi/include/c2/HeartBeatReporter.h
new file mode 100644
index 0000000..3d0fd49
--- /dev/null
+++ b/libminifi/include/c2/HeartBeatReporter.h
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_HEARTBEATREPORTER_H_
+#define LIBMINIFI_INCLUDE_C2_HEARTBEATREPORTER_H_
+
+#include "C2Protocol.h"
+#include "C2Payload.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "properties/Configure.h"
+#include "core/Connectable.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Defines a heart beat reporting interface. Note that this differs from
+ * C2Protocol as heartbeats can be any interface which provides only one way communication.
+ */
+class HeartBeatReporter : public core::Connectable {
+ public:
+
+  HeartBeatReporter(std::string name, uuid_t uuid)
+      : core::Connectable(name, uuid),
+        controller_(nullptr),
+        configuration_(nullptr) {
+  }
+
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+    controller_ = controller;
+    configuration_ = configure;
+  }
+  virtual ~HeartBeatReporter() {
+  }
+  /**
+   * Send a C2 payloadd to the provided URI. The direction indicates to the protocol whether or not this a transmit or receive operation.
+   * Depending on the protocol this may mean different things.
+   *
+   * @param url url.
+   * @param operation payload to perform and/or send
+   * @param direction direction of the C2 operation.
+   * @param async whether or not this is an asynchronous operation
+   * @return result of the heartbeat operation
+   */
+  virtual int16_t heartbeat(const C2Payload &heartbeat) = 0;
+
+  /**
+   * Determines if we are connected and operating
+   */
+  virtual bool isRunning() {
+    return true;
+  }
+
+  /**
+   * Block until work is available on any input connection, or the given duration elapses
+   * @param timeoutMs timeout in milliseconds
+   */
+  void waitForWork(uint64_t timeoutMs);
+
+  virtual void yield() {
+
+  }
+
+  /**
+   * Determines if work is available by this connectable
+   * @return boolean if work is available.
+   */
+  virtual bool isWorkAvailable() {
+    return true;
+  }
+
+ protected:
+
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+
+  std::shared_ptr<Configure> configuration_;
+};
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_HEARTBEATREPORTER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/protocols/Protocols.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/Protocols.h b/libminifi/include/c2/protocols/Protocols.h
new file mode 100644
index 0000000..c4c314f
--- /dev/null
+++ b/libminifi/include/c2/protocols/Protocols.h
@@ -0,0 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_H_
+#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_H_
+
+#include "RESTReceiver.h"
+#include "RESTSender.h"
+
+#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/protocols/RESTProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h
new file mode 100644
index 0000000..2978a03
--- /dev/null
+++ b/libminifi/include/c2/protocols/RESTProtocol.h
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
+#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
+
+#include "json/json.h"
+#include "json/writer.h"
+#include <string>
+#include <mutex>
+#include "CivetServer.h"
+#include "../C2Protocol.h"
+#include "../HeartBeatReporter.h"
+#include "controllers/SSLContextService.h"
+#include "utils/ByteInputCallBack.h"
+#include "utils/HTTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
+ *
+ * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
+ * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
+ * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
+ *
+ */
+class RESTProtocol {
+ public:
+  RESTProtocol() {
+
+  }
+
+  virtual ~RESTProtocol() {
+
+  }
+
+ protected:
+
+  virtual Json::Value serializeJsonPayload(Json::Value &json_root, const C2Payload &payload);
+
+  virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector<char> &response);
+
+  virtual std::string getOperation(const C2Payload &payload);
+
+  virtual Operation stringToOperation(const std::string str);
+
+};
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTOPERATIONS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/protocols/RESTReceiver.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/RESTReceiver.h b/libminifi/include/c2/protocols/RESTReceiver.h
new file mode 100644
index 0000000..17b5028
--- /dev/null
+++ b/libminifi/include/c2/protocols/RESTReceiver.h
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
+#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
+
+#include "RESTSender.h"
+#include "json/json.h"
+#include "json/writer.h"
+#include <string>
+#include <mutex>
+#include "core/Resource.h"
+#include "RESTProtocol.h"
+#include "CivetServer.h"
+#include "../C2Protocol.h"
+#include "controllers/SSLContextService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+int log_message(const struct mg_connection *conn, const char *message);
+
+int ssl_protocol_en(void *ssl_context, void *user_data);
+
+/**
+ * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
+ *
+ * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
+ * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
+ * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
+ *
+ */
+class RESTReceiver : public RESTProtocol, public HeartBeatReporter {
+ public:
+  RESTReceiver(std::string name, uuid_t uuid = nullptr);
+
+  void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure);
+  virtual int16_t heartbeat(const C2Payload &heartbeat);
+
+ protected:
+
+  class ListeningProtocol : public CivetHandler {
+
+   public:
+    ListeningProtocol() {
+
+    }
+
+    bool handleGet(CivetServer *server, struct mg_connection *conn) {
+      std::string currentvalue;
+      {
+        std::lock_guard<std::mutex> lock(reponse_mutex_);
+        currentvalue = resp_;
+      }
+
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+                currentvalue.length());
+      mg_printf(conn, "%s", currentvalue.c_str());
+      return true;
+    }
+
+    void setResponse(std::string response) {
+      std::lock_guard<std::mutex> lock(reponse_mutex_);
+      resp_ = response;
+    }
+
+   protected:
+    std::mutex reponse_mutex_;
+    std::string resp_;
+
+  };
+
+  std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert);
+
+  std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler);
+
+  std::unique_ptr<CivetServer> listener;
+  std::unique_ptr<ListeningProtocol> handler;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(RESTReceiver);
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */


Mime
View raw message