Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 91E34200D23 for ; Wed, 4 Oct 2017 18:08:53 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9028B1609D6; Wed, 4 Oct 2017 16:08:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 14887160BDA for ; Wed, 4 Oct 2017 18:08:50 +0200 (CEST) Received: (qmail 47296 invoked by uid 500); 4 Oct 2017 16:08:50 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 47199 invoked by uid 99); 4 Oct 2017 16:08:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Oct 2017 16:08:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D8875F5C6C; Wed, 4 Oct 2017 16:08:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: jeremydyer@apache.org To: commits@nifi.apache.org Date: Wed, 04 Oct 2017 16:08:50 -0000 Message-Id: <1c323d06cea44eeca5e31a9dbb8a274a@git.apache.org> In-Reply-To: <3bb8e7bdfe0744c2a060784f9410d954@git.apache.org> References: <3bb8e7bdfe0744c2a060784f9410d954@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/4] nifi-minifi-cpp git commit: MINIFICPP-215: Make libCuRL containing classes extensions. archived-at: Wed, 04 Oct 2017 16:08:53 -0000 MINIFICPP-215: Make libCuRL containing classes extensions. 
MINIFICPP-215: Add conditional to HTTP-CURL MINIFICPP-215: Update Readme MINIFICPP-215: Udate tests and ensure extensions doc is up to date MINIFICPP-215: Add header to Extensions MINIFICPP-215: Update linux breakage This closes #141. Signed-off-by: Jeremy Dyer Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/96d18744 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/96d18744 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/96d18744 Branch: refs/heads/master Commit: 96d187445d9a4c24e2dbb5e4b0b400e3746e0ad1 Parents: 1a2fa1e Author: Marc Parisi Authored: Tue Sep 19 13:09:59 2017 -0400 Committer: Jeremy Dyer Committed: Wed Oct 4 12:07:35 2017 -0400 ---------------------------------------------------------------------- CMakeLists.txt | 6 +- Extensions.md | 90 ++++ README.md | 4 + cmake/BuildTests.cmake | 38 +- extensions/http-curl/CMakeLists.txt | 75 ++++ extensions/http-curl/HttpCurlLoader.cpp | 30 ++ extensions/http-curl/HttpCurlLoader.h | 82 ++++ extensions/http-curl/client/HTTPClient.h | 191 +++++++++ extensions/http-curl/client/HttpClient.cpp | 257 ++++++++++++ extensions/http-curl/processors/InvokeHTTP.cpp | 404 ++++++++++++++++++ extensions/http-curl/processors/InvokeHTTP.h | 181 +++++++++ extensions/http-curl/protocols/RESTProtocol.cpp | 177 ++++++++ extensions/http-curl/protocols/RESTProtocol.h | 74 ++++ extensions/http-curl/protocols/RESTReceiver.cpp | 148 +++++++ extensions/http-curl/protocols/RESTReceiver.h | 110 +++++ extensions/http-curl/protocols/RESTSender.cpp | 144 +++++++ extensions/http-curl/protocols/RESTSender.h | 80 ++++ libminifi/CMakeLists.txt | 17 +- libminifi/include/c2/C2Agent.h | 3 +- libminifi/include/c2/protocols/Protocols.h | 24 -- libminifi/include/c2/protocols/RESTProtocol.h | 74 ---- libminifi/include/c2/protocols/RESTReceiver.h | 110 ----- libminifi/include/c2/protocols/RESTSender.h | 80 ---- 
libminifi/include/core/ClassLoader.h | 17 +- libminifi/include/core/ConfigurableComponent.h | 2 +- libminifi/include/core/Connectable.h | 2 +- libminifi/include/core/FlowConfiguration.h | 18 +- libminifi/include/core/ProcessSession.h | 24 +- libminifi/include/core/Processor.h | 2 +- libminifi/include/processors/InvokeHTTP.h | 181 --------- libminifi/include/processors/LoadProcessors.h | 1 - libminifi/include/utils/HTTPClient.h | 238 +++++------ libminifi/src/RemoteProcessorGroupPort.cpp | 35 +- libminifi/src/c2/C2Agent.cpp | 2 + libminifi/src/c2/protocols/RESTProtocol.cpp | 177 -------- libminifi/src/c2/protocols/RESTReceiver.cpp | 148 ------- libminifi/src/c2/protocols/RESTSender.cpp | 144 ------- libminifi/src/core/ClassLoader.cpp | 19 +- libminifi/src/core/FlowConfiguration.cpp | 5 + libminifi/src/core/yaml/YamlConfiguration.cpp | 17 + libminifi/src/processors/InvokeHTTP.cpp | 406 ------------------- libminifi/src/utils/HttpClient.cpp | 214 ---------- .../test/curl-tests/C2NullConfiguration.cpp | 136 +++++++ libminifi/test/curl-tests/C2UpdateTest.cpp | 187 +++++++++ .../curl-tests/C2VerifyHeartbeatAndStop.cpp | 154 +++++++ .../test/curl-tests/C2VerifyServeResults.cpp | 132 ++++++ libminifi/test/curl-tests/GetFileNoData.cpp | 184 +++++++++ .../test/curl-tests/HttpGetIntegrationTest.cpp | 159 ++++++++ .../test/curl-tests/HttpPostIntegrationTest.cpp | 111 +++++ libminifi/test/curl-tests/InvokeHTTPTests.cpp | 312 ++++++++++++++ .../test/curl-tests/SiteToSiteRestTest.cpp | 145 +++++++ libminifi/test/curl-tests/ThreadPoolAdjust.cpp | 111 +++++ .../test/integration/C2NullConfiguration.cpp | 135 ------ libminifi/test/integration/C2UpdateTest.cpp | 185 --------- .../integration/C2VerifyHeartbeatAndStop.cpp | 153 ------- .../test/integration/C2VerifyServeResults.cpp | 131 ------ libminifi/test/integration/GetFileNoData.cpp | 184 --------- .../test/integration/HttpGetIntegrationTest.cpp | 158 -------- .../integration/HttpPostIntegrationTest.cpp | 110 ----- 
.../test/integration/SiteToSiteRestTest.cpp | 144 ------- libminifi/test/integration/ThreadPoolAdjust.cpp | 109 ----- libminifi/test/unit/InvokeHTTPTests.cpp | 310 -------------- main/CMakeLists.txt | 14 + thirdparty/concurrentqueue/concurrentqueue.h | 1 - 64 files changed, 3958 insertions(+), 3358 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index a81127c..81ff472 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -53,6 +53,7 @@ set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address") # Search for threads find_package(Threads REQUIRED) +find_package(CURL) # Set the right openssl root path if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin") @@ -74,7 +75,6 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") # Include LevelDB find_package (Leveldb REQUIRED) -find_package(CURL REQUIRED) if (LEVELDB_FOUND) include_directories(${LEVELDB_INCLUDE_DIRS}) else () @@ -109,6 +109,10 @@ add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3) add_subdirectory(thirdparty/civetweb-1.9.1 EXCLUDE_FROM_ALL) include_directories(thirdparty/concurrentqueue) add_subdirectory(libminifi) +if (CURL_FOUND AND NOT DISABLE_CURL) + add_subdirectory(extensions/http-curl) + mark_as_advanced(HTTP-CURL) +endif(CURL_FOUND) add_subdirectory(main) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/Extensions.md ---------------------------------------------------------------------- diff --git a/Extensions.md b/Extensions.md new file mode 100644 index 0000000..7e13243 --- /dev/null +++ b/Extensions.md @@ -0,0 +1,90 @@ + +# Apache MiNiFi Extensions HowTo + +Extensions consist of modules that are conditionally built into your client. 
Reasons why you may wish to do this with your modules/processors + + - Do not wish to make dependencies required or the lack thereof is a known/expected runtime condition. + - You wish to allow users to exclude dependencies for a variety of reasons. + +# Extensions by example +We've used HTTP-CURL as the first example. We've taken all libcURL runtime classes and placed them into an extensions folder + - /extensions/http-curl + +This folder contains a CMakeLists file so that we can conditionally build it. In the case with libcURL, if the user does not have curl installed OR they specify -DDISABLE_CURL=true in the cmake build, the extensions will not be built. In this case, when the extension is not built, C2 REST protocols, InvokeHTTP, and an HTTP Client implementation will not be included. + +Your CMAKE file should build a static library, that will be included into the run time. This must be added with your conditional to the libminifi CMAKE, along with a platform specific whole archive inclusion. Note that this will ensure that despite no direct linkage being found by the compiler, we will include the code so that we can dynamically find your code. + +# C bindings +To find your classes, you must adhere to a dlsym callback that adheres to the core::ObjectFactory class, like the one below. This object factory will return a list of classes, that we can instantiate through the class loader mechanism. Note that since we are including your code directly into our runtime, we will take care of dlopen and dlsym calls. A map from the class name to the object factory is kept in memory. + +```C++ +class __attribute__((visibility("default"))) HttpCurlObjectFactory : public core::ObjectFactory { + public: + HttpCurlObjectFactory() { + + } + + /** + * Gets the name of the object. 
+ * @return class name of processor + */ + virtual std::string getName() { + return "HttpCurlObjectFactory"; + } + + virtual std::string getClassName() { + return "HttpCurlObjectFactory"; + } + /** + * Gets the class name for the object + * @return class name for the processor. + */ + virtual std::vector getClassNames() { + std::vector class_names; + class_names.push_back("RESTProtocol"); + class_names.push_back("RESTReceiver"); + class_names.push_back("RESTSender"); + class_names.push_back("InvokeHTTP"); + class_names.push_back("HTTPClient"); + return class_names; + } + + virtual std::unique_ptr assign(const std::string &class_name) { + if (class_name == "RESTReceiver") { + return std::unique_ptr(new core::DefautObjectFactory()); + } else if (class_name == "RESTSender") { + return std::unique_ptr(new core::DefautObjectFactory()); + } else if (class_name == "InvokeHTTP") { + return std::unique_ptr(new core::DefautObjectFactory()); + } else if (class_name == "HTTPClient") { + return std::unique_ptr(new core::DefautObjectFactory()); + } else { + return nullptr; + } + } + +}; + +extern "C" { +void *createHttpCurlFactory(void); +} +``` + +# Using your object factory function +To use your C function, you must define the sequence "Class Loader Functions" in your YAML file under FlowController. This will indicate to the code that the factory function is to be called and we will create the mappings defined above. + +Note that for the case of HTTP-CURL we have made it so that if libcURL exists and it is not disabled, the createHttpCurlFactory function is automatically loaded. To do this, use the function FlowConfiguration::add_static_func -- this will add your function to the list of registered resources and will do so in a thread-safe way. If you take this approach you cannot disable your library with an argument within the YAML file. 
+ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 1c879c5..d3c8851 100644 --- a/README.md +++ b/README.md @@ -479,6 +479,10 @@ MiNiFi can also be installed as a system service using minifi.sh with an optiona $ ./bin/minifi.sh install [service name] +### Extensions + +Please see [Extensions.md](Extensions.md) on how to build and run conditionally built dependencies and extensions. + ## Documentation See https://nifi.apache.org/minifi for the latest documentation. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/cmake/BuildTests.cmake ---------------------------------------------------------------------- diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake index 69c5e7d..4f4bed0 100644 --- a/cmake/BuildTests.cmake +++ b/cmake/BuildTests.cmake @@ -53,10 +53,19 @@ function(createTests testName) target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/processors") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/provenance") target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB}) - if (CURL_FOUND) + if (HTTP-CURL) target_include_directories(${testName} PRIVATE BEFORE ${CURL_INCLUDE_DIRS}) - target_link_libraries(${testName} ${CURL_LIBRARIES}) - endif(CURL_FOUND) + target_include_directories(${testName} PRIVATE BEFORE "extensions/http-curl/") + target_include_directories(${testName} PRIVATE BEFORE "extensions/http-curl/client/") + target_include_directories(${testName} PRIVATE BEFORE "extensions/http-curl/processors/") + target_include_directories(${testName} PRIVATE BEFORE "extensions/http-curl/protocols/") + target_link_libraries(${testName} ${CURL_LIBRARIES} ) + if (APPLE) + target_link_libraries (${testName} -Wl,-all_load ${HTTP-CURL}) + else 
() + target_link_libraries (${testName} -Wl,--whole-archive ${HTTP-CURL} -Wl,--no-whole-archive) + endif () + endif() endfunction() @@ -67,6 +76,7 @@ SET(TEST_RESOURCES ${TEST_DIR}/resources) GETSOURCEFILES(UNIT_TESTS "${TEST_DIR}/unit/") GETSOURCEFILES(INTEGRATION_TESTS "${TEST_DIR}/integration/") +GETSOURCEFILES(CURL_INTEGRATION_TESTS "${TEST_DIR}/curl-tests/") SET(UNIT_TEST_COUNT 0) FOREACH(testfile ${UNIT_TESTS}) @@ -88,7 +98,19 @@ FOREACH(testfile ${INTEGRATION_TESTS}) ENDFOREACH() message("-- Finished building ${INT_TEST_COUNT} integration test file(s)...") -add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/") +if (HTTP-CURL) + +SET(CURL_INT_TEST_COUNT 0) +FOREACH(testfile ${CURL_INTEGRATION_TESTS}) + get_filename_component(testfilename "${testfile}" NAME_WE) + add_executable("${testfilename}" "${TEST_DIR}/curl-tests/${testfile}" ${SPD_SOURCES} "${TEST_DIR}/TestBase.cpp") + createTests("${testfilename}") + #message("Adding ${testfilename} from ${testfile}") + MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1") +ENDFOREACH() +message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test file(s)...") + + add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") @@ -100,8 +122,6 @@ add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RE add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/") -add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/") - add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestHTTPGet.yml" 
"${TEST_RESOURCES}/") @@ -113,4 +133,10 @@ add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/T ## removed due to travis issues with our certs #add_test(NAME SiteToSiteRestTestSecure COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRestSecure.yml" "${TEST_RESOURCES}/" "https://localhost:8082/nifi-api/controller") +add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/") + +add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/") + +endif(HTTP-CURL) + add_test(NAME TestExecuteProcess COMMAND TestExecuteProcess ) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/http-curl/CMakeLists.txt b/extensions/http-curl/CMakeLists.txt new file mode 100644 index 0000000..cb91683 --- /dev/null +++ b/extensions/http-curl/CMakeLists.txt @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +cmake_minimum_required(VERSION 2.6) + +find_package(CURL REQUIRED) + +set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") +set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") + +include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/) + +find_package(Boost REQUIRED) +include_directories(${Boost_INCLUDE_DIRS}) + +file(GLOB SOURCES "*.cpp" "protocols/*.cpp" "client/*.cpp" "processors/*.cpp") + +add_library(minifi-http-curl STATIC ${SOURCES}) +set_property(TARGET minifi-http-curl PROPERTY POSITION_INDEPENDENT_CODE ON) +if(THREADS_HAVE_PTHREAD_ARG) + target_compile_options(PUBLIC minifi-http-curl "-pthread") +endif() +if(CMAKE_THREAD_LIBS_INIT) + target_link_libraries(minifi-http-curl "${CMAKE_THREAD_LIBS_INIT}") +endif() + +if (CURL_FOUND) + include_directories(${CURL_INCLUDE_DIRS}) + target_link_libraries (minifi-http-curl ${CURL_LIBRARIES}) +endif(CURL_FOUND) + +# Include UUID +find_package(UUID REQUIRED) +target_link_libraries(minifi-http-curl ${LIBMINIFI} ${UUID_LIBRARIES} ${JSONCPP_LIB}) +add_dependencies(minifi-http-curl jsoncpp_project) +find_package(OpenSSL REQUIRED) +include_directories(${OPENSSL_INCLUDE_DIR}) +target_link_libraries(minifi-http-curl ${CMAKE_DL_LIBS} ) +find_package(ZLIB REQUIRED) +include_directories(${ZLIB_INCLUDE_DIRS}) +target_link_libraries (minifi-http-curl ${ZLIB_LIBRARIES}) +if (WIN32) + set_target_properties(minifi-http-curl PROPERTIES + LINK_FLAGS "/WHOLEARCHIVE" + ) +elseif (APPLE) + set_target_properties(minifi-http-curl PROPERTIES + LINK_FLAGS "-Wl,-all_load" + ) +else () + 
set_target_properties(minifi-http-curl PROPERTIES + LINK_FLAGS "-Wl,--whole-archive" + ) +endif () + + +SET (HTTP-CURL minifi-http-curl PARENT_SCOPE) + + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/HttpCurlLoader.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/HttpCurlLoader.cpp b/extensions/http-curl/HttpCurlLoader.cpp new file mode 100644 index 0000000..b1f3eb5 --- /dev/null +++ b/extensions/http-curl/HttpCurlLoader.cpp @@ -0,0 +1,30 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "HttpCurlLoader.h" + +#include "core/FlowConfiguration.h" + +bool HttpCurlObjectFactory::added = core::FlowConfiguration::add_static_func("createHttpCurlFactory"); + +extern "C" { + +void *createHttpCurlFactory(void) { + return new HttpCurlObjectFactory(); +} + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/HttpCurlLoader.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/HttpCurlLoader.h b/extensions/http-curl/HttpCurlLoader.h new file mode 100644 index 0000000..cb9a30d --- /dev/null +++ b/extensions/http-curl/HttpCurlLoader.h @@ -0,0 +1,82 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef EXTENSIONS_HTTPCURLLOADER_H +#define EXTENSIONS_HTTPCURLLOADER_H + +#include "protocols/RESTProtocol.h" +#include "protocols/RESTSender.h" +#include "protocols/RESTReceiver.h" +#include "processors/InvokeHTTP.h" +#include "client/HTTPClient.h" +#include "core/ClassLoader.h" + +class __attribute__((visibility("default"))) HttpCurlObjectFactory : public core::ObjectFactory { + public: + HttpCurlObjectFactory() { + + } + + /** + * Gets the name of the object. 
+ * @return class name of processor + */ + virtual std::string getName() { + return "HttpCurlObjectFactory"; + } + + virtual std::string getClassName() { + return "HttpCurlObjectFactory"; + } + /** + * Gets the class name for the object + * @return class name for the processor. + */ + virtual std::vector getClassNames() { + std::vector class_names; + class_names.push_back("RESTProtocol"); + class_names.push_back("RESTReceiver"); + class_names.push_back("RESTSender"); + class_names.push_back("InvokeHTTP"); + class_names.push_back("HTTPClient"); + return class_names; + } + + virtual std::unique_ptr assign(const std::string &class_name) { + if (class_name == "RESTReceiver") { + return std::unique_ptr(new core::DefautObjectFactory()); + } else if (class_name == "RESTSender") { + return std::unique_ptr(new core::DefautObjectFactory()); + } else if (class_name == "InvokeHTTP") { + return std::unique_ptr(new core::DefautObjectFactory()); + } else if (class_name == "HTTPClient") { + return std::unique_ptr(new core::DefautObjectFactory()); + } else { + return nullptr; + } + } + + static bool added; + + + +}; + +extern "C" { +void *createHttpCurlFactory(void); +} +#endif /* EXTENSIONS_HTTPCURLLOADER_H */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/client/HTTPClient.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h new file mode 100644 index 0000000..609a052 --- /dev/null +++ b/extensions/http-curl/client/HTTPClient.h @@ -0,0 +1,191 @@ +/** + * HTTPUtils class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __HTTP_UTILS_H__ +#define __HTTP_UTILS_H__ + +#include "utils/HTTPClient.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include "controllers/SSLContextService.h" +#include "utils/ByteInputCallBack.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "properties/Configure.h" +#include "io/validation.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +/** + * Purpose and Justification: Initializes and cleans up curl once. Cleanup will only occur at the end of our execution since we are relying on a static variable. + */ +class HTTPClientInitializer { + public: + static HTTPClientInitializer *getInstance() { + static HTTPClientInitializer initializer; + return &initializer; + } + private: + ~HTTPClientInitializer() { + curl_global_cleanup(); + } + HTTPClientInitializer() { + curl_global_init(CURL_GLOBAL_DEFAULT); + } +}; + +/** + * Purpose and Justification: Pull the basics for an HTTPClient into a self contained class. Simply provide + * the URL and an SSLContextService ( can be null). + * + * Since several portions of the code have been relying on curl, we can encapsulate most CURL HTTP + * operations here without maintaining it everywhere. 
Further, this will help with testing as we + * only need to to test our usage of CURL once + */ +class HTTPClient : public BaseHTTPClient, public core::Connectable { + public: + + HTTPClient(); + + HTTPClient(std::string name, uuid_t uuid); + + HTTPClient(const std::string &url, const std::shared_ptr ssl_context_service = nullptr); + + ~HTTPClient(); + + void setVerbose(); + + void initialize(const std::string &method, const std::string url = "", const std::shared_ptr ssl_context_service = nullptr); + + void setConnectionTimeout(int64_t timeout); + + void setReadTimeout(int64_t timeout); + + void setUploadCallback(HTTPUploadCallback *callbackObj); + + struct curl_slist *build_header_list(std::string regex, const std::map &attributes); + + void setContentType(std::string content_type); + + std::string escape(std::string string_to_escape); + + void setPostFields(std::string input); + + void setHeaders(struct curl_slist *list); + + void appendHeader(const std::string &new_header); + + bool submit(); + + CURLcode getResponseResult(); + + int64_t &getResponseCode(); + + const char *getContentType(); + + const std::vector &getResponseBody(); + + void set_request_method(const std::string method); + + void setUseChunkedEncoding(); + + void setDisablePeerVerification(); + + const std::vector &getHeaders() { + return header_response_.header_tokens_; + + } + + /** + * Determines if we are connected and operating + */ + virtual bool isRunning() { + return true; + } + + /** + * Block until work is available on any input connection, or the given duration elapses + * @param timeoutMs timeout in milliseconds + */ + void waitForWork(uint64_t timeoutMs) { + } + + virtual void yield() { + + } + + /** + * Determines if work is available by this connectable + * @return boolean if work is available. 
+ */ + virtual bool isWorkAvailable() { + return true; + } + + protected: + + inline bool matches(const std::string &value, const std::string &sregex); + + static CURLcode configure_ssl_context(CURL *curl, void *ctx, void *param) { + minifi::controllers::SSLContextService *ssl_context_service = static_cast(param); + if (!ssl_context_service->configure_ssl_context(static_cast(ctx))) { + return CURLE_FAILED_INIT; + } + return CURLE_OK; + } + + void configure_secure_connection(CURL *http_session); + + bool isSecure(const std::string &url); + struct curl_slist *headers_; + utils::HTTPRequestResponse content_; + utils::HTTPHeaderResponse header_response_; + CURLcode res; + int64_t http_code; + char *content_type; + + int64_t connect_timeout_; +// read timeout. + int64_t read_timeout_; + + std::string content_type_; + + std::shared_ptr logger_; + CURL *http_session_; + std::string url_; + std::string method_; + std::shared_ptr ssl_context_service_; +}; + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/client/HttpClient.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HttpClient.cpp b/extensions/http-curl/client/HttpClient.cpp new file mode 100644 index 0000000..f2bc0e1 --- /dev/null +++ b/extensions/http-curl/client/HttpClient.cpp @@ -0,0 +1,257 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "HTTPClient.h" +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +HTTPClient::HTTPClient(const std::string &url, const std::shared_ptr ssl_context_service) + : core::Connectable("HTTPClient", 0), + ssl_context_service_(ssl_context_service), + url_(url), + logger_(logging::LoggerFactory::getLogger()), + connect_timeout_(0), + read_timeout_(0), + content_type(nullptr), + headers_(nullptr), + http_code(0), + header_response_(1), + res(CURLE_OK) { + HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance(); + http_session_ = curl_easy_init(); +} + +HTTPClient::HTTPClient(std::string name, uuid_t uuid) + : core::Connectable(name, uuid), + ssl_context_service_(nullptr), + url_(), + logger_(logging::LoggerFactory::getLogger()), + connect_timeout_(0), + read_timeout_(0), + content_type(nullptr), + headers_(nullptr), + http_code(0), + header_response_(1), + res(CURLE_OK) { + HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance(); + http_session_ = curl_easy_init(); +} + +HTTPClient::HTTPClient() + : core::Connectable("HTTPClient", 0), + ssl_context_service_(nullptr), + url_(), + logger_(logging::LoggerFactory::getLogger()), + connect_timeout_(0), + read_timeout_(0), + content_type(nullptr), + headers_(nullptr), + http_code(0), + header_response_(1), + res(CURLE_OK) { + HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance(); + http_session_ = curl_easy_init(); +} + +HTTPClient::~HTTPClient() { + if 
(nullptr != headers_) { + curl_slist_free_all(headers_); + } + curl_easy_cleanup(http_session_); +} + +void HTTPClient::setVerbose() { + curl_easy_setopt(http_session_, CURLOPT_VERBOSE, 1L); +} + +void HTTPClient::initialize(const std::string &method, const std::string url, const std::shared_ptr ssl_context_service) { + method_ = method; + set_request_method(method_); + if (ssl_context_service != nullptr) { + ssl_context_service_ = ssl_context_service; + } + if (!url.empty()) { + url_ = url; + } + if (isSecure(url_) && ssl_context_service_ != nullptr) { + configure_secure_connection(http_session_); + } +} + +void HTTPClient::setDisablePeerVerification() { + logger_->log_info("Disabling peer verification"); + curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYPEER, 0L); +} + +void HTTPClient::setConnectionTimeout(int64_t timeout) { + connect_timeout_ = timeout; +} + +void HTTPClient::setReadTimeout(int64_t timeout) { + read_timeout_ = timeout; +} + +void HTTPClient::setUploadCallback(HTTPUploadCallback *callbackObj) { + logger_->log_info("Setting callback"); + if (method_ == "put" || method_ == "PUT") { + curl_easy_setopt(http_session_, CURLOPT_INFILESIZE_LARGE, (curl_off_t ) callbackObj->ptr->getBufferSize()); + } else { + curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, (curl_off_t ) callbackObj->ptr->getBufferSize()); + } + curl_easy_setopt(http_session_, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write); + curl_easy_setopt(http_session_, CURLOPT_READDATA, static_cast(callbackObj)); +} + +struct curl_slist *HTTPClient::build_header_list(std::string regex, const std::map &attributes) { + if (http_session_) { + for (auto attribute : attributes) { + if (matches(attribute.first, regex)) { + std::string attr = attribute.first + ":" + attribute.second; + headers_ = curl_slist_append(headers_, attr.c_str()); + } + } + } + return headers_; +} + +void HTTPClient::setContentType(std::string content_type) { + content_type_ = "Content-Type: " + 
content_type; + headers_ = curl_slist_append(headers_, content_type_.c_str()); +} + +std::string HTTPClient::escape(std::string string_to_escape) { + return curl_easy_escape(http_session_, string_to_escape.c_str(), string_to_escape.length()); +} + +void HTTPClient::setPostFields(std::string input) { + curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, input.length()); + curl_easy_setopt(http_session_, CURLOPT_POSTFIELDS, input.c_str()); +} + +void HTTPClient::setHeaders(struct curl_slist *list) { + headers_ = list; +} + +void HTTPClient::appendHeader(const std::string &new_header) { + headers_ = curl_slist_append(headers_, new_header.c_str()); +} + +void HTTPClient::setUseChunkedEncoding() { + headers_ = curl_slist_append(headers_, "Transfer-Encoding: chunked"); +} + +bool HTTPClient::submit() { + if (connect_timeout_ > 0) { + curl_easy_setopt(http_session_, CURLOPT_CONNECTTIMEOUT, connect_timeout_); + } + + if (headers_ != nullptr) { + headers_ = curl_slist_append(headers_, "Expect:"); + curl_easy_setopt(http_session_, CURLOPT_HTTPHEADER, headers_); + } + + curl_easy_setopt(http_session_, CURLOPT_URL, url_.c_str()); + logger_->log_info("Submitting to %s", url_); + curl_easy_setopt(http_session_, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write); + curl_easy_setopt(http_session_, CURLOPT_WRITEDATA, static_cast(&content_)); + + curl_easy_setopt(http_session_, CURLOPT_HEADERFUNCTION, &utils::HTTPHeaderResponse::receive_headers); + curl_easy_setopt(http_session_, CURLOPT_HEADERDATA, static_cast(&header_response_)); + + res = curl_easy_perform(http_session_); + curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code); + curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type); + if (res != CURLE_OK) { + logger_->log_error("curl_easy_perform() failed %s\n", curl_easy_strerror(res)); + return false; + } + return true; +} + +CURLcode HTTPClient::getResponseResult() { + return res; +} + +int64_t &HTTPClient::getResponseCode() 
{ + return http_code; +} + +const char *HTTPClient::getContentType() { + return content_type; +} + +const std::vector &HTTPClient::getResponseBody() { + return content_.data; +} + +void HTTPClient::set_request_method(const std::string method) { + std::string my_method = method; + std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper); + if (my_method == "POST") { + curl_easy_setopt(http_session_, CURLOPT_POST, 1L); + } else if (my_method == "PUT") { + curl_easy_setopt(http_session_, CURLOPT_PUT, 0L); + } else if (my_method == "GET") { + } else { + curl_easy_setopt(http_session_, CURLOPT_CUSTOMREQUEST, my_method.c_str()); + } +} + +bool HTTPClient::matches(const std::string &value, const std::string &sregex) { + if (sregex == ".*") + return true; + + regex_t regex; + int ret = regcomp(&regex, sregex.c_str(), 0); + if (ret) + return false; + ret = regexec(&regex, value.c_str(), (size_t) 0, NULL, 0); + regfree(&regex); + if (ret) + return false; + + return true; +} + +void HTTPClient::configure_secure_connection(CURL *http_session) { + logger_->log_debug("Using certificate file %s", ssl_context_service_->getCertificateFile()); + curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, &configure_ssl_context); + curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, static_cast(ssl_context_service_.get())); +} + +bool HTTPClient::isSecure(const std::string &url) { + if (url.find("https") != std::string::npos) { + logger_->log_debug("%s is a secure url", url); + return true; + } + return false; +} + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/processors/InvokeHTTP.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp new file mode 100644 index 0000000..8e16122
--- /dev/null +++ b/extensions/http-curl/processors/InvokeHTTP.cpp @@ -0,0 +1,404 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "InvokeHTTP.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "core/FlowFile.h" +#include "core/logging/Logger.h" +#include "core/ProcessContext.h" +#include "core/Relationship.h" +#include "io/DataStream.h" +#include "io/StreamFactory.h" +#include "ResourceClaim.h" +#include "utils/StringUtils.h" +#include "utils/ByteInputCallBack.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +std::shared_ptr InvokeHTTP::id_generator_ = utils::IdGenerator::getIdGenerator(); + +const char *InvokeHTTP::ProcessorName = "InvokeHTTP"; + +core::Property InvokeHTTP::Method("HTTP Method", "HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). " + "Arbitrary methods are also supported. 
Methods other than POST, PUT and PATCH will be sent without a message body.", + "GET"); +core::Property InvokeHTTP::URL("Remote URL", "Remote URL which will be connected to, including scheme, host, port, path.", ""); +core::Property InvokeHTTP::ConnectTimeout("Connection Timeout", "Max wait time for connection to remote service.", "5 secs"); +core::Property InvokeHTTP::ReadTimeout("Read Timeout", "Max wait time for response from remote service.", "15 secs"); +core::Property InvokeHTTP::DateHeader("Include Date Header", "Include an RFC-2616 Date header in the request.", "True"); +core::Property InvokeHTTP::FollowRedirects("Follow Redirects", "Follow HTTP redirects issued by remote server.", "True"); +core::Property InvokeHTTP::AttributesToSend("Attributes to Send", "Regular expression that defines which attributes to send as HTTP" + " headers in the request. If not defined, no attributes are sent as headers.", + ""); +core::Property InvokeHTTP::SSLContext("SSL Context Service", "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.", ""); +core::Property InvokeHTTP::ProxyHost("Proxy Host", "The fully qualified hostname or IP address of the proxy server", ""); +core::Property InvokeHTTP::ProxyPort("Proxy Port", "The port of the proxy server", ""); +core::Property InvokeHTTP::ProxyUser("invokehttp-proxy-user", "Username to set when authenticating against proxy", ""); +core::Property InvokeHTTP::ProxyPassword("invokehttp-proxy-password", "Password to set when authenticating against proxy", ""); +core::Property InvokeHTTP::ContentType("Content-type", "The Content-Type to specify for when content is being transmitted through a PUT, " + "POST or PATCH. 
In the case of an empty value after evaluating an expression language expression, " + "Content-Type defaults to", + "application/octet-stream"); +core::Property InvokeHTTP::SendBody("send-message-body", "If true, sends the HTTP message body on POST/PUT/PATCH requests (default). " + "If false, suppresses the message body and content-type header for these requests.", + "true"); +core::Property InvokeHTTP::UseChunkedEncoding("Use Chunked Encoding", "When POST'ing, PUT'ing or PATCH'ing content set this property to true in order to not pass the 'Content-length' header" + " and instead send 'Transfer-Encoding' with a value of 'chunked'. This will enable the data transfer mechanism which was introduced in HTTP 1.1 " + "to pass data of unknown lengths in chunks.", + "false"); +core::Property InvokeHTTP::PropPutOutputAttributes("Put Response Body in Attribute", "If set, the response body received back will be put into an attribute of the original " + "FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. 
", + ""); +core::Property InvokeHTTP::AlwaysOutputResponse("Always Output Response", "Will force a response FlowFile to be generated and routed to the 'Response' relationship " + "regardless of what the server status code received is ", + "false"); +core::Property InvokeHTTP::PenalizeOnNoRetry("Penalize on \"No Retry\"", "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.", "false"); + +core::Property InvokeHTTP::DisablePeerVerification("Disable Peer Verification", "Disables peer verification for the SSL session", "false"); +const char* InvokeHTTP::STATUS_CODE = "invokehttp.status.code"; +const char* InvokeHTTP::STATUS_MESSAGE = "invokehttp.status.message"; +const char* InvokeHTTP::RESPONSE_BODY = "invokehttp.response.body"; +const char* InvokeHTTP::REQUEST_URL = "invokehttp.request.url"; +const char* InvokeHTTP::TRANSACTION_ID = "invokehttp.tx.id"; +const char* InvokeHTTP::REMOTE_DN = "invokehttp.remote.dn"; +const char* InvokeHTTP::EXCEPTION_CLASS = "invokehttp.java.exception.class"; +const char* InvokeHTTP::EXCEPTION_MESSAGE = "invokehttp.java.exception.message"; + +core::Relationship InvokeHTTP::Success("success", "All files are routed to success"); + +core::Relationship InvokeHTTP::RelResponse("response", "Represents a response flowfile"); + +core::Relationship InvokeHTTP::RelRetry("retry", "The original FlowFile will be routed on any status code that can be retried " + "(5xx status codes). It will have new attributes detailing the request."); + +core::Relationship InvokeHTTP::RelNoRetry("no retry", "The original FlowFile will be routed on any status code that should NOT " + "be retried (1xx, 3xx, 4xx status codes). It will have new attributes detailing the request."); + +core::Relationship InvokeHTTP::RelFailure("failure", "The original FlowFile will be routed on any type of connection failure, " + "timeout or general exception. 
It will have new attributes detailing the request."); + +void InvokeHTTP::initialize() { + logger_->log_info("Initializing InvokeHTTP"); + + // Set the supported properties + std::set properties; + properties.insert(Method); + properties.insert(URL); + properties.insert(ConnectTimeout); + properties.insert(ReadTimeout); + properties.insert(DateHeader); + properties.insert(AttributesToSend); + properties.insert(SSLContext); + properties.insert(ProxyHost); + properties.insert(ProxyPort); + properties.insert(ProxyUser); + properties.insert(UseChunkedEncoding); + properties.insert(ProxyPassword); + properties.insert(ContentType); + properties.insert(SendBody); + properties.insert(DisablePeerVerification); + properties.insert(AlwaysOutputResponse); + + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + if (!context->getProperty(Method.getName(), method_)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", Method.getName().c_str(), Method.getValue().c_str()); + return; + } + + if (!context->getProperty(URL.getName(), url_)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", URL.getName().c_str(), URL.getValue().c_str()); + return; + } + + std::string timeoutStr; + + if (context->getProperty(ConnectTimeout.getName(), timeoutStr)) { + core::Property::StringToInt(timeoutStr, connect_timeout_); + // set the timeout in curl options. 
+ + } else { + logger_->log_info("%s attribute is missing, so default value of %s will be used", ConnectTimeout.getName().c_str(), ConnectTimeout.getValue().c_str()); + + return; + } + + std::string contentTypeStr; + if (context->getProperty(ContentType.getName(), contentTypeStr)) { + content_type_ = contentTypeStr; + } + + if (context->getProperty(ReadTimeout.getName(), timeoutStr)) { + core::Property::StringToInt(timeoutStr, read_timeout_); + + } else { + logger_->log_info("%s attribute is missing, so default value of %s will be used", ReadTimeout.getName().c_str(), ReadTimeout.getValue().c_str()); + } + + std::string dateHeaderStr; + if (!context->getProperty(DateHeader.getName(), dateHeaderStr)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", DateHeader.getName().c_str(), DateHeader.getValue().c_str()); + } + + date_header_include_ = utils::StringUtils::StringToBool(dateHeaderStr, date_header_include_); + + if (!context->getProperty(PropPutOutputAttributes.getName(), put_attribute_name_)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", PropPutOutputAttributes.getName().c_str(), PropPutOutputAttributes.getValue().c_str()); + } + + if (!context->getProperty(AttributesToSend.getName(), attribute_to_send_regex_)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str()); + } + + std::string always_output_response = "false"; + if (!context->getProperty(AlwaysOutputResponse.getName(), always_output_response)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", AlwaysOutputResponse.getName().c_str(), AlwaysOutputResponse.getValue().c_str()); + } + + utils::StringUtils::StringToBool(always_output_response, always_output_response_); + + std::string penalize_no_retry = "false"; + if (!context->getProperty(PenalizeOnNoRetry.getName(), penalize_no_retry)) { + 
logger_->log_info("%s attribute is missing, so default value of %s will be used", PenalizeOnNoRetry.getName().c_str(), PenalizeOnNoRetry.getValue().c_str()); + } + + utils::StringUtils::StringToBool(penalize_no_retry, penalize_no_retry_); + + std::string context_name; + if (context->getProperty(SSLContext.getName(), context_name) && !IsNullOrEmpty(context_name)) { + std::shared_ptr service = context->getControllerService(context_name); + if (nullptr != service) { + ssl_context_service_ = std::static_pointer_cast(service); + } + } + + std::string useChunkedEncoding = "false"; + if (!context->getProperty(UseChunkedEncoding.getName(), useChunkedEncoding)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", UseChunkedEncoding.getName().c_str(), UseChunkedEncoding.getValue().c_str()); + } + + utils::StringUtils::StringToBool(useChunkedEncoding, use_chunked_encoding_); + + std::string disablePeerVerification = "false"; + if (context->getProperty(DisablePeerVerification.getName(), disablePeerVerification)) { + utils::StringUtils::StringToBool(disablePeerVerification, disable_peer_verification_); + } +} + +InvokeHTTP::~InvokeHTTP() { +} + +std::string InvokeHTTP::generateId() { + uuid_t txId; + id_generator_->generate(txId); + char uuidStr[37]; + uuid_unparse_lower(txId, uuidStr); + return uuidStr; +} + +bool InvokeHTTP::emitFlowFile(const std::string &method) { + return ("POST" == method || "PUT" == method || "PATCH" == method); +} + +void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + logger_->log_info("onTrigger InvokeHTTP with %s to %s", method_, url_); + + std::shared_ptr flowFile = std::static_pointer_cast(session->get()); + + if (flowFile == nullptr) { + if (!emitFlowFile(method_)) { + logger_->log_info("InvokeHTTP -- create flow file with %s", method_.c_str()); + flowFile = std::static_pointer_cast(session->create()); + } else { + logger_->log_info("exiting because method is %s", 
method_.c_str()); + return; + } + } else { + logger_->log_info("InvokeHTTP -- Received flowfile "); + } + // create a transaction id + std::string tx_id = generateId(); + + utils::HTTPClient client(url_, ssl_context_service_); + + client.initialize(method_); + client.setConnectionTimeout(connect_timeout_); + client.setReadTimeout(read_timeout_); + + if (!content_type_.empty()) { + client.setContentType(content_type_); + } + + if (use_chunked_encoding_) { + client.setUseChunkedEncoding(); + } + + if (disable_peer_verification_) { + logger_->log_debug("Disabling peer verification in HTTPClient"); + client.setDisablePeerVerification(); + } + + std::unique_ptr callback = nullptr; + std::unique_ptr callbackObj = nullptr; + if (emitFlowFile(method_)) { + logger_->log_info("InvokeHTTP -- reading flowfile"); + std::shared_ptr claim = flowFile->getResourceClaim(); + if (claim) { + callback = std::unique_ptr(new utils::ByteInputCallBack()); + session->read(flowFile, callback.get()); + callbackObj = std::unique_ptr(new utils::HTTPUploadCallback); + callbackObj->ptr = callback.get(); + callbackObj->pos = 0; + logger_->log_info("InvokeHTTP -- Setting callback, size is %d", callback->getBufferSize()); + client.setUploadCallback(callbackObj.get()); + } else { + logger_->log_error("InvokeHTTP -- no resource claim"); + } + + } else { + logger_->log_info("InvokeHTTP -- Not emitting flowfile to HTTP Server"); + } + + // append all headers + client.build_header_list(attribute_to_send_regex_, flowFile->getAttributes()); + + logger_->log_info("InvokeHTTP -- curl performed"); + if (client.submit()) { + logger_->log_info("InvokeHTTP -- curl successful"); + + bool putToAttribute = !IsNullOrEmpty(put_attribute_name_); + + const std::vector &response_body = client.getResponseBody(); + const std::vector &response_headers = client.getHeaders(); + + int64_t http_code = client.getResponseCode(); + const char *content_type = client.getContentType(); + flowFile->addAttribute(STATUS_CODE, 
std::to_string(http_code)); + if (response_headers.size() > 0) + flowFile->addAttribute(STATUS_MESSAGE, response_headers.at(0)); + flowFile->addAttribute(REQUEST_URL, url_); + flowFile->addAttribute(TRANSACTION_ID, tx_id); + + bool isSuccess = ((int32_t) (http_code / 100)) == 2; + bool output_body_to_requestAttr = (!isSuccess || putToAttribute) && flowFile != nullptr; + bool output_body_to_content = isSuccess && !putToAttribute; + bool body_empty = IsNullOrEmpty(response_body); + + logger_->log_info("isSuccess: %d, response code %d", isSuccess, http_code); + std::shared_ptr response_flow = nullptr; + + if (output_body_to_content) { + if (flowFile != nullptr) { + response_flow = std::static_pointer_cast(session->create(flowFile)); + } else { + response_flow = std::static_pointer_cast(session->create()); + } + + std::string ct = content_type; + response_flow->addKeyedAttribute(MIME_TYPE, ct); + response_flow->addAttribute(STATUS_CODE, std::to_string(http_code)); + if (response_headers.size() > 0) + flowFile->addAttribute(STATUS_MESSAGE, response_headers.at(0)); + response_flow->addAttribute(REQUEST_URL, url_); + response_flow->addAttribute(TRANSACTION_ID, tx_id); + io::DataStream stream((const uint8_t*) response_body.data(), response_body.size()); + // need an import from the data stream. 
+ session->importFrom(stream, response_flow); + } else { + logger_->log_info("Cannot output body to content"); + response_flow = std::static_pointer_cast(session->create()); + } + route(flowFile, response_flow, session, context, isSuccess, http_code); + } +} + +void InvokeHTTP::route(std::shared_ptr &request, std::shared_ptr &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess, + int statusCode) { + // check if we should yield the processor + if (!isSuccess && request == nullptr) { + context->yield(); + } + + // If the property to output the response flowfile regardless of status code is set then transfer it + bool responseSent = false; + if (always_output_response_ && response != nullptr) { + logger_->log_info("Outputting success and response"); + session->transfer(response, Success); + responseSent = true; + } + + // transfer to the correct relationship + // 2xx -> SUCCESS + if (isSuccess) { + // we have two flowfiles to transfer + if (request != nullptr) { + session->transfer(request, Success); + } + if (response != nullptr && !responseSent) { + logger_->log_info("Outputting success and response"); + session->transfer(response, Success); + } + + // 5xx -> RETRY + } else if (statusCode / 100 == 5) { + if (request != nullptr) { + session->penalize(request); + session->transfer(request, RelRetry); + } + + // 1xx, 3xx, 4xx -> NO RETRY + } else { + if (request != nullptr) { + if (penalize_no_retry_) { + session->penalize(request); + } + session->transfer(request, RelNoRetry); + } + } +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/processors/InvokeHTTP.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h new file mode 100644 index 
0000000..2a00cef --- /dev/null +++ b/extensions/http-curl/processors/InvokeHTTP.h @@ -0,0 +1,181 @@ +/** + * InvokeHTTP class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __INVOKE_HTTP_H__ +#define __INVOKE_HTTP_H__ + +#include +#include + +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Property.h" +#include "core/Resource.h" +#include "controllers/SSLContextService.h" +#include "utils/ByteInputCallBack.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Id.h" +#include "../client/HTTPClient.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// InvokeHTTP Class +class InvokeHTTP : public core::Processor { + public: + + // Constructor + /*! 
+ * Create a new processor + */ + InvokeHTTP(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid), + date_header_include_(true), + connect_timeout_(20000), + penalize_no_retry_(false), + read_timeout_(20000), + always_output_response_(false), + disable_peer_verification_(false), + ssl_context_service_(nullptr), + use_chunked_encoding_(false), + logger_(logging::LoggerFactory::getLogger()) { + static utils::HTTPClientInitializer *initializer = utils::HTTPClientInitializer::getInstance(); + } + // Destructor + virtual ~InvokeHTTP(); + // Processor Name + static const char *ProcessorName; + // Supported Properties + static core::Property Method; + static core::Property URL; + static core::Property ConnectTimeout; + static core::Property ReadTimeout; + static core::Property DateHeader; + static core::Property FollowRedirects; + static core::Property AttributesToSend; + static core::Property SSLContext; + static core::Property ProxyHost; + static core::Property ProxyPort; + static core::Property ProxyUser; + static core::Property ProxyPassword; + static core::Property ContentType; + static core::Property SendBody; + static core::Property UseChunkedEncoding; + static core::Property DisablePeerVerification; + static core::Property PropPutOutputAttributes; + + static core::Property AlwaysOutputResponse; + + static core::Property PenalizeOnNoRetry; + + static const char* STATUS_CODE; + static const char* STATUS_MESSAGE; + static const char* RESPONSE_BODY; + static const char* REQUEST_URL; + static const char* TRANSACTION_ID; + static const char* REMOTE_DN; + static const char* EXCEPTION_CLASS; + static const char* EXCEPTION_MESSAGE; + // Supported Relationships + static core::Relationship Success; + static core::Relationship RelResponse; + static core::Relationship RelRetry; + static core::Relationship RelNoRetry; + static core::Relationship RelFailure; + + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + virtual void 
initialize(); + virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + /** + * Provides a reference to the URL. + */ + const std::string &getUrl() { + return url_; + } + + protected: + + /** + * Generate a transaction ID + * @return transaction ID string. + */ + std::string generateId(); + + /** + * Routes the flowfile to the proper destination + * @param request request flow file record + * @param response response flow file record + * @param session process session + * @param context process context + * @param isSuccess success code or not + * @param statuscode http response code. + */ + void route(std::shared_ptr &request, std::shared_ptr &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess, int statusCode); + /** + * Determine if we should emit a new flowfile based on our activity + * @param method method type + * @return result of the evaluation. + */ + bool emitFlowFile(const std::string &method); + + std::shared_ptr ssl_context_service_; + + // http method + std::string method_; + // url + std::string url_; + // include date in the header + bool date_header_include_; + // attribute to send regex + std::string attribute_to_send_regex_; + // connection timeout + int64_t connect_timeout_; + // read timeout. + int64_t read_timeout_; + // attribute in which response body will be added + std::string put_attribute_name_; + // determine if we always output a response. + bool always_output_response_; + // penalize on no retry + bool penalize_no_retry_; + // content type. + std::string content_type_; + // use chunked encoding. 
+ bool use_chunked_encoding_; + // disable peer verification ( makes susceptible for MITM attacks ) + bool disable_peer_verification_; + private: + std::shared_ptr logger_; + static std::shared_ptr id_generator_; +}; + +REGISTER_RESOURCE(InvokeHTTP) + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/protocols/RESTProtocol.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTProtocol.cpp b/extensions/http-curl/protocols/RESTProtocol.cpp new file mode 100644 index 0000000..9d58c36 --- /dev/null +++ b/extensions/http-curl/protocols/RESTProtocol.cpp @@ -0,0 +1,177 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "RESTProtocol.h" + +#include +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector &response) { + Json::Reader reader; + Json::Value root; + try { + if (reader.parse(std::string(response.data(), response.size()), root)) { + std::string requested_operation = getOperation(payload); + + std::string identifier; + if (root.isMember("operationid")) { + identifier = root["operationid"].asString(); + } + if (root["operation"].asString() == requested_operation) { + if (root["requested_operations"].size() == 0) { + return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true)); + } + C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true); + + new_payload.setIdentifier(identifier); + + for (const Json::Value& request : root["requested_operations"]) { + Operation newOp = stringToOperation(request["operation"].asString()); + C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true); + C2ContentResponse new_command(newOp); + new_command.delay = 0; + new_command.required = true; + new_command.ttl = -1; + // set the identifier if one exists + if (request.isMember("operationid")) { + new_command.ident = request["operationid"].asString(); + nested_payload.setIdentifier(new_command.ident); + } + new_command.name = request["name"].asString(); + + if (request.isMember("content") && request["content"].size() > 0) { + for (const auto &name : request["content"].getMemberNames()) { + new_command.operation_arguments[name] = request["content"][name].asString(); + } + } + nested_payload.addContent(std::move(new_command)); + new_payload.addPayload(std::move(nested_payload)); + } + // we have a response for this request + return std::move(new_payload); + } + } + } catch (...) 
{ + } + return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true)); +} + +Json::Value RESTProtocol::serializeJsonPayload(Json::Value &json_root, const C2Payload &payload) { + // get the name from the content + Json::Value json_payload; + std::map> children; + for (const auto &nested_payload : payload.getNestedPayloads()) { + Json::Value child_payload = serializeJsonPayload(json_payload, nested_payload); + children[nested_payload.getLabel()].push_back(child_payload); + } + for (auto child_vector : children) { + if (child_vector.second.size() > 1) { + Json::Value children_json(Json::arrayValue); + for (auto child : child_vector.second) { + json_payload[child_vector.first] = child; + } + } else { + if (child_vector.second.size() == 1) { + if (child_vector.second.at(0).isMember(child_vector.first)) { + json_payload[child_vector.first] = child_vector.second.at(0)[child_vector.first]; + } else { + json_payload[child_vector.first] = child_vector.second.at(0); + } + } + } + } + + const std::vector &content = payload.getContent(); + for (const auto &payload_content : content) { + Json::Value payload_content_values; + bool use_sub_option = true; + if (payload_content.op == payload.getOperation()) { + for (auto content : payload_content.operation_arguments) { + if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { + json_payload[payload_content.name] = content.second; + use_sub_option = false; + } else { + payload_content_values[content.first] = content.second; + } + } + } + if (use_sub_option) + json_payload[payload_content.name] = payload_content_values; + } + return json_payload; +} + +std::string RESTProtocol::getOperation(const C2Payload &payload) { + switch (payload.getOperation()) { + case Operation::ACKNOWLEDGE: + return "acknowledge"; + case Operation::HEARTBEAT: + return "heartbeat"; + case Operation::RESTART: + return "restart"; + case Operation::DESCRIBE: + return "describe"; + case 
Operation::STOP: + return "stop"; + case Operation::START: + return "start"; + case Operation::UPDATE: + return "update"; + default: + return "heartbeat"; + } +} + +Operation RESTProtocol::stringToOperation(const std::string str) { + std::string op = str; + std::transform(str.begin(), str.end(), op.begin(), ::tolower); + if (op == "heartbeat") { + return Operation::HEARTBEAT; + } else if (op == "acknowledge") { + return Operation::ACKNOWLEDGE; + } else if (op == "update") { + return Operation::UPDATE; + } else if (op == "describe") { + return Operation::DESCRIBE; + } else if (op == "restart") { + return Operation::RESTART; + } else if (op == "clear") { + return Operation::CLEAR; + } else if (op == "stop") { + return Operation::STOP; + } else if (op == "start") { + return Operation::START; + } + return Operation::HEARTBEAT; +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/protocols/RESTProtocol.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTProtocol.h b/extensions/http-curl/protocols/RESTProtocol.h new file mode 100644 index 0000000..ed62d9c --- /dev/null +++ b/extensions/http-curl/protocols/RESTProtocol.h @@ -0,0 +1,74 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ +#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ + +#include "json/json.h" +#include "json/writer.h" +#include +#include +#include "CivetServer.h" +#include "c2/C2Protocol.h" +#include "c2/HeartBeatReporter.h" +#include "controllers/SSLContextService.h" +#include "utils/ByteInputCallBack.h" +#include "utils/HTTPClient.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. + * + * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST + * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction + * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. 
+ * + */ +class RESTProtocol { + public: + RESTProtocol() { + + } + + virtual ~RESTProtocol() { + + } + + protected: + + virtual Json::Value serializeJsonPayload(Json::Value &json_root, const C2Payload &payload); + + virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector &response); + + virtual std::string getOperation(const C2Payload &payload); + + virtual Operation stringToOperation(const std::string str); + +}; + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTOPERATIONS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/protocols/RESTReceiver.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp new file mode 100644 index 0000000..a84f30a --- /dev/null +++ b/extensions/http-curl/protocols/RESTReceiver.cpp @@ -0,0 +1,148 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "RESTReceiver.h" +#include +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +int log_message(const struct mg_connection *conn, const char *message) { + puts(message); + return 1; +} + +int ssl_protocol_en(void *ssl_context, void *user_data) { + struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context; + return 0; +} + +RESTReceiver::RESTReceiver(std::string name, uuid_t uuid) + : HeartBeatReporter(name, uuid), + logger_(logging::LoggerFactory::getLogger()) { +} + +void RESTReceiver::initialize(const std::shared_ptr &controller, const std::shared_ptr &configure) { + HeartBeatReporter::initialize(controller, configure); + logger_->log_debug("Initializing rest receiveer"); + if (nullptr != configuration_) { + std::string listeningPort, rootUri, caCert; + configuration_->get("c2.rest.listener.port", listeningPort); + configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri); + configuration_->get("c2.rest.listener.cacert", caCert); + + if (!listeningPort.empty() && !rootUri.empty()) { + handler = std::unique_ptr(new ListeningProtocol()); + if (!caCert.empty()) { + listener = std::move(start_webserver(listeningPort, rootUri, dynamic_cast(handler.get()), caCert)); + } else { + listener = std::move(start_webserver(listeningPort, rootUri, dynamic_cast(handler.get()))); + } + } + } +} +int16_t RESTReceiver::heartbeat(const C2Payload &payload) { + std::string operation_request_str = getOperation(payload); + std::string outputConfig; + Json::Value json_payload; + json_payload["operation"] = operation_request_str; + if (payload.getIdentifier().length() > 0) { + json_payload["operationid"] = payload.getIdentifier(); + } + const std::vector &content = payload.getContent(); + + for (const auto &payload_content : content) { + Json::Value payload_content_values; + bool use_sub_option = true; + if (payload_content.op == payload.getOperation()) { + for (auto 
content : payload_content.operation_arguments) { + if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { + json_payload[payload_content.name] = content.second; + use_sub_option = false; + } else { + payload_content_values[content.first] = content.second; + } + } + } + if (use_sub_option) + json_payload[payload_content.name] = payload_content_values; + } + + for (const auto &nested_payload : payload.getNestedPayloads()) { + json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload); + } + + Json::StyledWriter writer; + outputConfig = writer.write(json_payload); + if (handler != nullptr) { + logger_->log_debug("Setting %s", outputConfig); + handler->setResponse(outputConfig); + } + + return 0; +} + +std::unique_ptr RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) { + struct mg_callbacks callback; + + memset(&callback, 0, sizeof(callback)); + callback.init_ssl = ssl_protocol_en; + std::string my_port = port; + my_port += "s"; + callback.log_message = log_message; + const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL", + "ssl_verify_peer", "no", "num_threads", "1", 0 }; + + std::vector cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + std::unique_ptr server = std::unique_ptr(new CivetServer(cpp_options)); + + server->addHandler(rooturi, handler); + + return server; +} + +std::unique_ptr RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler) { + const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "num_threads", "1", 0 }; + + std::vector cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + std::unique_ptr 
server = std::unique_ptr(new CivetServer(cpp_options)); + + server->addHandler(rooturi, handler); + + return server; +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/protocols/RESTReceiver.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h new file mode 100644 index 0000000..f4e4a3c --- /dev/null +++ b/extensions/http-curl/protocols/RESTReceiver.h @@ -0,0 +1,110 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ +#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ + +#include "RESTSender.h" +#include "json/json.h" +#include "json/writer.h" +#include +#include +#include "core/Resource.h" +#include "RESTProtocol.h" +#include "CivetServer.h" +#include "c2/C2Protocol.h" +#include "controllers/SSLContextService.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +int log_message(const struct mg_connection *conn, const char *message); + +int ssl_protocol_en(void *ssl_context, void *user_data); + +/** + * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. + * + * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST + * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction + * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. 
+ * + */ +class RESTReceiver : public RESTProtocol, public HeartBeatReporter { + public: + RESTReceiver(std::string name, uuid_t uuid = nullptr); + + void initialize(const std::shared_ptr &controller, const std::shared_ptr &configure); + virtual int16_t heartbeat(const C2Payload &heartbeat); + + protected: + + class ListeningProtocol : public CivetHandler { + + public: + ListeningProtocol() { + + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::string currentvalue; + { + std::lock_guard lock(reponse_mutex_); + currentvalue = resp_; + } + + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + currentvalue.length()); + mg_printf(conn, "%s", currentvalue.c_str()); + return true; + } + + void setResponse(std::string response) { + std::lock_guard lock(reponse_mutex_); + resp_ = response; + } + + protected: + std::mutex reponse_mutex_; + std::string resp_; + + }; + + std::unique_ptr start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert); + + std::unique_ptr start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler); + + std::unique_ptr listener; + std::unique_ptr handler; + + private: + std::shared_ptr logger_; +}; + +REGISTER_RESOURCE(RESTReceiver); + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */