nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [3/8] nifi-minifi-cpp git commit: MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters MINIFI-339: Add GetTCP Processor MINIFI-339: Add listener server MINIFI-339: Update to listener MINIFI-339: Resolve Issue with stack based process
Date Mon, 02 Oct 2017 14:57:16 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/state/ProcessorController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/state/ProcessorController.cpp b/libminifi/src/core/state/ProcessorController.cpp
new file mode 100644
index 0000000..7a1af7d
--- /dev/null
+++ b/libminifi/src/core/state/ProcessorController.cpp
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/state/ProcessorController.h"
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+
+ProcessorController::ProcessorController(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<SchedulingAgent> &scheduler)
+    : processor_(processor),
+      scheduler_(scheduler) {
+}
+
+ProcessorController::~ProcessorController() {
+}
+/**
+ * Start the client
+ */
+int16_t ProcessorController::start() {
+  processor_->setScheduledState(core::ScheduledState::RUNNING);
+  scheduler_->schedule(processor_);
+  return 0;
+}
+/**
+ * Stop the client
+ */
+int16_t ProcessorController::stop(bool force, uint64_t timeToWait) {
+  scheduler_->unschedule(processor_);
+  return 0;
+}
+
+bool ProcessorController::isRunning() {
+  return processor_->isRunning();
+}
+
+int16_t ProcessorController::pause() {
+  scheduler_->unschedule(processor_);
+  return 0;
+}
+
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/state/StateManager.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/state/StateManager.cpp b/libminifi/src/core/state/StateManager.cpp
new file mode 100644
index 0000000..11feabf
--- /dev/null
+++ b/libminifi/src/core/state/StateManager.cpp
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/state/StateManager.h"
+#include <memory>
+#include <utility>
+#include <vector>
+#include "core/state/metrics/MetricsBase.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+
+void StateManager::initialize() {
+  metrics_listener_ = std::unique_ptr<state::metrics::MetricsListener>(new state::metrics::MetricsListener(shared_from_this(), shared_from_this()));
+  // manually add the c2 agent for now
+  listener_thread_pool_.setMaxConcurrentTasks(3);
+  listener_thread_pool_.start();
+  controller_running_ = true;
+  startMetrics();
+}
+/**
+ * State management operations.
+ */
+int16_t StateManager::stop(bool force, uint64_t timeToWait) {
+  controller_running_ = false;
+  listener_thread_pool_.shutdown();
+  return 1;
+}
+
+int16_t StateManager::update(const std::shared_ptr<Update> &updateController) {
+  // must be stopped to update.
+  if (isStateMonitorRunning()) {
+    return -1;
+  }
+  int16_t ret = applyUpdate(updateController);
+  switch (ret) {
+    case -1:
+      return -1;
+    default:
+      return 1;
+  }
+}
+
+/**
+ * Passes metrics to the update controllers if they are a metrics sink.
+ * @param metrics metric to pass through
+ */
+int16_t StateManager::setMetrics(const std::shared_ptr<metrics::Metrics> &metrics) {
+  if (IsNullOrEmpty(metrics)) {
+    return -1;
+  }
+  auto now = std::chrono::steady_clock::now();
+  if (mutex_.try_lock_until(now + std::chrono::milliseconds(100))) {
+    // update controllers can be metric sinks too
+    for (auto controller : updateControllers) {
+      std::shared_ptr<metrics::MetricsSink> sink = std::dynamic_pointer_cast<metrics::MetricsSink>(controller);
+      if (sink != nullptr) {
+        sink->setMetrics(metrics);
+      }
+    }
+    metrics_maps_[metrics->getName()] = metrics;
+    mutex_.unlock();
+  } else {
+    return -1;
+  }
+  return 0;
+}
+/**
+ * Metrics operations
+ */
+int16_t StateManager::getMetrics(std::vector<std::shared_ptr<metrics::Metrics>> &metric_vector, uint16_t metricsClass) {
+  auto now = std::chrono::steady_clock::now();
+  const std::chrono::steady_clock::time_point wait_time = now + std::chrono::milliseconds(100);
+  if (mutex_.try_lock_until(wait_time)) {
+    for (auto metric : metrics_maps_) {
+      metric_vector.push_back(metric.second);
+    }
+    mutex_.unlock();
+    return 0;
+  }
+  return -1;
+}
+
+bool StateManager::registerUpdateListener(const std::shared_ptr<UpdateController> &updateController) {
+  auto functions = updateController->getFunctions();
+
+  updateControllers.push_back(updateController);
+  // run all functions independently
+
+  for (auto function : functions) {
+    std::unique_ptr<utils::AfterExecute<Update>> after_execute = std::unique_ptr<utils::AfterExecute<Update>>(new UpdateRunner(isStateMonitorRunning()));
+    utils::Worker<Update> functor(function, "listeners", std::move(after_execute));
+    std::future<Update> future;
+    if (!listener_thread_pool_.execute(std::move(functor), future)) {
+      // denote failure
+      return false;
+    }
+  }
+  return true;
+}
+
+/**
+ * Base metrics function will employ the default metrics listener.
+ */
+bool StateManager::startMetrics() {
+  std::unique_ptr<utils::AfterExecute<Update>> after_execute = std::unique_ptr<utils::AfterExecute<Update>>(new UpdateRunner(isStateMonitorRunning()));
+  utils::Worker<Update> functor(metrics_listener_->getFunction(), "metrics", std::move(after_execute));
+  if (!listener_thread_pool_.execute(std::move(functor), metrics_listener_->getFuture())) {
+    // denote failure
+    return false;
+  }
+  return true;
+}
+
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/state/UpdateController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/state/UpdateController.cpp b/libminifi/src/core/state/UpdateController.cpp
new file mode 100644
index 0000000..f4215f1
--- /dev/null
+++ b/libminifi/src/core/state/UpdateController.cpp
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/state/UpdateController.h"
+#include <utility>
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+
+UpdateStatus::UpdateStatus(UpdateState state, int16_t reason)
+    : state_(state),
+      reason_(reason) {
+}
+
+UpdateStatus::UpdateStatus(const UpdateStatus &other)
+    : error_(other.error_),
+      reason_(other.reason_),
+      state_(other.state_) {
+}
+
+UpdateStatus::UpdateStatus(const UpdateStatus &&other)
+    : error_(std::move(other.error_)),
+      reason_(std::move(other.reason_)),
+      state_(std::move(other.state_)) {
+}
+
+UpdateState UpdateStatus::getState() const {
+  return state_;
+}
+
+std::string UpdateStatus::getError() const {
+  return error_;
+}
+
+int16_t UpdateStatus::getReadonCode() const {
+  return reason_;
+}
+
+UpdateStatus &UpdateStatus::operator=(const UpdateStatus &&other) {
+  error_ = std::move(other.error_);
+  reason_ = std::move(other.reason_);
+  state_ = std::move(other.state_);
+  return *this;
+}
+
+UpdateStatus &UpdateStatus::operator=(const UpdateStatus &other) {
+  error_ = other.error_;
+  reason_ = other.reason_;
+  state_ = other.state_;
+  return *this;
+}
+
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 5f7f5f4..d702620 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -79,11 +79,24 @@ void Socket::closeStream() {
     addr_info_ = 0;
   }
   if (socket_file_descriptor_ >= 0) {
+    logger_->log_debug("Closing %d", socket_file_descriptor_);
     close(socket_file_descriptor_);
     socket_file_descriptor_ = -1;
   }
 }
 
+void Socket::setNonBlocking() {
+  if (listeners_ <= 0) {
+    // Put the socket in non-blocking mode:
+    if (fcntl(socket_file_descriptor_, F_SETFL, O_NONBLOCK) < 0) {
+      // handle error
+      logger_->log_error("Could not create non blocking to socket", strerror(errno));
+    } else {
+      logger_->log_info("Successfully applied O_NONBLOCK to fd");
+    }
+  }
+}
+
 int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
   if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
     logger_->log_error("error while connecting to server socket");
@@ -131,6 +144,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
   // add the listener to the total set
   FD_SET(socket_file_descriptor_, &total_list_);
   socket_max_ = socket_file_descriptor_;
+  logger_->log_info("Created connection with file descriptor %d", socket_file_descriptor_);
   return 0;
 }
 
