MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters
MINIFI-339: Add GetTCP Processor
MINIFI-339: Add listener server
MINIFI-339: Update to listener
MINIFI-339: Resolve Issue with stack based processor nodes losing scope
MINIFI-369: Update ListenHTTP processor to allow transfer encoding
MINIFI-339: Update rest receiver instantiation and fix issue found in GetFile
MINIFI-339: Rename content to operational arguments
MINIFI-371: remove virtual destructors when not needed
MINIFI-339: Fixing issues with GetTCP
MINIFI-378: Resolve issues with shutdown. Took the approach to call notifyStop at the destructor to avoid larger changes
MINIFI-339: Allow C2 to be disabled
This closes #134.
Signed-off-by: Aldrin Piri <aldrin@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/9f161a27
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/9f161a27
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/9f161a27
Branch: refs/heads/master
Commit: 9f161a27e5fa0ea9aac4d59d1c23edbad3d77858
Parents: f5832fa
Author: Marc Parisi <phrocker@apache.org>
Authored: Wed Jun 21 10:47:35 2017 -0400
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Mon Oct 2 10:56:07 2017 -0400
----------------------------------------------------------------------
CMakeLists.txt | 2 +
README.md | 24 +
cmake/BuildTests.cmake | 23 +-
libminifi/CMakeLists.txt | 4 +-
libminifi/include/ConfigurationListener.h | 117 -----
libminifi/include/Connection.h | 6 +-
libminifi/include/EventDrivenSchedulingAgent.h | 2 +-
libminifi/include/FlowController.h | 92 +++-
libminifi/include/FlowFileRecord.h | 1 -
libminifi/include/HttpConfigurationListener.h | 76 ---
libminifi/include/RemoteProcessorGroupPort.h | 22 +-
libminifi/include/ResourceClaim.h | 29 +-
libminifi/include/SchedulingAgent.h | 17 +-
libminifi/include/Site2SiteClientProtocol.h | 115 ++---
libminifi/include/Site2SitePeer.h | 2 +-
libminifi/include/ThreadedSchedulingAgent.h | 14 +-
libminifi/include/TimerDrivenSchedulingAgent.h | 10 +-
libminifi/include/c2/C2Agent.h | 200 ++++++++
libminifi/include/c2/C2Payload.h | 192 ++++++++
libminifi/include/c2/C2Protocol.h | 119 +++++
libminifi/include/c2/HeartBeatReporter.h | 101 ++++
libminifi/include/c2/protocols/Protocols.h | 24 +
libminifi/include/c2/protocols/RESTProtocol.h | 74 +++
libminifi/include/c2/protocols/RESTReceiver.h | 110 +++++
libminifi/include/c2/protocols/RESTSender.h | 80 +++
.../include/controllers/SSLContextService.h | 38 +-
libminifi/include/core/ClassLoader.h | 72 +++
libminifi/include/core/ConfigurationFactory.h | 15 +-
libminifi/include/core/ContentRepository.h | 50 ++
libminifi/include/core/FlowConfiguration.h | 6 +-
libminifi/include/core/ProcessContext.h | 34 +-
libminifi/include/core/ProcessGroup.h | 2 +
libminifi/include/core/ProcessSession.h | 13 +-
libminifi/include/core/ProcessSessionFactory.h | 6 +-
libminifi/include/core/Processor.h | 25 +-
libminifi/include/core/ProcessorNode.h | 10 +-
libminifi/include/core/Property.h | 5 +
libminifi/include/core/Relationship.h | 2 +-
libminifi/include/core/Repository.h | 9 +-
libminifi/include/core/StreamManager.h | 13 +
.../core/controller/ControllerServiceProvider.h | 5 +-
.../StandardControllerServiceProvider.h | 20 +-
libminifi/include/core/logging/Logger.h | 2 +-
.../include/core/logging/LoggerConfiguration.h | 7 +-
.../SiteToSiteProvenanceReportingTask.h | 2 +-
.../include/core/repository/AtomicRepoEntries.h | 97 ++--
.../core/repository/FileSystemRepository.h | 2 +
.../core/repository/FlowFileRepository.h | 24 +-
.../core/repository/VolatileContentRepository.h | 8 +-
.../repository/VolatileFlowFileRepository.h | 9 +-
.../repository/VolatileProvenanceRepository.h | 3 +-
.../core/repository/VolatileRepository.h | 7 +-
.../include/core/state/ProcessorController.h | 73 +++
libminifi/include/core/state/StateManager.h | 126 +++++
libminifi/include/core/state/UpdateController.h | 252 ++++++++++
.../core/state/metrics/DeviceInformation.h | 319 ++++++++++++
.../include/core/state/metrics/MetricsBase.h | 161 ++++++
.../core/state/metrics/MetricsListener.h | 128 +++++
.../include/core/state/metrics/ProcessMetrics.h | 102 ++++
.../include/core/state/metrics/QueueMetrics.h | 106 ++++
.../core/state/metrics/RepositoryMetrics.h | 101 ++++
.../include/core/state/metrics/SystemMetrics.h | 109 +++++
libminifi/include/core/yaml/YamlConfiguration.h | 4 +-
libminifi/include/io/AtomicEntryStream.h | 18 +-
libminifi/include/io/BaseStream.h | 2 +-
libminifi/include/io/CRCStream.h | 2 +-
libminifi/include/io/ClientSocket.h | 30 +-
libminifi/include/io/EndianCheck.h | 2 +-
libminifi/include/io/FileStream.h | 2 +-
libminifi/include/processors/GetFile.h | 64 ++-
libminifi/include/processors/GetTCP.h | 288 +++++++++++
libminifi/include/processors/InvokeHTTP.h | 55 +--
libminifi/include/processors/LoadProcessors.h | 1 +
libminifi/include/processors/PutFile.h | 3 +-
libminifi/include/properties/Configure.h | 3 +-
libminifi/include/properties/Properties.h | 8 +
libminifi/include/provenance/Provenance.h | 19 +-
.../include/provenance/ProvenanceRepository.h | 21 +-
libminifi/include/utils/ByteInputCallBack.h | 11 +-
libminifi/include/utils/HTTPClient.h | 301 ++++++++++++
libminifi/include/utils/HTTPUtils.h | 304 ------------
libminifi/include/utils/Id.h | 2 +-
libminifi/include/utils/StringUtils.h | 4 +-
libminifi/include/utils/ThreadPool.h | 53 +-
libminifi/src/CPPLINT.cfg | 3 +
libminifi/src/ConfigurationListener.cpp | 87 ----
libminifi/src/Configure.cpp | 1 +
libminifi/src/Connection.cpp | 22 +-
libminifi/src/EventDrivenSchedulingAgent.cpp | 3 +-
libminifi/src/FlowController.cpp | 290 +++++++++--
libminifi/src/FlowFileRecord.cpp | 10 +-
libminifi/src/HttpConfigurationListener.cpp | 102 ----
libminifi/src/Properties.cpp | 4 +-
libminifi/src/RemoteProcessorGroupPort.cpp | 169 +++----
libminifi/src/ResourceClaim.cpp | 9 +-
libminifi/src/SchedulingAgent.cpp | 22 +-
libminifi/src/Site2SiteClientProtocol.cpp | 256 +++++-----
libminifi/src/ThreadedSchedulingAgent.cpp | 20 +-
libminifi/src/TimerDrivenSchedulingAgent.cpp | 7 +-
libminifi/src/c2/C2Agent.cpp | 485 +++++++++++++++++++
libminifi/src/c2/C2Payload.cpp | 219 +++++++++
libminifi/src/c2/protocols/RESTProtocol.cpp | 177 +++++++
libminifi/src/c2/protocols/RESTReceiver.cpp | 148 ++++++
libminifi/src/c2/protocols/RESTSender.cpp | 144 ++++++
libminifi/src/controllers/SSLContextService.cpp | 19 +-
libminifi/src/core/ClassLoader.cpp | 6 +-
libminifi/src/core/ConfigurableComponent.cpp | 14 +-
libminifi/src/core/ConfigurationFactory.cpp | 8 +-
libminifi/src/core/Connectable.cpp | 12 +-
libminifi/src/core/FlowConfiguration.cpp | 9 +-
libminifi/src/core/FlowFile.cpp | 1 +
libminifi/src/core/ProcessGroup.cpp | 38 +-
libminifi/src/core/ProcessSession.cpp | 71 ++-
libminifi/src/core/ProcessSessionFactory.cpp | 4 +-
libminifi/src/core/Processor.cpp | 39 +-
libminifi/src/core/ProcessorNode.cpp | 8 +-
libminifi/src/core/Repository.cpp | 5 +-
libminifi/src/core/RepositoryFactory.cpp | 8 +-
.../StandardControllerServiceNode.cpp | 6 +-
.../src/core/logging/LoggerConfiguration.cpp | 16 +-
.../SiteToSiteProvenanceReportingTask.cpp | 6 +-
.../core/repository/FileSystemRepository.cpp | 5 +
.../src/core/repository/FlowFileRepository.cpp | 90 ++--
.../repository/VolatileContentRepository.cpp | 17 +
.../src/core/state/ProcessorController.cpp | 64 +++
libminifi/src/core/state/StateManager.cpp | 137 ++++++
libminifi/src/core/state/UpdateController.cpp | 76 +++
libminifi/src/io/ClientSocket.cpp | 49 +-
libminifi/src/io/FileStream.cpp | 1 +
libminifi/src/io/StreamFactory.cpp | 6 +-
libminifi/src/processors/ExecuteProcess.cpp | 6 +-
libminifi/src/processors/GenerateFlowFile.cpp | 4 +-
libminifi/src/processors/GetFile.cpp | 21 +-
libminifi/src/processors/GetTCP.cpp | 289 +++++++++++
libminifi/src/processors/InvokeHTTP.cpp | 190 +++-----
libminifi/src/processors/ListenHTTP.cpp | 10 +-
libminifi/src/processors/ListenSyslog.cpp | 2 +-
libminifi/src/processors/LogAttribute.cpp | 1 +
libminifi/src/processors/TailFile.cpp | 4 +-
libminifi/src/provenance/Provenance.cpp | 6 +-
.../src/provenance/ProvenanceRepository.cpp | 36 +-
libminifi/src/utils/HttpClient.cpp | 214 ++++++++
libminifi/src/utils/Id.cpp | 19 +-
libminifi/test/.device_id | 1 +
libminifi/test/TestBase.cpp | 16 +-
libminifi/test/TestBase.h | 14 +-
libminifi/test/TestServer.h | 54 +--
.../test/integration/C2NullConfiguration.cpp | 135 ++++++
libminifi/test/integration/C2UpdateTest.cpp | 185 +++++++
.../integration/C2VerifyHeartbeatAndStop.cpp | 153 ++++++
.../test/integration/C2VerifyServeResults.cpp | 131 +++++
.../ControllerServiceIntegrationTests.cpp | 33 +-
libminifi/test/integration/GetFileNoData.cpp | 184 +++++++
.../HttpConfigurationListenerTest.cpp | 131 -----
.../test/integration/HttpGetIntegrationTest.cpp | 45 +-
.../integration/HttpPostIntegrationTest.cpp | 127 +++--
libminifi/test/integration/IntegrationBase.h | 177 +++++++
.../integration/ProvenanceReportingTest.cpp | 1 +
.../test/integration/Site2SiteRestTest.cpp | 148 ------
.../test/integration/SiteToSiteRestTest.cpp | 144 ++++++
.../test/integration/TestExecuteProcess.cpp | 9 +-
libminifi/test/integration/ThreadPoolAdjust.cpp | 109 +++++
libminifi/test/resources/TestHTTPGet.yml | 10 +-
libminifi/test/resources/TestHTTPGetSecure.yml | 13 +-
libminifi/test/resources/TestHTTPPost.yml | 59 ++-
.../resources/TestHTTPPostChunkedEncoding.yml | 97 ++++
.../test/resources/TestSite2SiteRestSecure.yml | 58 +++
libminifi/test/resources/cn.crt.key.pem | 52 ++
libminifi/test/resources/nifi-cert-key.pem | 47 ++
libminifi/test/unit/C2MetricsTests.cpp | 230 +++++++++
libminifi/test/unit/GetTCPTests.cpp | 419 ++++++++++++++++
libminifi/test/unit/InvokeHTTPTests.cpp | 99 ++--
libminifi/test/unit/LoggerTests.cpp | 6 +
libminifi/test/unit/ProcessorTests.cpp | 40 +-
libminifi/test/unit/PropertyTests.cpp | 13 +
libminifi/test/unit/ProvenanceTestHelper.h | 27 +-
libminifi/test/unit/RepoTests.cpp | 54 +++
libminifi/test/unit/SchedulingAgentTests.cpp | 36 ++
libminifi/test/unit/Site2SiteTests.cpp | 2 +-
libminifi/test/unit/TailFileTests.cpp | 80 ++-
libminifi/test/unit/ThreadPoolTests.cpp | 55 +++
libminifi/test/unit/YamlConfigurationTests.cpp | 3 +-
main/CMakeLists.txt | 7 +-
183 files changed, 9290 insertions(+), 2225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 292bc8d..a81127c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -35,6 +35,8 @@ ENDIF(POLICY CMP0048)
include(CheckCXXCompilerFlag)
CHECK_CXX_COMPILER_FLAG("-std=c++11 " COMPILER_SUPPORTS_CXX11)
CHECK_CXX_COMPILER_FLAG("-std=c++0x " COMPILER_SUPPORTS_CXX0X)
+SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ")
+SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS}")
if(COMPILER_SUPPORTS_CXX11)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
elseif(COMPILER_SUPPORTS_CXX0X)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index db10ab4..1c879c5 100644
--- a/README.md
+++ b/README.md
@@ -317,6 +317,30 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
if you do not want to enable client certificate base authorization
nifi.security.need.ClientAuth=false
+
+### Command and Control Configuration
+For more insight into the API used within the C2 agent, please visit:
+https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal
+
+ in minifi.properties
+
+ #Disable/Enable C2
+ nifi.c2.enable=true
+
+ #specify metrics classes
+ nifi.flow.metrics.classes=DeviceInformation,SystemInformation,ProcessMetrics
+
+ #specify C2 protocol
+ c2.agent.protocol.class=RESTSender
+
+ #control c2 heartbeat interval in milliseconds
+ c2.agent.heartbeat.period=3000
+
+ # enable reporter classes
+ c2.agent.heartbeat.reporter.class=RESTReceiver
+
+
+
### Configuring Volatile and NO-OP Repositories
Each of the repositories can be configured to be volatile ( state kept in memory and flushed
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 29603bf..69c5e7d 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -40,10 +40,14 @@ function(createTests testName)
target_include_directories(${testName} PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS})
target_include_directories(${testName} PRIVATE BEFORE "include")
target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/")
+ target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/c2/protocols")
+ target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/c2")
target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core")
target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/controller")
target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/repository")
target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/yaml")
+ target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/statemanagement")
+ target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/statemanagement/metrics")
target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/io")
target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/utils")
target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/processors")
@@ -88,12 +92,25 @@ add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegra
add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
-add_test(NAME HttpConfigurationListenerTest COMMAND HttpConfigurationListenerTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/")
-add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" )
+add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/")
-add_test(NAME Site2SiteRestTest COMMAND Site2SiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/")
+add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
+
+add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
+
+add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+
+add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+
+add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+
+add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8082/nifi-api/controller")
+
+## removed due to travis issues with our certs
+#add_test(NAME SiteToSiteRestTestSecure COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRestSecure.yml" "${TEST_RESOURCES}/" "https://localhost:8082/nifi-api/controller")
add_test(NAME TestExecuteProcess COMMAND TestExecuteProcess )
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 5e63a30..539256b 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -62,7 +62,7 @@ include_directories(../thirdparty/jsoncpp/include)
include_directories(../thirdparty/concurrentqueue/)
include_directories(include)
-file(GLOB SOURCES "src/core/logging/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/utils/*.cpp" "src/*.cpp")
+file(GLOB SOURCES "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/utils/*.cpp" "src/*.cpp")
file(GLOB SPD_SOURCES "../thirdparty/spdlog-20170710/include/spdlog/*")
@@ -110,3 +110,5 @@ endif (OPENSSL_FOUND)
endif ()
add_subdirectory(src/utils)
+
+set_property(TARGET minifi PROPERTY INTERPROCEDURAL_OPTIMIZATION True)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/ConfigurationListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ConfigurationListener.h b/libminifi/include/ConfigurationListener.h
deleted file mode 100644
index 856ea95..0000000
--- a/libminifi/include/ConfigurationListener.h
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * ConfigurationListener class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __CONFIGURATION_LISTENER__
-#define __CONFIGURATION_LISTENER__
-
-#include <memory>
-#include <atomic>
-#include <cstdint>
-#include <cstring>
-#include <iostream>
-#include <string>
-#include <thread>
-
-#include "yaml-cpp/yaml.h"
-#include "core/Core.h"
-#include "core/Property.h"
-#include "properties/Configure.h"
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-// Forwarder declaration
-class FlowController;
-// ConfigurationListener Class
-class ConfigurationListener {
-public:
-
- // Constructor
- /*!
- * Create a new processor
- */
- ConfigurationListener(std::shared_ptr<FlowController> controller,
- std::shared_ptr<Configure> configure, std::string type) :
- connect_timeout_(20000), read_timeout_(20000), type_(type), configure_(
- configure), controller_(controller) {
- logger_ = logging::LoggerFactory<ConfigurationListener>::getLogger();
- running_ = false;
-
- }
- // Destructor
- virtual ~ConfigurationListener() {
- stop();
- }
-
- // Start the thread
- void start();
- // Stop the thread
- void stop();
- // whether the thread is enable
- bool isRunning() {
- return running_;
- }
- // pull the new configuration from the remote host
- virtual bool pullConfiguration(std::string &configuration) {
- return false;
- }
-
-protected:
-
- // Run function for the thread
- void run();
-
- // Run function for the thread
- void threadExecutor() {
- run();
- }
-
- // Mutex for protection
- std::mutex mutex_;
- // thread
- std::thread thread_;
- // whether the thread is running
- std::atomic<bool> running_;
-
- // url
- std::string url_;
- // connection timeout
- int64_t connect_timeout_;
- // read timeout.
- int64_t read_timeout_;
- // pull interval
- int64_t pull_interval_;
- // type (http/rest)
- std::string type_;
- // last applied configuration
- std::string lastAppliedConfiguration;
-
- std::shared_ptr<Configure> configure_;
- std::shared_ptr<logging::Logger> logger_;
- std::shared_ptr<FlowController> controller_;
-};
-
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/Connection.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index ff32baf..c92a626 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -47,8 +47,8 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
/*
* Create a new processor
*/
- explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid = NULL,
- uuid_t srcUUID = NULL,
+ explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid = NULL, uuid_t srcUUID =
+ NULL,
uuid_t destUUID = NULL);
// Destructor
virtual ~Connection() {
@@ -130,7 +130,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
}
// Get queue data size
uint64_t getQueueDataSize() {
- return max_data_queue_size_;
+ return queued_data_size_;
}
// Put the flow file into queue
void put(std::shared_ptr<core::FlowFile> flow);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/EventDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index ca9f021..b434de5 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -46,7 +46,7 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
virtual ~EventDrivenSchedulingAgent() {
}
// Run function for the thread
- uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+ uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
private:
// Prevent default copy constructor and assignment operation
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index d9a0452..e79999f 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -44,11 +44,10 @@
#include "TimerDrivenSchedulingAgent.h"
#include "EventDrivenSchedulingAgent.h"
#include "FlowControlProtocol.h"
-#include "ConfigurationListener.h"
-#include "HttpConfigurationListener.h"
-
#include "core/Property.h"
#include "utils/Id.h"
+#include "core/state/metrics/MetricsBase.h"
+#include "core/state/StateManager.h"
namespace org {
namespace apache {
@@ -62,7 +61,7 @@ namespace minifi {
* Flow Controller class. Generally used by FlowController factory
* as a singleton.
*/
-class FlowController : public core::controller::ControllerServiceProvider, public std::enable_shared_from_this<FlowController> {
+class FlowController : public core::controller::ControllerServiceProvider, public state::StateManager {
public:
static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10;
static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5;
@@ -71,20 +70,16 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
* Flow controller constructor
*/
explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
- std::unique_ptr<core::FlowConfiguration> flow_configuration,
- std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode);
+ std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode);
explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
- std::unique_ptr<core::FlowConfiguration> flow_configuration,
- std::shared_ptr<core::ContentRepository> content_repo)
- : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), content_repo, DEFAULT_ROOT_GROUP_NAME, false)
- {
+ std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo)
+ : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), content_repo, DEFAULT_ROOT_GROUP_NAME, false) {
}
explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
std::unique_ptr<core::FlowConfiguration> flow_configuration)
- : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), std::make_shared<core::repository::FileSystemRepository>(), DEFAULT_ROOT_GROUP_NAME, false)
- {
+ : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), std::make_shared<core::repository::FileSystemRepository>(), DEFAULT_ROOT_GROUP_NAME, false) {
content_repo_->initialize(configure);
}
@@ -124,14 +119,33 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
virtual bool isRunning() {
return running_.load();
}
+
// Whether the Flow Controller has already been initialized (loaded flow XML)
virtual bool isInitialized() {
return initialized_.load();
}
// Start to run the Flow Controller which internally start the root process group and all its children
- virtual bool start();
+ virtual int16_t start();
+ virtual int16_t pause() {
+ return -1;
+ }
// Unload the current flow YAML, clean the root process group and all its children
- virtual void stop(bool force);
+ virtual int16_t stop(bool force, uint64_t timeToWait = 0);
+ virtual int16_t applyUpdate(const std::string &configuration);
+ virtual int16_t drainRepositories() {
+
+ return -1;
+ }
+
+ virtual std::vector<std::shared_ptr<state::StateController>> getComponents(const std::string &name);
+
+ virtual std::vector<std::shared_ptr<state::StateController>> getAllComponents();
+
+ virtual int16_t clearConnection(const std::string &connection);
+
+ virtual int16_t applyUpdate(const std::shared_ptr<state::Update> &updateController) {
+ return -1;
+ }
// Asynchronous function trigger unloading and wait for a period of time
virtual void waitUnload(const uint64_t timeToWaitMs);
// Unload the current flow xml, clean the root process group and all its children
@@ -158,7 +172,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
// first it will validate the payload with the current root node config for flowController
// like FlowController id/name is the same and new version is greater than the current version
// after that, it will apply the configuration
- bool applyConfiguration(std::string &configurePayload);
+ bool applyConfiguration(const std::string &configurePayload);
// get name
std::string getName() {
@@ -168,6 +182,10 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
return "";
}
+ virtual std::string getComponentName() {
+ return "FlowController";
+ }
+
// get version
int getVersion() {
if (root_ != nullptr)
@@ -199,7 +217,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
* Enables the controller service services
* @param serviceNode service node which will be disabled, along with linked services.
*/
- virtual void enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<bool> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Enables controller services
@@ -211,7 +229,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
* Disables controller services
* @param serviceNode service node which will be disabled, along with linked services.
*/
- virtual void disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Gets all controller services.
@@ -278,11 +296,25 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
*/
virtual void enableAllControllerServices();
+ /**
+ * Retrieves all metrics from this source.
+ * @param metric_vector -- metrics will be placed in this vector.
+ * @return result of the get operation.
+ * 0 Success
+ * 1 No error condition, but cannot obtain lock in timely manner.
+ * -1 failure
+ */
+ virtual int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector, uint8_t metricsClass);
+
+ virtual uint64_t getUptime();
+
protected:
// function to load the flow file repo.
void loadFlowRepo();
+ void initializeC2();
+
/**
* Initializes flow controller paths.
*/
@@ -304,8 +336,12 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
// FlowFile Repo
// Whether it is running
std::atomic<bool> running_;
+
// conifiguration filename
std::string configuration_filename_;
+
+ std::atomic<bool> c2_initialized_;
+ std::atomic<bool> c2_enabled_;
// Whether it has already been initialized (load the flow XML already)
std::atomic<bool> initialized_;
// Provenance Repo
@@ -336,15 +372,29 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
// flow configuration object.
std::unique_ptr<core::FlowConfiguration> flow_configuration_;
- private:
+ // metrics information
+
+ std::chrono::steady_clock::time_point start_time_;
+
+ std::mutex metrics_mutex_;
+ // metrics cache
+ std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_;
+
+ // metrics cache
+ std::map<std::string, std::shared_ptr<state::metrics::Metrics>> component_metrics_;
+
+ std::map<uint8_t, std::vector<std::shared_ptr<state::metrics::Metrics>>>component_metrics_by_id_;
+ // metrics last run
+ std::chrono::steady_clock::time_point last_metrics_capture_;
+
+private:
std::shared_ptr<logging::Logger> logger_;
- // http configuration listener object.
- std::unique_ptr<HttpConfigurationListener> http_configuration_listener_;
std::string serial_number_;
static std::shared_ptr<utils::IdGenerator> id_generator_;
};
-} /* namespace minifi */
+}
+/* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/FlowFileRecord.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index d6e5f2e..9299b8a 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -94,7 +94,6 @@ class OutputStreamCallback {
virtual ~OutputStreamCallback() {
}
- //virtual void process(std::ofstream *stream) = 0;
virtual int64_t process(std::shared_ptr<io::BaseStream> stream) = 0;
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/HttpConfigurationListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/HttpConfigurationListener.h b/libminifi/include/HttpConfigurationListener.h
deleted file mode 100644
index 7e3291e..0000000
--- a/libminifi/include/HttpConfigurationListener.h
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * HttpConfigurationListener class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __HTTP_CONFIGURATION_LISTENER__
-#define __HTTP_CONFIGURATION_LISTENER__
-
-#include <curl/curl.h>
-#include "core/Core.h"
-#include "core/Property.h"
-#include "ConfigurationListener.h"
-#include "utils/HTTPUtils.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-// HttpConfigurationListener Class
-class HttpConfigurationListener: public ConfigurationListener {
-public:
-
- // Constructor
- /*!
- * Create a new processor
- */
- HttpConfigurationListener(std::shared_ptr<FlowController> controller,
- std::shared_ptr<Configure> configure) :
- minifi::ConfigurationListener(controller, configure, "http"),
- securityConfig_(configure) {
- std::string value;
-
- if (configure->get(Configure::nifi_configuration_listener_http_url, value)) {
- url_ = value;
- logger_->log_info("Http configuration listener URL %s", url_.c_str());
- } else {
- url_ = "";
- }
-
- curl_global_init(CURL_GLOBAL_DEFAULT);
- this->start();
- }
-
- bool pullConfiguration(std::string &configuration);
-
- // Destructor
- virtual ~HttpConfigurationListener() {
- this->stop();
- curl_global_cleanup();
- }
-
-protected:
- minifi::utils::HTTPSecurityConfiguration securityConfig_;
-
-};
-
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index d484fb9..cefce45 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -23,13 +23,14 @@
#include <mutex>
#include <memory>
#include <stack>
-#include "utils/HTTPUtils.h"
+#include "utils/HTTPClient.h"
#include "concurrentqueue.h"
#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "Site2SiteClientProtocol.h"
#include "io/StreamFactory.h"
+#include "controllers/SSLContextService.h"
#include "core/logging/LoggerConfiguration.h"
namespace org {
@@ -43,14 +44,15 @@ class RemoteProcessorGroupPort : public core::Processor {
/*!
* Create a new processor
*/
- RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string name, std::string url, std::shared_ptr<Configure> configure, uuid_t uuid = nullptr)
+ RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string name, std::string url, const std::shared_ptr<Configure> &configure, uuid_t uuid = nullptr)
: core::Processor(name, uuid),
configure_(configure),
direction_(SEND),
transmitting_(false),
+ timeout_(0),
logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()),
url_(url),
- securityConfig_(configure) {
+ ssl_service(nullptr) {
stream_factory_ = stream_factory;
if (uuid != nullptr) {
uuid_copy(protocol_uuid_, uuid);
@@ -71,11 +73,12 @@ class RemoteProcessorGroupPort : public core::Processor {
static const char *ProcessorName;
// Supported Properties
static core::Property hostName;
+ static core::Property SSLContext;
static core::Property port;
static core::Property portUUID;
// Supported Relationships
static core::Relationship relation;
- public:
+ public:
void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
// OnTrigger method, implemented by NiFi RemoteProcessorGroupPort
virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
@@ -102,8 +105,7 @@ class RemoteProcessorGroupPort : public core::Processor {
if (port_ == -1) {
if (protocol_.find("https") != std::string::npos) {
port_ = 443;
- }
- else if (protocol_.find("http") != std::string::npos) {
+ } else if (protocol_.find("http") != std::string::npos) {
port_ = 80;
}
}
@@ -142,15 +144,17 @@ class RemoteProcessorGroupPort : public core::Processor {
std::string url_;
// Remote Site2Site Info
- int site2site_port_;
- bool site2site_secure_;
+ int site2site_port_;bool site2site_secure_;
std::vector<minifi::Site2SitePeerStatus> site2site_peer_status_list_;
std::atomic<int> site2site_peer_index_;
std::mutex site2site_peer_mutex_;
std::string rest_user_name_;
std::string rest_password_;
- minifi::utils::HTTPSecurityConfiguration securityConfig_;
+ std::shared_ptr<controllers::SSLContextService> ssl_service;
+
+ private:
+ static const char* RPG_SSL_CONTEXT_SERVICE_NAME;
};
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/ResourceClaim.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h
index 19a67fa..b788416 100644
--- a/libminifi/include/ResourceClaim.h
+++ b/libminifi/include/ResourceClaim.h
@@ -41,12 +41,15 @@ namespace minifi {
// Default content directory
#define DEFAULT_CONTENT_DIRECTORY "./content_repository"
+extern std::string default_directory_path;
+
+extern void setDefaultDirectory(std::string);
+
// ResourceClaim Class
class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
public:
- static char *default_directory_path;
// Constructor
/*!
* Create a new resource claim
@@ -55,23 +58,20 @@ class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted = false);
// Destructor
- virtual ~ResourceClaim() {
+ ~ResourceClaim() {
}
// increaseFlowFileRecordOwnedCount
void increaseFlowFileRecordOwnedCount() {
- ++_flowFileRecordOwnedCount;
+ claim_manager_->incrementStreamCount(shared_from_this());
}
// decreaseFlowFileRecordOwenedCount
void decreaseFlowFileRecordOwnedCount() {
-
- if (_flowFileRecordOwnedCount > 0) {
- _flowFileRecordOwnedCount--;
- }
+ claim_manager_->decrementStreamCount(shared_from_this());
}
// getFlowFileRecordOwenedCount
uint64_t getFlowFileRecordOwnedCount() {
- return _flowFileRecordOwnedCount;
+ return claim_manager_->getStreamCount(shared_from_this());
}
// Get the content full path
std::string getContentFullPath() {
@@ -83,13 +83,19 @@ class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
}
void deleteClaim() {
- if (!deleted_)
- {
+ if (!deleted_) {
deleted_ = true;
}
}
+ bool exists() {
+ if (claim_manager_ == nullptr) {
+ return false;
+ }
+ return claim_manager_->exists(shared_from_this());
+ }
+
friend std::ostream& operator<<(std::ostream& stream, const ResourceClaim& claim) {
stream << claim._contentFullPath;
return stream;
@@ -104,9 +110,6 @@ class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
// Full path to the content
std::string _contentFullPath;
- // How many FlowFileRecord Own this cliam
- std::atomic<uint64_t> _flowFileRecordOwnedCount;
-
std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager_;
private:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 130c088..569c4ee 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -52,10 +52,8 @@ class SchedulingAgent {
/*!
* Create a new scheduling agent.
*/
- SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
- std::shared_ptr<core::Repository> flow_repo,
- std::shared_ptr<core::ContentRepository> content_repo,
- std::shared_ptr<Configure> configuration)
+ SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
+ std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
: configure_(configuration),
admin_yield_duration_(0),
bored_yield_duration_(0),
@@ -65,7 +63,7 @@ class SchedulingAgent {
running_ = false;
repo_ = repo;
flow_repo_ = flow_repo;
- utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true);
+ utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true);
component_lifecycle_thread_pool_ = std::move(pool);
component_lifecycle_thread_pool_.start();
}
@@ -74,7 +72,7 @@ class SchedulingAgent {
}
// onTrigger, return whether the yield is need
- bool onTrigger(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+ bool onTrigger(std::shared_ptr<core::Processor> processor, std::shared_ptr<core::ProcessContext> processContext, std::shared_ptr<core::ProcessSessionFactory> sessionFactory);
// Whether agent has work to do
bool hasWorkToDo(std::shared_ptr<core::Processor> processor);
// Whether the outgoing need to be backpressure
@@ -82,6 +80,7 @@ class SchedulingAgent {
// start
void start() {
running_ = true;
+ component_lifecycle_thread_pool_.start();
}
// stop
virtual void stop() {
@@ -90,8 +89,8 @@ class SchedulingAgent {
}
public:
- virtual void enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
- virtual void disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<bool> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
// schedule, overwritten by different DrivenSchedulingAgent
virtual void schedule(std::shared_ptr<core::Processor> processor) = 0;
// unschedule, overwritten by different DrivenSchedulingAgent
@@ -112,7 +111,7 @@ class SchedulingAgent {
std::shared_ptr<Configure> configure_;
std::shared_ptr<core::Repository> repo_;
-
+
std::shared_ptr<core::Repository> flow_repo_;
std::shared_ptr<core::ContentRepository> content_repo_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h
index dcb551a..df974fb 100644
--- a/libminifi/include/Site2SiteClientProtocol.h
+++ b/libminifi/include/Site2SiteClientProtocol.h
@@ -174,29 +174,13 @@ typedef struct {
} RespondCodeContext;
// Respond Code Context
-static RespondCodeContext respondCodeContext[] = {
- { RESERVED, "Reserved for Future Use", false },
- { PROPERTIES_OK, "Properties OK", false },
- { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true },
- { ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true },
- { MISSING_PROPERTY, "Missing Property", true },
- { CONTINUE_TRANSACTION, "Continue Transaction", false },
- { FINISH_TRANSACTION, "Finish Transaction", false },
- { CONFIRM_TRANSACTION, "Confirm Transaction", true },
- { TRANSACTION_FINISHED, "Transaction Finished", false },
- { TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction Finished But Destination is Full", false },
- { CANCEL_TRANSACTION, "Cancel Transaction", true },
- { BAD_CHECKSUM, "Bad Checksum", false },
- { MORE_DATA, "More Data Exists", false },
- { NO_MORE_DATA, "No More Data Exists", false },
- { UNKNOWN_PORT, "Unknown Port", false },
- { PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true },
- { PORTS_DESTINATION_FULL, "Port's Destination is Full", false },
- { UNAUTHORIZED, "User Not Authorized", true },
- { ABORT, "Abort", true },
- { UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false },
- { END_OF_STREAM, "End of Stream", false }
-};
+static RespondCodeContext respondCodeContext[] = { { RESERVED, "Reserved for Future Use", false }, { PROPERTIES_OK, "Properties OK", false }, { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true },
+ { ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true }, { MISSING_PROPERTY, "Missing Property", true }, { CONTINUE_TRANSACTION, "Continue Transaction", false }, { FINISH_TRANSACTION,
+ "Finish Transaction", false }, { CONFIRM_TRANSACTION, "Confirm Transaction", true }, { TRANSACTION_FINISHED, "Transaction Finished", false }, { TRANSACTION_FINISHED_BUT_DESTINATION_FULL,
+ "Transaction Finished But Destination is Full", false }, { CANCEL_TRANSACTION, "Cancel Transaction", true }, { BAD_CHECKSUM, "Bad Checksum", false }, { MORE_DATA, "More Data Exists", false },
+ { NO_MORE_DATA, "No More Data Exists", false }, { UNKNOWN_PORT, "Unknown Port", false }, { PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true }, { PORTS_DESTINATION_FULL,
+ "Port's Destination is Full", false }, { UNAUTHORIZED, "User Not Authorized", true }, { ABORT, "Abort", true }, { UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, {
+ END_OF_STREAM, "End of Stream", false } };
// Respond Code Sequence Pattern
static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R';
@@ -246,47 +230,47 @@ typedef enum {
// HandShakeProperty Str
static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = {
- /**
- * Boolean value indicating whether or not the contents of a FlowFile should
- * be GZipped when transferred.
- */
- "GZIP",
- /**
- * The unique identifier of the port to communicate with
- */
- "PORT_IDENTIFIER",
- /**
- * Indicates the number of milliseconds after the request was made that the
- * client will wait for a response. If no response has been received by the
- * time this value expires, the server can move on without attempting to
- * service the request because the client will have already disconnected.
- */
- "REQUEST_EXPIRATION_MILLIS",
- /**
- * The preferred number of FlowFiles that the server should send to the
- * client when pulling data. This property was introduced in version 5 of
- * the protocol.
- */
- "BATCH_COUNT",
- /**
- * The preferred number of bytes that the server should send to the client
- * when pulling data. This property was introduced in version 5 of the
- * protocol.
- */
- "BATCH_SIZE",
- /**
- * The preferred amount of time that the server should send data to the
- * client when pulling data. This property was introduced in version 5 of
- * the protocol. Value is in milliseconds.
- */
- "BATCH_DURATION" };
+/**
+ * Boolean value indicating whether or not the contents of a FlowFile should
+ * be GZipped when transferred.
+ */
+"GZIP",
+/**
+ * The unique identifier of the port to communicate with
+ */
+"PORT_IDENTIFIER",
+/**
+ * Indicates the number of milliseconds after the request was made that the
+ * client will wait for a response. If no response has been received by the
+ * time this value expires, the server can move on without attempting to
+ * service the request because the client will have already disconnected.
+ */
+"REQUEST_EXPIRATION_MILLIS",
+/**
+ * The preferred number of FlowFiles that the server should send to the
+ * client when pulling data. This property was introduced in version 5 of
+ * the protocol.
+ */
+"BATCH_COUNT",
+/**
+ * The preferred number of bytes that the server should send to the client
+ * when pulling data. This property was introduced in version 5 of the
+ * protocol.
+ */
+"BATCH_SIZE",
+/**
+ * The preferred amount of time that the server should send data to the
+ * client when pulling data. This property was introduced in version 5 of
+ * the protocol. Value is in milliseconds.
+ */
+"BATCH_DURATION" };
class Site2SiteClientProtocol;
// Transaction Class
class Transaction {
friend class Site2SiteClientProtocol;
- public:
+ public:
// Constructor
/*!
* Create a new transaction
@@ -389,13 +373,12 @@ class DataPacket {
};
/**
- * Site2Site Peer
- */
- typedef struct Site2SitePeerStatus {
- std::string host_;
- int port_;
- bool isSecure_;
- } Site2SitePeerStatus;
+ * Site2Site Peer
+ */
+typedef struct Site2SitePeerStatus {
+ std::string host_;
+ int port_;bool isSecure_;
+} Site2SitePeerStatus;
// Site2SiteClientProtocol Class
class Site2SiteClientProtocol {
@@ -525,7 +508,7 @@ class Site2SiteClientProtocol {
bool receive(std::string transactionID, DataPacket *packet, bool &eof);
// Send the data packet from the transaction
// Return false when any error occurs
- bool send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session);
+ int16_t send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session);
// Confirm the data that was sent or received by comparing CRC32's of the data sent and the data received.
bool confirm(std::string transactionID);
// Cancel the transaction
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/Site2SitePeer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SitePeer.h b/libminifi/include/Site2SitePeer.h
index 65a5479..59f5cfe 100644
--- a/libminifi/include/Site2SitePeer.h
+++ b/libminifi/include/Site2SitePeer.h
@@ -78,7 +78,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream {
_url = std::move(ss._url);
}
// Destructor
- virtual ~Site2SitePeer() {
+ ~Site2SitePeer() {
Close();
}
// Set Processor yield period in MilliSecond
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index 27b8b3a..d9fc7be 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -84,14 +84,12 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
/*!
* Create a new threaded scheduling agent.
*/
- ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
- std::shared_ptr<core::Repository> flow_repo,
- std::shared_ptr<core::ContentRepository> content_repo,
- std::shared_ptr<Configure> configuration)
+ ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
+ std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
: SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration),
logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
- utils::ThreadPool<uint64_t> pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true);
+ utils::ThreadPool<uint64_t> pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true);
thread_pool_ = std::move(pool);
thread_pool_.start();
@@ -101,7 +99,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
}
// Run function for the thread
- virtual uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0;
+ virtual uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+ const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) = 0;
public:
// schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
@@ -110,12 +109,11 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
virtual void unschedule(std::shared_ptr<core::Processor> processor);
virtual void stop();
- protected:
+ protected:
utils::ThreadPool<uint64_t> thread_pool_;
protected:
-
private:
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 1502c47..8398b3a 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -38,10 +38,9 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
* Create a new processor
*/
TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
- std::shared_ptr<core::Repository> flow_repo,
- std::shared_ptr<core::ContentRepository> content_repo,
- std::shared_ptr<Configure> configure)
- : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure) {
+ std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure)
+ : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure),
+ logger_(logging::LoggerFactory<TimerDrivenSchedulingAgent>::getLogger()) {
}
// Destructor
virtual ~TimerDrivenSchedulingAgent() {
@@ -49,7 +48,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
/**
* Run function that accepts the processor, context and session factory.
*/
- uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+ uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
private:
// Prevent default copy constructor and assignment operation
@@ -57,6 +56,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent);
+ std::shared_ptr<logging::Logger> logger_;
};
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/C2Agent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
new file mode 100644
index 0000000..8c2e45d
--- /dev/null
+++ b/libminifi/include/c2/C2Agent.h
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2AGENT_H_
+#define LIBMINIFI_INCLUDE_C2_C2AGENT_H_
+
+#include <utility>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include "core/state/UpdateController.h"
+#include "core/state/metrics/MetricsBase.h"
+#include "C2Payload.h"
+#include "C2Protocol.h"
+#include "io/validation.h"
+#include "protocols/Protocols.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose and Justification: C2 agent will be the mechanism that will abstract the protocol to do the work.
+ *
+ * The protocol represents a transformation layer into the objects seen in C2Payload. That transformation may
+ * be minimal or extreme, depending on the protocol itself.
+ *
+ * Metrics Classes defined here:
+ *
+ * 0 HeartBeat -- RESERVED
+ * 1-255 Defined by the configuration file.
+ */
+class C2Agent : public state::UpdateController, public state::metrics::MetricsSink, public std::enable_shared_from_this<C2Agent> {
+ public:
+
+ C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
+
+ virtual ~C2Agent() {
+
+ }
+
+ /**
+ * Sends the heartbeat to the server. Will include metrics
+ * in the payload if they exist.
+ */
+ void performHeartBeat();
+
+ virtual std::vector<std::function<state::Update()>> getFunctions() {
+ return functions_;
+ }
+
+ /**
+ * Sets the metric within this sink
+ * @param metric metric to set
+ * @return 0 on success, -1 on failure.
+ */
+ virtual int16_t setMetrics(const std::shared_ptr<state::metrics::Metrics> &metric);
+
+ protected:
+
+ /**
+ * Configure the C2 agent
+ */
+ void configure(const std::shared_ptr<Configure> &configure, bool reconfigure = true);
+
+ /**
+ * Serializes metrics into a payload.
+ * @param parent_payload parent payload into which we insert the newly generated payload.
+ * @param name name of this metric
+ * @param metrics metrics to include.
+ */
+ void serializeMetrics(C2Payload &parent_payload, const std::string &name, const std::vector<state::metrics::MetricResponse> &metrics);
+
+ /**
+ * Extract the payload
+ * @param resp payload to be moved into the function.
+ */
+ void extractPayload(const C2Payload &&resp);
+
+ /**
+ * Extract the payload
+ * @param resp payload reference.
+ */
+ void extractPayload(const C2Payload &resp);
+
+ /**
+ * Enqueues a C2 server response for us to evaluate and parse.
+ */
+ void enqueue_c2_server_response(C2Payload &&resp) {
+ std::lock_guard<std::timed_mutex> lock(queue_mutex);
+ responses.push_back(std::move(resp));
+ }
+
+ /**
+ * Enqueues a c2 payload for a response to the C2 server.
+ */
+ void enqueue_c2_response(C2Payload &&resp) {
+ std::lock_guard<std::timed_mutex> lock(request_mutex);
+ requests.push_back(std::move(resp));
+ }
+
+ /**
+ * Handles a C2 event requested by the server.
+ * @param resp c2 server response.
+ */
+ void handle_c2_server_response(const C2ContentResponse &resp);
+
+ /**
+ * Handles an update request
+ * @param resp C2ContentResponse response
+ */
+ void handle_update(const C2ContentResponse &resp);
+
+ /**
+ * Handles a description request
+ */
+ void handle_describe(const C2ContentResponse &resp);
+
+ std::timed_mutex metrics_mutex_;
+ std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_map_;
+
+ /**
+ * Device information stored in the metrics format
+ */
+ std::map<std::string, std::shared_ptr<state::metrics::Metrics>> device_information_;
+ // queue mutex
+ std::timed_mutex queue_mutex;
+
+ // request mutex
+ std::timed_mutex request_mutex;
+
+ // responses received from the C2 server, queued for the C2 agent to evaluate.
+ std::vector<C2Payload> responses;
+
+ // payloads queued to be sent to the C2 server.
+ std::vector<C2Payload> requests;
+
+ // heart beat period.
+ int64_t heart_beat_period_;
+
+ // maximum number of queued messages to send to the c2 server
+ int16_t max_c2_responses;
+
+ // time point the last time we performed a heartbeat.
+ std::chrono::steady_clock::time_point last_run_;
+
+ // function that performs the heartbeat
+ std::function<state::Update()> c2_producer_;
+
+ // function that acts upon the queued C2 server responses (presumably -- confirm against C2Agent.cpp)
+ std::function<state::Update()> c2_consumer_;
+
+ // reference to the update sink, against which we will execute updates.
+ std::shared_ptr<state::StateMonitor> update_sink_;
+
+ // functions that will be used for the update controller.
+ std::vector<std::function<state::Update()>> functions_;
+
+ // controller service provider reference.
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+
+ std::shared_ptr<Configure> configuration_;
+
+ std::shared_ptr<Configure> running_configuration;
+
+ std::mutex heartbeat_mutex;
+
+ std::vector<std::shared_ptr<HeartBeatReporter>> heartbeat_protocols_;
+
+ std::atomic<C2Protocol*> protocol_;
+
+ std::shared_ptr<logging::Logger> logger_;
+}
+;
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2AGENT_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/C2Payload.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
new file mode 100644
index 0000000..ca14584
--- /dev/null
+++ b/libminifi/include/c2/C2Payload.h
@@ -0,0 +1,192 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_
+#define LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_
+
+#include <memory>
+#include <string>
+#include <map>
+#include "core/state/UpdateController.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+enum Operation {  // kinds of operation that a C2 payload or content response can carry
+ ACKNOWLEDGE,
+ START,
+ STOP,
+ RESTART,
+ DESCRIBE,
+ HEARTBEAT,
+ UPDATE,
+ VALIDATE,
+ CLEAR
+};
+
+enum Direction {  // tells a protocol whether a payload exchange is a transmit or a receive operation
+ TRANSMIT,
+ RECEIVE
+};
+
+class C2ContentResponse {  // a single operation, with its arguments, carried inside a C2Payload
+ public:
+ C2ContentResponse(Operation op);  // NOTE(review): not marked explicit, so an Operation converts implicitly to a C2ContentResponse
+
+ C2ContentResponse(const C2ContentResponse &other);
+
+ C2ContentResponse(const C2ContentResponse &&other);  // NOTE(review): a const rvalue reference cannot be moved from
+
+ C2ContentResponse & operator=(const C2ContentResponse &&other);  // NOTE(review): a const rvalue reference cannot be moved from
+
+ C2ContentResponse & operator=(const C2ContentResponse &other);
+ // operation this content response refers to
+ Operation op;
+ // determines if the operation is required
+ bool required;
+ // identifier for this content response
+ std::string ident;
+ // delay before running (NOTE(review): the unit is not stated here; confirm)
+ uint32_t delay;
+ // max time before this response will no longer be honored (NOTE(review): the unit is not stated here; confirm)
+ uint64_t ttl;
+ // name applied to commands
+ std::string name;
+ // string key/value arguments that correspond with the operation.
+ std::map<std::string, std::string> operation_arguments;
+// std::vector<std::string> content;
+};
+
+/**
+ * C2Payload is an update for the state manager.
+ * Note that the payload can either consist of other payloads or
+ * have content directly within it, represented by C2ContentResponse objects, above.
+ *
+ * Payloads can also contain raw data, which can be binary data.
+ */
+class C2Payload : public state::Update {
+ public:
+ virtual ~C2Payload() {
+
+ }
+ // Constructors; all are defined outside this header. resp and isRaw default to false.
+ C2Payload(Operation op, std::string identifier, bool resp = false, bool isRaw = false);
+
+ C2Payload(Operation op, bool resp = false, bool isRaw = false);  // NOTE(review): callable with one argument and not explicit, so an Operation converts implicitly
+
+ C2Payload(Operation op, state::UpdateState state, bool resp = false, bool isRaw = false);
+
+ C2Payload(const C2Payload &other);
+
+ C2Payload(const C2Payload &&other);  // NOTE(review): a const rvalue reference cannot be moved from
+
+ void setIdentifier(const std::string &ident);
+
+ std::string getIdentifier() const;
+ // Stores a copy of label in label_.
+ void setLabel(const std::string label) {
+ label_ = label;
+ }
+ // Returns a copy of label_.
+ std::string getLabel() const {
+ return label_;
+ }
+
+ /**
+ * Gets the operation for this payload. May be nested or a single operation.
+ */
+ Operation getOperation() const;
+
+ /**
+ * Validate the payload, if necessary and/or possible.
+ */
+ virtual bool validate();
+
+ /**
+ * Get content responses from this payload.
+ */
+ const std::vector<C2ContentResponse> &getContent() const;
+
+ /**
+ * Add a content response to this payload.
+ */
+ void addContent(const C2ContentResponse &&content);  // NOTE(review): a const rvalue reference cannot be moved from
+
+ /**
+ * Determines if this object contains raw data.
+ */
+ bool isRaw() const;
+
+ /**
+ * Sets raw data within this object.
+ */
+ void setRawData(const std::string &data);
+
+ /**
+ * Sets raw data from a vector within this object.
+ */
+ void setRawData(const std::vector<char> &data);
+
+ /**
+ * Returns raw data.
+ */
+ std::string getRawData() const;
+
+ /**
+ * Add a nested payload.
+ * @param payload payload to add to this object. NOTE(review): taken as a const rvalue reference, which cannot be moved from.
+ */
+ void addPayload(const C2Payload &&payload);
+ /**
+ * Get nested payloads.
+ */
+ const std::vector<C2Payload> &getNestedPayloads() const;
+
+ C2Payload &operator=(const C2Payload &&other);  // NOTE(review): a const rvalue reference cannot be moved from
+ C2Payload &operator=(const C2Payload &other);
+
+ protected:
+
+ // identifier for this payload.
+ std::string ident_;
+ // label for this payload, accessed through setLabel()/getLabel().
+ std::string label_;
+ // nested payloads; presumably what getNestedPayloads() exposes (confirm in the implementation).
+ std::vector<C2Payload> payloads_;
+ // content responses; presumably what getContent() exposes (confirm in the implementation).
+ std::vector<C2ContentResponse> content_;
+ // operation for this payload.
+ Operation op_;
+ // presumably the flag reported by isRaw() (confirm in the implementation).
+ bool raw_;
+ // raw data for this payload.
+ std::string raw_data_;
+ // NOTE(review): presumably set from the constructors' resp argument; unlike its siblings it has no trailing underscore.
+ bool isResponse;
+
+};
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/C2Protocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Protocol.h b/libminifi/include/c2/C2Protocol.h
new file mode 100644
index 0000000..683a486
--- /dev/null
+++ b/libminifi/include/c2/C2Protocol.h
@@ -0,0 +1,119 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2PROTOCOL_H_
+#define LIBMINIFI_INCLUDE_C2_C2PROTOCOL_H_
+
+#include "C2Payload.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "properties/Configure.h"
+#include "core/Connectable.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Defines a protocol to perform state management of the minifi agent.
+ */
+class C2Protocol : public core::Connectable {
+ public:
+ // Constructs the protocol with running_ set to true.
+ C2Protocol(std::string name, uuid_t uuid)
+ : core::Connectable(name, uuid),
+ running_(true) {
+
+ }
+ // Stores the controller service provider and configuration in the corresponding members.
+ virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+ controller_ = controller;
+ configuration_ = configure;
+ }
+ virtual ~C2Protocol() {
+
+ }
+
+ /**
+ * Update the configuration.
+ */
+ virtual void update(const std::shared_ptr<Configure> &configure)=0;
+
+ /**
+ * Send a C2 payload to the provided URI. The direction indicates to the protocol whether this is a transmit or receive operation.
+ * Depending on the protocol this may mean different things.
+ *
+ * @param url URI to which the payload is sent.
+ * @param operation payload to perform and/or send
+ * @param direction direction of the C2 operation.
+ * @param async whether or not this is an asynchronous operation
+ * @return payload from the response or a response to come back in the face of an asynchronous operation.
+ */
+ virtual C2Payload consumePayload(const std::string &url, const C2Payload &operation, Direction direction = TRANSMIT, bool async = false) = 0;
+
+ /**
+ * Send a C2 payload. The direction indicates to the protocol whether this is a transmit or receive operation.
+ * Depending on the protocol this may mean different things.
+ *
+ * @param operation payload to perform and/or send
+ * @param direction direction of the C2 operation.
+ * @param async whether or not this is an asynchronous operation
+ * @return payload from the response or a response to come back in the face of an asynchronous operation.
+ */
+ virtual C2Payload consumePayload(const C2Payload &operation, Direction direction = TRANSMIT, bool async = false) = 0;
+
+ /**
+ * Determines if we are connected and operating; this implementation reports the current value of running_.
+ */
+ virtual bool isRunning() {
+ return running_.load();
+ }
+
+ /**
+ * Block until work is available on any input connection, or the given duration elapses
+ * @param timeoutMs timeout in milliseconds
+ */
+ void waitForWork(uint64_t timeoutMs);
+ // No-op in this base class.
+ virtual void yield() {
+
+ }
+
+ /**
+ * Determines if work is available for this connectable; this base implementation always returns true.
+ * @return boolean if work is available.
+ */
+ virtual bool isWorkAvailable() {
+ return true;
+ }
+
+ protected:
+ // flag reported by isRunning(); set to true by the constructor.
+ std::atomic<bool> running_;
+ // controller service provider assigned in initialize().
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+ // configuration assigned in initialize().
+ std::shared_ptr<Configure> configuration_;
+};
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2PROTOCOL_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/HeartBeatReporter.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/HeartBeatReporter.h b/libminifi/include/c2/HeartBeatReporter.h
new file mode 100644
index 0000000..3d0fd49
--- /dev/null
+++ b/libminifi/include/c2/HeartBeatReporter.h
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_HEARTBEATREPORTER_H_
+#define LIBMINIFI_INCLUDE_C2_HEARTBEATREPORTER_H_
+
+#include "C2Protocol.h"
+#include "C2Payload.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "properties/Configure.h"
+#include "core/Connectable.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Defines a heart beat reporting interface. Note that this differs from
+ * C2Protocol as heartbeats can be any interface which provides only one way communication.
+ */
+class HeartBeatReporter : public core::Connectable {
+ public:
+ // Constructs the reporter with controller_ and configuration_ null; initialize() assigns them.
+ HeartBeatReporter(std::string name, uuid_t uuid)
+ : core::Connectable(name, uuid),
+ controller_(nullptr),
+ configuration_(nullptr) {
+ }
+ // Stores the controller service provider and configuration in the corresponding members.
+ virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+ controller_ = controller;
+ configuration_ = configure;
+ }
+ virtual ~HeartBeatReporter() {
+ }
+ /**
+ * Report a heartbeat payload through this reporter.
+ * How the payload is delivered is left to the implementing class.
+ *
+ * @param heartbeat payload describing the heartbeat to report.
+ * @return result of the heartbeat operation
+ *
+ * NOTE(review): the meaning of specific return values is not documented here;
+ * confirm against the implementations before relying on it.
+ */
+ virtual int16_t heartbeat(const C2Payload &heartbeat) = 0;
+
+ /**
+ * Determines if we are connected and operating; this base implementation always returns true.
+ */
+ virtual bool isRunning() {
+ return true;
+ }
+
+ /**
+ * Block until work is available on any input connection, or the given duration elapses
+ * @param timeoutMs timeout in milliseconds
+ */
+ void waitForWork(uint64_t timeoutMs);
+ // No-op in this base class.
+ virtual void yield() {
+
+ }
+
+ /**
+ * Determines if work is available for this connectable; this base implementation always returns true.
+ * @return boolean if work is available.
+ */
+ virtual bool isWorkAvailable() {
+ return true;
+ }
+
+ protected:
+ // controller service provider assigned in initialize(); null until then.
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+ // configuration assigned in initialize(); null until then.
+ std::shared_ptr<Configure> configuration_;
+};
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_HEARTBEATREPORTER_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/protocols/Protocols.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/Protocols.h b/libminifi/include/c2/protocols/Protocols.h
new file mode 100644
index 0000000..c4c314f
--- /dev/null
+++ b/libminifi/include/c2/protocols/Protocols.h
@@ -0,0 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_H_
+#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_H_
+
+#include "RESTReceiver.h"
+#include "RESTSender.h"
+
+#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/protocols/RESTProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h
new file mode 100644
index 0000000..2978a03
--- /dev/null
+++ b/libminifi/include/c2/protocols/RESTProtocol.h
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
+#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
+
+#include "json/json.h"
+#include "json/writer.h"
+#include <string>
+#include <mutex>
+#include "CivetServer.h"
+#include "../C2Protocol.h"
+#include "../HeartBeatReporter.h"
+#include "controllers/SSLContextService.h"
+#include "utils/ByteInputCallBack.h"
+#include "utils/HTTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
+ *
+ * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
+ * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
+ * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
+ *
+ */
+class RESTProtocol {
+ public:
+ RESTProtocol() {
+
+ }
+
+ virtual ~RESTProtocol() {
+
+ }
+
+ protected:
+ // JSON helpers available to derived classes; their definitions are not part of this header.
+ virtual Json::Value serializeJsonPayload(Json::Value &json_root, const C2Payload &payload);  // presumably renders payload as JSON using json_root (confirm in the implementation)
+
+ virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector<char> &response);  // NOTE(review): a const return value cannot be move-assigned by callers
+
+ virtual std::string getOperation(const C2Payload &payload);  // presumably the string form of payload's operation (confirm in the implementation)
+
+ virtual Operation stringToOperation(const std::string str);  // NOTE(review): str is taken by const value, so each call copies the string
+
+};
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/protocols/RESTReceiver.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/RESTReceiver.h b/libminifi/include/c2/protocols/RESTReceiver.h
new file mode 100644
index 0000000..17b5028
--- /dev/null
+++ b/libminifi/include/c2/protocols/RESTReceiver.h
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
+#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
+
+#include "RESTSender.h"
+#include "json/json.h"
+#include "json/writer.h"
+#include <string>
+#include <mutex>
+#include "core/Resource.h"
+#include "RESTProtocol.h"
+#include "CivetServer.h"
+#include "../C2Protocol.h"
+#include "controllers/SSLContextService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+int log_message(const struct mg_connection *conn, const char *message);  // NOTE(review): signature matches CivetWeb's log_message callback; presumably installed on the listener (confirm in the implementation)
+
+int ssl_protocol_en(void *ssl_context, void *user_data);  // NOTE(review): signature matches CivetWeb's init_ssl callback; presumably configures the SSL context (confirm in the implementation)
+
+/**
+ * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
+ *
+ * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
+ * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
+ * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
+ *
+ */
+class RESTReceiver : public RESTProtocol, public HeartBeatReporter {
+ public:
+ RESTReceiver(std::string name, uuid_t uuid = nullptr);
+ // Overrides of the HeartBeatReporter interface; defined outside this header.
+ void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure);
+ virtual int16_t heartbeat(const C2Payload &heartbeat);
+
+ protected:
+ // CivetWeb handler that answers GET requests with the most recently stored response text.
+ class ListeningProtocol : public CivetHandler {
+
+ public:
+ ListeningProtocol() {
+
+ }
+ // Replies with the stored response as text/plain and returns true.
+ bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ std::string currentvalue;
+ {  // copy under the lock so the network writes below happen without holding it
+ std::lock_guard<std::mutex> lock(reponse_mutex_);
+ currentvalue = resp_;
+ }
+ // %lu expects unsigned long, so convert the size_t length explicitly.
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ static_cast<unsigned long>(currentvalue.length()));
+ mg_write(conn, currentvalue.data(), currentvalue.length());  // send exactly Content-Length bytes, even if the text contains NULs
+ return true;
+ }
+ // Replaces the text served by handleGet(); guarded by the same mutex.
+ void setResponse(std::string response) {
+ std::lock_guard<std::mutex> lock(reponse_mutex_);
+ resp_ = response;
+ }
+
+ protected:
+ std::mutex reponse_mutex_;  // guards resp_
+ std::string resp_;  // text returned to GET requests
+
+ };
+ // Web server start-up helpers, with and without a CA certificate argument; defined outside this header.
+ std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert);
+
+ std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler);
+ // Owned server and GET handler. NOTE(review): presumably created during initialize(); confirm in the implementation.
+ std::unique_ptr<CivetServer> listener;
+ std::unique_ptr<ListeningProtocol> handler;
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(RESTReceiver);  // NOTE(review): macro presumably comes from core/Resource.h and registers the class for lookup by name; confirm
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */
|