nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From phroc...@apache.org
Subject nifi-minifi-cpp git commit: MINIFICPP-41: First iteration of C api MINIFICPP-41: Improve build process and make function names clearer
Date Mon, 18 Dec 2017 19:23:28 GMT
Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 2981bc1d4 -> 088e1be7c


MINIFICPP-41: First iteration of C api
MINIFICPP-41: Improve build process and make function names clearer

This closes #217.

Signed-off-by: Marc Parisi <phrocker@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/088e1be7
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/088e1be7
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/088e1be7

Branch: refs/heads/master
Commit: 088e1be7ca9deac9f104ebf315a88bd311e6a623
Parents: 2981bc1
Author: Marc Parisi <phrocker@apache.org>
Authored: Tue Dec 5 13:57:06 2017 -0500
Committer: Marc Parisi <phrocker@apache.org>
Committed: Mon Dec 18 13:55:28 2017 -0500

----------------------------------------------------------------------
 CMakeLists.txt                                  |   3 +
 LibExample/CMakeLists.txt                       |  61 +++++
 LibExample/generate_flow.c                      |  67 +++++
 LibExample/transmit_flow.c                      |  92 +++++++
 libminifi/CMakeLists.txt                        |   4 +
 libminifi/include/capi/Instance.h               | 134 ++++++++++
 libminifi/include/capi/Plan.h                   | 117 ++++++++
 libminifi/include/capi/ReflexiveSession.h       |  77 ++++++
 libminifi/include/capi/api.h                    | 154 +++++++++++
 libminifi/include/capi/expect.h                 |  32 +++
 libminifi/include/core/ProcessContext.h         |   4 +-
 libminifi/include/core/ProcessSession.h         |   6 +-
 .../include/sitetosite/SiteToSiteFactory.h      |   3 +
 libminifi/src/RemoteProcessorGroupPort.cpp      |   1 +
 libminifi/src/capi/Plan.cpp                     | 216 +++++++++++++++
 libminifi/src/capi/api.cpp                      | 264 +++++++++++++++++++
 16 files changed, 1231 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3463d69..eabc1ba 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -206,6 +206,9 @@ endif()
 ## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN
 
 add_subdirectory(main)