@@ -183,15 +197,21 @@ int16_t Socket::initialize() {
     }
     // we've successfully connected
     if (port_ > 0 && createConnection(p, addr) >= 0) {
+      logger_->log_info("Successfully created connection");
       return 0;
       break;
     }
   }
 
+  logger_->log_info("Could not find device for our connection");
   return -1;
 }
 
 int16_t Socket::select_descriptor(const uint16_t msec) {
+  if (listeners_ == 0) {
+    return socket_file_descriptor_;
+  }
+
   struct timeval tv;
   int retval;
 
@@ -200,7 +220,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
   tv.tv_sec = msec / 1000;
   tv.tv_usec = (msec % 1000) * 1000;
 
-  std::lock_guard < std::recursive_mutex > guard(selection_mutex_);
+  std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
 
   if (msec > 0)
     retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
@@ -236,6 +256,8 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
     }
   }
 
+  logger_->log_error("Could not find a suitable file descriptor");
+
   return -1;
 }
 
@@ -291,19 +313,20 @@ int Socket::writeData(std::vector<uint8_t> &buf, int buflen) {
 int Socket::writeData(uint8_t *value, int size) {
   int ret = 0, bytes = 0;
 
+  int fd = select_descriptor(1000);
   while (bytes < size) {
-    ret = send(socket_file_descriptor_, value + bytes, size - bytes, 0);
+    ret = send(fd, value + bytes, size - bytes, 0);
     // check for errors
     if (ret <= 0) {
-      close(socket_file_descriptor_);
-      logger_->log_error("Could not send to %d, error: %s", socket_file_descriptor_, strerror(errno));
+      close(fd);
+      logger_->log_error("Could not send to %d, error: %s", fd, strerror(errno));
       return ret;
     }
     bytes += ret;
   }
 
   if (ret)
-    logger_->log_trace("Send data size %d over socket %d", size, socket_file_descriptor_);
+    logger_->log_trace("Send data size %d over socket %d", size, fd);
   return bytes;
 }
 
@@ -362,27 +385,32 @@ int Socket::read(uint16_t &value, bool is_little_endian) {
   return sizeof(value);
 }
 
-int Socket::readData(std::vector<uint8_t> &buf, int buflen) {
+int Socket::readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes) {
   if (buf.capacity() < buflen) {
     buf.resize(buflen);
   }
-  return readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen);
+  return readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen, retrieve_all_bytes);
 }
 
