Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 2981bc1d4 -> 088e1be7c
MINIFICPP-41: First iteration of C api
MINIFICPP-41: Improve build process and make function names clearer
This closes #217.
Signed-off-by: Marc Parisi <phrocker@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/088e1be7
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/088e1be7
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/088e1be7
Branch: refs/heads/master
Commit: 088e1be7ca9deac9f104ebf315a88bd311e6a623
Parents: 2981bc1
Author: Marc Parisi <phrocker@apache.org>
Authored: Tue Dec 5 13:57:06 2017 -0500
Committer: Marc Parisi <phrocker@apache.org>
Committed: Mon Dec 18 13:55:28 2017 -0500
----------------------------------------------------------------------
CMakeLists.txt | 3 +
LibExample/CMakeLists.txt | 61 +++++
LibExample/generate_flow.c | 67 +++++
LibExample/transmit_flow.c | 92 +++++++
libminifi/CMakeLists.txt | 4 +
libminifi/include/capi/Instance.h | 134 ++++++++++
libminifi/include/capi/Plan.h | 117 ++++++++
libminifi/include/capi/ReflexiveSession.h | 77 ++++++
libminifi/include/capi/api.h | 154 +++++++++++
libminifi/include/capi/expect.h | 32 +++
libminifi/include/core/ProcessContext.h | 4 +-
libminifi/include/core/ProcessSession.h | 6 +-
.../include/sitetosite/SiteToSiteFactory.h | 3 +
libminifi/src/RemoteProcessorGroupPort.cpp | 1 +
libminifi/src/capi/Plan.cpp | 216 +++++++++++++++
libminifi/src/capi/api.cpp | 264 +++++++++++++++++++
16 files changed, 1231 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3463d69..eabc1ba 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -206,6 +206,9 @@ endif()
## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN
add_subdirectory(main)
+if (NOT DISABLE_CURL)
+ add_subdirectory(LibExample)
+endif()
# Generate source assembly
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/LibExample/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/LibExample/CMakeLists.txt b/LibExample/CMakeLists.txt
new file mode 100644
index 0000000..4ba26bd
--- /dev/null
+++ b/LibExample/CMakeLists.txt
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+ CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(../libminifi/include ../libminifi/include/c2 ../libminifi/include/c2/protocols/ ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/jsoncpp/include ../thirdparty/)
+
+include(CheckCXXCompilerFlag)
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Os")
+elseif(COMPILER_SUPPORTS_CXX0X)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x -Os")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+add_executable(transmit_flow transmit_flow.c)
+
+# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and rocksdb
+target_link_libraries(transmit_flow capi core-minifi minifi)
+
+if (APPLE)
+ target_link_libraries (transmit_flow -Wl,-all_load minifi-http-curl)
+else ()
+ target_link_libraries (transmit_flow -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive)
+endif ()
+
+
+add_executable(generate_flow generate_flow.c)
+
+# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and rocksdb
+target_link_libraries(generate_flow capi core-minifi minifi)
+
+if (APPLE)
+ target_link_libraries (generate_flow -Wl,-all_load minifi-http-curl)
+else ()
+ target_link_libraries (generate_flow -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive)
+endif ()
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/LibExample/generate_flow.c
----------------------------------------------------------------------
diff --git a/LibExample/generate_flow.c b/LibExample/generate_flow.c
new file mode 100644
index 0000000..23ba979
--- /dev/null
+++ b/LibExample/generate_flow.c
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+#include "capi/api.h"
+
+
+/**
+ * This is an example of the C API that transmits a flow file to a remote instance.
+ */
+int main(int argc, char **argv) {
+
+ if (argc < 3) {
+ printf("Error: must run ./generate_flow <instance> <remote port> \n");
+ exit(1);
+ }
+
+ char *instance_str = argv[1];
+ char *portStr = argv[2];
+
+ nifi_port port;
+
+ port.pord_id = portStr;
+
+ nifi_instance *instance = create_instance(instance_str, &port);
+
+ flow *new_flow = create_flow(instance,"GenerateFlowFile");
+
+ flow_file_record *record = get_next_flow_file(instance, new_flow );
+
+ if (record == 0){
+ printf("Could not create flow file");
+ exit(1);
+ }
+
+ transmit_flowfile(record,instance);
+
+ free_flowfile(record);
+
+ // initializing will make the transmission slightly more efficient.
+ //initialize_instance(instance);
+ //transfer_file_or_directory(instance,file);
+
+ free_flow(new_flow);
+
+ free_instance(instance);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/LibExample/transmit_flow.c
----------------------------------------------------------------------
diff --git a/LibExample/transmit_flow.c b/LibExample/transmit_flow.c
new file mode 100644
index 0000000..64e2beb
--- /dev/null
+++ b/LibExample/transmit_flow.c
@@ -0,0 +1,92 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+#include "capi/api.h"
+
+int is_dir(const char *path) {
+ struct stat stat_struct;
+ if (stat(path, &stat_struct) != 0)
+ return 0;
+ return S_ISDIR(stat_struct.st_mode);
+}
+
+void transfer_file_or_directory(nifi_instance *instance, char *file_or_dir) {
+ int size = 1;
+
+ if (is_dir(file_or_dir)) {
+ DIR *d;
+
+ struct dirent *dir;
+ d = opendir(file_or_dir);
+ if (d) {
+ while ((dir = readdir(d)) != NULL) {
+ if (!memcmp(dir->d_name,".",1) )
+ continue;
+ char *file_path = malloc(strlen(file_or_dir) + strlen(dir->d_name) + 2);
+ sprintf(file_path,"%s/%s",file_or_dir,dir->d_name);
+ transfer_file_or_directory(instance,file_path);
+ free(file_path);
+ }
+ closedir(d);
+ }
+ printf("%s is a directory", file_or_dir);
+ } else {
+ printf("Transferring %s\n",file_or_dir);
+
+ flow_file_record *record = create_flowfile(file_or_dir);
+
+ add_attribute(record, "addedattribute", "1", 2);
+
+ transmit_flowfile(record, instance);
+
+ free_flowfile(record);
+ }
+}
+
+/**
+ * This is an example of the C API that transmits a flow file to a remote instance.
+ */
+int main(int argc, char **argv) {
+
+ if (argc < 4) {
+ printf("Error: must run ./transmit_flow <instance> <remote port> <file or directory>\n");
+ exit(1);
+ }
+
+ char *instance_str = argv[1];
+ char *portStr = argv[2];
+ char *file = argv[3];
+
+ nifi_port port;
+
+ port.pord_id = portStr;
+
+ nifi_instance *instance = create_instance(instance_str, &port);
+
+ // initializing will make the transmission slightly more efficient.
+ //initialize_instance(instance);
+ transfer_file_or_directory(instance,file);
+
+ free_instance(instance);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index b882a1c..6ede84f 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -64,6 +64,8 @@ file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/s
file(GLOB PROCESSOR_SOURCES "src/processors/*.cpp" )
+file(GLOB CAPI_SOURCES "src/capi/*.cpp" )
+
file(GLOB SPD_SOURCES "../thirdparty/spdlog-20170710/include/spdlog/*")
# Workaround the limitations of having a
@@ -89,8 +91,10 @@ else ()
endif (OPENSSL_FOUND)
add_library(minifi STATIC ${PROCESSOR_SOURCES})
+add_library(capi STATIC ${CAPI_SOURCES})
target_link_libraries(minifi c-library civetweb-cpp)
target_link_libraries(minifi core-minifi)
+
SET (LIBMINIFI core-minifi PARENT_SCOPE)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/Instance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Instance.h b/libminifi/include/capi/Instance.h
new file mode 100644
index 0000000..5d1c4c1
--- /dev/null
+++ b/libminifi/include/capi/Instance.h
@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+#define LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+
+#include <memory>
+#include <type_traits>
+#include <string>
+#include "core/Property.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ContentRepository.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "core/Repository.h"
+
+#include "core/Connectable.h"
+#include "core/ProcessorNode.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/FlowConfiguration.h"
+#include "ReflexiveSession.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class ProcessorLink {
+ public:
+ explicit ProcessorLink(const std::shared_ptr<core::Processor> &processor)
+ : processor_(processor) {
+
+ }
+
+ const std::shared_ptr<core::Processor> &getProcessor() {
+ return processor_;
+ }
+
+ protected:
+ std::shared_ptr<core::Processor> processor_;
+};
+
+class Instance {
+ public:
+
+ explicit Instance(const std::string &url, const std::string &port)
+ : configure_(std::make_shared<Configure>()),
+ url_(url),
+ rpgInitialized_(false),
+ content_repo_(std::make_shared<minifi::core::repository::FileSystemRepository>()),
+ no_op_repo_(std::make_shared<minifi::core::Repository>()) {
+ stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configure_);
+ uuid_t uuid;
+ uuid_parse(port.c_str(), uuid);
+ rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid);
+ proc_node_ = std::make_shared<core::ProcessorNode>(rpg_);
+ core::FlowConfiguration::initialize_static_functions();
+ content_repo_->initialize(configure_);
+ }
+
+ bool isRPGConfigured() {
+ return rpgInitialized_;
+ }
+
+ void setRemotePort(std::string remote_port) {
+ rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port);
+ rpg_->initialize();
+ rpg_->setTransmitting(true);
+ rpgInitialized_ = true;
+ }
+
+ std::shared_ptr<Configure> getConfiguration() {
+ return configure_;
+ }
+
+ std::shared_ptr<minifi::core::Repository> getNoOpRepository() {
+ return no_op_repo_;
+ }
+
+ std::shared_ptr<minifi::core::ContentRepository> getContentRepository() {
+ return content_repo_;
+ }
+
+ void transfer(const std::shared_ptr<FlowFileRecord> &ff) {
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
+ auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_);
+ auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext);
+
+ rpg_->onSchedule(processContext, sessionFactory);
+
+ auto session = std::make_shared<core::ReflexiveSession>(processContext);
+
+ session->add(ff);
+
+ rpg_->onTrigger(processContext, session);
+ }
+
+ protected:
+
+ bool rpgInitialized_;
+
+ std::shared_ptr<minifi::core::Repository> no_op_repo_;
+
+ std::shared_ptr<minifi::core::ContentRepository> content_repo_;
+
+ std::shared_ptr<core::ProcessorNode> proc_node_;
+ std::shared_ptr<minifi::RemoteProcessorGroupPort> rpg_;
+ std::shared_ptr<io::StreamFactory> stream_factory_;
+ std::string url_;
+ std::shared_ptr<Configure> configure_;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/Plan.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h
new file mode 100644
index 0000000..cd9d756
--- /dev/null
+++ b/libminifi/include/capi/Plan.h
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_CAPI_PLAN_H_
+#define LIBMINIFI_CAPI_PLAN_H_
+#include <dirent.h>
+#include <cstdio>
+#include <cstdlib>
+#include <sstream>
+#include "ResourceClaim.h"
+#include <vector>
+#include <set>
+#include <map>
+#include "core/logging/Logger.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "properties/Properties.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "spdlog/sinks/ostream_sink.h"
+#include "spdlog/sinks/dist_sink.h"
+#include "core/Core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+
+
+class ExecutionPlan {
+ public:
+
+ explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
+
+ std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
+ core::Relationship relationship = core::Relationship("success", "description"),
+ bool linkToPrevious = false);
+
+ std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
+ bool linkToPrevious = false);
+
+ bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);
+
+ void reset();
+
+ bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+
+ std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
+
+ std::shared_ptr<core::FlowFile> getCurrentFlowFile();
+
+ std::shared_ptr<core::Repository> getFlowRepo() {
+ return flow_repo_;
+ }
+
+ std::shared_ptr<core::Repository> getProvenanceRepo() {
+ return prov_repo_;
+ }
+
+ std::shared_ptr<core::ContentRepository> getContentRepo() {
+ return content_repo_;
+ }
+
+ protected:
+
+ void finalize();
+
+ std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false);
+
+ std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
+
+ std::shared_ptr<core::ContentRepository> content_repo_;
+
+ std::shared_ptr<core::Repository> flow_repo_;
+ std::shared_ptr<core::Repository> prov_repo_;
+
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
+
+ std::atomic<bool> finalized;
+
+ int location;
+
+ std::shared_ptr<core::ProcessSession> current_session_;
+ std::shared_ptr<core::FlowFile> current_flowfile_;
+
+ std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
+ std::vector<std::shared_ptr<core::Processor>> processor_queue_;
+ std::vector<std::shared_ptr<core::Processor>> configured_processors_;
+ std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
+ std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
+ std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
+ std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
+ std::vector<std::shared_ptr<minifi::Connection>> relationships_;
+ core::Relationship termination_;
+
+ private:
+
+ std::shared_ptr<logging::Logger> logger_;
+};
+
+#endif /* LIBMINIFI_CAPI_PLAN_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/ReflexiveSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/ReflexiveSession.h b/libminifi/include/capi/ReflexiveSession.h
new file mode 100644
index 0000000..ebf6cbe
--- /dev/null
+++ b/libminifi/include/capi/ReflexiveSession.h
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REFLEXIVE_SESSION_H__
+#define __REFLEXIVE_SESSION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ReflexiveSession Class
+class ReflexiveSession : public ProcessSession{
+ public:
+ // Constructor
+ /*!
+ * Create a new process session
+ */
+ ReflexiveSession(std::shared_ptr<ProcessContext> processContext = nullptr)
+ : ProcessSession(processContext){
+ }
+
+// Destructor
+ virtual ~ReflexiveSession() {
+ }
+
+ virtual std::shared_ptr<core::FlowFile> get(){
+ auto prevff = ff;
+ ff = nullptr;
+ return prevff;
+ }
+
+ virtual void add(const std::shared_ptr<core::FlowFile> &flow){
+ ff = flow;
+ }
+ virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship){
+ // no op
+ }
+ protected:
+ //
+ // Get the FlowFile from the highest priority queue
+ std::shared_ptr<core::FlowFile> ff;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
new file mode 100644
index 0000000..440329e
--- /dev/null
+++ b/libminifi/include/capi/api.h
@@ -0,0 +1,154 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+#define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define API_VERSION "0.01"
+/****
+ * ##################################################################
+ * BASE NIFI OPERATIONS
+ * ##################################################################
+ */
+
+
+/**
+ * NiFi Port struct
+ */
+typedef struct {
+ char *pord_id;
+}nifi_port;
+
+
+/**
+ * Nifi instance struct
+ */
+typedef struct {
+
+ void *instance_ptr;
+
+ nifi_port port;
+
+} nifi_instance;
+
+
+nifi_instance *create_instance(char *url, nifi_port *port);
+
+void set_property(nifi_instance *, char *, char *);
+
+void initialize_instance(nifi_instance *);
+
+void free_instance(nifi_instance*);
+
+
+/****
+ * ##################################################################
+ * Processor OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct {
+ void *processor_ptr;
+} processor;
+
+uint8_t run_processor(const processor *processor);
+
+
+/****
+ * ##################################################################
+ * FLOWFILE OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct{
+ char *key;
+ void *value;
+ size_t value_size;
+} attribute;
+
+/**
+ * State of a flow file
+ *
+ */
+typedef struct {
+ uint64_t size; /**< Size in bytes of the data corresponding to this flow file */
+
+ char * contentLocation; /**< Filesystem location of this object */
+
+ void *attributes; /**< Hash map of attributes */
+
+
+} flow_file_record;
+
+
+
+typedef struct {
+ void *plan;
+} flow;
+
+
+flow *create_flow(nifi_instance *, const char *);
+
+void free_flow(flow *);
+
+flow_file_record *get_next_flow_file(nifi_instance *, flow *);
+
+size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t );
+
+
+/**
+ * Creates a flow file object.
+ * Will obtain the size of file
+ */
+flow_file_record* create_flowfile(const char *file);
+
+void free_flowfile(flow_file_record*);
+
+uint8_t add_attribute(flow_file_record*, char *key, void *value, size_t size);
+
+void *get_attribute(flow_file_record*, char *key);
+
+uint8_t remove_attribute(flow_file_record*, char *key);
+
+/****
+ * ##################################################################
+ * Remote NIFI OPERATIONS
+ * ##################################################################
+ */
+
+void transmit_flowfile(flow_file_record *, nifi_instance *);
+
+
+/****
+ * ##################################################################
+ * Persistence Operations
+ * ##################################################################
+ */
+
+void transmit_flowfile(flow_file_record *, nifi_instance *);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/expect.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/expect.h b/libminifi/include/capi/expect.h
new file mode 100644
index 0000000..ead182c
--- /dev/null
+++ b/libminifi/include/capi/expect.h
@@ -0,0 +1,32 @@
+/**
+
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
+#define LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
+
+
+// various likely/unlikely pragmas I've carried over the years.
+// you'll see this in many projects
+#if defined(__GNUC__) && __GNUC__ >= 4
+#define LIKELY(x) (__builtin_expect((x), 1))
+#define UNLIKELY(x) (__builtin_expect((x), 0))
+#else
+#define LIKELY(x) (x)
+#define UNLIKELY(x) (x)
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_EXPECT_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/core/ProcessContext.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 9759c16..ad946a5 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -125,7 +125,9 @@ class ProcessContext : public controller::ControllerServiceLookup {
* identifier
*/
std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) {
- return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr());
+ if (controller_service_provider_ != nullptr)
+ return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr());
+ return nullptr;
}
/**
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 30d0563..38a713d 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -70,7 +70,7 @@ class ProcessSession {
}
//
// Get the FlowFile from the highest priority queue
- std::shared_ptr<core::FlowFile> get();
+ virtual std::shared_ptr<core::FlowFile> get();
// Create a new UUID FlowFile with no content resource claim and without parent
std::shared_ptr<core::FlowFile> create();
// Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
@@ -78,7 +78,7 @@ class ProcessSession {
// Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
std::shared_ptr<core::FlowFile> create(const std::shared_ptr<core::FlowFile> &parent);
// Add a FlowFile to the session
- void add(const std::shared_ptr<core::FlowFile> &flow);
+ virtual void add(const std::shared_ptr<core::FlowFile> &flow);
// Clone a new UUID FlowFile from parent both for content resource claim and attributes
std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile> &parent);
// Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
@@ -86,7 +86,7 @@ class ProcessSession {
// Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session
std::shared_ptr<core::FlowFile> duplicate(const std::shared_ptr<core::FlowFile> &original);
// Transfer the FlowFile to the relationship
- void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship);
+ virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship);
// Put Attribute
void putAttribute(const std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value);
// Remove Attribute
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/sitetosite/SiteToSiteFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h
index 4beefc6..648347d 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -71,6 +71,9 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf
if (nullptr != http_protocol) {
auto ptr = std::unique_ptr<SiteToSiteClient>(static_cast<SiteToSiteClient*>(http_protocol));
auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort()));
+ char idStr[37];
+ uuid_unparse_lower(uuid, idStr);
+ std::cout << "sending " << idStr << std::endl;
ptr->setPortId(uuid);
ptr->setPeer(std::move(peer));
return ptr;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 76932de..38ba889 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -156,6 +156,7 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
if (context->getProperty(portUUID.getName(), value)) {
uuid_parse(value.c_str(), protocol_uuid_);
}
+
std::string context_name;
if (!context->getProperty(SSLContext.getName(), context_name) || IsNullOrEmpty(context_name)) {
context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
new file mode 100644
index 0000000..f038e3d
--- /dev/null
+++ b/libminifi/src/capi/Plan.cpp
@@ -0,0 +1,216 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "capi/Plan.h"
+#include <memory>
+#include <vector>
+#include <set>
+#include <string>
+
+ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
+ :
+ content_repo_(content_repo),
+ flow_repo_(flow_repo),
+ prov_repo_(prov_repo),
+ finalized(false),
+ location(-1),
+ current_flowfile_(nullptr),
+ logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) {
+ stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
+}
+
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship,
+bool linkToPrevious) {
+ if (finalized) {
+ return nullptr;
+ }
+
+
+ uuid_t uuid;
+ uuid_generate(uuid);
+
+ processor->setStreamFactory(stream_factory);
+ // initialize the processor
+ processor->initialize();
+
+ processor_mapping_[processor->getUUIDStr()] = processor;
+
+ if (!linkToPrevious) {
+ termination_ = relationship;
+ } else {
+ std::shared_ptr<core::Processor> last = processor_queue_.back();
+
+ if (last == nullptr) {
+ last = processor;
+ termination_ = relationship;
+ }
+
+ std::stringstream connection_name;
+ connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
+ logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1);
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
+ connection->setRelationship(relationship);
+
+ // link the connections so that we can test results at the end for this
+ connection->setSource(last);
+ connection->setDestination(processor);
+
+ uuid_t uuid_copy, uuid_copy_next;
+ last->getUUID(uuid_copy);
+ connection->setSourceUUID(uuid_copy);
+ processor->getUUID(uuid_copy_next);
+ connection->setDestinationUUID(uuid_copy_next);
+ last->addConnection(connection);
+ if (last != processor) {
+ processor->addConnection(connection);
+ }
+ relationships_.push_back(connection);
+ }
+
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+
+ processor_nodes_.push_back(node);
+
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+ processor_contexts_.push_back(context);
+
+ processor_queue_.push_back(processor);
+
+ return processor;
+}
+
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship,
+bool linkToPrevious) {
+ if (finalized) {
+ return nullptr;
+ }
+
+ uuid_t uuid;
+ uuid_generate(uuid);
+
+ auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
+ if (nullptr == ptr) {
+ throw std::exception();
+ }
+ std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
+
+ processor->setName(name);
+
+ return addProcessor(processor, name, relationship, linkToPrevious);
+}
+
+bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
+ uint32_t i = 0;
+ logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName());
+ for (i = 0; i < processor_queue_.size(); i++) {
+ if (processor_queue_.at(i) == proc) {
+ break;
+ }
+ }
+
+ if (i >= processor_queue_.size() || i == 0 || i >= processor_contexts_.size()) {
+ return false;
+ }
+
+ return processor_contexts_.at(i)->setProperty(prop, value);
+}
+
+void ExecutionPlan::reset() {
+ process_sessions_.clear();
+ factories_.clear();
+ location = -1;
+ for (auto proc : processor_queue_) {
+ while (proc->getActiveTasks() > 0) {
+ proc->decrementActiveTask();
+ }
+ }
+}
+
+
+bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+ if (!finalized) {
+ finalize();
+ }
+ logger_->log_info("Running next processor %d, processor_queue_.size %d, processor_contexts_.size %d", location, processor_queue_.size(), processor_contexts_.size());
+
+ location++;
+ std::shared_ptr<core::Processor> processor = processor_queue_.at(location);
+ std::shared_ptr<core::ProcessContext> context = processor_contexts_.at(location);
+ std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
+ factories_.push_back(factory);
+ if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) {
+ processor->onSchedule(context, factory);
+ configured_processors_.push_back(processor);
+ }
+ std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
+ process_sessions_.push_back(current_session);
+ processor->incrementActiveTasks();
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ if (verify != nullptr) {
+ verify(context, current_session);
+ } else {
+ logger_->log_info("Running %s", processor->getName());
+ processor->onTrigger(context, current_session);
+ }
+ current_session->commit();
+ current_flowfile_ = current_session->get();
+ return location + 1 < processor_queue_.size();
+}
+
+std::set<provenance::ProvenanceEventRecord*> ExecutionPlan::getProvenanceRecords() {
+ return process_sessions_.at(location)->getProvenanceReporter()->getEvents();
+}
+
+std::shared_ptr<core::FlowFile> ExecutionPlan::getCurrentFlowFile() {
+ return current_flowfile_;
+}
+
+std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) {
+ std::stringstream connection_name;
+ std::shared_ptr<core::Processor> last = processor;
+ connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
+ connection->setRelationship(termination_);
+
+ // link the connections so that we can test results at the end for this
+ connection->setSource(last);
+ if (setDest)
+ connection->setDestination(processor);
+
+ uuid_t uuid_copy;
+ last->getUUID(uuid_copy);
+ connection->setSourceUUID(uuid_copy);
+ if (setDest)
+ connection->setDestinationUUID(uuid_copy);
+
+ processor->addConnection(connection);
+ return connection;
+}
+
+void ExecutionPlan::finalize() {
+ if (relationships_.size() > 0) {
+ relationships_.push_back(buildFinalConnection(processor_queue_.back()));
+ } else {
+ for (auto processor : processor_queue_) {
+ relationships_.push_back(buildFinalConnection(processor, true));
+ }
+ }
+
+ finalized = true;
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
new file mode 100644
index 0000000..e46fcad
--- /dev/null
+++ b/libminifi/src/capi/api.cpp
@@ -0,0 +1,264 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+#include <map>
+#include <memory>
+#include <utility>
+#include "core/Core.h"
+#include "capi/api.h"
+#include "capi/expect.h"
+#include "capi/Instance.h"
+#include "capi/Plan.h"
+#include "ResourceClaim.h"
+
+class DirectoryConfiguration {
+ protected:
+ DirectoryConfiguration() {
+ minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
+ }
+ public:
+ static void initialize() {
+ static DirectoryConfiguration configure;
+ }
+};
+
+/**
+ * Creates a NiFi Instance from the url and output port.
+ * @param url http URL for NiFi instance
+ * @param port Remote output port.
+ */
+nifi_instance *create_instance(char *url, nifi_port *port) {
+ // make sure that we have a thread safe way of initializing the content directory
+ DirectoryConfiguration::initialize();
+
+ nifi_instance *instance = new nifi_instance;
+
+ instance->instance_ptr = new minifi::Instance(url, port->pord_id);
+ instance->port.pord_id = port->pord_id;
+
+ return instance;
+}
+
+/**
+ * Initializes the instance
+ */
+void initialize_instance(nifi_instance *instance) {
+ auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+ minifi_instance_ref->setRemotePort(instance->port.pord_id);
+}
+
+/**
+ * Sets a property within the nifi instance
+ * @param instance nifi instance
+ * @param key key in which we will set the valiue
+ * @param value
+ */
+void set_property(nifi_instance *instance, char *key, char *value) {
+ auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+ minifi_instance_ref->getConfiguration()->set(key, value);
+}
+
+/**
+ * Reclaims memory associated with a nifi instance structure.
+ * @param instance nifi instance.
+ */
+void free_instance(nifi_instance* instance) {
+ if (instance != nullptr) {
+ delete ((minifi::Instance*) instance->instance_ptr);
+ delete instance;
+ }
+}
+
+/**
+ * Creates a flow file record
+ * @param file file to place into the flow file.
+ */
+flow_file_record* create_flowfile(const char *file) {
+ flow_file_record *new_ff = new flow_file_record;
+ new_ff->attributes = new std::map<std::string, std::string>();
+ new_ff->contentLocation = new char[strlen(file)];
+ snprintf(new_ff->contentLocation, strlen(file), "%s", file);
+ std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
+ // set the size of the flow file.
+ new_ff->size = in.tellg();
+
+ return new_ff;
+}
+
+/**
+ * Reclaims memory associated with a flow file object
+ * @param ff flow file record.
+ */
+void free_flowfile(flow_file_record *ff) {
+ if (ff != nullptr) {
+ auto map = static_cast<std::map<std::string, std::string>*>(ff->attributes);
+ delete[] ff->contentLocation;
+ delete map;
+ delete ff;
+ }
+}
+
+/**
+ * Adds an attribute
+ * @param ff flow file record
+ * @param key key
+ * @param value value to add
+ * @param size size of value
+ * @return 0
+ */
+uint8_t add_attribute(flow_file_record *ff, char *key, void *value, size_t size) {
+ auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes);
+ attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size)));
+ return 0;
+}
+
+/*
+ * Obtains the attribute.
+ * @param ff flow file record
+ * @param key key
+ * @param caller_attribute caller supplied object in which we will copy the data ptr
+ * @return 0 if successful, -1 if the key does not exist
+ */
+uint8_t get_attribute(flow_file_record *ff, char *key, attribute *caller_attribute) {
+ auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes);
+ auto find = attribute_map->find(key);
+ if (find != attribute_map->end()) {
+ caller_attribute->key = key;
+ caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data()));
+ caller_attribute->value_size = find->second.size();
+ return 0;
+ }
+ return -1;
+}
+
+/**
+ * Removes a key from the attribute chain
+ * @param ff flow file record
+ * @param key key to remove
+ * @return 0 if removed, -1 otherwise
+ */
+uint8_t remove_attribute(flow_file_record *ff, char *key) {
+ auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes);
+ auto find = attribute_map->find(key);
+ if (find != attribute_map->end()) {
+ attribute_map->erase(find);
+ return 0;
+ }
+ return -1;
+}
+
+/**
+ * Transmits the flowfile
+ * @param ff flow file record
+ * @param instance nifi instance structure
+ */
+void transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
+ auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+ // in the unlikely event the user forgot to initialize the instance, we shall do it for them.
+ if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
+ minifi_instance_ref->setRemotePort(instance->port.pord_id);
+ }
+
+ auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes);
+
+ auto no_op = minifi_instance_ref->getNoOpRepository();
+
+ auto content_repo = minifi_instance_ref->getContentRepository();
+
+ std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
+ claim->increaseFlowFileRecordOwnedCount();
+ claim->increaseFlowFileRecordOwnedCount();
+
+ auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim);
+ ffr->addAttribute("nanofi.version", API_VERSION);
+ ffr->setSize(ff->size);
+
+ std::string port_uuid = instance->port.pord_id;
+
+ minifi_instance_ref->transfer(ffr);
+}
+
+flow *create_flow(nifi_instance *instance, const char *first_processor) {
+ auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+ flow *new_flow = new flow;
+
+ auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
+
+ new_flow->plan = execution_plan;
+
+ // automatically adds it with success
+ execution_plan->addProcessor(first_processor, first_processor);
+
+ return new_flow;
+}
+
+void free_flow(flow *flow) {
+ if (flow == nullptr)
+ return;
+ auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+ delete execution_plan;
+ delete flow;
+}
+
+flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) {
+ auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+
+ execution_plan->reset();
+ while (execution_plan->runNextProcessor()) {
+ }
+ auto ff = execution_plan->getCurrentFlowFile();
+ if (ff == nullptr)
+ return nullptr;
+ auto claim = ff->getResourceClaim();
+
+ if (claim != nullptr) {
+ // create a flow file.
+ claim->increaseFlowFileRecordOwnedCount();
+ auto path = claim->getContentFullPath();
+ auto ffr = create_flowfile(path.c_str());
+ std::cout << "dang created " << path << " " << ff->getSize() << std::endl;
+ return ffr;
+ } else {
+ return nullptr;
+ }
+}
+
+size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) {
+ auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+ int i = 0;
+ for (; i < size; i++) {
+ execution_plan->reset();
+ while (execution_plan->runNextProcessor()) {
+ }
+ auto ff = execution_plan->getCurrentFlowFile();
+ if (ff == nullptr)
+ break;
+ auto claim = ff->getResourceClaim();
+
+ if (claim != nullptr) {
+ claim->increaseFlowFileRecordOwnedCount();
+
+ auto path = claim->getContentFullPath();
+ // create a flow file.
+ ff_r[i] = create_flowfile(path.c_str());
+ } else {
+ break;
+ }
+ }
+ return i;
+}
|