Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9F0DA200D11 for ; Mon, 2 Oct 2017 16:57:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9D77A1609DE; Mon, 2 Oct 2017 14:57:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 789871609EF for ; Mon, 2 Oct 2017 16:57:15 +0200 (CEST) Received: (qmail 69072 invoked by uid 500); 2 Oct 2017 14:57:14 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 69052 invoked by uid 99); 2 Oct 2017 14:57:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Oct 2017 14:57:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2F3E3F5828; Mon, 2 Oct 2017 14:57:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aldrin@apache.org To: commits@nifi.apache.org Date: Mon, 02 Oct 2017 14:57:16 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/8] nifi-minifi-cpp git commit: MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters MINIFI-339: Add GetTCP Processor MINIFI-339: Add listener server MINIFI-339: Update to listener MINIFI-339: Resolve Issue with stack based process archived-at: Mon, 02 Oct 2017 14:57:17 -0000 http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/state/ProcessorController.cpp 
---------------------------------------------------------------------- diff --git a/libminifi/src/core/state/ProcessorController.cpp b/libminifi/src/core/state/ProcessorController.cpp new file mode 100644 index 0000000..7a1af7d --- /dev/null +++ b/libminifi/src/core/state/ProcessorController.cpp @@ -0,0 +1,64 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "core/state/ProcessorController.h" +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { + +ProcessorController::ProcessorController(const std::shared_ptr &processor, const std::shared_ptr &scheduler) + : processor_(processor), + scheduler_(scheduler) { +} + +ProcessorController::~ProcessorController() { +} +/** + * Start the client + */ +int16_t ProcessorController::start() { + processor_->setScheduledState(core::ScheduledState::RUNNING); + scheduler_->schedule(processor_); + return 0; +} +/** + * Stop the client + */ +int16_t ProcessorController::stop(bool force, uint64_t timeToWait) { + scheduler_->unschedule(processor_); + return 0; +} + +bool ProcessorController::isRunning() { + return processor_->isRunning(); +} + +int16_t ProcessorController::pause() { + scheduler_->unschedule(processor_); + return 0; +} + +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/state/StateManager.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/state/StateManager.cpp b/libminifi/src/core/state/StateManager.cpp new file mode 100644 index 0000000..11feabf --- /dev/null +++ b/libminifi/src/core/state/StateManager.cpp @@ -0,0 +1,137 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/state/StateManager.h" +#include +#include +#include +#include "core/state/metrics/MetricsBase.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { + +void StateManager::initialize() { + metrics_listener_ = std::unique_ptr(new state::metrics::MetricsListener(shared_from_this(), shared_from_this())); + // manually add the c2 agent for now + listener_thread_pool_.setMaxConcurrentTasks(3); + listener_thread_pool_.start(); + controller_running_ = true; + startMetrics(); +} +/** + * State management operations. + */ +int16_t StateManager::stop(bool force, uint64_t timeToWait) { + controller_running_ = false; + listener_thread_pool_.shutdown(); + return 1; +} + +int16_t StateManager::update(const std::shared_ptr &updateController) { + // must be stopped to update. + if (isStateMonitorRunning()) { + return -1; + } + int16_t ret = applyUpdate(updateController); + switch (ret) { + case -1: + return -1; + default: + return 1; + } +} + +/** + * Passes metrics to the update controllers if they are a metrics sink. 
+ * @param metrics metric to pass through + */ +int16_t StateManager::setMetrics(const std::shared_ptr &metrics) { + if (IsNullOrEmpty(metrics)) { + return -1; + } + auto now = std::chrono::steady_clock::now(); + if (mutex_.try_lock_until(now + std::chrono::milliseconds(100))) { + // update controllers can be metric sinks too + for (auto controller : updateControllers) { + std::shared_ptr sink = std::dynamic_pointer_cast(controller); + if (sink != nullptr) { + sink->setMetrics(metrics); + } + } + metrics_maps_[metrics->getName()] = metrics; + mutex_.unlock(); + } else { + return -1; + } + return 0; +} +/** + * Metrics operations + */ +int16_t StateManager::getMetrics(std::vector> &metric_vector, uint16_t metricsClass) { + auto now = std::chrono::steady_clock::now(); + const std::chrono::steady_clock::time_point wait_time = now + std::chrono::milliseconds(100); + if (mutex_.try_lock_until(wait_time)) { + for (auto metric : metrics_maps_) { + metric_vector.push_back(metric.second); + } + mutex_.unlock(); + return 0; + } + return -1; +} + +bool StateManager::registerUpdateListener(const std::shared_ptr &updateController) { + auto functions = updateController->getFunctions(); + + updateControllers.push_back(updateController); + // run all functions independently + + for (auto function : functions) { + std::unique_ptr> after_execute = std::unique_ptr>(new UpdateRunner(isStateMonitorRunning())); + utils::Worker functor(function, "listeners", std::move(after_execute)); + std::future future; + if (!listener_thread_pool_.execute(std::move(functor), future)) { + // denote failure + return false; + } + } + return true; +} + +/** + * Base metrics function will employ the default metrics listener. 
+ */ +bool StateManager::startMetrics() { + std::unique_ptr> after_execute = std::unique_ptr>(new UpdateRunner(isStateMonitorRunning())); + utils::Worker functor(metrics_listener_->getFunction(), "metrics", std::move(after_execute)); + if (!listener_thread_pool_.execute(std::move(functor), metrics_listener_->getFuture())) { + // denote failure + return false; + } + return true; +} + +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/state/UpdateController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/state/UpdateController.cpp b/libminifi/src/core/state/UpdateController.cpp new file mode 100644 index 0000000..f4215f1 --- /dev/null +++ b/libminifi/src/core/state/UpdateController.cpp @@ -0,0 +1,76 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "core/state/UpdateController.h" +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { + +UpdateStatus::UpdateStatus(UpdateState state, int16_t reason) + : state_(state), + reason_(reason) { +} + +UpdateStatus::UpdateStatus(const UpdateStatus &other) + : error_(other.error_), + reason_(other.reason_), + state_(other.state_) { +} + +UpdateStatus::UpdateStatus(const UpdateStatus &&other) + : error_(std::move(other.error_)), + reason_(std::move(other.reason_)), + state_(std::move(other.state_)) { +} + +UpdateState UpdateStatus::getState() const { + return state_; +} + +std::string UpdateStatus::getError() const { + return error_; +} + +int16_t UpdateStatus::getReadonCode() const { + return reason_; +} + +UpdateStatus &UpdateStatus::operator=(const UpdateStatus &&other) { + error_ = std::move(other.error_); + reason_ = std::move(other.reason_); + state_ = std::move(other.state_); + return *this; +} + +UpdateStatus &UpdateStatus::operator=(const UpdateStatus &other) { + error_ = other.error_; + reason_ = other.reason_; + state_ = other.state_; + return *this; +} + +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/io/ClientSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index 5f7f5f4..d702620 100644 --- a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -79,11 +79,24 @@ void Socket::closeStream() { addr_info_ = 0; } if (socket_file_descriptor_ >= 0) { + logger_->log_debug("Closing %d", socket_file_descriptor_); close(socket_file_descriptor_); socket_file_descriptor_ = -1; } } +void Socket::setNonBlocking() { + if (listeners_ <= 0) { + // Put the socket in non-blocking mode: + if 
(fcntl(socket_file_descriptor_, F_SETFL, O_NONBLOCK) < 0) { + // handle error + logger_->log_error("Could not create non blocking to socket", strerror(errno)); + } else { + logger_->log_info("Successfully applied O_NONBLOCK to fd"); + } + } +} + int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { logger_->log_error("error while connecting to server socket"); @@ -131,6 +144,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { // add the listener to the total set FD_SET(socket_file_descriptor_, &total_list_); socket_max_ = socket_file_descriptor_; + logger_->log_info("Created connection with file descriptor %d", socket_file_descriptor_); return 0; } @@ -183,15 +197,21 @@ int16_t Socket::initialize() { } // we've successfully connected if (port_ > 0 && createConnection(p, addr) >= 0) { + logger_->log_info("Successfully created connection"); return 0; break; } } + logger_->log_info("Could not find device for our connection"); return -1; } int16_t Socket::select_descriptor(const uint16_t msec) { + if (listeners_ == 0) { + return socket_file_descriptor_; + } + struct timeval tv; int retval; @@ -200,7 +220,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) { tv.tv_sec = msec / 1000; tv.tv_usec = (msec % 1000) * 1000; - std::lock_guard < std::recursive_mutex > guard(selection_mutex_); + std::lock_guard guard(selection_mutex_); if (msec > 0) retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv); @@ -236,6 +256,8 @@ int16_t Socket::select_descriptor(const uint16_t msec) { } } + logger_->log_error("Could not find a suitable file descriptor"); + return -1; } @@ -291,19 +313,20 @@ int Socket::writeData(std::vector &buf, int buflen) { int Socket::writeData(uint8_t *value, int size) { int ret = 0, bytes = 0; + int fd = select_descriptor(1000); while (bytes < size) { - ret = send(socket_file_descriptor_, value + bytes, size - bytes, 
0); + ret = send(fd, value + bytes, size - bytes, 0); // check for errors if (ret <= 0) { - close(socket_file_descriptor_); - logger_->log_error("Could not send to %d, error: %s", socket_file_descriptor_, strerror(errno)); + close(fd); + logger_->log_error("Could not send to %d, error: %s", fd, strerror(errno)); return ret; } bytes += ret; } if (ret) - logger_->log_trace("Send data size %d over socket %d", size, socket_file_descriptor_); + logger_->log_trace("Send data size %d over socket %d", size, fd); return bytes; } @@ -362,27 +385,32 @@ int Socket::read(uint16_t &value, bool is_little_endian) { return sizeof(value); } -int Socket::readData(std::vector &buf, int buflen) { +int Socket::readData(std::vector &buf, int buflen, bool retrieve_all_bytes) { if (buf.capacity() < buflen) { buf.resize(buflen); } - return readData(reinterpret_cast(&buf[0]), buflen); + return readData(reinterpret_cast(&buf[0]), buflen, retrieve_all_bytes); } -int Socket::readData(uint8_t *buf, int buflen) { +int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) { int32_t total_read = 0; while (buflen) { int16_t fd = select_descriptor(1000); if (fd < 0) { - logger_->log_info("fd close %i", buflen); + logger_->log_info("fd %d close %i", fd, buflen); close(socket_file_descriptor_); return -1; } int bytes_read = recv(fd, buf, buflen, 0); + logger_->log_info("Recv call %d", bytes_read); if (bytes_read <= 0) { if (bytes_read == 0) { logger_->log_info("Other side hung up on %d", fd); } else { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // continue + return -2; + } logger_->log_error("Could not recv on %d, error: %s", fd, strerror(errno)); } return -1; @@ -390,6 +418,9 @@ int Socket::readData(uint8_t *buf, int buflen) { buflen -= bytes_read; buf += bytes_read; total_read += bytes_read; + if (!retrieve_all_bytes) { + break; + } } return total_read; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/io/FileStream.cpp 
---------------------------------------------------------------------- diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp index 3b2bfe1..a3e7ee1 100644 --- a/libminifi/src/io/FileStream.cpp +++ b/libminifi/src/io/FileStream.cpp @@ -140,6 +140,7 @@ int FileStream::readData(uint8_t *buf, int buflen) { int len = file_stream_->tellg(); offset_ = len; length_ = len; + logger_->log_info("%s eof bit, ended at %d", path_, offset_); return offset_; } else { offset_ += buflen; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/io/StreamFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp index 288f4d1..7990edd 100644 --- a/libminifi/src/io/StreamFactory.cpp +++ b/libminifi/src/io/StreamFactory.cpp @@ -47,11 +47,11 @@ class SocketCreator : public AbstractStreamFactory { public: template ContextTypeCheck> create(const std::shared_ptr &configure) { - return std::make_shared < V > (configure); + return std::make_shared(configure); } template ContextTypeCheck> create(const std::shared_ptr &configure) { - return std::make_shared < SocketContext > (configure); + return std::make_shared(configure); } SocketCreator(std::shared_ptr configure) { @@ -69,7 +69,7 @@ class SocketCreator : public AbstractStreamFactory { std::unique_ptr createSocket(const std::string &host, const uint16_t port) { T *socket = create(host, port); - return std::unique_ptr < Socket > (socket); + return std::unique_ptr(socket); } private: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/ExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp index 9c6f732..323d69a 100644 --- a/libminifi/src/processors/ExecuteProcess.cpp +++ 
b/libminifi/src/processors/ExecuteProcess.cpp @@ -145,7 +145,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi break; logger_->log_info("Execute Command Respond %d", numRead); ExecuteProcess::WriteCallback callback(buffer, numRead); - std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + std::shared_ptr flowFile = std::static_pointer_cast(session->create()); if (!flowFile) continue; flowFile->addAttribute("command", _command.c_str()); @@ -167,7 +167,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi // child exits and close the pipe ExecuteProcess::WriteCallback callback(buffer, totalRead); if (!flowFile) { - flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + flowFile = std::static_pointer_cast(session->create()); if (!flowFile) break; flowFile->addAttribute("command", _command.c_str()); @@ -185,7 +185,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi logger_->log_info("Execute Command Max Respond %d", sizeof(buffer)); ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer)); if (!flowFile) { - flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + flowFile = std::static_pointer_cast(session->create()); if (!flowFile) continue; flowFile->addAttribute("command", _command.c_str()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/GenerateFlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp index 2fee3f2..3741a8f 100644 --- a/libminifi/src/processors/GenerateFlowFile.cpp +++ b/libminifi/src/processors/GenerateFlowFile.cpp @@ -91,7 +91,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes } for (int i = 0; i < batchSize; i++) { // For each batch - 
std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + std::shared_ptr flowFile = std::static_pointer_cast(session->create()); if (!flowFile) return; if (fileSize > 0) @@ -114,7 +114,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes GenerateFlowFile::WriteCallback callback(_data, _dataSize); for (int i = 0; i < batchSize; i++) { // For each batch - std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + std::shared_ptr flowFile = std::static_pointer_cast(session->create()); if (!flowFile) return; if (fileSize > 0) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/GetFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp index 723d461..1b73c2d 100644 --- a/libminifi/src/processors/GetFile.cpp +++ b/libminifi/src/processors/GetFile.cpp @@ -133,6 +133,9 @@ void GetFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFact void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { // Perform directory list + + metrics_->iterations_++; + logger_->log_info("Is listing empty %i", isListingEmpty()); if (isListingEmpty()) { if (request_.pollInterval == 0 || (getTimeMillis() - last_listing_time_) > request_.pollInterval) { @@ -150,7 +153,7 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses std::string fileName = list.front(); list.pop(); logger_->log_info("GetFile process %s", fileName.c_str()); - std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + std::shared_ptr flowFile = std::static_pointer_cast(session->create()); if (flowFile == nullptr) return; std::size_t found = fileName.find_last_of("/\\"); @@ -172,21 +175,21 @@ void GetFile::onTrigger(core::ProcessContext *context, 
core::ProcessSession *ses } bool GetFile::isListingEmpty() { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard lock(mutex_); return _dirList.empty(); } void GetFile::putListing(std::string fileName) { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard lock(mutex_); _dirList.push(fileName); } void GetFile::pollListing(std::queue &list, const GetFileRequest &request) { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard lock(mutex_); - while (!_dirList.empty() && (request.maxSize == 0 || list.size() < request.maxSize)) { + while (!_dirList.empty() && (request.batchSize == 0 || list.size() < request.batchSize)) { std::string fileName = _dirList.front(); _dirList.pop(); list.push(fileName); @@ -229,7 +232,8 @@ bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRe regfree(®ex); if (ret) return false; - + metrics_->input_bytes_ += statbuf.st_size; + metrics_->accepted_files_++; return true; } @@ -267,6 +271,11 @@ void GetFile::performListing(std::string dir, const GetFileRequest &request) { closedir(d); } +int16_t GetFile::getMetrics(std::vector> &metric_vector) { + metric_vector.push_back(metrics_); + return 0; +} + } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/GetTCP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GetTCP.cpp b/libminifi/src/processors/GetTCP.cpp new file mode 100644 index 0000000..bcf3d58 --- /dev/null +++ b/libminifi/src/processors/GetTCP.cpp @@ -0,0 +1,289 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "processors/GetTCP.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "io/ClientSocket.h" +#include "utils/StringUtils.h" +#include "utils/TimeUtil.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint"; + +core::Property GetTCP::EndpointList("endpoint-list", "A comma delimited list of the endpoints to connect to. The format should be :.", ""); +core::Property GetTCP::ConcurrentHandlers("concurrent-handler-count", "Number of concurrent handlers for this session", "1"); +core::Property GetTCP::ReconnectInterval("reconnect-interval", "The number of seconds to wait before attempting to reconnect to the endpoint.", "5s"); +core::Property GetTCP::ReceiveBufferSize("receive-buffer-size", "The size of the buffer to receive data in. 
Default 16384 (16MB).", "16MB"); +core::Property GetTCP::StayConnected("Stay Connected", "Determines if we keep the same socket despite having no data", "true"); +core::Property GetTCP::ConnectionAttemptLimit("connection-attempt-timeout", "Maximum number of connection attempts before attempting backup hosts, if configured.", "3"); +core::Property GetTCP::EndOfMessageByte( + "end-of-message-byte", + "Byte value which denotes end of message. Must be specified as integer within the valid byte range (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.", "13"); + +core::Relationship GetTCP::Success("success", "All files are routed to success"); +core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger"); + +int16_t DataHandler::handle(std::string source, uint8_t *message, size_t size, bool partial) { + std::shared_ptr my_session = sessionFactory_->createSession(); + std::shared_ptr flowFile = my_session->create(); + + DataHandlerCallback callback(message, size); + + my_session->write(flowFile, &callback); + + my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source); + + if (partial) { + my_session->transfer(flowFile, GetTCP::Partial); + } else { + my_session->transfer(flowFile, GetTCP::Success); + } + + my_session->commit(); + + return 0; +} +void GetTCP::initialize() { + // Set the supported properties + std::set properties; + properties.insert(EndpointList); + properties.insert(ConcurrentHandlers); + properties.insert(ConnectionAttemptLimit); + properties.insert(EndOfMessageByte); + properties.insert(ReceiveBufferSize); + properties.insert(StayConnected); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Partial); + setSupportedRelationships(relationships); +} + +void GetTCP::onSchedule(std::shared_ptr context, std::shared_ptr 
sessionFactory) { + std::string value; + stay_connected_ = true; + if (context->getProperty(EndpointList.getName(), value)) { + endpoints = utils::StringUtils::split(value, ","); + } + + if (context->getProperty(ConcurrentHandlers.getName(), value)) { + int64_t handlers = 0; + core::Property::StringToInt(value, handlers); + concurrent_handlers_ = handlers; + } + + if (context->getProperty(StayConnected.getName(), value)) { + utils::StringUtils::StringToBool(value, stay_connected_); + } else { + stay_connected_ = true; + } + if (context->getProperty(ConnectionAttemptLimit.getName(), value)) { + int64_t connects = 0; + core::Property::StringToInt(value, connects); + connection_attempt_limit_ = connects; + } + if (context->getProperty(ReceiveBufferSize.getName(), value)) { + int64_t size = 0; + core::Property::StringToInt(value, size); + receive_buffer_size_ = size; + } + + if (context->getProperty(EndOfMessageByte.getName(), value)) { + int64_t byteValue = 0; + core::Property::StringToInt(value, byteValue); + endOfMessageByte = byteValue & 0xFF; + } + + if (context->getProperty(ReconnectInterval.getName(), value)) { + int64_t msec; + core::TimeUnit unit; + if (core::Property::StringToTime(value, msec, unit) && core::Property::ConvertTimeUnitToMS(msec, unit, msec)) { + reconnect_interval_ = msec; + logger_->log_debug("successfully applied reconnect interval of %d", reconnect_interval_); + } + } else { + reconnect_interval_ = 5000; + } + + handler_ = std::unique_ptr(new DataHandler(sessionFactory)); + + f_ex = [&] { + std::unique_ptr socket_ptr; + // reuse the byte buffer. 
+ std::vector buffer; + int reconnects = 0; + do { + if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) { + int size_read = socket_ptr->readData(buffer, receive_buffer_size_, false); + + if (size_read >= 0) { + if (size_read > 0) { + // determine cut location + int startLoc = 0, i = 0; + for (; i < size_read; i++) { + if (buffer.at(i) == endOfMessageByte && i > 0) { + if (i-startLoc > 0) { + handler_->handle(socket_ptr->getHostname(), buffer.data()+startLoc, (i-startLoc), true); + } + startLoc = i; + } + } + if (startLoc > 0) { + logger_->log_info("Starting at %i, ending at %i", startLoc, size_read); + if (size_read-startLoc > 0) { + handler_->handle(socket_ptr->getHostname(), buffer.data()+startLoc, (size_read-startLoc), true); + } + } else { + logger_->log_info("Handling at %i, ending at %i", startLoc, size_read); + if (size_read > 0) { + handler_->handle(socket_ptr->getHostname(), buffer.data(), size_read, false); + } + } + reconnects = 0; + } + socket_ring_buffer_.enqueue(std::move(socket_ptr)); + } else if (size_read == -2 && stay_connected_) { + if (++reconnects > connection_attempt_limit_) { + logger_->log_info("Too many reconnects, exiting thread"); + socket_ptr->closeStream(); + return -1; + } + logger_->log_info("Sleeping for %d msec before attempting to reconnect", reconnect_interval_); + std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_)); + socket_ring_buffer_.enqueue(std::move(socket_ptr)); + } else { + socket_ptr->closeStream(); + std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_)); + logger_->log_info("Read response returned a -1 from socket, exiting thread"); + return -1; + } + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_)); + logger_->log_info("Could not use socket, exiting thread"); + return -1; + } + }while (running_); + logger_->log_info("Ending private thread"); + return 0; + }; + + utils::ThreadPool pool = utils::ThreadPool(concurrent_handlers_); + 
client_thread_pool_ = std::move(pool); + client_thread_pool_.start(); + + running_ = true; +} + +void GetTCP::notifyStop() { + running_ = false; + // await threads to shutdown. + client_thread_pool_.shutdown(); + std::unique_ptr socket_ptr; + while (socket_ring_buffer_.size_approx() > 0) { + socket_ring_buffer_.try_dequeue(socket_ptr); + } +} +void GetTCP::onTrigger(std::shared_ptr context, std::shared_ptr session) { + // Perform directory list + metrics_->iterations_++; + std::lock_guard lock(mutex_); + // check if the futures are valid. If they've terminated remove it from the map. + + for (auto &endpoint : endpoints) { + auto endPointFuture = live_clients_.find(endpoint); + + // does not exist + if (endPointFuture == live_clients_.end()) { + logger_->log_info("creating endpoint for %s", endpoint); + std::vector hostAndPort = utils::StringUtils::split(endpoint, ":"); + if (hostAndPort.size() == 2) { + logger_->log_debug("Opening another socket to %s:%s", hostAndPort.at(0), hostAndPort.at(1)); + std::unique_ptr socket = std::move(stream_factory_->createSocket(hostAndPort.at(0), std::stoi(hostAndPort.at(1)))); + + if (socket->initialize() != -1) { + socket->setNonBlocking(); + logger_->log_debug("Enqueueing socket into ring buffer %s:%s", hostAndPort.at(0), hostAndPort.at(1)); + socket_ring_buffer_.enqueue(std::move(socket)); + + } else { + logger_->log_error("Could not create socket during initialization for %s", endpoint); + } + } else { + logger_->log_error("Could not create socket for %s", endpoint); + } + + std::future *future = new std::future(); + + std::unique_ptr> after_execute = std::unique_ptr>(new SocketAfterExecute(running_, endpoint, &live_clients_, &mutex_)); + utils::Worker functor(f_ex, "workers", std::move(after_execute)); + if (client_thread_pool_.execute(std::move(functor), *future)) { + live_clients_[endpoint] = future; + } + } else { + if (!endPointFuture->second->valid()) { + delete endPointFuture->second; + std::future *future = new 
std::future(); + std::unique_ptr> after_execute = std::unique_ptr>(new SocketAfterExecute(running_, endpoint, &live_clients_, &mutex_)); + utils::Worker functor(f_ex, "workers", std::move(after_execute)); + if (client_thread_pool_.execute(std::move(functor), *future)) { + live_clients_[endpoint] = future; + } + } else { + logger_->log_info("Thread still running for %s", endPointFuture->first); + // we have a thread corresponding to this. + } + } + } + logger_->log_info("Updating endpoint"); + context->yield(); +} + +int16_t GetTCP::getMetrics(std::vector> &metric_vector) { + metric_vector.push_back(metrics_); + return 0; +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/InvokeHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp index 81271a5..873e317 100644 --- a/libminifi/src/processors/InvokeHTTP.cpp +++ b/libminifi/src/processors/InvokeHTTP.cpp @@ -42,7 +42,7 @@ #include "ResourceClaim.h" #include "utils/StringUtils.h" #include "utils/ByteInputCallBack.h" -#include "utils/HTTPUtils.h" +#include "utils/HTTPClient.h" namespace org { namespace apache { @@ -77,7 +77,10 @@ core::Property InvokeHTTP::ContentType("Content-type", "The Content-Type to spec core::Property InvokeHTTP::SendBody("send-message-body", "If true, sends the HTTP message body on POST/PUT/PATCH requests (default). " "If false, suppresses the message body and content-type header for these requests.", "true"); - +core::Property InvokeHTTP::UseChunkedEncoding("Use Chunked Encoding", "When POST'ing, PUT'ing or PATCH'ing content set this property to true in order to not pass the 'Content-length' header" + " and instead send 'Transfer-Encoding' with a value of 'chunked'. 
This will enable the data transfer mechanism which was introduced in HTTP 1.1 " + "to pass data of unknown lengths in chunks.", + "false"); core::Property InvokeHTTP::PropPutOutputAttributes("Put Response Body in Attribute", "If set, the response body received back will be put into an attribute of the original " "FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. ", ""); @@ -86,6 +89,7 @@ core::Property InvokeHTTP::AlwaysOutputResponse("Always Output Response", "Will "false"); core::Property InvokeHTTP::PenalizeOnNoRetry("Penalize on \"No Retry\"", "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.", "false"); +core::Property InvokeHTTP::DisablePeerVerification("Disable Peer Verification", "Disables peer verification for the SSL session", "false"); const char* InvokeHTTP::STATUS_CODE = "invokehttp.status.code"; const char* InvokeHTTP::STATUS_MESSAGE = "invokehttp.status.message"; const char* InvokeHTTP::RESPONSE_BODY = "invokehttp.response.body"; @@ -108,19 +112,6 @@ core::Relationship InvokeHTTP::RelNoRetry("no retry", "The original FlowFile wil core::Relationship InvokeHTTP::RelFailure("failure", "The original FlowFile will be routed on any type of connection failure, " "timeout or general exception. 
It will have new attributes detailing the request."); -void InvokeHTTP::set_request_method(CURL *curl, const std::string &method) { - std::string my_method = method; - std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper); - if (my_method == "POST") { - curl_easy_setopt(curl, CURLOPT_POST, 1); - } else if (my_method == "PUT") { - curl_easy_setopt(curl, CURLOPT_PUT, 1); - } else if (my_method == "GET") { - } else { - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, my_method.c_str()); - } -} - void InvokeHTTP::initialize() { logger_->log_info("Initializing InvokeHTTP"); @@ -136,9 +127,11 @@ void InvokeHTTP::initialize() { properties.insert(ProxyHost); properties.insert(ProxyPort); properties.insert(ProxyUser); + properties.insert(UseChunkedEncoding); properties.insert(ProxyPassword); properties.insert(ContentType); properties.insert(SendBody); + properties.insert(DisablePeerVerification); properties.insert(AlwaysOutputResponse); setSupportedProperties(properties); @@ -171,6 +164,11 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF return; } + std::string contentTypeStr; + if (context->getProperty(ContentType.getName(), contentTypeStr)) { + content_type_ = contentTypeStr; + } + if (context->getProperty(ReadTimeout.getName(), timeoutStr)) { core::Property::StringToInt(timeoutStr, read_timeout_); @@ -195,14 +193,14 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF std::string always_output_response = "false"; if (!context->getProperty(AlwaysOutputResponse.getName(), always_output_response)) { - logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str()); + logger_->log_info("%s attribute is missing, so default value of %s will be used", AlwaysOutputResponse.getName().c_str(), AlwaysOutputResponse.getValue().c_str()); } utils::StringUtils::StringToBool(always_output_response, 
always_output_response_); std::string penalize_no_retry = "false"; if (!context->getProperty(PenalizeOnNoRetry.getName(), penalize_no_retry)) { - logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str()); + logger_->log_info("%s attribute is missing, so default value of %s will be used", PenalizeOnNoRetry.getName().c_str(), PenalizeOnNoRetry.getValue().c_str()); } utils::StringUtils::StringToBool(penalize_no_retry, penalize_no_retry_); @@ -211,29 +209,24 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF if (context->getProperty(SSLContext.getName(), context_name) && !IsNullOrEmpty(context_name)) { std::shared_ptr service = context->getControllerService(context_name); if (nullptr != service) { - ssl_context_service_ = std::static_pointer_cast < minifi::controllers::SSLContextService > (service); + ssl_context_service_ = std::static_pointer_cast(service); } } -} -InvokeHTTP::~InvokeHTTP() { - curl_global_cleanup(); -} + std::string useChunkedEncoding = "false"; + if (!context->getProperty(UseChunkedEncoding.getName(), useChunkedEncoding)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", UseChunkedEncoding.getName().c_str(), UseChunkedEncoding.getValue().c_str()); + } -inline bool InvokeHTTP::matches(const std::string &value, const std::string &sregex) { - if (sregex == ".*") - return true; + utils::StringUtils::StringToBool(useChunkedEncoding, use_chunked_encoding_); - regex_t regex; - int ret = regcomp(&regex, sregex.c_str(), 0); - if (ret) - return false; - ret = regexec(&regex, value.c_str(), (size_t) 0, NULL, 0); - regfree(&regex); - if (ret) - return false; + std::string disablePeerVerification = "false"; + if (context->getProperty(DisablePeerVerification.getName(), disablePeerVerification)) { + utils::StringUtils::StringToBool(disablePeerVerification, disable_peer_verification_); + } +} - return true; 
+InvokeHTTP::~InvokeHTTP() { } std::string InvokeHTTP::generateId() { @@ -248,50 +241,15 @@ bool InvokeHTTP::emitFlowFile(const std::string &method) { return ("POST" == method || "PUT" == method || "PATCH" == method); } -struct curl_slist *InvokeHTTP::build_header_list(CURL *curl, std::string regex, const std::map &attributes) { - struct curl_slist *list = NULL; - if (curl) { - for (auto attribute : attributes) { - if (matches(attribute.first, regex)) { - std::string attr = attribute.first + ":" + attribute.second; - list = curl_slist_append(list, attr.c_str()); - } - } - } - return list; -} - -bool InvokeHTTP::isSecure(const std::string &url) { - if (url.find("https") != std::string::npos) { - return true; - } - return false; -} - -CURLcode InvokeHTTP::configure_ssl_context(CURL *curl, void *ctx, void *param) { - minifi::controllers::SSLContextService *ssl_context_service = static_cast(param); - if (!ssl_context_service->configure_ssl_context(static_cast(ctx))) { - return CURLE_FAILED_INIT; - } - return CURLE_OK; -} - -void InvokeHTTP::configure_secure_connection(CURL *http_session) { - logger_->log_debug("InvokeHTTP -- Using certificate file %s", ssl_context_service_->getCertificateFile()); - curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L); - curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, &InvokeHTTP::configure_ssl_context); - curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, static_cast(ssl_context_service_.get())); -} - void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->get()); - logger_->log_info("onTrigger InvokeHTTP with %s", method_.c_str()); + std::shared_ptr flowFile = std::static_pointer_cast(session->get()); + if (flowFile == nullptr) { if (!emitFlowFile(method_)) { logger_->log_info("InvokeHTTP -- create flow file with %s", method_.c_str()); - flowFile = std::static_pointer_cast < FlowFileRecord > 
(session->create()); + flowFile = std::static_pointer_cast(session->create()); } else { logger_->log_info("exiting because method is %s", method_.c_str()); return; @@ -302,42 +260,39 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession * // create a transaction id std::string tx_id = generateId(); - CURL *http_session = curl_easy_init(); - // set the HTTP request method from libCURL - set_request_method(http_session, method_); - if (isSecure(url_) && ssl_context_service_ != nullptr) { - configure_secure_connection(http_session); - } + utils::HTTPClient client(url_, ssl_context_service_); - curl_easy_setopt(http_session, CURLOPT_URL, url_.c_str()); + client.setVerbose(); + client.initialize(method_); + client.setConnectionTimeout(connect_timeout_); + client.setReadTimeout(read_timeout_); - if (connect_timeout_ > 0) { - curl_easy_setopt(http_session, CURLOPT_TIMEOUT, connect_timeout_); + if (!content_type_.empty()) { + client.setContentType(content_type_); } - if (read_timeout_ > 0) { - curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_); + if (use_chunked_encoding_) { + client.setUseChunkedEncoding(); } - utils::HTTPRequestResponse content; - curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write); - - curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast(&content)); + if (disable_peer_verification_) { + logger_->log_debug("Disabling peer verification in HTTPClient"); + client.setDisablePeerVerification(); + } + std::unique_ptr callback = nullptr; + std::unique_ptr callbackObj = nullptr; if (emitFlowFile(method_)) { logger_->log_info("InvokeHTTP -- reading flowfile"); std::shared_ptr claim = flowFile->getResourceClaim(); if (claim) { - utils::ByteInputCallBack *callback = new utils::ByteInputCallBack(); - session->read(flowFile, callback); - utils::CallBackPosition *callbackObj = new utils::CallBackPosition; - callbackObj->ptr = callback; + callback = std::unique_ptr(new 
utils::ByteInputCallBack()); + session->read(flowFile, callback.get()); + callbackObj = std::unique_ptr(new utils::HTTPUploadCallback); + callbackObj->ptr = callback.get(); callbackObj->pos = 0; - logger_->log_info("InvokeHTTP -- Setting callback"); - curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, (curl_off_t)callback->getBufferSize()); - curl_easy_setopt(http_session, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write); - curl_easy_setopt(http_session, CURLOPT_READDATA, static_cast(callbackObj)); - + logger_->log_info("InvokeHTTP -- Setting callback, size is %d", callback->getBufferSize()); + client.setUploadCallback(callbackObj.get()); } else { logger_->log_error("InvokeHTTP -- no resource claim"); } @@ -347,63 +302,56 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession * } // append all headers - struct curl_slist *headers = build_header_list(http_session, attribute_to_send_regex_, flowFile->getAttributes()); - curl_easy_setopt(http_session, CURLOPT_HTTPHEADER, headers); + client.build_header_list(attribute_to_send_regex_, flowFile->getAttributes()); logger_->log_info("InvokeHTTP -- curl performed"); - res = curl_easy_perform(http_session); - - if (res == CURLE_OK) { + if (client.submit()) { logger_->log_info("InvokeHTTP -- curl successful"); bool putToAttribute = !IsNullOrEmpty(put_attribute_name_); - std::string response_body(content.data.begin(), content.data.end()); - int64_t http_code = 0; - curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, &http_code); - char *content_type; - /* ask for the content-type */ - curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type); + const std::vector &response_body = client.getResponseBody(); + const std::vector &response_headers = client.getHeaders(); + int64_t http_code = client.getResponseCode(); + const char *content_type = client.getContentType(); flowFile->addAttribute(STATUS_CODE, std::to_string(http_code)); - 
flowFile->addAttribute(STATUS_MESSAGE, response_body); + if (response_headers.size() > 0) + flowFile->addAttribute(STATUS_MESSAGE, response_headers.at(0)); flowFile->addAttribute(REQUEST_URL, url_); flowFile->addAttribute(TRANSACTION_ID, tx_id); - bool isSuccess = ((int32_t) (http_code / 100)) == 2 && res != CURLE_ABORTED_BY_CALLBACK; + bool isSuccess = ((int32_t) (http_code / 100)) == 2; bool output_body_to_requestAttr = (!isSuccess || putToAttribute) && flowFile != nullptr; bool output_body_to_content = isSuccess && !putToAttribute; - bool body_empty = IsNullOrEmpty(content.data); + bool body_empty = IsNullOrEmpty(response_body); - logger_->log_info("isSuccess: %d, response code %d ", isSuccess, http_code); + logger_->log_info("isSuccess: %d, response code %d", isSuccess, http_code); std::shared_ptr response_flow = nullptr; if (output_body_to_content) { if (flowFile != nullptr) { - response_flow = std::static_pointer_cast < FlowFileRecord > (session->create(flowFile)); + response_flow = std::static_pointer_cast(session->create(flowFile)); } else { - response_flow = std::static_pointer_cast < FlowFileRecord > (session->create()); + response_flow = std::static_pointer_cast(session->create()); } std::string ct = content_type; response_flow->addKeyedAttribute(MIME_TYPE, ct); response_flow->addAttribute(STATUS_CODE, std::to_string(http_code)); - response_flow->addAttribute(STATUS_MESSAGE, response_body); + if (response_headers.size() > 0) + flowFile->addAttribute(STATUS_MESSAGE, response_headers.at(0)); response_flow->addAttribute(REQUEST_URL, url_); response_flow->addAttribute(TRANSACTION_ID, tx_id); - io::DataStream stream((const uint8_t*) content.data.data(), content.data.size()); + io::DataStream stream((const uint8_t*) response_body.data(), response_body.size()); // need an import from the data stream. 
session->importFrom(stream, response_flow); } else { logger_->log_info("Cannot output body to content"); - response_flow = std::static_pointer_cast < FlowFileRecord > (session->create()); + response_flow = std::static_pointer_cast(session->create()); } route(flowFile, response_flow, session, context, isSuccess, http_code); - } else { - logger_->log_error("InvokeHTTP -- curl_easy_perform() failed %s\n", curl_easy_strerror(res)); } - curl_slist_free_all(headers); - curl_easy_cleanup(http_session); } void InvokeHTTP::route(std::shared_ptr &request, std::shared_ptr &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess, @@ -416,6 +364,7 @@ void InvokeHTTP::route(std::shared_ptr &request, std::shared_ptr // If the property to output the response flowfile regardless of status code is set then transfer it bool responseSent = false; if (always_output_response_ && response != nullptr) { + logger_->log_info("Outputting success and response"); session->transfer(response, Success); responseSent = true; } @@ -428,6 +377,7 @@ void InvokeHTTP::route(std::shared_ptr &request, std::shared_ptr session->transfer(request, Success); } if (response != nullptr && !responseSent) { + logger_->log_info("Outputting success and response"); session->transfer(response, Success); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/ListenHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp index d410547..1e2241d 100644 --- a/libminifi/src/processors/ListenHTTP.cpp +++ b/libminifi/src/processors/ListenHTTP.cpp @@ -201,7 +201,7 @@ ListenHTTP::~ListenHTTP() { } void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->get()); + std::shared_ptr flowFile = 
std::static_pointer_cast(session->get()); // Do nothing if there are no incoming files if (!flowFile) { @@ -243,7 +243,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection * auto session = _processSessionFactory->createSession(); ListenHTTP::WriteCallback callback(conn, req_info); - auto flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + auto flowFile = std::static_pointer_cast(session->create()); if (!flowFile) { sendErrorResponse(conn); @@ -301,8 +301,10 @@ int64_t ListenHTTP::WriteCallback::process(std::shared_ptr strea int64_t tlen = _reqInfo->content_length; uint8_t buf[16384]; - while (nlen < tlen) { - rlen = tlen - nlen; + // if we have no content length we should call mg_read until + // there is no data left from the stream to be HTTP/1.1 compliant + while (tlen == -1 || nlen < tlen) { + rlen = tlen == -1 ? sizeof(buf) : tlen - nlen; if (rlen > sizeof(buf)) { rlen = sizeof(buf); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/ListenSyslog.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp index a5fdf28..054d585 100644 --- a/libminifi/src/processors/ListenSyslog.cpp +++ b/libminifi/src/processors/ListenSyslog.cpp @@ -279,7 +279,7 @@ void ListenSyslog::onTrigger(core::ProcessContext *context, core::ProcessSession SysLogEvent event = eventQueue.front(); eventQueue.pop(); if (firstEvent) { - flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + flowFile = std::static_pointer_cast(session->create()); if (!flowFile) return; ListenSyslog::WriteCallback callback(event.payload, event.len); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/LogAttribute.cpp ---------------------------------------------------------------------- diff --git 
a/libminifi/src/processors/LogAttribute.cpp b/libminifi/src/processors/LogAttribute.cpp index ad8e664..0c2f64e 100644 --- a/libminifi/src/processors/LogAttribute.cpp +++ b/libminifi/src/processors/LogAttribute.cpp @@ -64,6 +64,7 @@ void LogAttribute::initialize() { } void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + logger_->log_info("enter log attribute"); std::string dashLine = "--------------------------------------------------"; LogAttrLevel level = LogAttrLevelInfo; bool logPayload = false; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/processors/TailFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp index f87f4ec..711f226 100644 --- a/libminifi/src/processors/TailFile.cpp +++ b/libminifi/src/processors/TailFile.cpp @@ -247,11 +247,11 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se if (!this->_delimiter.empty()) { char delim = this->_delimiter.c_str()[0]; - std::vector> flowFiles = std::vector>(); + std::vector> flowFiles; session->import(fullPath, flowFiles, true, this->_currentTailFilePosition, delim); logger_->log_info("%d flowfiles were received from TailFile input", flowFiles.size()); - for (std::shared_ptr ffr : flowFiles) { + for (auto ffr : flowFiles) { logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, ffr->getSize()); std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." 
+ extension; ffr->updateKeyedAttribute(PATH, fileLocation); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/provenance/Provenance.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index 3e42a5a..b46cbc0 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -218,10 +218,8 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptrSerialize(uuidStr_, const_cast(outStream.getBuffer()), outStream.getSize())) { - logger_->log_debug("NiFi Provenance Store event %s size %d success", uuidStr_, outStream.getSize()); - } else { + // Persist to the DB + if (!repo->Serialize(uuidStr_, const_cast(outStream.getBuffer()), outStream.getSize())) { logger_->log_error("NiFi Provenance Store event %s size %d fail", uuidStr_, outStream.getSize()); } return true; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/provenance/ProvenanceRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp index ce19fe4..665837c 100644 --- a/libminifi/src/provenance/ProvenanceRepository.cpp +++ b/libminifi/src/provenance/ProvenanceRepository.cpp @@ -17,6 +17,7 @@ */ #include "provenance/ProvenanceRepository.h" +#include "leveldb/write_batch.h" #include #include #include "provenance/Provenance.h" @@ -26,13 +27,39 @@ namespace nifi { namespace minifi { namespace provenance { +void ProvenanceRepository::flush() { + leveldb::WriteBatch batch; + std::string key; + std::string value; + leveldb::ReadOptions options; + uint64_t decrement_total = 0; + while (keys_to_delete.size_approx() > 0) { + if (keys_to_delete.try_dequeue(key)) { + db_->Get(options, key, &value); + decrement_total += value.size(); + batch.Delete(key); + 
logger_->log_info("Removing %s", key); + } + } + if (db_->Write(leveldb::WriteOptions(), &batch).ok()) { + logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load()); + if (decrement_total > repo_size_.load()) { + repo_size_ = 0; + } else { + repo_size_ -= decrement_total; + } + } +} + void ProvenanceRepository::run() { - // threshold for purge - uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; while (running_) { std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); uint64_t curTime = getTimeMillis(); - uint64_t size = repoSize(); + // threshold for purge + uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; + + uint64_t size = getRepoSize(); + if (size >= purgeThreshold) { leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { @@ -49,12 +76,13 @@ void ProvenanceRepository::run() { } delete it; } + flush(); + size = getRepoSize(); if (size > max_partition_bytes_) repo_full_ = true; else repo_full_ = false; } - return; } } /* namespace provenance */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/utils/HttpClient.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/utils/HttpClient.cpp b/libminifi/src/utils/HttpClient.cpp new file mode 100644 index 0000000..fddbf2f --- /dev/null +++ b/libminifi/src/utils/HttpClient.cpp @@ -0,0 +1,214 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "utils/HTTPClient.h" +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +HTTPClient::HTTPClient(const std::string &url, const std::shared_ptr ssl_context_service) + : ssl_context_service_(ssl_context_service), + url_(url), + logger_(logging::LoggerFactory::getLogger()), + connect_timeout_(0), + read_timeout_(0), + content_type(nullptr), + headers_(nullptr), + http_code(0), + header_response_(1), + res(CURLE_OK) { + HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance(); + http_session_ = curl_easy_init(); +} + +HTTPClient::~HTTPClient() { + if (nullptr != headers_) { + curl_slist_free_all(headers_); + } + curl_easy_cleanup(http_session_); +} + +void HTTPClient::setVerbose() { + curl_easy_setopt(http_session_, CURLOPT_VERBOSE, 1L); +} + +void HTTPClient::initialize(const std::string &method) { + method_ = method; + set_request_method(method_); + if (isSecure(url_) && ssl_context_service_ != nullptr) { + configure_secure_connection(http_session_); + } +} + +void HTTPClient::setDisablePeerVerification() { + logger_->log_info("Disabling peer verification"); + curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYPEER, 0L); +} + +void HTTPClient::setConnectionTimeout(int64_t timeout) { + connect_timeout_ = timeout; +} + +void HTTPClient::setReadTimeout(int64_t timeout) { + read_timeout_ = timeout; +} + +void HTTPClient::setUploadCallback(HTTPUploadCallback *callbackObj) { + logger_->log_info("Setting callback"); + if (method_ == 
"put" || method_ == "PUT") { + curl_easy_setopt(http_session_, CURLOPT_INFILESIZE_LARGE, (curl_off_t) callbackObj->ptr->getBufferSize()); + } else { + curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, (curl_off_t) callbackObj->ptr->getBufferSize()); + } + curl_easy_setopt(http_session_, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write); + curl_easy_setopt(http_session_, CURLOPT_READDATA, static_cast(callbackObj)); +} + +struct curl_slist *HTTPClient::build_header_list(std::string regex, const std::map &attributes) { + if (http_session_) { + for (auto attribute : attributes) { + if (matches(attribute.first, regex)) { + std::string attr = attribute.first + ":" + attribute.second; + headers_ = curl_slist_append(headers_, attr.c_str()); + } + } + } + return headers_; +} + +void HTTPClient::setContentType(std::string content_type) { + content_type_ = "Content-Type: " + content_type; + headers_ = curl_slist_append(headers_, content_type_.c_str()); +} + +std::string HTTPClient::escape(std::string string_to_escape) { + return curl_easy_escape(http_session_, string_to_escape.c_str(), string_to_escape.length()); +} + +void HTTPClient::setPostFields(std::string input) { + curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, input.length()); + curl_easy_setopt(http_session_, CURLOPT_POSTFIELDS, input.c_str()); +} + +void HTTPClient::setHeaders(struct curl_slist *list) { + headers_ = list; +} + +void HTTPClient::setUseChunkedEncoding() { + headers_ = curl_slist_append(headers_, "Transfer-Encoding: chunked"); +} + +bool HTTPClient::submit() { + if (connect_timeout_ > 0) { + curl_easy_setopt(http_session_, CURLOPT_CONNECTTIMEOUT, connect_timeout_); + } + + if (headers_ != nullptr) { + headers_ = curl_slist_append(headers_, "Expect:"); + curl_easy_setopt(http_session_, CURLOPT_HTTPHEADER, headers_); + } + + curl_easy_setopt(http_session_, CURLOPT_URL, url_.c_str()); + logger_->log_info("Submitting to %s", url_); + curl_easy_setopt(http_session_, 
CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write); + curl_easy_setopt(http_session_, CURLOPT_WRITEDATA, static_cast(&content_)); + + curl_easy_setopt(http_session_, CURLOPT_HEADERFUNCTION, &utils::HTTPHeaderResponse::receive_headers); + curl_easy_setopt(http_session_, CURLOPT_HEADERDATA, static_cast(&header_response_)); + + res = curl_easy_perform(http_session_); + curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code); + curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type); + if (res != CURLE_OK) { + logger_->log_error("curl_easy_perform() failed %s\n", curl_easy_strerror(res)); + return false; + } + return true; +} + +CURLcode HTTPClient::getResponseResult() { + return res; +} + +int64_t &HTTPClient::getResponseCode() { + return http_code; +} + +const char *HTTPClient::getContentType() { + return content_type; +} + +const std::vector &HTTPClient::getResponseBody() { + return content_.data; +} + +void HTTPClient::set_request_method(const std::string method) { + std::string my_method = method; + std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper); + if (my_method == "POST") { + curl_easy_setopt(http_session_, CURLOPT_POST, 1L); + } else if (my_method == "PUT") { + curl_easy_setopt(http_session_, CURLOPT_PUT, 0L); + } else if (my_method == "GET") { + } else { + curl_easy_setopt(http_session_, CURLOPT_CUSTOMREQUEST, my_method.c_str()); + } +} + +bool HTTPClient::matches(const std::string &value, const std::string &sregex) { + if (sregex == ".*") + return true; + + regex_t regex; + int ret = regcomp(&regex, sregex.c_str(), 0); + if (ret) + return false; + ret = regexec(&regex, value.c_str(), (size_t) 0, NULL, 0); + regfree(&regex); + if (ret) + return false; + + return true; +} + +void HTTPClient::configure_secure_connection(CURL *http_session) { + logger_->log_debug("Using certificate file %s", ssl_context_service_->getCertificateFile()); + curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, 
&configure_ssl_context); + curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, static_cast(ssl_context_service_.get())); +} + +bool HTTPClient::isSecure(const std::string &url) { + if (url.find("https") != std::string::npos) { + logger_->log_debug("%s is a secure url", url); + return true; + } + return false; +} + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/utils/Id.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/utils/Id.cpp b/libminifi/src/utils/Id.cpp index ee6d84d..0c76a79 100644 --- a/libminifi/src/utils/Id.cpp +++ b/libminifi/src/utils/Id.cpp @@ -38,9 +38,14 @@ namespace utils { uint64_t timestamp = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); -NonRepeatingStringGenerator::NonRepeatingStringGenerator() : prefix_((std::to_string(timestamp) + "-")) {} +NonRepeatingStringGenerator::NonRepeatingStringGenerator() + : prefix_((std::to_string(timestamp) + "-")) { +} -IdGenerator::IdGenerator() : implementation_(UUID_TIME_IMPL), logger_(logging::LoggerFactory::getLogger()), incrementor_(0) { +IdGenerator::IdGenerator() + : implementation_(UUID_TIME_IMPL), + logger_(logging::LoggerFactory::getLogger()), + incrementor_(0) { } uint64_t IdGenerator::getDeviceSegmentFromString(const std::string& str, int numBits) { @@ -134,12 +139,12 @@ void IdGenerator::generate(uuid_t output) { uuid_generate(output); break; case MINIFI_UID_IMPL: { - std::memcpy(output, deterministic_prefix_, sizeof(deterministic_prefix_)); - uint64_t incrementor_value = incrementor_++; - for (int i = 8; i < 16; i++) { - output[i] = (incrementor_value >> ((15 - i) * 8)) & UNSIGNED_CHAR_MAX; - } + std::memcpy(output, deterministic_prefix_, sizeof(deterministic_prefix_)); + uint64_t incrementor_value = incrementor_++; + for (int i = 8; i < 16; i++) 
{ + output[i] = (incrementor_value >> ((15 - i) * 8)) & UNSIGNED_CHAR_MAX; } + } break; default: uuid_generate_time(output); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/.device_id ---------------------------------------------------------------------- diff --git a/libminifi/test/.device_id b/libminifi/test/.device_id new file mode 100644 index 0000000..500a690 --- /dev/null +++ b/libminifi/test/.device_id @@ -0,0 +1 @@ +2291398467776739051 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/TestBase.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp index a471afe..c18153c 100644 --- a/libminifi/test/TestBase.cpp +++ b/libminifi/test/TestBase.cpp @@ -31,6 +31,7 @@ TestPlan::TestPlan(std::shared_ptr content_repo, std::s finalized(false), current_flowfile_(nullptr), logger_(logging::LoggerFactory::getLogger()) { + stream_factory = std::make_shared(std::make_shared()); } std::shared_ptr TestPlan::addProcessor(const std::shared_ptr &processor, const std::string &name, core::Relationship relationship, @@ -43,6 +44,7 @@ bool linkToPrevious) { uuid_t uuid; uuid_generate(uuid); + processor->setStreamFactory(stream_factory); // initialize the processor processor->initialize(); @@ -60,7 +62,7 @@ bool linkToPrevious) { std::stringstream connection_name; connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr(); - logger_->log_info("Creating %s connection for proc %d",connection_name.str(),processor_queue_.size()+1); + logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1); std::shared_ptr connection = std::make_shared(flow_repo_, content_repo_, connection_name.str()); connection->setRelationship(relationship); @@ -84,7 +86,7 @@ bool linkToPrevious) { processor_nodes_.push_back(node); - std::shared_ptr context = 
std::make_shared(*(node.get()), controller_services_provider_, flow_repo_, prov_repo_, content_repo_); + std::shared_ptr context = std::make_shared(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_); processor_contexts_.push_back(context); processor_queue_.push_back(processor); @@ -116,7 +118,7 @@ bool linkToPrevious) { bool TestPlan::setProperty(const std::shared_ptr proc, const std::string &prop, const std::string &value) { std::lock_guard guard(mutex); int i = 0; - logger_->log_info("Attempting to set property %s %s for %s",prop,value,proc->getName()); + logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName()); for (i = 0; i < processor_queue_.size(); i++) { if (processor_queue_.at(i) == proc) { break; @@ -151,13 +153,13 @@ bool TestPlan::runNextProcessor(std::function processor = processor_queue_.at(location); std::shared_ptr context = processor_contexts_.at(location); - std::shared_ptr factory = std::make_shared(context.get()); + std::shared_ptr factory = std::make_shared(context); factories_.push_back(factory); if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) { - processor->onSchedule(context.get(), factory.get()); + processor->onSchedule(context, factory); configured_processors_.push_back(processor); } - std::shared_ptr current_session = std::make_shared(context.get()); + std::shared_ptr current_session = std::make_shared(context); process_sessions_.push_back(current_session); processor->incrementActiveTasks(); processor->setScheduledState(core::ScheduledState::RUNNING); @@ -165,7 +167,7 @@ bool TestPlan::runNextProcessor(std::functionlog_info("Running %s", processor->getName()); - processor->onTrigger(context.get(), current_session.get()); + processor->onTrigger(context, current_session); } current_session->commit(); current_flowfile_ = current_session->get(); 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 7e2a5d7..4eba0a8 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -191,6 +191,8 @@ class TestPlan { std::shared_ptr buildFinalConnection(std::shared_ptr processor, bool setDest = false); + std::shared_ptr stream_factory; + std::atomic finalized; std::shared_ptr content_repo_; @@ -223,7 +225,7 @@ class TestController { TestController() : log(LogTestController::getInstance()) { - minifi::ResourceClaim::default_directory_path = const_cast("./"); + minifi::setDefaultDirectory("./"); log.reset(); utils::IdGenerator::getIdGenerator()->initialize(std::make_shared()); } @@ -235,8 +237,9 @@ class TestController { content_repo->initialize(configuration); + std::shared_ptr flow_repo = std::make_shared(); std::shared_ptr repo = std::make_shared(); - return std::make_shared(content_repo, repo, repo); + return std::make_shared(content_repo, flow_repo, repo); } void runSession(std::shared_ptr &plan, bool runToCompletion = true, std::function verify = nullptr) { @@ -247,6 +250,8 @@ class TestController { } } + + ~TestController() { for (auto dir : directories) { DIR *created_dir; @@ -262,8 +267,9 @@ class TestController { unlink(file.c_str()); } } + closedir(created_dir); } - closedir(created_dir); + rmdir(dir); } } @@ -276,6 +282,8 @@ class TestController { protected: + + std::mutex test_mutex; //std::map http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/TestServer.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestServer.h b/libminifi/test/TestServer.h index 263a6b3..29f6345 100644 --- a/libminifi/test/TestServer.h +++ b/libminifi/test/TestServer.h @@ -21,15 +21,15 @@ #include #include #include "civetweb.h" +#include "CivetServer.h" 
+ /* Server context handle */ -static struct mg_context *ctx; static std::string resp_str; static int responder(struct mg_connection *conn, void *response) { const char *msg = resp_str.c_str(); - mg_printf(conn, "HTTP/1.1 200 OK\r\n" "Content-Length: %lu\r\n" "Content-Type: text/plain\r\n" @@ -45,42 +45,36 @@ void init_webserver() { mg_init_library(0); } -void start_webserver(std::string &port, std::string &rooturi, const std::string &response, struct mg_callbacks *callbacks, std::string &cert) { - std::cout << "root uri is " << rooturi << ":" << port << "/" << std::endl; - resp_str = response; - const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", cert.c_str(), "ssl_protocol_version", "3", "ssl_cipher_list", - "ECDHE-RSA-AES256-GCM-SHA384:DES-CBC3-SHA:AES128-SHA:AES128-GCM-SHA256", 0 }; +CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) { + const char *options[] = { "listening_ports", port.c_str(), "error_log_file", + "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", + "ALL", "ssl_verify_peer", "no", 0 }; - if (!mg_check_feature(2)) { - std::cerr << "Error: Embedded example built with SSL support, " << "but civetweb library build without" << std::endl; - exit(1); + std::vector cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); } + CivetServer *server = new CivetServer(cpp_options); - ctx = mg_start(callbacks, 0, options); - if (ctx == nullptr) { - std::cerr << "Cannot start CivetWeb - mg_start failed." 
<< std::endl; - exit(1); - } + server->addHandler(rooturi, handler); - mg_set_request_handler(ctx, rooturi.c_str(), responder, (void*) &resp_str); + return server; } -void start_webserver(std::string &port, std::string &rooturi, const std::string &response) { +CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler) { + const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), 0 }; - std::cout << "root uri is " << rooturi << ":" << port << "/" << std::endl; - resp_str = response; - - const char *options[] = { "listening_ports", port.c_str(), 0 }; - ctx = mg_start(nullptr, 0, options); - - if (ctx == nullptr) { - std::cerr << "Cannot start CivetWeb - mg_start failed." << std::endl; - exit(1); + std::vector cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); } + CivetServer *server = new CivetServer(cpp_options); + + server->addHandler(rooturi, handler); - mg_set_request_handler(ctx, rooturi.c_str(), responder, (void*) &resp_str); + return server; } @@ -126,9 +120,9 @@ bool parse_http_components(const std::string &url, std::string &port, std::strin } -static void stop_webserver() { - /* Stop the server */ - mg_stop(ctx); +static void stop_webserver(CivetServer *server) { + if (server != nullptr) + delete server; /* Un-initialize the library */ mg_exit_library(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/test/integration/C2NullConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/C2NullConfiguration.cpp b/libminifi/test/integration/C2NullConfiguration.cpp new file mode 100644 index 0000000..394429f --- /dev/null +++ b/libminifi/test/integration/C2NullConfiguration.cpp @@ -0,0 +1,135 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#undef NDEBUG +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "../TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "../include/core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "../TestServer.h" +#include "c2/C2Agent.h" +#include "c2/protocols/RESTReceiver.h" + +#include "IntegrationBase.h" + +class VerifyC2Server : public IntegrationBase { + public: + explicit VerifyC2Server(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setDebug(); + std::fstream file; 
+ ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("C2Agent] [info] Class is null") == true); + assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null") == true); + assert(LogTestController::getInstance().contains("Class is RESTSender") == true); + } + + void queryRootProcessGroup(std::shared_ptr pg) { + std::shared_ptr proc = pg->findProcessor("invoke"); + assert(proc != nullptr); + + std::shared_ptr inv = std::dynamic_pointer_cast(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + + std::cout << "url is " << url << std::endl; + + std::string port, scheme, path; + parse_http_components(url, port, scheme, path); + std::cout << "path is " << path << std::endl; + configuration->set("c2.agent.protocol.class", "null"); + configuration->set("c2.rest.url", "null"); + configuration->set("c2.rest.url.ack", "null"); + configuration->set("c2.agent.heartbeat.reporter.classes", "null"); + configuration->set("c2.rest.listener.port", "null"); + configuration->set("c2.agent.heartbeat.period", "null"); + configuration->set("c2.rest.listener.heartbeat.rooturi", "null"); + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + VerifyC2Server harness(isSecure); + + harness.setKeyDir(key_dir); + + harness.run(test_file_location); + + return 0; +} +