-int Socket::readData(uint8_t *buf, int buflen) {
+int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
   int32_t total_read = 0;
   while (buflen) {
     int16_t fd = select_descriptor(1000);
     if (fd < 0) {
-      logger_->log_info("fd close %i", buflen);
+      logger_->log_info("fd %d close %i", fd, buflen);
       close(socket_file_descriptor_);
       return -1;
     }
     int bytes_read = recv(fd, buf, buflen, 0);
+    logger_->log_info("Recv call %d", bytes_read);
     if (bytes_read <= 0) {
       if (bytes_read == 0) {
         logger_->log_info("Other side hung up on %d", fd);
       } else {
+        if (errno == EAGAIN || errno == EWOULDBLOCK) {
+          // continue
+          return -2;
+        }
         logger_->log_error("Could not recv on %d, error: %s", fd, strerror(errno));
       }
       return -1;
@@ -390,6 +418,9 @@ int Socket::readData(uint8_t *buf, int buflen) {
     buflen -= bytes_read;
     buf += bytes_read;
     total_read += bytes_read;
+    if (!retrieve_all_bytes) {
+      break;
+    }
   }
   return total_read;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/io/FileStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 3b2bfe1..a3e7ee1 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -140,6 +140,7 @@ int FileStream::readData(uint8_t *buf, int buflen) {
       int len = file_stream_->tellg();
       offset_ = len;
       length_ = len;
+      logger_->log_info("%s eof bit, ended at %d", path_, offset_);
       return offset_;
     } else {
       offset_ += buflen;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/io/StreamFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp
index 288f4d1..7990edd 100644
--- a/libminifi/src/io/StreamFactory.cpp
+++ b/libminifi/src/io/StreamFactory.cpp
@@ -47,11 +47,11 @@ class SocketCreator : public AbstractStreamFactory {
  public:
   template<typename Q = V>
   ContextTypeCheck<true, std::shared_ptr<Q>> create(const std::shared_ptr<Configure> &configure) {
-    return std::make_shared < V > (configure);
+    return std::make_shared<V>(configure);
   }
   template<typename Q = V>
   ContextTypeCheck<false, std::shared_ptr<Q>> create(const std::shared_ptr<Configure> &configure) {
-    return std::make_shared < SocketContext > (configure);
+    return std::make_shared<SocketContext>(configure);
   }
 
   SocketCreator<T, V>(std::shared_ptr<Configure> configure) {
@@ -69,7 +69,7 @@ class SocketCreator : public AbstractStreamFactory {
 
   std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) {
     T *socket = create(host, port);
-    return std::unique_ptr < Socket > (socket);
+    return std::unique_ptr<Socket>(socket);
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp
index 9c6f732..323d69a 100644
--- a/libminifi/src/processors/ExecuteProcess.cpp
+++ b/libminifi/src/processors/ExecuteProcess.cpp
@@ -145,7 +145,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
               break;
             logger_->log_info("Execute Command Respond %d", numRead);
             ExecuteProcess::WriteCallback callback(buffer, numRead);
-            std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+            std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
             if (!flowFile)
               continue;
             flowFile->addAttribute("command", _command.c_str());
@@ -167,7 +167,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
                 // child exits and close the pipe
                 ExecuteProcess::WriteCallback callback(buffer, totalRead);
                 if (!flowFile) {
-                  flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+                  flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
                   if (!flowFile)
                     break;
                   flowFile->addAttribute("command", _command.c_str());
@@ -185,7 +185,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
                 logger_->log_info("Execute Command Max Respond %d", sizeof(buffer));
                 ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
                 if (!flowFile) {
-                  flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+                  flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
                   if (!flowFile)
                     continue;
                   flowFile->addAttribute("command", _command.c_str());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp
index 2fee3f2..3741a8f 100644
--- a/libminifi/src/processors/GenerateFlowFile.cpp
+++ b/libminifi/src/processors/GenerateFlowFile.cpp
@@ -91,7 +91,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes
     }
     for (int i = 0; i < batchSize; i++) {
       // For each batch
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
       if (!flowFile)
         return;
       if (fileSize > 0)
@@ -114,7 +114,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes
     GenerateFlowFile::WriteCallback callback(_data, _dataSize);
     for (int i = 0; i < batchSize; i++) {
       // For each batch
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
       if (!flowFile)
         return;
       if (fileSize > 0)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp
index 723d461..1b73c2d 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -133,6 +133,9 @@ void GetFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFact
 
 void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
   // Perform directory list
+
+  metrics_->iterations_++;
+
   logger_->log_info("Is listing empty %i", isListingEmpty());
   if (isListingEmpty()) {
     if (request_.pollInterval == 0 || (getTimeMillis() - last_listing_time_) > request_.pollInterval) {
@@ -150,7 +153,7 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
         std::string fileName = list.front();
         list.pop();
         logger_->log_info("GetFile process %s", fileName.c_str());
-        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
         if (flowFile == nullptr)
           return;
         std::size_t found = fileName.find_last_of("/\\");
@@ -172,21 +175,21 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
 }
 
 bool GetFile::isListingEmpty() {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
 
   return _dirList.empty();
 }
 
 void GetFile::putListing(std::string fileName) {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
 
   _dirList.push(fileName);
 }
 
 void GetFile::pollListing(std::queue<std::string> &list, const GetFileRequest &request) {
-  std::lock_guard < std::mutex > lock(mutex_);
+  std::lock_guard<std::mutex> lock(mutex_);
 
-  while (!_dirList.empty() && (request.maxSize == 0 || list.size() < request.maxSize)) {
+  while (!_dirList.empty() && (request.batchSize == 0 || list.size() < request.batchSize)) {
     std::string fileName = _dirList.front();
     _dirList.pop();
     list.push(fileName);
@@ -229,7 +232,8 @@ bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRe
     regfree(&regex);
     if (ret)
       return false;
-
+    metrics_->input_bytes_ += statbuf.st_size;
+    metrics_->accepted_files_++;
     return true;
   }
 
@@ -267,6 +271,11 @@ void GetFile::performListing(std::string dir, const GetFileRequest &request) {
   closedir(d);
 }
 
+int16_t GetFile::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector) {
+  metric_vector.push_back(metrics_);
+  return 0;
+}
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/GetTCP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetTCP.cpp b/libminifi/src/processors/GetTCP.cpp
new file mode 100644
index 0000000..bcf3d58
--- /dev/null
+++ b/libminifi/src/processors/GetTCP.cpp
@@ -0,0 +1,289 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "processors/GetTCP.h"
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <stdio.h>
+#include <dirent.h>
+#include <limits.h>
+#include <unistd.h>
+#include <regex.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <memory>
+#include <utility>
+#include <set>
+#include <sstream>
+#include <string>
+#include <iostream>
+#include "io/ClientSocket.h"
+#include "utils/StringUtils.h"
+#include "utils/TimeUtil.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+
+core::Property GetTCP::EndpointList("endpoint-list", "A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.", "");
+core::Property GetTCP::ConcurrentHandlers("concurrent-handler-count", "Number of concurrent handlers for this session", "1");
+core::Property GetTCP::ReconnectInterval("reconnect-interval", "The number of seconds to wait before attempting to reconnect to the endpoint.", "5s");
+core::Property GetTCP::ReceiveBufferSize("receive-buffer-size", "The size of the buffer to receive data in. Default 16384 (16MB).", "16MB");
+core::Property GetTCP::StayConnected("Stay Connected", "Determines if we keep the same socket despite having no data", "true");
+core::Property GetTCP::ConnectionAttemptLimit("connection-attempt-timeout", "Maximum number of connection attempts before attempting backup hosts, if configured.", "3");
+core::Property GetTCP::EndOfMessageByte(
+    "end-of-message-byte",
+    "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.", "13");
+
+core::Relationship GetTCP::Success("success", "All files are routed to success");
+core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
+
+int16_t DataHandler::handle(std::string source, uint8_t *message, size_t size, bool partial) {
+  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
+  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
+
+  DataHandlerCallback callback(message, size);
+
+  my_session->write(flowFile, &callback);
+
+  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
+
+  if (partial) {
+    my_session->transfer(flowFile, GetTCP::Partial);
+  } else {
+    my_session->transfer(flowFile, GetTCP::Success);
+  }
+
+  my_session->commit();
+
+  return 0;
+}
+void GetTCP::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(EndpointList);
+  properties.insert(ConcurrentHandlers);
+  properties.insert(ConnectionAttemptLimit);
+  properties.insert(EndOfMessageByte);
+  properties.insert(ReceiveBufferSize);
+  properties.insert(StayConnected);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(Partial);
+  setSupportedRelationships(relationships);
+}
+
+void GetTCP::onSchedule(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSessionFactory> sessionFactory) {
+  std::string value;
+  stay_connected_ = true;
+  if (context->getProperty(EndpointList.getName(), value)) {
+    endpoints = utils::StringUtils::split(value, ",");
+  }
+
+  if (context->getProperty(ConcurrentHandlers.getName(), value)) {
+    int64_t handlers = 0;
+    core::Property::StringToInt(value, handlers);
+    concurrent_handlers_ = handlers;
+  }
+
+  if (context->getProperty(StayConnected.getName(), value)) {
+    utils::StringUtils::StringToBool(value, stay_connected_);
+  } else {
+    stay_connected_ = true;
+  }
+  if (context->getProperty(ConnectionAttemptLimit.getName(), value)) {
+    int64_t connects = 0;
+    core::Property::StringToInt(value, connects);
+    connection_attempt_limit_ = connects;
+  }
+  if (context->getProperty(ReceiveBufferSize.getName(), value)) {
+    int64_t size = 0;
+    core::Property::StringToInt(value, size);
+    receive_buffer_size_ = size;
+  }
+
+  if (context->getProperty(EndOfMessageByte.getName(), value)) {
+    int64_t byteValue = 0;
+    core::Property::StringToInt(value, byteValue);
+    endOfMessageByte = byteValue & 0xFF;
+  }
+
+  if (context->getProperty(ReconnectInterval.getName(), value)) {
+    int64_t msec;
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, msec, unit) && core::Property::ConvertTimeUnitToMS(msec, unit, msec)) {
+      reconnect_interval_ = msec;
+      logger_->log_debug("successfully applied reconnect interval of %d", reconnect_interval_);
+    }
+  } else {
+    reconnect_interval_ = 5000;
+  }
+
+  handler_ = std::unique_ptr<DataHandler>(new DataHandler(sessionFactory));
+
+  f_ex = [&] {
+    std::unique_ptr<io::Socket> socket_ptr;
+    // reuse the byte buffer.
+      std::vector<uint8_t> buffer;
+      int reconnects = 0;
+      do {
+        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
+          int size_read = socket_ptr->readData(buffer, receive_buffer_size_, false);
+
+          if (size_read >= 0) {
+            if (size_read > 0) {
+              // determine cut location
+              int startLoc = 0, i = 0;
+              for (; i < size_read; i++) {
+                if (buffer.at(i) == endOfMessageByte && i > 0) {
+                  if (i-startLoc > 0) {
+                    handler_->handle(socket_ptr->getHostname(), buffer.data()+startLoc, (i-startLoc), true);
+                  }
+                  startLoc = i;
+                }
+              }
+              if (startLoc > 0) {
+                logger_->log_info("Starting at %i, ending at %i", startLoc, size_read);
+                if (size_read-startLoc > 0) {
+                  handler_->handle(socket_ptr->getHostname(), buffer.data()+startLoc, (size_read-startLoc), true);
+                }
+              } else {
+                logger_->log_info("Handling at %i, ending at %i", startLoc, size_read);
+                if (size_read > 0) {
+                  handler_->handle(socket_ptr->getHostname(), buffer.data(), size_read, false);
+                }
+              }
+              reconnects = 0;
+            }
+            socket_ring_buffer_.enqueue(std::move(socket_ptr));
+          } else if (size_read == -2 && stay_connected_) {
+            if (++reconnects > connection_attempt_limit_) {
+              logger_->log_info("Too many reconnects, exiting thread");
+              socket_ptr->closeStream();
+              return -1;
+            }
+            logger_->log_info("Sleeping for %d msec before attempting to reconnect", reconnect_interval_);
+            std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_));
+            socket_ring_buffer_.enqueue(std::move(socket_ptr));
+          } else {
+            socket_ptr->closeStream();
+            std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_));
+            logger_->log_info("Read response returned a -1 from socket, exiting thread");
+            return -1;
+          }
+        } else {
+          std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_));
+          logger_->log_info("Could not use socket, exiting thread");
+          return -1;
+        }
+      }while (running_);
+      logger_->log_info("Ending private thread");
+      return 0;
+    };
+
+  utils::ThreadPool<int> pool = utils::ThreadPool<int>(concurrent_handlers_);
+  client_thread_pool_ = std::move(pool);
+  client_thread_pool_.start();
+
+  running_ = true;
+}
+
+void GetTCP::notifyStop() {
+  running_ = false;
+  // await threads to shutdown.
+  client_thread_pool_.shutdown();
+  std::unique_ptr<io::Socket> socket_ptr;
+  while (socket_ring_buffer_.size_approx() > 0) {
+    socket_ring_buffer_.try_dequeue(socket_ptr);
+  }
+}
+void GetTCP::onTrigger(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSession> session) {
+  // Perform directory list
+  metrics_->iterations_++;
+  std::lock_guard<std::mutex> lock(mutex_);
+  // check if the futures are valid. If they've terminated remove it from the map.
+
+  for (auto &endpoint : endpoints) {
+    auto endPointFuture = live_clients_.find(endpoint);
+
+    // does not exist
+    if (endPointFuture == live_clients_.end()) {
+      logger_->log_info("creating endpoint for %s", endpoint);
+      std::vector<std::string> hostAndPort = utils::StringUtils::split(endpoint, ":");
+      if (hostAndPort.size() == 2) {
+        logger_->log_debug("Opening another socket to %s:%s", hostAndPort.at(0), hostAndPort.at(1));
+        std::unique_ptr<io::Socket> socket = std::move(stream_factory_->createSocket(hostAndPort.at(0), std::stoi(hostAndPort.at(1))));
+
+        if (socket->initialize() != -1) {
+          socket->setNonBlocking();
+          logger_->log_debug("Enqueueing socket into ring buffer %s:%s", hostAndPort.at(0), hostAndPort.at(1));
+          socket_ring_buffer_.enqueue(std::move(socket));
+
+        } else {
+          logger_->log_error("Could not create socket during initialization for %s", endpoint);
+        }
+      } else {
+        logger_->log_error("Could not create socket for %s", endpoint);
+      }
+
+      std::future<int> *future = new std::future<int>();
+
+      std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new SocketAfterExecute(running_, endpoint, &live_clients_, &mutex_));
+      utils::Worker<int> functor(f_ex, "workers", std::move(after_execute));
+      if (client_thread_pool_.execute(std::move(functor), *future)) {
+        live_clients_[endpoint] = future;
+      }
+    } else {
+      if (!endPointFuture->second->valid()) {
+        delete endPointFuture->second;
+        std::future<int> *future = new std::future<int>();
+        std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new SocketAfterExecute(running_, endpoint, &live_clients_, &mutex_));
+        utils::Worker<int> functor(f_ex, "workers", std::move(after_execute));
+        if (client_thread_pool_.execute(std::move(functor), *future)) {
+          live_clients_[endpoint] = future;
+        }
+      } else {
+        logger_->log_info("Thread still running for %s", endPointFuture->first);
+        // we have a thread corresponding to this.
+      }
+    }
+  }
+  logger_->log_info("Updating endpoint");
+  context->yield();
+}
+
+int16_t GetTCP::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector) {
+  metric_vector.push_back(metrics_);
+  return 0;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp
index 81271a5..873e317 100644
--- a/libminifi/src/processors/InvokeHTTP.cpp
+++ b/libminifi/src/processors/InvokeHTTP.cpp
@@ -42,7 +42,7 @@
 #include "ResourceClaim.h"
 #include "utils/StringUtils.h"
 #include "utils/ByteInputCallBack.h"
-#include "utils/HTTPUtils.h"
+#include "utils/HTTPClient.h"
 
 namespace org {
 namespace apache {
@@ -77,7 +77,10 @@ core::Property InvokeHTTP::ContentType("Content-type", "The Content-Type to spec
 core::Property InvokeHTTP::SendBody("send-message-body", "If true, sends the HTTP message body on POST/PUT/PATCH requests (default).  "
                                     "If false, suppresses the message body and content-type header for these requests.",
                                     "true");
-
+core::Property InvokeHTTP::UseChunkedEncoding("Use Chunked Encoding", "When POST'ing, PUT'ing or PATCH'ing content set this property to true in order to not pass the 'Content-length' header"
+                                              " and instead send 'Transfer-Encoding' with a value of 'chunked'. This will enable the data transfer mechanism which was introduced in HTTP 1.1 "
+                                              "to pass data of unknown lengths in chunks.",
+                                              "false");
 core::Property InvokeHTTP::PropPutOutputAttributes("Put Response Body in Attribute", "If set, the response body received back will be put into an attribute of the original "
                                                    "FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. ",
                                                    "");
@@ -86,6 +89,7 @@ core::Property InvokeHTTP::AlwaysOutputResponse("Always Output Response", "Will
                                                 "false");
 core::Property InvokeHTTP::PenalizeOnNoRetry("Penalize on \"No Retry\"", "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.", "false");
 
+core::Property InvokeHTTP::DisablePeerVerification("Disable Peer Verification", "Disables peer verification for the SSL session", "false");
 const char* InvokeHTTP::STATUS_CODE = "invokehttp.status.code";
 const char* InvokeHTTP::STATUS_MESSAGE = "invokehttp.status.message";
 const char* InvokeHTTP::RESPONSE_BODY = "invokehttp.response.body";
@@ -108,19 +112,6 @@ core::Relationship InvokeHTTP::RelNoRetry("no retry", "The original FlowFile wil
 core::Relationship InvokeHTTP::RelFailure("failure", "The original FlowFile will be routed on any type of connection failure, "
                                           "timeout or general exception. It will have new attributes detailing the request.");
 
-void InvokeHTTP::set_request_method(CURL *curl, const std::string &method) {
-  std::string my_method = method;
-  std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper);
-  if (my_method == "POST") {
-    curl_easy_setopt(curl, CURLOPT_POST, 1);
-  } else if (my_method == "PUT") {
-    curl_easy_setopt(curl, CURLOPT_PUT, 1);
-  } else if (my_method == "GET") {
-  } else {
-    curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, my_method.c_str());
-  }
-}
-
 void InvokeHTTP::initialize() {
   logger_->log_info("Initializing InvokeHTTP");
 
@@ -136,9 +127,11 @@ void InvokeHTTP::initialize() {
   properties.insert(ProxyHost);
   properties.insert(ProxyPort);
   properties.insert(ProxyUser);
+  properties.insert(UseChunkedEncoding);
   properties.insert(ProxyPassword);
   properties.insert(ContentType);
   properties.insert(SendBody);
+  properties.insert(DisablePeerVerification);
   properties.insert(AlwaysOutputResponse);
 
   setSupportedProperties(properties);
@@ -171,6 +164,11 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF
     return;
   }
 
+  std::string contentTypeStr;
+  if (context->getProperty(ContentType.getName(), contentTypeStr)) {
+    content_type_ = contentTypeStr;
+  }
+
   if (context->getProperty(ReadTimeout.getName(), timeoutStr)) {
     core::Property::StringToInt(timeoutStr, read_timeout_);
 
@@ -195,14 +193,14 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF
 
   std::string always_output_response = "false";
   if (!context->getProperty(AlwaysOutputResponse.getName(), always_output_response)) {
-    logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str());
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", AlwaysOutputResponse.getName().c_str(), AlwaysOutputResponse.getValue().c_str());
   }
 
   utils::StringUtils::StringToBool(always_output_response, always_output_response_);
 
   std::string penalize_no_retry = "false";
   if (!context->getProperty(PenalizeOnNoRetry.getName(), penalize_no_retry)) {
-    logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str());
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", PenalizeOnNoRetry.getName().c_str(), PenalizeOnNoRetry.getValue().c_str());
   }
 
   utils::StringUtils::StringToBool(penalize_no_retry, penalize_no_retry_);
@@ -211,29 +209,24 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF
   if (context->getProperty(SSLContext.getName(), context_name) && !IsNullOrEmpty(context_name)) {
     std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name);
     if (nullptr != service) {
-      ssl_context_service_ = std::static_pointer_cast < minifi::controllers::SSLContextService > (service);
+      ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
     }
   }
-}
 
-InvokeHTTP::~InvokeHTTP() {
-  curl_global_cleanup();
-}
+  std::string useChunkedEncoding = "false";
+  if (!context->getProperty(UseChunkedEncoding.getName(), useChunkedEncoding)) {
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", UseChunkedEncoding.getName().c_str(), UseChunkedEncoding.getValue().c_str());
+  }
 
-inline bool InvokeHTTP::matches(const std::string &value, const std::string &sregex) {
-  if (sregex == ".*")
-    return true;
+  utils::StringUtils::StringToBool(useChunkedEncoding, use_chunked_encoding_);
 
-  regex_t regex;
-  int ret = regcomp(&regex, sregex.c_str(), 0);
-  if (ret)
-    return false;
-  ret = regexec(&regex, value.c_str(), (size_t) 0, NULL, 0);
-  regfree(&regex);
-  if (ret)
-    return false;
+  std::string disablePeerVerification = "false";
+  if (context->getProperty(DisablePeerVerification.getName(), disablePeerVerification)) {
+    utils::StringUtils::StringToBool(disablePeerVerification, disable_peer_verification_);
+  }
+}
 
-  return true;
+InvokeHTTP::~InvokeHTTP() {
 }
 
 std::string InvokeHTTP::generateId() {
@@ -248,50 +241,15 @@ bool InvokeHTTP::emitFlowFile(const std::string &method) {
   return ("POST" == method || "PUT" == method || "PATCH" == method);
 }
 
-struct curl_slist *InvokeHTTP::build_header_list(CURL *curl, std::string regex, const std::map<std::string, std::string> &attributes) {
-  struct curl_slist *list = NULL;
-  if (curl) {
-    for (auto attribute : attributes) {
-      if (matches(attribute.first, regex)) {
-        std::string attr = attribute.first + ":" + attribute.second;
-        list = curl_slist_append(list, attr.c_str());
-      }
-    }
-  }
-  return list;
-}
-
-bool InvokeHTTP::isSecure(const std::string &url) {
-  if (url.find("https") != std::string::npos) {
-    return true;
-  }
-  return false;
-}
-
-CURLcode InvokeHTTP::configure_ssl_context(CURL *curl, void *ctx, void *param) {
-  minifi::controllers::SSLContextService *ssl_context_service = static_cast<minifi::controllers::SSLContextService*>(param);
-  if (!ssl_context_service->configure_ssl_context(static_cast<SSL_CTX*>(ctx))) {
-    return CURLE_FAILED_INIT;
-  }
-  return CURLE_OK;
-}
-
-void InvokeHTTP::configure_secure_connection(CURL *http_session) {
-  logger_->log_debug("InvokeHTTP -- Using certificate file %s", ssl_context_service_->getCertificateFile());
-  curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L);
-  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, &InvokeHTTP::configure_ssl_context);
-  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, static_cast<void*>(ssl_context_service_.get()));
-}
-
 void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get());
