nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [4/8] nifi-minifi-cpp git commit: MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters MINIFI-339: Add GetTCP Processor MINIFI-339: Add listener server MINIFI-339: Update to listener MINIFI-339: Resolve Issue with stack based process
Date Mon, 02 Oct 2017 14:57:17 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/c2/C2Payload.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp
new file mode 100644
index 0000000..2a737d4
--- /dev/null
+++ b/libminifi/src/c2/C2Payload.cpp
@@ -0,0 +1,219 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/C2Payload.h"
+#include <utility>
+#include <vector>
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+C2ContentResponse::C2ContentResponse(Operation op)
+    : op(op),
+      required(false),
+      delay(0),
+      ttl(-1) {
+}
+
+C2ContentResponse::C2ContentResponse(const C2ContentResponse &other)
+    : op(other.op),
+      required(other.required),
+      delay(other.delay),
+      ttl(other.ttl),
+      name(other.name),
+      ident(other.ident),
+      operation_arguments(other.operation_arguments) {
+}
+
+C2ContentResponse::C2ContentResponse(const C2ContentResponse &&other)
+    : op(other.op),
+      required(other.required),
+      delay(std::move(other.delay)),
+      ttl(std::move(other.ttl)),
+      ident(std::move(other.ident)),
+      name(std::move(other.name)),
+      operation_arguments(std::move(other.operation_arguments)) {
+}
+
+C2ContentResponse &C2ContentResponse::operator=(const C2ContentResponse &&other) {
+  op = other.op;
+  required = other.required;
+  delay = std::move(other.delay);
+  ttl = std::move(other.ttl);
+  name = std::move(other.name);
+  ident = std::move(other.ident);
+  operation_arguments = std::move(other.operation_arguments);
+  return *this;
+}
+
+/**
+ * Copy assignment.
+ * Copies every field of other into this response, including the identifier,
+ * so the result matches what the copy constructor produces.
+ * @param other response to copy from.
+ * @return reference to this response.
+ */
+C2ContentResponse &C2ContentResponse::operator=(const C2ContentResponse &other) {
+  op = other.op;
+  required = other.required;
+  delay = other.delay;
+  ttl = other.ttl;
+  name = other.name;
+  // ident must travel with the rest of the fields, as it does in the copy constructor.
+  ident = other.ident;
+  operation_arguments = other.operation_arguments;
+  return *this;
+}
+
+C2Payload::C2Payload(Operation op, std::string identifier, bool resp, bool isRaw)
+    : state::Update(state::UpdateStatus(state::UpdateState::INITIATE, 0)),
+      op_(op),
+      raw_(isRaw),
+      ident_(identifier),
+      isResponse(resp) {
+}
+
+C2Payload::C2Payload(Operation op, bool resp, bool isRaw)
+    : state::Update(state::UpdateStatus(state::UpdateState::INITIATE, 0)),
+      op_(op),
+      raw_(isRaw),
+      isResponse(resp) {
+}
+
+C2Payload::C2Payload(Operation op, state::UpdateState state, bool resp, bool isRaw)
+    : state::Update(state::UpdateStatus(state, 0)),
+      op_(op),
+      raw_(isRaw),
+      isResponse(resp) {
+}
+
+C2Payload::C2Payload(const C2Payload &other)
+    : state::Update(other),
+      isResponse(other.isResponse),
+      op_(other.op_),
+      raw_(other.raw_),
+      label_(other.label_),
+      ident_(other.ident_),
+      raw_data_(other.raw_data_),
+      payloads_(other.payloads_),
+      content_(other.content_) {
+}
+
+C2Payload::C2Payload(const C2Payload &&other)
+    : state::Update(std::move(other)),
+      isResponse(other.isResponse),
+      op_(std::move(other.op_)),
+      raw_(other.raw_),
+      label_(std::move(other.label_)),
+      ident_(std::move(other.ident_)),
+      raw_data_(std::move(other.raw_data_)),
+      payloads_(std::move(other.payloads_)),
+      content_(std::move(other.content_)) {
+}
+
+void C2Payload::setIdentifier(const std::string &ident) {
+  ident_ = ident;
+}
+
+std::string C2Payload::getIdentifier() const {
+  return ident_;
+}
+
+Operation C2Payload::getOperation() const {
+  return op_;
+}
+
+bool C2Payload::validate() {
+  return true;
+}
+
+const std::vector<C2ContentResponse> &C2Payload::getContent() const {
+  return content_;
+}
+
+/**
+ * Adds a content response to this payload.
+ * If an entry with the same name already exists, the operation arguments of
+ * content are merged into that entry; otherwise content is appended.
+ * @param content response to add.
+ */
+void C2Payload::addContent(const C2ContentResponse &&content) {
+  for (auto &existing_content : content_) {
+    if (existing_content.name == content.name) {
+      // Range insert leaves keys that are already present untouched, so existing arguments win on conflict.
+      existing_content.operation_arguments.insert(content.operation_arguments.begin(), content.operation_arguments.end());
+      return;
+    }
+  }
+  // content is a const rvalue reference, so this push_back stores a copy.
+  content_.push_back(std::move(content));
+}
+
+bool C2Payload::isRaw() const {
+  return raw_;
+}
+
+void C2Payload::setRawData(const std::string &data) {
+  raw_data_ = data;
+}
+
+void C2Payload::setRawData(const std::vector<char> &data) {
+  raw_data_ = std::string(data.data(), data.size());
+}
+
+std::string C2Payload::getRawData() const {
+  return raw_data_;
+}
+
+void C2Payload::addPayload(const C2Payload &&payload) {
+  payloads_.push_back(std::move(payload));
+}
+const std::vector<C2Payload> &C2Payload::getNestedPayloads() const {
+  return payloads_;
+}
+
+/**
+ * Move assignment.
+ * Takes the state of other, including its identifier. Raw data is only taken
+ * when other is flagged as raw.
+ * @param other payload to take the state from.
+ * @return reference to this payload.
+ */
+C2Payload &C2Payload::operator=(const C2Payload &&other) {
+  state::Update::operator=(std::move(other));
+  isResponse = other.isResponse;
+  op_ = std::move(other.op_);
+  raw_ = other.raw_;
+  if (raw_) {
+    // Take the data from other; assigning raw_data_ to itself would drop the incoming data.
+    raw_data_ = std::move(other.raw_data_);
+  }
+  label_ = std::move(other.label_);
+  ident_ = std::move(other.ident_);
+  payloads_ = std::move(other.payloads_);
+  content_ = std::move(other.content_);
+  return *this;
+}
+
+/**
+ * Copy assignment.
+ * Copies the state of other, including its identifier. Raw data is only
+ * copied when other is flagged as raw.
+ * @param other payload to copy from.
+ * @return reference to this payload.
+ */
+C2Payload &C2Payload::operator=(const C2Payload &other) {
+  state::Update::operator=(other);
+  isResponse = other.isResponse;
+  op_ = other.op_;
+  raw_ = other.raw_;
+  if (raw_) {
+    raw_data_ = other.raw_data_;
+  }
+  label_ = other.label_;
+  // The identifier is part of the payload state, as in the copy constructor.
+  ident_ = other.ident_;
+  payloads_ = other.payloads_;
+  content_ = other.content_;
+  return *this;
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/c2/protocols/RESTProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp
new file mode 100644
index 0000000..c8babb3
--- /dev/null
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -0,0 +1,177 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/protocols/RESTProtocol.h"
+
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Parses a JSON response body into a response payload.
+ * @param payload the request that produced this response; its operation must
+ *        match the "operation" field of the response.
+ * @param response raw bytes of the response body.
+ * @return READ_COMPLETE payload when no operations were requested, a NESTED
+ *         payload holding one nested payload per requested operation, or a
+ *         READ_ERROR payload when parsing fails, the operation does not
+ *         match, or an exception is raised while reading the document.
+ */
+const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) {
+  Json::Reader reader;
+  Json::Value root;
+  try {
+    if (reader.parse(std::string(response.data(), response.size()), root)) {
+      std::string requested_operation = getOperation(payload);
+
+      std::string identifier;
+      if (root.isMember("operationid")) {
+        identifier = root["operationid"].asString();
+      }
+      if (root["operation"].asString() == requested_operation) {
+        if (root["requested_operations"].size() == 0) {
+          // Returning the temporary directly lets the compiler elide the copy.
+          return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
+        }
+        C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true);
+
+        new_payload.setIdentifier(identifier);
+
+        for (const Json::Value& request : root["requested_operations"]) {
+          Operation newOp = stringToOperation(request["operation"].asString());
+          C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true);
+          C2ContentResponse new_command(newOp);
+          new_command.delay = 0;
+          new_command.required = true;
+          new_command.ttl = -1;
+          // set the identifier if one exists
+          if (request.isMember("operationid")) {
+            new_command.ident = request["operationid"].asString();
+            nested_payload.setIdentifier(new_command.ident);
+          }
+          new_command.name = request["name"].asString();
+
+          if (request.isMember("content") && request["content"].size() > 0) {
+            for (const auto &name : request["content"].getMemberNames()) {
+              new_command.operation_arguments[name] = request["content"][name].asString();
+            }
+          }
+          nested_payload.addContent(std::move(new_command));
+          new_payload.addPayload(std::move(nested_payload));
+        }
+        // we have a response for this request
+        return new_payload;
+      }
+    }
+  } catch (...) {
+    // Any exception raised while reading the document falls through to the READ_ERROR result below.
+  }
+  return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+}
+
+/**
+ * Serializes a payload, its nested payloads and its content into a JSON value.
+ * Nested payloads are grouped by label: a label with several children is
+ * emitted as a JSON array, a label with a single child is emitted as that
+ * child (unwrapped when the child itself contains a member named like the label).
+ * @param json_root not read by this function.
+ * @param payload payload to serialize.
+ * @return JSON representation of payload.
+ */
+Json::Value RESTProtocol::serializeJsonPayload(Json::Value &json_root, const C2Payload &payload) {
+  Json::Value json_payload;
+  std::map<std::string, std::vector<Json::Value>> children;
+  for (const auto &nested_payload : payload.getNestedPayloads()) {
+    Json::Value child_payload = serializeJsonPayload(json_payload, nested_payload);
+    children[nested_payload.getLabel()].push_back(child_payload);
+  }
+  for (const auto &child_vector : children) {
+    if (child_vector.second.size() > 1) {
+      // Collect every child so that siblings sharing a label do not overwrite each other.
+      Json::Value children_json(Json::arrayValue);
+      for (const auto &child : child_vector.second) {
+        children_json.append(child);
+      }
+      json_payload[child_vector.first] = children_json;
+    } else if (child_vector.second.size() == 1) {
+      const Json::Value &only_child = child_vector.second.at(0);
+      if (only_child.isMember(child_vector.first)) {
+        json_payload[child_vector.first] = only_child[child_vector.first];
+      } else {
+        json_payload[child_vector.first] = only_child;
+      }
+    }
+  }
+
+  for (const auto &payload_content : payload.getContent()) {
+    Json::Value payload_content_values;
+    bool use_sub_option = true;
+    if (payload_content.op == payload.getOperation()) {
+      for (const auto &argument : payload_content.operation_arguments) {
+        // A single argument named like the content itself is written directly instead of as a sub-object.
+        if (payload_content.operation_arguments.size() == 1 && payload_content.name == argument.first) {
+          json_payload[payload_content.name] = argument.second;
+          use_sub_option = false;
+        } else {
+          payload_content_values[argument.first] = argument.second;
+        }
+      }
+    }
+    if (use_sub_option)
+      json_payload[payload_content.name] = payload_content_values;
+  }
+  return json_payload;
+}
+
+std::string RESTProtocol::getOperation(const C2Payload &payload) {
+  switch (payload.getOperation()) {
+    case Operation::ACKNOWLEDGE:
+      return "acknowledge";
+    case Operation::HEARTBEAT:
+      return "heartbeat";
+    case Operation::RESTART:
+      return "restart";
+    case Operation::DESCRIBE:
+      return "describe";
+    case Operation::STOP:
+      return "stop";
+    case Operation::START:
+      return "start";
+    case Operation::UPDATE:
+      return "update";
+    default:
+      return "heartbeat";
+  }
+}
+
+/**
+ * Maps an operation name to its Operation value, ignoring case.
+ * @param str operation name.
+ * @return the matching Operation, or Operation::HEARTBEAT when the name is not recognized.
+ */
+Operation RESTProtocol::stringToOperation(const std::string str) {
+  std::string op = str;
+  // tolower() is only defined for values representable as unsigned char, so convert before calling it.
+  std::transform(str.begin(), str.end(), op.begin(), [](unsigned char c) {
+    return static_cast<char>(::tolower(c));
+  });
+  if (op == "heartbeat") {
+    return Operation::HEARTBEAT;
+  } else if (op == "acknowledge") {
+    return Operation::ACKNOWLEDGE;
+  } else if (op == "update") {
+    return Operation::UPDATE;
+  } else if (op == "describe") {
+    return Operation::DESCRIBE;
+  } else if (op == "restart") {
+    return Operation::RESTART;
+  } else if (op == "clear") {
+    return Operation::CLEAR;
+  } else if (op == "stop") {
+    return Operation::STOP;
+  } else if (op == "start") {
+    return Operation::START;
+  }
+  return Operation::HEARTBEAT;
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/c2/protocols/RESTReceiver.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/protocols/RESTReceiver.cpp b/libminifi/src/c2/protocols/RESTReceiver.cpp
new file mode 100644
index 0000000..e79ffd7
--- /dev/null
+++ b/libminifi/src/c2/protocols/RESTReceiver.cpp
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/protocols/RESTReceiver.h"
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+int log_message(const struct mg_connection *conn, const char *message) {
+  puts(message);
+  return 1;
+}
+
+int ssl_protocol_en(void *ssl_context, void *user_data) {
+  struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context;
+  return 0;
+}
+
+RESTReceiver::RESTReceiver(std::string name, uuid_t uuid)
+    : HeartBeatReporter(name, uuid),
+      logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) {
+}
+
+/**
+ * Initializes the base reporter and, when both c2.rest.listener.port and
+ * c2.rest.listener.heartbeat.rooturi are configured, starts the REST listener.
+ * When c2.rest.listener.cacert is also set, the listener is started through
+ * the start_webserver overload that takes a certificate.
+ * @param controller controller service provider forwarded to the base class.
+ * @param configure configuration forwarded to the base class.
+ */
+void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+  HeartBeatReporter::initialize(controller, configure);
+  logger_->log_debug("Initializing rest receiver");
+  if (nullptr != configuration_) {
+    std::string listeningPort, rootUri, caCert;
+    configuration_->get("c2.rest.listener.port", listeningPort);
+    configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri);
+    configuration_->get("c2.rest.listener.cacert", caCert);
+
+    if (!listeningPort.empty() && !rootUri.empty()) {
+      handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol());
+      // start_webserver returns by value, so the result is already an rvalue and needs no std::move.
+      if (!caCert.empty()) {
+        listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()), caCert);
+      } else {
+        listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()));
+      }
+    }
+  }
+}
+int16_t RESTReceiver::heartbeat(const C2Payload &payload) {
+  std::string operation_request_str = getOperation(payload);
+  std::string outputConfig;
+  Json::Value json_payload;
+  json_payload["operation"] = operation_request_str;
+  if (payload.getIdentifier().length() > 0) {
+    json_payload["operationid"] = payload.getIdentifier();
+  }
+  const std::vector<C2ContentResponse> &content = payload.getContent();
+
+  for (const auto &payload_content : content) {
+    Json::Value payload_content_values;
+    bool use_sub_option = true;
+    if (payload_content.op == payload.getOperation()) {
+      for (auto content : payload_content.operation_arguments) {
+        if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
+          json_payload[payload_content.name] = content.second;
+          use_sub_option = false;
+        } else {
+          payload_content_values[content.first] = content.second;
+        }
+      }
+    }
+    if (use_sub_option)
+      json_payload[payload_content.name] = payload_content_values;
+  }
+
+  for (const auto &nested_payload : payload.getNestedPayloads()) {
+    json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
+  }
+
+  Json::StyledWriter writer;
+  outputConfig = writer.write(json_payload);
+  if (handler != nullptr) {
+    logger_->log_debug("Setting %s", outputConfig);
+    handler->setResponse(outputConfig);
+  }
+
+  return 0;
+}
+
+/**
+ * Starts a CivetWeb server that listens with SSL enabled and registers handler at rooturi.
+ * @param port listening port, without the SSL suffix.
+ * @param rooturi URI the handler is registered under.
+ * @param handler request handler; not owned by the server.
+ * @param ca_cert path passed to CivetWeb as ssl_certificate.
+ * @return the running server.
+ */
+std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) {
+  // NOTE(review): callback is populated but never handed to CivetServer below — confirm whether it should be.
+  struct mg_callbacks callback;
+
+  memset(&callback, 0, sizeof(callback));
+  callback.init_ssl = ssl_protocol_en;
+  callback.log_message = log_message;
+  // CivetWeb marks an SSL listening port with a trailing 's'; the suffixed value is what must be configured.
+  std::string my_port = port;
+  my_port += "s";
+  const char *options[] = { "listening_ports", my_port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL",
+      "ssl_verify_peer", "no", "num_threads", "1", nullptr };
+
+  std::vector<std::string> cpp_options;
+  // The last array entry is the terminator and is not an option.
+  const size_t option_count = sizeof(options) / sizeof(options[0]) - 1;
+  for (size_t i = 0; i < option_count; i++) {
+    cpp_options.push_back(options[i]);
+  }
+  std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options));
+
+  server->addHandler(rooturi, handler);
+
+  return server;
+}
+
+std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler) {
+  const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "num_threads", "1", 0 };
+
+  std::vector<std::string> cpp_options;
+  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+    cpp_options.push_back(options[i]);
+  }
+  std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options));
+
+  server->addHandler(rooturi, handler);
+
+  return server;
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/c2/protocols/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/protocols/RESTSender.cpp b/libminifi/src/c2/protocols/RESTSender.cpp
new file mode 100644
index 0000000..e15522b
--- /dev/null
+++ b/libminifi/src/c2/protocols/RESTSender.cpp
@@ -0,0 +1,144 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/protocols/RESTSender.h"
+
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Creates a REST sender with the given name and uuid.
+ * The logger is obtained for RESTSender itself so that log output is
+ * attributed to this class.
+ */
+RESTSender::RESTSender(std::string name, uuid_t uuid)
+    : C2Protocol(name, uuid),
+      logger_(logging::LoggerFactory<RESTSender>::getLogger()) {
+}
+
+void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+  C2Protocol::initialize(controller, configure);
+  // base URL when one is not specified.
+  if (nullptr != configure) {
+    configure->get("c2.rest.url", rest_uri_);
+    configure->get("c2.rest.url.ack", ack_uri_);
+  }
+  logger_->log_info("Submitting to %s", rest_uri_);
+}
+C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) {
+  std::string operation_request_str = getOperation(payload);
+  std::string outputConfig;
+  if (direction == Direction::TRANSMIT) {
+    Json::Value json_payload;
+    json_payload["operation"] = operation_request_str;
+    if (payload.getIdentifier().length() > 0) {
+      json_payload["operationid"] = payload.getIdentifier();
+    }
+    const std::vector<C2ContentResponse> &content = payload.getContent();
+
+    for (const auto &payload_content : content) {
+      Json::Value payload_content_values;
+      bool use_sub_option = true;
+      if (payload_content.op == payload.getOperation()) {
+        for (auto content : payload_content.operation_arguments) {
+          if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
+            json_payload[payload_content.name] = content.second;
+            use_sub_option = false;
+          } else {
+            payload_content_values[content.first] = content.second;
+          }
+        }
+      }
+      if (use_sub_option)
+        json_payload[payload_content.name] = payload_content_values;
+    }
+
+    for (const auto &nested_payload : payload.getNestedPayloads()) {
+      json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
+    }
+
+    Json::StyledWriter writer;
+    outputConfig = writer.write(json_payload);
+  }
+
+  return std::move(sendPayload(url, direction, payload, outputConfig));
+}
+
+C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) {
+  if (payload.getOperation() == ACKNOWLEDGE) {
+    return consumePayload(ack_uri_, payload, direction, async);
+  }
+  return consumePayload(rest_uri_, payload, direction, async);
+}
+
+/**
+ * Re-reads the REST endpoints from the supplied configuration, using the
+ * same keys as initialize(): c2.rest.url and c2.rest.url.ack.
+ * @param configure configuration to read from; ignored when null.
+ */
+void RESTSender::update(const std::shared_ptr<Configure> &configure) {
+  if (nullptr == configure) {
+    return;
+  }
+  // NOTE(review): presumably get() leaves the target unchanged when the key is absent — confirm.
+  configure->get("c2.rest.url", rest_uri_);
+  configure->get("c2.rest.url.ack", ack_uri_);
+}
+
+/**
+ * Submits a request to url and converts the response into a payload.
+ * @param url target URL; when empty no request is made and a READ_COMPLETE payload is returned.
+ * @param direction TRANSMIT issues a POST carrying outputConfig, anything else issues a GET.
+ * @param payload request payload; supplies the operation and whether the response is kept raw.
+ * @param outputConfig request body used for TRANSMIT.
+ * @return raw response payload when payload.isRaw(), otherwise the parsed JSON response;
+ *         a READ_ERROR payload when the submit fails or the response code is zero.
+ */
+const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
+  if (!url.empty()) {
+    utils::HTTPClient client(url, ssl_context_service_);
+    client.setConnectionTimeout(2);
+
+    // Both objects are declared here so they stay alive until after client.submit().
+    std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
+    std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
+    if (direction == Direction::TRANSMIT) {
+      input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
+      callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
+      input->write(outputConfig);
+      callback->ptr = input.get();
+      callback->pos = 0;
+      client.set_request_method("POST");
+      client.setUploadCallback(callback.get());
+    } else {
+      // we do not need to set the upload callback
+      // since we are not uploading anything on a get
+      client.set_request_method("GET");
+    }
+    client.setContentType("application/json");
+    bool isOkay = client.submit();
+    int64_t respCode = client.getResponseCode();
+
+    // NOTE(review): any non-zero response code is accepted here, not only success codes — confirm this is intended.
+    if (isOkay && respCode) {
+      if (payload.isRaw()) {
+        C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
+
+        response_payload.setRawData(client.getResponseBody());
+        return std::move(response_payload);
+      }
+      return parseJsonResponse(payload, client.getResponseBody());
+    } else {
+      return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+    }
+  } else {
+    return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
+  }
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/controllers/SSLContextService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp
index 95ccbb0..e8dc520 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -35,11 +35,13 @@ void SSLContextService::initialize() {
   if (initialized_)
     return;
 
-  std::lock_guard < std::mutex > lock(initialization_mutex_);
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
 
   ControllerService::initialize();
 
   initializeTLS();
+
+  initialized_ = true;
 }
 
 std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