+if (NOT DISABLE_CURL)
+  add_subdirectory(LibExample)
+endif()
 
 
 # Generate source assembly

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/LibExample/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/LibExample/CMakeLists.txt b/LibExample/CMakeLists.txt
new file mode 100644
index 0000000..4ba26bd
--- /dev/null
+++ b/LibExample/CMakeLists.txt
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(../libminifi/include  ../libminifi/include/c2  ../libminifi/include/c2/protocols/  ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../libminifi/include/core/yaml  ../libminifi/include/core  ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/jsoncpp/include  ../thirdparty/)
+
+include(CheckCXXCompilerFlag)
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Os")
+elseif(COMPILER_SUPPORTS_CXX0X)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x -Os")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+add_executable(transmit_flow transmit_flow.c)
+
+# Link against the C API library (capi) plus core-minifi and minifi
+target_link_libraries(transmit_flow capi core-minifi minifi)
+
+if (APPLE)
+	target_link_libraries (transmit_flow -Wl,-all_load minifi-http-curl)
+else ()
+	target_link_libraries (transmit_flow -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive)
+endif ()
+
+
+add_executable(generate_flow generate_flow.c)
+
+# Link against the C API library (capi) plus core-minifi and minifi
+target_link_libraries(generate_flow capi core-minifi minifi)
+
+if (APPLE)
+	target_link_libraries (generate_flow -Wl,-all_load minifi-http-curl)
+else ()
+	target_link_libraries (generate_flow -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive)
+endif ()
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/LibExample/generate_flow.c
----------------------------------------------------------------------
diff --git a/LibExample/generate_flow.c b/LibExample/generate_flow.c
new file mode 100644
index 0000000..23ba979
--- /dev/null
+++ b/LibExample/generate_flow.c
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+#include "capi/api.h"
+
+
+/**
+ * Example of the C API: calls create_flow with GenerateFlowFile, fetches one record with get_next_flow_file and passes it to transmit_flowfile.
+ */
+int main(int argc, char **argv) {
+
+  if (argc < 3) {  // both <instance> and <remote port> are required
+    printf("Error: must run ./generate_flow <instance> <remote port> \n");
+    exit(1);
+  }
+
+  char *instance_str = argv[1];
+  char *portStr = argv[2];
+
+  nifi_port port;
+
+  port.pord_id = portStr;
+
+  nifi_instance *instance = create_instance(instance_str, &port);
+
+  flow *new_flow = create_flow(instance,"GenerateFlowFile");
+
+  flow_file_record *record = get_next_flow_file(instance, new_flow );
+
+  if (record == 0){  // null record: there is nothing to transmit
+    printf("Could not create flow file");
+    exit(1);  // NOTE(review): exits without calling free_flow or free_instance
+  }
+
+  transmit_flowfile(record,instance);
+
+  free_flowfile(record);
+
+  // initializing will make the transmission slightly more efficient.
+  //initialize_instance(instance);
+  //transfer_file_or_directory(instance,file);
+
+  free_flow(new_flow);
+
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/LibExample/transmit_flow.c
----------------------------------------------------------------------
diff --git a/LibExample/transmit_flow.c b/LibExample/transmit_flow.c
new file mode 100644
index 0000000..64e2beb
--- /dev/null
+++ b/LibExample/transmit_flow.c
@@ -0,0 +1,92 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+#include "capi/api.h"
+
+int is_dir(const char *path) {  // non-zero when path names a directory; 0 otherwise
+  struct stat stat_struct;
+  if (stat(path, &stat_struct) != 0)  // stat failure is reported as not-a-directory
+    return 0;
+  return S_ISDIR(stat_struct.st_mode);
+}
+
+void transfer_file_or_directory(nifi_instance *instance, char *file_or_dir) {  // directory: recurse into its entries; anything else: create a flow file record for the path and transmit it
+  int size = 1;  // NOTE(review): never read in this function
+
+  if (is_dir(file_or_dir)) {
+    DIR *d;
+
+    struct dirent *dir;
+    d = opendir(file_or_dir);
+    if (d) {
+      while ((dir = readdir(d)) != NULL) {
+        if (!memcmp(dir->d_name,".",1) )  // skips every entry whose name starts with a dot (current dir, parent dir and dot-files)
+          continue;
+        char *file_path = malloc(strlen(file_or_dir) + strlen(dir->d_name) + 2);  // +2: the slash separator and the terminating NUL; NOTE(review): result is not checked
+        sprintf(file_path,"%s/%s",file_or_dir,dir->d_name);
+        transfer_file_or_directory(instance,file_path);
+        free(file_path);
+      }
+      closedir(d);
+    }
+    printf("%s is a directory", file_or_dir);  // printed after the entries were processed; no trailing newline
+  } else {
+    printf("Transferring %s\n",file_or_dir);
+
+    flow_file_record *record = create_flowfile(file_or_dir);
+
+    add_attribute(record, "addedattribute", "1", 2);  // size 2 covers the one character plus its terminating NUL
+
+    transmit_flowfile(record, instance);
+
+    free_flowfile(record);
+  }
+}
+
+/**
+ * Example of the C API: hands the file or directory given as the third argument to transfer_file_or_directory for transmission to a remote instance.
+ */
+int main(int argc, char **argv) {
+
+  if (argc < 4) {  // <instance>, <remote port> and <file or directory> are all required
+    printf("Error: must run ./transmit_flow <instance> <remote port> <file or directory>\n");
+    exit(1);
+  }
+
+  char *instance_str = argv[1];
+  char *portStr = argv[2];
+  char *file = argv[3];
+
+  nifi_port port;
+
+  port.pord_id = portStr;
+
+  nifi_instance *instance = create_instance(instance_str, &port);
+
+  // initializing will make the transmission slightly more efficient.
+  //initialize_instance(instance);
+  transfer_file_or_directory(instance,file);
+
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index b882a1c..6ede84f 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -64,6 +64,8 @@ file(GLOB SOURCES  "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/s
 
 file(GLOB PROCESSOR_SOURCES  "src/processors/*.cpp" )
 
+file(GLOB CAPI_SOURCES  "src/capi/*.cpp" )
+
 file(GLOB SPD_SOURCES "../thirdparty/spdlog-20170710/include/spdlog/*")
 
 # Workaround the limitations of having a
@@ -89,8 +91,10 @@ else ()
 endif (OPENSSL_FOUND)
 
 add_library(minifi STATIC ${PROCESSOR_SOURCES})
+add_library(capi STATIC ${CAPI_SOURCES})
 target_link_libraries(minifi c-library civetweb-cpp)
 
 target_link_libraries(minifi core-minifi)
 
+
 SET (LIBMINIFI core-minifi PARENT_SCOPE)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/Instance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Instance.h b/libminifi/include/capi/Instance.h
new file mode 100644
index 0000000..5d1c4c1
--- /dev/null
+++ b/libminifi/include/capi/Instance.h
@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+#define LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+
+#include <memory>
+#include <type_traits>
+#include <string>
+#include "core/Property.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ContentRepository.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "core/Repository.h"
+
+#include "core/Connectable.h"
+#include "core/ProcessorNode.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/FlowConfiguration.h"
+#include "ReflexiveSession.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class ProcessorLink {  // Holder for a shared_ptr to a core::Processor
+ public:
+  explicit ProcessorLink(const std::shared_ptr<core::Processor> &processor)
+      : processor_(processor) {
+
+  }
+
+  const std::shared_ptr<core::Processor> &getProcessor() {  // returns a reference to the stored pointer, not a copy
+    return processor_;
+  }
+
+ protected:
+  std::shared_ptr<core::Processor> processor_;
+};
+
+class Instance {  // Bundles the configuration, repositories and RemoteProcessorGroupPort that transfer() uses
+ public:
+
+  explicit Instance(const std::string &url, const std::string &port)  // url is handed to the RemoteProcessorGroupPort; port is parsed as a UUID string
+      : configure_(std::make_shared<Configure>()),
+        url_(url),
+        rpgInitialized_(false),
+        content_repo_(std::make_shared<minifi::core::repository::FileSystemRepository>()),  // NOTE(review): this header includes VolatileContentRepository.h but no FileSystemRepository header directly - confirm it is declared transitively
+        no_op_repo_(std::make_shared<minifi::core::Repository>()) {
+    stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configure_);
+    uuid_t uuid;
+    uuid_parse(port.c_str(), uuid);  // NOTE(review): the return value is not checked
+    rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid);
+    proc_node_ = std::make_shared<core::ProcessorNode>(rpg_);
+    core::FlowConfiguration::initialize_static_functions();
+    content_repo_->initialize(configure_);
+  }
+
+  bool isRPGConfigured() {  // false after construction, true once setRemotePort() has run
+    return rpgInitialized_;
+  }
+
+  void setRemotePort(std::string remote_port) {  // sets the portUUID property, initializes the port and marks it as transmitting
+    rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port);
+    rpg_->initialize();
+    rpg_->setTransmitting(true);
+    rpgInitialized_ = true;
+  }
+
+  std::shared_ptr<Configure> getConfiguration() {
+    return configure_;
+  }
+
+  std::shared_ptr<minifi::core::Repository> getNoOpRepository() {
+    return no_op_repo_;
+  }
+
+  std::shared_ptr<minifi::core::ContentRepository> getContentRepository() {
+    return content_repo_;
+  }
+
+  void transfer(const std::shared_ptr<FlowFileRecord> &ff) {  // runs the remote port once over ff: onSchedule, then onTrigger with a ReflexiveSession holding ff
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;  // no controller service provider is supplied
+    auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_);
+    auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext);
+
+    rpg_->onSchedule(processContext, sessionFactory);  // invoked on every transfer() call
+
+    auto session = std::make_shared<core::ReflexiveSession>(processContext);
+
+    session->add(ff);
+
+    rpg_->onTrigger(processContext, session);
+  }
+
+ protected:
+
+  bool rpgInitialized_;  // NOTE(review): members are initialized in this declaration order, which differs from the order written in the constructor initializer list
+
+  std::shared_ptr<minifi::core::Repository> no_op_repo_;  // passed twice to ProcessContext in transfer()
+
+  std::shared_ptr<minifi::core::ContentRepository> content_repo_;
+
+  std::shared_ptr<core::ProcessorNode> proc_node_;  // wraps rpg_
+  std::shared_ptr<minifi::RemoteProcessorGroupPort> rpg_;
+  std::shared_ptr<io::StreamFactory> stream_factory_;
+  std::string url_;
+  std::shared_ptr<Configure> configure_;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/Plan.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h
new file mode 100644
index 0000000..cd9d756
--- /dev/null
+++ b/libminifi/include/capi/Plan.h
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_CAPI_PLAN_H_
+#define LIBMINIFI_CAPI_PLAN_H_
+#include <dirent.h>
+#include <cstdio>
+#include <cstdlib>
+#include <sstream>
+#include "ResourceClaim.h"
+#include <vector>
+#include <set>
+#include <map>
+#include "core/logging/Logger.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "properties/Properties.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "spdlog/sinks/ostream_sink.h"
+#include "spdlog/sinks/dist_sink.h"
+#include "core/Core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+
+
+class ExecutionPlan {  // NOTE(review): declared at global scope while core::, minifi::, provenance:: and logging:: are used unqualified - presumably a using-directive is visible through the included headers; confirm
+ public:
+
+  explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
+
+  std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,  // overload taking an already constructed processor
+                                                core::Relationship relationship = core::Relationship("success", "description"),
+                                                bool linkToPrevious = false);
+
+  std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),  // overload taking a processor name string
+  bool linkToPrevious = false);
+
+  bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);
+
+  void reset();
+
+  bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);  // verify defaults to no callback; NOTE(review): std::function is used without a direct <functional> include in this header
+
+  std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
+
+  std::shared_ptr<core::FlowFile> getCurrentFlowFile();
+
+  std::shared_ptr<core::Repository> getFlowRepo() {  // accessor for flow_repo_
+    return flow_repo_;
+  }
+
+  std::shared_ptr<core::Repository> getProvenanceRepo() {  // accessor for prov_repo_
+    return prov_repo_;
+  }
+
+  std::shared_ptr<core::ContentRepository> getContentRepo() {  // accessor for content_repo_
+    return content_repo_;
+  }
+
+ protected:
+
+  void finalize();
+
+  std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false);
+
+  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
+
+  std::shared_ptr<core::ContentRepository> content_repo_;
+
+  std::shared_ptr<core::Repository> flow_repo_;
+  std::shared_ptr<core::Repository> prov_repo_;
+
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
+
+  std::atomic<bool> finalized;  // NOTE(review): std::atomic is used without a direct <atomic> include in this header
+
+  int location;
+
+  std::shared_ptr<core::ProcessSession> current_session_;
+  std::shared_ptr<core::FlowFile> current_flowfile_;
+
+  std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
+  std::vector<std::shared_ptr<core::Processor>> processor_queue_;
+  std::vector<std::shared_ptr<core::Processor>> configured_processors_;
+  std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
+  std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
+  std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
+  std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
+  std::vector<std::shared_ptr<minifi::Connection>> relationships_;
+  core::Relationship termination_;
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+#endif /* LIBMINIFI_CAPI_PLAN_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/ReflexiveSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/ReflexiveSession.h b/libminifi/include/capi/ReflexiveSession.h
new file mode 100644
index 0000000..ebf6cbe
--- /dev/null
+++ b/libminifi/include/capi/ReflexiveSession.h
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REFLEXIVE_SESSION_H__
+#define __REFLEXIVE_SESSION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ProcessSession that keeps a single flow file in memory: add() stores it, get() returns it once, transfer() does nothing
+class ReflexiveSession : public ProcessSession{
+ public:
+  // Constructor
+  /*!
+   * Forwards processContext (nullptr by default) to the ProcessSession base
+   */
+  ReflexiveSession(std::shared_ptr<ProcessContext> processContext = nullptr)
+      : ProcessSession(processContext){
+  }
+
+// Destructor (nothing to release beyond the members)
+  virtual ~ReflexiveSession() {
+  }
+
+   virtual std::shared_ptr<core::FlowFile> get(){  // hands out the stored flow file and clears the slot, so the next call returns nullptr
+     auto prevff = ff;
+     ff = nullptr;
+     return prevff;
+   }
+
+   virtual void add(const std::shared_ptr<core::FlowFile> &flow){  // replaces whatever flow file was stored before
+     ff = flow;
+   }
+   virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship){
+     // no op: both arguments are ignored
+   }
+ protected:
+  //
+  // The one flow file held by this session; set by add(), cleared by get()
+  std::shared_ptr<core::FlowFile> ff;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
new file mode 100644
index 0000000..440329e
--- /dev/null
+++ b/libminifi/include/capi/api.h
@@ -0,0 +1,154 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+#define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define API_VERSION "0.01"
+/****
+ * ##################################################################
+ *  BASE NIFI OPERATIONS
+ * ##################################################################
+ */
+
+
+/**
+ * NiFi port descriptor; a pointer to one is passed to create_instance
+ */
+typedef struct {
+    char *pord_id; /* port id string; NOTE: the field is spelled pord_id and the example programs assign it under that name */
+}nifi_port;
+
+
+/**
+ * Nifi instance struct
+ */
+typedef struct {
+
+  void *instance_ptr;
+
+  nifi_port port;
+
+} nifi_instance;
+
+
+nifi_instance *create_instance(char *url, nifi_port *port);
+
+void set_property(nifi_instance *, char *, char *);
+
+void initialize_instance(nifi_instance *);
+
+void free_instance(nifi_instance*);
+
+
+/****
+ * ##################################################################
+ *  Processor OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct {
+  void *processor_ptr;
+} processor;
+
+uint8_t run_processor(const processor *processor);
+
+
+/****
+ * ##################################################################
+ *  FLOWFILE OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct{ /* one key/value attribute entry */
+  char *key;
+  void *value;
+  size_t value_size; /* NOTE(review): size_t is used although this header only includes stdint.h - confirm it is defined on all targets */
+} attribute;
+
+/**
+ * State of a flow file
+ *
+ */
+typedef struct {
+  uint64_t size; /**< Size in bytes of the data corresponding to this flow file */
+
+  char * contentLocation; /**< Filesystem location of this object */
+
+  void *attributes; /**< Hash map of attributes */
+
+
+} flow_file_record;
+
+
+
+typedef struct  {
+    void *plan;
+} flow;
+
+
+flow *create_flow(nifi_instance *, const char *);
+
+void free_flow(flow *);
+
+flow_file_record *get_next_flow_file(nifi_instance *, flow *);
+
+size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t );
+
+
+/**
+ * Creates a flow file object.
+ * Will obtain the size of file
+ */
+flow_file_record* create_flowfile(const char *file);
+
+void free_flowfile(flow_file_record*);
+
+uint8_t add_attribute(flow_file_record*, char *key, void *value, size_t size);
+
+void *get_attribute(flow_file_record*, char *key);
+
+uint8_t remove_attribute(flow_file_record*, char *key);
+
+/****
+ * ##################################################################
+ *  Remote NIFI OPERATIONS
+ * ##################################################################
+ */
+
+void transmit_flowfile(flow_file_record *, nifi_instance *);
+
+
+/****
+ * ##################################################################
+ *  Persistence Operations
+ * ##################################################################
+ */
+
+void transmit_flowfile(flow_file_record *, nifi_instance *); /* NOTE(review): repeats the declaration already made in the Remote NIFI OPERATIONS section; nothing persistence-specific is declared here */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/expect.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/expect.h b/libminifi/include/capi/expect.h
new file mode 100644
index 0000000..ead182c
--- /dev/null
+++ b/libminifi/include/capi/expect.h
@@ -0,0 +1,32 @@
+/**
+
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
+#define LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
+
+
+// Branch-prediction hint macros: with GCC 4 or newer they wrap __builtin_expect,
+// on any other compiler they expand to the bare expression.
+#if defined(__GNUC__) && __GNUC__ >= 4
+#define LIKELY(x)   (__builtin_expect((x), 1))
+#define UNLIKELY(x) (__builtin_expect((x), 0))
+#else
+#define LIKELY(x)   (x)
+#define UNLIKELY(x) (x)
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_EXPECT_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/core/ProcessContext.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 9759c16..ad946a5 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -125,7 +125,9 @@ class ProcessContext : public controller::ControllerServiceLookup {
    * identifier
    */
   std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) {
-    return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr());
+    if (controller_service_provider_ != nullptr)
+      return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr());
+    return nullptr;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 30d0563..38a713d 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -70,7 +70,7 @@ class ProcessSession {
   }
   //
   // Get the FlowFile from the highest priority queue