-
   logger_->log_info("onTrigger InvokeHTTP with  %s", method_.c_str());
 
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
+
   if (flowFile == nullptr) {
     if (!emitFlowFile(method_)) {
       logger_->log_info("InvokeHTTP -- create flow file with  %s", method_.c_str());
-      flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+      flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
     } else {
       logger_->log_info("exiting because method is %s", method_.c_str());
       return;
@@ -302,42 +260,39 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
   // create a transaction id
   std::string tx_id = generateId();
 
-  CURL *http_session = curl_easy_init();
-  // set the HTTP request method from libCURL
-  set_request_method(http_session, method_);
-  if (isSecure(url_) && ssl_context_service_ != nullptr) {
-    configure_secure_connection(http_session);
-  }
+  utils::HTTPClient client(url_, ssl_context_service_);
 
-  curl_easy_setopt(http_session, CURLOPT_URL, url_.c_str());
+  client.setVerbose();
+  client.initialize(method_);
+  client.setConnectionTimeout(connect_timeout_);
+  client.setReadTimeout(read_timeout_);
 
-  if (connect_timeout_ > 0) {
-    curl_easy_setopt(http_session, CURLOPT_TIMEOUT, connect_timeout_);
+  if (!content_type_.empty()) {
+    client.setContentType(content_type_);
   }
 
-  if (read_timeout_ > 0) {
-    curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_);
+  if (use_chunked_encoding_) {
+    client.setUseChunkedEncoding();
   }
 
-  utils::HTTPRequestResponse content;
-  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
-
-  curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content));
+  if (disable_peer_verification_) {
+    logger_->log_debug("Disabling peer verification in HTTPClient");
+    client.setDisablePeerVerification();
+  }
 
+  std::unique_ptr<utils::ByteInputCallBack> callback = nullptr;
+  std::unique_ptr<utils::HTTPUploadCallback> callbackObj = nullptr;
   if (emitFlowFile(method_)) {
     logger_->log_info("InvokeHTTP -- reading flowfile");
     std::shared_ptr<ResourceClaim> claim = flowFile->getResourceClaim();
     if (claim) {
-      utils::ByteInputCallBack *callback = new utils::ByteInputCallBack();
-      session->read(flowFile, callback);
-      utils::CallBackPosition *callbackObj = new utils::CallBackPosition;
-      callbackObj->ptr = callback;
+      callback = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
+      session->read(flowFile, callback.get());
+      callbackObj = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
+      callbackObj->ptr = callback.get();
       callbackObj->pos = 0;
-      logger_->log_info("InvokeHTTP -- Setting callback");
-      curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, (curl_off_t)callback->getBufferSize());
-      curl_easy_setopt(http_session, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write);
-      curl_easy_setopt(http_session, CURLOPT_READDATA, static_cast<void*>(callbackObj));
-
+      logger_->log_info("InvokeHTTP -- Setting callback, size is %d", callback->getBufferSize());
+      client.setUploadCallback(callbackObj.get());
     } else {
       logger_->log_error("InvokeHTTP -- no resource claim");
     }
@@ -347,63 +302,56 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
   }
 
   // append all headers