@@ -65,8 +67,7 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
   if (!IsNullOrEmpty(private_key_)) {
     int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM);
     if (retp != 1) {
-      logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_,
-                         std::strerror(errno));
+      logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_, std::strerror(errno));
       return nullptr;
     }
 
@@ -80,31 +81,31 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
   if (retp == 0) {
     logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno));
   }
-  return std::unique_ptr < SSLContext > (new SSLContext(ctx));
+  return std::unique_ptr<SSLContext>(new SSLContext(ctx));
 }
 
 const std::string &SSLContextService::getCertificateFile() {
-  std::lock_guard < std::mutex > lock(initialization_mutex_);
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
   return certificate;
 }
 
 const std::string &SSLContextService::getPassphrase() {
-  std::lock_guard < std::mutex > lock(initialization_mutex_);
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
   return passphrase_;
 }
 
 const std::string &SSLContextService::getPassphraseFile() {
-  std::lock_guard < std::mutex > lock(initialization_mutex_);
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
   return passphrase_file_;
 }
 
 const std::string &SSLContextService::getPrivateKeyFile() {
-  std::lock_guard < std::mutex > lock(initialization_mutex_);
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
   return private_key_;
 }
 
 const std::string &SSLContextService::getCACertificate() {
-  std::lock_guard < std::mutex > lock(initialization_mutex_);
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
   return ca_certificate_;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ClassLoader.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp
index fbd46f6..9bead0e 100644
--- a/libminifi/src/core/ClassLoader.cpp
+++ b/libminifi/src/core/ClassLoader.cpp
@@ -43,7 +43,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource) {
     logger_->log_error("Cannot load library: %s", dlerror());
     return RESOURCE_FAILURE;
   } else {
-    std::lock_guard < std::mutex > lock(internal_mutex_);
+    std::lock_guard<std::mutex> lock(internal_mutex_);
     dl_handles_.push_back(resource_ptr);
   }
 
