http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/protocols/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
new file mode 100644
index 0000000..e67661c
--- /dev/null
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -0,0 +1,144 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RESTSender.h"
+
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+RESTSender::RESTSender(std::string name, uuid_t uuid)
+ : C2Protocol(name, uuid),
+ logger_(logging::LoggerFactory<Connectable>::getLogger()) {
+}
+
+void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+ C2Protocol::initialize(controller, configure);
+ // base URL when one is not specified.
+ if (nullptr != configure) {
+ configure->get("c2.rest.url", rest_uri_);
+ configure->get("c2.rest.url.ack", ack_uri_);
+ }
+ logger_->log_info("Submitting to %s", rest_uri_);
+}
+C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) {
+ std::string operation_request_str = getOperation(payload);
+ std::string outputConfig;
+ if (direction == Direction::TRANSMIT) {
+ Json::Value json_payload;
+ json_payload["operation"] = operation_request_str;
+ if (payload.getIdentifier().length() > 0) {
+ json_payload["operationid"] = payload.getIdentifier();
+ }
+ const std::vector<C2ContentResponse> &content = payload.getContent();
+
+ for (const auto &payload_content : content) {
+ Json::Value payload_content_values;
+ bool use_sub_option = true;
+ if (payload_content.op == payload.getOperation()) {
+ for (auto content : payload_content.operation_arguments) {
+ if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
+ json_payload[payload_content.name] = content.second;
+ use_sub_option = false;
+ } else {
+ payload_content_values[content.first] = content.second;
+ }
+ }
+ }
+ if (use_sub_option)
+ json_payload[payload_content.name] = payload_content_values;
+ }
+
+ for (const auto &nested_payload : payload.getNestedPayloads()) {
+ json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
+ }
+
+ Json::StyledWriter writer;
+ outputConfig = writer.write(json_payload);
+ }
+
+ return std::move(sendPayload(url, direction, payload, outputConfig));
+}
+
+C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) {
+ if (payload.getOperation() == ACKNOWLEDGE) {
+ return consumePayload(ack_uri_, payload, direction, async);
+ }
+ return consumePayload(rest_uri_, payload, direction, async);
+}
+
+void RESTSender::update(const std::shared_ptr<Configure> &configure) {
+ std::string url;
+ configure->get("c2.rest.url", url);
+ configure->get("c2.rest.url.ack", url);
+}
+
+const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
+ if (!url.empty()) {
+ utils::HTTPClient client(url, ssl_context_service_);
+ client.setConnectionTimeout(2);
+
+ std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
+ std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
+ if (direction == Direction::TRANSMIT) {
+ input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
+ callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
+ input->write(outputConfig);
+ callback->ptr = input.get();
+ callback->pos = 0;
+ client.set_request_method("POST");
+ client.setUploadCallback(callback.get());
+ } else {
+ // we do not need to set the uplaod callback
+ // since we are not uploading anything on a get
+ client.set_request_method("GET");
+ }
+ client.setContentType("application/json");
+ bool isOkay = client.submit();
+ int64_t respCode = client.getResponseCode();
+
+ if (isOkay && respCode) {
+ if (payload.isRaw()) {
+ C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
+
+ response_payload.setRawData(client.getResponseBody());
+ return std::move(response_payload);
+ }
+ return parseJsonResponse(payload, client.getResponseBody());
+ } else {
+ return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+ }
+ } else {
+ return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
+ }
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/extensions/http-curl/protocols/RESTSender.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
new file mode 100644
index 0000000..09a1a4d
--- /dev/null
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -0,0 +1,80 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
+#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
+
+#include "json/json.h"
+#include "json/writer.h"
+#include <string>
+#include <mutex>
+#include "CivetServer.h"
+#include "c2/C2Protocol.h"
+#include "RESTProtocol.h"
+#include "c2/HeartBeatReporter.h"
+#include "controllers/SSLContextService.h"
+#include "utils/ByteInputCallBack.h"
+#include "../client/HTTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
+ *
+ * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
+ * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
+ * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
+ *
+ */
+class RESTSender : public RESTProtocol, public C2Protocol {
+ public:
+
+ explicit RESTSender(std::string name, uuid_t uuid = nullptr);
+
+ virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async);
+
+ virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async);
+
+ virtual void update(const std::shared_ptr<Configure> &configure);
+
+ virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure);
+
+ protected:
+
+ virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig);
+
+ std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+ std::string rest_uri_;
+ std::string ack_uri_;
+};
+
+REGISTER_RESOURCE(RESTSender);
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 539256b..22046e3 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -25,7 +25,6 @@ set(PROJECT_VERSION_MAJOR 0)
set(PROJECT_VERSION_MINOR 3)
set(PROJECT_VERSION_PATCH 0)
-
#### Establish Project Configuration ####
# Enable usage of the VERSION specifier
# https://cmake.org/cmake/help/v3.0/policy/CMP0048.html#policy:CMP0048
@@ -44,6 +43,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DLEVELDB_SUPPORT -DOPENSSL_SUPPORT -DYA
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DLEVELDB_SUPPORT -DOPENSSL_SUPPORT -DYAML_SUPPORT")
ENDIF()
+
include(CheckCXXCompilerFlag)
CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
@@ -57,6 +57,7 @@ endif()
include_directories(../thirdparty/spdlog-20170710/include)
include_directories(../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include)
+
include_directories(../thirdparty/civetweb-1.9.1/include)
include_directories(../thirdparty/jsoncpp/include)
include_directories(../thirdparty/concurrentqueue/)
@@ -76,20 +77,19 @@ target_link_libraries(minifi ${UUID_LIBRARIES} ${JSONCPP_LIB})
find_package(ZLIB REQUIRED)
include_directories(${ZLIB_INCLUDE_DIRS})
+
target_link_libraries (minifi ${ZLIB_LIBRARIES})
+
+
+
+
if (NOT IOS)
# Include Boost System
find_package(Boost COMPONENTS system filesystem REQUIRED)
-find_package(CURL)
target_link_libraries(minifi ${Boost_SYSTEM_LIBRARY})
target_link_libraries(minifi ${Boost_FILESYSTEM_LIBRARY})
-if (CURL_FOUND)
- include_directories(${CURL_INCLUDE_DIRS})
- target_link_libraries (minifi ${CURL_LIBRARIES})
-endif(CURL_FOUND)
-
# Include LevelDB
find_package (Leveldb REQUIRED)
if (LEVELDB_FOUND)
@@ -109,6 +109,7 @@ else ()
endif (OPENSSL_FOUND)
endif ()
-add_subdirectory(src/utils)
set_property(TARGET minifi PROPERTY INTERPROCEDURAL_OPTIMIZATION True)
+
+SET (LIBMINIFI minifi PARENT_SCOPE)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/c2/C2Agent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 8c2e45d..c2021c9 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -29,8 +29,7 @@
#include "C2Payload.h"
#include "C2Protocol.h"
#include "io/validation.h"
-#include "protocols/Protocols.h"
-
+#include "HeartBeatReporter.h"
namespace org {
namespace apache {
namespace nifi {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/c2/protocols/Protocols.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/Protocols.h b/libminifi/include/c2/protocols/Protocols.h
deleted file mode 100644
index c4c314f..0000000
--- a/libminifi/include/c2/protocols/Protocols.h
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_H_
-#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_H_
-
-#include "RESTReceiver.h"
-#include "RESTSender.h"
-
-#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/c2/protocols/RESTProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h
deleted file mode 100644
index 2978a03..0000000
--- a/libminifi/include/c2/protocols/RESTProtocol.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
-#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
-
-#include "json/json.h"
-#include "json/writer.h"
-#include <string>
-#include <mutex>
-#include "CivetServer.h"
-#include "../C2Protocol.h"
-#include "../HeartBeatReporter.h"
-#include "controllers/SSLContextService.h"
-#include "utils/ByteInputCallBack.h"
-#include "utils/HTTPClient.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-/**
- * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
- *
- * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
- * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
- * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
- *
- */
-class RESTProtocol {
- public:
- RESTProtocol() {
-
- }
-
- virtual ~RESTProtocol() {
-
- }
-
- protected:
-
- virtual Json::Value serializeJsonPayload(Json::Value &json_root, const C2Payload &payload);
-
- virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector<char> &response);
-
- virtual std::string getOperation(const C2Payload &payload);
-
- virtual Operation stringToOperation(const std::string str);
-
-};
-
-} /* namesapce c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTOPERATIONS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/c2/protocols/RESTReceiver.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/RESTReceiver.h b/libminifi/include/c2/protocols/RESTReceiver.h
deleted file mode 100644
index 17b5028..0000000
--- a/libminifi/include/c2/protocols/RESTReceiver.h
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
-#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
-
-#include "RESTSender.h"
-#include "json/json.h"
-#include "json/writer.h"
-#include <string>
-#include <mutex>
-#include "core/Resource.h"
-#include "RESTProtocol.h"
-#include "CivetServer.h"
-#include "../C2Protocol.h"
-#include "controllers/SSLContextService.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-int log_message(const struct mg_connection *conn, const char *message);
-
-int ssl_protocol_en(void *ssl_context, void *user_data);
-
-/**
- * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
- *
- * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
- * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
- * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
- *
- */
-class RESTReceiver : public RESTProtocol, public HeartBeatReporter {
- public:
- RESTReceiver(std::string name, uuid_t uuid = nullptr);
-
- void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure);
- virtual int16_t heartbeat(const C2Payload &heartbeat);
-
- protected:
-
- class ListeningProtocol : public CivetHandler {
-
- public:
- ListeningProtocol() {
-
- }
-
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
- std::string currentvalue;
- {
- std::lock_guard<std::mutex> lock(reponse_mutex_);
- currentvalue = resp_;
- }
-
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- currentvalue.length());
- mg_printf(conn, "%s", currentvalue.c_str());
- return true;
- }
-
- void setResponse(std::string response) {
- std::lock_guard<std::mutex> lock(reponse_mutex_);
- resp_ = response;
- }
-
- protected:
- std::mutex reponse_mutex_;
- std::string resp_;
-
- };
-
- std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert);
-
- std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler);
-
- std::unique_ptr<CivetServer> listener;
- std::unique_ptr<ListeningProtocol> handler;
-
- private:
- std::shared_ptr<logging::Logger> logger_;
-};
-
-REGISTER_RESOURCE(RESTReceiver);
-
-} /* namesapce c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/c2/protocols/RESTSender.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/RESTSender.h b/libminifi/include/c2/protocols/RESTSender.h
deleted file mode 100644
index 749de3e..0000000
--- a/libminifi/include/c2/protocols/RESTSender.h
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
-#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
-
-#include "json/json.h"
-#include "json/writer.h"
-#include <string>
-#include <mutex>
-#include "CivetServer.h"
-#include "../C2Protocol.h"
-#include "RESTProtocol.h"
-#include "../HeartBeatReporter.h"
-#include "controllers/SSLContextService.h"
-#include "utils/ByteInputCallBack.h"
-#include "utils/HTTPClient.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-/**
- * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
- *
- * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
- * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
- * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
- *
- */
-class RESTSender : public RESTProtocol, public C2Protocol {
- public:
-
- explicit RESTSender(std::string name, uuid_t uuid = nullptr);
-
- virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async);
-
- virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async);
-
- virtual void update(const std::shared_ptr<Configure> &configure);
-
- virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure);
-
- protected:
-
- virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig);
-
- std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
-
- private:
- std::shared_ptr<logging::Logger> logger_;
- std::string rest_uri_;
- std::string ack_uri_;
-};
-
-REGISTER_RESOURCE(RESTSender);
-
-} /* namesapce c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/core/ClassLoader.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ClassLoader.h b/libminifi/include/core/ClassLoader.h
index b6cefc5..c44cb9a 100644
--- a/libminifi/include/core/ClassLoader.h
+++ b/libminifi/include/core/ClassLoader.h
@@ -86,11 +86,14 @@ class ObjectFactory {
*/
virtual std::string getName() = 0;
+ virtual std::string getClassName() = 0;
/**
* Gets the class name for the object
* @return class name for the processor.
*/
- virtual std::string getClassName() = 0;
+ virtual std::vector<std::string> getClassNames() = 0;
+
+ virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) = 0;
};
@@ -161,6 +164,16 @@ class DefautObjectFactory : public ObjectFactory {
return className;
}
+ virtual std::vector<std::string> getClassNames() {
+ std::vector<std::string> container;
+ container.push_back(className);
+ return container;
+ }
+
+ virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) {
+ return nullptr;
+ }
+
protected:
std::string className;
@@ -200,7 +213,7 @@ class ClassLoader {
* This will attempt to load objects within this resource.
* @return return code: RESOURCE_FAILURE or RESOURCE_SUCCESS
*/
- uint16_t registerResource(const std::string &resource);
+ uint16_t registerResource(const std::string &resource, const std::string &resourceName);
/**
* Register a class with the give ProcessorFactory
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/core/ConfigurableComponent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h
index d48f8db..a19abc7 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -38,7 +38,7 @@ namespace core {
* Represents a configurable component
* Purpose: Extracts configuration items for all components and localized them
*/
-class ConfigurableComponent {
+class __attribute__((visibility("default"))) ConfigurableComponent {
public:
ConfigurableComponent();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/core/Connectable.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
index 150b5fc..5d27901 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -37,7 +37,7 @@ namespace core {
* Purpose: As in NiFi, this represents a connection point and allows the derived
* object to be connected to other connectables.
*/
-class Connectable : public CoreComponent {
+class __attribute__((visibility("default"))) Connectable : public CoreComponent {
public:
explicit Connectable(std::string name, uuid_t uuid);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index ffc567f..344a188 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -30,7 +30,6 @@
#include "processors/TailFile.h"
#include "processors/ListenSyslog.h"
#include "processors/GenerateFlowFile.h"
-#include "processors/InvokeHTTP.h"
#include "processors/ListenHTTP.h"
#include "processors/LogAttribute.h"
#include "processors/ExecuteProcess.h"
@@ -70,6 +69,9 @@ class FlowConfiguration : public CoreComponent {
logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration);
+ for(auto sl_func : statics_sl_funcs_){
+ registerResource("",sl_func);
+ }
}
virtual ~FlowConfiguration();
@@ -118,8 +120,21 @@ class FlowConfiguration : public CoreComponent {
return service_provider_;
}
+ static bool add_static_func(std::string functor){
+ statics_sl_funcs_.push_back(functor);
+ return true;
+ }
+
protected:
+ void registerResource(const std::string &resource_function) {
+ core::ClassLoader::getDefaultClassLoader().registerResource("", resource_function);
+ }
+
+ void registerResource(const std::string &resource_location, const std::string &resource_function) {
+ core::ClassLoader::getDefaultClassLoader().registerResource(resource_location, resource_function);
+ }
+
// service provider reference.
std::shared_ptr<core::controller::StandardControllerServiceProvider> service_provider_;
// based, shared controller service map.
@@ -136,6 +151,7 @@ class FlowConfiguration : public CoreComponent {
private:
std::shared_ptr<logging::Logger> logger_;
+ static std::vector<std::string> statics_sl_funcs_;
};
} /* namespace core */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 5b78022..5585874 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -77,7 +77,29 @@ class ProcessSession {
std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &&parent);
// Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &parent) {
- return create(parent);
+ std::map<std::string, std::string> empty;
+ std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
+
+ if (record) {
+ _addedFlowFiles[record->getUUIDStr()] = record;
+ logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
+ }
+
+ if (record) {
+ // Copy attributes
+ std::map<std::string, std::string> parentAttributes = parent->getAttributes();
+ std::map<std::string, std::string>::iterator it;
+ for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) {
+ if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || it->first == FlowAttributeKey(DISCARD_REASON) || it->first == FlowAttributeKey(UUID))
+ // Do not copy special attributes from parent
+ continue;
+ record->setAttribute(it->first, it->second);
+ }
+ record->setLineageStartDate(parent->getlineageStartDate());
+ record->setLineageIdentifiers(parent->getlineageIdentifiers());
+ parent->getlineageIdentifiers().insert(parent->getUUIDStr());
+ }
+ return record;
}
// Add a FlowFile to the session
void add(std::shared_ptr<core::FlowFile> &flow);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 99b1043..955a044 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -62,7 +62,7 @@ namespace core {
#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
// Processor Class
-class Processor : public Connectable, public ConfigurableComponent, public std::enable_shared_from_this<Processor> {
+class __attribute__((visibility("default"))) Processor : public Connectable, public ConfigurableComponent, public std::enable_shared_from_this<Processor> {
public:
// Constructor
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/processors/InvokeHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/InvokeHTTP.h b/libminifi/include/processors/InvokeHTTP.h
deleted file mode 100644
index 1c9d594..0000000
--- a/libminifi/include/processors/InvokeHTTP.h
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * InvokeHTTP class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __INVOKE_HTTP_H__
-#define __INVOKE_HTTP_H__
-
-#include <memory>
-#include <regex>
-
-#include <curl/curl.h>
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Property.h"
-#include "core/Resource.h"
-#include "controllers/SSLContextService.h"
-#include "utils/ByteInputCallBack.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/Id.h"
-#include "utils/HTTPClient.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-// InvokeHTTP Class
-class InvokeHTTP : public core::Processor {
- public:
-
- // Constructor
- /*!
- * Create a new processor
- */
- InvokeHTTP(std::string name, uuid_t uuid = NULL)
- : Processor(name, uuid),
- date_header_include_(true),
- connect_timeout_(20000),
- penalize_no_retry_(false),
- read_timeout_(20000),
- always_output_response_(false),
- disable_peer_verification_(false),
- ssl_context_service_(nullptr),
- use_chunked_encoding_(false),
- logger_(logging::LoggerFactory<InvokeHTTP>::getLogger()) {
- static utils::HTTPClientInitializer *initializer = utils::HTTPClientInitializer::getInstance();
- }
- // Destructor
- virtual ~InvokeHTTP();
- // Processor Name
- static const char *ProcessorName;
- // Supported Properties
- static core::Property Method;
- static core::Property URL;
- static core::Property ConnectTimeout;
- static core::Property ReadTimeout;
- static core::Property DateHeader;
- static core::Property FollowRedirects;
- static core::Property AttributesToSend;
- static core::Property SSLContext;
- static core::Property ProxyHost;
- static core::Property ProxyPort;
- static core::Property ProxyUser;
- static core::Property ProxyPassword;
- static core::Property ContentType;
- static core::Property SendBody;
- static core::Property UseChunkedEncoding;
- static core::Property DisablePeerVerification;
- static core::Property PropPutOutputAttributes;
-
- static core::Property AlwaysOutputResponse;
-
- static core::Property PenalizeOnNoRetry;
-
- static const char* STATUS_CODE;
- static const char* STATUS_MESSAGE;
- static const char* RESPONSE_BODY;
- static const char* REQUEST_URL;
- static const char* TRANSACTION_ID;
- static const char* REMOTE_DN;
- static const char* EXCEPTION_CLASS;
- static const char* EXCEPTION_MESSAGE;
- // Supported Relationships
- static core::Relationship Success;
- static core::Relationship RelResponse;
- static core::Relationship RelRetry;
- static core::Relationship RelNoRetry;
- static core::Relationship RelFailure;
-
- virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
- virtual void initialize();
- virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
- /**
- * Provides a reference to the URL.
- */
- const std::string &getUrl() {
- return url_;
- }
-
- protected:
-
- /**
- * Generate a transaction ID
- * @return transaction ID string.
- */
- std::string generateId();
-
- /**
- * Routes the flowfile to the proper destination
- * @param request request flow file record
- * @param response response flow file record
- * @param session process session
- * @param context process context
- * @param isSuccess success code or not
- * @param statuscode http response code.
- */
- void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess, int statusCode);
- /**
- * Determine if we should emit a new flowfile based on our activity
- * @param method method type
- * @return result of the evaluation.
- */
- bool emitFlowFile(const std::string &method);
-
- std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
-
- // http method
- std::string method_;
- // url
- std::string url_;
- // include date in the header
- bool date_header_include_;
- // attribute to send regex
- std::string attribute_to_send_regex_;
- // connection timeout
- int64_t connect_timeout_;
- // read timeout.
- int64_t read_timeout_;
- // attribute in which response body will be added
- std::string put_attribute_name_;
- // determine if we always output a response.
- bool always_output_response_;
- // penalize on no retry
- bool penalize_no_retry_;
- // content type.
- std::string content_type_;
- // use chunked encoding.
- bool use_chunked_encoding_;
- // disable peer verification ( makes susceptible for MITM attacks )
- bool disable_peer_verification_;
- private:
- std::shared_ptr<logging::Logger> logger_;
- static std::shared_ptr<utils::IdGenerator> id_generator_;
-};
-
-REGISTER_RESOURCE(InvokeHTTP)
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/processors/LoadProcessors.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LoadProcessors.h b/libminifi/include/processors/LoadProcessors.h
index b629052..2cec29e 100644
--- a/libminifi/include/processors/LoadProcessors.h
+++ b/libminifi/include/processors/LoadProcessors.h
@@ -20,7 +20,6 @@
#include "core/Core.h"
#include "core/Resource.h"
-
#include "AppendHostInfo.h"
#include "ExecuteProcess.h"
#include "GenerateFlowFile.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/include/utils/HTTPClient.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h
index 2a26847..eb68b9d 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -1,5 +1,4 @@
/**
- * HTTPUtils class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,24 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef __HTTP_UTILS_H__
-#define __HTTP_UTILS_H__
-
-#include <curl/curl.h>
-#include <vector>
-#include <iostream>
-#include <string>
-#include <curl/easy.h>
-#include <uuid/uuid.h>
-#include <regex.h>
-#include <vector>
-#include "controllers/SSLContextService.h"
+#ifndef LIBMINIFI_INCLUDE_UTILS_BaseHTTPClient_H_
+#define LIBMINIFI_INCLUDE_UTILS_BaseHTTPClient_H_
#include "ByteInputCallBack.h"
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "properties/Configure.h"
-#include "io/validation.h"
-
+#include "controllers/SSLContextService.h"
namespace org {
namespace apache {
namespace nifi {
@@ -111,7 +96,7 @@ struct HTTPRequestResponse {
return len;
}
} else {
- return CURL_READFUNC_ABORT;
+ return 0x10000000;
}
return 0;
@@ -124,165 +109,110 @@ struct HTTPRequestResponse {
};
-static void parse_url(std::string &url, std::string &host, int &port, std::string &protocol) {
-
- std::string http("http://");
- std::string https("https://");
-
- if (url.compare(0, http.size(), http) == 0)
- protocol = http;
-
- if (url.compare(0, https.size(), https) == 0)
- protocol = https;
-
- if (!protocol.empty()) {
- size_t pos = url.find_first_of(":", protocol.size());
-
- if (pos == std::string::npos) {
- pos = url.size();
- }
-
- host = url.substr(protocol.size(), pos - protocol.size());
-
- if (pos < url.size() && url[pos] == ':') {
- size_t ppos = url.find_first_of("/", pos);
- if (ppos == std::string::npos) {
- ppos = url.size();
- }
- std::string portStr(url.substr(pos + 1, ppos - pos - 1));
- if (portStr.size() > 0) {
- port = std::stoi(portStr);
- }
- }
- }
-}
-
-/**
- * Purpose and Justification: Initializes and cleans up curl once. Cleanup will only occur at the end of our execution since we are relying on a static variable.
- */
-class HTTPClientInitializer {
+class BaseHTTPClient {
public:
- static HTTPClientInitializer *getInstance() {
- static HTTPClientInitializer initializer;
- return &initializer;
- }
- private:
- ~HTTPClientInitializer() {
- curl_global_cleanup();
+ explicit BaseHTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) {
+ response_code = -1;
}
- HTTPClientInitializer() {
- curl_global_init(CURL_GLOBAL_DEFAULT);
- }
-};
-
-/**
- * Purpose and Justification: Pull the basics for an HTTPClient into a self contained class. Simply provide
- * the URL and an SSLContextService ( can be null).
- *
- * Since several portions of the code have been relying on curl, we can encapsulate most CURL HTTP
- * operations here without maintaining it everywhere. Further, this will help with testing as we
- * only need to to test our usage of CURL once
- */
-class HTTPClient {
- public:
- HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr);
- ~HTTPClient();
+ explicit BaseHTTPClient() {
+ response_code = -1;
+ }
- void setVerbose();
+ virtual ~BaseHTTPClient() {
+ }
- void initialize(const std::string &method);
+ virtual void setVerbose() {
+ }
- void setConnectionTimeout(int64_t timeout);
+ virtual void initialize(const std::string &method, const std::string url = "", const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) {
+ }
- void setReadTimeout(int64_t timeout);
+ virtual void setConnectionTimeout(int64_t timeout) {
+ }
- void setUploadCallback(HTTPUploadCallback *callbackObj);
+ virtual void setReadTimeout(int64_t timeout) {
+ }
- struct curl_slist *build_header_list(std::string regex, const std::map<std::string, std::string> &attributes);
+ virtual void setUploadCallback(HTTPUploadCallback *callbackObj) {
+ }
- void setContentType(std::string content_type);
+ virtual void setContentType(std::string content_type) {
+ }
- std::string escape(std::string string_to_escape);
+ virtual std::string escape(std::string string_to_escape) {
+ return "";
+ }
- void setPostFields(std::string input);
+ virtual void setPostFields(std::string input) {
+ }
- void setHeaders(struct curl_slist *list);
+ virtual bool submit() {
+ return false;
+ }
- bool submit();
+ virtual int64_t &getResponseCode() {
+ return response_code;
+ }
- CURLcode getResponseResult();
+ virtual const char *getContentType() {
+ return "";
+ }
- int64_t &getResponseCode();
+ virtual const std::vector<char> &getResponseBody() {
+ return response_body_;
+ }
- const char *getContentType();
+ virtual void appendHeader(const std::string &new_header) {
- const std::vector<char> &getResponseBody();
+ }
- void set_request_method(const std::string method);
+ virtual void set_request_method(const std::string method) {
+ }
- void setUseChunkedEncoding();
+ virtual void setUseChunkedEncoding() {
+ }
- void setDisablePeerVerification();
+ virtual void setDisablePeerVerification() {
+ }
- const std::vector<std::string> &getHeaders() {
- return header_response_.header_tokens_;
+ virtual const std::vector<std::string> &getHeaders() {
+ return headers_;
}
protected:
+ int64_t response_code;
+ std::vector<char> response_body_;
+ std::vector<std::string> headers_;
- inline bool matches(const std::string &value, const std::string &sregex);
-
- static CURLcode configure_ssl_context(CURL *curl, void *ctx, void *param) {
- minifi::controllers::SSLContextService *ssl_context_service = static_cast<minifi::controllers::SSLContextService*>(param);
- if (!ssl_context_service->configure_ssl_context(static_cast<SSL_CTX*>(ctx))) {
- return CURLE_FAILED_INIT;
- }
- return CURLE_OK;
+ virtual inline bool matches(const std::string &value, const std::string &sregex){
+ return false;
}
- void configure_secure_connection(CURL *http_session);
-
- bool isSecure(const std::string &url);
- struct curl_slist *headers_;
- utils::HTTPRequestResponse content_;
- utils::HTTPHeaderResponse header_response_;
- CURLcode res;
- int64_t http_code;
- char *content_type;
-
- int64_t connect_timeout_;
- // read timeout.
- int64_t read_timeout_;
-
- std::string content_type_;
-
- std::shared_ptr<logging::Logger> logger_;
- CURL *http_session_;
- std::string url_;
- std::string method_;
- std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
};
-//static std::string get_token(HTTPClientstd::string loginUrl, std::string username, std::string password, HTTPSecurityConfiguration &securityConfig) {
-static std::string get_token(HTTPClient &client, std::string username, std::string password) {
+static std::string get_token(utils::BaseHTTPClient *client, std::string username, std::string password) {
+
+ if (nullptr == client) {
+ return "";
+ }
utils::HTTPRequestResponse content;
std::string token;
- client.setContentType("application/x-www-form-urlencoded");
+ client->setContentType("application/x-www-form-urlencoded");
- client.set_request_method("POST");
+ client->set_request_method("POST");
std::string payload = "username=" + username + "&" + "password=" + password;
- client.setPostFields(client.escape(payload));
+ client->setPostFields(client->escape(payload));
- client.submit();
+ client->submit();
- if (client.submit() && client.getResponseCode() == 200) {
+ if (client->submit() && client->getResponseCode() == 200) {
- const std::string &response_body = std::string(client.getResponseBody().data(), client.getResponseBody().size());
+ const std::string &response_body = std::string(client->getResponseBody().data(), client->getResponseBody().size());
if (!response_body.empty()) {
token = "Bearer " + response_body;
@@ -292,10 +222,42 @@ static std::string get_token(HTTPClient &client, std::string username, std::stri
return token;
}
+static void parse_url(std::string &url, std::string &host, int &port, std::string &protocol) {
+
+ std::string http("http://");
+ std::string https("https://");
+
+ if (url.compare(0, http.size(), http) == 0)
+ protocol = http;
+
+ if (url.compare(0, https.size(), https) == 0)
+ protocol = https;
+
+ if (!protocol.empty()) {
+ size_t pos = url.find_first_of(":", protocol.size());
+
+ if (pos == std::string::npos) {
+ pos = url.size();
+ }
+
+ host = url.substr(protocol.size(), pos - protocol.size());
+
+ if (pos < url.size() && url[pos] == ':') {
+ size_t ppos = url.find_first_of("/", pos);
+ if (ppos == std::string::npos) {
+ ppos = url.size();
+ }
+ std::string portStr(url.substr(pos + 1, ppos - pos - 1));
+ if (portStr.size() > 0) {
+ port = std::stoi(portStr);
+ }
+ }
+ }
+}
} /* namespace utils */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
-#endif
+#endif /* LIBMINIFI_INCLUDE_UTILS_BaseHTTPClient_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 03121a8..89b78db 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -230,30 +230,39 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
configure_->get(Configure::nifi_rest_api_password, this->rest_password_);
std::string token;
-
+ std::unique_ptr<utils::BaseHTTPClient> client = nullptr;
if (!rest_user_name_.empty()) {
std::string loginUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/access/token";
- utils::HTTPClient client(loginUrl, ssl_service);
- client.setVerbose();
- token = utils::get_token(client, this->rest_user_name_, this->rest_password_);
+
+ auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
+ if (nullptr == client_ptr) {
+ logger_->log_error("Could not locate HTTPClient. You do not have cURL support!");
+ return;
+ }
+ client = std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr));
+ client->initialize("GET", loginUrl, ssl_service);
+
+ token = utils::get_token(client.get(), this->rest_user_name_, this->rest_password_);
logger_->log_debug("Token from NiFi REST Api endpoint %s, %s", loginUrl, token);
if (token.empty())
return;
}
- utils::HTTPClient client(fullUrl.c_str(), ssl_service);
-
- client.initialize("GET");
+ auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
+ if (nullptr == client_ptr) {
+ logger_->log_error("Could not locate HTTPClient. You do not have cURL support!");
+ return;
+ }
+ client = std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr));
+ client->initialize("GET", fullUrl.c_str(), ssl_service);
- struct curl_slist *list = NULL;
if (!token.empty()) {
std::string header = "Authorization: " + token;
- list = curl_slist_append(list, header.c_str());
- client.setHeaders(list);
+ client->appendHeader(header);
}
- if (client.submit() && client.getResponseCode() == 200) {
- const std::vector<char> &response_body = client.getResponseBody();
+ if (client->submit() && client->getResponseCode() == 200) {
+ const std::vector<char> &response_body = client->getResponseBody();
if (!response_body.empty()) {
std::string controller = std::string(response_body.begin(), response_body.end());
logger_->log_debug("controller config %s", controller.c_str());
@@ -273,7 +282,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
logger_->log_info("process group remote site2site port %d, is secure %d", site2site_port_, site2site_secure_);
}
} else {
- logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %d from %s", client.getResponseCode(), fullUrl);
+ logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %d from %s", client->getResponseCode(), fullUrl);
}
} else {
logger_->log_error("ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed \n");
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index d1c71e6..58b5514 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -25,6 +25,8 @@
#include <string>
#include <memory>
#include "core/state/UpdateController.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
namespace org {
namespace apache {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/src/c2/protocols/RESTProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp
deleted file mode 100644
index c8babb3..0000000
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "c2/protocols/RESTProtocol.h"
-
-#include <algorithm>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) {
- Json::Reader reader;
- Json::Value root;
- try {
- if (reader.parse(std::string(response.data(), response.size()), root)) {
- std::string requested_operation = getOperation(payload);
-
- std::string identifier;
- if (root.isMember("operationid")) {
- identifier = root["operationid"].asString();
- }
- if (root["operation"].asString() == requested_operation) {
- if (root["requested_operations"].size() == 0) {
- return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true));
- }
- C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true);
-
- new_payload.setIdentifier(identifier);
-
- for (const Json::Value& request : root["requested_operations"]) {
- Operation newOp = stringToOperation(request["operation"].asString());
- C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true);
- C2ContentResponse new_command(newOp);
- new_command.delay = 0;
- new_command.required = true;
- new_command.ttl = -1;
- // set the identifier if one exists
- if (request.isMember("operationid")) {
- new_command.ident = request["operationid"].asString();
- nested_payload.setIdentifier(new_command.ident);
- }
- new_command.name = request["name"].asString();
-
- if (request.isMember("content") && request["content"].size() > 0) {
- for (const auto &name : request["content"].getMemberNames()) {
- new_command.operation_arguments[name] = request["content"][name].asString();
- }
- }
- nested_payload.addContent(std::move(new_command));
- new_payload.addPayload(std::move(nested_payload));
- }
- // we have a response for this request
- return std::move(new_payload);
- }
- }
- } catch (...) {
- }
- return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true));
-}
-
-Json::Value RESTProtocol::serializeJsonPayload(Json::Value &json_root, const C2Payload &payload) {
- // get the name from the content
- Json::Value json_payload;
- std::map<std::string, std::vector<Json::Value>> children;
- for (const auto &nested_payload : payload.getNestedPayloads()) {
- Json::Value child_payload = serializeJsonPayload(json_payload, nested_payload);
- children[nested_payload.getLabel()].push_back(child_payload);
- }
- for (auto child_vector : children) {
- if (child_vector.second.size() > 1) {
- Json::Value children_json(Json::arrayValue);
- for (auto child : child_vector.second) {
- json_payload[child_vector.first] = child;
- }
- } else {
- if (child_vector.second.size() == 1) {
- if (child_vector.second.at(0).isMember(child_vector.first)) {
- json_payload[child_vector.first] = child_vector.second.at(0)[child_vector.first];
- } else {
- json_payload[child_vector.first] = child_vector.second.at(0);
- }
- }
- }
- }
-
- const std::vector<C2ContentResponse> &content = payload.getContent();
- for (const auto &payload_content : content) {
- Json::Value payload_content_values;
- bool use_sub_option = true;
- if (payload_content.op == payload.getOperation()) {
- for (auto content : payload_content.operation_arguments) {
- if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
- json_payload[payload_content.name] = content.second;
- use_sub_option = false;
- } else {
- payload_content_values[content.first] = content.second;
- }
- }
- }
- if (use_sub_option)
- json_payload[payload_content.name] = payload_content_values;
- }
- return json_payload;
-}
-
-std::string RESTProtocol::getOperation(const C2Payload &payload) {
- switch (payload.getOperation()) {
- case Operation::ACKNOWLEDGE:
- return "acknowledge";
- case Operation::HEARTBEAT:
- return "heartbeat";
- case Operation::RESTART:
- return "restart";
- case Operation::DESCRIBE:
- return "describe";
- case Operation::STOP:
- return "stop";
- case Operation::START:
- return "start";
- case Operation::UPDATE:
- return "update";
- default:
- return "heartbeat";
- }
-}
-
-Operation RESTProtocol::stringToOperation(const std::string str) {
- std::string op = str;
- std::transform(str.begin(), str.end(), op.begin(), ::tolower);
- if (op == "heartbeat") {
- return Operation::HEARTBEAT;
- } else if (op == "acknowledge") {
- return Operation::ACKNOWLEDGE;
- } else if (op == "update") {
- return Operation::UPDATE;
- } else if (op == "describe") {
- return Operation::DESCRIBE;
- } else if (op == "restart") {
- return Operation::RESTART;
- } else if (op == "clear") {
- return Operation::CLEAR;
- } else if (op == "stop") {
- return Operation::STOP;
- } else if (op == "start") {
- return Operation::START;
- }
- return Operation::HEARTBEAT;
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/src/c2/protocols/RESTReceiver.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/protocols/RESTReceiver.cpp b/libminifi/src/c2/protocols/RESTReceiver.cpp
deleted file mode 100644
index e79ffd7..0000000
--- a/libminifi/src/c2/protocols/RESTReceiver.cpp
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "c2/protocols/RESTReceiver.h"
-#include <algorithm>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-int log_message(const struct mg_connection *conn, const char *message) {
- puts(message);
- return 1;
-}
-
-int ssl_protocol_en(void *ssl_context, void *user_data) {
- struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context;
- return 0;
-}
-
-RESTReceiver::RESTReceiver(std::string name, uuid_t uuid)
- : HeartBeatReporter(name, uuid),
- logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) {
-}
-
-void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
- HeartBeatReporter::initialize(controller, configure);
- logger_->log_debug("Initializing rest receiveer");
- if (nullptr != configuration_) {
- std::string listeningPort, rootUri, caCert;
- configuration_->get("c2.rest.listener.port", listeningPort);
- configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri);
- configuration_->get("c2.rest.listener.cacert", caCert);
-
- if (!listeningPort.empty() && !rootUri.empty()) {
- handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol());
- if (!caCert.empty()) {
- listener = std::move(start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()), caCert));
- } else {
- listener = std::move(start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get())));
- }
- }
- }
-}
-int16_t RESTReceiver::heartbeat(const C2Payload &payload) {
- std::string operation_request_str = getOperation(payload);
- std::string outputConfig;
- Json::Value json_payload;
- json_payload["operation"] = operation_request_str;
- if (payload.getIdentifier().length() > 0) {
- json_payload["operationid"] = payload.getIdentifier();
- }
- const std::vector<C2ContentResponse> &content = payload.getContent();
-
- for (const auto &payload_content : content) {
- Json::Value payload_content_values;
- bool use_sub_option = true;
- if (payload_content.op == payload.getOperation()) {
- for (auto content : payload_content.operation_arguments) {
- if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
- json_payload[payload_content.name] = content.second;
- use_sub_option = false;
- } else {
- payload_content_values[content.first] = content.second;
- }
- }
- }
- if (use_sub_option)
- json_payload[payload_content.name] = payload_content_values;
- }
-
- for (const auto &nested_payload : payload.getNestedPayloads()) {
- json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
- }
-
- Json::StyledWriter writer;
- outputConfig = writer.write(json_payload);
- if (handler != nullptr) {
- logger_->log_debug("Setting %s", outputConfig);
- handler->setResponse(outputConfig);
- }
-
- return 0;
-}
-
-std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) {
- struct mg_callbacks callback;
-
- memset(&callback, 0, sizeof(callback));
- callback.init_ssl = ssl_protocol_en;
- std::string my_port = port;
- my_port += "s";
- callback.log_message = log_message;
- const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL",
- "ssl_verify_peer", "no", "num_threads", "1", 0 };
-
- std::vector<std::string> cpp_options;
- for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
- std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options));
-
- server->addHandler(rooturi, handler);
-
- return server;
-}
-
-std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler) {
- const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "num_threads", "1", 0 };
-
- std::vector<std::string> cpp_options;
- for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
- std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options));
-
- server->addHandler(rooturi, handler);
-
- return server;
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/src/c2/protocols/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/protocols/RESTSender.cpp b/libminifi/src/c2/protocols/RESTSender.cpp
deleted file mode 100644
index e15522b..0000000
--- a/libminifi/src/c2/protocols/RESTSender.cpp
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "c2/protocols/RESTSender.h"
-
-#include <algorithm>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-RESTSender::RESTSender(std::string name, uuid_t uuid)
- : C2Protocol(name, uuid),
- logger_(logging::LoggerFactory<Connectable>::getLogger()) {
-}
-
-void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
- C2Protocol::initialize(controller, configure);
- // base URL when one is not specified.
- if (nullptr != configure) {
- configure->get("c2.rest.url", rest_uri_);
- configure->get("c2.rest.url.ack", ack_uri_);
- }
- logger_->log_info("Submitting to %s", rest_uri_);
-}
-C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) {
- std::string operation_request_str = getOperation(payload);
- std::string outputConfig;
- if (direction == Direction::TRANSMIT) {
- Json::Value json_payload;
- json_payload["operation"] = operation_request_str;
- if (payload.getIdentifier().length() > 0) {
- json_payload["operationid"] = payload.getIdentifier();
- }
- const std::vector<C2ContentResponse> &content = payload.getContent();
-
- for (const auto &payload_content : content) {
- Json::Value payload_content_values;
- bool use_sub_option = true;
- if (payload_content.op == payload.getOperation()) {
- for (auto content : payload_content.operation_arguments) {
- if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
- json_payload[payload_content.name] = content.second;
- use_sub_option = false;
- } else {
- payload_content_values[content.first] = content.second;
- }
- }
- }
- if (use_sub_option)
- json_payload[payload_content.name] = payload_content_values;
- }
-
- for (const auto &nested_payload : payload.getNestedPayloads()) {
- json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
- }
-
- Json::StyledWriter writer;
- outputConfig = writer.write(json_payload);
- }
-
- return std::move(sendPayload(url, direction, payload, outputConfig));
-}
-
-C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) {
- if (payload.getOperation() == ACKNOWLEDGE) {
- return consumePayload(ack_uri_, payload, direction, async);
- }
- return consumePayload(rest_uri_, payload, direction, async);
-}
-
-void RESTSender::update(const std::shared_ptr<Configure> &configure) {
- std::string url;
- configure->get("c2.rest.url", url);
- configure->get("c2.rest.url.ack", url);
-}
-
-const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
- if (!url.empty()) {
- utils::HTTPClient client(url, ssl_context_service_);
- client.setConnectionTimeout(2);
-
- std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
- std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
- if (direction == Direction::TRANSMIT) {
- input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
- callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
- input->write(outputConfig);
- callback->ptr = input.get();
- callback->pos = 0;
- client.set_request_method("POST");
- client.setUploadCallback(callback.get());
- } else {
- // we do not need to set the uplaod callback
- // since we are not uploading anything on a get
- client.set_request_method("GET");
- }
- client.setContentType("application/json");
- bool isOkay = client.submit();
- int64_t respCode = client.getResponseCode();
-
- if (isOkay && respCode) {
- if (payload.isRaw()) {
- C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
-
- response_payload.setRawData(client.getResponseBody());
- return std::move(response_payload);
- }
- return parseJsonResponse(payload, client.getResponseBody());
- } else {
- return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
- }
- } else {
- return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
- }
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/src/core/ClassLoader.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp
index 9bead0e..ae39eae 100644
--- a/libminifi/src/core/ClassLoader.cpp
+++ b/libminifi/src/core/ClassLoader.cpp
@@ -37,8 +37,15 @@ ClassLoader &ClassLoader::getDefaultClassLoader() {
// populate ret
return ret;
}
-uint16_t ClassLoader::registerResource(const std::string &resource) {
- void* resource_ptr = dlopen(resource.c_str(), RTLD_LAZY);
+uint16_t ClassLoader::registerResource(const std::string &resource, const std::string &resourceFunction) {
+ void *resource_ptr = nullptr;
+ if (resource.empty()) {
+ dlclose(dlopen(0, RTLD_LAZY | RTLD_GLOBAL));
+ resource_ptr = dlopen(0, RTLD_NOW | RTLD_GLOBAL);
+ } else {
+ dlclose(dlopen(resource.c_str(), RTLD_LAZY | RTLD_GLOBAL));
+ resource_ptr = dlopen(resource.c_str(), RTLD_NOW | RTLD_GLOBAL);
+ }
if (!resource_ptr) {
logger_->log_error("Cannot load library: %s", dlerror());
return RESOURCE_FAILURE;
@@ -51,7 +58,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource) {
dlerror();
// load the symbols
- createFactory* create_factory_func = reinterpret_cast<createFactory*>(dlsym(resource_ptr, "createFactory"));
+ createFactory* create_factory_func = reinterpret_cast<createFactory*>(dlsym(resource_ptr, resourceFunction.c_str()));
const char* dlsym_error = dlerror();
if (dlsym_error) {
logger_->log_error("Cannot load library: %s", dlsym_error);
@@ -62,7 +69,11 @@ uint16_t ClassLoader::registerResource(const std::string &resource) {
std::lock_guard<std::mutex> lock(internal_mutex_);
- loaded_factories_[factory->getClassName()] = std::unique_ptr<ObjectFactory>(factory);
+ for (auto class_name : factory->getClassNames()) {
+ loaded_factories_[class_name] = std::unique_ptr<ObjectFactory>(factory->assign(class_name));
+ }
+
+ delete factory;
return RESOURCE_SUCCESS;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 9ce7146..16a9778 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -18,6 +18,7 @@
#include "core/FlowConfiguration.h"
#include <memory>
+#include <vector>
#include <string>
#include "core/ClassLoader.h"
@@ -27,9 +28,13 @@ namespace nifi {
namespace minifi {
namespace core {
+std::vector<std::string> FlowConfiguration::statics_sl_funcs_;
+
FlowConfiguration::~FlowConfiguration() {
}
+
+
std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, uuid_t uuid) {
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(name, uuid);
if (nullptr == ptr) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/96d18744/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index b5d9a8f..370da21 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -38,6 +38,14 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
checkRequiredField(&rootFlowNode, "name",
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
std::string flowName = rootFlowNode["name"].as<std::string>();
+
+ auto class_loader_functions = rootFlowNode["Class Loader Functions"];
+ if (class_loader_functions && class_loader_functions.IsSequence()) {
+ for (auto function : class_loader_functions) {
+ registerResource(function.as<std::string>());
+ }
+ }
+
std::string id = getOrGenerateId(&rootFlowNode);
uuid_parse(id.c_str(), uuid);
@@ -80,6 +88,15 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
CONFIG_YAML_PROCESSORS_KEY);
procCfg.name = procNode["name"].as<std::string>();
procCfg.id = getOrGenerateId(&procNode);
+
+ auto lib_location = procNode["Library Location"];
+ auto lib_function = procNode["Library Function"];
+ if (lib_location && lib_function) {
+ auto lib_location_str = lib_location.as<std::string>();
+ auto lib_function_str = lib_function.as<std::string>();
+ registerResource(lib_location_str, lib_function_str);
+ }
+
uuid_parse(procCfg.id.c_str(), uuid);
logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
|