-  struct curl_slist *headers = build_header_list(http_session, attribute_to_send_regex_, flowFile->getAttributes());
-  curl_easy_setopt(http_session, CURLOPT_HTTPHEADER, headers);
+  client.build_header_list(attribute_to_send_regex_, flowFile->getAttributes());
 
   logger_->log_info("InvokeHTTP -- curl performed");
-  res = curl_easy_perform(http_session);
-
-  if (res == CURLE_OK) {
+  if (client.submit()) {
     logger_->log_info("InvokeHTTP -- curl successful");
 
     bool putToAttribute = !IsNullOrEmpty(put_attribute_name_);
 
-    std::string response_body(content.data.begin(), content.data.end());
-    int64_t http_code = 0;
-    curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, &http_code);
-    char *content_type;
-    /* ask for the content-type */
-    curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type);
+    const std::vector<char> &response_body = client.getResponseBody();
+    const std::vector<std::string> &response_headers = client.getHeaders();
 
+    int64_t http_code = client.getResponseCode();
+    const char *content_type = client.getContentType();
     flowFile->addAttribute(STATUS_CODE, std::to_string(http_code));
-    flowFile->addAttribute(STATUS_MESSAGE, response_body);
+    if (response_headers.size() > 0)
+      flowFile->addAttribute(STATUS_MESSAGE, response_headers.at(0));
     flowFile->addAttribute(REQUEST_URL, url_);
     flowFile->addAttribute(TRANSACTION_ID, tx_id);
 
-    bool isSuccess = ((int32_t) (http_code / 100)) == 2 && res != CURLE_ABORTED_BY_CALLBACK;
+    bool isSuccess = ((int32_t) (http_code / 100)) == 2;
     bool output_body_to_requestAttr = (!isSuccess || putToAttribute) && flowFile != nullptr;
     bool output_body_to_content = isSuccess && !putToAttribute;
-    bool body_empty = IsNullOrEmpty(content.data);
+    bool body_empty = IsNullOrEmpty(response_body);
 
-    logger_->log_info("isSuccess: %d, response code %d ", isSuccess, http_code);
+    logger_->log_info("isSuccess: %d, response code %d", isSuccess, http_code);
     std::shared_ptr<FlowFileRecord> response_flow = nullptr;
 
     if (output_body_to_content) {
       if (flowFile != nullptr) {
-        response_flow = std::static_pointer_cast < FlowFileRecord > (session->create(flowFile));
+        response_flow = std::static_pointer_cast<FlowFileRecord>(session->create(flowFile));
       } else {
-        response_flow = std::static_pointer_cast < FlowFileRecord > (session->create());
+        response_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
       }
 
       std::string ct = content_type;
       response_flow->addKeyedAttribute(MIME_TYPE, ct);
       response_flow->addAttribute(STATUS_CODE, std::to_string(http_code));
-      response_flow->addAttribute(STATUS_MESSAGE, response_body);
+      if (response_headers.size() > 0)
+        flowFile->addAttribute(STATUS_MESSAGE, response_headers.at(0));
       response_flow->addAttribute(REQUEST_URL, url_);
       response_flow->addAttribute(TRANSACTION_ID, tx_id);
-      io::DataStream stream((const uint8_t*) content.data.data(), content.data.size());
+      io::DataStream stream((const uint8_t*) response_body.data(), response_body.size());
       // need an import from the data stream.
       session->importFrom(stream, response_flow);
     } else {
       logger_->log_info("Cannot output body to content");
-      response_flow = std::static_pointer_cast < FlowFileRecord > (session->create());
+      response_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
     }
     route(flowFile, response_flow, session, context, isSuccess, http_code);
