Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D6C0A200C78 for ; Thu, 18 May 2017 15:07:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D54A3160BC4; Thu, 18 May 2017 13:07:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 69EAD160BD1 for ; Thu, 18 May 2017 15:07:52 +0200 (CEST) Received: (qmail 8999 invoked by uid 500); 18 May 2017 13:07:51 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 8961 invoked by uid 99); 18 May 2017 13:07:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 May 2017 13:07:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 39136E0210; Thu, 18 May 2017 13:07:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aldrin@apache.org To: commits@nifi.apache.org Date: Thu, 18 May 2017 13:07:53 -0000 Message-Id: In-Reply-To: <889868c7d56b4488a18ea88deb417056@git.apache.org> References: <889868c7d56b4488a18ea88deb417056@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/5] nifi-minifi-cpp git commit: MINIFI-226: Add controller services capabilities along with unit tests Fix test failures Update Travis YML Update readme to link to MiNiFi licensing information archived-at: Thu, 18 May 2017 13:07:55 -0000 http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/controllers/SSLContextService.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp new file mode 100644 index 0000000..51a7cc4 --- /dev/null +++ b/libminifi/src/controllers/SSLContextService.cpp @@ -0,0 +1,226 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "controllers/SSLContextService.h" +#include +#include +#include +#include +#include +#include "core/Property.h" +#include "io/validation.h" +#include "properties/Configure.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +void SSLContextService::initialize() { + if (initialized_) + return; + + std::lock_guard lock(initialization_mutex_); + + ControllerService::initialize(); + + initializeTLS(); +} + +std::unique_ptr SSLContextService::createSSLContext() { + SSL_library_init(); + const SSL_METHOD *method; + + OpenSSL_add_all_algorithms(); + SSL_load_error_strings(); + method = TLSv1_2_client_method(); + SSL_CTX *ctx = SSL_CTX_new(method); + + if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) + <= 0) { + logger_->log_error("Could not create load certificate, error : %s", + std::strerror(errno)); + return nullptr; + } + if (!IsNullOrEmpty(passphrase_)) { + SSL_CTX_set_default_passwd_cb_userdata(ctx, &passphrase_); + SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); + } + + int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), + SSL_FILETYPE_PEM); + if (retp != 1) { + logger_->log_error("Could not create load private key,%i on %s error : %s", + retp, private_key_, std::strerror(errno)); + return nullptr; + } + + if (!SSL_CTX_check_private_key(ctx)) { + logger_->log_error( + "Private key does not match the public certificate, error : %s", + std::strerror(errno)); + return nullptr; + } + + retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0); + if (retp == 0) { + logger_->log_error("Can not load CA certificate, Exiting, error : %s", + std::strerror(errno)); + } + return std::unique_ptr(new SSLContext(ctx)); +} + +const std::string &SSLContextService::getCertificateFile() { + std::lock_guard lock(initialization_mutex_); + return certificate; +} + +const std::string &SSLContextService::getPassphrase() { + std::lock_guard lock(initialization_mutex_); + return passphrase_; +} + +const std::string &SSLContextService::getPassphraseFile() { + std::lock_guard lock(initialization_mutex_); + return passphrase_file_; +} + +const std::string &SSLContextService::getPrivateKeyFile() { + std::lock_guard lock(initialization_mutex_); + return private_key_; +} + +const std::string &SSLContextService::getCACertificate() { + std::lock_guard lock(initialization_mutex_); + return ca_certificate_; +} + +void SSLContextService::onEnable() { + valid_ = true; + core::Property property("Client Certificate", "Client Certificate"); + core::Property privKey("Private Key", "Private Key file"); + core::Property passphrase_prop( + "Passphrase", "Client passphrase. Either a file or unencrypted text"); + core::Property caCert("CA Certificate", "CA certificate file"); + std::string default_dir; + if (nullptr != configuration_) + configuration_->get(Configure::nifi_default_directory, default_dir); + + logger_->log_trace("onEnable()"); + + if (getProperty(property.getName(), certificate) + && getProperty(privKey.getName(), private_key_)) { + logger_->log_error( + "Certificate and Private Key PEM file not configured, error: %s.", + std::strerror(errno)); + + std::ifstream cert_file(certificate); + std::ifstream priv_file(private_key_); + if (!cert_file.good()) { + logger_->log_info("%s not good", certificate); + std::string test_cert = default_dir + certificate; + std::ifstream cert_file_test(test_cert); + if (cert_file_test.good()) { + certificate = test_cert; + logger_->log_debug("%s now good", certificate); + } else { + logger_->log_debug("%s still not good", test_cert); + valid_ = false; + } + cert_file_test.close(); + } + + if (!priv_file.good()) { + std::string test_priv = default_dir + private_key_; + std::ifstream private_file_test(test_priv); + if (private_file_test.good()) { + private_key_ = test_priv; + } else { + valid_ = false; + } + private_file_test.close(); + } + cert_file.close(); + priv_file.close(); + + } else { + logger_->log_debug("Certificate empty"); + } + if (!getProperty(passphrase_prop.getName(), passphrase_)) { + logger_->log_debug("No pass phrase for %s", certificate); + } else { + std::ifstream passphrase_file(passphrase_); + if (passphrase_file.good()) { + passphrase_file_ = passphrase_; + // we should read it from the file + passphrase_.assign((std::istreambuf_iterator(passphrase_file)), + std::istreambuf_iterator()); + } else { + std::string test_passphrase = default_dir + passphrase_; + std::ifstream passphrase_file_test(test_passphrase); + if (passphrase_file_test.good()) { + passphrase_ = test_passphrase; + passphrase_file_ = test_passphrase; + passphrase_.assign( + (std::istreambuf_iterator(passphrase_file_test)), + std::istreambuf_iterator()); + } else { + valid_ = false; + } + passphrase_file_test.close(); + } + passphrase_file.close(); + } + // load CA certificates + if (!getProperty(caCert.getName(), ca_certificate_)) { + logger_->log_error("Can not load CA certificate."); + } else { + std::ifstream cert_file(ca_certificate_); + if (!cert_file.good()) { + std::string test_ca_cert = default_dir + ca_certificate_; + std::ifstream ca_cert_file_file_test(test_ca_cert); + if (ca_cert_file_file_test.good()) { + ca_certificate_ = test_ca_cert; + } else { + valid_ = false; + } + ca_cert_file_file_test.close(); + } + cert_file.close(); + } +} + +void SSLContextService::initializeTLS() { + core::Property property("Client Certificate", "Client Certificate"); + core::Property privKey("Private Key", "Private Key file"); + core::Property passphrase_prop( + "Passphrase", "Client passphrase. Either a file or unencrypted text"); + core::Property caCert("CA Certificate", "CA certificate file"); + std::set supportedProperties; + supportedProperties.insert(property); + supportedProperties.insert(privKey); + supportedProperties.insert(passphrase_prop); + supportedProperties.insert(caCert); + setSupportedProperties(supportedProperties); +} + +} /* namespace controllers */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/ClassLoader.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp new file mode 100644 index 0000000..77c4a85 --- /dev/null +++ b/libminifi/src/core/ClassLoader.cpp @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/ClassLoader.h" +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +ClassLoader &ClassLoader::getDefaultClassLoader() { + static ClassLoader ret; + // populate ret + return ret; +} +uint16_t ClassLoader::registerResource(const std::string &resource) { + void* resource_ptr = dlopen(resource.c_str(), RTLD_LAZY); + if (!resource_ptr) { + logger_->log_error("Cannot load library: %s", dlerror()); + return RESOURCE_FAILURE; + } else { + std::lock_guard lock(internal_mutex_); + dl_handles_.push_back(resource_ptr); + } + + // reset errors + dlerror(); + + // load the symbols + createFactory* create_factory_func = reinterpret_cast(dlsym( + resource_ptr, "createFactory")); + const char* dlsym_error = dlerror(); + if (dlsym_error) { + logger_->log_error("Cannot load library: %s", dlsym_error); + return RESOURCE_FAILURE; + } + + ObjectFactory *factory = create_factory_func(); + + std::lock_guard lock(internal_mutex_); + + loaded_factories_[factory->getClassName()] = std::unique_ptr( + factory); + + return RESOURCE_SUCCESS; +} + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/ConfigurableComponent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index fa5ff7d..cf2089e 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -43,6 +43,20 @@ ConfigurableComponent::ConfigurableComponent( ConfigurableComponent::~ConfigurableComponent() { } +bool ConfigurableComponent::getProperty(const std::string &name, + Property &prop) { + std::lock_guard lock(configuration_mutex_); + + auto &&it = properties_.find(name); + + if (it != properties_.end()) { + prop = it->second; + return true; + } else { + return false; + } +} + /** * Get property using the provided name. * @param name property name. @@ -54,12 +68,11 @@ bool ConfigurableComponent::getProperty(const std::string name, std::lock_guard lock(configuration_mutex_); auto &&it = properties_.find(name); - if (it != properties_.end()) { Property item = it->second; value = item.getValue(); - my_logger_->log_info("Processor %s property name %s value %s", name.c_str(), - item.getName().c_str(), value.c_str()); + my_logger_->log_info("Processor %s property name %s value %s", name, + item.getName(), value); return true; } else { return false; @@ -92,6 +105,29 @@ bool ConfigurableComponent::setProperty(const std::string name, * Sets the property using the provided name * @param property name * @param value property value. + * @return result of setting property. + */ +bool ConfigurableComponent::updateProperty(const std::string &name, + const std::string &value) { + std::lock_guard lock(configuration_mutex_); + auto &&it = properties_.find(name); + + if (it != properties_.end()) { + Property item = it->second; + item.addValue(value); + properties_[item.getName()] = item; + my_logger_->log_info("Component %s property name %s value %s", name.c_str(), + item.getName().c_str(), value.c_str()); + return true; + } else { + return false; + } +} + +/** + * Sets the property using the provided name + * @param property name + * @param value property value. * @return whether property was set or not */ bool ConfigurableComponent::setProperty(Property &prop, std::string value) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/ConfigurationFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp index 4ccead2..6aa42e3 100644 --- a/libminifi/src/core/ConfigurationFactory.cpp +++ b/libminifi/src/core/ConfigurationFactory.cpp @@ -21,6 +21,7 @@ #include #include #include +#include "core/Core.h" #include "core/ConfigurationFactory.h" #include "core/FlowConfiguration.h" #include "io/StreamFactory.h" @@ -53,17 +54,20 @@ std::unique_ptr createFlowConfiguration( if (class_name_lc == "flowconfiguration") { // load the base configuration. return std::unique_ptr( - new core::FlowConfiguration(repo, flow_file_repo, stream_factory, path)); + new core::FlowConfiguration(repo, flow_file_repo, stream_factory, + configure, path)); } else if (class_name_lc == "yamlconfiguration") { // only load if the class is defined. return std::unique_ptr( - instantiate(repo, flow_file_repo, stream_factory, path)); + instantiate(repo, flow_file_repo, + stream_factory, configure, path)); } else { if (fail_safe) { return std::unique_ptr( - new core::FlowConfiguration(repo, flow_file_repo, stream_factory, path)); + new core::FlowConfiguration(repo, flow_file_repo, stream_factory, + configure, path)); } else { throw std::runtime_error( "Support for the provided configuration class could not be found"); @@ -72,7 +76,8 @@ std::unique_ptr createFlowConfiguration( } catch (const std::runtime_error &r) { if (fail_safe) { return std::unique_ptr( - new core::FlowConfiguration(repo, flow_file_repo, stream_factory, path)); + new core::FlowConfiguration(repo, flow_file_repo, stream_factory, + configure, path)); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/Connectable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp index e234a06..58ae6d4 100644 --- a/libminifi/src/core/Connectable.cpp +++ b/libminifi/src/core/Connectable.cpp @@ -148,9 +148,9 @@ std::set> Connectable::getOutGoingConnections( std::string relationship) { std::set> empty; - auto &&it = _outGoingConnections.find(relationship); - if (it != _outGoingConnections.end()) { - return _outGoingConnections[relationship]; + auto &&it = out_going_connections_.find(relationship); + if (it != out_going_connections_.end()) { + return out_going_connections_[relationship]; } else { return empty; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 90058d2..1ed9176 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -19,6 +19,7 @@ #include "core/FlowConfiguration.h" #include #include +#include "core/ClassLoader.h" namespace org { namespace apache { @@ -31,54 +32,12 @@ FlowConfiguration::~FlowConfiguration() { std::shared_ptr FlowConfiguration::createProcessor( std::string name, uuid_t uuid) { - std::shared_ptr processor = nullptr; - if (name - == org::apache::nifi::minifi::processors::GenerateFlowFile::ProcessorName) { - processor = std::make_shared< - org::apache::nifi::minifi::processors::GenerateFlowFile>(name, uuid); - } else if (name - == org::apache::nifi::minifi::processors::LogAttribute::ProcessorName) { - processor = std::make_shared< - org::apache::nifi::minifi::processors::LogAttribute>(name, uuid); - } else if (name - == org::apache::nifi::minifi::processors::GetFile::ProcessorName) { - processor = - std::make_shared(name, - uuid); - } else if (name - == org::apache::nifi::minifi::processors::PutFile::ProcessorName) { - processor = - std::make_shared(name, - uuid); - } else if (name - == org::apache::nifi::minifi::processors::TailFile::ProcessorName) { - processor = - std::make_shared(name, - uuid); - } else if (name - == org::apache::nifi::minifi::processors::ListenSyslog::ProcessorName) { - processor = std::make_shared< - org::apache::nifi::minifi::processors::ListenSyslog>(name, uuid); - } else if (name - == org::apache::nifi::minifi::processors::ListenHTTP::ProcessorName) { - processor = std::make_shared< - org::apache::nifi::minifi::processors::ListenHTTP>(name, uuid); - } else if (name - == org::apache::nifi::minifi::processors::InvokeHTTP::ProcessorName) { - processor = std::make_shared< - org::apache::nifi::minifi::processors::InvokeHTTP>(name, uuid); - } else if (name - == org::apache::nifi::minifi::processors::ExecuteProcess::ProcessorName) { - processor = std::make_shared< - org::apache::nifi::minifi::processors::ExecuteProcess>(name, uuid); - } else if (name - == org::apache::nifi::minifi::processors::AppendHostInfo::ProcessorName) { - processor = std::make_shared< - org::apache::nifi::minifi::processors::AppendHostInfo>(name, uuid); - } else { + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(name, uuid); + if (nullptr == ptr) { logger_->log_error("No Processor defined for %s", name.c_str()); - return nullptr; } + std::shared_ptr processor = std::static_pointer_cast< + core::Processor>(ptr); // initialize the processor processor->initialize(); @@ -89,8 +48,10 @@ std::shared_ptr FlowConfiguration::createProcessor( std::shared_ptr FlowConfiguration::createProvenanceReportTask() { std::shared_ptr processor = nullptr; - processor = std::make_shared< - org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(stream_factory_); + processor = + std::make_shared< + org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>( + stream_factory_); // initialize the processor processor->initialize(); @@ -114,6 +75,15 @@ std::shared_ptr FlowConfiguration::createConnection( return std::make_shared(flow_file_repo_, name, uuid); } +std::shared_ptr FlowConfiguration::createControllerService( + const std::string &class_name, const std::string &name, uuid_t uuid) { + std::shared_ptr controllerServicesNode = + service_provider_->createControllerService(class_name, name, true); + if (nullptr != controllerServicesNode) + controllerServicesNode->setUUID(uuid); + return controllerServicesNode; +} + } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 1a6e729..1b8ec3a 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -202,6 +202,22 @@ std::shared_ptr ProcessGroup::findProcessor(uuid_t uuid) { return ret; } +void ProcessGroup::addControllerService( + const std::string &nodeId, + std::shared_ptr &node) { + controller_service_map_.put(nodeId, node); +} + +/** + * Find controllerservice node will search child groups until the nodeId is found. + * @param node node identifier + * @return controller service node, if it exists. + */ +std::shared_ptr ProcessGroup::findControllerService( + const std::string &nodeId) { + return controller_service_map_.getControllerServiceNode(nodeId); +} + std::shared_ptr ProcessGroup::findProcessor( const std::string &processorName) { std::shared_ptr ret = NULL; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index e124992..7464af2 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -114,15 +114,15 @@ bool Processor::addConnection(std::shared_ptr conn) { if (my_uuid == source_uuid) { std::string relationship = connection->getRelationship().getName(); // Connection is source from the current processor - auto &&it = _outGoingConnections.find(relationship); - if (it != _outGoingConnections.end()) { + auto &&it = out_going_connections_.find(relationship); + if (it != out_going_connections_.end()) { // We already has connection for this relationship std::set> existedConnection = it->second; if (existedConnection.find(connection) == existedConnection.end()) { // We do not have the same connection for this relationship yet existedConnection.insert(connection); connection->setSource(shared_from_this()); - _outGoingConnections[relationship] = existedConnection; + out_going_connections_[relationship] = existedConnection; logger_->log_info( "Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str()); @@ -133,7 +133,7 @@ bool Processor::addConnection(std::shared_ptr conn) { std::set> newConnection; newConnection.insert(connection); connection->setSource(shared_from_this()); - _outGoingConnections[relationship] = newConnection; + out_going_connections_[relationship] = newConnection; logger_->log_info( "Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str()); @@ -178,13 +178,13 @@ void Processor::removeConnection(std::shared_ptr conn) { if (uuid_compare(uuid_, srcUUID) == 0) { std::string relationship = connection->getRelationship().getName(); // Connection is source from the current processor - auto &&it = _outGoingConnections.find(relationship); - if (it == _outGoingConnections.end()) { + auto &&it = out_going_connections_.find(relationship); + if (it == out_going_connections_.end()) { return; } else { - if (_outGoingConnections[relationship].find(connection) - != _outGoingConnections[relationship].end()) { - _outGoingConnections[relationship].erase(connection); + if (out_going_connections_[relationship].find(connection) + != out_going_connections_[relationship].end()) { + out_going_connections_[relationship].erase(connection); connection->setSource(NULL); logger_->log_info( "Remove connection %s into Processor %s outgoing connection for relationship %s", @@ -259,7 +259,7 @@ bool Processor::flowFilesQueued() { bool Processor::flowFilesOutGoingFull() { std::lock_guard lock(mutex_); - for (auto &&connection : _outGoingConnections) { + for (auto &&connection : out_going_connections_) { // We already has connection for this relationship std::set> existedConnection = connection.second; for (const auto conn : existedConnection) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/Property.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Property.cpp b/libminifi/src/core/Property.cpp index aa002af..dd3a9cb 100644 --- a/libminifi/src/core/Property.cpp +++ b/libminifi/src/core/Property.cpp @@ -18,6 +18,7 @@ #include "core/Property.h" #include +#include namespace org { namespace apache { namespace nifi { @@ -34,11 +35,27 @@ std::string Property::getDescription() { } // Get value for the property std::string Property::getValue() const { - return value_; + if (!values_.empty()) + return values_.front(); + else + return ""; +} + +std::vector &Property::getValues() { + return values_; } // Set value for the property void Property::setValue(std::string value) { - value_ = value; + if (!isCollection) { + values_.clear(); + values_.push_back(std::string(value.c_str())); + } else { + values_.push_back(std::string(value.c_str())); + } +} + +void Property::addValue(const std::string &value) { + values_.push_back(std::string(value.c_str())); } // Compare bool Property::operator <(const Property & right) const { @@ -47,7 +64,8 @@ bool Property::operator <(const Property & right) const { const Property &Property::operator=(const Property &other) { name_ = other.name_; - value_ = other.value_; + values_ = other.values_; + isCollection = other.isCollection; return *this; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/controller/ControllerServiceNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/controller/ControllerServiceNode.cpp b/libminifi/src/core/controller/ControllerServiceNode.cpp new file mode 100644 index 0000000..12e3653 --- /dev/null +++ b/libminifi/src/core/controller/ControllerServiceNode.cpp @@ -0,0 +1,49 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/controller/ControllerServiceNode.h" +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace controller { + +std::shared_ptr &ControllerServiceNode::getControllerServiceImplementation() { + return controller_service_; +} + +std::vector > &ControllerServiceNode::getLinkedControllerServices() { + return linked_controller_services_; +} + +std::vector > &ControllerServiceNode::getLinkedComponents() { + return linked_components_; +} + + + +} /* namespace controller */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/controller/ControllerServiceProvider.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/controller/ControllerServiceProvider.cpp b/libminifi/src/core/controller/ControllerServiceProvider.cpp new file mode 100644 index 0000000..da5c6a1 --- /dev/null +++ b/libminifi/src/core/controller/ControllerServiceProvider.cpp @@ -0,0 +1,50 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/controller/ControllerServiceProvider.h" +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace controller { + +/** + * @param identifier of controller service + * @return the ControllerService that is registered with the given + * identifier + */ +std::shared_ptr ControllerServiceProvider::getControllerService( + const std::string &identifier) { + auto service = controller_map_->getControllerServiceNode(identifier); + if (service != nullptr) { + return service->getControllerServiceImplementation(); + } else { + return nullptr; + } +} + +} /* namespace controller */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/controller/StandardControllerServiceNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp new file mode 100644 index 0000000..26804f6 --- /dev/null +++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/controller/StandardControllerServiceNode.h" +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace controller { +std::shared_ptr &StandardControllerServiceNode::getProcessGroup() { + std::lock_guard lock(mutex_); + return process_group_; +} + +void StandardControllerServiceNode::setProcessGroup( + std::shared_ptr &processGroup) { + std::lock_guard lock(mutex_); + process_group_ = processGroup; +} + +bool StandardControllerServiceNode::enable() { + Property property("Linked Services", "Referenced Controller Services"); + controller_service_->setState(ENABLED); + logger_->log_trace("Enabling CSN %s", getName()); + if (getProperty(property.getName(), property)) { + active = true; + for (auto linked_service : property.getValues()) { + std::shared_ptr csNode = provider + ->getControllerServiceNode(linked_service); + if (nullptr != csNode) { + std::lock_guard lock(mutex_); + linked_controller_services_.push_back(csNode); + } + } + } else { + } + std::shared_ptr impl = + getControllerServiceImplementation(); + if (nullptr != impl) { + impl->onEnable(); + } + return true; +} + +} /* namespace controller */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index b7c8246..c2c4950 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -21,6 +21,8 @@ #include #include #include +#include "core/reporting/SiteToSiteProvenanceReportingTask.h" +#include "io/validation.h" namespace org { namespace apache { namespace nifi { @@ -31,13 +33,14 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml( YAML::Node rootFlowNode) { uuid_t uuid; - checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); + checkRequiredField(&rootFlowNode, "name", + CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); std::string flowName = rootFlowNode["name"].as(); std::string id = getOrGenerateId(&rootFlowNode); uuid_parse(id.c_str(), uuid); - logger_->log_debug( - "parseRootProcessGroup: id => [%s], name => [%s]", id, flowName); + logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, + flowName); std::unique_ptr group = FlowConfiguration::createRootProcessGroup(flowName, uuid); @@ -172,13 +175,16 @@ void YamlConfiguration::parseProcessorNodeYaml( if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { processor->setSchedulingStrategy(core::TIMER_DRIVEN); - logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); + logger_->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy); } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { processor->setSchedulingStrategy(core::EVENT_DRIVEN); - logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); + logger_->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy); } else { processor->setSchedulingStrategy(core::CRON_DRIVEN); - logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); + logger_->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy); } int64_t maxConcurrentTasks; @@ -203,11 +209,14 @@ void YamlConfiguration::parseProcessorNodeYaml( parentGroup->addProcessor(processor); } + } else { + throw new std::invalid_argument( + "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); } } else { throw new std::invalid_argument( "Cannot instantiate a MiNiFi instance without a defined " - "Processors configuration node."); + "Processors configuration node."); } } @@ -323,14 +332,16 @@ void YamlConfiguration::parseProvenanceReportingYaml( std::shared_ptr processor = nullptr; processor = createProvenanceReportTask(); std::shared_ptr reportTask = - std::static_pointer_cast < core::reporting::SiteToSiteProvenanceReportingTask - > (processor); + std::static_pointer_cast< + core::reporting::SiteToSiteProvenanceReportingTask>(processor); YAML::Node node = reportNode->as(); - checkRequiredField(&node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY); + checkRequiredField(&node, "scheduling strategy", + CONFIG_YAML_PROVENANCE_REPORT_KEY); auto schedulingStrategyStr = node["scheduling strategy"].as(); - checkRequiredField(&node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY); + checkRequiredField(&node, "scheduling period", + CONFIG_YAML_PROVENANCE_REPORT_KEY); auto schedulingPeriodStr = node["scheduling period"].as(); checkRequiredField(&node, "host", CONFIG_YAML_PROVENANCE_REPORT_KEY); auto hostStr = node["host"].as(); @@ -346,21 +357,21 @@ void YamlConfiguration::parseProvenanceReportingYaml( processor->setScheduledState(core::RUNNING); core::TimeUnit unit; - if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) && - core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { - logger_->log_debug( - "ProvenanceReportingTask schedulingPeriod %d ns", - schedulingPeriod); + if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) + && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, + schedulingPeriod)) { + logger_->log_debug("ProvenanceReportingTask schedulingPeriod %d ns", + schedulingPeriod); processor->setSchedulingPeriodNano(schedulingPeriod); } if (schedulingStrategyStr == "TIMER_DRIVEN") { - processor->setSchedulingStrategy(core::TIMER_DRIVEN); - logger_->log_debug( - "ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr); + processor->setSchedulingStrategy(core::TIMER_DRIVEN); + logger_->log_debug("ProvenanceReportingTask scheduling strategy %s", + schedulingStrategyStr); } else { throw std::invalid_argument( - "Invalid scheduling strategy " + schedulingStrategyStr); + "Invalid scheduling strategy " + schedulingStrategyStr); } reportTask->setHost(hostStr); @@ -370,6 +381,7 @@ void YamlConfiguration::parseProvenanceReportingYaml( logger_->log_debug("ProvenanceReportingTask port %d", (uint16_t) lvalue); reportTask->setPort((uint16_t) lvalue); } + logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr); uuid_parse(portUUIDStr.c_str(), port_uuid); reportTask->setPortUUID(port_uuid); @@ -378,9 +390,62 @@ void YamlConfiguration::parseProvenanceReportingYaml( } } -void YamlConfiguration::parseConnectionYaml( - YAML::Node *connectionsNode, - core::ProcessGroup *parent) { +void YamlConfiguration::parseControllerServices( + YAML::Node *controllerServicesNode) { + if (!IsNullOrEmpty(controllerServicesNode)) { + if (controllerServicesNode->IsSequence()) { + for (auto iter : *controllerServicesNode) { + YAML::Node controllerServiceNode = iter.as(); + try { + checkRequiredField(&controllerServiceNode, "name", + CONFIG_YAML_CONTROLLER_SERVICES_KEY); + checkRequiredField(&controllerServiceNode, "id", + CONFIG_YAML_CONTROLLER_SERVICES_KEY); + checkRequiredField(&controllerServiceNode, "class", + CONFIG_YAML_CONTROLLER_SERVICES_KEY); + + auto name = controllerServiceNode["name"].as(); + auto id = controllerServiceNode["id"].as(); + auto type = controllerServiceNode["class"].as(); + + uuid_t uuid; + uuid_parse(id.c_str(), uuid); + auto controller_service_node = createControllerService(type, name, + uuid); + if (nullptr != controller_service_node) { + logger_->log_debug( + "Created Controller Service with UUID %s and name %s", id, + name); + controller_service_node->initialize(); + YAML::Node propertiesNode = controllerServiceNode["Properties"]; + // we should propogate propertiets to the node and to the implementation + parsePropertiesNodeYaml( + &propertiesNode, + std::static_pointer_cast( + controller_service_node)); + if (controller_service_node->getControllerServiceImplementation() + != nullptr) { + parsePropertiesNodeYaml( + &propertiesNode, + std::static_pointer_cast( + controller_service_node + ->getControllerServiceImplementation())); + } + } + controller_services_->put(id, controller_service_node); + controller_services_->put(name, controller_service_node); + } catch (YAML::InvalidNode &in) { + throw Exception( + ExceptionType::GENERAL_EXCEPTION, + "Name, id, and class must be specified for controller services"); + } + } + } + } +} + +void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, + core::ProcessGroup *parent) { if (!parent) { logger_->log_error("parseProcessNode: no parent group was provided"); return; @@ -400,15 +465,15 @@ void YamlConfiguration::parseConnectionYaml( std::string id = getOrGenerateId(&connectionNode); uuid_parse(id.c_str(), uuid); connection = this->createConnection(name, uuid); - logger_->log_debug( - "Created connection with UUID %s and name %s", id, name); - + logger_->log_debug("Created connection with UUID %s and name %s", id, + name); // Configure connection source checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY); auto rawRelationship = connectionNode["source relationship name"].as(); core::Relationship relationship(rawRelationship, ""); - logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship); + logger_->log_debug("parseConnection: relationship => [%s]", + rawRelationship); if (connection) { connection->setRelationship(relationship); } @@ -462,7 +527,7 @@ void YamlConfiguration::parseConnectionYaml( std::string connectionDestProcName = connectionNode["destination name"].as(); uuid_t tmpUUID; if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) && - NULL != parent->findProcessor(tmpUUID)) { + NULL != parent->findProcessor(tmpUUID)) { // the destination name is a remote port id, so use that as the dest id uuid_copy(destUUID, tmpUUID); logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for " @@ -508,13 +573,18 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, YAML::Node inputPortsObj = portNode->as(); // Check for required fields - checkRequiredField(&inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); + checkRequiredField(&inputPortsObj, "name", + CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); auto nameStr = inputPortsObj["name"].as(); - checkRequiredField(&inputPortsObj, "id", "The field 'id' is required for " - "the port named '" + nameStr + "' in the YAML Config. If this port " - "is an input port for a NiFi Remote Process Group, the port " - "id should match the corresponding id specified in the NiFi configuration. " - "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX."); + checkRequiredField( + &inputPortsObj, + "id", + "The field 'id' is required for " + "the port named '" + nameStr + + "' in the YAML Config. If this port " + "is an input port for a NiFi Remote Process Group, the port " + "id should match the corresponding id specified in the NiFi configuration. " + "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX."); auto portId = inputPortsObj["id"].as(); uuid_parse(portId.c_str(), uuid); @@ -530,7 +600,9 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, // handle port properties YAML::Node nodeVal = portNode->as(); YAML::Node propertiesNode = nodeVal["Properties"]; - parsePropertiesNodeYaml(&propertiesNode, processor); + parsePropertiesNodeYaml( + &propertiesNode, + std::static_pointer_cast(processor)); // add processor to parent parent->addProcessor(processor); @@ -548,28 +620,51 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, } void YamlConfiguration::parsePropertiesNodeYaml( - YAML::Node *propertiesNode, std::shared_ptr processor) { + YAML::Node *propertiesNode, + std::shared_ptr processor) { // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated for (YAML::const_iterator propsIter = propertiesNode->begin(); propsIter != propertiesNode->end(); ++propsIter) { std::string propertyName = propsIter->first.as(); YAML::Node propertyValueNode = propsIter->second; if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) { - std::string rawValueString = propertyValueNode.as(); - if (!processor->setProperty(propertyName, rawValueString)) { - logger_->log_warn( - "Received property %s with value %s but it is not one of the properties for %s", - propertyName, - rawValueString, - processor->getName()); + if (propertyValueNode.IsSequence()) { + for (auto iter : propertyValueNode) { + if (iter.IsDefined()) { + YAML::Node nodeVal = iter.as(); + YAML::Node propertiesNode = nodeVal["value"]; + // must insert the sequence in differently. + std::string rawValueString = propertiesNode.as(); + logger_->log_info("Found %s=%s", propertyName, rawValueString); + if (!processor->updateProperty(propertyName, rawValueString)) { + std::shared_ptr proc = + std::dynamic_pointer_cast(processor); + if (proc != 0) { + logger_->log_warn( + "Received property %s with value %s but is not one of the properties for %s", + propertyName, rawValueString, proc->getName()); + } + } + } + } + } else { + std::string rawValueString = propertyValueNode.as(); + if (!processor->setProperty(propertyName, rawValueString)) { + std::shared_ptr proc = std::dynamic_pointer_cast< + core::Connectable>(processor); + if (proc != 0) { + logger_->log_warn( + "Received property %s with value %s but is not one of the properties for %s", + propertyName, rawValueString, proc->getName()); + } + } } } } } -std::string YamlConfiguration::getOrGenerateId( - YAML::Node *yamlNode, - const std::string &idField) { +std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, + const std::string &idField) { std::string id; YAML::Node node = yamlNode->as(); @@ -579,7 +674,7 @@ std::string YamlConfiguration::getOrGenerateId( } else { throw std::invalid_argument( "getOrGenerateId: idField is expected to reference YAML::Node " - "of YAML::NodeType::Scalar."); + "of YAML::NodeType::Scalar."); } } else { uuid_t uuid; @@ -592,12 +687,10 @@ std::string YamlConfiguration::getOrGenerateId( return id; } -void YamlConfiguration::checkRequiredField( - YAML::Node *yamlNode, - const std::string &fieldName, - const std::string &yamlSection, - const std::string &errorMessage) { - +void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode, + const std::string &fieldName, + const std::string &yamlSection, + const std::string &errorMessage) { std::string errMsg = errorMessage; if (!yamlNode->as()[fieldName]) { if (errMsg.empty()) { @@ -605,14 +698,13 @@ void YamlConfiguration::checkRequiredField( // invalid YAML config file, using the component name if present errMsg = yamlNode->as()["name"] ? - "Unable to parse configuration file for component named '" + - yamlNode->as()["name"].as() + - "' as required field '" + fieldName + "' is missing" : - "Unable to parse configuration file as required field '" + - fieldName + "' is missing"; + "Unable to parse configuration file for component named '" + + yamlNode->as()["name"].as() + + "' as required field '" + fieldName + "' is missing" : + "Unable to parse configuration file as required field '" + + fieldName + "' is missing"; if (!yamlSection.empty()) { - errMsg += " [in '" + yamlSection + - "' section of configuration file]"; + errMsg += " [in '" + yamlSection + "' section of configuration file]"; } } logger_->log_error(errMsg.c_str()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/processors/GetFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp index dcabb5d..e39afec 100644 --- a/libminifi/src/processors/GetFile.cpp +++ b/libminifi/src/processors/GetFile.cpp @@ -43,6 +43,10 @@ namespace apache { namespace nifi { namespace minifi { namespace processors { + + + + core::Property GetFile::BatchSize( "Batch Size", "The maximum number of files to pull in each iteration", "10"); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/processors/InvokeHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp index 17226a8..295560f 100644 --- a/libminifi/src/processors/InvokeHTTP.cpp +++ b/libminifi/src/processors/InvokeHTTP.cpp @@ -17,6 +17,7 @@ */ #include "processors/InvokeHTTP.h" +#include #include #include #include @@ -42,12 +43,6 @@ #include "ResourceClaim.h" #include "utils/StringUtils.h" -#if (__GNUC__ >= 4) -#if (__GNUC_MINOR__ < 9) -#include -#endif -#endif - namespace org { namespace apache { namespace nifi { @@ -156,7 +151,8 @@ core::Relationship InvokeHTTP::RelFailure( void InvokeHTTP::set_request_method(CURL *curl, const std::string &method) { std::string my_method = method; - std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper); + std::transform(my_method.begin(), my_method.end(), my_method.begin(), + ::toupper); if (my_method == "POST") { curl_easy_setopt(curl, CURLOPT_POST, 1); } else if (my_method == "PUT") { @@ -280,6 +276,17 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, } utils::StringUtils::StringToBool(penalize_no_retry, penalize_no_retry_); + + std::string context_name; + if (context->getProperty(SSLContext.getName(), context_name) + && !IsNullOrEmpty(context_name)) { + std::shared_ptr service = context + ->getControllerService(context_name); + if (nullptr != service) { + ssl_context_service_ = std::static_pointer_cast< + minifi::controllers::SSLContextService>(service); + } + } } InvokeHTTP::~InvokeHTTP() { @@ -288,9 +295,9 @@ InvokeHTTP::~InvokeHTTP() { inline bool InvokeHTTP::matches(const std::string &value, const std::string &sregex) { -#ifdef __GNUC__ -#if (__GNUC__ >= 4) -#if (__GNUC_MINOR__ < 9) + if (sregex == ".*") + return true; + regex_t regex; int ret = regcomp(®ex, sregex.c_str(), 0); if (ret) @@ -299,24 +306,7 @@ inline bool InvokeHTTP::matches(const std::string &value, regfree(®ex); if (ret) return false; -#else - try { - std::regex re(sregex); - if (!std::regex_match(value, re)) { - return false; - } - } catch (std::regex_error e) { - logger_->log_error("Invalid File Filter regex: %s.", e.what()); - return false; - } -#endif -#endif -#else - logger_->log_info("Cannot support regex filtering"); - if (regex == ".*") - return true; -#endif return true; } @@ -346,6 +336,33 @@ struct curl_slist *InvokeHTTP::build_header_list( } return list; } + +bool InvokeHTTP::isSecure(const std::string &url) { + if (url.find("https") != std::string::npos) { + return true; + } + return false; +} + +CURLcode InvokeHTTP::configure_ssl_context(CURL *curl, void *ctx, void *param) { + minifi::controllers::SSLContextService *ssl_context_service = + static_cast(param); + if (!ssl_context_service->configure_ssl_context(static_cast(ctx))) { + return CURLE_FAILED_INIT; + } + return CURLE_OK; +} + +void InvokeHTTP::configure_secure_connection(CURL *http_session) { + logger_->log_debug("InvokeHTTP -- Using certificate file %s", + ssl_context_service_->getCertificateFile()); + curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L); + curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, + &InvokeHTTP::configure_ssl_context); + curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, + static_cast(ssl_context_service_.get())); +} + void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { std::shared_ptr flowFile = std::static_pointer_cast< @@ -371,6 +388,10 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, CURL *http_session = curl_easy_init(); // set the HTTP request method from libCURL set_request_method(http_session, method_); + if (isSecure(url_) && ssl_context_service_ != nullptr) { + configure_secure_connection(http_session); + } + curl_easy_setopt(http_session, CURLOPT_URL, url_.c_str()); if (connect_timeout_ > 0) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/FlowFileRecordTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/FlowFileRecordTest.cpp b/libminifi/test/FlowFileRecordTest.cpp deleted file mode 100644 index 09a3d33..0000000 --- a/libminifi/test/FlowFileRecordTest.cpp +++ /dev/null @@ -1,28 +0,0 @@ -/** - * @file MiNiFiMain.cpp - * MiNiFiMain implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include -#include - -#include "FlowFileRecord.h" - -int main(int argc, char **argv) -{ -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/HttpGetIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/HttpGetIntegrationTest.cpp b/libminifi/test/HttpGetIntegrationTest.cpp deleted file mode 100644 index 90505b4..0000000 --- a/libminifi/test/HttpGetIntegrationTest.cpp +++ /dev/null @@ -1,120 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "utils/StringUtils.h" -#include "../include/core/Core.h" -#include "../include/core/logging/LogAppenders.h" -#include "../include/core/logging/BaseLogger.h" -#include "../include/core/logging/Logger.h" -#include "../include/core/ProcessGroup.h" -#include "../include/core/yaml/YamlConfiguration.h" -#include "../include/FlowController.h" -#include "../include/properties/Configure.h" -#include "unit/ProvenanceTestHelper.h" -#include "../include/io/StreamFactory.h" - -std::string test_file_location; - -void waitToVerifyProcessor() { - std::this_thread::sleep_for(std::chrono::seconds(10)); -} - -int main(int argc, char **argv) { - - if (argc > 1) { - test_file_location = argv[1]; - } - mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - std::ostringstream oss; - std::unique_ptr outputLogger = std::unique_ptr< - logging::BaseLogger>( - new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, - 0)); - std::shared_ptr logger = logging::Logger::getLogger(); - logger->updateLogger(std::move(outputLogger)); - logger->setLogLevel("debug"); - - std::shared_ptr configuration = std::make_shared(); - - std::shared_ptr test_repo = - std::make_shared(); - std::shared_ptr test_flow_repo = std::make_shared< - TestFlowRepository>(); - - configuration->set(minifi::Configure::nifi_flow_configuration_file, - test_file_location); - - std::shared_ptr stream_factory = std::make_shared(configuration); - std::unique_ptr yaml_ptr = std::unique_ptr< - core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, stream_factory, test_file_location)); - std::shared_ptr repo = - std::static_pointer_cast(test_repo); - - std::shared_ptr controller = std::make_shared< - minifi::FlowController>(test_repo, test_flow_repo, std::make_shared(), std::move(yaml_ptr), - DEFAULT_ROOT_GROUP_NAME, - true); - - core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, test_file_location); - - std::unique_ptr ptr = yaml_config.getRoot( - test_file_location); - std::shared_ptr pg = std::shared_ptr( - ptr.get()); - ptr.release(); - - controller->load(); - controller->start(); - waitToVerifyProcessor(); - - controller->waitUnload(60000); - std::string logs = oss.str(); - assert(logs.find("key:filename value:") != std::string::npos); - assert( - logs.find( - "key:invokehttp.request.url value:https://curl.haxx.se/libcurl/c/httpput.html") - != std::string::npos); - assert(logs.find("Size:8970 Offset:0") != std::string::npos); - assert( - logs.find("key:invokehttp.status.code value:200") != std::string::npos); - std::string stringtofind = "Resource Claim created ./content_repository/"; - - size_t loc = logs.find(stringtofind); - while (loc > 0) { - std::string id = logs.substr(loc + stringtofind.size(), 36); - - loc = logs.find(stringtofind, loc+1); - std::string path = "content_repository/" + id; - unlink(path.c_str()); - - if ( loc == std::string::npos) - break; - } - rmdir("./content_repository"); - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/HttpPostIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/HttpPostIntegrationTest.cpp b/libminifi/test/HttpPostIntegrationTest.cpp deleted file mode 100644 index 73d21e6..0000000 --- a/libminifi/test/HttpPostIntegrationTest.cpp +++ /dev/null @@ -1,120 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "utils/StringUtils.h" -#include "../include/core/Core.h" -#include "../include/core/logging/LogAppenders.h" -#include "../include/core/logging/BaseLogger.h" -#include "../include/core/logging/Logger.h" -#include "../include/core/ProcessGroup.h" -#include "../include/core/yaml/YamlConfiguration.h" -#include "../include/FlowController.h" -#include "../include/properties/Configure.h" -#include "unit/ProvenanceTestHelper.h" -#include "../include/io/StreamFactory.h" -#include "../include/properties/Configure.h" - -std::string test_file_location; - -void waitToVerifyProcessor() { - std::this_thread::sleep_for(std::chrono::seconds(2)); -} - -int main(int argc, char **argv) { - - if (argc > 1) { - test_file_location = argv[1]; - } - mkdir("/tmp/aljr39/",S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - std::ofstream myfile; - myfile.open ("/tmp/aljr39/example.txt"); - myfile << "Hello world" << std::endl; - myfile.close(); - mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - std::ostringstream oss; - std::unique_ptr outputLogger = std::unique_ptr< - logging::BaseLogger>( - new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, - 0)); - std::shared_ptr logger = logging::Logger::getLogger(); - logger->updateLogger(std::move(outputLogger)); - logger->setLogLevel("debug"); - - std::shared_ptr configuration = std::make_shared(); - - std::shared_ptr test_repo = - std::make_shared(); - std::shared_ptr test_flow_repo = std::make_shared< - TestFlowRepository>(); - - configuration->set(minifi::Configure::nifi_flow_configuration_file, - test_file_location); - std::shared_ptr stream_factory = std::make_shared(configuration); - - std::unique_ptr yaml_ptr = std::unique_ptr< - core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, stream_factory, test_file_location)); - std::shared_ptr repo = - std::static_pointer_cast(test_repo); - - std::shared_ptr controller = std::make_shared< - minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), - DEFAULT_ROOT_GROUP_NAME, - true); - - core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, test_file_location); - - std::unique_ptr ptr = yaml_config.getRoot( - test_file_location); - std::shared_ptr pg = std::shared_ptr( - ptr.get()); - ptr.release(); - - controller->load(); - controller->start(); - waitToVerifyProcessor(); - - controller->waitUnload(60000); - std::string logs = oss.str(); - assert(logs.find("curl performed") != std::string::npos); - assert(logs.find("Import offset 0 length 12") != std::string::npos); - - std::string stringtofind = "Resource Claim created ./content_repository/"; - - size_t loc = logs.find(stringtofind); - while (loc > 0 && loc != std::string::npos) { - std::string id = logs.substr(loc + stringtofind.size(), 36); - loc = logs.find(stringtofind, loc+1); - std::string path = "content_repository/" + id; - unlink(path.c_str()); - if ( loc == std::string::npos) - break; - } - - rmdir("./content_repository"); - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/ProcessorTests.cpp b/libminifi/test/ProcessorTests.cpp deleted file mode 100644 index dfdcf47..0000000 --- a/libminifi/test/ProcessorTests.cpp +++ /dev/null @@ -1,408 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file -#include -#include -#include "unit/ProvenanceTestHelper.h" -#include "TestBase.h" -#include "core/logging/LogAppenders.h" -#include "core/logging/BaseLogger.h" -#include "processors/GetFile.h" -#include "core/Core.h" -#include "core/FlowFile.h" -#include "core/Processor.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "core/ProcessorNode.h" -#include "core/reporting/SiteToSiteProvenanceReportingTask.h" - - - -TEST_CASE("Test Creation of GetFile", "[getfileCreate]") { - std::shared_ptr processor = std::make_shared< - org::apache::nifi::minifi::processors::GetFile>("processorname"); - REQUIRE(processor->getName() == "processorname"); -} - -TEST_CASE("Test Find file", "[getfileCreate2]") { - - TestController testController; - - testController.enableDebug(); - - std::shared_ptr processor = std::make_shared< - org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - - std::shared_ptr processorReport = - std::make_shared< - org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(std::make_shared(std::make_shared())); - - std::shared_ptr test_repo = - std::make_shared(); - - std::shared_ptr repo = - std::static_pointer_cast(test_repo); - std::shared_ptr controller = std::make_shared< - TestFlowController>(test_repo, test_repo); - - char format[] = "/tmp/gt.XXXXXX"; - char *dir = testController.createTempDirectory(format); - - uuid_t processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - - std::shared_ptr connection = std::make_shared< - minifi::Connection>(test_repo, "getfileCreate2Connection"); - connection->setRelationship(core::Relationship("success", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(processor); - connection->setDestination(processor); - - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(processoruuid); - - processor->addConnection(connection); - REQUIRE(dir != NULL); - - core::ProcessorNode node(processor); - - core::ProcessContext context(node, test_repo); - core::ProcessSessionFactory factory(&context); - context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, - dir); - core::ProcessSession session(&context); - - processor->onSchedule(&context, &factory); - REQUIRE(processor->getName() == "getfileCreate2"); - - std::shared_ptr record; - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onTrigger(&context, &session); - - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); - std::set records = reporter->getEvents(); - record = session.get(); - REQUIRE(record == nullptr); - REQUIRE(records.size() == 0); - - std::fstream file; - std::stringstream ss; - ss << dir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - - processor->incrementActiveTasks(); - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onTrigger(&context, &session); - unlink(ss.str().c_str()); - reporter = session.getProvenanceReporter(); - - REQUIRE(processor->getName() == "getfileCreate2"); - - records = reporter->getEvents(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == processor->getName()); - } - session.commit(); - std::shared_ptr ffr = session.get(); - - ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); - REQUIRE(2 == repo->getRepoMap().size()); - - for (auto entry : repo->getRepoMap()) { - provenance::ProvenanceEventRecord newRecord; - newRecord.DeSerialize((uint8_t*) entry.second.data(), - entry.second.length()); - - bool found = false; - for (auto provRec : records) { - if (provRec->getEventId() == newRecord.getEventId()) { - REQUIRE(provRec->getEventId() == newRecord.getEventId()); - REQUIRE(provRec->getComponentId() == newRecord.getComponentId()); - REQUIRE(provRec->getComponentType() == newRecord.getComponentType()); - REQUIRE(provRec->getDetails() == newRecord.getDetails()); - REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration()); - found = true; - break; - } - } - if (!found) - throw std::runtime_error("Did not find record"); - - } - - core::ProcessorNode nodeReport(processorReport); - core::ProcessContext contextReport(nodeReport, test_repo); - core::ProcessSessionFactory factoryReport(&contextReport); - core::ProcessSession sessionReport(&contextReport); - processorReport->onSchedule(&contextReport, &factoryReport); - std::shared_ptr< - org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport = - std::static_pointer_cast< - org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>( - processorReport); - taskReport->setBatchSize(1); - std::vector> recordsReport; - processorReport->incrementActiveTasks(); - processorReport->setScheduledState(core::ScheduledState::RUNNING); - std::string jsonStr; - repo->getProvenanceRecord(recordsReport, 1); - taskReport->getJsonReport(&contextReport, &sessionReport, recordsReport, - jsonStr); - REQUIRE(recordsReport.size() == 1); - REQUIRE( - taskReport->getName() - == std::string( - org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName)); - REQUIRE( - jsonStr.find("\"componentType\" : \"getfileCreate2\"") - != std::string::npos); -} - -TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { - - TestController testController; - - testController.enableDebug(); - - std::shared_ptr processor = std::make_shared< - org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - - std::shared_ptr test_repo = - std::make_shared(); - - std::shared_ptr repo = - std::static_pointer_cast(test_repo); - std::shared_ptr controller = std::make_shared< - TestFlowController>(test_repo, test_repo); - - char format[] = "/tmp/gt.XXXXXX"; - char *dir = testController.createTempDirectory(format); - - uuid_t processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - - std::shared_ptr connection = std::make_shared< - minifi::Connection>(test_repo, "getfileCreate2Connection"); - connection->setRelationship(core::Relationship("success", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(processor); - connection->setDestination(processor); - - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(processoruuid); - - processor->addConnection(connection); - REQUIRE(dir != NULL); - - core::ProcessorNode node(processor); - core::ProcessContext context(node, test_repo); - core::ProcessSessionFactory factory(&context); - context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, - dir); - // replicate 10 threads - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onSchedule(&context, &factory); - - int prev = 0; - for (int i = 0; i < 10; i++) { - - core::ProcessSession session(&context); - REQUIRE(processor->getName() == "getfileCreate2"); - - std::shared_ptr record; - - processor->onTrigger(&context, &session); - - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); - std::set records = - reporter->getEvents(); - record = session.get(); - REQUIRE(record == nullptr); - REQUIRE(records.size() == 0); - - std::fstream file; - std::stringstream ss; - ss << dir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - - processor->incrementActiveTasks(); - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onTrigger(&context, &session); - unlink(ss.str().c_str()); - reporter = session.getProvenanceReporter(); - - REQUIRE(processor->getName() == "getfileCreate2"); - - records = reporter->getEvents(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == processor->getName()); - } - session.commit(); - std::shared_ptr ffr = session.get(); - - REQUIRE((repo->getRepoMap().size() % 2) == 0); - REQUIRE(repo->getRepoMap().size() == (prev + 2)); - prev += 2; - - } - -} - -TEST_CASE("LogAttributeTest", "[getfileCreate3]") { - std::ostringstream oss; - std::unique_ptr outputLogger = std::unique_ptr< - logging::BaseLogger>( - new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, - 0)); - std::shared_ptr logger = logging::Logger::getLogger(); - logger->updateLogger(std::move(outputLogger)); - - TestController testController; - - testController.enableDebug(); - - std::shared_ptr repo = std::make_shared(); - - std::shared_ptr processor = std::make_shared< - org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - - std::shared_ptr logAttribute = std::make_shared< - org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - - char format[] = "/tmp/gt.XXXXXX"; - char *dir = testController.createTempDirectory(format); - - uuid_t processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - - uuid_t logattribute_uuid; - REQUIRE(true == logAttribute->getUUID(logattribute_uuid)); - - std::shared_ptr connection = std::make_shared< - minifi::Connection>(repo, "getfileCreate2Connection"); - connection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr connection2 = std::make_shared< - minifi::Connection>(repo, "logattribute"); - connection2->setRelationship(core::Relationship("success", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(processor); - - // link the connections so that we can test results at the end for this - connection->setDestination(logAttribute); - - connection2->setSource(logAttribute); - - connection2->setSourceUUID(logattribute_uuid); - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(logattribute_uuid); - - processor->addConnection(connection); - logAttribute->addConnection(connection); - logAttribute->addConnection(connection2); - REQUIRE(dir != NULL); - - core::ProcessorNode node(processor); - core::ProcessorNode node2(logAttribute); - - core::ProcessContext context(node, repo); - core::ProcessContext context2(node2, repo); - context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, - dir); - core::ProcessSession session(&context); - core::ProcessSession session2(&context2); - - REQUIRE(processor->getName() == "getfileCreate2"); - - std::shared_ptr record; - processor->setScheduledState(core::ScheduledState::RUNNING); - - core::ProcessSessionFactory factory(&context); - processor->onSchedule(&context, &factory); - processor->onTrigger(&context, &session); - - logAttribute->incrementActiveTasks(); - logAttribute->setScheduledState(core::ScheduledState::RUNNING); - core::ProcessSessionFactory factory2(&context2); - logAttribute->onSchedule(&context2, &factory2); - logAttribute->onTrigger(&context2, &session2); - - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); - std::set records = reporter->getEvents(); - record = session.get(); - REQUIRE(record == nullptr); - REQUIRE(records.size() == 0); - - std::fstream file; - std::stringstream ss; - ss << dir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - - processor->incrementActiveTasks(); - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onTrigger(&context, &session); - unlink(ss.str().c_str()); - reporter = session.getProvenanceReporter(); - - records = reporter->getEvents(); - session.commit(); - oss.str(""); - oss.clear(); - - logAttribute->incrementActiveTasks(); - logAttribute->setScheduledState(core::ScheduledState::RUNNING); - logAttribute->onTrigger(&context2, &session2); - - //session2.commit(); - records = reporter->getEvents(); - - std::string log_attribute_output = oss.str(); - REQUIRE( - log_attribute_output.find("key:absolute.path value:" + ss.str()) - != std::string::npos); - REQUIRE(log_attribute_output.find("Size:8 Offset:0") != std::string::npos); - REQUIRE( - log_attribute_output.find("key:path value:" + std::string(dir)) - != std::string::npos); - - outputLogger = std::unique_ptr( - new org::apache::nifi::minifi::core::logging::NullAppender()); - logger->updateLogger(std::move(outputLogger)); - -} - -int fileSize(const char *add) { - std::ifstream mySource; - mySource.open(add, std::ios_base::binary); - mySource.seekg(0, std::ios_base::end); - int size = mySource.tellg(); - mySource.close(); - return size; -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/SocketTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/SocketTests.cpp b/libminifi/test/SocketTests.cpp deleted file mode 100644 index 2e5013b..0000000 --- a/libminifi/test/SocketTests.cpp +++ /dev/null @@ -1,185 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file - -#include "TestBase.h" -#include "io/ClientSocket.h" - -using namespace org::apache::nifi::minifi::io; -TEST_CASE("TestSocket", "[TestSocket1]") { - - Socket socket(std::make_shared(std::make_shared()), "localhost", 8183); - REQUIRE(-1 == socket.initialize()); - REQUIRE("localhost" == socket.getHostname()); - socket.closeStream(); - -} - -TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") { - - Socket socket(std::make_shared(std::make_shared()), "localhost", 8183); - REQUIRE(-1 == socket.initialize()); - - socket.writeData(0, 0); - - std::vector buffer; - buffer.push_back('a'); - - REQUIRE(-1 == socket.writeData(buffer, 1)); - - socket.closeStream(); - -} - -TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") { - - std::vector buffer; - buffer.push_back('a'); - - std::shared_ptr socket_context = std::make_shared(std::make_shared()); - - Socket server(socket_context, "localhost", 9183, 1); - - REQUIRE(-1 != server.initialize()); - - Socket client(socket_context, "localhost", 9183); - - REQUIRE(-1 != client.initialize()); - - REQUIRE(1 == client.writeData(buffer, 1)); - - std::vector readBuffer; - readBuffer.resize(1); - - REQUIRE(1 == server.readData(readBuffer, 1)); - - REQUIRE(readBuffer == buffer); - - server.closeStream(); - - client.closeStream(); - -} - -TEST_CASE("TestGetHostName", "[TestSocket4]") { - - REQUIRE(Socket::getMyHostName().length() > 0); - -} - -TEST_CASE("TestWriteEndian64", "[TestSocket4]") { - - std::vector buffer; - buffer.push_back('a'); - - std::shared_ptr socket_context = std::make_shared(std::make_shared()); - - Socket server(socket_context, "localhost", 9183, 1); - - REQUIRE(-1 != server.initialize()); - - Socket client(socket_context, "localhost", 9183); - - REQUIRE(-1 != client.initialize()); - - uint64_t negative_one = -1; - REQUIRE(8 == client.write(negative_one)); - - uint64_t negative_two = 0; - REQUIRE(8 == server.read(negative_two)); - - REQUIRE(negative_two == negative_one); - - server.closeStream(); - - client.closeStream(); - -} - -TEST_CASE("TestWriteEndian32", "[TestSocket5]") { - - std::vector buffer; - buffer.push_back('a'); - - std::shared_ptr socket_context = std::make_shared(std::make_shared()); - - Socket server(socket_context, "localhost", 9183, 1); - - REQUIRE(-1 != server.initialize()); - - Socket client(socket_context, "localhost", 9183); - - REQUIRE(-1 != client.initialize()); - - { - uint32_t negative_one = -1; - REQUIRE(4 == client.write(negative_one)); - - uint32_t negative_two = 0; - REQUIRE(4 == server.read(negative_two)); - - REQUIRE(negative_two == negative_one); - } - - { - uint16_t negative_one = -1; - REQUIRE(2 == client.write(negative_one)); - - uint16_t negative_two = 0; - REQUIRE(2 == server.read(negative_two)); - - REQUIRE(negative_two == negative_one); - } - server.closeStream(); - - client.closeStream(); - -} - -TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket6]") { - - std::vector buffer; - buffer.push_back('a'); - - std::shared_ptr socket_context = std::make_shared(std::make_shared()); - - Socket server(socket_context, "localhost", 9183, 1); - - REQUIRE(-1 != server.initialize()); - - Socket client(socket_context, "localhost", 9183); - - REQUIRE(-1 != client.initialize()); - - REQUIRE(1 == client.writeData(buffer, 1)); - - std::vector readBuffer; - readBuffer.resize(1); - - REQUIRE(1 == server.readData(readBuffer, 1)); - - REQUIRE(readBuffer == buffer); - - client.closeStream(); - - REQUIRE(-1 == client.writeData(buffer, 1)); - - server.closeStream(); - -}