Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 231ccaa14 -> a5495e375
MINIFICPP-321: Support Maximum File Count in PutFile
This closes #211.
Signed-off-by: Marc Parisi <phrocker@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a5495e37
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a5495e37
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a5495e37
Branch: refs/heads/master
Commit: a5495e37536fd04e7817731314d409ced9d7eaea
Parents: 231ccaa
Author: Dustin Rodrigues <dust.rod@gmail.com>
Authored: Tue Nov 28 22:18:11 2017 -0500
Committer: Marc Parisi <phrocker@apache.org>
Committed: Fri Dec 1 09:36:38 2017 -0500
----------------------------------------------------------------------
libminifi/include/processors/PutFile.h | 2 ++
libminifi/src/processors/PutFile.cpp | 45 +++++++++++++++++++++++++++--
2 files changed, 44 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a5495e37/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h
index 9872eb2..46a2f57 100644
--- a/libminifi/include/processors/PutFile.h
+++ b/libminifi/include/processors/PutFile.h
@@ -60,6 +60,7 @@ class PutFile : public core::Processor {
static core::Property Directory;
static core::Property ConflictResolution;
static core::Property CreateDirs;
+ static core::Property MaxDestFiles;
// Supported Relationships
static core::Relationship Success;
static core::Relationship Failure;
@@ -109,6 +110,7 @@ class PutFile : public core::Processor {
std::string directory_;
std::string conflict_resolution_;
bool try_mkdirs_ = true;
+ int64_t max_dest_files_ = -1;
bool putFile(core::ProcessSession *session,
std::shared_ptr<FlowFileRecord> flowFile,
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a5495e37/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index c2789d7..a36f6d7 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -21,6 +21,7 @@
#include "processors/PutFile.h"
#include <sys/stat.h>
+#include <dirent.h>
#include <unistd.h>
#include <uuid/uuid.h>
#include <cstdint>
@@ -60,6 +61,10 @@ core::Property PutFile::CreateDirs(
"If true, then missing destination directories will be created. "
"If false, flowfiles are penalized and sent to failure.",
"true");
+core::Property PutFile::MaxDestFiles(
+ "Maximum File Count",
+ "Specifies the maximum number of files that can exist in the output directory",
+ "-1");
core::Relationship PutFile::Success(
"success",
@@ -74,6 +79,7 @@ void PutFile::initialize() {
properties.insert(Directory);
properties.insert(ConflictResolution);
properties.insert(CreateDirs);
+ properties.insert(MaxDestFiles);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
@@ -91,9 +97,13 @@ void PutFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFact
logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid");
}
- std::string try_mkdirs_conf;
- context->getProperty(CreateDirs.getName(), try_mkdirs_conf);
- utils::StringUtils::StringToBool(try_mkdirs_conf, try_mkdirs_);
+ std::string value;
+ context->getProperty(CreateDirs.getName(), value);
+ utils::StringUtils::StringToBool(value, try_mkdirs_);
+
+ if (context->getProperty(MaxDestFiles.getName(), value)) {
+ core::Property::StringToInt(value, max_dest_files_);
+ }
}
void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
@@ -125,6 +135,35 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession
*ses
// If file exists, apply conflict resolution strategy
struct stat statResult;
+ if ((max_dest_files_ != -1) && (stat(directory_.c_str(), &statResult) == 0))
{
+ // something exists at directory path
+ if (S_ISDIR(statResult.st_mode)) {
+ // it's a directory, count the files
+ DIR *myDir = opendir(directory_.c_str());
+ if (!myDir) {
+ logger_->log_warn("Could not open %s", directory_.c_str());
+ session->transfer(flowFile, Failure);
+ return;
+ }
+ struct dirent* entry = nullptr;
+
+ int64_t ct = 0;
+ while ((entry = readdir(myDir)) != nullptr) {
+ if ((strcmp(entry->d_name, ".") != 0) && (strcmp(entry->d_name, "..")
!= 0)) {
+ ct++;
+ if (ct >= max_dest_files_) {
+ logger_->log_warn("Routing to failure because the output directory %s has
at least %u files, which exceeds the "
+ "configured max number of files", directory_.c_str(), max_dest_files_);
+ session->transfer(flowFile, Failure);
+ closedir(myDir);
+ return;
+ }
+ }
+ }
+ closedir(myDir);
+ }
+ }
+
if (stat(destFile.c_str(), &statResult) == 0) {
logger_->log_info("Destination file %s exists; applying Conflict Resolution Strategy:
%s",
destFile.c_str(),
|