-  } else {
-    logger_->log_error("InvokeHTTP -- curl_easy_perform() failed %s\n", curl_easy_strerror(res));
   }
-  curl_slist_free_all(headers);
-  curl_easy_cleanup(http_session);
 }
 
 void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess,
@@ -416,6 +364,7 @@ void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr
   // If the property to output the response flowfile regardless of status code is set then transfer it
   bool responseSent = false;
   if (always_output_response_ && response != nullptr) {
+    logger_->log_info("Outputting success and response");
     session->transfer(response, Success);
     responseSent = true;
   }
@@ -428,6 +377,7 @@ void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr
       session->transfer(request, Success);
     }
     if (response != nullptr && !responseSent) {
+      logger_->log_info("Outputting success and response");
       session->transfer(response, Success);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp
index d410547..1e2241d 100644
--- a/libminifi/src/processors/ListenHTTP.cpp
+++ b/libminifi/src/processors/ListenHTTP.cpp
@@ -201,7 +201,7 @@ ListenHTTP::~ListenHTTP() {
 }
 
 void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get());
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
 
   // Do nothing if there are no incoming files
   if (!flowFile) {
@@ -243,7 +243,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *
 
   auto session = _processSessionFactory->createSession();
   ListenHTTP::WriteCallback callback(conn, req_info);
-  auto flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
 
   if (!flowFile) {
     sendErrorResponse(conn);
@@ -301,8 +301,10 @@ int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> strea
   int64_t tlen = _reqInfo->content_length;
   uint8_t buf[16384];
 
-  while (nlen < tlen) {
-    rlen = tlen - nlen;
+  // if we have no content length we should call mg_read until
+  // there is no data left from the stream to be HTTP/1.1 compliant
+  while (tlen == -1 || nlen < tlen) {
+    rlen = tlen == -1 ? sizeof(buf) : tlen - nlen;
 
     if (rlen > sizeof(buf)) {
       rlen = sizeof(buf);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp
index a5fdf28..054d585 100644
--- a/libminifi/src/processors/ListenSyslog.cpp
+++ b/libminifi/src/processors/ListenSyslog.cpp
@@ -279,7 +279,7 @@ void ListenSyslog::onTrigger(core::ProcessContext *context, core::ProcessSession
     SysLogEvent event = eventQueue.front();
     eventQueue.pop();
     if (firstEvent) {
-      flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+      flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
       if (!flowFile)
         return;
       ListenSyslog::WriteCallback callback(event.payload, event.len);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/LogAttribute.cpp b/libminifi/src/processors/LogAttribute.cpp
index ad8e664..0c2f64e 100644
--- a/libminifi/src/processors/LogAttribute.cpp
+++ b/libminifi/src/processors/LogAttribute.cpp
@@ -64,6 +64,7 @@ void LogAttribute::initialize() {
 }
 
 void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  logger_->log_info("enter log attribute");
   std::string dashLine = "--------------------------------------------------";
   LogAttrLevel level = LogAttrLevelInfo;
   bool logPayload = false;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index f87f4ec..711f226 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -247,11 +247,11 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
 
     if (!this->_delimiter.empty()) {
       char delim = this->_delimiter.c_str()[0];
-      std::vector<std::shared_ptr<FlowFileRecord>> flowFiles = std::vector<std::shared_ptr<FlowFileRecord>>();
+      std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
       session->import(fullPath, flowFiles, true, this->_currentTailFilePosition, delim);
       logger_->log_info("%d flowfiles were received from TailFile input", flowFiles.size());
 
-      for (std::shared_ptr<FlowFileRecord> ffr : flowFiles) {
+      for (auto ffr : flowFiles) {
         logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, ffr->getSize());
         std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." + extension;
         ffr->updateKeyedAttribute(PATH, fileLocation);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/provenance/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index 3e42a5a..b46cbc0 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -218,10 +218,8 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableCo
       return false;
     }
   }
-  // Persistent to the DB
-  if (repo->Serialize(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
-    logger_->log_debug("NiFi Provenance Store event %s size %d success", uuidStr_, outStream.getSize());
-  } else {
+  // Persist to the DB
+  if (!repo->Serialize(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
     logger_->log_error("NiFi Provenance Store event %s size %d fail", uuidStr_, outStream.getSize());
   }
   return true;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/provenance/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp
index ce19fe4..665837c 100644
--- a/libminifi/src/provenance/ProvenanceRepository.cpp
+++ b/libminifi/src/provenance/ProvenanceRepository.cpp
@@ -17,6 +17,7 @@
  */
 
 #include "provenance/ProvenanceRepository.h"
+#include "leveldb/write_batch.h"
 #include <string>
 #include <vector>
 #include "provenance/Provenance.h"
@@ -26,13 +27,39 @@ namespace nifi {
 namespace minifi {
 namespace provenance {
 
+void ProvenanceRepository::flush() {
+  leveldb::WriteBatch batch;
+  std::string key;
+  std::string value;
+  leveldb::ReadOptions options;
+  uint64_t decrement_total = 0;
+  while (keys_to_delete.size_approx() > 0) {
+    if (keys_to_delete.try_dequeue(key)) {
+      db_->Get(options, key, &value);
+      decrement_total += value.size();
+      batch.Delete(key);
+      logger_->log_info("Removing %s", key);
+    }
+  }
+  if (db_->Write(leveldb::WriteOptions(), &batch).ok()) {
+    logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
+    if (decrement_total > repo_size_.load()) {
+      repo_size_ = 0;
+    } else {
+      repo_size_ -= decrement_total;
+    }
+  }
+}
+
 void ProvenanceRepository::run() {
-  // threshold for purge
-  uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
   while (running_) {
     std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
     uint64_t curTime = getTimeMillis();
-    uint64_t size = repoSize();
+    // threshold for purge
+    uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
+
+    uint64_t size = getRepoSize();
+
     if (size >= purgeThreshold) {
       leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
       for (it->SeekToFirst(); it->Valid(); it->Next()) {
@@ -49,12 +76,13 @@ void ProvenanceRepository::run() {
       }
       delete it;
     }
+    flush();
+    size = getRepoSize();
     if (size > max_partition_bytes_)
       repo_full_ = true;
     else
       repo_full_ = false;
   }
-  return;
 }
 } /* namespace provenance */
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/utils/HttpClient.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/HttpClient.cpp b/libminifi/src/utils/HttpClient.cpp
new file mode 100644
index 0000000..fddbf2f
--- /dev/null
+++ b/libminifi/src/utils/HttpClient.cpp
@@ -0,0 +1,214 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/HTTPClient.h"
+#include <memory>
+#include <map>
+#include <vector>
+#include <string>
+#include <algorithm>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+HTTPClient::HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service)
+    : ssl_context_service_(ssl_context_service),
+      url_(url),
+      logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
+      connect_timeout_(0),
+      read_timeout_(0),
+      content_type(nullptr),
+      headers_(nullptr),
+      http_code(0),
+      header_response_(1),
+      res(CURLE_OK) {
+  HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance();
+  http_session_ = curl_easy_init();
+}
+
+HTTPClient::~HTTPClient() {
+  if (nullptr != headers_) {
+    curl_slist_free_all(headers_);
+  }
+  curl_easy_cleanup(http_session_);
+}
+
+void HTTPClient::setVerbose() {
+  curl_easy_setopt(http_session_, CURLOPT_VERBOSE, 1L);
+}
+
+void HTTPClient::initialize(const std::string &method) {
+  method_ = method;
+  set_request_method(method_);
+  if (isSecure(url_) && ssl_context_service_ != nullptr) {
+    configure_secure_connection(http_session_);
+  }
+}
+
+void HTTPClient::setDisablePeerVerification() {
+  logger_->log_info("Disabling peer verification");
+  curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYPEER, 0L);
+}
+
+void HTTPClient::setConnectionTimeout(int64_t timeout) {
+  connect_timeout_ = timeout;
+}
+
+void HTTPClient::setReadTimeout(int64_t timeout) {
+  read_timeout_ = timeout;
+}
+
+void HTTPClient::setUploadCallback(HTTPUploadCallback *callbackObj) {
+  logger_->log_info("Setting callback");
+  if (method_ == "put" || method_ == "PUT") {
+    curl_easy_setopt(http_session_, CURLOPT_INFILESIZE_LARGE, (curl_off_t) callbackObj->ptr->getBufferSize());
+  } else {
+    curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, (curl_off_t) callbackObj->ptr->getBufferSize());
+  }
+  curl_easy_setopt(http_session_, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write);
+  curl_easy_setopt(http_session_, CURLOPT_READDATA, static_cast<void*>(callbackObj));
+}
+
+struct curl_slist *HTTPClient::build_header_list(std::string regex, const std::map<std::string, std::string> &attributes) {
+  if (http_session_) {
+    for (auto attribute : attributes) {
+      if (matches(attribute.first, regex)) {
+        std::string attr = attribute.first + ":" + attribute.second;
+        headers_ = curl_slist_append(headers_, attr.c_str());
+      }
+    }
+  }
+  return headers_;
+}
+
+void HTTPClient::setContentType(std::string content_type) {
+  content_type_ = "Content-Type: " + content_type;
+  headers_ = curl_slist_append(headers_, content_type_.c_str());
+}
+
+std::string HTTPClient::escape(std::string string_to_escape) {
+  return curl_easy_escape(http_session_, string_to_escape.c_str(), string_to_escape.length());
+}
+
+void HTTPClient::setPostFields(std::string input) {
+  curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, input.length());
+  curl_easy_setopt(http_session_, CURLOPT_POSTFIELDS, input.c_str());
+}
+
+void HTTPClient::setHeaders(struct curl_slist *list) {
+  headers_ = list;
+}
+
+void HTTPClient::setUseChunkedEncoding() {
+  headers_ = curl_slist_append(headers_, "Transfer-Encoding: chunked");
+}
+
+bool HTTPClient::submit() {
+  if (connect_timeout_ > 0) {
+    curl_easy_setopt(http_session_, CURLOPT_CONNECTTIMEOUT, connect_timeout_);
+  }
+
+  if (headers_ != nullptr) {
+    headers_ = curl_slist_append(headers_, "Expect:");
+    curl_easy_setopt(http_session_, CURLOPT_HTTPHEADER, headers_);
+  }
+
+  curl_easy_setopt(http_session_, CURLOPT_URL, url_.c_str());
+  logger_->log_info("Submitting to %s", url_);
+  curl_easy_setopt(http_session_, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
+  curl_easy_setopt(http_session_, CURLOPT_WRITEDATA, static_cast<void*>(&content_));
+
+  curl_easy_setopt(http_session_, CURLOPT_HEADERFUNCTION, &utils::HTTPHeaderResponse::receive_headers);
+  curl_easy_setopt(http_session_, CURLOPT_HEADERDATA, static_cast<void*>(&header_response_));
+
+  res = curl_easy_perform(http_session_);
+  curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code);
+  curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type);
+  if (res != CURLE_OK) {
+    logger_->log_error("curl_easy_perform() failed %s\n", curl_easy_strerror(res));
+    return false;
+  }
+  return true;
+}
+
+CURLcode HTTPClient::getResponseResult() {
+  return res;
+}
+
+int64_t &HTTPClient::getResponseCode() {
+  return http_code;
+}
+
+const char *HTTPClient::getContentType() {
+  return content_type;
+}
+
+const std::vector<char> &HTTPClient::getResponseBody() {
+  return content_.data;
+}
+
+void HTTPClient::set_request_method(const std::string method) {
+  std::string my_method = method;
+  std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper);
+  if (my_method == "POST") {
+    curl_easy_setopt(http_session_, CURLOPT_POST, 1L);
+  } else if (my_method == "PUT") {
+    curl_easy_setopt(http_session_, CURLOPT_PUT, 0L);
+  } else if (my_method == "GET") {
+  } else {
+    curl_easy_setopt(http_session_, CURLOPT_CUSTOMREQUEST, my_method.c_str());
+  }
+}
+
+bool HTTPClient::matches(const std::string &value, const std::string &sregex) {
+  if (sregex == ".*")
+    return true;
+
+  regex_t regex;
+  int ret = regcomp(&regex, sregex.c_str(), 0);
+  if (ret)
+    return false;
+  ret = regexec(&regex, value.c_str(), (size_t) 0, NULL, 0);
+  regfree(&regex);
+  if (ret)
+    return false;
+
+  return true;
+}
+
+void HTTPClient::configure_secure_connection(CURL *http_session) {
+  logger_->log_debug("Using certificate file %s", ssl_context_service_->getCertificateFile());
+  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, &configure_ssl_context);
+  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, static_cast<void*>(ssl_context_service_.get()));
+}
+
+bool HTTPClient::isSecure(const std::string &url) {
+  if (url.find("https") != std::string::npos) {
+    logger_->log_debug("%s is a secure url", url);
+    return true;
+  }
+  return false;
+}
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/utils/Id.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/Id.cpp b/libminifi/src/utils/Id.cpp
index ee6d84d..0c76a79 100644
--- a/libminifi/src/utils/Id.cpp
+++ b/libminifi/src/utils/Id.cpp
@@ -38,9 +38,14 @@ namespace utils {
 
 uint64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
 
-NonRepeatingStringGenerator::NonRepeatingStringGenerator() : prefix_((std::to_string(timestamp) + "-")) {}
+NonRepeatingStringGenerator::NonRepeatingStringGenerator()
+    : prefix_((std::to_string(timestamp) + "-")) {
+}
 
-IdGenerator::IdGenerator() : implementation_(UUID_TIME_IMPL), logger_(logging::LoggerFactory<IdGenerator>::getLogger()), incrementor_(0) {
+IdGenerator::IdGenerator()
+    : implementation_(UUID_TIME_IMPL),
+      logger_(logging::LoggerFactory<IdGenerator>::getLogger()),
+      incrementor_(0) {
 }
 
 uint64_t IdGenerator::getDeviceSegmentFromString(const std::string& str, int numBits) {
@@ -134,12 +139,12 @@ void IdGenerator::generate(uuid_t output) {
       uuid_generate(output);
       break;
     case MINIFI_UID_IMPL: {
-        std::memcpy(output, deterministic_prefix_, sizeof(deterministic_prefix_));
-        uint64_t incrementor_value = incrementor_++;
-        for (int i = 8; i < 16; i++) {
-          output[i] = (incrementor_value >> ((15 - i) * 8)) & UNSIGNED_CHAR_MAX;
-        }
+      std::memcpy(output, deterministic_prefix_, sizeof(deterministic_prefix_));
+      uint64_t incrementor_value = incrementor_++;
+      for (int i = 8; i < 16; i++) {
+        output[i] = (incrementor_value >> ((15 - i) * 8)) & UNSIGNED_CHAR_MAX;
       }
+    }
       break;
     default:
       uuid_generate_time(output);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/.device_id
----------------------------------------------------------------------
diff --git a/libminifi/test/.device_id b/libminifi/test/.device_id
new file mode 100644
index 0000000..500a690
--- /dev/null
+++ b/libminifi/test/.device_id
@@ -0,0 +1 @@
+2291398467776739051
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/TestBase.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index a471afe..c18153c 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -31,6 +31,7 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::s
       finalized(false),
       current_flowfile_(nullptr),
       logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
+  stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
 }
 
 std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship,
@@ -43,6 +44,7 @@ bool linkToPrevious) {
   uuid_t uuid;
   uuid_generate(uuid);
 
+  processor->setStreamFactory(stream_factory);
   // initialize the processor
   processor->initialize();
 
@@ -60,7 +62,7 @@ bool linkToPrevious) {
 
     std::stringstream connection_name;
     connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
-    logger_->log_info("Creating %s connection for proc %d",connection_name.str(),processor_queue_.size()+1);
+    logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1);
     std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
     connection->setRelationship(relationship);
 
@@ -84,7 +86,7 @@ bool linkToPrevious) {
 
   processor_nodes_.push_back(node);
 
-  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(*(node.get()), controller_services_provider_, flow_repo_, prov_repo_, content_repo_);
+  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
   processor_contexts_.push_back(context);
 
   processor_queue_.push_back(processor);
@@ -116,7 +118,7 @@ bool linkToPrevious) {
 bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
   int i = 0;
-  logger_->log_info("Attempting to set property %s %s for %s",prop,value,proc->getName());
+  logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName());
   for (i = 0; i < processor_queue_.size(); i++) {
     if (processor_queue_.at(i) == proc) {
       break;
@@ -151,13 +153,13 @@ bool TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core::
   location++;
   std::shared_ptr<core::Processor> processor = processor_queue_.at(location);
   std::shared_ptr<core::ProcessContext> context = processor_contexts_.at(location);
-  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context.get());
+  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
   factories_.push_back(factory);
   if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) {
-    processor->onSchedule(context.get(), factory.get());
+    processor->onSchedule(context, factory);
     configured_processors_.push_back(processor);
   }
-  std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context.get());
+  std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
   process_sessions_.push_back(current_session);
   processor->incrementActiveTasks();
   processor->setScheduledState(core::ScheduledState::RUNNING);
@@ -165,7 +167,7 @@ bool TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core::
     verify(context.get(), current_session.get());
   } else {
     logger_->log_info("Running %s", processor->getName());
-    processor->onTrigger(context.get(), current_session.get());
+    processor->onTrigger(context, current_session);
   }
   current_session->commit();
   current_flowfile_ = current_session->get();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 7e2a5d7..4eba0a8 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -191,6 +191,8 @@ class TestPlan {
 
   std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false);
 
+  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
+
   std::atomic<bool> finalized;
 
   std::shared_ptr<core::ContentRepository> content_repo_;
@@ -223,7 +225,7 @@ class TestController {
 
   TestController()
       : log(LogTestController::getInstance()) {
-    minifi::ResourceClaim::default_directory_path = const_cast<char*>("./");
+    minifi::setDefaultDirectory("./");
     log.reset();
     utils::IdGenerator::getIdGenerator()->initialize(std::make_shared<minifi::Properties>());
   }
@@ -235,8 +237,9 @@ class TestController {
 
     content_repo->initialize(configuration);
 
+    std::shared_ptr<core::Repository> flow_repo = std::make_shared<TestRepository>();
     std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
-    return std::make_shared<TestPlan>(content_repo, repo, repo);
+    return std::make_shared<TestPlan>(content_repo, flow_repo, repo);
   }
 
   void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, std::function<void(core::ProcessContext*, core::ProcessSession*)> verify = nullptr) {
@@ -247,6 +250,8 @@ class TestController {
     }
   }
 
+
+
   ~TestController() {
     for (auto dir : directories) {
       DIR *created_dir;
@@ -262,8 +267,9 @@ class TestController {
             unlink(file.c_str());
           }
         }
+        closedir(created_dir);
       }
-      closedir(created_dir);
+
       rmdir(dir);
     }
   }
@@ -276,6 +282,8 @@ class TestController {
 
  protected:
 
+
+
   std::mutex test_mutex;
   //std::map<std::string,>
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/TestServer.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestServer.h b/libminifi/test/TestServer.h
index 263a6b3..29f6345 100644
--- a/libminifi/test/TestServer.h
+++ b/libminifi/test/TestServer.h
@@ -21,15 +21,15 @@
 #include <string>
 #include <iostream>
 #include "civetweb.h"
+#include "CivetServer.h"
+
 
 /* Server context handle */
-static struct mg_context *ctx;
 static std::string resp_str;
 
 static int responder(struct mg_connection *conn, void *response) {
   const char *msg = resp_str.c_str();
 
-
   mg_printf(conn, "HTTP/1.1 200 OK\r\n"
             "Content-Length: %lu\r\n"
             "Content-Type: text/plain\r\n"
@@ -45,42 +45,36 @@ void init_webserver() {
   mg_init_library(0);
 }
 
-void start_webserver(std::string &port, std::string &rooturi, const std::string &response, struct mg_callbacks *callbacks, std::string &cert) {
 
-  std::cout << "root uri is " << rooturi << ":" << port << "/" << std::endl;
-  resp_str = response;
-  const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", cert.c_str(), "ssl_protocol_version", "3", "ssl_cipher_list",
-      "ECDHE-RSA-AES256-GCM-SHA384:DES-CBC3-SHA:AES128-SHA:AES128-GCM-SHA256", 0 };
+CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) {
+  const char *options[] = { "listening_ports", port.c_str(), "error_log_file",
+      "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list",
+      "ALL", "ssl_verify_peer", "no", 0 };
 
-  if (!mg_check_feature(2)) {
-    std::cerr << "Error: Embedded example built with SSL support, " << "but civetweb library build without" << std::endl;
-    exit(1);
+  std::vector<std::string> cpp_options;
+  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+    cpp_options.push_back(options[i]);
   }
+  CivetServer *server = new CivetServer(cpp_options);
 
-  ctx = mg_start(callbacks, 0, options);
-  if (ctx == nullptr) {
-    std::cerr << "Cannot start CivetWeb - mg_start failed." << std::endl;
-    exit(1);
-  }
+  server->addHandler(rooturi, handler);
 
-  mg_set_request_handler(ctx, rooturi.c_str(), responder, (void*) &resp_str);
+  return server;
 
 }
 
-void start_webserver(std::string &port, std::string &rooturi, const std::string &response) {
+CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler) {
+  const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), 0 };
 
-  std::cout << "root uri is " << rooturi << ":" << port << "/" << std::endl;
-  resp_str = response;
-
-  const char *options[] = { "listening_ports", port.c_str(), 0 };
-  ctx = mg_start(nullptr, 0, options);
-
-  if (ctx == nullptr) {
-    std::cerr << "Cannot start CivetWeb - mg_start failed." << std::endl;
-    exit(1);
+  std::vector<std::string> cpp_options;
+  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+    cpp_options.push_back(options[i]);
   }
+  CivetServer *server = new CivetServer(cpp_options);
+
+  server->addHandler(rooturi, handler);
 
-  mg_set_request_handler(ctx, rooturi.c_str(), responder, (void*) &resp_str);
+  return server;
 
 }
 
@@ -126,9 +120,9 @@ bool parse_http_components(const std::string &url, std::string &port, std::strin
 
 }
 
-static void stop_webserver() {
-  /* Stop the server */
-  mg_stop(ctx);
+static void stop_webserver(CivetServer *server) {
+  if (server != nullptr)
+    delete server;
 
   /* Un-initialize the library */
   mg_exit_library();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/C2NullConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/C2NullConfiguration.cpp b/libminifi/test/integration/C2NullConfiguration.cpp
new file mode 100644
index 0000000..394429f
--- /dev/null
+++ b/libminifi/test/integration/C2NullConfiguration.cpp
@@ -0,0 +1,135 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "../TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "../include/core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "../TestServer.h"
+#include "c2/C2Agent.h"
+#include "c2/protocols/RESTReceiver.h"
+
+#include "IntegrationBase.h"
+
+class VerifyC2Server : public IntegrationBase {
+ public:
+  explicit VerifyC2Server(bool isSecure)
+      : isSecure(isSecure) {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+  }
+
+  void testSetup() {
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
+      LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+  }
+
+  void cleanup() {
+    unlink(ss.str().c_str());
+  }
+
+  void runAssertions() {
+    assert(LogTestController::getInstance().contains("C2Agent] [info] Class is null") == true);
+    assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null") == true);
+    assert(LogTestController::getInstance().contains("Class is RESTSender") == true);
+  }
+
+  void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
+    assert(proc != nullptr);
+
+    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+    assert(inv != nullptr);
+    std::string url = "";
+    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+
+    std::cout << "url is " << url << std::endl;
+
+    std::string port, scheme, path;
+    parse_http_components(url, port, scheme, path);
+    std::cout << "path is " << path << std::endl;
+    configuration->set("c2.agent.protocol.class", "null");
+    configuration->set("c2.rest.url", "null");
+    configuration->set("c2.rest.url.ack", "null");
+    configuration->set("c2.agent.heartbeat.reporter.classes", "null");
+    configuration->set("c2.rest.listener.port", "null");
+    configuration->set("c2.agent.heartbeat.period", "null");
+    configuration->set("c2.rest.listener.heartbeat.rooturi", "null");
+  }
+
+ protected:
+  bool isSecure;
+  char *dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
+
+  VerifyC2Server harness(isSecure);
+
+  harness.setKeyDir(key_dir);
+
+  harness.run(test_file_location);
+
+  return 0;
+}
+


Mime
View raw message