Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 581D0200D35 for ; Mon, 2 Oct 2017 16:57:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 568571609DE; Mon, 2 Oct 2017 14:57:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D8C88160BDA for ; Mon, 2 Oct 2017 16:57:15 +0200 (CEST) Received: (qmail 69372 invoked by uid 500); 2 Oct 2017 14:57:15 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 69259 invoked by uid 99); 2 Oct 2017 14:57:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Oct 2017 14:57:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 82148F5B71; Mon, 2 Oct 2017 14:57:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aldrin@apache.org To: commits@nifi.apache.org Date: Mon, 02 Oct 2017 14:57:19 -0000 Message-Id: <581eef3a19714a05b461370632566510@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [6/8] nifi-minifi-cpp git commit: MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters MINIFI-339: Add GetTCP Processor MINIFI-339: Add listener server MINIFI-339: Update to listener MINIFI-339: Resolve Issue with stack based process archived-at: Mon, 02 Oct 2017 14:57:18 -0000 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/ProcessMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/ProcessMetrics.h b/libminifi/include/core/state/metrics/ProcessMetrics.h new file mode 100644 index 0000000..f3f911d --- /dev/null +++ b/libminifi/include/core/state/metrics/ProcessMetrics.h @@ -0,0 +1,102 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_PROCMETRICS_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_PROCMETRICS_H_ + +#include "core/Resource.h" +#include +#include +#include +#include +#include "MetricsBase.h" +#include "Connection.h" +#include "DeviceInformation.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace metrics { + +/** + * Justification and Purpose: Provides Connection queue metrics. Provides critical information to the + * C2 server. 
+ * + */ +class ProcessMetrics : public Metrics { + public: + + ProcessMetrics(const std::string &name, uuid_t uuid) + : Metrics(name, uuid) { + } + + ProcessMetrics(const std::string &name) + : Metrics(name, 0) { + } + + ProcessMetrics() { + } + + std::string getName() { + return "ProcessMetrics"; + } + + std::vector serialize() { + std::vector serialized; + + struct rusage my_usage; + getrusage(RUSAGE_SELF, &my_usage); + + MetricResponse memory; + memory.name = "MemoryMetrics"; + + MetricResponse maxrss; + maxrss.name = "maxrss"; + + maxrss.value = std::to_string(my_usage.ru_maxrss); + + memory.children.push_back(maxrss); + serialized.push_back(memory); + + MetricResponse cpu; + cpu.name = "CpuMetrics"; + MetricResponse ics; + ics.name = "involcs"; + + ics.value = std::to_string(my_usage.ru_nivcsw); + + cpu.children.push_back(ics); + serialized.push_back(cpu); + + return serialized; + } + + protected: + +}; + +REGISTER_RESOURCE(ProcessMetrics); + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/QueueMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/QueueMetrics.h b/libminifi/include/core/state/metrics/QueueMetrics.h new file mode 100644 index 0000000..018e70b --- /dev/null +++ b/libminifi/include/core/state/metrics/QueueMetrics.h @@ -0,0 +1,106 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ + +#include +#include + +#include "MetricsBase.h" +#include "Connection.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace metrics { + +/** + * Justification and Purpose: Provides Connection queue metrics. Provides critical information to the + * C2 server. 
+ * + */ +class QueueMetrics : public Metrics { + public: + + QueueMetrics(const std::string &name, uuid_t uuid) + : Metrics(name, uuid) { + } + + QueueMetrics(const std::string &name) + : Metrics(name, 0) { + } + + QueueMetrics() + : Metrics("QueueMetrics", 0) { + } + + std::string getName() { + return "QueueMetrics"; + } + + void addConnection(const std::shared_ptr &connection) { + if (nullptr != connection) { + connections.insert(std::make_pair(connection->getName(), connection)); + } + } + + std::vector serialize() { + std::vector serialized; + for (auto conn : connections) { + auto connection = conn.second; + MetricResponse parent; + parent.name = connection->getName(); + MetricResponse datasize; + datasize.name = "datasize"; + datasize.value = std::to_string(connection->getQueueDataSize()); + + MetricResponse datasizemax; + datasizemax.name = "datasizemax"; + datasizemax.value = std::to_string(connection->getMaxQueueDataSize()); + + MetricResponse queuesize; + queuesize.name = "queued"; + queuesize.value = std::to_string(connection->getQueueSize()); + + MetricResponse queuesizemax; + queuesizemax.name = "queuedmax"; + queuesizemax.value = std::to_string(connection->getMaxQueueSize()); + + parent.children.push_back(datasize); + parent.children.push_back(datasizemax); + parent.children.push_back(queuesize); + parent.children.push_back(queuesizemax); + + serialized.push_back(parent); + } + return serialized; + } + + protected: + std::map> connections; +}; + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/RepositoryMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/RepositoryMetrics.h 
b/libminifi/include/core/state/metrics/RepositoryMetrics.h new file mode 100644 index 0000000..fa37e94 --- /dev/null +++ b/libminifi/include/core/state/metrics/RepositoryMetrics.h @@ -0,0 +1,101 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_REPOSITORYMETRICS_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_REPOSITORYMETRICS_H_ + +#include +#include + +#include "MetricsBase.h" +#include "Connection.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace metrics { + +/** + * Justification and Purpose: Provides repository metrics. Provides critical information to the + * C2 server. 
+ * + */ +class RepositoryMetrics : public Metrics { + public: + + RepositoryMetrics(const std::string &name, uuid_t uuid) + : Metrics(name, uuid) { + } + + RepositoryMetrics(const std::string &name) + : Metrics(name, 0) { + } + + RepositoryMetrics() + : Metrics("RepositoryMetrics", 0) { + } + + std::string getName() { + return "RepositoryMetrics"; + } + + void addRepository(const std::shared_ptr &repo) { + if (nullptr != repo) { + repositories.insert(std::make_pair(repo->getName(), repo)); + } + } + + std::vector serialize() { + std::vector serialized; + for (auto conn : repositories) { + auto repo = conn.second; + MetricResponse parent; + parent.name = repo->getName(); + MetricResponse datasize; + datasize.name = "running"; + datasize.value = std::to_string(repo->isRunning()); + + MetricResponse datasizemax; + datasizemax.name = "full"; + datasizemax.value = std::to_string(repo->isFull()); + + MetricResponse queuesize; + queuesize.name = "size"; + queuesize.value = std::to_string(repo->getRepoSize()); + + parent.children.push_back(datasize); + parent.children.push_back(datasizemax); + parent.children.push_back(queuesize); + + serialized.push_back(parent); + } + return serialized; + } + + protected: + std::map> repositories; +}; + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_RepositoryMetrics_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/SystemMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/SystemMetrics.h b/libminifi/include/core/state/metrics/SystemMetrics.h new file mode 100644 index 0000000..4ac2b52 --- /dev/null +++ b/libminifi/include/core/state/metrics/SystemMetrics.h @@ -0,0 +1,109 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_SYSMETRICS_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_SYSMETRICS_H_ + +#include "core/Resource.h" +#include +#include +#ifndef _WIN32 +#include +#endif +#include "MetricsBase.h" +#include "Connection.h" +#include "DeviceInformation.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace metrics { + +/** + * Justification and Purpose: Provides system information, including critical device information. 
+ * + */ +class SystemInformation : public DeviceInformation { + public: + + SystemInformation(const std::string &name, uuid_t uuid) + : DeviceInformation(name, uuid) { + } + + SystemInformation(const std::string &name) + : DeviceInformation(name, 0) { + } + + SystemInformation() + : DeviceInformation("SystemInformation", 0) { + } + + std::string getName() { + return "SystemInformation"; + } + + std::vector serialize() { + std::vector serialized; + + MetricResponse vcores; + vcores.name = "vcores"; + int cpus[2] = { 0 }; + size_t ncpus = std::thread::hardware_concurrency(); + + vcores.value = std::to_string(ncpus); + + serialized.push_back(vcores); + + MetricResponse mem; + mem.name = "physicalmem"; +#if defined(_SC_PHYS_PAGES) && defined(_SC_PAGESIZE) + size_t mema = (size_t) sysconf( _SC_PHYS_PAGES) * (size_t) sysconf( _SC_PAGESIZE); +#endif + mem.value = std::to_string(mema); + + serialized.push_back(mem); + + MetricResponse arch; + arch.name = "machinearch"; + + utsname buf; + + if (uname(&buf) == -1) { + arch.value = "unknown"; + } else { + arch.value = buf.machine; + } + + serialized.push_back(arch); + + return serialized; + } + + protected: + +}; + +REGISTER_RESOURCE(SystemInformation); +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/yaml/YamlConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h index 17b060f..0f247e7 100644 --- a/libminifi/include/core/yaml/YamlConfiguration.h +++ b/libminifi/include/core/yaml/YamlConfiguration.h @@ -51,7 +51,7 @@ class YamlConfiguration : public FlowConfiguration { : FlowConfiguration(repo, flow_file_repo, content_repo, 
stream_factory, configuration, path), logger_(logging::LoggerFactory::getLogger()) { stream_factory_ = stream_factory; - if (IsNullOrEmpty (config_path_)) { + if (IsNullOrEmpty(config_path_)) { config_path_ = DEFAULT_FLOW_YAML_FILE_NAME; } } @@ -102,7 +102,7 @@ class YamlConfiguration : public FlowConfiguration { * @return the root ProcessGroup node of the flow * configuration tree */ - std::unique_ptr getRootFromPayload(std::string &yamlConfigPayload) { + std::unique_ptr getRootFromPayload(const std::string &yamlConfigPayload) { YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload); return getRoot(&rootYamlNode); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/AtomicEntryStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h index 5f200f0..3eb456e 100644 --- a/libminifi/include/io/AtomicEntryStream.h +++ b/libminifi/include/io/AtomicEntryStream.h @@ -48,7 +48,7 @@ class AtomicEntryStream : public BaseStream { invalid_stream_ = true; } } - + virtual ~AtomicEntryStream(); virtual void closeStream() { @@ -116,9 +116,9 @@ class AtomicEntryStream : public BaseStream { }; template -AtomicEntryStream::~AtomicEntryStream(){ +AtomicEntryStream::~AtomicEntryStream() { logger_->log_debug("Decrementing"); - entry_->decrementOwnership(); + entry_->decrementOwnership(); } template @@ -141,13 +141,11 @@ int AtomicEntryStream::writeData(uint8_t *value, int size) { std::lock_guard lock(entry_lock_); if (entry_->insert(key_, value, size)) { offset_ += size; - if (offset_ > length_) - { + if (offset_ > length_) { length_ = offset_; } return size; - } - else { + } else { logger_->log_debug("Cannot insert %d bytes due to insufficient space in atomic entry", size); } @@ -158,7 +156,7 @@ int AtomicEntryStream::writeData(uint8_t *value, int size) { template int AtomicEntryStream::readData(std::vector &buf, int buflen) { - 
if (invalid_stream_){ + if (invalid_stream_) { return -1; } if (buf.capacity() < buflen) { @@ -182,13 +180,13 @@ int AtomicEntryStream::readData(uint8_t *buf, int buflen) { if (offset_ + len > value->getBufferSize()) { len = value->getBufferSize() - offset_; if (len <= 0) { - entry_->decrementOwnership(); + entry_->decrementOwnership(); return 0; } } std::memcpy(buf, reinterpret_cast(const_cast(value->getBuffer()) + offset_), len); offset_ += len; - entry_->decrementOwnership(); + entry_->decrementOwnership(); return len; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/BaseStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/BaseStream.h b/libminifi/include/io/BaseStream.h index cd982bb..dc810e3 100644 --- a/libminifi/include/io/BaseStream.h +++ b/libminifi/include/io/BaseStream.h @@ -178,7 +178,7 @@ class BaseStream : public DataStream, public Serializable { * @return resulting read size **/ virtual int readUTF(std::string &str, bool widen = false); - protected: + protected: DataStream *composable_stream_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/CRCStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h index 696f418..70cb89a 100644 --- a/libminifi/include/io/CRCStream.h +++ b/libminifi/include/io/CRCStream.h @@ -133,7 +133,7 @@ class CRCStream : public BaseStream { } void reset(); - protected: + protected: /** * Creates a vector and returns the vector using the provided http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/ClientSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h index cd8a4fc..c9ad90e 100644 --- a/libminifi/include/io/ClientSocket.h +++ 
b/libminifi/include/io/ClientSocket.h @@ -101,6 +101,11 @@ class Socket : public BaseStream { */ virtual int16_t initialize(); + /** + * Sets the non blocking flag on the file descriptor. + */ + void setNonBlocking(); + std::string getHostname() const; /** @@ -114,14 +119,35 @@ class Socket : public BaseStream { * Reads data and places it into buf * @param buf buffer in which we extract data * @param buflen + * @param retrieve_all_bytes determines if we should read all bytes before returning + */ + virtual int readData(std::vector &buf, int buflen) { + return readData(buf, buflen, true); + } + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + * @param retrieve_all_bytes determines if we should read all bytes before returning + */ + virtual int readData(uint8_t *buf, int buflen) { + return readData(buf, buflen, true); + } + + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + * @param retrieve_all_bytes determines if we should read all bytes before returning */ - virtual int readData(std::vector &buf, int buflen); + virtual int readData(std::vector &buf, int buflen, bool retrieve_all_bytes); /** * Reads data and places it into buf * @param buf buffer in which we extract data * @param buflen + * @param retrieve_all_bytes determines if we should read all bytes before returning */ - virtual int readData(uint8_t *buf, int buflen); + virtual int readData(uint8_t *buf, int buflen, bool retrieve_all_bytes); /** * Write value to the stream using std::vector http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/EndianCheck.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/EndianCheck.h b/libminifi/include/io/EndianCheck.h index c4d4504..3ceb19c 100644 --- a/libminifi/include/io/EndianCheck.h +++ b/libminifi/include/io/EndianCheck.h @@ -30,7 +30,7 @@ namespace io { 
class EndiannessCheck { public: static bool IS_LITTLE; - private: + private: static bool is_little_endian() { /* do whatever is needed at static init time */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/io/FileStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h index 23a1f0b..0cddcc2 100644 --- a/libminifi/include/io/FileStream.h +++ b/libminifi/include/io/FileStream.h @@ -45,7 +45,7 @@ class FileStream : public io::BaseStream { * File Stream constructor that accepts an fstream shared pointer. * It must already be initialized for read and write. */ - explicit FileStream(const std::string &path, uint32_t offset, bool write_enable = false); + explicit FileStream(const std::string &path, uint32_t offset, bool write_enable = false); /** * File Stream constructor that accepts an fstream shared pointer. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/processors/GetFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h index df8f775..c193a8d 100644 --- a/libminifi/include/processors/GetFile.h +++ b/libminifi/include/processors/GetFile.h @@ -25,6 +25,7 @@ #include "core/Core.h" #include "core/Resource.h" #include "core/logging/LoggerConfiguration.h" +#include "core/state/metrics/MetricsBase.h" namespace org { namespace apache { @@ -43,8 +44,63 @@ struct GetFileRequest { std::string fileFilter = "[^\\.].*"; }; +class GetFileMetrics : public state::metrics::Metrics { + public: + GetFileMetrics() + : state::metrics::Metrics("GetFileMetrics", 0) { + iterations_ = 0; + accepted_files_ = 0; + input_bytes_ = 0; + } + + GetFileMetrics(std::string name, uuid_t uuid) + : state::metrics::Metrics(name, uuid) { + iterations_ = 0; + accepted_files_ = 0; + input_bytes_ = 0; + } + 
virtual ~GetFileMetrics() { + + } + virtual std::string getName() { + return core::Connectable::getName(); + } + + virtual std::vector serialize() { + std::vector resp; + + state::metrics::MetricResponse iter; + iter.name = "OnTriggerInvocations"; + iter.value = std::to_string(iterations_.load()); + + resp.push_back(iter); + + state::metrics::MetricResponse accepted_files; + accepted_files.name = "AcceptedFiles"; + accepted_files.value = std::to_string(accepted_files_.load()); + + resp.push_back(accepted_files); + + state::metrics::MetricResponse input_bytes; + input_bytes.name = "InputBytes"; + input_bytes.value = std::to_string(input_bytes_.load()); + + resp.push_back(input_bytes); + + return resp; + } + + protected: + friend class GetFile; + + std::atomic iterations_; + std::atomic accepted_files_; + std::atomic input_bytes_; + +}; + // GetFile Class -class GetFile : public core::Processor { +class GetFile : public core::Processor, public state::metrics::MetricsSource { public: // Constructor /*! 
@@ -53,7 +109,7 @@ class GetFile : public core::Processor { explicit GetFile(std::string name, uuid_t uuid = NULL) : Processor(name, uuid), logger_(logging::LoggerFactory::getLogger()) { - + metrics_ = std::make_shared(); } // Destructor virtual ~GetFile() { @@ -99,10 +155,14 @@ class GetFile : public core::Processor { */ void performListing(std::string dir, const GetFileRequest &request); + int16_t getMetrics(std::vector> &metric_vector); + protected: private: + std::shared_ptr metrics_; + // Queue for store directory list std::queue _dirList; // Get Listing size http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/processors/GetTCP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GetTCP.h b/libminifi/include/processors/GetTCP.h new file mode 100644 index 0000000..78a9c01 --- /dev/null +++ b/libminifi/include/processors/GetTCP.h @@ -0,0 +1,288 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __GET_TCP_H__ +#define __GET_TCP_H__ + +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "concurrentqueue.h" +#include "utils/ThreadPool.h" +#include "core/logging/LoggerConfiguration.h" +#include "core/state/metrics/MetricsBase.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +class SocketAfterExecute : public utils::AfterExecute { + public: + explicit SocketAfterExecute(std::atomic &running, const std::string &endpoint, std::map*> *list, std::mutex *mutex) + : running_(running.load()), + endpoint_(endpoint), + mutex_(mutex), + list_(list) { + } + + explicit SocketAfterExecute(SocketAfterExecute && other) { + } + + ~SocketAfterExecute() { + } + + virtual bool isFinished(const int &result) { + + if (result == -1 || result == 0 || !running_) { + std::lock_guard lock(*mutex_); + list_->erase(endpoint_); + return true; + } else { + return false; + } + } + virtual bool isCancelled(const int &result) { + if (!running_) + return true; + else + return false; + } + + protected: + std::atomic running_; + std::map*> *list_; + std::mutex *mutex_; + std::string endpoint_; +}; + +class DataHandlerCallback : public OutputStreamCallback { + public: + DataHandlerCallback(uint8_t *message, size_t size) + : message_(message), + size_(size) { + } + + virtual ~DataHandlerCallback() { + + } + + virtual int64_t process(std::shared_ptr stream) { + return stream->write(message_, size_); + } + + private: + uint8_t *message_; + size_t size_; +}; + +class DataHandler { + public: + DataHandler(std::shared_ptr sessionFactory) + : sessionFactory_(sessionFactory) { + + } + static const char *SOURCE_ENDPOINT_ATTRIBUTE; + + int16_t handle(std::string source, uint8_t *message, size_t size, bool partial); + + private: + std::shared_ptr sessionFactory_; + +}; + +class GetTCPMetrics : public state::metrics::Metrics { + 
public: + GetTCPMetrics() + : state::metrics::Metrics("GetTCPMetrics", 0) { + iterations_ = 0; + accepted_files_ = 0; + input_bytes_ = 0; + } + + GetTCPMetrics(std::string name, uuid_t uuid) + : state::metrics::Metrics(name, uuid) { + iterations_ = 0; + accepted_files_ = 0; + input_bytes_ = 0; + } + virtual ~GetTCPMetrics() { + + } + virtual std::string getName() { + return core::Connectable::getName(); + } + + virtual std::vector serialize() { + std::vector resp; + + state::metrics::MetricResponse iter; + iter.name = "OnTriggerInvocations"; + iter.value = std::to_string(iterations_.load()); + + resp.push_back(iter); + + state::metrics::MetricResponse accepted_files; + accepted_files.name = "AcceptedFiles"; + accepted_files.value = std::to_string(accepted_files_.load()); + + resp.push_back(accepted_files); + + state::metrics::MetricResponse input_bytes; + input_bytes.name = "InputBytes"; + input_bytes.value = std::to_string(input_bytes_.load()); + + resp.push_back(input_bytes); + + return resp; + } + + protected: + friend class GetTCP; + + std::atomic iterations_; + std::atomic accepted_files_; + std::atomic input_bytes_; + +}; + +// GetTCP Class +class GetTCP : public core::Processor, public state::metrics::MetricsSource { + public: +// Constructor + /*! 
+ * Create a new processor + */ + explicit GetTCP(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid), + connection_attempts_(3), + reconnect_interval_(5000), + receive_buffer_size_(16 * 1024 * 1024), + stay_connected_(true), + endOfMessageByte(13), + running_(false), + connection_attempt_limit_(3), + concurrent_handlers_(2), + logger_(logging::LoggerFactory::getLogger()) { + metrics_ = std::make_shared(); + } +// Destructor + virtual ~GetTCP() { + } +// Processor Name + static constexpr char const* ProcessorName = "GetTCP"; + + // Supported Properties + static core::Property EndpointList; + static core::Property ConcurrentHandlers; + static core::Property ReconnectInterval; + static core::Property StayConnected; + static core::Property ReceiveBufferSize; + static core::Property ConnectionAttemptLimit; + static core::Property EndOfMessageByte; + + // Supported Relationships + static core::Relationship Success; + static core::Relationship Partial; + + public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + virtual void onSchedule(std::shared_ptr processContext, std::shared_ptr sessionFactory); + + void onSchedule(core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) { + throw std::exception(); + } + /** + * Execution trigger for the GetTCP Processor + * @param context processor context + * @param session processor session reference. 
+ */ + virtual void onTrigger(std::shared_ptr context, std::shared_ptr session); + + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + throw std::exception(); + } + + // Initialize, over write by NiFi GetTCP + virtual void initialize(void); + + int16_t getMetrics(std::vector> &metric_vector); + + protected: + + virtual void notifyStop(); + + private: + + std::function f_ex; + + std::atomic running_; + + std::unique_ptr handler_; + + std::vector endpoints; + + std::map*> live_clients_; + + utils::ThreadPool client_thread_pool_; + + moodycamel::ConcurrentQueue> socket_ring_buffer_; + + bool stay_connected_; + + uint16_t concurrent_handlers_; + + int8_t endOfMessageByte; + + int64_t reconnect_interval_; + + int64_t receive_buffer_size_; + + int8_t connection_attempts_; + + uint16_t connection_attempt_limit_; + + std::shared_ptr metrics_; + + // Mutex for ensuring clients are running + + std::mutex mutex_; + +// last listing time for root directory ( if recursive, we will consider the root +// as the top level time. 
+ + std::shared_ptr logger_; +}; + +REGISTER_RESOURCE(GetTCP); + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/processors/InvokeHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/InvokeHTTP.h b/libminifi/include/processors/InvokeHTTP.h index 03a1611..1c9d594 100644 --- a/libminifi/include/processors/InvokeHTTP.h +++ b/libminifi/include/processors/InvokeHTTP.h @@ -33,7 +33,7 @@ #include "utils/ByteInputCallBack.h" #include "core/logging/LoggerConfiguration.h" #include "utils/Id.h" -#include "utils/HTTPUtils.h" +#include "utils/HTTPClient.h" namespace org { namespace apache { @@ -56,9 +56,11 @@ class InvokeHTTP : public core::Processor { penalize_no_retry_(false), read_timeout_(20000), always_output_response_(false), + disable_peer_verification_(false), ssl_context_service_(nullptr), + use_chunked_encoding_(false), logger_(logging::LoggerFactory::getLogger()) { - curl_global_init(CURL_GLOBAL_DEFAULT); + static utils::HTTPClientInitializer *initializer = utils::HTTPClientInitializer::getInstance(); } // Destructor virtual ~InvokeHTTP(); @@ -79,7 +81,8 @@ class InvokeHTTP : public core::Processor { static core::Property ProxyPassword; static core::Property ContentType; static core::Property SendBody; - + static core::Property UseChunkedEncoding; + static core::Property DisablePeerVerification; static core::Property PropPutOutputAttributes; static core::Property AlwaysOutputResponse; @@ -101,9 +104,9 @@ class InvokeHTTP : public core::Processor { static core::Relationship RelNoRetry; static core::Relationship RelFailure; - void onTrigger(core::ProcessContext *context, core::ProcessSession *session); - void initialize(); - void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + virtual 
void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + virtual void initialize(); + virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); /** * Provides a reference to the URL. */ @@ -114,37 +117,10 @@ class InvokeHTTP : public core::Processor { protected: /** - * Configures the SSL Context. Relies on the Context service and OpenSSL's installation - */ - static CURLcode configure_ssl_context(CURL *curl, void *ctx, void *param); - - /** - * Determines if a secure connection is required - * @param url url we will be connecting to - * @returns true if secure connection is allowed/required - */ - bool isSecure(const std::string &url); - - /** - * Configures a secure connection - */ - void configure_secure_connection(CURL *http_session); - - /** * Generate a transaction ID * @return transaction ID string. */ std::string generateId(); - /** - * Set the request method on the curl struct. - * @param curl pointer to this instance. - * @param string request method - */ - void set_request_method(CURL *curl, const std::string &); - - struct curl_slist *build_header_list(CURL *curl, std::string regex, const std::map &); - - bool matches(const std::string &value, const std::string &sregex); /** * Routes the flowfile to the proper destination @@ -155,9 +131,7 @@ class InvokeHTTP : public core::Processor { * @param isSuccess success code or not * @param statuscode http response code. 
*/ - void route(std::shared_ptr &request, std::shared_ptr &response, core::ProcessSession *session, core::ProcessContext *context, - bool isSuccess, - int statusCode); + void route(std::shared_ptr &request, std::shared_ptr &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess, int statusCode); /** * Determine if we should emit a new flowfile based on our activity * @param method method type @@ -167,8 +141,6 @@ class InvokeHTTP : public core::Processor { std::shared_ptr ssl_context_service_; - CURLcode res; - // http method std::string method_; // url @@ -187,7 +159,12 @@ class InvokeHTTP : public core::Processor { bool always_output_response_; // penalize on no retry bool penalize_no_retry_; - + // content type. + std::string content_type_; + // use chunked encoding. + bool use_chunked_encoding_; + // disable peer verification ( makes the connection susceptible to MITM attacks ) + bool disable_peer_verification_; private: std::shared_ptr logger_; static std::shared_ptr id_generator_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/processors/LoadProcessors.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/LoadProcessors.h b/libminifi/include/processors/LoadProcessors.h index e8d207a..b629052 100644 --- a/libminifi/include/processors/LoadProcessors.h +++ b/libminifi/include/processors/LoadProcessors.h @@ -25,6 +25,7 @@ #include "ExecuteProcess.h" #include "GenerateFlowFile.h" #include "GetFile.h" +#include "GetTCP.h" #include "ListenHTTP.h" #include "LogAttribute.h" #include "PutFile.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/processors/PutFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h index c7f2823..0b8add7 100644 --- a/libminifi/include/processors/PutFile.h +++ 
b/libminifi/include/processors/PutFile.h @@ -80,8 +80,7 @@ class PutFile : public core::Processor { public: ReadCallback(const std::string &tmpFile, const std::string &destFile); ~ReadCallback(); - virtual int64_t process(std::shared_ptr stream); - bool commit(); + virtual int64_t process(std::shared_ptr stream);bool commit(); private: std::shared_ptr logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 341b89c..f71ce9c 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -31,6 +31,7 @@ class Configure : public Properties { public: // nifi.flow.configuration.file static const char *nifi_default_directory; + static const char *nifi_c2_enable; static const char *nifi_flow_configuration_file; static const char *nifi_flow_engine_threads; static const char *nifi_administrative_yield_duration; @@ -63,7 +64,7 @@ class Configure : public Properties { static const char *nifi_configuration_listener_pull_interval; static const char *nifi_configuration_listener_http_url; static const char *nifi_configuration_listener_rest_url; - static const char *nifi_configuration_listener_type; // http or rest + static const char *nifi_configuration_listener_type; // http or rest // security config for all https service static const char *nifi_https_need_ClientAuth; static const char *nifi_https_client_certificate; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/properties/Properties.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h index 1b456d0..ec0ca5d 100644 --- a/libminifi/include/properties/Properties.h +++ 
b/libminifi/include/properties/Properties.h @@ -75,6 +75,14 @@ class Properties { minifi_home_ = minifiHome; } + std::vector getConfiguredKeys() { + std::vector keys; + for (auto &property : properties_) { + keys.push_back(property.first); + } + return keys; + } + // Get the determined MINIFI_HOME std::string getHome() { return minifi_home_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/provenance/Provenance.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h index b9415dc..6d9895a 100644 --- a/libminifi/include/provenance/Provenance.h +++ b/libminifi/include/provenance/Provenance.h @@ -157,7 +157,7 @@ class ProvenanceEventRecord : public core::SerializableComponent { REPLAY }; static const char *ProvenanceEventTypeStr[REPLAY + 1]; - public: + public: // Constructor /*! * Create a new provenance event record @@ -483,14 +483,6 @@ class ProvenanceReporter { } _events.clear(); } - // allocate - ProvenanceEventRecord *allocate(ProvenanceEventRecord::ProvenanceEventType eventType, std::shared_ptr flow) { - ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType, _componentId, _componentType); - if (event) - event->fromFlowFile(flow); - - return event; - } // commit void commit(); // create @@ -520,6 +512,15 @@ class ProvenanceReporter { protected: + // allocate + ProvenanceEventRecord *allocate(ProvenanceEventRecord::ProvenanceEventType eventType, std::shared_ptr flow) { + ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType, _componentId, _componentType); + if (event) + event->fromFlowFile(flow); + + return event; + } + // Component ID std::string _componentId; // Component Type http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/provenance/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git 
a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h index ea78a3c..53f489f 100644 --- a/libminifi/include/provenance/ProvenanceRepository.h +++ b/libminifi/include/provenance/ProvenanceRepository.h @@ -25,6 +25,7 @@ #include "core/Core.h" #include "provenance/Provenance.h" #include "core/logging/LoggerConfiguration.h" +#include "concurrentqueue.h" namespace org { namespace apache { namespace nifi { @@ -56,6 +57,8 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ delete db_; } + virtual void flush(); + void start() { if (this->purge_period_ <= 0) return; @@ -98,12 +101,14 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ // Put virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) { - if (repo_full_) + if (repo_full_) { return false; + } // persistent to the DB leveldb::Slice value((const char *) buf, bufLen); leveldb::Status status; + repo_size_ += bufLen; status = db_->Put(leveldb::WriteOptions(), key, value); if (status.ok()) return true; @@ -112,12 +117,8 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ } // Delete virtual bool Delete(std::string key) { - leveldb::Status status; - status = db_->Delete(leveldb::WriteOptions(), key); - if (status.ok()) - return true; - else - return false; + keys_to_delete.enqueue(key); + return true; } // Get virtual bool Get(const std::string &key, std::string &value) { @@ -134,6 +135,10 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ Delete(event->getEventId()); } + virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) { + return Put(key, buffer, bufferSize); + } + virtual bool get(std::vector> &store, size_t max_size) { leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { @@ -213,6 +218,7 @@ class ProvenanceRepository 
: public core::Repository, public std::enable_shared_ for (auto record : records) { Delete(record->getEventId()); } + flush(); } // destroy void destroy() { @@ -230,6 +236,7 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete; private: + moodycamel::ConcurrentQueue keys_to_delete; leveldb::DB* db_; std::shared_ptr logger_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/ByteInputCallBack.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ByteInputCallBack.h b/libminifi/include/utils/ByteInputCallBack.h index 86aae09..676d67b 100644 --- a/libminifi/include/utils/ByteInputCallBack.h +++ b/libminifi/include/utils/ByteInputCallBack.h @@ -47,7 +47,7 @@ class ByteInputCallBack : public InputStreamCallback { if (stream->getSize() > 0) { vec.resize(stream->getSize()); - stream->readData(vec, stream->getSize()); + stream->readData(reinterpret_cast(vec.data()), stream->getSize()); } ptr = (char*) &vec[0]; @@ -56,6 +56,13 @@ class ByteInputCallBack : public InputStreamCallback { } + void write(std::string content) { + //vec.resize(content.length()); + //std::copy(content.begin(), content.end(), std::back_inserter(vec)); + vec.assign(content.begin(), content.end()); + ptr = &vec[0]; + } + char *getBuffer() { return ptr; } @@ -66,7 +73,7 @@ class ByteInputCallBack : public InputStreamCallback { private: char *ptr; - std::vector vec; + std::vector vec; }; } /* namespace utils */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/HTTPClient.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h new file mode 100644 index 0000000..2a26847 --- /dev/null +++ b/libminifi/include/utils/HTTPClient.h @@ -0,0 +1,301 @@ +/** + * 
HTTPClient class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __HTTP_UTILS_H__ +#define __HTTP_UTILS_H__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "controllers/SSLContextService.h" +#include "ByteInputCallBack.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "properties/Configure.h" +#include "io/validation.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +struct HTTPUploadCallback { + ByteInputCallBack *ptr; + size_t pos; +}; + +struct HTTPHeaderResponse { + public: + + HTTPHeaderResponse(int max) + : max_tokens_(max) { + } + + void append(const std::string &header) { + if (header_tokens_.size() <= max_tokens_) { + header_tokens_.push_back(header); + } + } + + int max_tokens_; + std::vector header_tokens_; + + static size_t receive_headers(void *buffer, size_t size, size_t nmemb, void *userp) { + HTTPHeaderResponse *pHeaders = (HTTPHeaderResponse *) (userp); + int result = 0; + if (pHeaders != NULL) { + std::string s = ""; + s.append((char*) buffer, size * nmemb); + pHeaders->append(s); + result = size * nmemb; + } + return result; + } +}; + +/** + 
* HTTP Response object + */ +struct HTTPRequestResponse { + std::vector data; + + /** + * Receive HTTP Response. + */ + static size_t recieve_write(char * data, size_t size, size_t nmemb, void * p) { + return static_cast(p)->write_content(data, size, nmemb); + } + + /** + * Callback for post, put, and patch operations + * @param data destination buffer that receives the next bytes of the upload + * @param size item size; at most size * nmemb bytes are copied per call + * @param nmemb item count; see size + * @param p HTTPUploadCallback holding the source buffer and the current read position. + */ + + static size_t send_write(char * data, size_t size, size_t nmemb, void * p) { + if (p != 0) { + HTTPUploadCallback *callback = (HTTPUploadCallback*) p; + if (callback->pos <= callback->ptr->getBufferSize()) { + char *ptr = callback->ptr->getBuffer(); + int len = callback->ptr->getBufferSize() - callback->pos; + if (len <= 0) { + return 0; + } + if (len > size * nmemb) + len = size * nmemb; + memcpy(data, callback->ptr->getBuffer() + callback->pos, len); + callback->pos += len; + return len; + } + } else { + return CURL_READFUNC_ABORT; + } + + return 0; + } + + size_t write_content(char* ptr, size_t size, size_t nmemb) { + data.insert(data.end(), ptr, ptr + size * nmemb); + return size * nmemb; + } + +}; + +static void parse_url(std::string &url, std::string &host, int &port, std::string &protocol) { + + std::string http("http://"); + std::string https("https://"); + + if (url.compare(0, http.size(), http) == 0) + protocol = http; + + if (url.compare(0, https.size(), https) == 0) + protocol = https; + + if (!protocol.empty()) { + size_t pos = url.find_first_of(":", protocol.size()); + + if (pos == std::string::npos) { + pos = url.size(); + } + + host = url.substr(protocol.size(), pos - protocol.size()); + + if (pos < url.size() && url[pos] == ':') { + size_t ppos = url.find_first_of("/", pos); + if (ppos == std::string::npos) { + ppos = url.size(); + } + std::string portStr(url.substr(pos + 1, ppos - pos - 1)); + if (portStr.size() > 0) { + port = std::stoi(portStr); + } + } + } +} + +/** + * Purpose and Justification: 
Initializes and cleans up curl once. Cleanup will only occur at the end of our execution since we are relying on a static variable. + */ +class HTTPClientInitializer { + public: + static HTTPClientInitializer *getInstance() { + static HTTPClientInitializer initializer; + return &initializer; + } + private: + ~HTTPClientInitializer() { + curl_global_cleanup(); + } + HTTPClientInitializer() { + curl_global_init(CURL_GLOBAL_DEFAULT); + } +}; + +/** + * Purpose and Justification: Pull the basics for an HTTPClient into a self-contained class. Simply provide + * the URL and an SSLContextService ( can be null). + * + * Since several portions of the code have been relying on curl, we can encapsulate most CURL HTTP + * operations here without maintaining it everywhere. Further, this will help with testing as we + * only need to test our usage of CURL once. + */ +class HTTPClient { + public: + HTTPClient(const std::string &url, const std::shared_ptr ssl_context_service = nullptr); + + ~HTTPClient(); + + void setVerbose(); + + void initialize(const std::string &method); + + void setConnectionTimeout(int64_t timeout); + + void setReadTimeout(int64_t timeout); + + void setUploadCallback(HTTPUploadCallback *callbackObj); + + struct curl_slist *build_header_list(std::string regex, const std::map &attributes); + + void setContentType(std::string content_type); + + std::string escape(std::string string_to_escape); + + void setPostFields(std::string input); + + void setHeaders(struct curl_slist *list); + + bool submit(); + + CURLcode getResponseResult(); + + int64_t &getResponseCode(); + + const char *getContentType(); + + const std::vector &getResponseBody(); + + void set_request_method(const std::string method); + + void setUseChunkedEncoding(); + + void setDisablePeerVerification(); + + const std::vector &getHeaders() { + return header_response_.header_tokens_; + + } + + protected: + + inline bool matches(const std::string &value, const std::string &sregex); + + static CURLcode 
configure_ssl_context(CURL *curl, void *ctx, void *param) { + minifi::controllers::SSLContextService *ssl_context_service = static_cast(param); + if (!ssl_context_service->configure_ssl_context(static_cast(ctx))) { + return CURLE_FAILED_INIT; + } + return CURLE_OK; + } + + void configure_secure_connection(CURL *http_session); + + bool isSecure(const std::string &url); + struct curl_slist *headers_; + utils::HTTPRequestResponse content_; + utils::HTTPHeaderResponse header_response_; + CURLcode res; + int64_t http_code; + char *content_type; + + int64_t connect_timeout_; + // read timeout. + int64_t read_timeout_; + + std::string content_type_; + + std::shared_ptr logger_; + CURL *http_session_; + std::string url_; + std::string method_; + std::shared_ptr ssl_context_service_; +}; + +//static std::string get_token(HTTPClientstd::string loginUrl, std::string username, std::string password, HTTPSecurityConfiguration &securityConfig) { +static std::string get_token(HTTPClient &client, std::string username, std::string password) { + utils::HTTPRequestResponse content; + std::string token; + + client.setContentType("application/x-www-form-urlencoded"); + + client.set_request_method("POST"); + + std::string payload = "username=" + username + "&" + "password=" + password; + + client.setPostFields(client.escape(payload)); + + client.submit(); + + if (client.submit() && client.getResponseCode() == 200) { + + const std::string &response_body = std::string(client.getResponseBody().data(), client.getResponseBody().size()); + + if (!response_body.empty()) { + token = "Bearer " + response_body; + } + } + + return token; +} + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/HTTPUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/HTTPUtils.h 
b/libminifi/include/utils/HTTPUtils.h deleted file mode 100644 index e47bc11..0000000 --- a/libminifi/include/utils/HTTPUtils.h +++ /dev/null @@ -1,304 +0,0 @@ -/** - * HTTPUtils class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __HTTP_UTILS_H__ -#define __HTTP_UTILS_H__ - -#include -#include -#include -#include -#include -#include -#include "ByteInputCallBack.h" -#include "core/logging/Logger.h" -#include "core/logging/LoggerConfiguration.h" -#include "properties/Configure.h" -#include "io/validation.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { - -struct CallBackPosition { - ByteInputCallBack *ptr; - size_t pos; -}; - -/** - * HTTP Response object - */ -struct HTTPRequestResponse { - std::vector data; - - /** - * Receive HTTP Response. - */ - static size_t recieve_write(char * data, size_t size, size_t nmemb, - void * p) { - return static_cast(p)->write_content(data, size, - nmemb); - } - - /** - * Callback for post, put, and patch operations - * @param buffer - * @param size size of buffer - * @param nitems items to add - * @param insteam input stream object. 
- */ - - static size_t send_write(char * data, size_t size, size_t nmemb, void * p) { - if (p != 0) { - CallBackPosition *callback = (CallBackPosition*) p; - if (callback->pos <= callback->ptr->getBufferSize()) { - char *ptr = callback->ptr->getBuffer(); - int len = callback->ptr->getBufferSize() - callback->pos; - if (len <= 0) { - delete callback->ptr; - delete callback; - return 0; - } - if (len > size * nmemb) - len = size * nmemb; - memcpy(data, callback->ptr->getBuffer() + callback->pos, len); - callback->pos += len; - return len; - } - } else { - return CURL_READFUNC_ABORT; - } - - return 0; - } - - size_t write_content(char* ptr, size_t size, size_t nmemb) { - data.insert(data.end(), ptr, ptr + size * nmemb); - return size * nmemb; - } - -}; - -static void parse_url(std::string &url, std::string &host, int &port, std::string &protocol) { - - std::string http("http://"); - std::string https("https://"); - - if (url.compare(0, http.size(), http) == 0) - protocol = http; - - if (url.compare(0, https.size(), https) == 0) - protocol = https; - - if (!protocol.empty()) { - size_t pos = url.find_first_of(":", protocol.size()); - - if (pos == std::string::npos) { - pos = url.size(); - } - - host = url.substr(protocol.size(), pos - protocol.size()); - - if (pos < url.size() && url[pos] == ':') { - size_t ppos = url.find_first_of("/", pos); - if (ppos == std::string::npos) { - ppos = url.size(); - } - std::string portStr(url.substr(pos + 1, ppos - pos - 1)); - if (portStr.size() > 0) { - port = std::stoi(portStr); - } - } - } -} - -// HTTPSecurityConfiguration -class HTTPSecurityConfiguration { -public: - - // Constructor - /*! 
- * Create a new HTTPSecurityConfiguration - */ - HTTPSecurityConfiguration(bool need_client_certificate, std::string certificate, - std::string private_key, std::string passphrase, std::string ca_certificate) : - need_client_certificate_(need_client_certificate), certificate_(certificate), - private_key_(private_key), passphrase_(passphrase), ca_certificate_(ca_certificate) { - logger_ = logging::LoggerFactory::getLogger(); - } - // Destructor - virtual ~HTTPSecurityConfiguration() { - } - - HTTPSecurityConfiguration(std::shared_ptr configure) { - logger_ = logging::LoggerFactory::getLogger(); - need_client_certificate_ = false; - std::string clientAuthStr; - if (configure->get(Configure::nifi_https_need_ClientAuth, clientAuthStr)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, this->need_client_certificate_); - } - - if (configure->get(Configure::nifi_https_client_ca_certificate, this->ca_certificate_)) { - logger_->log_info("HTTPSecurityConfiguration CA certificates: [%s]", this->ca_certificate_); - } - - if (this->need_client_certificate_) { - std::string passphrase_file; - - if (!(configure->get(Configure::nifi_https_client_certificate, this->certificate_) && configure->get(Configure::nifi_https_client_private_key, this->private_key_))) { - logger_->log_error("Certificate and Private Key PEM file not configured for HTTPSecurityConfiguration, error: %s.", std::strerror(errno)); - } - - if (configure->get(Configure::nifi_https_client_pass_phrase, passphrase_file)) { - // load the passphase from file - std::ifstream file(passphrase_file.c_str(), std::ifstream::in); - if (file.good()) { - this->passphrase_.assign((std::istreambuf_iterator(file)), std::istreambuf_iterator()); - file.close(); - } - } - - logger_->log_info("HTTPSecurityConfiguration certificate: [%s], private key: [%s], passphrase file: [%s]", this->certificate_, this->private_key_, passphrase_file); - } - } - - /** - * Configures a secure connection - */ - void 
configureSecureConnection(CURL *http_session) { - curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L); - curl_easy_setopt(http_session, CURLOPT_CAINFO, this->ca_certificate_.c_str()); - curl_easy_setopt(http_session, CURLOPT_SSLCERTTYPE, "PEM"); - curl_easy_setopt(http_session, CURLOPT_SSL_VERIFYPEER, 1L); - if (this->need_client_certificate_) { - CURLcode ret; - ret = curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, - &HTTPSecurityConfiguration::configureSSLContext); - if (ret != CURLE_OK) - logger_->log_error("CURLOPT_SSL_CTX_FUNCTION not supported %d", ret); - curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, - static_cast(this)); - curl_easy_setopt(http_session, CURLOPT_SSLKEYTYPE, "PEM"); - } - } - - static CURLcode configureSSLContext(CURL *curl, void *ctx, void *param) { - minifi::utils::HTTPSecurityConfiguration *config = - static_cast(param); - SSL_CTX* sslCtx = static_cast(ctx); - - SSL_CTX_load_verify_locations(sslCtx, config->ca_certificate_.c_str(), 0); - SSL_CTX_use_certificate_file(sslCtx, config->certificate_.c_str(), - SSL_FILETYPE_PEM); - SSL_CTX_set_default_passwd_cb(sslCtx, - HTTPSecurityConfiguration::pemPassWordCb); - SSL_CTX_set_default_passwd_cb_userdata(sslCtx, param); - SSL_CTX_use_PrivateKey_file(sslCtx, config->private_key_.c_str(), - SSL_FILETYPE_PEM); - // verify private key - if (!SSL_CTX_check_private_key(sslCtx)) { - config->logger_->log_error( - "Private key does not match the public certificate, error : %s", - std::strerror(errno)); - return CURLE_FAILED_INIT; - } - - config->logger_->log_debug( - "HTTPSecurityConfiguration load Client Certificates OK"); - return CURLE_OK; - } - - static int pemPassWordCb(char *buf, int size, int rwflag, void *param) { - minifi::utils::HTTPSecurityConfiguration *config = - static_cast(param); - - if (config->passphrase_.length() > 0) { - memset(buf, 0x00, size); - memcpy(buf, config->passphrase_.c_str(), - config->passphrase_.length() - 1); - return config->passphrase_.length() - 1; - } 
- return 0; - } - -protected: - bool need_client_certificate_; - std::string certificate_; - std::string private_key_; - std::string passphrase_; - std::string ca_certificate_; - -private: - std::shared_ptr logger_; -}; - -static std::string get_token(std::string loginUrl, std::string username, std::string password, HTTPSecurityConfiguration &securityConfig) { - utils::HTTPRequestResponse content; - std::string token; - CURL *login_session = curl_easy_init(); - if (loginUrl.find("https") != std::string::npos) { - securityConfig.configureSecureConnection(login_session); - } - curl_easy_setopt(login_session, CURLOPT_URL, loginUrl.c_str()); - struct curl_slist *list = NULL; - list = curl_slist_append(list, "Content-Type: application/x-www-form-urlencoded"); - list = curl_slist_append(list, "Accept: text/plain"); - curl_easy_setopt(login_session, CURLOPT_HTTPHEADER, list); - std::string payload = "username=" + username + "&" + "password=" + password; - char *output = curl_easy_escape(login_session, payload.c_str(), payload.length()); - curl_easy_setopt(login_session, CURLOPT_WRITEFUNCTION, - &utils::HTTPRequestResponse::recieve_write); - curl_easy_setopt(login_session, CURLOPT_WRITEDATA, - static_cast(&content)); - curl_easy_setopt(login_session, CURLOPT_POSTFIELDSIZE, strlen(output)); - curl_easy_setopt(login_session, CURLOPT_POSTFIELDS, output); - curl_easy_setopt(login_session, CURLOPT_POST, 1); - CURLcode res = curl_easy_perform(login_session); - curl_slist_free_all(list); - curl_free(output); - if (res == CURLE_OK) { - std::string response_body(content.data.begin(), content.data.end()); - int64_t http_code = 0; - curl_easy_getinfo(login_session, CURLINFO_RESPONSE_CODE, &http_code); - char *content_type; - /* ask for the content-type */ - curl_easy_getinfo(login_session, CURLINFO_CONTENT_TYPE, &content_type); - - bool isSuccess = ((int32_t) (http_code / 100)) == 2 - && res != CURLE_ABORTED_BY_CALLBACK; - bool body_empty = IsNullOrEmpty(content.data); - - if 
(isSuccess && !body_empty) { - token = "Bearer " + response_body; - } - } - curl_easy_cleanup(login_session); - - return token; -} - - -} /* namespace utils */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/Id.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h index a4f8239..8431548 100644 --- a/libminifi/include/utils/Id.h +++ b/libminifi/include/utils/Id.h @@ -41,7 +41,7 @@ class IdGenerator { public: void generate(uuid_t output); void initialize(const std::shared_ptr & properties); - + static std::shared_ptr getIdGenerator() { static std::shared_ptr generator = std::shared_ptr(new IdGenerator()); return generator; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/StringUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h index b754467..3f8fbea 100644 --- a/libminifi/include/utils/StringUtils.h +++ b/libminifi/include/utils/StringUtils.h @@ -105,7 +105,7 @@ class StringUtils { } catch (const std::invalid_argument &ie) { switch (cp) { case RETURN: - case NOTHING: + case NOTHING: return false; case EXIT: exit(1); @@ -115,7 +115,7 @@ class StringUtils { } catch (const std::out_of_range &ofr) { switch (cp) { case RETURN: - case NOTHING: + case NOTHING: return false; case EXIT: exit(1); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/utils/ThreadPool.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 5335c81..0c71922 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ 
-21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -129,8 +130,8 @@ class Worker { virtual uint64_t getTimeSlice() { return time_slice_; } - - virtual uint64_t getWaitTime(){ + + virtual uint64_t getWaitTime() { return run_determinant_->wait_time(); } @@ -147,8 +148,7 @@ class Worker { protected: inline uint64_t increment_time(const uint64_t &time) { - std::chrono::time_point now = - std::chrono::system_clock::now(); + std::chrono::time_point now = std::chrono::system_clock::now(); auto millis = std::chrono::duration_cast(now.time_since_epoch()).count(); return millis + time; } @@ -161,6 +161,14 @@ class Worker { }; template +class WorkerComparator { + public: + bool operator()(Worker &a, Worker &b) { + return a.getTimeSlice() < b.getTimeSlice(); + } +}; + +template Worker& Worker::operator =(Worker && other) { task = std::move(other.task); promise = other.promise; @@ -185,7 +193,7 @@ template class ThreadPool { public: - ThreadPool(int max_worker_threads = 8, bool daemon_threads = false) + ThreadPool(int max_worker_threads = 2, bool daemon_threads = false) : max_worker_threads_(max_worker_threads), daemon_threads_(daemon_threads), running_(false) { @@ -298,6 +306,7 @@ class ThreadPool { std::atomic running_; // worker queue of worker objects moodycamel::ConcurrentQueue> worker_queue_; + std::priority_queue, std::vector>, WorkerComparator> worker_priority_queue_; // notification for available work std::condition_variable tasks_available_; // map to identify if a task should be @@ -336,7 +345,7 @@ bool ThreadPool::execute(Worker &&task, std::future &future) { template void ThreadPool::startWorkers() { for (int i = 0; i < max_worker_threads_; i++) { - thread_queue_.push_back(std::thread(&ThreadPool::run_tasks, this)); + thread_queue_.push_back(std::move(std::thread(&ThreadPool::run_tasks, this))); current_workers_++; } @@ -360,26 +369,33 @@ void ThreadPool::run_tasks() { // they are eligible to run per the fact that the thread pool 
isn't shut down and the tasks are in a runnable state // BUT they've been continually timesliced, we will lower the wait decay to 100ms and continue incrementing from // there. This ensures we don't have arbitrarily long sleep cycles. - if (wait_decay_ > 500000000L){ - wait_decay_ = 100000000L; + if (wait_decay_ > 500000000L) { + wait_decay_ = 100000000L; } // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should // be more likely to run. This is intentional. - + if (wait_decay_ > 2000) { std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_)); } Worker task; if (!worker_queue_.try_dequeue(task)) { - std::unique_lock lock(worker_queue_mutex_); + if (worker_priority_queue_.size() > 0) { + // this is safe as we are going to immediately pop the queue + while (!worker_priority_queue_.empty()) { + task = std::move(const_cast&>(worker_priority_queue_.top())); + worker_priority_queue_.pop(); + worker_queue_.enqueue(std::move(task)); + continue; + } + + } tasks_available_.wait_for(lock, waitperiod); continue; - } - else { - + } else { std::unique_lock lock(worker_queue_mutex_); if (!task_status_[task.getIdentifier()]) { continue; @@ -388,12 +404,12 @@ void ThreadPool::run_tasks() { bool wait_to_run = false; if (task.getTimeSlice() > 1) { - double wt = (double)task.getWaitTime(); + double wt = (double) task.getWaitTime(); auto now = std::chrono::system_clock::now().time_since_epoch(); auto ms = std::chrono::duration_cast(now).count(); // if our differential is < 10% of the wait time we will not put the task 
into a wait state // since requeuing will break the time slice contract. - if (task.getTimeSlice() > ms && (task.getTimeSlice() - ms) > (wt*.10)) { + if (task.getTimeSlice() > ms && (task.getTimeSlice() - ms) > (wt * .10)) { wait_to_run = true; } } @@ -404,8 +420,10 @@ void ThreadPool::run_tasks() { if (!task_status_[task.getIdentifier()]) { continue; } + // put it on the priority queue + worker_priority_queue_.push(std::move(task)); } - worker_queue_.enqueue(std::move(task)); + //worker_queue_.enqueue(std::move(task)); wait_decay_ += 25; continue; @@ -423,7 +441,6 @@ void ThreadPool::run_tasks() { } } worker_queue_.enqueue(std::move(task)); - } } current_workers_--; @@ -434,7 +451,7 @@ void ThreadPool::start() { std::lock_guard lock(manager_mutex_); if (!running_) { running_ = true; - manager_thread_ = std::thread(&ThreadPool::startWorkers, this); + manager_thread_ = std::move(std::thread(&ThreadPool::startWorkers, this)); if (worker_queue_.size_approx() > 0) { tasks_available_.notify_all(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/CPPLINT.cfg ---------------------------------------------------------------------- diff --git a/libminifi/src/CPPLINT.cfg b/libminifi/src/CPPLINT.cfg new file mode 100644 index 0000000..9205687 --- /dev/null +++ b/libminifi/src/CPPLINT.cfg @@ -0,0 +1,3 @@ +set noparent +filter=-build/include_order,-build/include_alpha +exclude_files=ResourceClaim.cpp http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/ConfigurationListener.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ConfigurationListener.cpp b/libminifi/src/ConfigurationListener.cpp deleted file mode 100644 index 858e455..0000000 --- a/libminifi/src/ConfigurationListener.cpp +++ /dev/null @@ -1,87 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "ConfigurationListener.h" -#include "FlowController.h" -#include -#include -#include -#include -#include - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { - -void ConfigurationListener::start() { - if (running_) - return; - - pull_interval_ = 60 * 1000; - std::string value; - // grab the value for configuration - if (configure_->get(Configure::nifi_configuration_listener_pull_interval, value)) { - core::TimeUnit unit; - if (core::Property::StringToTime(value, pull_interval_, unit) && core::Property::ConvertTimeUnitToMS(pull_interval_, unit, pull_interval_)) { - logger_->log_info("Configuration Listener pull interval: [%d] ms", pull_interval_); - } - } - - thread_ = std::thread(&ConfigurationListener::threadExecutor, this); - thread_.detach(); - running_ = true; - logger_->log_info("%s ConfigurationListener Thread Start", type_); -} - -void ConfigurationListener::stop() { - if (!running_) - return; - running_ = false; - if (thread_.joinable()) - thread_.join(); - logger_->log_info("%s ConfigurationListener Thread Stop", type_); -} - -void ConfigurationListener::run() { - std::unique_lock < std::mutex > lk(mutex_); - std::condition_variable cv; - int64_t interval = 0; - while (!cv.wait_for(lk, std::chrono::milliseconds(100), [this] {return 
(running_ == false);})) { - interval += 100; - if (interval >= pull_interval_) { - std::string payload; - bool ret = false; - ret = pullConfiguration(payload); - if (ret) { - if (payload.empty() || payload == lastAppliedConfiguration) { - interval = 0; - continue; - } - ret = this->controller_->applyConfiguration(payload); - if (ret) - this->lastAppliedConfiguration = payload; - } - interval = 0; - } - } -} - -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index acad1fd..9dcd6c6 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -23,6 +23,7 @@ namespace nifi { namespace minifi { const char *Configure::nifi_default_directory = "nifi.default.directory"; +const char *Configure::nifi_c2_enable = "nifi.c2.enable"; const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file"; const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads"; const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration"; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp index 1d937b4..082063d 100644 --- a/libminifi/src/Connection.cpp +++ b/libminifi/src/Connection.cpp @@ -104,7 +104,7 @@ void Connection::put(std::shared_ptr flow) { // Notify receiving processor that work may be available if (dest_connectable_) { - logger_->log_debug("Notifying %s", dest_connectable_->getName()); + logger_->log_debug("Notifying %s that %s was inserted", dest_connectable_->getName(), flow->getUUIDStr()); dest_connectable_->notifyWork(); } 
} @@ -122,6 +122,7 @@ std::shared_ptr Connection::poll(std::set (item->getEntryDate() + expired_duration_)) { // Flow record expired expiredFlowRecords.insert(item); + logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr().c_str(), name_.c_str()); if (flow_repository_->Delete(item->getUUIDStr())) { item->setStoredToRepository(false); } @@ -136,12 +137,6 @@ std::shared_ptr Connection::poll(std::set connectable = std::static_pointer_cast(shared_from_this()); item->setOriginalConnection(connectable); logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr().c_str(), name_.c_str()); - - // delete from the flowfile repo - if (flow_repository_->Delete(item->getUUIDStr())) { - item->setStoredToRepository(false); - } - return item; } } else { @@ -155,11 +150,6 @@ std::shared_ptr Connection::poll(std::set connectable = std::static_pointer_cast(shared_from_this()); item->setOriginalConnection(connectable); logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr().c_str(), name_.c_str()); - // delete from the flowfile repo - if (flow_repository_->Delete(item->getUUIDStr())) { - item->setStoredToRepository(false); - } - return item; } } @@ -171,10 +161,14 @@ void Connection::drain() { std::lock_guard lock(mutex_); while (!queue_.empty()) { - auto &&item = queue_.front(); + std::shared_ptr item = queue_.front(); queue_.pop(); + logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr().c_str(), name_.c_str()); + if (flow_repository_->Delete(item->getUUIDStr())) { + item->setStoredToRepository(false); + } } - + queued_data_size_ = 0; logger_->log_debug("Drain connection %s", name_.c_str()); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/EventDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git 
a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index db5ca08..c56ac58 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -32,7 +32,8 @@ namespace apache { namespace nifi { namespace minifi { -uint64_t EventDrivenSchedulingAgent::run(std::shared_ptr processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) { +uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr &processor, const std::shared_ptr &processContext, + const std::shared_ptr &sessionFactory) { while (this->running_) { bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);