@@ -60,9 +60,9 @@ uint16_t ClassLoader::registerResource(const std::string &resource) {
 
   ObjectFactory *factory = create_factory_func();
 
-  std::lock_guard < std::mutex > lock(internal_mutex_);
+  std::lock_guard<std::mutex> lock(internal_mutex_);
 
-  loaded_factories_[factory->getClassName()] = std::unique_ptr < ObjectFactory > (factory);
+  loaded_factories_[factory->getClassName()] = std::unique_ptr<ObjectFactory>(factory);
 
   return RESOURCE_SUCCESS;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index 62a08db..b9a3a79 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -43,7 +43,7 @@ ConfigurableComponent::~ConfigurableComponent() {
 }
 
 bool ConfigurableComponent::getProperty(const std::string &name, Property &prop) {
-  std::lock_guard < std::mutex > lock(configuration_mutex_);
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
 
   auto &&it = properties_.find(name);
 
@@ -62,13 +62,13 @@ bool ConfigurableComponent::getProperty(const std::string &name, Property &prop)
  * @return result of getting property.
  */
 bool ConfigurableComponent::getProperty(const std::string name, std::string &value) {
-  std::lock_guard < std::mutex > lock(configuration_mutex_);
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
 
   auto &&it = properties_.find(name);
   if (it != properties_.end()) {
     Property item = it->second;
     value = item.getValue();
-    logger_->log_info("Processor %s property name %s value %s", name, item.getName(), value);
+    logger_->log_info("Component %s property name %s value %s", name, item.getName(), value);
     return true;
   } else {
     return false;
@@ -81,7 +81,7 @@ bool ConfigurableComponent::getProperty(const std::string name, std::string &val
  * @return result of setting property.
  */
 bool ConfigurableComponent::setProperty(const std::string name, std::string value) {
-  std::lock_guard < std::mutex > lock(configuration_mutex_);
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
   auto &&it = properties_.find(name);
 
   if (it != properties_.end()) {
@@ -102,7 +102,7 @@ bool ConfigurableComponent::setProperty(const std::string name, std::string valu
  * @return result of setting property.
  */
 bool ConfigurableComponent::updateProperty(const std::string &name, const std::string &value) {
-  std::lock_guard < std::mutex > lock(configuration_mutex_);
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
   auto &&it = properties_.find(name);
 
   if (it != properties_.end()) {
@@ -123,7 +123,7 @@ bool ConfigurableComponent::updateProperty(const std::string &name, const std::s
  * @return whether property was set or not
  */
 bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
-  std::lock_guard < std::mutex > lock(configuration_mutex_);
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
   auto it = properties_.find(prop.getName());
 
   if (it != properties_.end()) {
@@ -151,7 +151,7 @@ bool ConfigurableComponent::setSupportedProperties(std::set<Property> properties
     return false;
   }
 
-  std::lock_guard < std::mutex > lock(configuration_mutex_);
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
 
   properties_.clear();
   for (auto item : properties) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ConfigurationFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
index 0a0e911..1640380 100644
--- a/libminifi/src/core/ConfigurationFactory.cpp
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -49,22 +49,22 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr
     if (class_name_lc == "flowconfiguration") {
       // load the base configuration.
 
-      return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
+      return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
 
     } else if (class_name_lc == "yamlconfiguration") {
       // only load if the class is defined.
-      return std::unique_ptr < core::FlowConfiguration > (instantiate<core::YamlConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path));
+      return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path));
 
     } else {
       if (fail_safe) {
-        return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
+        return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
       } else {
         throw std::runtime_error("Support for the provided configuration class could not be found");
       }
     }
   } catch (const std::runtime_error &r) {
     if (fail_safe) {
-      return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
+      return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp
index 9c3b26a..cf01f0c 100644
--- a/libminifi/src/core/Connectable.cpp
+++ b/libminifi/src/core/Connectable.cpp
@@ -53,7 +53,7 @@ bool Connectable::setSupportedRelationships(std::set<core::Relationship> relatio
     return false;
   }
 
-  std::lock_guard < std::mutex > lock(relationship_mutex_);
+  std::lock_guard<std::mutex> lock(relationship_mutex_);
 
   relationships_.clear();
   for (auto item : relationships) {
@@ -67,7 +67,7 @@ bool Connectable::setSupportedRelationships(std::set<core::Relationship> relatio
 bool Connectable::isSupportedRelationship(core::Relationship relationship) {
   const bool requiresLock = isRunning();
 
-  const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock < std::mutex > (relationship_mutex_);
+  const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_);
 
   const auto &it = relationships_.find(relationship.getName());
   if (it != relationships_.end()) {
@@ -83,7 +83,7 @@ bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relation
     return false;
   }
 
-  std::lock_guard < std::mutex > lock(relationship_mutex_);
+  std::lock_guard<std::mutex> lock(relationship_mutex_);
 
   auto_terminated_relationships_.clear();
   for (auto item : relationships) {
@@ -97,7 +97,7 @@ bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relation
 bool Connectable::isAutoTerminated(core::Relationship relationship) {
   const bool requiresLock = isRunning();
 
-  const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock < std::mutex > (relationship_mutex_);
+  const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_);
 
   const auto &it = auto_terminated_relationships_.find(relationship.getName());
   if (it != auto_terminated_relationships_.end()) {
@@ -111,7 +111,7 @@ void Connectable::waitForWork(uint64_t timeoutMs) {
   has_work_.store(isWorkAvailable());
 
   if (!has_work_.load()) {
-    std::unique_lock < std::mutex > lock(work_available_mutex_);
+    std::unique_lock<std::mutex> lock(work_available_mutex_);
     work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] {return has_work_.load();});
   }
 }
@@ -143,7 +143,7 @@ std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(std::
 }
 
 std::shared_ptr<Connectable> Connectable::getNextIncomingConnection() {
-  std::lock_guard < std::mutex > lock(relationship_mutex_);
+  std::lock_guard<std::mutex> lock(relationship_mutex_);
 
   if (_incomingConnections.size() == 0)
     return NULL;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index e8e7462..9ce7146 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -35,11 +35,12 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string
   if (nullptr == ptr) {
     logger_->log_error("No Processor defined for %s", name.c_str());
   }
-  std::shared_ptr<core::Processor> processor = std::static_pointer_cast < core::Processor > (ptr);
+  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
 
   // initialize the processor
   processor->initialize();
 
+  processor->setStreamFactory(stream_factory_);
   return processor;
 }
 
@@ -54,15 +55,15 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask()
 }
 
 std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid, int version) {
-  return std::unique_ptr < core::ProcessGroup > (new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
+  return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
 }
 
 std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) {
-  return std::unique_ptr < core::ProcessGroup > (new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
+  return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
 }
 
 std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, uuid_t uuid) {
-  return std::make_shared < minifi::Connection > (flow_file_repo_, content_repo_, name, uuid);
+  return std::make_shared<minifi::Connection>(flow_file_repo_, content_repo_, name, uuid);
 }
 
 std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/FlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 6afd0fe..d08ea4b 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -45,6 +45,7 @@ FlowFile::FlowFile()
       connection_(nullptr),
       original_connection_() {
   entry_date_ = getTimeMillis();
+  event_time_ = entry_date_;
   lineage_start_date_ = entry_date_;
 
   char uuidStr[37] = { 0 };

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index db0fe08..a537f1a 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -69,12 +69,12 @@ ProcessGroup::~ProcessGroup() {
 }
 
 bool ProcessGroup::isRootProcessGroup() {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   return (type_ == ROOT_PROCESS_GROUP);
 }
 
 void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (processors_.find(processor) == processors_.end()) {
     // We do not have the same processor in this process group yet
@@ -84,7 +84,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
 }
 
 void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (processors_.find(processor) != processors_.end()) {
     // We do have the same processor in this process group yet
@@ -94,7 +94,7 @@ void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
 }
 
 void ProcessGroup::addProcessGroup(ProcessGroup *child) {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (child_process_groups_.find(child) == child_process_groups_.end()) {
     // We do not have the same child process group in this process group yet
@@ -104,7 +104,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) {
 }
 
 void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (child_process_groups_.find(child) != child_process_groups_.end()) {
     // We do have the same child process group in this process group yet
@@ -114,7 +114,7 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
 }
 
 void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   try {
     // Start all the processor node, input and output ports
@@ -142,7 +142,7 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, Ev
 }
 
 void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   try {
     // Stop all the processor node, input and output ports
@@ -168,7 +168,7 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, Eve
 }
 
 std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   std::shared_ptr<Processor> ret = NULL;
   for (auto processor : processors_) {
     logger_->log_info("find processor %s", processor->getName().c_str());
@@ -207,8 +207,21 @@ std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findContr
   return controller_service_map_.getControllerServiceNode(nodeId);
 }
 
+void ProcessGroup::getAllProcessors(std::vector<std::shared_ptr<Processor>> &processor_vec) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::shared_ptr<Processor> ret = NULL;
+
+  for (auto processor : processors_) {
+    logger_->log_debug("Current processor is %s", processor->getName().c_str());
+    processor_vec.push_back(processor);
+  }
+  for (auto processGroup : child_process_groups_) {
+    processGroup->getAllProcessors(processor_vec);
+  }
+}
+
 std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &processorName) {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   std::shared_ptr<Processor> ret = NULL;
   for (auto processor : processors_) {
     logger_->log_debug("Current processor is %s", processor->getName().c_str());
@@ -224,7 +237,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &proces
 }
 
 void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   for (auto processor : processors_) {
     if (processor->getName() == processorName) {
       processor->setProperty(propertyName, propertyValue);
@@ -239,6 +252,7 @@ void ProcessGroup::updatePropertyValue(std::string processorName, std::string pr
 void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
   for (auto connection : connections_) {
     connectionMap[connection->getUUIDStr()] = connection;
+    connectionMap[connection->getName()] = connection;
   }
   for (auto processGroup : child_process_groups_) {
     processGroup->getConnections(connectionMap);
@@ -246,7 +260,7 @@ void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connecti
 }
 
 void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (connections_.find(connection) == connections_.end()) {
     // We do not have the same connection in this process group yet
@@ -268,7 +282,7 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
 }
 
 void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
-  std::lock_guard < std::recursive_mutex > lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (connections_.find(connection) != connections_.end()) {
     // We do not have the same connection in this process group yet

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index b3035cb..e3799e1 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -44,7 +44,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::create() {
   _addedFlowFiles[record->getUUIDStr()] = record;
   logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
   std::stringstream details;
-  details << process_context_->getProcessorNode().getName() << " creates flow record " << record->getUUIDStr();
+  details << process_context_->getProcessorNode()->getName() << " creates flow record " << record->getUUIDStr();
   provenance_report_->create(record, details.str());
 
   return record;
@@ -160,52 +160,54 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::Flow
 
 void ProcessSession::remove(std::shared_ptr<core::FlowFile> &flow) {
   flow->setDeleted(true);
+  process_context_->getFlowFileRepository()->Delete(flow->getUUIDStr());
   _deletedFlowFiles[flow->getUUIDStr()] = flow;
-  std::string reason = process_context_->getProcessorNode().getName() + " drop flow record " + flow->getUUIDStr();
+  std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();
   provenance_report_->drop(flow, reason);
 }
 
 void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) {
   flow->setDeleted(true);
+  process_context_->getFlowFileRepository()->Delete(flow->getUUIDStr());
   _deletedFlowFiles[flow->getUUIDStr()] = flow;
-  std::string reason = process_context_->getProcessorNode().getName() + " drop flow record " + flow->getUUIDStr();
+  std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();
   provenance_report_->drop(flow, reason);
 }
 
 void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value) {
   flow->setAttribute(key, value);
   std::stringstream details;
-  details << process_context_->getProcessorNode().getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
+  details << process_context_->getProcessorNode()->getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
   provenance_report_->modifyAttributes(flow, details.str());
 }
 
 void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key) {
   flow->removeAttribute(key);
   std::stringstream details;
-  details << process_context_->getProcessorNode().getName() << " remove flow record " << flow->getUUIDStr() << " attribute " + key;
+  details << process_context_->getProcessorNode()->getName() << " remove flow record " << flow->getUUIDStr() << " attribute " + key;
   provenance_report_->modifyAttributes(flow, details.str());
 }
 
 void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value) {
   flow->setAttribute(key, value);
   std::stringstream details;
-  details << process_context_->getProcessorNode().getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
+  details << process_context_->getProcessorNode()->getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
   provenance_report_->modifyAttributes(flow, details.str());
 }
 
 void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key) {
   flow->removeAttribute(key);
   std::stringstream details;
-  details << process_context_->getProcessorNode().getName() << " remove flow record " << flow->getUUIDStr() << " attribute " << key;
+  details << process_context_->getProcessorNode()->getName() << " remove flow record " << flow->getUUIDStr() << " attribute " << key;
   provenance_report_->modifyAttributes(flow, details.str());
 }
 
 void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) {
-  flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode().getPenalizationPeriodMsec());
+  flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode()->getPenalizationPeriodMsec());
 }
 
 void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &&flow) {
-  flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode().getPenalizationPeriodMsec());
+  flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode()->getPenalizationPeriodMsec());
 }
 
 void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
@@ -222,7 +224,6 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCa
   try {
     uint64_t startTime = getTimeMillis();
     claim->increaseFlowFileRecordOwnedCount();
-//    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
     // Call the callback to write the content
     if (nullptr == stream) {
@@ -244,12 +245,9 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCa
     }
     flow->setResourceClaim(claim);
 
-    /*
-     logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
-     flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
     stream->closeStream();
     std::stringstream details;
-    details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+    details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
     uint64_t endTime = getTimeMillis();
     provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
@@ -295,7 +293,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamC
     flow->setResourceClaim(claim);
 
     std::stringstream details;
-    details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+    details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
     uint64_t endTime = getTimeMillis();
     provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
@@ -342,7 +340,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, OutputStream
     uint64_t appendSize = stream->getSize() - oldPos;
     flow->setSize(flow->getSize() + appendSize);
     std::stringstream details;
-    details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+    details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
     uint64_t endTime = getTimeMillis();
     provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
@@ -382,7 +380,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, OutputStreamC
     flow->setSize(flow->getSize() + appendSize);
 
     std::stringstream details;
-    details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+    details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
     uint64_t endTime = getTimeMillis();
     provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
@@ -440,12 +438,14 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCal
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim);
 
     if (nullptr == stream) {
+      logger_->log_info("claim does not exist");
       rollback();
       return;
     }
     stream->seek(flow->getOffset());
 
     if (callback->process(stream) < 0) {
+      logger_->log_info("no data written from stream");
       rollback();
       return;
     }
@@ -511,7 +511,7 @@ void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::Fl
 
     content_stream->closeStream();
     std::stringstream details;
-    details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+    details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
     uint64_t endTime = getTimeMillis();
     provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
@@ -585,7 +585,7 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile>
         if (!keepSource)
           std::remove(source.c_str());
         std::stringstream details;
-        details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+        details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
         uint64_t endTime = getTimeMillis();
         provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
       } else {
@@ -617,7 +617,7 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile>
   }
 }
 
-void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows, bool keepSource, uint64_t offset, char inputDelimiter) {
+void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) {
   std::shared_ptr<ResourceClaim> claim;
 
   std::shared_ptr<FlowFileRecord> flowFile;
@@ -640,6 +640,7 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
         uint64_t startTime = getTimeMillis();
         input.getline(buf, size, inputDelimiter);
 
+        size_t bufsize = strlen(buf);
         std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
         if (nullptr == stream) {
           logger_->log_debug("Stream is null");
@@ -648,7 +649,7 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
         }
 
         if (input) {
-          if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) {
+          if (stream->write(reinterpret_cast<uint8_t*>(buf), bufsize) < 0) {
             invalidWrite = true;
             break;
           }
@@ -670,13 +671,11 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
           flowFile->setResourceClaim(claim);
           claim->increaseFlowFileRecordOwnedCount();
 
-          logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(),
-                             flowFile->getSize(),
-                             flowFile->getResourceClaim()->getContentFullPath().c_str(),
+          logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath().c_str(),
                              flowFile->getUUIDStr().c_str());
 
           stream->closeStream();
-          std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flowFile->getUUIDStr();
+          std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
           uint64_t endTime = getTimeMillis();
           provenance_report_->modifyContent(flowFile, details, endTime - startTime);
           flows.push_back(flowFile);
@@ -768,7 +767,7 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile>
         if (!keepSource)
           std::remove(source.c_str());
         std::stringstream details;
-        details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+        details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
         uint64_t endTime = getTimeMillis();
         provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
       } else {
@@ -811,10 +810,10 @@ void ProcessSession::commit() {
       if (itRelationship != _transferRelationship.end()) {
         Relationship relationship = itRelationship->second;
         // Find the relationship, we need to find the connections for that relationship
-        std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode().getOutGoingConnections(relationship.getName());
+        std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship.getName());
         if (connections.empty()) {
           // No connection
-          if (!process_context_->getProcessorNode().isAutoTerminated(relationship)) {
+          if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) {
             // Not autoterminate, we should have the connect
             std::string message = "Connect empty for non auto terminated relationship" + relationship.getName();
             throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
@@ -855,10 +854,10 @@ void ProcessSession::commit() {
       if (itRelationship != _transferRelationship.end()) {
         Relationship relationship = itRelationship->second;
         // Find the relationship, we need to find the connections for that relationship
-        std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode().getOutGoingConnections(relationship.getName());
+        std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship.getName());
         if (connections.empty()) {
           // No connection
-          if (!process_context_->getProcessorNode().isAutoTerminated(relationship)) {
+          if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) {
             // Not autoterminate, we should have the connect
             std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
             throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
@@ -931,7 +930,7 @@ void ProcessSession::commit() {
     _originalFlowFiles.clear();
     // persistent the provenance report
     this->provenance_report_->commit();
-    logger_->log_trace("ProcessSession committed for %s", process_context_->getProcessorNode().getName().c_str());
+    logger_->log_trace("ProcessSession committed for %s", process_context_->getProcessorNode()->getName().c_str());
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
@@ -960,7 +959,7 @@ void ProcessSession::rollback() {
     _addedFlowFiles.clear();
     _updatedFlowFiles.clear();
     _deletedFlowFiles.clear();
-    logger_->log_debug("ProcessSession rollback for %s", process_context_->getProcessorNode().getName().c_str());
+    logger_->log_debug("ProcessSession rollback for %s", process_context_->getProcessorNode()->getName().c_str());
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
@@ -971,10 +970,10 @@ void ProcessSession::rollback() {
 }
 
 std::shared_ptr<core::FlowFile> ProcessSession::get() {
-  std::shared_ptr<Connectable> first = process_context_->getProcessorNode().getNextIncomingConnection();
+  std::shared_ptr<Connectable> first = process_context_->getProcessorNode()->getNextIncomingConnection();
 
   if (first == NULL) {
-    logger_->log_debug("Get is null for %s", process_context_->getProcessorNode().getName());
+    logger_->log_debug("Get is null for %s", process_context_->getProcessorNode()->getName());
     return NULL;
   }
 
@@ -988,7 +987,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
       for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired.begin(); it != expired.end(); ++it) {
         std::shared_ptr<core::FlowFile> record = *it;
         std::stringstream details;
-        details << process_context_->getProcessorNode().getName() << " expire flow record " << record->getUUIDStr();
+        details << process_context_->getProcessorNode()->getName() << " expire flow record " << record->getUUIDStr();
         provenance_report_->expire(record, details.str());
       }
     }
@@ -1004,7 +1003,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
       _originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
       return ret;
     }
-    current = std::static_pointer_cast<Connection>(process_context_->getProcessorNode().getNextIncomingConnection());
+    current = std::static_pointer_cast<Connection>(process_context_->getProcessorNode()->getNextIncomingConnection());
   } while (current != NULL && current != first);
 
   return NULL;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ProcessSessionFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSessionFactory.cpp b/libminifi/src/core/ProcessSessionFactory.cpp
index 570d895..4a6f21a 100644
--- a/libminifi/src/core/ProcessSessionFactory.cpp
+++ b/libminifi/src/core/ProcessSessionFactory.cpp
@@ -27,8 +27,8 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession() {
-  return std::unique_ptr < ProcessSession > (new ProcessSession(process_context_));
+std::shared_ptr<ProcessSession> ProcessSessionFactory::createSession() {
+  return std::make_shared<ProcessSession>(process_context_);
 }
 
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 0c2e7cf..d35f283 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -71,6 +71,9 @@ bool Processor::isRunning() {
 
 void Processor::setScheduledState(ScheduledState state) {
   state_ = state;
+  if (state == STOPPED) {
+    notifyStop();
+  }
 }
 
 bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
@@ -80,8 +83,8 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
     logger_->log_info("Can not add connection while the process %s is running", name_.c_str());
     return false;
   }
-  std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+  std::lock_guard<std::mutex> lock(mutex_);
 
   uuid_t srcUUID;
   uuid_t destUUID;
@@ -141,12 +144,12 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
     return;
   }
 
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
 
   uuid_t srcUUID;
   uuid_t destUUID;
 
-  std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
 
   connection->getSourceUUID(srcUUID);
   connection->getDestinationUUID(destUUID);
@@ -178,13 +181,13 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
 }
 
 bool Processor::flowFilesQueued() {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
 
   if (_incomingConnections.size() == 0)
     return false;
 
   for (auto &&conn : _incomingConnections) {
-    std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
+    std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
     if (connection->getQueueSize() > 0)
       return true;
   }
@@ -193,13 +196,13 @@ bool Processor::flowFilesQueued() {
 }
 
 bool Processor::flowFilesOutGoingFull() {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
 
   for (auto &&connection : out_going_connections_) {
     // We already has connection for this relationship
     std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
     for (const auto conn : existedConnection) {
-      std::shared_ptr < Connection > connection = std::static_pointer_cast < Connection > (conn);
+      std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
       if (connection->isFull())
         return true;
     }
@@ -226,13 +229,31 @@ void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessio
   }
 }
 
+void Processor::onTrigger(std::shared_ptr<ProcessContext> context, std::shared_ptr<ProcessSessionFactory> sessionFactory) {  // shared_ptr overload: runs one trigger inside a session obtained from sessionFactory
+  auto session = sessionFactory->createSession();
+
+  try {
+    // Call the virtual trigger function
+    onTrigger(context, session);
+    session->commit();  // reached only when the trigger call above did not throw
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    session->rollback();  // roll the session back, then rethrow the same exception to the caller
+    throw;
+  } catch (...) {
+    logger_->log_debug("Caught Exception Processor::onTrigger");
+    session->rollback();  // same handling for exceptions not derived from std::exception
+    throw;
+  }
+}
+
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
   bool hasWork = false;
 
   try {
     for (const auto &conn : _incomingConnections) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
+      std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
       if (connection->getQueueSize() > 0) {
         hasWork = true;
         break;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ProcessorNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp
index 05f31a0..0a21f4d 100644
--- a/libminifi/src/core/ProcessorNode.cpp
+++ b/libminifi/src/core/ProcessorNode.cpp
@@ -17,13 +17,14 @@
 
 #include "core/ProcessorNode.h"
 #include <memory>
+#include <utility>
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
 
-ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> processor)
+ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> &processor)
     : processor_(processor),
       Connectable(processor->getName(), 0),
       ConfigurableComponent() {
@@ -40,6 +41,11 @@ ProcessorNode::ProcessorNode(const ProcessorNode &other)
   setUUID(copy);
 }
 
+ProcessorNode::ProcessorNode(const ProcessorNode &&other)  // NOTE(review): `other` is a *const* rvalue reference, so the std::move calls below yield const&& and will normally select copy operations - confirm whether a non-const && was intended
+    : Connectable(std::move(other)),  // initializes the Connectable base from `other`
+      processor_(std::move(other.processor_)) {  // takes over (or, given the const, copies) the wrapped processor pointer
+}
+
 ProcessorNode::~ProcessorNode() {
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
index cf26a0d..be21b16 100644
--- a/libminifi/src/core/Repository.cpp
+++ b/libminifi/src/core/Repository.cpp
@@ -54,10 +54,13 @@ void Repository::stop() {
 }
 
 // repoSize
-uint64_t Repository::repoSize() {
+uint64_t Repository::getRepoSize() {
   return repo_size_;
 }
 
+void Repository::flush() {  // empty body: this implementation performs no work
+}
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
index 9e99718..b25e87c 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -67,13 +67,13 @@ std::shared_ptr<core::Repository> createRepository(const std::string configurati
       return return_obj;
     }
     if (fail_safe) {
-      return std::make_shared < core::Repository > ("fail_safe", "fail_safe", 1, 1, 1);
+      return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
     } else {
       throw std::runtime_error("Support for the provided configuration class could not be found");
     }
   } catch (const std::runtime_error &r) {
     if (fail_safe) {
-      return std::make_shared < core::Repository > ("fail_safe", "fail_safe", 1, 1, 1);
+      return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
     }
   }
 
@@ -96,13 +96,13 @@ std::shared_ptr<core::ContentRepository> createContentRepository(const std::stri
       return return_obj;
     }
     if (fail_safe) {
-      return std::make_shared < core::repository::FileSystemRepository > ("fail_safe");
+      return std::make_shared<core::repository::FileSystemRepository>("fail_safe");
     } else {
       throw std::runtime_error("Support for the provided configuration class could not be found");
     }
   } catch (const std::runtime_error &r) {
     if (fail_safe) {
-      return std::make_shared < core::repository::FileSystemRepository > ("fail_safe");
+      return std::make_shared<core::repository::FileSystemRepository>("fail_safe");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/controller/StandardControllerServiceNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
index 69004c1..5c4aa70 100644
--- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp
+++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
@@ -27,12 +27,12 @@ namespace minifi {
 namespace core {
 namespace controller {
 std::shared_ptr<core::ProcessGroup> &StandardControllerServiceNode::getProcessGroup() {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
   return process_group_;
 }
 
 void StandardControllerServiceNode::setProcessGroup(std::shared_ptr<ProcessGroup> &processGroup) {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
   process_group_ = processGroup;
 }
 
@@ -45,7 +45,7 @@ bool StandardControllerServiceNode::enable() {
     for (auto linked_service : property.getValues()) {
       std::shared_ptr<ControllerServiceNode> csNode = provider->getControllerServiceNode(linked_service);
       if (nullptr != csNode) {
-        std::lock_guard < std::mutex > lock(mutex_);
+        std::lock_guard<std::mutex> lock(mutex_);
         linked_controller_services_.push_back(csNode);
       }
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/logging/LoggerConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp
index 4b97055..c06239b 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -56,19 +56,19 @@ std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &t
 LoggerConfiguration::LoggerConfiguration()
     : root_namespace_(create_default_root()),
       loggers(std::vector<std::shared_ptr<LoggerImpl>>()),
-      formatter_(std::make_shared < spdlog::pattern_formatter > (spdlog_default_pattern)) {
-  logger_ = std::shared_ptr < LoggerImpl > (new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_)));
+      formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) {
+  logger_ = std::shared_ptr<LoggerImpl>(new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_)));
   loggers.push_back(logger_);
 }
 
 void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
-  std::lock_guard < std::mutex > lock(mutex);
+  std::lock_guard<std::mutex> lock(mutex);
   root_namespace_ = initialize_namespaces(logger_properties);
   std::string spdlog_pattern;
   if (!logger_properties->get("spdlog.pattern", spdlog_pattern)) {
     spdlog_pattern = spdlog_default_pattern;
   }
-  formatter_ = std::make_shared < spdlog::pattern_formatter > (spdlog_pattern);
+  formatter_ = std::make_shared<spdlog::pattern_formatter>(spdlog_pattern);
   std::map<std::string, std::shared_ptr<spdlog::logger>> spdloggers;
   for (auto const & logger_impl : loggers) {
     std::shared_ptr<spdlog::logger> spdlogger;
@@ -85,8 +85,8 @@ void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &lo
 }
 
 std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name) {
-  std::lock_guard < std::mutex > lock(mutex);
-  std::shared_ptr<LoggerImpl> result = std::make_shared < LoggerImpl > (name, get_logger(logger_, root_namespace_, name, formatter_));
+  std::lock_guard<std::mutex> lock(mutex);
+  std::shared_ptr<LoggerImpl> result = std::make_shared<LoggerImpl>(name, get_logger(logger_, root_namespace_, name, formatter_));
   loggers.push_back(result);
   return result;
 }
@@ -130,7 +130,7 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_names
         } catch (const std::out_of_range &oor) {
         }
       }
-      sink_map[appender_name] = std::make_shared < spdlog::sinks::rotating_file_sink_mt > (file_name, max_file_size, max_files);
+      sink_map[appender_name] = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(file_name, max_file_size, max_files);
     } else if ("stdout" == appender_type) {
       sink_map[appender_name] = spdlog::sinks::stdout_sink_mt::instance();
     } else {
@@ -227,7 +227,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<
   if (logger != nullptr) {
     logger->log_debug("%s logger got sinks from namespace %s and level %s from namespace %s", name, sink_namespace_str, spdlog::level::level_names[level], level_namespace_str);
   }
-  spdlogger = std::make_shared < spdlog::logger > (name, begin(sinks), end(sinks));
+  spdlogger = std::make_shared<spdlog::logger>(name, begin(sinks), end(sinks));
   spdlogger->set_level(level);
   spdlogger->set_formatter(formatter);
   spdlogger->flush_on(std::max(spdlog::level::info, current_namespace->level));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index d4059d6..c556701 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -57,7 +57,7 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *cont
                                                       std::string &report) {
   Json::Value array;
   for (auto sercomp : records) {
-    std::shared_ptr<provenance::ProvenanceEventRecord> record = std::dynamic_pointer_cast < provenance::ProvenanceEventRecord > (sercomp);
+    std::shared_ptr<provenance::ProvenanceEventRecord> record = std::dynamic_pointer_cast<provenance::ProvenanceEventRecord>(sercomp);
     if (nullptr == record) {
       break;
     }
@@ -119,7 +119,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context,
   if (!protocol_->bootstrap()) {
     // bootstrap the client protocol if needeed
     context->yield();
-    std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor > (context->getProcessorNode().getProcessor());
+    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode()->getProcessor());
     logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec());
     returnProtocol(std::move(protocol_));
     return;
@@ -130,7 +130,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context,
   logger_->log_debug("batch size %d records", batch_size_);
   size_t deserialized = batch_size_;
   std::shared_ptr<core::Repository> repo = context->getProvenanceRepository();
-  std::function < std::shared_ptr<core::SerializableComponent>() > constructor = []() {return std::make_shared<provenance::ProvenanceEventRecord>();};
+  std::function<std::shared_ptr<core::SerializableComponent>()> constructor = []() {return std::make_shared<provenance::ProvenanceEventRecord>();};
   if (!repo->DeSerialize(records, deserialized, constructor) && deserialized == 0) {
     logger_->log_debug("Not sending because deserialized is %d", deserialized);
     returnProtocol(std::move(protocol_));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/repository/FileSystemRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index fba1fe3..4945b31 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -37,6 +37,11 @@ std::shared_ptr<io::BaseStream> FileSystemRepository::write(const std::shared_pt
   return std::make_shared<io::FileStream>(claim->getContentFullPath());
 }
 
+bool FileSystemRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &streamId) {  // reports whether the claim's content path can be opened as an input file stream
+  std::ifstream file(streamId->getContentFullPath());  // open for reading only; the stream is closed again when `file` goes out of scope
+  return file.good();  // good() is false when the open failed
+}
+
 std::shared_ptr<io::BaseStream> FileSystemRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
   return std::make_shared<io::FileStream>(claim->getContentFullPath(), 0, false);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/repository/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp
index ac092ea..3ed7fbf 100644
--- a/libminifi/src/core/repository/FlowFileRepository.cpp
+++ b/libminifi/src/core/repository/FlowFileRepository.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 #include "core/repository/FlowFileRepository.h"
+#include "leveldb/write_batch.h"
 #include <memory>
 #include <string>
 #include <utility>
@@ -29,46 +30,58 @@ namespace minifi {
 namespace core {
 namespace repository {
 
+// Batch-deletes every key queued in keys_to_delete, then lowers repo_size_ and hands the purged claims to removeIfOrphaned().
+void FlowFileRepository::flush() {
+  leveldb::WriteBatch batch;
+  std::string key;
+  leveldb::ReadOptions options;
+  std::vector<std::shared_ptr<FlowFileRecord>> purgeList;
+  uint64_t decrement_total = 0;
+  while (keys_to_delete.size_approx() > 0) {
+    if (keys_to_delete.try_dequeue(key)) {
+      // LevelDB's Get() leaves the output string unchanged on a miss: use a fresh buffer and only account for hits.
+      std::string value;
+      if (db_->Get(options, key, &value).ok()) {
+        decrement_total += value.size();
+        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
+        if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(value.data()), value.size())) {
+          purgeList.push_back(eventRead);
+        }
+        logger_->log_info("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
+      }
+      batch.Delete(key);  // still queued for deletion even when the lookup failed, as before
+    }
+  }
+  if (db_->Write(leveldb::WriteOptions(), &batch).ok()) {
+    logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
+    if (decrement_total > repo_size_.load()) {
+      repo_size_ = 0;  // clamp at zero rather than subtracting past it
+    } else {
+      repo_size_ -= decrement_total;
+    }
+  }
+  if (nullptr != content_repo_) {
+    for (const auto &ffr : purgeList) {
+      auto claim = ffr->getResourceClaim();
+      if (claim != nullptr) {
+        content_repo_->removeIfOrphaned(claim);
+      }
+    }
+  }
+}
+
 void FlowFileRepository::run() {
   // threshold for purge
   uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
+
   while (running_) {
     std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
     uint64_t curTime = getTimeMillis();
-    uint64_t size = repoSize();
-    if (size >= purgeThreshold) {
-      std::vector<std::shared_ptr<FlowFileRecord>> purgeList;
-      std::vector<std::pair<std::string, uint64_t>> keyRemovalList;
-      leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
 
-      for (it->SeekToFirst(); it->Valid(); it->Next()) {
-        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
-        std::string key = it->key().ToString();
-        if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
-          if ((curTime - eventRead->getEventTime()) > max_partition_millis_) {
-            purgeList.push_back(eventRead);
-            keyRemovalList.push_back(std::make_pair(key, it->value().size()));
-          }
-        } else {
-          logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(), key.c_str());
-          keyRemovalList.push_back(std::make_pair(key, it->value().size()));
-        }
-      }
-      delete it;
-      for (auto eventId : keyRemovalList) {
-        logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str());
-        if (Delete(eventId.first)) {
-          repo_size_ -= eventId.second;
-        }
-      }
+    flush();
+
+    uint64_t size = getRepoSize();
 
-      for (const auto &ffr : purgeList) {
-        auto claim = ffr->getResourceClaim();
-        if (claim != nullptr) {
-          content_repo_->remove(claim);
-        }
-      }
-    }
     if (size > max_partition_bytes_)
       repo_full_ = true;
     else
@@ -81,22 +94,25 @@ void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentReposi
   std::vector<std::pair<std::string, uint64_t>> purgeList;
   leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
 
+  repo_size_ = 0;
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
     std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
     std::string key = it->key().ToString();
     repo_size_ += it->value().size();
     if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
+      logger_->log_info("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
       auto search = connectionMap.find(eventRead->getConnectionUuid());
       if (search != connectionMap.end()) {
         // we find the connection for the persistent flowfile, create the flowfile and enqueue that
         std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
-        std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
-        // set store to repo to true so that we do need to persistent again in enqueue
-        record->setStoredToRepository(true);
-        search->second->put(record);
+        eventRead->setStoredToRepository(true);
+        search->second->put(eventRead);
       } else {
+        logger_->log_info("Could not find connectinon for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
         if (eventRead->getContentFullPath().length() > 0) {
-          std::remove(eventRead->getContentFullPath().c_str());
+          if (nullptr != eventRead->getResourceClaim()) {
+            content_repo_->remove(eventRead->getResourceClaim());
+          }
         }
         purgeList.push_back(std::make_pair(key, it->value().size()));
       }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index ac575c5..65f1cf9 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -132,6 +132,23 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar
   return nullptr;
 }
 
+// Reports whether master_list_ has an entry for the claim's content path whose
+// takeOwnership() call returns non-null.
+// NOTE(review): takeOwnership() is called purely as a probe here; confirm it needs no matching release.
+bool VolatileContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  logger_->log_debug("enter exists");
+
+  // Hold map_mutex_ while master_list_ is consulted.
+  std::lock_guard<std::mutex> lock(map_mutex_);
+  auto claim_check = master_list_.find(claim->getContentFullPath());
+  if (claim_check == master_list_.end()) {
+    // Nothing is registered under this content path.
+    return false;
+  }
+  // An entry is present; it counts only if ownership can be taken.
+  return claim_check->second->takeOwnership() != nullptr;
+}
+
 std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
   logger_->log_debug("enter read");
   int size = 0;


Mime
View raw message