nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [6/8] nifi-minifi-cpp git commit: MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters MINIFI-339: Add GetTCP Processor MINIFI-339: Add listener server MINIFI-339: Update to listener MINIFI-339: Resolve Issue with stack based process
Date Mon, 02 Oct 2017 14:57:19 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/ProcessMetrics.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/metrics/ProcessMetrics.h b/libminifi/include/core/state/metrics/ProcessMetrics.h
new file mode 100644
index 0000000..f3f911d
--- /dev/null
+++ b/libminifi/include/core/state/metrics/ProcessMetrics.h
@@ -0,0 +1,102 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_PROCMETRICS_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_PROCMETRICS_H_
+
+#include "core/Resource.h"
+#include <sstream>
+#include <map>
+#include <sys/time.h>
+#include <sys/resource.h>
+#include "MetricsBase.h"
+#include "Connection.h"
+#include "DeviceInformation.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace metrics {
+
+/**
+ * Justification and Purpose: Provides Connection queue metrics. Provides critical information to the
+ * C2 server.
+ *
+ */
+class ProcessMetrics : public Metrics {
+ public:
+
+  ProcessMetrics(const std::string &name, uuid_t uuid)
+      : Metrics(name, uuid) {
+  }
+
+  ProcessMetrics(const std::string &name)
+      : Metrics(name, 0) {
+  }
+
+  ProcessMetrics() {
+  }
+
+  std::string getName() {
+    return "ProcessMetrics";
+  }
+
+  std::vector<MetricResponse> serialize() {
+    std::vector<MetricResponse> serialized;
+
+    struct rusage my_usage;
+    getrusage(RUSAGE_SELF, &my_usage);
+
+    MetricResponse memory;
+    memory.name = "MemoryMetrics";
+
+    MetricResponse maxrss;
+    maxrss.name = "maxrss";
+
+    maxrss.value = std::to_string(my_usage.ru_maxrss);
+
+    memory.children.push_back(maxrss);
+    serialized.push_back(memory);
+
+    MetricResponse cpu;
+    cpu.name = "CpuMetrics";
+    MetricResponse ics;
+    ics.name = "involcs";
+
+    ics.value = std::to_string(my_usage.ru_nivcsw);
+
+    cpu.children.push_back(ics);
+    serialized.push_back(cpu);
+
+    return serialized;
+  }
+
+ protected:
+
+};
+
+REGISTER_RESOURCE(ProcessMetrics);
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/QueueMetrics.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/metrics/QueueMetrics.h b/libminifi/include/core/state/metrics/QueueMetrics.h
new file mode 100644
index 0000000..018e70b
--- /dev/null
+++ b/libminifi/include/core/state/metrics/QueueMetrics.h
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_
+
+#include <sstream>
+#include <map>
+
+#include "MetricsBase.h"
+#include "Connection.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace metrics {
+
+/**
+ * Justification and Purpose: Provides Connection queue metrics. Provides critical information to the
+ * C2 server.
+ *
+ */
+class QueueMetrics : public Metrics {
+ public:
+
+  QueueMetrics(const std::string &name, uuid_t uuid)
+      : Metrics(name, uuid) {
+  }
+
+  QueueMetrics(const std::string &name)
+      : Metrics(name, 0) {
+  }
+
+  QueueMetrics()
+      : Metrics("QueueMetrics", 0) {
+  }
+
+  std::string getName() {
+    return "QueueMetrics";
+  }
+
+  void addConnection(const std::shared_ptr<minifi::Connection> &connection) {
+    if (nullptr != connection) {
+      connections.insert(std::make_pair(connection->getName(), connection));
+    }
+  }
+
+  std::vector<MetricResponse> serialize() {
+    std::vector<MetricResponse> serialized;
+    for (auto conn : connections) {
+      auto connection = conn.second;
+      MetricResponse parent;
+      parent.name = connection->getName();
+      MetricResponse datasize;
+      datasize.name = "datasize";
+      datasize.value = std::to_string(connection->getQueueDataSize());
+
+      MetricResponse datasizemax;
+      datasizemax.name = "datasizemax";
+      datasizemax.value = std::to_string(connection->getMaxQueueDataSize());
+
+      MetricResponse queuesize;
+      queuesize.name = "queued";
+      queuesize.value = std::to_string(connection->getQueueSize());
+
+      MetricResponse queuesizemax;
+      queuesizemax.name = "queuedmax";
+      queuesizemax.value = std::to_string(connection->getMaxQueueSize());
+
+      parent.children.push_back(datasize);
+      parent.children.push_back(datasizemax);
+      parent.children.push_back(queuesize);
+      parent.children.push_back(queuesizemax);
+
+      serialized.push_back(parent);
+    }
+    return serialized;
+  }
+
+ protected:
+  std::map<std::string, std::shared_ptr<minifi::Connection>> connections;
+};
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/RepositoryMetrics.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/metrics/RepositoryMetrics.h b/libminifi/include/core/state/metrics/RepositoryMetrics.h
new file mode 100644
index 0000000..fa37e94
--- /dev/null
+++ b/libminifi/include/core/state/metrics/RepositoryMetrics.h
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_REPOSITORYMETRICS_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_REPOSITORYMETRICS_H_
+
+#include <sstream>
+#include <map>
+
+#include "MetricsBase.h"
+#include "Connection.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace metrics {
+
+/**
+ * Justification and Purpose: Provides repository metrics. Provides critical information to the
+ * C2 server.
+ *
+ */
+class RepositoryMetrics : public Metrics {
+ public:
+
+  RepositoryMetrics(const std::string &name, uuid_t uuid)
+      : Metrics(name, uuid) {
+  }
+
+  RepositoryMetrics(const std::string &name)
+      : Metrics(name, 0) {
+  }
+
+  RepositoryMetrics()
+      : Metrics("RepositoryMetrics", 0) {
+  }
+
+  std::string getName() {
+    return "RepositoryMetrics";
+  }
+
+  void addRepository(const std::shared_ptr<core::Repository> &repo) {
+    if (nullptr != repo) {
+      repositories.insert(std::make_pair(repo->getName(), repo));
+    }
+  }
+
+  std::vector<MetricResponse> serialize() {
+    std::vector<MetricResponse> serialized;
+    for (auto conn : repositories) {
+      auto repo = conn.second;
+      MetricResponse parent;
+      parent.name = repo->getName();
+      MetricResponse datasize;
+      datasize.name = "running";
+      datasize.value = std::to_string(repo->isRunning());
+
+      MetricResponse datasizemax;
+      datasizemax.name = "full";
+      datasizemax.value = std::to_string(repo->isFull());
+
+      MetricResponse queuesize;
+      queuesize.name = "size";
+      queuesize.value = std::to_string(repo->getRepoSize());
+
+      parent.children.push_back(datasize);
+      parent.children.push_back(datasizemax);
+      parent.children.push_back(queuesize);
+
+      serialized.push_back(parent);
+    }
+    return serialized;
+  }
+
+ protected:
+  std::map<std::string, std::shared_ptr<core::Repository>> repositories;
+};
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_RepositoryMetrics_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/SystemMetrics.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/metrics/SystemMetrics.h b/libminifi/include/core/state/metrics/SystemMetrics.h
new file mode 100644
index 0000000..4ac2b52
--- /dev/null
+++ b/libminifi/include/core/state/metrics/SystemMetrics.h
@@ -0,0 +1,109 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_SYSMETRICS_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_SYSMETRICS_H_
+
+#include "core/Resource.h"
+#include <sstream>
+#include <map>
+#ifndef _WIN32
+#include <sys/utsname.h>
+#endif
+#include "MetricsBase.h"
+#include "Connection.h"
+#include "DeviceInformation.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace metrics {
+
+/**
+ * Justification and Purpose: Provides system information, including critical device information.
+ *
+ */
+class SystemInformation : public DeviceInformation {
+ public:
+
+  SystemInformation(const std::string &name, uuid_t uuid)
+      : DeviceInformation(name, uuid) {
+  }
+
+  SystemInformation(const std::string &name)
+      : DeviceInformation(name, 0) {
+  }
+
+  SystemInformation()
+      : DeviceInformation("SystemInformation", 0) {
+  }
+
+  std::string getName() {
+    return "SystemInformation";
+  }
+
+  std::vector<MetricResponse> serialize() {
+    std::vector<MetricResponse> serialized;
+
+    MetricResponse vcores;
+    vcores.name = "vcores";
+    int cpus[2] = { 0 };
+    size_t ncpus = std::thread::hardware_concurrency();
+
+    vcores.value = std::to_string(ncpus);
+
+    serialized.push_back(vcores);
+
+    MetricResponse mem;
+    mem.name = "physicalmem";
+#if defined(_SC_PHYS_PAGES) && defined(_SC_PAGESIZE)
+    size_t mema = (size_t) sysconf( _SC_PHYS_PAGES) * (size_t) sysconf( _SC_PAGESIZE);
+#endif
+    mem.value = std::to_string(mema);
+
+    serialized.push_back(mem);
+
+    MetricResponse arch;
+    arch.name = "machinearch";
+
+    utsname buf;
+
+    if (uname(&buf) == -1) {
+      arch.value = "unknown";
+    } else {
+      arch.value = buf.machine;
+    }
+
+    serialized.push_back(arch);
+
+    return serialized;
+  }
+
+ protected:
+
+};
+
+REGISTER_RESOURCE(SystemInformation);
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/yaml/YamlConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 17b060f..0f247e7 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -51,7 +51,7 @@ class YamlConfiguration : public FlowConfiguration {
       : FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configuration, path),
         logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {
     stream_factory_ = stream_factory;
-    if (IsNullOrEmpty (config_path_)) {
+    if (IsNullOrEmpty(config_path_)) {
       config_path_ = DEFAULT_FLOW_YAML_FILE_NAME;
     }
   }
@@ -102,7 +102,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @return                 the root ProcessGroup node of the flow
    *                           configuration tree
    */
-  std::unique_ptr<core::ProcessGroup> getRootFromPayload(std::string &yamlConfigPayload) {
+  std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &yamlConfigPayload) {
     YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
     return getRoot(&rootYamlNode);
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/AtomicEntryStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h
index 5f200f0..3eb456e 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -48,7 +48,7 @@ class AtomicEntryStream : public BaseStream {
       invalid_stream_ = true;
     }
   }
-  
+
   virtual ~AtomicEntryStream();
 
   virtual void closeStream() {
@@ -116,9 +116,9 @@ class AtomicEntryStream : public BaseStream {
 };
 
 template<typename T>
-AtomicEntryStream<T>::~AtomicEntryStream(){
+AtomicEntryStream<T>::~AtomicEntryStream() {
   logger_->log_debug("Decrementing");
-    entry_->decrementOwnership();
+  entry_->decrementOwnership();
 }
 
 template<typename T>
@@ -141,13 +141,11 @@ int AtomicEntryStream<T>::writeData(uint8_t *value, int size) {
     std::lock_guard<std::recursive_mutex> lock(entry_lock_);
     if (entry_->insert(key_, value, size)) {
       offset_ += size;
-      if (offset_ > length_)
-          {
+      if (offset_ > length_) {
         length_ = offset_;
       }
       return size;
-    }
-    else {
+    } else {
       logger_->log_debug("Cannot insert %d bytes due to insufficient space in atomic entry", size);
     }
 
@@ -158,7 +156,7 @@ int AtomicEntryStream<T>::writeData(uint8_t *value, int size) {
 
 template<typename T>
 int AtomicEntryStream<T>::readData(std::vector<uint8_t> &buf, int buflen) {
-  if (invalid_stream_){
+  if (invalid_stream_) {
     return -1;
   }
   if (buf.capacity() < buflen) {
@@ -182,13 +180,13 @@ int AtomicEntryStream<T>::readData(uint8_t *buf, int buflen) {
       if (offset_ + len > value->getBufferSize()) {
         len = value->getBufferSize() - offset_;
         if (len <= 0) {
-	  entry_->decrementOwnership();
+          entry_->decrementOwnership();
           return 0;
         }
       }
       std::memcpy(buf, reinterpret_cast<uint8_t*>(const_cast<uint8_t*>(value->getBuffer()) + offset_), len);
       offset_ += len;
-    entry_->decrementOwnership();
+      entry_->decrementOwnership();
       return len;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/BaseStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/BaseStream.h b/libminifi/include/io/BaseStream.h
index cd982bb..dc810e3 100644
--- a/libminifi/include/io/BaseStream.h
+++ b/libminifi/include/io/BaseStream.h
@@ -178,7 +178,7 @@ class BaseStream : public DataStream, public Serializable {
    * @return resulting read size
    **/
   virtual int readUTF(std::string &str, bool widen = false);
-   protected:
+ protected:
   DataStream *composable_stream_;
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/CRCStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index 696f418..70cb89a 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -133,7 +133,7 @@ class CRCStream : public BaseStream {
   }
 
   void reset();
-   protected:
+ protected:
 
   /**
    * Creates a vector and returns the vector using the provided

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index cd8a4fc..c9ad90e 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -101,6 +101,11 @@ class Socket : public BaseStream {
    */
   virtual int16_t initialize();
 
+  /**
+   * Sets the non blocking flag on the file descriptor.
+   */
+  void setNonBlocking();
+
   std::string getHostname() const;
 
   /**
@@ -114,14 +119,35 @@ class Socket : public BaseStream {
    * Reads data and places it into buf
    * @param buf buffer in which we extract data
    * @param buflen
+   * @param retrieve_all_bytes determines if we should read all bytes before returning
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen) {
+    return readData(buf, buflen, true);
+  }
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   * @param retrieve_all_bytes determines if we should read all bytes before returning
+   */
+  virtual int readData(uint8_t *buf, int buflen) {
+    return readData(buf, buflen, true);
+  }
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   * @param retrieve_all_bytes determines if we should read all bytes before returning
    */
-  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+  virtual int readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes);
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data
    * @param buflen
+   * @param retrieve_all_bytes determines if we should read all bytes before returning
    */
-  virtual int readData(uint8_t *buf, int buflen);
+  virtual int readData(uint8_t *buf, int buflen, bool retrieve_all_bytes);
 
   /**
    * Write value to the stream using std::vector

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/EndianCheck.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/EndianCheck.h b/libminifi/include/io/EndianCheck.h
index c4d4504..3ceb19c 100644
--- a/libminifi/include/io/EndianCheck.h
+++ b/libminifi/include/io/EndianCheck.h
@@ -30,7 +30,7 @@ namespace io {
 class EndiannessCheck {
  public:
   static bool IS_LITTLE;
-   private:
+ private:
 
   static bool is_little_endian() {
     /* do whatever is needed at static init time */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/FileStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h
index 23a1f0b..0cddcc2 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -45,7 +45,7 @@ class FileStream : public io::BaseStream {
    * File Stream constructor that accepts an fstream shared pointer.
    * It must already be initialized for read and write.
    */
-  explicit FileStream(const std::string &path, uint32_t offset,  bool write_enable = false);
+  explicit FileStream(const std::string &path, uint32_t offset, bool write_enable = false);
 
   /**
    * File Stream constructor that accepts an fstream shared pointer.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/processors/GetFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h
index df8f775..c193a8d 100644
--- a/libminifi/include/processors/GetFile.h
+++ b/libminifi/include/processors/GetFile.h
@@ -25,6 +25,7 @@
 #include "core/Core.h"
 #include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "core/state/metrics/MetricsBase.h"
 
 namespace org {
 namespace apache {
@@ -43,8 +44,63 @@ struct GetFileRequest {
   std::string fileFilter = "[^\\.].*";
 };
 
+class GetFileMetrics : public state::metrics::Metrics {
+ public:
+  GetFileMetrics()
+      : state::metrics::Metrics("GetFileMetrics", 0) {
+    iterations_ = 0;
+    accepted_files_ = 0;
+    input_bytes_ = 0;
+  }
+
+  GetFileMetrics(std::string name, uuid_t uuid)
+      : state::metrics::Metrics(name, uuid) {
+    iterations_ = 0;
+    accepted_files_ = 0;
+    input_bytes_ = 0;
+  }
+  virtual ~GetFileMetrics() {
+
+  }
+  virtual std::string getName() {
+    return core::Connectable::getName();
+  }
+
+  virtual std::vector<state::metrics::MetricResponse> serialize() {
+    std::vector<state::metrics::MetricResponse> resp;
+
+    state::metrics::MetricResponse iter;
+    iter.name = "OnTriggerInvocations";
+    iter.value = std::to_string(iterations_.load());
+
+    resp.push_back(iter);
+
+    state::metrics::MetricResponse accepted_files;
+    accepted_files.name = "AcceptedFiles";
+    accepted_files.value = std::to_string(accepted_files_.load());
+
+    resp.push_back(accepted_files);
+
+    state::metrics::MetricResponse input_bytes;
+    input_bytes.name = "InputBytes";
+    input_bytes.value = std::to_string(input_bytes_.load());
+
+    resp.push_back(input_bytes);
+
+    return resp;
+  }
+
+ protected:
+  friend class GetFile;
+
+  std::atomic<size_t> iterations_;
+  std::atomic<size_t> accepted_files_;
+  std::atomic<size_t> input_bytes_;
+
+};
+
 // GetFile Class
-class GetFile : public core::Processor {
+class GetFile : public core::Processor, public state::metrics::MetricsSource {
  public:
   // Constructor
   /*!
@@ -53,7 +109,7 @@ class GetFile : public core::Processor {
   explicit GetFile(std::string name, uuid_t uuid = NULL)
       : Processor(name, uuid),
         logger_(logging::LoggerFactory<GetFile>::getLogger()) {
-
+    metrics_ = std::make_shared<GetFileMetrics>();
   }
   // Destructor
   virtual ~GetFile() {
@@ -99,10 +155,14 @@ class GetFile : public core::Processor {
    */
   void performListing(std::string dir, const GetFileRequest &request);
 
+  int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector);
+
  protected:
 
  private:
 
+  std::shared_ptr<GetFileMetrics> metrics_;
+
   // Queue for store directory list
   std::queue<std::string> _dirList;
   // Get Listing size

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/processors/GetTCP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GetTCP.h b/libminifi/include/processors/GetTCP.h
new file mode 100644
index 0000000..78a9c01
--- /dev/null
+++ b/libminifi/include/processors/GetTCP.h
@@ -0,0 +1,288 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __GET_TCP_H__
+#define __GET_TCP_H__
+
+#include <atomic>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "concurrentqueue.h"
+#include "utils/ThreadPool.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/state/metrics/MetricsBase.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class SocketAfterExecute : public utils::AfterExecute<int> {
+ public:
+  explicit SocketAfterExecute(std::atomic<bool> &running, const std::string &endpoint, std::map<std::string, std::future<int>*> *list, std::mutex *mutex)
+      : running_(running.load()),
+        endpoint_(endpoint),
+        mutex_(mutex),
+        list_(list) {
+  }
+
+  explicit SocketAfterExecute(SocketAfterExecute && other) {
+  }
+
+  ~SocketAfterExecute() {
+  }
+
+  virtual bool isFinished(const int &result) {
+
+    if (result == -1 || result == 0 || !running_) {
+      std::lock_guard<std::mutex> lock(*mutex_);
+      list_->erase(endpoint_);
+      return true;
+    } else {
+      return false;
+    }
+  }
+  virtual bool isCancelled(const int &result) {
+    if (!running_)
+      return true;
+    else
+      return false;
+  }
+
+ protected:
+  std::atomic<bool> running_;
+  std::map<std::string, std::future<int>*> *list_;
+  std::mutex *mutex_;
+  std::string endpoint_;
+};
+
+class DataHandlerCallback : public OutputStreamCallback {
+ public:
+  DataHandlerCallback(uint8_t *message, size_t size)
+      : message_(message),
+        size_(size) {
+  }
+
+  virtual ~DataHandlerCallback() {
+
+  }
+
+  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    return stream->write(message_, size_);
+  }
+
+ private:
+  uint8_t *message_;
+  size_t size_;
+};
+
+class DataHandler {
+ public:
+  DataHandler(std::shared_ptr<core::ProcessSessionFactory> sessionFactory)
+      : sessionFactory_(sessionFactory) {
+
+  }
+  static const char *SOURCE_ENDPOINT_ATTRIBUTE;
+
+  int16_t handle(std::string source, uint8_t *message, size_t size, bool partial);
+
+ private:
+  std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
+
+};
+
+class GetTCPMetrics : public state::metrics::Metrics {
+ public:
+  GetTCPMetrics()
+      : state::metrics::Metrics("GetTCPMetrics", 0) {
+    iterations_ = 0;
+    accepted_files_ = 0;
+    input_bytes_ = 0;
+  }
+
+  GetTCPMetrics(std::string name, uuid_t uuid)
+      : state::metrics::Metrics(name, uuid) {
+    iterations_ = 0;
+    accepted_files_ = 0;
+    input_bytes_ = 0;
+  }
+  virtual ~GetTCPMetrics() {
+
+  }
+  virtual std::string getName() {
+    return core::Connectable::getName();
+  }
+
+  virtual std::vector<state::metrics::MetricResponse> serialize() {
+    std::vector<state::metrics::MetricResponse> resp;
+
+    state::metrics::MetricResponse iter;
+    iter.name = "OnTriggerInvocations";
+    iter.value = std::to_string(iterations_.load());
+
+    resp.push_back(iter);
+
+    state::metrics::MetricResponse accepted_files;
+    accepted_files.name = "AcceptedFiles";
+    accepted_files.value = std::to_string(accepted_files_.load());
+
+    resp.push_back(accepted_files);
+
+    state::metrics::MetricResponse input_bytes;
+    input_bytes.name = "InputBytes";
+    input_bytes.value = std::to_string(input_bytes_.load());
+
+    resp.push_back(input_bytes);
+
+    return resp;
+  }
+
+ protected:
+  friend class GetTCP;
+
+  std::atomic<size_t> iterations_;
+  std::atomic<size_t> accepted_files_;
+  std::atomic<size_t> input_bytes_;
+
+};
+
+// GetTCP Class
+class GetTCP : public core::Processor, public state::metrics::MetricsSource {
+ public:
+// Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit GetTCP(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid),
+        connection_attempts_(3),
+        reconnect_interval_(5000),
+        receive_buffer_size_(16 * 1024 * 1024),
+        stay_connected_(true),
+        endOfMessageByte(13),
+        running_(false),
+        connection_attempt_limit_(3),
+        concurrent_handlers_(2),
+        logger_(logging::LoggerFactory<GetTCP>::getLogger()) {
+    metrics_ = std::make_shared<GetTCPMetrics>();
+  }
+// Destructor
+  virtual ~GetTCP() {
+  }
+// Processor Name
+  static constexpr char const* ProcessorName = "GetTCP";
+
+  // Supported Properties
+  static core::Property EndpointList;
+  static core::Property ConcurrentHandlers;
+  static core::Property ReconnectInterval;
+  static core::Property StayConnected;
+  static core::Property ReceiveBufferSize;
+  static core::Property ConnectionAttemptLimit;
+  static core::Property EndOfMessageByte;
+
+  // Supported Relationships
+  static core::Relationship Success;
+  static core::Relationship Partial;
+
+ public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  virtual void onSchedule(std::shared_ptr<core::ProcessContext> processContext, std::shared_ptr<core::ProcessSessionFactory> sessionFactory);
+
+  void onSchedule(core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
+    throw std::exception();
+  }
+  /**
+   * Execution trigger for the GetTCP Processor
+   * @param context processor context
+   * @param session processor session reference.
+   */
+  virtual void onTrigger(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSession> session);
+
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+    throw std::exception();
+  }
+
+  // Initialize, over write by NiFi GetTCP
+  virtual void initialize(void);
+
+  int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector);
+
+ protected:
+
+  virtual void notifyStop();
+
+ private:
+
+  std::function<int()> f_ex;
+
+  std::atomic<bool> running_;
+
+  std::unique_ptr<DataHandler> handler_;
+
+  std::vector<std::string> endpoints;
+
+  std::map<std::string, std::future<int>*> live_clients_;
+
+  utils::ThreadPool<int> client_thread_pool_;
+
+  moodycamel::ConcurrentQueue<std::unique_ptr<io::Socket>> socket_ring_buffer_;
+
+  bool stay_connected_;
+
+  uint16_t concurrent_handlers_;
+
+  int8_t endOfMessageByte;
+
+  int64_t reconnect_interval_;
+
+  int64_t receive_buffer_size_;
+
+  int8_t connection_attempts_;
+
+  uint16_t connection_attempt_limit_;
+
+  std::shared_ptr<GetTCPMetrics> metrics_;
+
+  // Mutex for ensuring clients are running
+
+  std::mutex mutex_;
+
+// last listing time for root directory ( if recursive, we will consider the root
+// as the top level time.
+
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(GetTCP);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/processors/InvokeHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/InvokeHTTP.h b/libminifi/include/processors/InvokeHTTP.h
index 03a1611..1c9d594 100644
--- a/libminifi/include/processors/InvokeHTTP.h
+++ b/libminifi/include/processors/InvokeHTTP.h
@@ -33,7 +33,7 @@
 #include "utils/ByteInputCallBack.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/Id.h"
-#include "utils/HTTPUtils.h"
+#include "utils/HTTPClient.h"
 
 namespace org {
 namespace apache {
@@ -56,9 +56,11 @@ class InvokeHTTP : public core::Processor {
         penalize_no_retry_(false),
         read_timeout_(20000),
         always_output_response_(false),
+        disable_peer_verification_(false),
         ssl_context_service_(nullptr),
+        use_chunked_encoding_(false),
         logger_(logging::LoggerFactory<InvokeHTTP>::getLogger()) {
-    curl_global_init(CURL_GLOBAL_DEFAULT);
+    static utils::HTTPClientInitializer *initializer = utils::HTTPClientInitializer::getInstance();
   }
   // Destructor
   virtual ~InvokeHTTP();
@@ -79,7 +81,8 @@ class InvokeHTTP : public core::Processor {
   static core::Property ProxyPassword;
   static core::Property ContentType;
   static core::Property SendBody;
-
+  static core::Property UseChunkedEncoding;
+  static core::Property DisablePeerVerification;
   static core::Property PropPutOutputAttributes;
 
   static core::Property AlwaysOutputResponse;
@@ -101,9 +104,9 @@ class InvokeHTTP : public core::Processor {
   static core::Relationship RelNoRetry;
   static core::Relationship RelFailure;
 
-  void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
-  void initialize();
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  virtual void initialize();
+  virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
   /**
    * Provides a reference to the URL.
    */
@@ -114,37 +117,10 @@ class InvokeHTTP : public core::Processor {
  protected:
 
   /**
-   * Configures the SSL Context. Relies on the Context service and OpenSSL's installation
-   */
-  static CURLcode configure_ssl_context(CURL *curl, void *ctx, void *param);
-
-  /**
-   * Determines if a secure connection is required
-   * @param url url we will be connecting to
-   * @returns true if secure connection is allowed/required
-   */
-  bool isSecure(const std::string &url);
-
-  /**
-   * Configures a secure connection
-   */
-  void configure_secure_connection(CURL *http_session);
-
-  /**
    * Generate a transaction ID
    * @return transaction ID string.
    */
   std::string generateId();
-  /**
-   * Set the request method on the curl struct.
-   * @param curl pointer to this instance.
-   * @param string request method
-   */
-  void set_request_method(CURL *curl, const std::string &);
-
-  struct curl_slist *build_header_list(CURL *curl, std::string regex, const std::map<std::string, std::string> &);
-
-  bool matches(const std::string &value, const std::string &sregex);
 
   /**
    * Routes the flowfile to the proper destination
@@ -155,9 +131,7 @@ class InvokeHTTP : public core::Processor {
    * @param isSuccess success code or not
    * @param statuscode http response code.
    */
-  void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context,
-  bool isSuccess,
-             int statusCode);
+  void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess, int statusCode);
   /**
    * Determine if we should emit a new flowfile based on our activity
    * @param method method type
@@ -167,8 +141,6 @@ class InvokeHTTP : public core::Processor {
 
   std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
 
-  CURLcode res;
-
   // http method
   std::string method_;
   // url
@@ -187,7 +159,12 @@ class InvokeHTTP : public core::Processor {
   bool always_output_response_;
   // penalize on no retry
   bool penalize_no_retry_;
-
+  // content type.
+  std::string content_type_;
+  // use chunked encoding.
+  bool use_chunked_encoding_;
+  // disable peer verification ( makes susceptible for MITM attacks )
+  bool disable_peer_verification_;
  private:
   std::shared_ptr<logging::Logger> logger_;
   static std::shared_ptr<utils::IdGenerator> id_generator_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/processors/LoadProcessors.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LoadProcessors.h b/libminifi/include/processors/LoadProcessors.h
index e8d207a..b629052 100644
--- a/libminifi/include/processors/LoadProcessors.h
+++ b/libminifi/include/processors/LoadProcessors.h
@@ -25,6 +25,7 @@
 #include "ExecuteProcess.h"
 #include "GenerateFlowFile.h"
 #include "GetFile.h"
+#include "GetTCP.h"
 #include "ListenHTTP.h"
 #include "LogAttribute.h"
 #include "PutFile.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h
index c7f2823..0b8add7 100644
--- a/libminifi/include/processors/PutFile.h
+++ b/libminifi/include/processors/PutFile.h
@@ -80,8 +80,7 @@ class PutFile : public core::Processor {
    public:
     ReadCallback(const std::string &tmpFile, const std::string &destFile);
     ~ReadCallback();
-    virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
-    bool commit();
+    virtual int64_t process(std::shared_ptr<io::BaseStream> stream);bool commit();
 
    private:
     std::shared_ptr<logging::Logger> logger_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 341b89c..f71ce9c 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -31,6 +31,7 @@ class Configure : public Properties {
  public:
   // nifi.flow.configuration.file
   static const char *nifi_default_directory;
+  static const char *nifi_c2_enable;
   static const char *nifi_flow_configuration_file;
   static const char *nifi_flow_engine_threads;
   static const char *nifi_administrative_yield_duration;
@@ -63,7 +64,7 @@ class Configure : public Properties {
   static const char *nifi_configuration_listener_pull_interval;
   static const char *nifi_configuration_listener_http_url;
   static const char *nifi_configuration_listener_rest_url;
-  static const char *nifi_configuration_listener_type; // http or rest
+  static const char *nifi_configuration_listener_type;  // http or rest
   // security config for all https service
   static const char *nifi_https_need_ClientAuth;
   static const char *nifi_https_client_certificate;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/properties/Properties.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index 1b456d0..ec0ca5d 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -75,6 +75,14 @@ class Properties {
     minifi_home_ = minifiHome;
   }
 
+  std::vector<std::string> getConfiguredKeys() {
+    std::vector<std::string> keys;
+    for (auto &property : properties_) {
+      keys.push_back(property.first);
+    }
+    return keys;
+  }
+
   // Get the determined MINIFI_HOME
   std::string getHome() {
     return minifi_home_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/provenance/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index b9415dc..6d9895a 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -157,7 +157,7 @@ class ProvenanceEventRecord : public core::SerializableComponent {
     REPLAY
   };
   static const char *ProvenanceEventTypeStr[REPLAY + 1];
-   public:
+ public:
   // Constructor
   /*!
    * Create a new provenance event record
@@ -483,14 +483,6 @@ class ProvenanceReporter {
     }
     _events.clear();
   }
-  // allocate
-  ProvenanceEventRecord *allocate(ProvenanceEventRecord::ProvenanceEventType eventType, std::shared_ptr<core::FlowFile> flow) {
-    ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType, _componentId, _componentType);
-    if (event)
-      event->fromFlowFile(flow);
-
-    return event;
-  }
   // commit
   void commit();
   // create
@@ -520,6 +512,15 @@ class ProvenanceReporter {
 
  protected:
 
+  // allocate
+  ProvenanceEventRecord *allocate(ProvenanceEventRecord::ProvenanceEventType eventType, std::shared_ptr<core::FlowFile> flow) {
+    ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType, _componentId, _componentType);
+    if (event)
+      event->fromFlowFile(flow);
+
+    return event;
+  }
+
   // Component ID
   std::string _componentId;
   // Component Type

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h
index ea78a3c..53f489f 100644
--- a/libminifi/include/provenance/ProvenanceRepository.h
+++ b/libminifi/include/provenance/ProvenanceRepository.h
@@ -25,6 +25,7 @@
 #include "core/Core.h"
 #include "provenance/Provenance.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "concurrentqueue.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -56,6 +57,8 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
       delete db_;
   }
 
+  virtual void flush();
+
   void start() {
     if (this->purge_period_ <= 0)
       return;
@@ -98,12 +101,14 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
   // Put
   virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
 
-    if (repo_full_)
+    if (repo_full_) {
       return false;
+    }
 
     // persistent to the DB
     leveldb::Slice value((const char *) buf, bufLen);
     leveldb::Status status;
+    repo_size_ += bufLen;
     status = db_->Put(leveldb::WriteOptions(), key, value);
     if (status.ok())
       return true;
@@ -112,12 +117,8 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
   }
   // Delete
   virtual bool Delete(std::string key) {
-    leveldb::Status status;
-    status = db_->Delete(leveldb::WriteOptions(), key);
-    if (status.ok())
-      return true;
-    else
-      return false;
+    keys_to_delete.enqueue(key);
+    return true;
   }
   // Get
   virtual bool Get(const std::string &key, std::string &value) {
@@ -134,6 +135,10 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
     Delete(event->getEventId());
   }
 
+  virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
+    return Put(key, buffer, bufferSize);
+  }
+
   virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) {
     leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
     for (it->SeekToFirst(); it->Valid(); it->Next()) {
@@ -213,6 +218,7 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
     for (auto record : records) {
       Delete(record->getEventId());
     }
+    flush();
   }
   // destroy
   void destroy() {
@@ -230,6 +236,7 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
   ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;
 
  private:
+  moodycamel::ConcurrentQueue<std::string> keys_to_delete;
   leveldb::DB* db_;
   std::shared_ptr<logging::Logger> logger_;
 };

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/ByteInputCallBack.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ByteInputCallBack.h b/libminifi/include/utils/ByteInputCallBack.h
index 86aae09..676d67b 100644
--- a/libminifi/include/utils/ByteInputCallBack.h
+++ b/libminifi/include/utils/ByteInputCallBack.h
@@ -47,7 +47,7 @@ class ByteInputCallBack : public InputStreamCallback {
     if (stream->getSize() > 0) {
       vec.resize(stream->getSize());
 
-      stream->readData(vec, stream->getSize());
+      stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize());
     }
 
     ptr = (char*) &vec[0];
@@ -56,6 +56,13 @@ class ByteInputCallBack : public InputStreamCallback {
 
   }
 
+  void write(std::string content) {
+    //vec.resize(content.length());
+    //std::copy(content.begin(), content.end(), std::back_inserter(vec));
+    vec.assign(content.begin(), content.end());
+    ptr = &vec[0];
+  }
+
   char *getBuffer() {
     return ptr;
   }
@@ -66,7 +73,7 @@ class ByteInputCallBack : public InputStreamCallback {
 
  private:
   char *ptr;
-  std::vector<uint8_t> vec;
+  std::vector<char> vec;
 };
 
 } /* namespace utils */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/HTTPClient.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h
new file mode 100644
index 0000000..2a26847
--- /dev/null
+++ b/libminifi/include/utils/HTTPClient.h
@@ -0,0 +1,301 @@
+/**
+ * HTTPUtils class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __HTTP_UTILS_H__
+#define __HTTP_UTILS_H__
+
+#include <curl/curl.h>
+#include <vector>
+#include <iostream>
+#include <string>
+#include <curl/easy.h>
+#include <uuid/uuid.h>
+#include <regex.h>
+#include <vector>
+#include "controllers/SSLContextService.h"
+#include "ByteInputCallBack.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "properties/Configure.h"
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+struct HTTPUploadCallback {
+  ByteInputCallBack *ptr;
+  size_t pos;
+};
+
+struct HTTPHeaderResponse {
+ public:
+
+  HTTPHeaderResponse(int max)
+      : max_tokens_(max) {
+  }
+
+  void append(const std::string &header) {
+    if (header_tokens_.size() <= max_tokens_) {
+      header_tokens_.push_back(header);
+    }
+  }
+
+  int max_tokens_;
+  std::vector<std::string> header_tokens_;
+
+  static size_t receive_headers(void *buffer, size_t size, size_t nmemb, void *userp) {
+    HTTPHeaderResponse *pHeaders = (HTTPHeaderResponse *) (userp);
+    int result = 0;
+    if (pHeaders != NULL) {
+      std::string s = "";
+      s.append((char*) buffer, size * nmemb);
+      pHeaders->append(s);
+      result = size * nmemb;
+    }
+    return result;
+  }
+};
+
+/**
+ * HTTP Response object
+ */
+struct HTTPRequestResponse {
+  std::vector<char> data;
+
+  /**
+   * Receive HTTP Response.
+   */
+  static size_t recieve_write(char * data, size_t size, size_t nmemb, void * p) {
+    return static_cast<HTTPRequestResponse*>(p)->write_content(data, size, nmemb);
+  }
+
+  /**
+   * Callback for post, put, and patch operations
+   * @param buffer
+   * @param size size of buffer
+   * @param nitems items to add
+   * @param insteam input stream object.
+   */
+
+  static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
+    if (p != 0) {
+      HTTPUploadCallback *callback = (HTTPUploadCallback*) p;
+      if (callback->pos <= callback->ptr->getBufferSize()) {
+        char *ptr = callback->ptr->getBuffer();
+        int len = callback->ptr->getBufferSize() - callback->pos;
+        if (len <= 0) {
+          return 0;
+        }
+        if (len > size * nmemb)
+          len = size * nmemb;
+        memcpy(data, callback->ptr->getBuffer() + callback->pos, len);
+        callback->pos += len;
+        return len;
+      }
+    } else {
+      return CURL_READFUNC_ABORT;
+    }
+
+    return 0;
+  }
+
+  size_t write_content(char* ptr, size_t size, size_t nmemb) {
+    data.insert(data.end(), ptr, ptr + size * nmemb);
+    return size * nmemb;
+  }
+
+};
+
+static void parse_url(std::string &url, std::string &host, int &port, std::string &protocol) {
+
+  std::string http("http://");
+  std::string https("https://");
+
+  if (url.compare(0, http.size(), http) == 0)
+    protocol = http;
+
+  if (url.compare(0, https.size(), https) == 0)
+    protocol = https;
+
+  if (!protocol.empty()) {
+    size_t pos = url.find_first_of(":", protocol.size());
+
+    if (pos == std::string::npos) {
+      pos = url.size();
+    }
+
+    host = url.substr(protocol.size(), pos - protocol.size());
+
+    if (pos < url.size() && url[pos] == ':') {
+      size_t ppos = url.find_first_of("/", pos);
+      if (ppos == std::string::npos) {
+        ppos = url.size();
+      }
+      std::string portStr(url.substr(pos + 1, ppos - pos - 1));
+      if (portStr.size() > 0) {
+        port = std::stoi(portStr);
+      }
+    }
+  }
+}
+
+/**
+ * Purpose and Justification: Initializes and cleans up curl once. Cleanup will only occur at the end of our execution since we are relying on a static variable.
+ */
+class HTTPClientInitializer {
+ public:
+  static HTTPClientInitializer *getInstance() {
+    static HTTPClientInitializer initializer;
+    return &initializer;
+  }
+ private:
+  ~HTTPClientInitializer() {
+    curl_global_cleanup();
+  }
+  HTTPClientInitializer() {
+    curl_global_init(CURL_GLOBAL_DEFAULT);
+  }
+};
+
+/**
+ * Purpose and Justification: Pull the basics for an HTTPClient into a self contained class. Simply provide
+ * the URL and an SSLContextService ( can be null).
+ *
+ * Since several portions of the code have been relying on curl, we can encapsulate most CURL HTTP
+ * operations here without maintaining it everywhere. Further, this will help with testing as we
+ * only need to to test our usage of CURL once
+ */
+class HTTPClient {
+ public:
+  HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr);
+
+  ~HTTPClient();
+
+  void setVerbose();
+
+  void initialize(const std::string &method);
+
+  void setConnectionTimeout(int64_t timeout);
+
+  void setReadTimeout(int64_t timeout);
+
+  void setUploadCallback(HTTPUploadCallback *callbackObj);
+
+  struct curl_slist *build_header_list(std::string regex, const std::map<std::string, std::string> &attributes);
+
+  void setContentType(std::string content_type);
+
+  std::string escape(std::string string_to_escape);
+
+  void setPostFields(std::string input);
+
+  void setHeaders(struct curl_slist *list);
+
+  bool submit();
+
+  CURLcode getResponseResult();
+
+  int64_t &getResponseCode();
+
+  const char *getContentType();
+
+  const std::vector<char> &getResponseBody();
+
+  void set_request_method(const std::string method);
+
+  void setUseChunkedEncoding();
+
+  void setDisablePeerVerification();
+
+  const std::vector<std::string> &getHeaders() {
+    return header_response_.header_tokens_;
+
+  }
+
+ protected:
+
+  inline bool matches(const std::string &value, const std::string &sregex);
+
+  static CURLcode configure_ssl_context(CURL *curl, void *ctx, void *param) {
+    minifi::controllers::SSLContextService *ssl_context_service = static_cast<minifi::controllers::SSLContextService*>(param);
+    if (!ssl_context_service->configure_ssl_context(static_cast<SSL_CTX*>(ctx))) {
+      return CURLE_FAILED_INIT;
+    }
+    return CURLE_OK;
+  }
+
+  void configure_secure_connection(CURL *http_session);
+
+  bool isSecure(const std::string &url);
+  struct curl_slist *headers_;
+  utils::HTTPRequestResponse content_;
+  utils::HTTPHeaderResponse header_response_;
+  CURLcode res;
+  int64_t http_code;
+  char *content_type;
+
+  int64_t connect_timeout_;
+  // read timeout.
+  int64_t read_timeout_;
+
+  std::string content_type_;
+
+  std::shared_ptr<logging::Logger> logger_;
+  CURL *http_session_;
+  std::string url_;
+  std::string method_;
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
+};
+
+//static std::string get_token(HTTPClientstd::string loginUrl, std::string username, std::string password, HTTPSecurityConfiguration &securityConfig) {
+static std::string get_token(HTTPClient &client, std::string username, std::string password) {
+  utils::HTTPRequestResponse content;
+  std::string token;
+
+  client.setContentType("application/x-www-form-urlencoded");
+
+  client.set_request_method("POST");
+
+  std::string payload = "username=" + username + "&" + "password=" + password;
+
+  client.setPostFields(client.escape(payload));
+
+  client.submit();
+
+  if (client.submit() && client.getResponseCode() == 200) {
+
+    const std::string &response_body = std::string(client.getResponseBody().data(), client.getResponseBody().size());
+
+    if (!response_body.empty()) {
+      token = "Bearer " + response_body;
+    }
+  }
+
+  return token;
+}
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/HTTPUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPUtils.h b/libminifi/include/utils/HTTPUtils.h
deleted file mode 100644
index e47bc11..0000000
--- a/libminifi/include/utils/HTTPUtils.h
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * HTTPUtils class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __HTTP_UTILS_H__
-#define __HTTP_UTILS_H__
-
-#include <curl/curl.h>
-#include <vector>
-#include <iostream>
-#include <string>
-#include <curl/easy.h>
-#include <openssl/ssl.h>
-#include "ByteInputCallBack.h"
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "properties/Configure.h"
-#include "io/validation.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
-
-struct CallBackPosition {
-  ByteInputCallBack *ptr;
-  size_t pos;
-};
-
-/**
- * HTTP Response object
- */
-struct HTTPRequestResponse {
-  std::vector<char> data;
-
-  /**
-   * Receive HTTP Response.
-   */
-  static size_t recieve_write(char * data, size_t size, size_t nmemb,
-                              void * p) {
-    return static_cast<HTTPRequestResponse*>(p)->write_content(data, size,
-                                                               nmemb);
-  }
-
-  /**
-   * Callback for post, put, and patch operations
-   * @param buffer
-   * @param size size of buffer
-   * @param nitems items to add
-   * @param insteam input stream object.
-   */
-
-  static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
-    if (p != 0) {
-      CallBackPosition *callback = (CallBackPosition*) p;
-      if (callback->pos <= callback->ptr->getBufferSize()) {
-        char *ptr = callback->ptr->getBuffer();
-        int len = callback->ptr->getBufferSize() - callback->pos;
-        if (len <= 0) {
-          delete callback->ptr;
-          delete callback;
-          return 0;
-        }
-        if (len > size * nmemb)
-          len = size * nmemb;
-        memcpy(data, callback->ptr->getBuffer() + callback->pos, len);
-        callback->pos += len;
-        return len;
-      }
-    } else {
-      return CURL_READFUNC_ABORT;
-    }
-
-    return 0;
-  }
-
-  size_t write_content(char* ptr, size_t size, size_t nmemb) {
-    data.insert(data.end(), ptr, ptr + size * nmemb);
-    return size * nmemb;
-  }
-
-};
-
-static void parse_url(std::string &url, std::string &host, int &port, std::string &protocol) {
-
-  std::string http("http://");
-  std::string https("https://");
-
-  if (url.compare(0, http.size(), http) == 0)
-    protocol = http;
-
-  if (url.compare(0, https.size(), https) == 0)
-    protocol = https;
-
-  if (!protocol.empty()) {
-    size_t pos = url.find_first_of(":", protocol.size());
-
-    if (pos == std::string::npos) {
-      pos = url.size();
-    }
-
-    host = url.substr(protocol.size(), pos - protocol.size());
-
-    if (pos < url.size() && url[pos] == ':') {
-      size_t ppos = url.find_first_of("/", pos);
-      if (ppos == std::string::npos) {
-        ppos = url.size();
-      }
-      std::string portStr(url.substr(pos + 1, ppos - pos - 1));
-      if (portStr.size() > 0) {
-        port = std::stoi(portStr);
-      }
-    }
-  }
-}
-
-// HTTPSecurityConfiguration
-class HTTPSecurityConfiguration {
-public:
-
-  // Constructor
-  /*!
-   * Create a new HTTPSecurityConfiguration
-   */
-  HTTPSecurityConfiguration(bool need_client_certificate, std::string certificate,
-      std::string private_key, std::string passphrase, std::string ca_certificate) :
-        need_client_certificate_(need_client_certificate), certificate_(certificate),
-        private_key_(private_key), passphrase_(passphrase), ca_certificate_(ca_certificate) {
-    logger_ = logging::LoggerFactory<HTTPSecurityConfiguration>::getLogger();
-  }
-  // Destructor
-  virtual ~HTTPSecurityConfiguration() {
-  }
-
-  HTTPSecurityConfiguration(std::shared_ptr<Configure> configure) {
-    logger_ = logging::LoggerFactory<HTTPSecurityConfiguration>::getLogger();
-    need_client_certificate_ = false;
-    std::string clientAuthStr;
-    if (configure->get(Configure::nifi_https_need_ClientAuth, clientAuthStr)) {
-      org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, this->need_client_certificate_);
-    }
-
-    if (configure->get(Configure::nifi_https_client_ca_certificate, this->ca_certificate_)) {
-      logger_->log_info("HTTPSecurityConfiguration CA certificates: [%s]", this->ca_certificate_);
-    }
-
-    if (this->need_client_certificate_) {
-      std::string passphrase_file;
-
-      if (!(configure->get(Configure::nifi_https_client_certificate, this->certificate_) && configure->get(Configure::nifi_https_client_private_key, this->private_key_))) {
-        logger_->log_error("Certificate and Private Key PEM file not configured for HTTPSecurityConfiguration, error: %s.", std::strerror(errno));
-      }
-
-      if (configure->get(Configure::nifi_https_client_pass_phrase, passphrase_file)) {
-        // load the passphase from file
-        std::ifstream file(passphrase_file.c_str(), std::ifstream::in);
-        if (file.good()) {
-          this->passphrase_.assign((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
-          file.close();
-        }
-      }
-
-      logger_->log_info("HTTPSecurityConfiguration certificate: [%s], private key: [%s], passphrase file: [%s]", this->certificate_, this->private_key_, passphrase_file);
-    }
-  }
-
-  /**
-    * Configures a secure connection
-    */
-  void configureSecureConnection(CURL *http_session) {
-    curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L);
-    curl_easy_setopt(http_session, CURLOPT_CAINFO, this->ca_certificate_.c_str());
-    curl_easy_setopt(http_session, CURLOPT_SSLCERTTYPE, "PEM");
-    curl_easy_setopt(http_session, CURLOPT_SSL_VERIFYPEER, 1L);
-    if (this->need_client_certificate_) {
-      CURLcode ret;
-      ret = curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION,
-          &HTTPSecurityConfiguration::configureSSLContext);
-      if (ret != CURLE_OK)
-        logger_->log_error("CURLOPT_SSL_CTX_FUNCTION not supported %d", ret);
-      curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA,
-          static_cast<void*>(this));
-      curl_easy_setopt(http_session, CURLOPT_SSLKEYTYPE, "PEM");
-    }
-  }
-
-  static CURLcode configureSSLContext(CURL *curl, void *ctx, void *param) {
-    minifi::utils::HTTPSecurityConfiguration *config =
-        static_cast<minifi::utils::HTTPSecurityConfiguration *>(param);
-    SSL_CTX* sslCtx = static_cast<SSL_CTX*>(ctx);
-
-    SSL_CTX_load_verify_locations(sslCtx, config->ca_certificate_.c_str(), 0);
-    SSL_CTX_use_certificate_file(sslCtx, config->certificate_.c_str(),
-        SSL_FILETYPE_PEM);
-    SSL_CTX_set_default_passwd_cb(sslCtx,
-        HTTPSecurityConfiguration::pemPassWordCb);
-    SSL_CTX_set_default_passwd_cb_userdata(sslCtx, param);
-    SSL_CTX_use_PrivateKey_file(sslCtx, config->private_key_.c_str(),
-        SSL_FILETYPE_PEM);
-    // verify private key
-    if (!SSL_CTX_check_private_key(sslCtx)) {
-      config->logger_->log_error(
-          "Private key does not match the public certificate, error : %s",
-          std::strerror(errno));
-      return CURLE_FAILED_INIT;
-    }
-
-    config->logger_->log_debug(
-        "HTTPSecurityConfiguration load Client Certificates OK");
-    return CURLE_OK;
-  }
-
-  static int pemPassWordCb(char *buf, int size, int rwflag, void *param) {
-    minifi::utils::HTTPSecurityConfiguration *config =
-        static_cast<minifi::utils::HTTPSecurityConfiguration *>(param);
-
-    if (config->passphrase_.length() > 0) {
-      memset(buf, 0x00, size);
-      memcpy(buf, config->passphrase_.c_str(),
-          config->passphrase_.length() - 1);
-      return config->passphrase_.length() - 1;
-    }
-    return 0;
-  }
-
-protected:
-  bool need_client_certificate_;
-  std::string certificate_;
-  std::string private_key_;
-  std::string passphrase_;
-  std::string ca_certificate_;
-
-private:
-  std::shared_ptr<logging::Logger> logger_;
-};
-
-static std::string get_token(std::string loginUrl, std::string username, std::string password, HTTPSecurityConfiguration &securityConfig) {
-  utils::HTTPRequestResponse content;
-  std::string token;
-  CURL *login_session = curl_easy_init();
-  if (loginUrl.find("https") != std::string::npos) {
-     securityConfig.configureSecureConnection(login_session);
-   }
-  curl_easy_setopt(login_session, CURLOPT_URL, loginUrl.c_str());
-  struct curl_slist *list = NULL;
-  list = curl_slist_append(list, "Content-Type: application/x-www-form-urlencoded");
-  list = curl_slist_append(list, "Accept: text/plain");
-  curl_easy_setopt(login_session, CURLOPT_HTTPHEADER, list);
-  std::string payload = "username=" + username + "&" + "password=" + password;
-  char *output = curl_easy_escape(login_session, payload.c_str(), payload.length());
-  curl_easy_setopt(login_session, CURLOPT_WRITEFUNCTION,
-      &utils::HTTPRequestResponse::recieve_write);
-  curl_easy_setopt(login_session, CURLOPT_WRITEDATA,
-      static_cast<void*>(&content));
-  curl_easy_setopt(login_session, CURLOPT_POSTFIELDSIZE, strlen(output));
-  curl_easy_setopt(login_session, CURLOPT_POSTFIELDS, output);
-  curl_easy_setopt(login_session, CURLOPT_POST, 1);
-  CURLcode res = curl_easy_perform(login_session);
-  curl_slist_free_all(list);
-  curl_free(output);
-  if (res == CURLE_OK) {
-    std::string response_body(content.data.begin(), content.data.end());
-    int64_t http_code = 0;
-    curl_easy_getinfo(login_session, CURLINFO_RESPONSE_CODE, &http_code);
-    char *content_type;
-    /* ask for the content-type */
-    curl_easy_getinfo(login_session, CURLINFO_CONTENT_TYPE, &content_type);
-
-    bool isSuccess = ((int32_t) (http_code / 100)) == 2
-        && res != CURLE_ABORTED_BY_CALLBACK;
-    bool body_empty = IsNullOrEmpty(content.data);
-
-    if (isSuccess && !body_empty) {
-      token = "Bearer " + response_body;
-    }
-  }
-  curl_easy_cleanup(login_session);
-
-  return token;
-}
-
-
-} /* namespace utils */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/Id.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index a4f8239..8431548 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -41,7 +41,7 @@ class IdGenerator {
  public:
   void generate(uuid_t output);
   void initialize(const std::shared_ptr<Properties> & properties);
-  
+
   static std::shared_ptr<IdGenerator> getIdGenerator() {
     static std::shared_ptr<IdGenerator> generator = std::shared_ptr<IdGenerator>(new IdGenerator());
     return generator;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/StringUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index b754467..3f8fbea 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -105,7 +105,7 @@ class StringUtils {
     } catch (const std::invalid_argument &ie) {
       switch (cp) {
         case RETURN:
-          case NOTHING:
+        case NOTHING:
           return false;
         case EXIT:
           exit(1);
@@ -115,7 +115,7 @@ class StringUtils {
     } catch (const std::out_of_range &ofr) {
       switch (cp) {
         case RETURN:
-          case NOTHING:
+        case NOTHING:
           return false;
         case EXIT:
           exit(1);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 5335c81..0c71922 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -21,6 +21,7 @@
 #include <iostream>
 #include <atomic>
 #include <mutex>
+#include <map>
 #include <vector>
 #include <queue>
 #include <future>
@@ -129,8 +130,8 @@ class Worker {
   virtual uint64_t getTimeSlice() {
     return time_slice_;
   }
-  
-  virtual uint64_t getWaitTime(){
+
+  virtual uint64_t getWaitTime() {
     return run_determinant_->wait_time();
   }
 
@@ -147,8 +148,7 @@ class Worker {
  protected:
 
   inline uint64_t increment_time(const uint64_t &time) {
-    std::chrono::time_point<std::chrono::system_clock> now =
-        std::chrono::system_clock::now();
+    std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
     auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
     return millis + time;
   }
@@ -161,6 +161,14 @@ class Worker {
 };
 
 template<typename T>
+class WorkerComparator {
+ public:
+  bool operator()(Worker<T> &a, Worker<T> &b) {
+    return a.getTimeSlice() < b.getTimeSlice();
+  }
+};
+
+template<typename T>
 Worker<T>& Worker<T>::operator =(Worker<T> && other) {
   task = std::move(other.task);
   promise = other.promise;
@@ -185,7 +193,7 @@ template<typename T>
 class ThreadPool {
  public:
 
-  ThreadPool(int max_worker_threads = 8, bool daemon_threads = false)
+  ThreadPool(int max_worker_threads = 2, bool daemon_threads = false)
       : max_worker_threads_(max_worker_threads),
         daemon_threads_(daemon_threads),
         running_(false) {
@@ -298,6 +306,7 @@ class ThreadPool {
   std::atomic<bool> running_;
   // worker queue of worker objects
   moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
+  std::priority_queue<Worker<T>, std::vector<Worker<T>>, WorkerComparator<T>> worker_priority_queue_;
   // notification for available work
   std::condition_variable tasks_available_;
   // map to identify if a task should be
@@ -336,7 +345,7 @@ bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
 template<typename T>
 void ThreadPool<T>::startWorkers() {
   for (int i = 0; i < max_worker_threads_; i++) {
-    thread_queue_.push_back(std::thread(&ThreadPool::run_tasks, this));
+    thread_queue_.push_back(std::move(std::thread(&ThreadPool::run_tasks, this)));
     current_workers_++;
   }
 
@@ -360,26 +369,33 @@ void ThreadPool<T>::run_tasks() {
     // they are eligible to run per the fact that the thread pool isn't shut down and the tasks are in a runnable state
     // BUT they've been continually timesliced, we will lower the wait decay to 100ms and continue incrementing from
     // there. This ensures we don't have arbitrarily long sleep cycles. 
-    if (wait_decay_ > 500000000L){
-     wait_decay_ = 100000000L;
+    if (wait_decay_ > 500000000L) {
+      wait_decay_ = 100000000L;
     }
     // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
     // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
     // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
     // be more likely to run. This is intentional.
-    
+
     if (wait_decay_ > 2000) {
       std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
     }
     Worker<T> task;
     if (!worker_queue_.try_dequeue(task)) {
-
       std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      if (worker_priority_queue_.size() > 0) {
+        // this is safe as we are going to immediately pop the queue
+        while (!worker_priority_queue_.empty()) {
+          task = std::move(const_cast<Worker<T>&>(worker_priority_queue_.top()));
+          worker_priority_queue_.pop();
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+
+      }
       tasks_available_.wait_for(lock, waitperiod);
       continue;
-    }
-    else {
-
+    } else {
       std::unique_lock<std::mutex> lock(worker_queue_mutex_);
       if (!task_status_[task.getIdentifier()]) {
         continue;
@@ -388,12 +404,12 @@ void ThreadPool<T>::run_tasks() {
 
     bool wait_to_run = false;
     if (task.getTimeSlice() > 1) {
-      double wt = (double)task.getWaitTime();
+      double wt = (double) task.getWaitTime();
       auto now = std::chrono::system_clock::now().time_since_epoch();
       auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
       // if our differential is < 10% of the wait time we will not put the task into a wait state
       // since requeuing will break the time slice contract.
-      if (task.getTimeSlice() > ms && (task.getTimeSlice() - ms) > (wt*.10)) {
+      if (task.getTimeSlice() > ms && (task.getTimeSlice() - ms) > (wt * .10)) {
         wait_to_run = true;
       }
     }
@@ -404,8 +420,10 @@ void ThreadPool<T>::run_tasks() {
         if (!task_status_[task.getIdentifier()]) {
           continue;
         }
+        // put it on the priority queue
+        worker_priority_queue_.push(std::move(task));
       }
-      worker_queue_.enqueue(std::move(task));
+      //worker_queue_.enqueue(std::move(task));
 
       wait_decay_ += 25;
       continue;
@@ -423,7 +441,6 @@ void ThreadPool<T>::run_tasks() {
         }
       }
       worker_queue_.enqueue(std::move(task));
-
     }
   }
   current_workers_--;
@@ -434,7 +451,7 @@ void ThreadPool<T>::start() {
   std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
   if (!running_) {
     running_ = true;
-    manager_thread_ = std::thread(&ThreadPool::startWorkers, this);
+    manager_thread_ = std::move(std::thread(&ThreadPool::startWorkers, this));
     if (worker_queue_.size_approx() > 0) {
       tasks_available_.notify_all();
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/CPPLINT.cfg
----------------------------------------------------------------------
diff --git a/libminifi/src/CPPLINT.cfg b/libminifi/src/CPPLINT.cfg
new file mode 100644
index 0000000..9205687
--- /dev/null
+++ b/libminifi/src/CPPLINT.cfg
@@ -0,0 +1,3 @@
+set noparent
+filter=-build/include_order,-build/include_alpha
+exclude_files=ResourceClaim.cpp

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/ConfigurationListener.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ConfigurationListener.cpp b/libminifi/src/ConfigurationListener.cpp
deleted file mode 100644
index 858e455..0000000
--- a/libminifi/src/ConfigurationListener.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ConfigurationListener.h"
-#include "FlowController.h"
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-#include <string>
-#include <memory>
-#include <utility>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-void ConfigurationListener::start() {
-  if (running_)
-    return;
-
-  pull_interval_ = 60 * 1000;
-  std::string value;
-  // grab the value for configuration
-  if (configure_->get(Configure::nifi_configuration_listener_pull_interval, value)) {
-    core::TimeUnit unit;
-    if (core::Property::StringToTime(value, pull_interval_, unit) && core::Property::ConvertTimeUnitToMS(pull_interval_, unit, pull_interval_)) {
-      logger_->log_info("Configuration Listener pull interval: [%d] ms", pull_interval_);
-    }
-  }
-
-  thread_ = std::thread(&ConfigurationListener::threadExecutor, this);
-  thread_.detach();
-  running_ = true;
-  logger_->log_info("%s ConfigurationListener Thread Start", type_);
-}
-
-void ConfigurationListener::stop() {
-  if (!running_)
-    return;
-  running_ = false;
-  if (thread_.joinable())
-    thread_.join();
-  logger_->log_info("%s ConfigurationListener Thread Stop", type_);
-}
-
-void ConfigurationListener::run() {
-  std::unique_lock < std::mutex > lk(mutex_);
-  std::condition_variable cv;
-  int64_t interval = 0;
-  while (!cv.wait_for(lk, std::chrono::milliseconds(100), [this] {return (running_ == false);})) {
-    interval += 100;
-    if (interval >= pull_interval_) {
-      std::string payload;
-      bool ret = false;
-      ret = pullConfiguration(payload);
-      if (ret) {
-        if (payload.empty() || payload == lastAppliedConfiguration) {
-          interval = 0;
-          continue;
-        }
-        ret = this->controller_->applyConfiguration(payload);
-        if (ret)
-          this->lastAppliedConfiguration = payload;
-      }
-      interval = 0;
-    }
-  }
-}
-
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index acad1fd..9dcd6c6 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -23,6 +23,7 @@ namespace nifi {
 namespace minifi {
 
 const char *Configure::nifi_default_directory = "nifi.default.directory";
+const char *Configure::nifi_c2_enable = "nifi.c2.enable";
 const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file";
 const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads";
 const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration";

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 1d937b4..082063d 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -104,7 +104,7 @@ void Connection::put(std::shared_ptr<core::FlowFile> flow) {
 
   // Notify receiving processor that work may be available
   if (dest_connectable_) {
-    logger_->log_debug("Notifying %s", dest_connectable_->getName());
+    logger_->log_debug("Notifying %s that %s was inserted", dest_connectable_->getName(), flow->getUUIDStr());
     dest_connectable_->notifyWork();
   }
 }
@@ -122,6 +122,7 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
       if (getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
         // Flow record expired
         expiredFlowRecords.insert(item);
+        logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr().c_str(), name_.c_str());
         if (flow_repository_->Delete(item->getUUIDStr())) {
           item->setStoredToRepository(false);
         }
@@ -136,12 +137,6 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
         std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
         item->setOriginalConnection(connectable);
         logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr().c_str(), name_.c_str());
-
-        // delete from the flowfile repo
-        if (flow_repository_->Delete(item->getUUIDStr())) {
-          item->setStoredToRepository(false);
-        }
-
         return item;
       }
     } else {
@@ -155,11 +150,6 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
       std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
       item->setOriginalConnection(connectable);
       logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr().c_str(), name_.c_str());
-      // delete from the flowfile repo
-      if (flow_repository_->Delete(item->getUUIDStr())) {
-        item->setStoredToRepository(false);
-      }
-
       return item;
     }
   }
@@ -171,10 +161,14 @@ void Connection::drain() {
   std::lock_guard<std::mutex> lock(mutex_);
 
   while (!queue_.empty()) {
-    auto &&item = queue_.front();
+    std::shared_ptr<core::FlowFile> item = queue_.front();
     queue_.pop();
+    logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr().c_str(), name_.c_str());
+    if (flow_repository_->Delete(item->getUUIDStr())) {
+      item->setStoredToRepository(false);
+    }
   }
-
+  queued_data_size_ = 0;
   logger_->log_debug("Drain connection %s", name_.c_str());
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/EventDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index db5ca08..c56ac58 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -32,7 +32,8 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-uint64_t EventDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
+uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+                                         const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   while (this->running_) {
     bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
 


Mime
View raw message