-  std::shared_ptr<core::FlowFile> get();
+  virtual std::shared_ptr<core::FlowFile> get();
   // Create a new UUID FlowFile with no content resource claim and without parent
   std::shared_ptr<core::FlowFile> create();
   // Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
@@ -78,7 +78,7 @@ class ProcessSession {
   // Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
   std::shared_ptr<core::FlowFile> create(const std::shared_ptr<core::FlowFile> &parent);
   // Add a FlowFile to the session
-  void add(const std::shared_ptr<core::FlowFile> &flow);
+  virtual void add(const std::shared_ptr<core::FlowFile> &flow);
 // Clone a new UUID FlowFile from parent both for content resource claim and attributes
   std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile> &parent);
   // Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
@@ -86,7 +86,7 @@ class ProcessSession {
   // Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session
   std::shared_ptr<core::FlowFile> duplicate(const std::shared_ptr<core::FlowFile> &original);
   // Transfer the FlowFile to the relationship
-  void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship);
+  virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship);
   // Put Attribute
   void putAttribute(const std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value);
   // Remove Attribute

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/sitetosite/SiteToSiteFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h
index 4beefc6..648347d 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -71,6 +71,9 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf
       if (nullptr != http_protocol) {
         auto ptr = std::unique_ptr<SiteToSiteClient>(static_cast<SiteToSiteClient*>(http_protocol));
         auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort()));
