Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1A5CA200D20 for ; Mon, 2 Oct 2017 16:57:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 18D741609DE; Mon, 2 Oct 2017 14:57:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EE7DF160BDB for ; Mon, 2 Oct 2017 16:57:15 +0200 (CEST) Received: (qmail 69454 invoked by uid 500); 2 Oct 2017 14:57:15 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 69260 invoked by uid 99); 2 Oct 2017 14:57:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Oct 2017 14:57:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 86144F5B9D; Mon, 2 Oct 2017 14:57:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: aldrin@apache.org To: commits@nifi.apache.org Date: Mon, 02 Oct 2017 14:57:20 -0000 Message-Id: <0ad15c34ffe546668c1e297ce53ec665@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [7/8] nifi-minifi-cpp git commit: MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters MINIFI-339: Add GetTCP Processor MINIFI-339: Add listener server MINIFI-339: Update to listener MINIFI-339: Resolve Issue with stack based process archived-at: Mon, 02 Oct 2017 14:57:18 -0000 http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/protocols/RESTSender.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/protocols/RESTSender.h b/libminifi/include/c2/protocols/RESTSender.h new file mode 100644 index 0000000..749de3e --- /dev/null +++ b/libminifi/include/c2/protocols/RESTSender.h @@ -0,0 +1,80 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_ +#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_ + +#include "json/json.h" +#include "json/writer.h" +#include +#include +#include "CivetServer.h" +#include "../C2Protocol.h" +#include "RESTProtocol.h" +#include "../HeartBeatReporter.h" +#include "controllers/SSLContextService.h" +#include "utils/ByteInputCallBack.h" +#include "utils/HTTPClient.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. + * + * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST + * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction + * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. + * + */ +class RESTSender : public RESTProtocol, public C2Protocol { + public: + + explicit RESTSender(std::string name, uuid_t uuid = nullptr); + + virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async); + + virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async); + + virtual void update(const std::shared_ptr &configure); + + virtual void initialize(const std::shared_ptr &controller, const std::shared_ptr &configure); + + protected: + + virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig); + + std::shared_ptr ssl_context_service_; + + private: + std::shared_ptr logger_; + std::string rest_uri_; + std::string ack_uri_; +}; + +REGISTER_RESOURCE(RESTSender); + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/controllers/SSLContextService.h ---------------------------------------------------------------------- diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h index c48d30f..9ceb10c 100644 --- a/libminifi/include/controllers/SSLContextService.h +++ b/libminifi/include/controllers/SSLContextService.h @@ -73,6 +73,37 @@ class SSLContextService : public core::controller::ControllerService { logger_(logging::LoggerFactory::getLogger()) { } + explicit SSLContextService(const std::string &name, const std::shared_ptr &configuration) + : ControllerService(name, nullptr), + initialized_(false), + valid_(false), + logger_(logging::LoggerFactory::getLogger()) { + setConfiguration(configuration); + initialize(); + // set the properties based on the configuration + core::Property property("Client Certificate", "Client Certificate"); + core::Property privKey("Private Key", "Private Key file"); + core::Property passphrase_prop("Passphrase", "Client passphrase. Either a file or unencrypted text"); + core::Property caCert("CA Certificate", "CA certificate file"); + + std::string value; + if (configuration_->get(Configure::nifi_security_client_certificate, value)) { + setProperty(property.getName(), value); + } + + if (configuration_->get(Configure::nifi_security_client_private_key, value)) { + setProperty(privKey.getName(), value); + } + + if (configuration_->get(Configure::nifi_security_client_pass_phrase, value)) { + setProperty(passphrase_prop.getName(), value); + } + + if (configuration_->get(Configure::nifi_security_client_ca_certificate, value)) { + setProperty(caCert.getName(), value); + } + } + virtual void initialize(); std::unique_ptr createSSLContext(); @@ -114,8 +145,7 @@ class SSLContextService : public core::controller::ControllerService { if (!IsNullOrEmpty(private_key_)) { int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM); if (retp != 1) { - logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_, - std::strerror(errno)); + logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_, std::strerror(errno)); return false; } @@ -136,6 +166,8 @@ class SSLContextService : public core::controller::ControllerService { return true; } + virtual void onEnable(); + protected: static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) { @@ -153,8 +185,6 @@ class SSLContextService : public core::controller::ControllerService { virtual void initializeTLS(); - virtual void onEnable(); - std::mutex initialization_mutex_; std::atomic initialized_; std::atomic valid_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ClassLoader.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ClassLoader.h b/libminifi/include/core/ClassLoader.h index 31292b2..b6cefc5 100644 --- a/libminifi/include/core/ClassLoader.h +++ b/libminifi/include/core/ClassLoader.h @@ -62,11 +62,25 @@ class ObjectFactory { /** * Create a shared pointer to a new processor. */ + virtual Connectable *createRaw(const std::string &name) { + return nullptr; + } + + /** + * Create a shared pointer to a new processor. + */ virtual std::shared_ptr create(const std::string &name, uuid_t uuid) { return nullptr; } /** + * Create a shared pointer to a new processor. + */ + virtual Connectable* createRaw(const std::string &name, uuid_t uuid) { + return nullptr; + } + + /** * Gets the name of the object. * @return class name of processor */ @@ -116,6 +130,22 @@ class DefautObjectFactory : public ObjectFactory { } /** + * Create a shared pointer to a new processor. + */ + virtual Connectable* createRaw(const std::string &name) { + T *ptr = new T(name); + return dynamic_cast(ptr); + } + + /** + * Create a shared pointer to a new processor. + */ + virtual Connectable* createRaw(const std::string &name, uuid_t uuid) { + T *ptr = new T(name, uuid); + return dynamic_cast(ptr); + } + + /** * Gets the name of the object. * @return class name of processor */ @@ -203,6 +233,24 @@ class ClassLoader { template std::shared_ptr instantiate(const std::string &class_name, uuid_t uuid); + /** + * Instantiate object based on class_name + * @param class_name class to create + * @param uuid uuid of object + * @return nullptr or object created from class_name definition. + */ + template + T *instantiateRaw(const std::string &class_name, const std::string &name); + + /** + * Instantiate object based on class_name + * @param class_name class to create + * @param uuid uuid of object + * @return nullptr or object created from class_name definition. + */ + template + T *instantiateRaw(const std::string &class_name, uuid_t uuid); + protected: std::map> loaded_factories_; @@ -239,6 +287,30 @@ std::shared_ptr ClassLoader::instantiate(const std::string &class_name, uuid_ } } +template +T *ClassLoader::instantiateRaw(const std::string &class_name, const std::string &name) { + std::lock_guard lock(internal_mutex_); + auto factory_entry = loaded_factories_.find(class_name); + if (factory_entry != loaded_factories_.end()) { + auto obj = factory_entry->second->createRaw(name); + return dynamic_cast(obj); + } else { + return nullptr; + } +} + +template +T *ClassLoader::instantiateRaw(const std::string &class_name, uuid_t uuid) { + std::lock_guard lock(internal_mutex_); + auto factory_entry = loaded_factories_.find(class_name); + if (factory_entry != loaded_factories_.end()) { + auto obj = factory_entry->second->createRaw(class_name, uuid); + return dynamic_cast(obj); + } else { + return nullptr; + } +} + }/* namespace core */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ConfigurationFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h index 61af8cd..b71da7a 100644 --- a/libminifi/include/core/ConfigurationFactory.h +++ b/libminifi/include/core/ConfigurationFactory.h @@ -30,18 +30,15 @@ namespace core { template typename std::enable_if::value, T*>::type instantiate(const std::shared_ptr &repo, const std::shared_ptr &flow_file_repo, - const std::shared_ptr &content_repo, - std::shared_ptr configuration, + const std::shared_ptr &content_repo, std::shared_ptr configuration, const std::string path) { throw std::runtime_error("Cannot instantiate class"); } template typename std::enable_if::value, T*>::type instantiate(const std::shared_ptr &repo, const std::shared_ptr &flow_file_repo, - const std::shared_ptr &content_repo, - const std::shared_ptr &stream_factory, - std::shared_ptr configuration, - const std::string path) { + const std::shared_ptr &content_repo, const std::shared_ptr &stream_factory, + std::shared_ptr configuration, const std::string path) { return new T(repo, flow_file_repo, content_repo, stream_factory, configuration, path); } @@ -50,10 +47,8 @@ typename std::enable_if::value, T*>::type instantiate(const * object. */ std::unique_ptr createFlowConfiguration(std::shared_ptr repo, std::shared_ptr flow_file_repo, - std::shared_ptr content_repo, - std::shared_ptr configure, - std::shared_ptr stream_factory, - const std::string configuration_class_name, const std::string path = "", + std::shared_ptr content_repo, std::shared_ptr configure, + std::shared_ptr stream_factory, const std::string configuration_class_name, const std::string path = "", bool fail_safe = false); } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ContentRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h index b544ca0..5558b93 100644 --- a/libminifi/include/core/ContentRepository.h +++ b/libminifi/include/core/ContentRepository.h @@ -49,6 +49,56 @@ class ContentRepository : public StreamManager { */ virtual void stop() = 0; + /** + * Removes an item if it was orphan + */ + virtual bool removeIfOrphaned(const std::shared_ptr &streamId) { + std::lock_guard lock(count_map_mutex_); + const std::string str = streamId->getContentFullPath(); + auto count = count_map_.find(str); + if (count != count_map_.end()) { + if (count_map_[str] == 0) { + remove(streamId); + return true; + } + } + + return false; + } + + virtual uint32_t getStreamCount(const std::shared_ptr &streamId) { + std::lock_guard lock(count_map_mutex_); + return count_map_[streamId->getContentFullPath()]; + } + + virtual void incrementStreamCount(const std::shared_ptr &streamId) { + std::lock_guard lock(count_map_mutex_); + const std::string str = streamId->getContentFullPath(); + auto count = count_map_.find(str); + if (count != count_map_.end()) { + count_map_[str] = count->second + 1; + } else { + count_map_[str] = 1; + } + } + + virtual void decrementStreamCount(const std::shared_ptr &streamId) { + std::lock_guard lock(count_map_mutex_); + const std::string str = streamId->getContentFullPath(); + auto count = count_map_.find(str); + if (count != count_map_.end() && count->second > 0) { + count_map_[str] = count->second - 1; + } else { + count_map_[str] = 0; + } + } + + protected: + + std::mutex count_map_mutex_; + + std::map count_map_; + }; } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index d9ebc72..ffc567f 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -60,8 +60,7 @@ class FlowConfiguration : public CoreComponent { * the flow controller. */ explicit FlowConfiguration(std::shared_ptr repo, std::shared_ptr flow_file_repo, std::shared_ptr content_repo, - std::shared_ptr stream_factory, - std::shared_ptr configuration, const std::string path) + std::shared_ptr stream_factory, std::shared_ptr configuration, const std::string path) : CoreComponent(core::getClassName()), flow_file_repo_(flow_file_repo), content_repo_(content_repo), @@ -102,8 +101,7 @@ class FlowConfiguration : public CoreComponent { return getRoot(config_path_); } - virtual std::unique_ptr getRootFromPayload( - std::string &yamlConfigPayload) { + virtual std::unique_ptr getRootFromPayload(const std::string &yamlConfigPayload) { return nullptr; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ProcessContext.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h index f6aaf5e..1158c71 100644 --- a/libminifi/include/core/ProcessContext.h +++ b/libminifi/include/core/ProcessContext.h @@ -48,49 +48,49 @@ class ProcessContext : public controller::ControllerServiceLookup { /*! * Create a new process context associated with the processor/controller service/state manager */ - ProcessContext(ProcessorNode &processor, std::shared_ptr &controller_service_provider, std::shared_ptr repo, - std::shared_ptr flow_repo, - std::shared_ptr content_repo = std::make_shared()) + ProcessContext(const std::shared_ptr &processor, std::shared_ptr &controller_service_provider, const std::shared_ptr &repo, + const std::shared_ptr &flow_repo, const std::shared_ptr &content_repo = std::make_shared()) : processor_node_(processor), controller_service_provider_(controller_service_provider), logger_(logging::LoggerFactory::getLogger()), - content_repo_(content_repo), flow_repo_(flow_repo) { + content_repo_(content_repo), + flow_repo_(flow_repo) { repo_ = repo; } // Destructor virtual ~ProcessContext() { } // Get Processor associated with the Process Context - ProcessorNode &getProcessorNode() { + std::shared_ptr getProcessorNode() { return processor_node_; } - bool getProperty(std::string name, std::string &value) { - return processor_node_.getProperty(name, value); + bool getProperty(const std::string &name, std::string &value) { + return processor_node_->getProperty(name, value); } // Sets the property value using the property's string name - bool setProperty(std::string name, std::string value) { - return processor_node_.setProperty(name, value); + bool setProperty(const std::string &name, std::string value) { + return processor_node_->setProperty(name, value); } // Sets the property value using the Property object bool setProperty(Property prop, std::string value) { - return processor_node_.setProperty(prop, value); + return processor_node_->setProperty(prop, value); } // Whether the relationship is supported bool isSupportedRelationship(Relationship relationship) { - return processor_node_.isSupportedRelationship(relationship); + return processor_node_->isSupportedRelationship(relationship); } // Check whether the relationship is auto terminated bool isAutoTerminated(Relationship relationship) { - return processor_node_.isAutoTerminated(relationship); + return processor_node_->isAutoTerminated(relationship); } // Get ProcessContext Maximum Concurrent Tasks uint8_t getMaxConcurrentTasks(void) { - return processor_node_.getMaxConcurrentTasks(); + return processor_node_->getMaxConcurrentTasks(); } // Yield based on the yield period void yield() { - processor_node_.yield(); + processor_node_->yield(); } std::shared_ptr getProvenanceRepository() { @@ -104,7 +104,7 @@ class ProcessContext : public controller::ControllerServiceLookup { std::shared_ptr getContentRepository() { return content_repo_; } - + std::shared_ptr getFlowFileRepository() { return flow_repo_; } @@ -122,7 +122,7 @@ class ProcessContext : public controller::ControllerServiceLookup { * identifier */ std::shared_ptr getControllerService(const std::string &identifier) { - return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_.getUUIDStr()); + return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr()); } /** @@ -167,7 +167,7 @@ class ProcessContext : public controller::ControllerServiceLookup { // repository shared pointer. std::shared_ptr content_repo_; // Processor - ProcessorNode processor_node_; + std::shared_ptr processor_node_; // Logger std::shared_ptr logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index a0e51e3..907fdfc 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -144,6 +144,8 @@ class ProcessGroup { std::shared_ptr findProcessor(uuid_t uuid); // findProcessor based on name std::shared_ptr findProcessor(const std::string &processorName); + + void getAllProcessors(std::vector> &processor_vec); /** * Add controller service * @param nodeId node identifier http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ProcessSession.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index 3a3b143..5b78022 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -47,12 +47,12 @@ class ProcessSession { /*! * Create a new process session */ - ProcessSession(ProcessContext *processContext = NULL) + ProcessSession(std::shared_ptr processContext = nullptr) : process_context_(processContext), logger_(logging::LoggerFactory::getLogger()) { - logger_->log_trace("ProcessSession created for %s", process_context_->getProcessorNode().getName()); + logger_->log_trace("ProcessSession created for %s", process_context_->getProcessorNode()->getName()); auto repo = processContext->getProvenanceRepository(); - provenance_report_ = new provenance::ProvenanceReporter(repo, process_context_->getProcessorNode().getUUIDStr(), process_context_->getProcessorNode().getName()); + provenance_report_ = new provenance::ProvenanceReporter(repo, process_context_->getProcessorNode()->getName(), process_context_->getProcessorNode()->getName()); } // Destructor @@ -125,8 +125,9 @@ class ProcessSession { void import(std::string source, std::shared_ptr &&flow, bool keepSource = true, uint64_t offset = 0); - void import(std::string source, std::vector> flows, - bool keepSource, uint64_t offset, char inputDelimiter); + void import(std::string source, std::vector> &flows, + bool keepSource, + uint64_t offset, char inputDelimiter); // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer @@ -151,7 +152,7 @@ class ProcessSession { // Clone the flow file during transfer to multiple connections for a relationship std::shared_ptr cloneDuringTransfer(std::shared_ptr &parent); // ProcessContext - ProcessContext *process_context_; + std::shared_ptr process_context_; // Logger std::shared_ptr logger_; // Provenance Report http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ProcessSessionFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessSessionFactory.h b/libminifi/include/core/ProcessSessionFactory.h index e0ebe18..dc4a9f5 100644 --- a/libminifi/include/core/ProcessSessionFactory.h +++ b/libminifi/include/core/ProcessSessionFactory.h @@ -38,12 +38,12 @@ class ProcessSessionFactory { /*! * Create a new process session factory */ - explicit ProcessSessionFactory(ProcessContext *processContext) + explicit ProcessSessionFactory(std::shared_ptr processContext) : process_context_(processContext) { } // Create the session - std::unique_ptr createSession(); + std::shared_ptr createSession(); // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer @@ -52,7 +52,7 @@ class ProcessSessionFactory { private: // ProcessContext - ProcessContext *process_context_; + std::shared_ptr process_context_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/Processor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 0853c11..99b1043 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -72,6 +72,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std:: Processor(std::string name, uuid_t uuid = NULL); // Destructor virtual ~Processor() { + notifyStop(); } bool isRunning(); @@ -153,7 +154,8 @@ class Processor : public Connectable, public ConfigurableComponent, public std:: } // decrement Active Task Counts void decrementActiveTask(void) { - active_tasks_--; + if (active_tasks_ > 0) + active_tasks_--; } void clearActiveTask(void) { active_tasks_ = 0; @@ -183,7 +185,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std:: if (yield_expiration_ > curTime) return (yield_expiration_ - curTime); else - return 0;; + return 0; } // Whether flow file queued in incoming connection bool flowFilesQueued(); @@ -203,6 +205,9 @@ class Processor : public Connectable, public ConfigurableComponent, public std:: // Get the Next RoundRobin incoming connection std::shared_ptr getNextIncomingConnection(); // On Trigger + + void onTrigger(std::shared_ptr context, std::shared_ptr sessionFactory); + void onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory); virtual bool canEdit() { @@ -212,19 +217,35 @@ class Processor : public Connectable, public ConfigurableComponent, public std:: public: // OnTrigger method, implemented by NiFi Processor Designer + virtual void onTrigger(std::shared_ptr context, std::shared_ptr session) { + onTrigger(context.get(), session.get()); + } virtual void onTrigger(ProcessContext *context, ProcessSession *session) = 0; // Initialize, overridden by NiFi Process Designer virtual void initialize() { } // Scheduled event hook, overridden by NiFi Process Designer + virtual void onSchedule(std::shared_ptr context, std::shared_ptr sessionFactory) { + onSchedule(context.get(), sessionFactory.get()); + } virtual void onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory) { } // Check all incoming connections for work bool isWorkAvailable(); + void setStreamFactory(std::shared_ptr stream_factory) { + stream_factory_ = stream_factory; + } + protected: + virtual void notifyStop() { + + } + + std::shared_ptr stream_factory_; + // Processor state std::atomic state_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/ProcessorNode.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h index 9bf1f52..f420728 100644 --- a/libminifi/include/core/ProcessorNode.h +++ b/libminifi/include/core/ProcessorNode.h @@ -35,10 +35,12 @@ namespace core { */ class ProcessorNode : public ConfigurableComponent, public Connectable { public: - explicit ProcessorNode(const std::shared_ptr processor); + explicit ProcessorNode(const std::shared_ptr &processor); explicit ProcessorNode(const ProcessorNode &other); + explicit ProcessorNode(const ProcessorNode &&other); + /** * Get property using the provided name. * @param name property name. @@ -59,7 +61,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable { * @param value value passed in by reference * @return result of getting property. */ - bool getProperty(const std::string name, std::string &value) { + bool getProperty(const std::string &name, std::string &value) { const std::shared_ptr processor_cast = std::dynamic_pointer_cast(processor_); if (nullptr != processor_cast) return processor_cast->getProperty(name, value); @@ -73,7 +75,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable { * @param value property value. * @return result of setting property. */ - bool setProperty(const std::string name, std::string value) { + bool setProperty(const std::string &name, std::string value) { const std::shared_ptr processor_cast = std::dynamic_pointer_cast(processor_); bool ret = ConfigurableComponent::setProperty(name, value); if (nullptr != processor_cast) @@ -137,7 +139,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable { * Set name. * @param name */ - void setName(const std::string name) { + void setName(const std::string &name) { Connectable::setName(name); processor_->setName(name); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/Property.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h index 278e2ec..9eee1b5 100644 --- a/libminifi/include/core/Property.h +++ b/libminifi/include/core/Property.h @@ -162,11 +162,16 @@ class Property { } std::string unit(pEnd); + std::transform(unit.begin(), unit.end(), unit.begin(), ::tolower); if (unit == "sec" || unit == "s" || unit == "second" || unit == "seconds" || unit == "secs") { timeunit = SECOND; output = ival; return true; + } else if (unit == "msec" || unit == "ms" || unit == "millisecond" || unit == "milliseconds" || unit == "msecs") { + timeunit = MILLISECOND; + output = ival; + return true; } else if (unit == "min" || unit == "m" || unit == "mins" || unit == "minute" || unit == "minutes") { timeunit = MINUTE; output = ival; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/Relationship.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Relationship.h b/libminifi/include/core/Relationship.h index 416ede6..d0157d9 100644 --- a/libminifi/include/core/Relationship.h +++ b/libminifi/include/core/Relationship.h @@ -59,7 +59,7 @@ class Relationship { : name_(UNDEFINED_RELATIONSHIP) { } // Destructor - virtual ~Relationship() { + ~Relationship() { } // Get Name for the relationship std::string getName() const { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/Repository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index f1b47ae..cdc81c5 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -59,10 +59,11 @@ class Repository : public core::SerializableComponent { * Constructor for the repository */ Repository(std::string repo_name = "Repository", std::string directory = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = - MAX_REPOSITORY_STORAGE_SIZE, + MAX_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) : core::SerializableComponent(repo_name), thread_(), + repo_size_(0), logger_(logging::LoggerFactory::getLogger()) { directory_ = directory; max_partition_millis_ = maxPartitionMillis; @@ -77,6 +78,8 @@ class Repository : public core::SerializableComponent { stop(); } + virtual void flush(); + // initialize virtual bool initialize(const std::shared_ptr &configure) { return true; @@ -193,6 +196,8 @@ class Repository : public core::SerializableComponent { } + virtual uint64_t getRepoSize(); + // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer Repository(const Repository &parent) = delete; @@ -216,7 +221,7 @@ class Repository : public core::SerializableComponent { // whether stop accepting provenace event std::atomic repo_full_; // repoSize - uint64_t repoSize(); + // size of the directory std::atomic repo_size_; // Run function for the thread http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/StreamManager.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/StreamManager.h b/libminifi/include/core/StreamManager.h index 468526d..b6d0f0a 100644 --- a/libminifi/include/core/StreamManager.h +++ b/libminifi/include/core/StreamManager.h @@ -70,6 +70,19 @@ class StreamManager { */ virtual bool remove(const std::shared_ptr &streamId) = 0; + /** + * Removes an item if it was orphan + */ + virtual bool removeIfOrphaned(const std::shared_ptr &streamId) = 0; + + virtual uint32_t getStreamCount(const std::shared_ptr &streamId) = 0; + + virtual void incrementStreamCount(const std::shared_ptr &streamId) = 0; + + virtual void decrementStreamCount(const std::shared_ptr &streamId) = 0; + + virtual bool exists(const std::shared_ptr &streamId) = 0; + }; } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/controller/ControllerServiceProvider.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h index 148f970..bf02080 100644 --- a/libminifi/include/core/controller/ControllerServiceProvider.h +++ b/libminifi/include/core/controller/ControllerServiceProvider.h @@ -18,6 +18,7 @@ #ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_ #define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_ +#include #include #include "core/Core.h" #include "ControllerServiceLookup.h" @@ -95,7 +96,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo * Enables the provided controller service * @param serviceNode controller service node. */ - virtual void enableControllerService(std::shared_ptr &serviceNode) = 0; + virtual std::future enableControllerService(std::shared_ptr &serviceNode) = 0; /** * Enables the provided controller service nodes @@ -107,7 +108,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo * Disables the provided controller service node * @param serviceNode controller service node. */ - virtual void disableControllerService(std::shared_ptr &serviceNode) = 0; + virtual std::future disableControllerService(std::shared_ptr &serviceNode) = 0; /** * Gets a list of all controller services. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/controller/StandardControllerServiceProvider.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h index f474ed2..4cb33a7 100644 --- a/libminifi/include/core/controller/StandardControllerServiceProvider.h +++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h @@ -42,8 +42,7 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ public: explicit StandardControllerServiceProvider(std::shared_ptr services, std::shared_ptr root_group, std::shared_ptr configuration, - std::shared_ptr agent, - ClassLoader &loader = ClassLoader::getDefaultClassLoader()) + std::shared_ptr agent, ClassLoader &loader = ClassLoader::getDefaultClassLoader()) : ControllerServiceProvider(services), root_group_(root_group), agent_(agent), @@ -90,8 +89,7 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ } std::shared_ptr new_service_node = std::make_shared(new_controller_service, - std::static_pointer_cast(shared_from_this()), - id, + std::static_pointer_cast(shared_from_this()), id, configuration_); controller_map_->put(id, new_service_node); @@ -99,9 +97,12 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ } - void enableControllerService(std::shared_ptr &serviceNode) { + std::future enableControllerService(std::shared_ptr &serviceNode) { if (serviceNode->canEnable()) { - agent_->enableControllerService(serviceNode); + return agent_->enableControllerService(serviceNode); + } else { + std::future no_run = std::async(std::launch::async, []() {return false;}); + return no_run; } } @@ -124,9 +125,12 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ } } - void disableControllerService(std::shared_ptr &serviceNode) { + std::future disableControllerService(std::shared_ptr &serviceNode) { if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) { - agent_->disableControllerService(serviceNode); + return agent_->disableControllerService(serviceNode); + } else { + std::future no_run = std::async(std::launch::async, []() {return false;}); + return no_run; } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/logging/Logger.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/logging/Logger.h b/libminifi/include/core/logging/Logger.h index 8b80006..d7d52b6 100644 --- a/libminifi/include/core/logging/Logger.h +++ b/libminifi/include/core/logging/Logger.h @@ -115,7 +115,7 @@ class Logger { std::shared_ptr delegate_; std::mutex mutex_; - private: + private: template inline void log(spdlog::level::level_enum level, const char * const format, const Args& ... args) { std::lock_guard lock(mutex_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/logging/LoggerConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h index 787fec5..fbcdaad 100644 --- a/libminifi/include/core/logging/LoggerConfiguration.h +++ b/libminifi/include/core/logging/LoggerConfiguration.h @@ -31,7 +31,6 @@ #include "core/logging/Logger.h" #include "properties/Properties.h" - namespace org { namespace apache { namespace nifi { @@ -78,7 +77,7 @@ class LoggerProperties : public Properties { static const char* appender_prefix; static const char* logger_prefix; - private: + private: std::map> sinks_; }; @@ -102,11 +101,11 @@ class LoggerConfiguration { */ std::shared_ptr getLogger(const std::string &name); static const char *spdlog_default_pattern; - protected: + protected: static std::shared_ptr initialize_namespaces(const std::shared_ptr &logger_properties); static std::shared_ptr get_logger(std::shared_ptr logger, const std::shared_ptr &root_namespace, const std::string &name, std::shared_ptr formatter, bool remove_if_present = false); - private: + private: static std::shared_ptr create_default_root(); class LoggerImpl : public Logger { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h index 2bd4099..0e4ba81 100644 --- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h +++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h @@ -53,7 +53,7 @@ class SiteToSiteProvenanceReportingTask : public minifi::RemoteProcessorGroupPor batch_size_ = 100; } //! Destructor - virtual ~SiteToSiteProvenanceReportingTask() { + ~SiteToSiteProvenanceReportingTask() { } //! Report Task Name static constexpr char const* ReportTaskName = "SiteToSiteProvenanceReportingTask"; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/AtomicRepoEntries.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/AtomicRepoEntries.h b/libminifi/include/core/repository/AtomicRepoEntries.h index c681060..e96f19b 100644 --- a/libminifi/include/core/repository/AtomicRepoEntries.h +++ b/libminifi/include/core/repository/AtomicRepoEntries.h @@ -202,8 +202,7 @@ class AtomicEntry { bool setRepoValue(RepoValue &new_value, RepoValue &old_value, size_t &prev_size) { // delete the underlying pointer bool lock = false; - if (!write_pending_.compare_exchange_weak(lock, true)) - { + if (!write_pending_.compare_exchange_weak(lock, true)) { return false; } if (has_value_) { @@ -215,19 +214,17 @@ class AtomicEntry { try_unlock(); return true; } - - - AtomicEntry *takeOwnership() - { - bool lock = false; - if (!write_pending_.compare_exchange_weak(lock, true) ) - return nullptr; - - ref_count_++; - - try_unlock(); - - return this; + + AtomicEntry *takeOwnership() { + bool lock = false; + if (!write_pending_.compare_exchange_weak(lock, true)) + return nullptr; + + ref_count_++; + + try_unlock(); + + return this; } /** * A test and set operation, which is used to allow a function to test @@ -238,33 +235,29 @@ class AtomicEntry { bool testAndSetKey(const T str, std::function releaseTest = nullptr, std::function reclaimer = nullptr, std::function comparator = nullptr) { bool lock = false; - if (!write_pending_.compare_exchange_weak(lock, true) ) + if (!write_pending_.compare_exchange_weak(lock, true)) return false; if (has_value_) { // we either don't have a release test or we cannot release this // entity - if (releaseTest != nullptr && reclaimer != nullptr && releaseTest(value_.getKey())) - { + if (releaseTest != nullptr && reclaimer != nullptr && releaseTest(value_.getKey())) { reclaimer(value_.getKey()); - } - else if (free_required && ref_count_ == 0) - { - size_t bufferSize = value_.getBufferSize(); - value_.clearBuffer(); - has_value_ = false; - if (accumulated_repo_size_ != nullptr) { - *accumulated_repo_size_ -= bufferSize; - } - free_required = false; - } - else { + } else if (free_required && ref_count_ == 0) { + size_t bufferSize = value_.getBufferSize(); + value_.clearBuffer(); + has_value_ = false; + if (accumulated_repo_size_ != nullptr) { + *accumulated_repo_size_ -= bufferSize; + } + free_required = false; + } else { try_unlock(); return false; } } - ref_count_=1; + ref_count_ = 1; value_.setKey(str, comparator); has_value_ = true; try_unlock(); @@ -308,27 +301,25 @@ class AtomicEntry { try_unlock(); return true; } - - void decrementOwnership(){ + + void decrementOwnership() { try_lock(); if (!has_value_) { try_unlock(); return; } - if (ref_count_ > 0){ + if (ref_count_ > 0) { ref_count_--; } - if (ref_count_ == 0 && free_required) - { + if (ref_count_ == 0 && free_required) { size_t bufferSize = value_.getBufferSize(); value_.clearBuffer(); has_value_ = false; if (accumulated_repo_size_ != nullptr) { - *accumulated_repo_size_ -= bufferSize; + *accumulated_repo_size_ -= bufferSize; } free_required = false; - } - else{ + } else { } try_unlock(); } @@ -382,15 +373,14 @@ class AtomicEntry { try_unlock(); return ref; } - - size_t getLength() - { + + size_t getLength() { size_t size = 0; - try_lock(); - size = value_.getBufferSize(); - try_unlock(); - return size; - + try_lock(); + size = value_.getBufferSize(); + try_unlock(); + return size; + } /** @@ -407,11 +397,10 @@ class AtomicEntry { try_unlock(); return false; } - if (ref_count_ > 0) - { - free_required = true; - try_unlock(); - return true; + if (ref_count_ > 0) { + free_required = true; + try_unlock(); + return true; } size_t bufferSize = value_.getBufferSize(); value_.clearBuffer(); @@ -460,7 +449,7 @@ class AtomicEntry { */ inline void try_lock() { bool lock = false; - while (!write_pending_.compare_exchange_weak(lock, true,std::memory_order_acquire)) { + while (!write_pending_.compare_exchange_weak(lock, true, std::memory_order_acquire)) { lock = false; // attempt again } @@ -471,7 +460,7 @@ class AtomicEntry { */ inline void try_unlock() { bool lock = true; - while (!write_pending_.compare_exchange_weak(lock, false,std::memory_order_acquire)) { + while (!write_pending_.compare_exchange_weak(lock, false, std::memory_order_acquire)) { lock = true; // attempt again } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/FileSystemRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h index 84bf01e..4a5ad5e 100644 --- a/libminifi/include/core/repository/FileSystemRepository.h +++ b/libminifi/include/core/repository/FileSystemRepository.h @@ -47,6 +47,8 @@ class FileSystemRepository : public core::ContentRepository, public core::CoreCo virtual void stop(); + bool exists(const std::shared_ptr &streamId); + virtual std::shared_ptr write(const std::shared_ptr &claim); virtual std::shared_ptr read(const std::shared_ptr &claim); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h index 28b9c05..f2691ac 100644 --- a/libminifi/include/core/repository/FlowFileRepository.h +++ b/libminifi/include/core/repository/FlowFileRepository.h @@ -26,6 +26,7 @@ #include "core/Core.h" #include "Connection.h" #include "core/logging/LoggerConfiguration.h" +#include "concurrentqueue.h" namespace org { namespace apache { @@ -61,6 +62,8 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr delete db_; } + virtual void flush(); + // initialize virtual bool initialize(const std::shared_ptr &configure) { std::string value; @@ -98,7 +101,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr // persistent to the DB leveldb::Slice value((const char *) buf, bufLen); leveldb::Status status; - repo_size_+=bufLen; + repo_size_ += bufLen; status = db_->Put(leveldb::WriteOptions(), key, value); if (status.ok()) return true; @@ -111,20 +114,16 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr * @return status of the delete operation */ virtual bool Delete(std::string key) { - leveldb::Status status; - status = db_->Delete(leveldb::WriteOptions(), key); - if (status.ok()) - { - return true; - } - else - return false; + keys_to_delete.enqueue(key); + return true; } /** * Sets the value from the provided key * @return status of the get operation. */ virtual bool Get(const std::string &key, std::string &value) { + if (db_ == nullptr) + return false; leveldb::Status status; status = db_->Get(leveldb::ReadOptions(), key, &value); if (status.ok()) @@ -139,12 +138,10 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr virtual void loadComponent(const std::shared_ptr &content_repo); void start() { - if (this->purge_period_ <= 0) - { + if (this->purge_period_ <= 0) { return; } - if (running_) - { + if (running_) { return; } running_ = true; @@ -153,6 +150,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr } private: + moodycamel::ConcurrentQueue keys_to_delete; std::map> connectionMap; std::shared_ptr content_repo_; leveldb::DB* db_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/VolatileContentRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h index 306a812..8507216 100644 --- a/libminifi/include/core/repository/VolatileContentRepository.h +++ b/libminifi/include/core/repository/VolatileContentRepository.h @@ -50,8 +50,7 @@ class VolatileContentRepository : public core::ContentRepository, public core::r virtual ~VolatileContentRepository() { if (!minimize_locking_) { std::lock_guard lock(map_mutex_); - for (const auto &item : master_list_) - { + for (const auto &item : master_list_) { delete item.second; } master_list_.clear(); @@ -84,6 +83,8 @@ class VolatileContentRepository : public core::ContentRepository, public core::r */ virtual std::shared_ptr read(const std::shared_ptr &claim); + virtual bool exists(const std::shared_ptr &streamId); + /** * Closes the claim. * @return whether or not the claim is associated with content stored in volatile memory. @@ -104,8 +105,7 @@ class VolatileContentRepository : public core::ContentRepository, public core::r virtual void run(); template - std::shared_ptr shared_from_parent() - { + std::shared_ptr shared_from_parent() { return std::static_pointer_cast(shared_from_this()); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/VolatileFlowFileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h index 059c1de..0e75580 100644 --- a/libminifi/include/core/repository/VolatileFlowFileRepository.h +++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h @@ -31,8 +31,7 @@ namespace repository { * Volatile flow file repository. keeps a running counter of the current location, freeing * those which we no longer hold. */ -class VolatileFlowFileRepository : public VolatileRepository -{ +class VolatileFlowFileRepository : public VolatileRepository { public: explicit VolatileFlowFileRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE, @@ -48,11 +47,9 @@ class VolatileFlowFileRepository : public VolatileRepository repo_full_ = false; while (running_) { std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); - if (purge_required_ && nullptr != content_repo_) - { + if (purge_required_ && nullptr != content_repo_) { std::lock_guard lock(purge_mutex_); - for (auto purgeItem : purge_list_) - { + for (auto purgeItem : purge_list_) { std::shared_ptr newClaim = std::make_shared(purgeItem, content_repo_, true); content_repo_->remove(newClaim); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/VolatileProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileProvenanceRepository.h b/libminifi/include/core/repository/VolatileProvenanceRepository.h index 7397751..2717510 100644 --- a/libminifi/include/core/repository/VolatileProvenanceRepository.h +++ b/libminifi/include/core/repository/VolatileProvenanceRepository.h @@ -30,8 +30,7 @@ namespace repository { /** * Volatile provenance repository. */ -class VolatileProvenanceRepository : public VolatileRepository -{ +class VolatileProvenanceRepository : public VolatileRepository { public: explicit VolatileProvenanceRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/repository/VolatileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h index 958d91a..da6608c 100644 --- a/libminifi/include/core/repository/VolatileRepository.h +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -240,11 +240,10 @@ bool VolatileRepository::Put(T key, const uint8_t *buf, size_t bufLen) { continue; } } - + updated = value_vector_.at(private_index)->setRepoValue(new_value, old_value, reclaimed_size); - logger_->log_debug("Set repo value at %d out of %d updated %d current_size %d, adding %d to %d", private_index, max_count_,updated==true,reclaimed_size,size, current_size_.load()); - if (updated && reclaimed_size > 0) - { + logger_->log_debug("Set repo value at %d out of %d updated %d current_size %d, adding %d to %d", private_index, max_count_, updated == true, reclaimed_size, size, current_size_.load()); + if (updated && reclaimed_size > 0) { std::lock_guard lock(mutex_); purge_list_.push_back(old_value.getKey()); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/ProcessorController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h new file mode 100644 index 0000000..de7c673 --- /dev/null +++ b/libminifi/include/core/state/ProcessorController.h @@ -0,0 +1,73 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_PROCESSORCONTROLLER_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_PROCESSORCONTROLLER_H_ + +#include +#include "core/Processor.h" +#include "SchedulingAgent.h" +#include "UpdateController.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { + +/** + * Purpose, Justification, & Design: ProcessController is the state control mechanism for processors. + * This is coupled with the scheduler. Since scheduling agents run processors, we must ensure state + * is set in the processor to enable it to run, after which the scheduling agent will then be allowed + * to run the aforementioned processor. + */ +class ProcessorController : public StateController { + public: + + ProcessorController(const std::shared_ptr &processor, const std::shared_ptr &scheduler); + + virtual ~ProcessorController(); + + std::string getComponentName() { + return processor_->getName(); + } + /** + * Start the client + */ + virtual int16_t start(); + /** + * Stop the client + */ + virtual int16_t stop(bool force, uint64_t timeToWait = 0); + + virtual bool isRunning(); + + virtual int16_t pause(); + + protected: + std::shared_ptr processor_; + std::shared_ptr scheduler_; +}; + +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_PROCESSORCONTROLLER_H_ */ + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/StateManager.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/StateManager.h b/libminifi/include/core/state/StateManager.h new file mode 100644 index 0000000..412b02e --- /dev/null +++ b/libminifi/include/core/state/StateManager.h @@ -0,0 +1,126 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_STATE_MANAGER_H +#define LIBMINIFI_INCLUDE_STATE_MANAGER_H + +#include +#include +#include + +#include "core/state/metrics/MetricsBase.h" +#include "core/state/metrics/MetricsListener.h" +#include "UpdateController.h" +#include "io/validation.h" +#include "utils/ThreadPool.h" +#include "core/Core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { + +/** + * State manager is meant to be used by implementing agents of this library. It represents the source and sink for metrics, + * the sink for external updates, and encapsulates the thread pool that runs the listeners for various update operations + * that can be performed. + */ +class StateManager : public metrics::MetricsReporter, public metrics::MetricsSink, public StateMonitor, public std::enable_shared_from_this { + public: + + StateManager() + : metrics_listener_(nullptr) { + + } + + virtual ~StateManager() { + + } + + /** + * Initializes the thread pools. + */ + void initialize(); + + /** + * State management operations. + */ + /** + * Stop this controllable. + * @param force force stopping + * @param timeToWait time to wait before giving up. + * @return status of stopping this controller. + */ + virtual int16_t stop(bool force, uint64_t timeToWait = 0); + + /** + * Updates the given flow controller. + */ + int16_t update(const std::shared_ptr &updateController); + + /** + * Passes metrics to the update controllers if they are a metrics sink. + * @param metrics metric to pass through + */ + int16_t setMetrics(const std::shared_ptr &metrics); + + /** + * Metrics operations + */ + virtual int16_t getMetrics(std::vector> &metric_vector, uint16_t metricsClass); + + protected: + + /** + * Function to apply updates for a given update controller. + * @param updateController update controller mechanism. + */ + virtual int16_t applyUpdate(const std::shared_ptr &updateController) = 0; + + /** + * Registers and update controller + * @param updateController update controller to add. + */ + bool registerUpdateListener(const std::shared_ptr &updateController); + + /** + * Base metrics function will employ the default metrics listener. + */ + virtual bool startMetrics(); + + private: + + std::timed_mutex mutex_; + + std::map> metrics_maps_; + + std::vector > updateControllers; + + std::unique_ptr metrics_listener_; + + utils::ThreadPool listener_thread_pool_; + +}; + +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_CONTROLLABLE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/UpdateController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h new file mode 100644 index 0000000..9d4d2f6 --- /dev/null +++ b/libminifi/include/core/state/UpdateController.h @@ -0,0 +1,252 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_UPDATECONTROLLER_H_ +#define LIBMINIFI_INCLUDE_UPDATECONTROLLER_H_ + +#include +#include "utils/ThreadPool.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { + +enum class UpdateState { + INITIATE, + FULLY_APPLIED, + READ_COMPLETE, + PARTIALLY_APPLIED, + NOT_APPLIED, + SET_ERROR, + READ_ERROR, + NESTED // multiple updates embedded into one + +}; + +/** + * Represents the status of an update operation. + * + */ +class UpdateStatus { + public: + UpdateStatus(UpdateState state, int16_t reason = 0); + + UpdateStatus(const UpdateStatus &other); + + UpdateStatus(const UpdateStatus &&other); + + UpdateState getState() const; + + std::string getError() const; + + int16_t getReadonCode() const; + + UpdateStatus &operator=(const UpdateStatus &&other); + + UpdateStatus &operator=(const UpdateStatus &other); + private: + UpdateState state_; + std::string error_; + int16_t reason_; +}; + +class Update { + public: + + Update(UpdateStatus status) + : status_(status) { + + } + + Update(const Update &other) + : status_(other.status_) { + + } + + Update(const Update &&other) + : status_(std::move(other.status_)) { + + } + + virtual ~Update() { + + } + + virtual bool validate() { + return true; + } + + const UpdateStatus &getStatus() const { + return status_; + } + + Update &operator=(const Update &&other) { + status_ = std::move(other.status_); + return *this; + } + + Update &operator=(const Update &other) { + status_ = other.status_; + return *this; + } + + protected: + UpdateStatus status_; +}; + +/** + * Justification and Purpose: Update Runner reflects the post execution functors that determine if + * a given function that is running within a thread pool worker needs to end. + * + * Design: Simply implements isFinished and isCancelled, which it receives by way of the AfterExecute + * class. + */ +class UpdateRunner : public utils::AfterExecute { + public: + explicit UpdateRunner(std::atomic &running) + : running_(&running) { + } + + explicit UpdateRunner(UpdateRunner && other) + : running_(std::move(other.running_)) { + + } + + ~UpdateRunner() { + + } + + virtual bool isFinished(const Update &result) { + if ((result.getStatus().getState() == UpdateState::FULLY_APPLIED || result.getStatus().getState() == UpdateState::READ_COMPLETE) && *running_) { + return false; + } else { + return true; + } + } + virtual bool isCancelled(const Update &result) { + return !*running_; + } + + protected: + + std::atomic *running_; + +}; + +class StateController { + public: + + virtual ~StateController() { + + } + + virtual std::string getComponentName() = 0; + /** + * Start the client + */ + virtual int16_t start() = 0; + /** + * Stop the client + */ + virtual int16_t stop(bool force, uint64_t timeToWait = 0) = 0; + + virtual bool isRunning() = 0; + + virtual int16_t pause() = 0; +}; + +/** + * Justification and Purpose: Update sink is an abstract class with most functions being purely virtual. + * This is meant to reflect the operational sink for the state of the client. Functions, below, represent + * a small and tight interface for operations that can be performed from external controllers. + * + */ +class StateMonitor : public StateController { + public: + virtual ~StateMonitor() { + + } + + std::atomic &isStateMonitorRunning() { + return controller_running_; + } + + virtual std::vector> getComponents(const std::string &name) = 0; + + virtual std::vector> getAllComponents() = 0; + /** + * Operational controllers + */ + + /** + * Drain repositories + */ + virtual int16_t drainRepositories() = 0; + + /** + * Clear connection for the agent. + */ + virtual int16_t clearConnection(const std::string &connection) = 0; + + /** + * Apply an update with the provided string. + * + * < 0 is an error code + * 0 is success + */ + virtual int16_t applyUpdate(const std::string &configuration) = 0; + + /** + * Apply an update that the agent must decode. This is useful for certain operations + * that can't be encapsulated within these definitions. + */ + virtual int16_t applyUpdate(const std::shared_ptr &updateController) = 0; + + /** + * Returns uptime for this module. + * @return uptime for the current state monitor. + */ + virtual uint64_t getUptime() = 0; + + protected: + std::atomic controller_running_; +}; + +/** + * Asks: what is being updated, what can be updated without a restart + * what requires a restart, etc. + */ +class UpdateController { + public: + + virtual std::vector> getFunctions() = 0; + + virtual ~UpdateController() { + + } + +}; + +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_UPDATECONTROLLER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/DeviceInformation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/DeviceInformation.h b/libminifi/include/core/state/metrics/DeviceInformation.h new file mode 100644 index 0000000..9579877 --- /dev/null +++ b/libminifi/include/core/state/metrics/DeviceInformation.h @@ -0,0 +1,319 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_DEVICEINFORMATION_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_DEVICEINFORMATION_H_ + +#include "core/Resource.h" +#include +#include +#include +#include +#include +#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) +#include +#include +#endif +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "MetricsBase.h" +#include "Connection.h" +#include "io/ClientSocket.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace metrics { + +class Device { + public: + Device() { + initialize(); + } + void initialize() { + struct sockaddr_in servAddr; + + addrinfo hints = { sizeof(addrinfo) }; + memset(&hints, 0, sizeof hints); // make sure the struct is empty + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_CANONNAME; + hints.ai_protocol = 0; /* any protocol */ + + char hostname[1024]; + hostname[1023] = '\0'; + gethostname(hostname, 1023); + + std::ifstream device_id_file(".device_id"); + if (device_id_file) { + std::string line; + while (device_id_file) { + if (std::getline(device_id_file, line)) + device_id_ += line; + } + device_id_file.close(); + } else { + device_id_ = getDeviceId(); + + std::ofstream outputFile(".device_id"); + if (outputFile) { + outputFile.write(device_id_.c_str(), device_id_.length()); + } + outputFile.close(); + } + + canonical_hostname_ = hostname; + + std::stringstream ips; + for (auto ip : getIpAddresses()) { + if (ip.find("127") == 0 || ip.find("192") == 0) + continue; + ip_ = ip; + break; + } + + } + + std::string canonical_hostname_; + std::string ip_; + std::string device_id_; + protected: + + std::vector getIpAddresses() { + std::vector ips; + struct ifaddrs *ifaddr, *ifa; + if (getifaddrs(&ifaddr) == -1) { + perror("getifaddrs"); + exit(EXIT_FAILURE); + } + + for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { + if ((strcmp("lo", ifa->ifa_name) == 0) || !(ifa->ifa_flags & (IFF_RUNNING))) + continue; + if ((ifa->ifa_addr != NULL) && (ifa->ifa_addr->sa_family == AF_INET)) { + ips.push_back(inet_ntoa(((struct sockaddr_in *) ifa->ifa_addr)->sin_addr)); + } + } + + freeifaddrs(ifaddr); + return ips; + } + +#if __linux__ + std::string getDeviceId() { + + std::hash hash_fn; + std::string macs; + struct ifaddrs *ifaddr, *ifa; + int family, s, n; + char host[NI_MAXHOST]; + + if (getifaddrs(&ifaddr) == -1) { + exit(EXIT_FAILURE); + } + + /* Walk through linked list, maintaining head pointer so we + can free list later */ + for (ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) { + if (ifa->ifa_addr == NULL) + continue; + + family = ifa->ifa_addr->sa_family; + + /* Display interface name and family (including symbolic + form of the latter for the common families) */ + + /* For an AF_INET* interface address, display the address */ + + if (family == AF_INET || family == AF_INET6) { + s = getnameinfo(ifa->ifa_addr, + (family == AF_INET) ? sizeof(struct sockaddr_in) : + sizeof(struct sockaddr_in6), + host, NI_MAXHOST, + NULL, 0, NI_NUMERICHOST); + if (s != 0) { + printf("getnameinfo() failed: %s\n", gai_strerror(s)); + exit(EXIT_FAILURE); + } + + } + } + + freeifaddrs(ifaddr); + + int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_IP); + struct ifreq ifr; + struct ifconf ifc; + char buf[1024]; + int success = 0; + ifc.ifc_len = sizeof(buf); + ifc.ifc_buf = buf; + if (ioctl(sock, SIOCGIFCONF, &ifc) == -1) { /* handle error */} + + struct ifreq* it = ifc.ifc_req; + const struct ifreq* const end = it + (ifc.ifc_len / sizeof(struct ifreq)); + + for (; it != end; ++it) { + strcpy(ifr.ifr_name, it->ifr_name); + if (ioctl(sock, SIOCGIFFLAGS, &ifr) == 0) { + if (! (ifr.ifr_flags & IFF_LOOPBACK)) { // don't count loopback + if (ioctl(sock, SIOCGIFHWADDR, &ifr) == 0) { + unsigned char mac[6]; + + memcpy(mac, ifr.ifr_hwaddr.sa_data, 6); + + char mac_add[13]; + snprintf(mac_add,13,"%02X%02X%02X%02X%02X%02X",mac[0], mac[1], mac[2], mac[3], mac[4], mac[5] ); + + macs+= mac_add; + } + } + + } + else { /* handle error */} + } + + close(sock); + + return std::to_string(hash_fn(macs)); + } +#elif( defined(__unix__) || defined(__APPLE__) || defined(__MACH__) || defined(BSD)) // should work on bsd variants as well + std::string getDeviceId() { + ifaddrs* iflist; + bool found = false; + std::hash hash_fn; + std::set macs; + + if (getifaddrs(&iflist) == 0) { + for (ifaddrs* cur = iflist; cur; cur = cur->ifa_next) { + if (cur->ifa_addr && (cur->ifa_addr->sa_family == AF_LINK) && ((sockaddr_dl*) cur->ifa_addr)->sdl_alen) { + sockaddr_dl* sdl = (sockaddr_dl*) cur->ifa_addr; + + if (sdl->sdl_type != IFT_ETHER) { + continue; + } else { + } + char mac[32]; + memcpy(mac, LLADDR(sdl), sdl->sdl_alen); + char mac_add[13]; + snprintf(mac_add, 13, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); + ///macs += mac_add; + macs.insert(mac_add); + } else if (cur->ifa_addr->sa_family == AF_INET) { + struct sockaddr_in* sockInfoIP = (struct sockaddr_in*) cur->ifa_addr; + } + } + + freeifaddrs(iflist); + } + std::string macstr; + for (auto &mac : macs) { + macstr += mac; + } + return macstr.length() > 0 ? std::to_string(hash_fn(macstr)) : "8675309"; + } +#else + std::string getDeviceId() { + return "NaD"; + } +#endif + + // connection information + int32_t socket_file_descriptor_; + + addrinfo *addr_info_; +}; + +/** + * Justification and Purpose: Provides Device Information + */ +class DeviceInformation : public DeviceMetric { + public: + + DeviceInformation(std::string name, uuid_t uuid) + : DeviceMetric(name, uuid) { + static Device device; + hostname_ = device.canonical_hostname_; + ip_ = device.ip_; + device_id_ = device.device_id_; + } + + DeviceInformation(const std::string &name) + : DeviceMetric(name, 0) { + static Device device; + hostname_ = device.canonical_hostname_; + ip_ = device.ip_; + device_id_ = device.device_id_; + } + + std::string getName() { + return "NetworkInfo"; + } + + std::vector serialize() { + std::vector serialized; + + MetricResponse hostname; + hostname.name = "hostname"; + hostname.value = hostname_; + + MetricResponse ip; + ip.name = "ip"; + ip.value = ip_; + + serialized.push_back(hostname); + serialized.push_back(ip); + + MetricResponse device_id; + device_id.name = "deviceid"; + device_id.value = device_id_; + + serialized.push_back(device_id); + + return serialized; + } + + protected: + + std::string hostname_; + std::string ip_; + std::string device_id_; +}; + +REGISTER_RESOURCE(DeviceInformation); + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_DEVICEINFORMATION_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/MetricsBase.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/MetricsBase.h b/libminifi/include/core/state/metrics/MetricsBase.h new file mode 100644 index 0000000..cc96298 --- /dev/null +++ b/libminifi/include/core/state/metrics/MetricsBase.h @@ -0,0 +1,161 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_METRICS_METRICSBASE_H_ +#define LIBMINIFI_INCLUDE_METRICS_METRICSBASE_H_ + +#include +#include +#include +#include "core/Core.h" +#include "core/Connectable.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace metrics { + +struct MetricResponse { + std::string name; + std::string value; + std::vector children; + MetricResponse &operator=(const MetricResponse &other) { + name = other.name; + value = other.value; + children = other.children; + return *this; + } +}; + +/** + * Purpose: Defines a metric. Serialization is intended to be thread safe. + */ +class Metrics : public core::Connectable { + public: + Metrics() + : core::Connectable("metric", 0) { + } + + Metrics(std::string name, uuid_t uuid) + : core::Connectable(name, uuid) { + } + virtual ~Metrics() { + + } + virtual std::string getName() = 0; + + virtual std::vector serialize() = 0; + + virtual void yield() { + } + virtual bool isRunning() { + return true; + } + virtual bool isWorkAvailable() { + return true; + } + +}; + +/** + * Purpose: Defines a metric that + */ +class DeviceMetric : public Metrics { + public: + DeviceMetric(std::string name, uuid_t uuid) + : Metrics(name, uuid) { + } +}; + +/** + * Purpose: Retrieves Metrics from the defined class. The current Metric, which is a consumable for any reader of Metrics must have the ability to set metrics. + * + */ +class MetricsSource { + public: + + MetricsSource() { + + } + + virtual ~MetricsSource() { + } + + /** + * Retrieves all metrics from this source. + * @param metric_vector -- metrics will be placed in this vector. + * @return result of the get operation. + * 0 Success + * 1 No error condition, but cannot obtain lock in timely manner. + * -1 failure + */ + virtual int16_t getMetrics(std::vector> &metric_vector) = 0; + +}; + +class MetricsReporter { + public: + + MetricsReporter() { + + } + + virtual ~MetricsReporter() { + } + + /** + * Retrieves all metrics from this source. + * @param metric_vector -- metrics will be placed in this vector. + * @return result of the get operation. + * 0 Success + * 1 No error condition, but cannot obtain lock in timely manner. + * -1 failure + */ + virtual int16_t getMetrics(std::vector> &metric_vector, uint8_t metricsClass) = 0; + +}; + +/** + * Purpose: Sink interface for all metrics. The current Metric, which is a consumable for any reader of Metrics must have the ability to set metrics. + * + */ +class MetricsSink { + public: + + virtual ~MetricsSink() { + } + /** + * Setter for metrics in this sink. + * @param metrics metrics to insert into the current sink. + * @return result of the set operation. + * 0 Success + * 1 No error condition, but cannot obtain lock in timely manner. + * -1 failure + */ + virtual int16_t setMetrics(const std::shared_ptr &metrics) = 0; +}; + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_METRICS_METRICSBASE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/core/state/metrics/MetricsListener.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/MetricsListener.h b/libminifi/include/core/state/metrics/MetricsListener.h new file mode 100644 index 0000000..34df476 --- /dev/null +++ b/libminifi/include/core/state/metrics/MetricsListener.h @@ -0,0 +1,128 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_METRICS_H_ +#define LIBMINIFI_INCLUDE_C2_METRICS_H_ + +#include + +#include "MetricsBase.h" +#include "core/state/UpdateController.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace metrics { + +/** + * Purpose: Class that will represent the metrics updates, which can be performed asynchronously. + */ +class MetricsUpdate : public Update { + public: + MetricsUpdate(UpdateStatus status) + : Update(status) { + + } + virtual bool validate() { + return true; + } +}; + +class MetricsWatcher : public utils::AfterExecute { + public: + explicit MetricsWatcher(std::atomic *running) + : running_(running) { + } + + explicit MetricsWatcher(MetricsWatcher && other) + : running_(std::move(other.running_)) { + + } + + ~MetricsWatcher() { + + } + + virtual bool isFinished(const Update &result) { + if (result.getStatus().getState() == UpdateState::READ_COMPLETE && running_) { + return false; + } else { + return true; + } + } + virtual bool isCancelled(const UpdateStatus &result) { + return false; + } + + protected: + std::atomic *running_; + +}; + +class MetricsListener { + public: + MetricsListener(const std::shared_ptr &source, const std::shared_ptr &sink) + : running_(true), + sink_(sink), + source_(source) { + + function_ = [&]() { + while(running_) { + std::vector> metric_vector; + // simple pass through for the metrics + if (nullptr != source_ && nullptr != sink_) { + source_->getMetrics(metric_vector,0); + for(auto metric : metric_vector) { + sink_->setMetrics(metric); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + return MetricsUpdate(UpdateState::READ_COMPLETE); + }; + } + + void stop() { + running_ = false; + } + + std::function &getFunction() { + return function_; + } + + std::future &getFuture() { + return future_; + } + + private: + std::function function_; + std::future future_; + std::atomic running_; + std::shared_ptr source_; + std::shared_ptr sink_; +}; + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_METRICS_H_ */