Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 180AF200D60 for ; Fri, 1 Dec 2017 15:36:55 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 16BF8160C06; Fri, 1 Dec 2017 14:36:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5C7C2160BFB for ; Fri, 1 Dec 2017 15:36:54 +0100 (CET) Received: (qmail 4091 invoked by uid 500); 1 Dec 2017 14:36:53 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 4082 invoked by uid 99); 1 Dec 2017 14:36:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Dec 2017 14:36:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6F01DDFF81; Fri, 1 Dec 2017 14:36:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: phrocker@apache.org To: commits@nifi.apache.org Message-Id: <73cd7a4ed0cd4d16bffdcdb3323ec56f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: nifi-minifi-cpp git commit: MINIFICPP-321: Support Maximum File Count in PutFile Date: Fri, 1 Dec 2017 14:36:53 +0000 (UTC) archived-at: Fri, 01 Dec 2017 14:36:55 -0000 Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 231ccaa14 -> a5495e375 MINIFICPP-321: Support Maximum File Count in PutFile This closes #211. Signed-off-by: Marc Parisi Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a5495e37 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a5495e37 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a5495e37 Branch: refs/heads/master Commit: a5495e37536fd04e7817731314d409ced9d7eaea Parents: 231ccaa Author: Dustin Rodrigues Authored: Tue Nov 28 22:18:11 2017 -0500 Committer: Marc Parisi Committed: Fri Dec 1 09:36:38 2017 -0500 ---------------------------------------------------------------------- libminifi/include/processors/PutFile.h | 2 ++ libminifi/src/processors/PutFile.cpp | 45 +++++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a5495e37/libminifi/include/processors/PutFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h index 9872eb2..46a2f57 100644 --- a/libminifi/include/processors/PutFile.h +++ b/libminifi/include/processors/PutFile.h @@ -60,6 +60,7 @@ class PutFile : public core::Processor { static core::Property Directory; static core::Property ConflictResolution; static core::Property CreateDirs; + static core::Property MaxDestFiles; // Supported Relationships static core::Relationship Success; static core::Relationship Failure; @@ -109,6 +110,7 @@ class PutFile : public core::Processor { std::string directory_; std::string conflict_resolution_; bool try_mkdirs_ = true; + int64_t max_dest_files_ = -1; bool putFile(core::ProcessSession *session, std::shared_ptr flowFile, http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a5495e37/libminifi/src/processors/PutFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp index c2789d7..a36f6d7 100644 --- a/libminifi/src/processors/PutFile.cpp +++ b/libminifi/src/processors/PutFile.cpp @@ -21,6 +21,7 @@ #include "processors/PutFile.h" #include +#include #include #include #include @@ -60,6 +61,10 @@ core::Property PutFile::CreateDirs( "If true, then missing destination directories will be created. " "If false, flowfiles are penalized and sent to failure.", "true"); +core::Property PutFile::MaxDestFiles( + "Maximum File Count", + "Specifies the maximum number of files that can exist in the output directory", + "-1"); core::Relationship PutFile::Success( "success", @@ -74,6 +79,7 @@ void PutFile::initialize() { properties.insert(Directory); properties.insert(ConflictResolution); properties.insert(CreateDirs); + properties.insert(MaxDestFiles); setSupportedProperties(properties); // Set the supported relationships std::set relationships; @@ -91,9 +97,13 @@ void PutFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFact logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid"); } - std::string try_mkdirs_conf; - context->getProperty(CreateDirs.getName(), try_mkdirs_conf); - utils::StringUtils::StringToBool(try_mkdirs_conf, try_mkdirs_); + std::string value; + context->getProperty(CreateDirs.getName(), value); + utils::StringUtils::StringToBool(value, try_mkdirs_); + + if (context->getProperty(MaxDestFiles.getName(), value)) { + core::Property::StringToInt(value, max_dest_files_); + } } void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { @@ -125,6 +135,35 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses // If file exists, apply conflict resolution strategy struct stat statResult; + if ((max_dest_files_ != -1) && (stat(directory_.c_str(), &statResult) == 0)) { + // something exists at directory path + if (S_ISDIR(statResult.st_mode)) { + // it's a directory, count the files + DIR *myDir = opendir(directory_.c_str()); + if (!myDir) { + logger_->log_warn("Could not open %s", directory_.c_str()); + session->transfer(flowFile, Failure); + return; + } + struct dirent* entry = nullptr; + + int64_t ct = 0; + while ((entry = readdir(myDir)) != nullptr) { + if ((strcmp(entry->d_name, ".") != 0) && (strcmp(entry->d_name, "..") != 0)) { + ct++; + if (ct >= max_dest_files_) { + logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the " + "configured max number of files", directory_.c_str(), max_dest_files_); + session->transfer(flowFile, Failure); + closedir(myDir); + return; + } + } + } + closedir(myDir); + } + } + if (stat(destFile.c_str(), &statResult) == 0) { logger_->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s", destFile.c_str(),