nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor
Date Fri, 12 Jun 2020 15:02:20 GMT

arpadboda commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439473816



##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -40,60 +35,283 @@
 #include <utility>
 #include <vector>
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include <regex>
-#else
-#include <regex.h>
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that
should be tailed when using single file mode, or a file regex when using multifile mode",
"");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for state migration
from the legacy state file.",
-                                   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should
be used for delimiting the data being tailed"
-                                   "from the incoming file."
-                                   "If none is specified, data will be ingested as it becomes
available.",
-                                   "");
+core::Property TailFile::FileName(
+    core::PropertyBuilder::createProperty("File to Tail")
+        ->withDescription("Fully-qualified filename of the file that should be tailed
when using single file mode, or a file regex when using multifile mode")
+        ->isRequired(true)
+        ->build());
+
+core::Property TailFile::StateFile(
+    core::PropertyBuilder::createProperty("State File")
+        ->withDescription("DEPRECATED. Only use it for state migration from the legacy
state file.")
+        ->isRequired(false)
+        ->withDefaultValue<std::string>("TailFileState")
+        ->build());
+
+core::Property TailFile::Delimiter(
+    core::PropertyBuilder::createProperty("Input Delimiter")
+        ->withDescription("Specifies the character that should be used for delimiting
the data being tailed"
+         "from the incoming file. If none is specified, data will be ingested as it becomes
available.")
+        ->isRequired(false)
+        ->withDefaultValue<std::string>("\\n")
+        ->build());
 
 core::Property TailFile::TailMode(
-    core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")->withDescription(
-        "Specifies the tail file mode. In 'Single file' mode only a single file will be watched.
"
+    core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+        ->withDescription("Specifies the tail file mode. In 'Single file' mode only a
single file will be watched. "
         "In 'Multiple file' mode a regex may be used. Note that in multiple file mode we
will still continue to watch for rollover on the initial set of watched files. "
         "The Regex used to locate multiple files will be run during the schedule phrase.
Note that if rotated files are matched by the regex, those files will be tailed.")->isRequired(true)
-        ->withAllowableValue<std::string>("Single file")->withAllowableValue("Multiple
file")->withDefaultValue("Single file")->build());
-
-core::Property TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
"Base Directory")->isRequired(false)->build());
+        ->withAllowableValue<std::string>("Single file")->withAllowableValue("Multiple
file")->withDefaultValue("Single file")
+        ->build());
+
+core::Property TailFile::BaseDirectory(
+    core::PropertyBuilder::createProperty("tail-base-directory", "Base Directory")
+        ->withDescription("Base directory used to look for files to tail. This property
is required when using Multiple file mode.")
+        ->isRequired(false)
+        ->build());
+
+core::Property TailFile::RecursiveLookup(
+    core::PropertyBuilder::createProperty("Recursive lookup")
+        ->withDescription("When using Multiple file mode, this property determines whether
files are tailed in "
+        "child directories of the Base Directory or not.")
+        ->isRequired(false)
+        ->withDefaultValue<bool>(false)
+        ->build());
+
+core::Property TailFile::LookupFrequency(
+    core::PropertyBuilder::createProperty("Lookup frequency")
+        ->withDescription("When using Multiple file mode, this property specifies the
minimum duration "
+        "the processor will wait between looking for new files to tail in the Base Directory.")
+        ->isRequired(false)
+        ->withDefaultValue<core::TimePeriodValue>("10 min")
+        ->build());
+
+core::Property TailFile::RollingFilenamePattern(
+    core::PropertyBuilder::createProperty("Rolling Filename Pattern")
+        ->withDescription("If the file to tail \"rolls over\" as would be the case with
log files, this filename pattern will be used to "
+        "identify files that have rolled over so MiNiFi can read the remaining of the rolled-over
file and then continue with the new log file. "
+        "This pattern supports the wildcard characters * and ?, it also supports the notation
${filename} to specify a pattern based on the name of the file "
+        "(without extension), and will assume that the files that have rolled over live in
the same directory as the file being tailed.")
+        ->isRequired(false)
+        ->withDefaultValue<std::string>("${filename}.*")
+        ->build());
 
 core::Relationship TailFile::Success("success", "All files are routed to success");
 
 const char *TailFile::CURRENT_STR = "CURRENT.";
 const char *TailFile::POSITION_STR = "POSITION.";
 
+namespace {
+template<typename Container, typename Key>
+bool containsKey(const Container &container, const Key &key) {
+  return container.find(key) != container.end();
+}
+
+template <typename Container, typename Key>
+uint64_t readOptionalUint64(const Container &container, const Key &key) {
+  const auto it = container.find(key);
+  if (it != container.end()) {
+    return std::stoull(it->second);
+  } else {
+    return 0;
+  }
+}
+
+// the delimiter is the first character of the input, allowing some escape sequences
+std::string parseDelimiter(const std::string &input) {
+  if (input.empty()) return "";
+  if (input[0] != '\\') return std::string{ input[0] };
+  if (input.size() == std::size_t{1}) return "\\";
+  switch (input[1]) {
+    case 'r': return "\r";
+    case 't': return "\t";
+    case 'n': return "\n";
+    default: return std::string{ input[1] };
+  }
+}
+
+std::map<std::string, TailState> update_keys_in_legacy_states(const std::map<std::string,
TailState> &legacy_tail_states) {
+  std::map<std::string, TailState> new_tail_states;
+  for (const auto &key_value_pair : legacy_tail_states) {
+    const TailState &state = key_value_pair.second;
+    std::string full_file_name = utils::file::FileUtils::concat_path(state.path_, state.file_name_);
+    new_tail_states.emplace(full_file_name, state);
+  }
+  return new_tail_states;
+}
+
+struct TailStateWithMtime {
+  using TimePoint = std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds>;
+
+  TailStateWithMtime(TailState tail_state, TimePoint mtime)
+    : tail_state_(std::move(tail_state)), mtime_(mtime) {}
+
+  TailState tail_state_;
+  TimePoint mtime_;
+};
+
+void openFile(const std::string &file_name, uint64_t offset, std::ifstream &input_stream,
const std::shared_ptr<logging::Logger> &logger) {
+  logger->log_debug("Opening %s", file_name);
+  input_stream.open(file_name.c_str(), std::fstream::in | std::fstream::binary);
+  if (!input_stream.is_open() || !input_stream.good()) {
+    input_stream.close();
+    throw Exception(FILE_OPERATION_EXCEPTION, "Could not open file: " + file_name);
+  }
+  if (offset != 0U) {
+    input_stream.seekg(offset, std::ifstream::beg);
+    if (!input_stream.good()) {
+      logger->log_error("Seeking to %lu failed for file %s (does file/filesystem support
seeking?)", offset, file_name);
+      throw Exception(FILE_OPERATION_EXCEPTION, "Could not seek file " + file_name + " to
offset " + std::to_string(offset));
+    }
+  }
+}
+
+class FileReaderCallback : public OutputStreamCallback {
+ public:
+  FileReaderCallback(const std::string &file_name,
+                     uint64_t offset,
+                     char input_delimiter,
+                     uint64_t checksum)
+    : input_delimiter_(input_delimiter),
+      checksum_(checksum),
+      logger_(logging::LoggerFactory<TailFile>::getLogger()) {
+    openFile(file_name, offset, input_stream_, logger_);
+  }
+
+  int64_t process(std::shared_ptr<io::BaseStream> output_stream) override {
+    io::CRCStream<io::BaseStream> crc_stream{output_stream.get(), checksum_};
+
+    uint64_t num_bytes_written = 0;
+    bool found_delimiter = false;
+
+    while (hasMoreToRead() && !found_delimiter) {
+      if (begin == end) {
+        input_stream_.read(reinterpret_cast<char *>(buffer.data()), buffer.size());
+
+        std::streamsize num_bytes_read = input_stream_.gcount();
+        logger_->log_trace("Read %d bytes of input", num_bytes_read);
+
+        begin = buffer.data();
+        end = begin + num_bytes_read;
+      }
+
+      uint8_t *delimiter_pos = std::find(begin, end, input_delimiter_);
+      found_delimiter = (delimiter_pos != end);
+
+      ptrdiff_t zlen{std::distance(begin, delimiter_pos)};
+      if (found_delimiter) {
+        zlen += 1;
+      }
+      if (zlen < (std::numeric_limits<int>::min)() || zlen > (std::numeric_limits<int>::max)())
{
+        logger_->log_error("narrowing conversion failed");
+      }
+      const int len = zlen;
+
+      crc_stream.write(begin, len);
+      num_bytes_written += len;
+      begin += len;
+    }
+
+    if (found_delimiter) {
+      checksum_ = crc_stream.getCRC();
+    } else {
+      latest_flow_file_ends_with_delimiter_ = false;
+    }
+
+    return num_bytes_written;
+  }
+
+  uint64_t checksum() const {
+    return checksum_;
+  }
+
+  bool hasMoreToRead() const {
+    return begin != end || input_stream_.good();

Review comment:
       Sounds good to me, thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message