nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From phroc...@apache.org
Subject nifi-minifi-cpp git commit: MINIFICPP-321: Support Maximum File Count in PutFile
Date Fri, 01 Dec 2017 14:36:53 GMT
Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 231ccaa14 -> a5495e375


MINIFICPP-321: Support Maximum File Count in PutFile

This closes #211.

Signed-off-by: Marc Parisi <phrocker@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a5495e37
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a5495e37
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a5495e37

Branch: refs/heads/master
Commit: a5495e37536fd04e7817731314d409ced9d7eaea
Parents: 231ccaa
Author: Dustin Rodrigues <dust.rod@gmail.com>
Authored: Tue Nov 28 22:18:11 2017 -0500
Committer: Marc Parisi <phrocker@apache.org>
Committed: Fri Dec 1 09:36:38 2017 -0500

----------------------------------------------------------------------
 libminifi/include/processors/PutFile.h |  2 ++
 libminifi/src/processors/PutFile.cpp   | 45 +++++++++++++++++++++++++++--
 2 files changed, 44 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a5495e37/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h
index 9872eb2..46a2f57 100644
--- a/libminifi/include/processors/PutFile.h
+++ b/libminifi/include/processors/PutFile.h
@@ -60,6 +60,7 @@ class PutFile : public core::Processor {
   static core::Property Directory;
   static core::Property ConflictResolution;
   static core::Property CreateDirs;
+  static core::Property MaxDestFiles;
   // Supported Relationships
   static core::Relationship Success;
   static core::Relationship Failure;
@@ -109,6 +110,7 @@ class PutFile : public core::Processor {
   std::string directory_;
   std::string conflict_resolution_;
   bool try_mkdirs_ = true;
+  int64_t max_dest_files_ = -1;
 
   bool putFile(core::ProcessSession *session,
                std::shared_ptr<FlowFileRecord> flowFile,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a5495e37/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index c2789d7..a36f6d7 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -21,6 +21,7 @@
 #include "processors/PutFile.h"
 
 #include <sys/stat.h>
+#include <dirent.h>
 #include <unistd.h>
 #include <uuid/uuid.h>
 #include <cstdint>
@@ -60,6 +61,10 @@ core::Property PutFile::CreateDirs(
     "If true, then missing destination directories will be created. "
         "If false, flowfiles are penalized and sent to failure.",
     "true");
+core::Property PutFile::MaxDestFiles(
+    "Maximum File Count",
+    "Specifies the maximum number of files that can exist in the output directory",
+    "-1");
 
 core::Relationship PutFile::Success(
     "success",
@@ -74,6 +79,7 @@ void PutFile::initialize() {
   properties.insert(Directory);
   properties.insert(ConflictResolution);
   properties.insert(CreateDirs);
+  properties.insert(MaxDestFiles);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -91,9 +97,13 @@ void PutFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFact
     logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid");
   }
 
-  std::string try_mkdirs_conf;
-  context->getProperty(CreateDirs.getName(), try_mkdirs_conf);
-  utils::StringUtils::StringToBool(try_mkdirs_conf, try_mkdirs_);
+  std::string value;
+  context->getProperty(CreateDirs.getName(), value);
+  utils::StringUtils::StringToBool(value, try_mkdirs_);
+
+  if (context->getProperty(MaxDestFiles.getName(), value)) {
+    core::Property::StringToInt(value, max_dest_files_);
+  }
 }
 
 void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
@@ -125,6 +135,35 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
   // If file exists, apply conflict resolution strategy
   struct stat statResult;
 
+  if ((max_dest_files_ != -1) && (stat(directory_.c_str(), &statResult) == 0)) {
+    // something exists at directory path
+    if (S_ISDIR(statResult.st_mode)) {
+      // it's a directory, count the files
+      DIR *myDir = opendir(directory_.c_str());
+      if (!myDir) {
+        logger_->log_warn("Could not open %s", directory_.c_str());
+        session->transfer(flowFile, Failure);
+        return;
+      }
+      struct dirent* entry = nullptr;
+
+      int64_t ct = 0;
+      while ((entry = readdir(myDir)) != nullptr) {
+        if ((strcmp(entry->d_name, ".") != 0) && (strcmp(entry->d_name, "..") != 0)) {
+          ct++;
+          if (ct >= max_dest_files_) {
+            logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the "
+                "configured max number of files", directory_.c_str(), max_dest_files_);
+            session->transfer(flowFile, Failure);
+            closedir(myDir);
+            return;
+          }
+        }
+      }
+      closedir(myDir);
+    }
+  }
+
   if (stat(destFile.c_str(), &statResult) == 0) {
    logger_->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s",
                       destFile.c_str(),


Mime
View raw message