+        char idStr[37];
+            uuid_unparse_lower(uuid, idStr);
+            std::cout << "sending " << idStr << std::endl;
         ptr->setPortId(uuid);
         ptr->setPeer(std::move(peer));
         return ptr;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 76932de..38ba889 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -156,6 +156,7 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
   if (context->getProperty(portUUID.getName(), value)) {
     uuid_parse(value.c_str(), protocol_uuid_);
   }
+
   std::string context_name;
   if (!context->getProperty(SSLContext.getName(), context_name) || IsNullOrEmpty(context_name)) {
     context_name = RPG_SSL_CONTEXT_SERVICE_NAME;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
new file mode 100644
index 0000000..f038e3d
--- /dev/null
+++ b/libminifi/src/capi/Plan.cpp
@@ -0,0 +1,216 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "capi/Plan.h"
+#include <memory>
+#include <vector>
+#include <set>
+#include <string>
+
+/**
+ * Builds an empty, non-finalized plan on top of the supplied repositories.
+ * The plan starts in front of the first processor (location == -1) and has no
+ * current flow file. The stream factory is created from a freshly constructed
+ * minifi::Configure.
+ *
+ * @param content_repo content repository handed to connections and contexts.
+ * @param flow_repo flow file repository handed to connections and contexts.
+ * @param prov_repo provenance repository handed to the process contexts.
+ */
+ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
+    : content_repo_(content_repo),
+      flow_repo_(flow_repo),
+      prov_repo_(prov_repo),
+      finalized(false),
+      location(-1),
+      current_flowfile_(nullptr),
+      logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) {
+  auto stream_configuration = std::make_shared<minifi::Configure>();
+  stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(stream_configuration);
+}
+
+/**
+ * Appends an already constructed processor to the plan.
+ *
+ * The processor receives the plan's stream factory, is initialized, and gets
+ * its own ProcessorNode and ProcessContext. When linkToPrevious is set, a
+ * connection carrying the given relationship is created from the most recently
+ * added processor to this one; if the plan is still empty the processor is
+ * connected to itself and the relationship is also recorded as the terminating
+ * relationship. When linkToPrevious is not set, only the terminating
+ * relationship is updated.
+ *
+ * @param processor processor to add.
+ * @param name processor name; not used by this overload.
+ * @param relationship relationship for the new connection or the termination.
+ * @param linkToPrevious whether to connect the previous processor to this one.
+ * @return the processor that was added, or nullptr if the plan is finalized.
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship,
+bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+
+  processor->setStreamFactory(stream_factory);
+  // initialize the processor
+  processor->initialize();
+
+  processor_mapping_[processor->getUUIDStr()] = processor;
+
+  if (!linkToPrevious) {
+    termination_ = relationship;
+  } else {
+    // back() on an empty container is undefined behaviour, so the queue is only consulted once it holds a processor
+    std::shared_ptr<core::Processor> last;
+    if (!processor_queue_.empty()) {
+      last = processor_queue_.back();
+    }
+
+    // first processor of the plan: link it to itself and remember how the plan terminates
+    if (last == nullptr) {
+      last = processor;
+      termination_ = relationship;
+    }
+
+    std::stringstream connection_name;
+    connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
+    logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1);
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
+    connection->setRelationship(relationship);
+
+    // link the connections so that we can test results at the end for this
+    connection->setSource(last);
+    connection->setDestination(processor);
+
+    uuid_t uuid_copy, uuid_copy_next;
+    last->getUUID(uuid_copy);
+    connection->setSourceUUID(uuid_copy);
+    processor->getUUID(uuid_copy_next);
+    connection->setDestinationUUID(uuid_copy_next);
+    last->addConnection(connection);
+    // a self-link must not register the same connection on the processor twice
+    if (last != processor) {
+      processor->addConnection(connection);
+    }
+    relationships_.push_back(connection);
+  }
+
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+
+  processor_nodes_.push_back(node);
+
+  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+  processor_contexts_.push_back(context);
+
+  processor_queue_.push_back(processor);
+
+  return processor;
+}
+
+/**
+ * Instantiates a processor by class name through the default class loader,
+ * names it, and appends it to the plan via the shared_ptr overload.
+ *
+ * @param processor_name class name handed to the class loader.
+ * @param name name assigned to the new processor.
+ * @param relationship forwarded to the shared_ptr overload.
+ * @param linkToPrevious forwarded to the shared_ptr overload.
+ * @return the added processor, or nullptr if the plan is already finalized.
+ * @throws std::exception if the class loader returns nullptr for processor_name.
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship,
+bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+
+  uuid_t processor_uuid;
+  uuid_generate(processor_uuid);
+
+  auto instance = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, processor_uuid);
+  if (instance == nullptr) {
+    throw std::exception();
+  }
+
+  // whatever the class loader returned is treated as a processor without a runtime type check
+  auto processor = std::static_pointer_cast<core::Processor>(instance);
+  processor->setName(name);
+
+  return addProcessor(processor, name, relationship, linkToPrevious);
+}
+
+/**
+ * Sets a property on the context that belongs to a processor of this plan.
+ *
+ * @param proc processor whose context should receive the property.
+ * @param prop property name.
+ * @param value property value.
+ * @return false if proc is not part of the plan or has no matching context,
+ *         otherwise the result of the context's setProperty call.
+ */
+bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
+  logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName());
+
+  // contexts are appended together with their processors, so the queue index of proc also selects its context
+  size_t index = 0;
+  while (index < processor_queue_.size() && processor_queue_.at(index) != proc) {
+    index++;
+  }
+
+  // index 0 is the first processor and a perfectly valid match; only a missing processor or context is an error
+  if (index >= processor_queue_.size() || index >= processor_contexts_.size()) {
+    return false;
+  }
+
+  return processor_contexts_.at(index)->setProperty(prop, value);
+}
+
+/**
+ * Rewinds the plan so that it can be run again from the first processor:
+ * drops all sessions and session factories, moves the cursor back in front of
+ * the first processor and decrements every processor's active task count
+ * until none remain.
+ */
+void ExecutionPlan::reset() {
+  process_sessions_.clear();
+  factories_.clear();
+  location = -1;
+  for (const auto &queued_processor : processor_queue_) {
+    // every run of a processor incremented its counter; undo all of them
+    while (queued_processor->getActiveTasks() > 0) {
+      queued_processor->decrementActiveTask();
+    }
+  }
+}
+
+
+/**
+ * Advances the plan to the next processor and executes it once.
+ *
+ * The plan is finalized on first use. The processor at the next location is
+ * scheduled (onSchedule is invoked only the first time a given processor is
+ * run), receives a fresh session, is marked RUNNING and is then either handed
+ * to the verify callback or triggered directly. Afterwards the session is
+ * committed and whatever the session's get() returns becomes the current
+ * flow file.
+ *
+ * @param verify optional callback; when set it is invoked with the context and
+ *        session instead of the processor's onTrigger.
+ * @return true while at least one more processor follows the one just run.
+ */
+bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+  if (!finalized) {
+    finalize();
+  }
+  logger_->log_info("Running next processor %d, processor_queue_.size %d, processor_contexts_.size %d", location, processor_queue_.size(), processor_contexts_.size());
+
+  location++;
+  std::shared_ptr<core::Processor> processor = processor_queue_.at(location);
+  std::shared_ptr<core::ProcessContext> context = processor_contexts_.at(location);
+  auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);
+  factories_.push_back(session_factory);
+
+  // reset() does not clear configured_processors_, so a processor is scheduled once even across repeated runs
+  const bool already_scheduled = std::find(configured_processors_.begin(), configured_processors_.end(), processor) != configured_processors_.end();
+  if (!already_scheduled) {
+    processor->onSchedule(context, session_factory);
+    configured_processors_.push_back(processor);
+  }
+
+  auto session = std::make_shared<core::ProcessSession>(context);
+  process_sessions_.push_back(session);
+  processor->incrementActiveTasks();
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+  if (verify == nullptr) {
+    logger_->log_info("Running %s", processor->getName());
+    processor->onTrigger(context, session);
+  } else {
+    verify(context, session);
+  }
+  session->commit();
+  current_flowfile_ = session->get();
+  return location + 1 < processor_queue_.size();
+}
+
+/**
+ * Returns the provenance events reported by the session stored at the plan's
+ * current location.
+ */
+std::set<provenance::ProvenanceEventRecord*> ExecutionPlan::getProvenanceRecords() {
+  const auto &session = process_sessions_.at(location);
+  return session->getProvenanceReporter()->getEvents();
+}
+
+/**
+ * Returns the flow file recorded by the most recent runNextProcessor call, or
+ * nullptr if none has been recorded.
+ */
+std::shared_ptr<core::FlowFile> ExecutionPlan::getCurrentFlowFile() {
+  std::shared_ptr<core::FlowFile> current = current_flowfile_;
+  return current;
+}
+
+/**
+ * Creates the terminating connection for a processor. The connection uses the
+ * plan's termination relationship and has the processor as its source; with
+ * setDest the processor is also made the destination, which produces a
+ * self-loop. The connection is registered on the processor and returned.
+ *
+ * @param processor processor that terminates the plan.
+ * @param setDest whether the processor is also set as the destination.
+ * @return the newly created connection.
+ */
+std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) {
+  // source and destination are the same processor, hence the same UUID on both sides of the name
+  std::stringstream name_builder;
+  name_builder << processor->getUUIDStr() << "-to-" << processor->getUUIDStr();
+
+  auto connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, name_builder.str());
+  connection->setRelationship(termination_);
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(processor);
+  if (setDest) {
+    connection->setDestination(processor);
+  }
+
+  uuid_t processor_uuid;
+  processor->getUUID(processor_uuid);
+  connection->setSourceUUID(processor_uuid);
+  if (setDest) {
+    connection->setDestinationUUID(processor_uuid);
+  }
+
+  processor->addConnection(connection);
+  return connection;
+}
+
+/**
+ * Closes the plan. If processors were linked to each other, only the last
+ * processor receives a terminating connection; otherwise every processor gets
+ * a terminating connection that points back to itself. Afterwards the plan is
+ * marked finalized, so addProcessor returns nullptr from then on.
+ */
+void ExecutionPlan::finalize() {
+  if (relationships_.empty()) {
+    for (const auto &queued_processor : processor_queue_) {
+      relationships_.push_back(buildFinalConnection(queued_processor, true));
+    }
+  } else {
+    relationships_.push_back(buildFinalConnection(processor_queue_.back()));
+  }
+
+  finalized = true;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
new file mode 100644
index 0000000..e46fcad
--- /dev/null
+++ b/libminifi/src/capi/api.cpp
@@ -0,0 +1,264 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+#include <map>
+#include <memory>
+#include <utility>
+#include "core/Core.h"
+#include "capi/api.h"
+#include "capi/expect.h"
+#include "capi/Instance.h"
+#include "capi/Plan.h"
+#include "ResourceClaim.h"
+
+/**
+ * Helper whose only instance sets the default content directory. The instance
+ * is a function-local static inside initialize(), so the constructor body runs
+ * the first time initialize() is called and not again.
+ */
+class DirectoryConfiguration {
+ public:
+  static void initialize() {
+    static DirectoryConfiguration instance;
+  }
+
+ protected:
+  DirectoryConfiguration() {
+    minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
+  }
+};
+
+/**
+ * Creates a NiFi instance handle for the given URL and remote port.
+ * @param url http URL of the NiFi instance
+ * @param port remote port descriptor; its pord_id is stored in the handle
+ * @return newly allocated handle; release it with free_instance
+ */
+nifi_instance *create_instance(char *url, nifi_port *port) {
+  // make sure that we have a thread safe way of initializing the content directory
+  DirectoryConfiguration::initialize();
+
+  nifi_instance *created = new nifi_instance;
+  created->port.pord_id = port->pord_id;
+  created->instance_ptr = new minifi::Instance(url, port->pord_id);
+
+  return created;
+}
+
+/**
+ * Initializes the instance by configuring its remote port from the port id
+ * stored in the handle.
+ * @param instance nifi instance
+ */
+void initialize_instance(nifi_instance *instance) {
+  minifi::Instance *const native_instance = static_cast<minifi::Instance*>(instance->instance_ptr);
+  native_instance->setRemotePort(instance->port.pord_id);
+}
+
+/**
+ * Sets a property in the configuration of the nifi instance.
+ * @param instance nifi instance
+ * @param key key under which the value will be stored
+ * @param value value to store
+ */
+void set_property(nifi_instance *instance, char *key, char *value) {
+  auto configuration = static_cast<minifi::Instance*>(instance->instance_ptr)->getConfiguration();
+  configuration->set(key, value);
+}
+
+/**
+ * Reclaims the memory of a nifi instance structure together with the
+ * minifi::Instance it wraps. Passing nullptr is a no-op.
+ * @param instance nifi instance.
+ */
+void free_instance(nifi_instance* instance) {
+  if (instance == nullptr) {
+    return;
+  }
+  delete static_cast<minifi::Instance*>(instance->instance_ptr);
+  delete instance;
+}
+
+/**
+ * Creates a flow file record that refers to a file on disk.
+ * The record owns a copy of the path and an empty attribute map; release it
+ * with free_flowfile.
+ * @param file path of the file to place into the flow file.
+ * @return newly allocated record whose size is the size of the file in bytes,
+ *         or 0 if the size could not be determined.
+ */
+flow_file_record* create_flowfile(const char *file) {
+  flow_file_record *new_ff = new flow_file_record;
+  new_ff->attributes = new std::map<std::string, std::string>();
+  // reserve room for the terminating NUL; without it snprintf drops the last character of the path
+  const size_t location_size = strlen(file) + 1;
+  new_ff->contentLocation = new char[location_size];
+  snprintf(new_ff->contentLocation, location_size, "%s", file);
+  // opened positioned at the end so that tellg() reports the file size
+  std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
+  // set the size of the flow file; tellg() yields -1 on failure, which must not be stored as a size
+  const std::streamoff end_offset = in.tellg();
+  new_ff->size = end_offset < 0 ? 0 : end_offset;
+
+  return new_ff;
+}
+
+/**
+ * Reclaims the memory of a flow file record: its content location string,
+ * its attribute map and the record itself. Passing nullptr is a no-op.
+ * @param ff flow file record.
+ */
+void free_flowfile(flow_file_record *ff) {
+  if (ff == nullptr) {
+    return;
+  }
+  delete[] ff->contentLocation;
+  delete static_cast<std::map<std::string, std::string>*>(ff->attributes);
+  delete ff;
+}
+
+/**
+ * Adds an attribute to the flow file record. The value is copied as size
+ * bytes starting at value. If the key is already present the existing value
+ * is kept and nothing is added.
+ * @param ff flow file record
+ * @param key key
+ * @param value value to add
+ * @param size size of value in bytes
+ * @return always 0
+ */
+uint8_t add_attribute(flow_file_record *ff, char *key, void *value, size_t size) {
+  auto attributes = static_cast<std::map<std::string, std::string>*>(ff->attributes);
+  const std::string attribute_value(static_cast<char*>(value), size);
+  const std::pair<std::string, std::string> entry(key, attribute_value);
+  attributes->insert(entry);
+  return 0;
+}
+
+/**
+ * Obtains an attribute of the flow file record.
+ * @param ff flow file record
+ * @param key key
+ * @param caller_attribute caller supplied object that receives the key pointer,
+ *        a pointer to the stored value and the value size. The value is not
+ *        copied; the pointer refers to the data held by the attribute map.
+ * @return 0 if successful; if the key does not exist, -1 converted to the
+ *         unsigned return type (callers observe 255)
+ */
+uint8_t get_attribute(flow_file_record *ff, char *key, attribute *caller_attribute) {
+  auto attributes = static_cast<std::map<std::string, std::string>*>(ff->attributes);
+  auto entry = attributes->find(key);
+  if (entry == attributes->end()) {
+    return -1;
+  }
+
+  caller_attribute->key = key;
+  caller_attribute->value = static_cast<void*>(const_cast<char*>(entry->second.data()));
+  caller_attribute->value_size = entry->second.size();
+  return 0;
+}
+
+/**
+ * Removes a key from the attributes of the flow file record.
+ * @param ff flow file record
+ * @param key key to remove
+ * @return 0 if removed; otherwise -1 converted to the unsigned return type
+ *         (callers observe 255)
+ */
+uint8_t remove_attribute(flow_file_record *ff, char *key) {
+  auto attributes = static_cast<std::map<std::string, std::string>*>(ff->attributes);
+  auto entry = attributes->find(key);
+  if (entry == attributes->end()) {
+    return -1;
+  }
+  attributes->erase(entry);
+  return 0;
+}
+
+/**
+ * Transmits the flowfile through the instance.
+ * A FlowFileRecord is built from the record's attributes, content location
+ * and size, tagged with the "nanofi.version" attribute and handed to the
+ * instance's transfer().
+ * @param ff flow file record
+ * @param instance nifi instance structure
+ */
+void transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  // in the unlikely event the user forgot to initialize the instance, we shall do it for them.
+  if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
+    minifi_instance_ref->setRemotePort(instance->port.pord_id);
+  }
+
+  auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes);
+
+  auto no_op = minifi_instance_ref->getNoOpRepository();
+
+  auto content_repo = minifi_instance_ref->getContentRepository();
+
+  // wrap the file behind the record in a resource claim
+  std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
+  // NOTE(review): the owned count is raised twice; presumably so the caller's file is not reclaimed when the transferred record goes away - confirm
+  claim->increaseFlowFileRecordOwnedCount();
+  claim->increaseFlowFileRecordOwnedCount();
+
+  auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim);
+  ffr->addAttribute("nanofi.version", API_VERSION);
+  ffr->setSize(ff->size);
+
+  minifi_instance_ref->transfer(ffr);
+}
+
+/**
+ * Creates a flow whose execution plan starts with the named processor.
+ * The plan uses the instance's content repository and its no-op repository
+ * for both flow files and provenance.
+ * @param instance nifi instance
+ * @param first_processor class name of the first processor; also used as its name
+ * @return newly allocated flow; release it with free_flow
+ */
+flow *create_flow(nifi_instance *instance, const char *first_processor) {
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+
+  // both allocations stay in owners until the plan is fully built, so nothing leaks if addProcessor throws
+  std::unique_ptr<flow> new_flow(new flow);
+  std::unique_ptr<ExecutionPlan> execution_plan(
+      new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()));
+
+  // automatically adds it with success
+  execution_plan->addProcessor(first_processor, first_processor);
+
+  // hand ownership over to the C structures
+  new_flow->plan = execution_plan.release();
+  return new_flow.release();
+}
+
+/**
+ * Reclaims the memory of a flow together with its execution plan.
+ * Passing nullptr is a no-op.
+ * @param flow flow to release
+ */
+void free_flow(flow *flow) {
+  if (flow != nullptr) {
+    delete static_cast<ExecutionPlan*>(flow->plan);
+    delete flow;
+  }
+}
+
+/**
+ * Runs the flow once from its first processor and returns the resulting flow
+ * file as a record.
+ * @param instance nifi instance; not used by this function
+ * @param flow flow to run
+ * @return newly allocated record for the content of the flow file that is
+ *         current after the run, or nullptr if there is no flow file or it
+ *         has no resource claim; release it with free_flowfile
+ */
+flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) {
+  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+
+  // start over and run every processor of the plan in order
+  execution_plan->reset();
+  while (execution_plan->runNextProcessor()) {
+  }
+  auto ff = execution_plan->getCurrentFlowFile();
+  if (ff == nullptr)
+    return nullptr;
+  auto claim = ff->getResourceClaim();
+  if (claim == nullptr)
+    return nullptr;
+
+  // create a flow file.
+  claim->increaseFlowFileRecordOwnedCount();
+  auto path = claim->getContentFullPath();
+  return create_flowfile(path.c_str());
+}
+
+/**
+ * Runs the flow repeatedly and collects the resulting flow files.
+ * Each iteration runs the whole plan from its first processor; collection
+ * stops early as soon as a run yields no flow file or a flow file without a
+ * resource claim.
+ * @param instance nifi instance; not used by this function
+ * @param flow flow to run
+ * @param ff_r caller supplied array with room for at least size records
+ * @param size maximum number of records to collect
+ * @return number of records written to ff_r; release each with free_flowfile
+ */
+size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) {
+  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+  // same type as the bound it is compared against and as the return value
+  size_t i = 0;
+  for (; i < size; i++) {
+    execution_plan->reset();
+    while (execution_plan->runNextProcessor()) {
+    }
+    auto ff = execution_plan->getCurrentFlowFile();
+    if (ff == nullptr)
+      break;
+    auto claim = ff->getResourceClaim();
+    if (claim == nullptr)
+      break;
+
+    claim->increaseFlowFileRecordOwnedCount();
+
+    auto path = claim->getContentFullPath();
+    // create a flow file.
+    ff_r[i] = create_flowfile(path.c_str());
+  }
+  return i;
+}


Mime
View raw message