nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #586: MINIFICPP-911 - Added ListSFTP and FetchSFTP processors
Date Tue, 11 Jun 2019 13:49:34 GMT
bakaid commented on a change in pull request #586: MINIFICPP-911 - Added ListSFTP and FetchSFTP
processors
URL: https://github.com/apache/nifi-minifi-cpp/pull/586#discussion_r292462500
 
 

 ##########
 File path: extensions/sftp/processors/ListSFTP.cpp
 ##########
 @@ -0,0 +1,1320 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListSFTP.h"
+
+#include <memory>
+#include <algorithm>
+#include <cctype>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <set>
+#include <list>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "utils/ByteArrayCallback.h"
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
+#include "utils/file/FileUtils.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/ostreamwrapper.h"
+#include "rapidjson/istreamwrapper.h"
+#include "rapidjson/writer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Listing strategy, plus the connection and authentication settings of the remote SFTP server.
+core::Property ListSFTP::ListingStrategy(
+    core::PropertyBuilder::createProperty("Listing Strategy")
+        ->withDescription("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
+        ->isRequired(true)
+        ->withAllowableValues<std::string>({LISTING_STRATEGY_TRACKING_TIMESTAMPS,
+                                            LISTING_STRATEGY_TRACKING_ENTITIES})
+        ->withDefaultValue(LISTING_STRATEGY_TRACKING_TIMESTAMPS)
+        ->build());
+core::Property ListSFTP::Hostname(
+    core::PropertyBuilder::createProperty("Hostname")
+        ->withDescription("The fully qualified hostname or IP address of the remote system")
+        ->isRequired(true)
+        ->supportsExpressionLanguage(true)
+        ->build());
+core::Property ListSFTP::Port(
+    core::PropertyBuilder::createProperty("Port")
+        ->withDescription("The port that the remote system is listening on for file transfers")
+        ->isRequired(true)
+        ->supportsExpressionLanguage(true)
+        ->build());
+core::Property ListSFTP::Username(
+    core::PropertyBuilder::createProperty("Username")
+        ->withDescription("Username")
+        ->isRequired(true)
+        ->supportsExpressionLanguage(true)
+        ->build());
+// Password and private key settings are all optional.
+core::Property ListSFTP::Password(
+    core::PropertyBuilder::createProperty("Password")
+        ->withDescription("Password for the user account")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+core::Property ListSFTP::PrivateKeyPath(
+    core::PropertyBuilder::createProperty("Private Key Path")
+        ->withDescription("The fully qualified path to the Private Key file")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+core::Property ListSFTP::PrivateKeyPassphrase(
+    core::PropertyBuilder::createProperty("Private Key Passphrase")
+        ->withDescription("Password for the private key")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+core::Property ListSFTP::RemotePath(
+    core::PropertyBuilder::createProperty("Remote Path")
+        ->withDescription("The fully qualified filename on the remote system")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+// Traversal and filtering options applied while walking the remote directory tree.
+core::Property ListSFTP::SearchRecursively(
+    core::PropertyBuilder::createProperty("Search Recursively")
+        ->withDescription("If true, will pull files from arbitrarily nested subdirectories; "
+                          "otherwise, will not traverse subdirectories")
+        ->isRequired(true)
+        ->withDefaultValue<bool>(false)
+        ->build());
+core::Property ListSFTP::FollowSymlink(
+    core::PropertyBuilder::createProperty("Follow symlink")
+        ->withDescription("If true, will pull even symbolic files and also nested symbolic subdirectories; "
+                          "otherwise, will not read symbolic files and will not traverse symbolic link subdirectories")
+        ->isRequired(true)
+        ->withDefaultValue<bool>(false)
+        ->build());
+core::Property ListSFTP::FileFilterRegex(
+    core::PropertyBuilder::createProperty("File Filter Regex")
+        ->withDescription("Provides a Java Regular Expression for filtering Filenames; "
+                          "if a filter is supplied, only files whose names match that Regular Expression will be fetched")
+        ->isRequired(false)
+        ->build());
+core::Property ListSFTP::PathFilterRegex(
+    core::PropertyBuilder::createProperty("Path Filter Regex")
+        ->withDescription("When Search Recursively is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
+        ->isRequired(false)
+        ->build());
+core::Property ListSFTP::IgnoreDottedFiles(
+    core::PropertyBuilder::createProperty("Ignore Dotted Files")
+        ->withDescription("If true, files whose names begin with a dot (\".\") will be ignored")
+        ->isRequired(true)
+        ->withDefaultValue<bool>(true)
+        ->build());
+// Host key verification is off by default.
+core::Property ListSFTP::StrictHostKeyChecking(
+    core::PropertyBuilder::createProperty("Strict Host Key Checking")
+        ->withDescription("Indicates whether or not strict enforcement of hosts keys should be applied")
+        ->isRequired(true)
+        ->withDefaultValue<bool>(false)
+        ->build());
+// Optional host key file; the description previously read "no use host key file will be used".
+core::Property ListSFTP::HostKeyFile(
+    core::PropertyBuilder::createProperty("Host Key File")
+        ->withDescription("If supplied, the given file will be used as the Host Key; otherwise, no host key file will be used")
+        ->isRequired(false)
+        ->build());
+// Timeouts, keepalive behaviour and the timestamp precision of the remote system.
+core::Property ListSFTP::ConnectionTimeout(
+    core::PropertyBuilder::createProperty("Connection Timeout")
+        ->withDescription("Amount of time to wait before timing out while creating a connection")
+        ->isRequired(true)
+        ->withDefaultValue<core::TimePeriodValue>("30 sec")
+        ->build());
+core::Property ListSFTP::DataTimeout(
+    core::PropertyBuilder::createProperty("Data Timeout")
+        ->withDescription("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems")
+        ->isRequired(true)
+        ->withDefaultValue<core::TimePeriodValue>("30 sec")
+        ->build());
+core::Property ListSFTP::SendKeepaliveOnTimeout(
+    core::PropertyBuilder::createProperty("Send Keep Alive On Timeout")
+        ->withDescription("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
+        ->isRequired(true)
+        ->withDefaultValue<bool>(true)
+        ->build());
+core::Property ListSFTP::TargetSystemTimestampPrecision(
+    core::PropertyBuilder::createProperty("Target System Timestamp Precision")
+        ->withDescription("Specify timestamp precision at the target system. "
+                          "Since this processor uses timestamp of entities to decide which should be listed, "
+                          "it is crucial to use the right timestamp precision.")
+        ->isRequired(true)
+        ->withAllowableValues<std::string>({TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT,
+                                            TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS,
+                                            TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS,
+                                            TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES})
+        ->withDefaultValue(TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT)
+        ->build());
+// Proxy settings; all of them are optional and the proxy type defaults to a direct connection.
+// NOTE(review): the Proxy Type description talks about a "Proxy Configuration Controller Service",
+// which looks copied from a different property - confirm the wording is intended.
+core::Property ListSFTP::ProxyType(
+    core::PropertyBuilder::createProperty("Proxy Type")
+        ->withDescription("Specifies the Proxy Configuration Controller Service to proxy network requests. If set, it supersedes proxy settings configured per component. "
+                          "Supported proxies: HTTP + AuthN, SOCKS + AuthN")
+        ->isRequired(false)
+        ->withAllowableValues<std::string>({PROXY_TYPE_DIRECT,
+                                            PROXY_TYPE_HTTP,
+                                            PROXY_TYPE_SOCKS})
+        ->withDefaultValue(PROXY_TYPE_DIRECT)
+        ->build());
+core::Property ListSFTP::ProxyHost(
+    core::PropertyBuilder::createProperty("Proxy Host")
+        ->withDescription("The fully qualified hostname or IP address of the proxy server")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+core::Property ListSFTP::ProxyPort(
+    core::PropertyBuilder::createProperty("Proxy Port")
+        ->withDescription("The port of the proxy server")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+core::Property ListSFTP::HttpProxyUsername(
+    core::PropertyBuilder::createProperty("Http Proxy Username")
+        ->withDescription("Http Proxy Username")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+core::Property ListSFTP::HttpProxyPassword(
+    core::PropertyBuilder::createProperty("Http Proxy Password")
+        ->withDescription("Http Proxy Password")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+// Settings that, per their descriptions, only apply to the 'Tracking Entities' listing strategy.
+core::Property ListSFTP::EntityTrackingTimeWindow(
+    core::PropertyBuilder::createProperty("Entity Tracking Time Window")
+        ->withDescription("Specify how long this processor should track already-listed entities. "
+                          "'Tracking Entities' strategy can pick any entity whose timestamp is inside the specified time window. "
+                          "For example, if set to '30 minutes', any entity having timestamp in recent 30 minutes will be the listing target when this processor runs. "
+                          "A listed entity is considered 'new/updated' and a FlowFile is emitted if one of following condition meets: "
+                          "1. does not exist in the already-listed entities, "
+                          "2. has newer timestamp than the cached entity, "
+                          "3. has different size than the cached entity. "
+                          "If a cached entity's timestamp becomes older than specified time window, that entity will be removed from the cached already-listed entities. "
+                          "Used by 'Tracking Entities' strategy.")
+        ->isRequired(false)
+        ->build());
+core::Property ListSFTP::EntityTrackingInitialListingTarget(
+    core::PropertyBuilder::createProperty("Entity Tracking Initial Listing Target")
+        ->withDescription("Specify how initial listing should be handled. Used by 'Tracking Entities' strategy.")
+        ->withAllowableValues<std::string>({ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW,
+                                            ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE})
+        ->isRequired(false)
+        ->withDefaultValue(ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE)
+        ->build());
+// Age and size limits for listed files, and the base name of the persisted state file.
+core::Property ListSFTP::MinimumFileAge(
+    core::PropertyBuilder::createProperty("Minimum File Age")
+        ->withDescription("The minimum age that a file must be in order to be pulled; "
+                          "any file younger than this amount of time (according to last modification date) will be ignored")
+        ->isRequired(true)
+        ->withDefaultValue<core::TimePeriodValue>("0 sec")
+        ->build());
+core::Property ListSFTP::MaximumFileAge(
+    core::PropertyBuilder::createProperty("Maximum File Age")
+        ->withDescription("The maximum age that a file must be in order to be pulled; "
+                          "any file older than this amount of time (according to last modification date) will be ignored")
+        ->isRequired(false)
+        ->build());
+core::Property ListSFTP::MinimumFileSize(
+    core::PropertyBuilder::createProperty("Minimum File Size")
+        ->withDescription("The minimum size that a file must be in order to be pulled")
+        ->isRequired(true)
+        ->withDefaultValue<core::DataSizeValue>("0 B")
+        ->build());
+core::Property ListSFTP::MaximumFileSize(
+    core::PropertyBuilder::createProperty("Maximum File Size")
+        ->withDescription("The maximum size that a file must be in order to be pulled")
+        ->isRequired(false)
+        ->build());
+core::Property ListSFTP::StateFile(
+    core::PropertyBuilder::createProperty("State File")
+        ->withDescription("Specifies the file that should be used for storing state about"
+                          " what data has been ingested so that upon restart MiNiFi can resume from where it left off")
+        ->isRequired(true)
+        ->withDefaultValue("ListSFTP")
+        ->build());
+
+// The only relationship of this processor.
+core::Relationship ListSFTP::Success(
+    "success",
+    "All FlowFiles that are received are routed to success");
+
+// Maps a Target System Timestamp Precision value to a number; only the seconds and
+// minutes precisions have an entry.
+// NOTE(review): the values look like the length of one precision unit in milliseconds
+// (1 s = 1000, 1 min = 60000) used as a listing lag - confirm against the code that reads the map.
+const std::map<std::string, uint64_t> ListSFTP::LISTING_LAG_MAP = {
+  {ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS, 1000},
+  {ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES, 60000},
+};
+
+/**
+ * Registers the properties and relationships supported by this processor.
+ */
+void ListSFTP::initialize() {
+  // The message previously named FetchSFTP, which made trace logs misleading
+  logger_->log_trace("Initializing ListSFTP");
+
+  // Set the supported properties
+  const std::set<core::Property> properties = {
+      ListingStrategy,
+      Hostname,
+      Port,
+      Username,
+      Password,
+      PrivateKeyPath,
+      PrivateKeyPassphrase,
+      RemotePath,
+      SearchRecursively,
+      FollowSymlink,
+      FileFilterRegex,
+      PathFilterRegex,
+      IgnoreDottedFiles,
+      StrictHostKeyChecking,
+      HostKeyFile,
+      ConnectionTimeout,
+      DataTimeout,
+      SendKeepaliveOnTimeout,
+      TargetSystemTimestampPrecision,
+      ProxyType,
+      ProxyHost,
+      ProxyPort,
+      HttpProxyUsername,
+      HttpProxyPassword,
+      // Defined in this file alongside the other properties but previously never registered
+      EntityTrackingTimeWindow,
+      EntityTrackingInitialListingTarget,
+      MinimumFileAge,
+      MaximumFileAge,
+      MinimumFileSize,
+      MaximumFileSize,
+      StateFile,
+  };
+  setSupportedProperties(properties);
+
+  // Set the supported relationships
+  const std::set<core::Relationship> relationships = {Success};
+  setSupportedRelationships(relationships);
+}
+
+/**
+ * Constructs the processor with every boolean option disabled and every age/size limit and
+ * listing timestamp set to zero, then obtains the logger for this class.
+ *
+ * @param name forwarded to SFTPProcessorBase
+ * @param uuid forwarded to SFTPProcessorBase
+ */
+ListSFTP::ListSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : SFTPProcessorBase(name, uuid)
+    , search_recursively_(false)
+    , follow_symlink_(false)
+    , file_filter_regex_set_(false)
+    , path_filter_regex_set_(false)
+    , ignore_dotted_files_(false)
+    , minimum_file_age_(0U)
+    , maximum_file_age_(0U)
+    , minimum_file_size_(0U)
+    , maximum_file_size_(0U)
+    , already_loaded_from_cache_(false)
+    , last_listed_latest_entry_timestamp_(0U)
+    , last_processed_latest_entry_timestamp_(0U)
+    , initial_listing_complete_(false) {
+  logger_ = logging::LoggerFactory<ListSFTP>::getLogger();
+}
+
+/**
+ * On non-Windows builds, frees the compiled file and path filter regexes.
+ * A regex is only freed when its *_set_ flag is true.
+ */
+ListSFTP::~ListSFTP() {
+#ifndef WIN32
+  if (file_filter_regex_set_) {
+    regfree(&compiled_file_filter_regex_);
+  }
+  if (path_filter_regex_set_) {
+    regfree(&compiled_path_filter_regex_);
+  }
+#endif
+}
+
+/**
+ * Reads the processor configuration from the context and prepares the processor for listing.
+ *
+ *  - Invalidates the in-memory cache when the Listing Strategy differs from the previous schedule.
+ *  - Compiles the optional File/Path Filter Regex (regcomp on non-Windows, std::regex on Windows).
+ *  - Parses the timeouts and the file age/size limits, logging an error for missing or invalid values.
+ *  - Derives the state file name(s) of the active strategy and deletes the previously used one(s)
+ *    when the name changed.
+ *  - Calls startKeepaliveThreadIfNeeded().
+ *
+ * @param context        the context the property values are read from
+ * @param sessionFactory not used by this method
+ */
+void ListSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  std::string value;
+  context->getProperty(ListingStrategy.getName(), listing_strategy_);
+  // The strategy changed since the previous schedule: drop the in-memory listing state
+  if (!last_listing_strategy_.empty() && last_listing_strategy_ != listing_strategy_) {
+    invalidateCache();
+  }
+  last_listing_strategy_ = listing_strategy_;
+  if (!context->getProperty(SearchRecursively.getName(), value)) {
+    logger_->log_error("Search Recursively attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, search_recursively_);
+  }
+  if (!context->getProperty(FollowSymlink.getName(), value)) {
+    logger_->log_error("Follow symlink attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, follow_symlink_);
+  }
+
+  /* File Filter Regex */
+#ifndef WIN32
+  // Always release the regex compiled by a previous schedule, even if the property is no longer
+  // set, otherwise the compiled regex would be leaked
+  if (file_filter_regex_set_) {
+    regfree(&compiled_file_filter_regex_);
+  }
+#endif
+  file_filter_regex_set_ = false;
+  if (context->getProperty(FileFilterRegex.getName(), file_filter_regex_)) {
+#ifndef WIN32
+    // NOTE(review): cflags == 0 selects POSIX basic regex syntax, while the Windows branch uses
+    // std::regex's default ECMAScript grammar - confirm that this difference is intended
+    int ret = regcomp(&compiled_file_filter_regex_, file_filter_regex_.c_str(), 0);
+    if (ret != 0) {
+      logger_->log_error("Failed to compile File Filter Regex \"%s\"", file_filter_regex_.c_str());
+    } else {
+      file_filter_regex_set_ = true;
+    }
+#else
+    try {
+      compiled_file_filter_regex_ = std::regex(file_filter_regex_);
+      file_filter_regex_set_ = true;
+    } catch (const std::regex_error&) {
+      logger_->log_error("Failed to compile File Filter Regex \"%s\"", file_filter_regex_.c_str());
+    }
+#endif
+  }
+
+  /* Path Filter Regex */
+#ifndef WIN32
+  if (path_filter_regex_set_) {
+    regfree(&compiled_path_filter_regex_);
+  }
+#endif
+  // Clearing the path flag here also covers the compile-failure case. The failure branch used to
+  // clear file_filter_regex_set_ instead, which left a stale path flag (leading to a second
+  // regfree and use of an invalid regex) and wrongly disabled the file filter
+  path_filter_regex_set_ = false;
+  if (context->getProperty(PathFilterRegex.getName(), path_filter_regex_)) {
+#ifndef WIN32
+    int ret = regcomp(&compiled_path_filter_regex_, path_filter_regex_.c_str(), 0);
+    if (ret != 0) {
+      logger_->log_error("Failed to compile Path Filter Regex \"%s\"", path_filter_regex_.c_str());
+    } else {
+      path_filter_regex_set_ = true;
+    }
+#else
+    try {
+      compiled_path_filter_regex_ = std::regex(path_filter_regex_);
+      path_filter_regex_set_ = true;
+    } catch (const std::regex_error&) {
+      logger_->log_error("Failed to compile Path Filter Regex \"%s\"", path_filter_regex_.c_str());
+    }
+#endif
+  }
+
+  if (!context->getProperty(IgnoreDottedFiles.getName(), value)) {
+    logger_->log_error("Ignore Dotted Files attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, ignore_dotted_files_);
+  }
+  if (!context->getProperty(StrictHostKeyChecking.getName(), value)) {
+    logger_->log_error("Strict Host Key Checking attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, strict_host_checking_);
+  }
+  context->getProperty(HostKeyFile.getName(), host_key_file_);
+
+  /* Timeouts: parsed as a time period and then converted to milliseconds in place */
+  if (!context->getProperty(ConnectionTimeout.getName(), value)) {
+    logger_->log_error("Connection Timeout attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, connection_timeout_, unit) ||
+        !core::Property::ConvertTimeUnitToMS(connection_timeout_, unit, connection_timeout_)) {
+      logger_->log_error("Connection Timeout attribute is invalid");
+    }
+  }
+  if (!context->getProperty(DataTimeout.getName(), value)) {
+    logger_->log_error("Data Timeout attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, data_timeout_, unit) ||
+        !core::Property::ConvertTimeUnitToMS(data_timeout_, unit, data_timeout_)) {
+      logger_->log_error("Data Timeout attribute is invalid");
+    }
+  }
+  if (!context->getProperty(SendKeepaliveOnTimeout.getName(), value)) {
+    logger_->log_error("Send Keep Alive On Timeout attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, use_keepalive_on_timeout_);
+  }
+  context->getProperty(TargetSystemTimestampPrecision.getName(), target_system_timestamp_precision_);
+  context->getProperty(ProxyType.getName(), proxy_type_);
+  context->getProperty(EntityTrackingInitialListingTarget.getName(), entity_tracking_initial_listing_target_);
+
+  /* File age limits, converted to milliseconds; the maximum is optional */
+  if (!context->getProperty(MinimumFileAge.getName(), value)) {
+    logger_->log_error("Minimum File Age attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, minimum_file_age_, unit) ||
+        !core::Property::ConvertTimeUnitToMS(minimum_file_age_, unit, minimum_file_age_)) {
+      logger_->log_error("Minimum File Age attribute is invalid");
+    }
+  }
+  if (context->getProperty(MaximumFileAge.getName(), value)) {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, maximum_file_age_, unit) ||
+        !core::Property::ConvertTimeUnitToMS(maximum_file_age_, unit, maximum_file_age_)) {
+      logger_->log_error("Maximum File Age attribute is invalid");
+    }
+  }
+
+  /* File size limits; the maximum is optional */
+  if (!context->getProperty(MinimumFileSize.getName(), minimum_file_size_)) {
+    logger_->log_error("Minimum File Size attribute is invalid");
+  }
+  if (context->getProperty(MaximumFileSize.getName(), value)) {
+    if (!core::DataSizeValue::StringToInt(value, maximum_file_size_)) {
+      logger_->log_error("Maximum File Size attribute is invalid");
+    }
+  }
+
+  /* State files: "<State File>.<processor UUID>.<strategy>" (plus a ".json" variant for entities) */
+  context->getProperty(StateFile.getName(), value);
+  if (listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS) {
+    std::stringstream ss;
+    ss << value << "." << getUUIDStr() << ".TrackingTimestamps";
+    auto new_tracking_timestamps_state_filename = ss.str();
+    // The state file name changed: remove the file that was used under the old name
+    if (new_tracking_timestamps_state_filename != tracking_timestamps_state_filename_) {
+      if (!tracking_timestamps_state_filename_.empty()) {
+        if (unlink(tracking_timestamps_state_filename_.c_str()) != 0) {
+          logger_->log_error("Unable to delete old Tracking Timestamps state file \"%s\"",
+                             tracking_timestamps_state_filename_.c_str());
+        }
+      }
+    }
+    tracking_timestamps_state_filename_ = new_tracking_timestamps_state_filename;
+  } else if (listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES) {
+    std::stringstream ss;
+    ss << value << "." << getUUIDStr() << ".TrackingEntities";
+    auto new_tracking_entities_state_filename = ss.str();
+    ss << ".json";
+    auto new_tracking_entities_state_json_filename = ss.str();
+    // The state file name changed: remove both files that were used under the old name
+    if (new_tracking_entities_state_filename != tracking_entities_state_filename_) {
+      if (!tracking_entities_state_filename_.empty()) {
+        if (unlink(tracking_entities_state_filename_.c_str()) != 0) {
+          logger_->log_error("Unable to delete old Tracking Entities state file \"%s\"",
+                             tracking_entities_state_filename_.c_str());
+        }
+      }
+      if (!tracking_entities_state_json_filename_.empty()) {
+        if (unlink(tracking_entities_state_json_filename_.c_str()) != 0) {
+          logger_->log_error("Unable to delete old Tracking Entities json state file \"%s\"",
+                             tracking_entities_state_json_filename_.c_str());
+        }
+      }
+    }
+    tracking_entities_state_filename_ = new_tracking_entities_state_filename;
+    tracking_entities_state_json_filename_ = new_tracking_entities_state_json_filename;
+  } else {
+    logger_->log_error("Unknown Listing Strategy: \"%s\"", listing_strategy_.c_str());
+  }
+
+  startKeepaliveThreadIfNeeded();
+}
+
+/**
+ * Stop notification handler: logs the event and calls cleanupConnectionCache().
+ * Per the log message this stops the keepalive thread and clears the connections;
+ * that work presumably happens inside cleanupConnectionCache(), which is defined elsewhere.
+ */
+void ListSFTP::notifyStop() {
+  logger_->log_debug("Got notifyStop, stopping keepalive thread and clearing connections");
+  cleanupConnectionCache();
+}
+
+/**
+ * Resets the in-memory listing state to its initial values and logs a warning about it.
+ */
+void ListSFTP::invalidateCache() {
+  logger_->log_warn("Important properties have been reconfigured, invalidating in-memory cache");
+
+  already_loaded_from_cache_ = false;
+
+  // Listed-entity bookkeeping
+  already_listed_entities_.clear();
+  initial_listing_complete_ = false;
+
+  // Timestamp bookkeeping; a default-constructed time_point is the clock's epoch
+  latest_identifiers_processed_.clear();
+  last_processed_latest_entry_timestamp_ = 0U;
+  last_listed_latest_entry_timestamp_ = 0U;
+  last_run_time_ = std::chrono::steady_clock::time_point();
+}
+
+/**
+ * Creates an empty child: not a directory, with all SFTP attributes zeroed.
+ */
+ListSFTP::Child::Child()
+    : directory(false) {
+  std::memset(&attrs, 0, sizeof(attrs));
+}
+
+/**
+ * Creates a child from one entry of an SFTP directory listing.
+ *
+ * @param parent_path_ path of the directory that was listed
+ * @param sftp_child   (filename, longentry, attrs) of the entry; the filename is moved out of it,
+ *                     the attributes are copied and the longentry is not used
+ */
+ListSFTP::Child::Child(const std::string& parent_path_,
+                       std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>&& sftp_child) {
+  parent_path = parent_path_;
+  filename = std::move(std::get<0>(sftp_child));
+  attrs = std::get<2>(sftp_child);
+  // The directory flag is derived from the permission bits of the entry
+  directory = LIBSSH2_SFTP_S_ISDIR(attrs.permissions);
+}
+
+/**
+ * @return the parent path and the filename joined with a single "/"
+ */
+std::string ListSFTP::Child::getPath() const {
+  return parent_path + "/" + filename;
+}
+
+/**
+ * Decides whether an entry of a directory listing should be kept.
+ *
+ * Rejects empty names, "." and "..", dotted files when Ignore Dotted Files is set, entries
+ * without permission information and entries that are neither regular files nor directories.
+ * Regular files and directories are delegated to filterFile() and filterDirectory().
+ *
+ * @param parent_path path of the directory that was listed
+ * @param sftp_child  (filename, longentry, attrs) of the entry
+ * @return true if the entry passed the filters, false otherwise
+ */
+bool ListSFTP::filter(const std::string& parent_path,
+                      const std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>& sftp_child) {
+  const std::string& child_name = std::get<0>(sftp_child);
+  const LIBSSH2_SFTP_ATTRIBUTES& child_attrs = std::get<2>(sftp_child);
+
+  /* This should not happen */
+  if (child_name.empty()) {
+    logger_->log_error("Listing directory \"%s\" returned an empty child", parent_path.c_str());
+    return false;
+  }
+
+  /* Ignore current dir and parent dir */
+  if (child_name == "." || child_name == "..") {
+    return false;
+  }
+
+  /* Dotted files */
+  if (ignore_dotted_files_ && child_name.front() == '.') {
+    logger_->log_debug("Ignoring \"%s/%s\" because Ignore Dotted Files is true", parent_path.c_str(), child_name.c_str());
+    return false;
+  }
+
+  /* Without the permission bits the type of the entry cannot be determined */
+  if ((child_attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) == 0) {
+    // TODO: maybe do a fallback stat here
+    logger_->log_error("Failed to get permissions in stat for \"%s/%s\"", parent_path.c_str(), child_name.c_str());
+    return false;
+  }
+
+  /* Dispatch on the entry type */
+  if (LIBSSH2_SFTP_S_ISREG(child_attrs.permissions)) {
+    return filterFile(parent_path, child_name, child_attrs);
+  }
+  if (LIBSSH2_SFTP_S_ISDIR(child_attrs.permissions)) {
+    return filterDirectory(parent_path, child_name, child_attrs);
+  }
+  logger_->log_debug("Skipping non-regular, non-directory file \"%s/%s\"", parent_path.c_str(), child_name.c_str());
+  return false;
+}
+
+/**
+ * Applies the file-level filters (age, size, File Filter Regex) to a regular file.
+ *
+ * @param parent_path directory containing the file, used for logging only
+ * @param filename    name of the file; this is what the File Filter Regex is matched against
+ * @param attrs       SFTP attributes of the file; uid/gid, size and access/modification times must be present
+ * @return false if a required attribute is missing or any filter rejects the file, true otherwise
+ */
+bool ListSFTP::filterFile(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs) {
+  if (!(attrs.flags & LIBSSH2_SFTP_ATTR_UIDGID) ||
+      !(attrs.flags & LIBSSH2_SFTP_ATTR_SIZE) ||
+      !(attrs.flags & LIBSSH2_SFTP_ATTR_ACMODTIME)) {
+    // TODO: maybe do a fallback stat here
+    logger_->log_error("Failed to get all attributes in stat for \"%s/%s\"", parent_path.c_str(), filename.c_str());
+    return false;
+  }
+
+  /* Age */
+  const time_t now = time(nullptr);
+  /*
+   * attrs.mtime is unsigned, so "now - mtime" wraps around when the modification time lies
+   * in the future (e.g. clock skew between us and the server). The wrapped value used to
+   * bypass the Minimum File Age check and to trip the Maximum File Age check.
+   * Such files are treated as having an age of 0 ms instead.
+   * NOTE(review): assumes minimum_file_age_ / maximum_file_age_ are unsigned millisecond
+   * values (they are logged with %lu) - confirm against ListSFTP.h.
+   */
+  int64_t file_age = 0;
+  if (now > 0 && static_cast<uint64_t>(now) > attrs.mtime) {
+    file_age = static_cast<int64_t>((static_cast<uint64_t>(now) - attrs.mtime) * 1000U);
+  }
+  if (file_age < minimum_file_age_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is younger than the Minimum File Age: %ld ms < %lu ms",
+        parent_path.c_str(),
+        filename.c_str(),
+        file_age,
+        minimum_file_age_);
+    return false;
+  }
+  /* A Maximum File Age of 0 means that the check is disabled */
+  if (maximum_file_age_ != 0U && file_age > maximum_file_age_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is older than the Maximum File Age: %ld ms > %lu ms",
+                       parent_path.c_str(),
+                       filename.c_str(),
+                       file_age,
+                       maximum_file_age_);
+    return false;
+  }
+
+  /* Size */
+  if (attrs.filesize < minimum_file_size_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is smaller than the Minimum File Size: %lu B < %lu B",
+                       parent_path.c_str(),
+                       filename.c_str(),
+                       attrs.filesize,
+                       minimum_file_size_);
+    return false;
+  }
+  /* A Maximum File Size of 0 means that the check is disabled */
+  if (maximum_file_size_ != 0U && attrs.filesize > maximum_file_size_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is larger than the Maximum File Size: %lu B > %lu B",
+                       parent_path.c_str(),
+                       filename.c_str(),
+                       attrs.filesize,
+                       maximum_file_size_);
+    return false;
+  }
+
+  /* File Filter Regex, matched against the bare filename */
+  if (file_filter_regex_set_) {
+    bool match = false;
+#ifndef WIN32
+    int ret = regexec(&compiled_file_filter_regex_, filename.c_str(), static_cast<size_t>(0), nullptr, 0);
+    match = ret == 0;
+#else
+    match = std::regex_match(filename, compiled_file_filter_regex_);
+#endif
+    if (!match) {
+      logger_->log_debug("Ignoring \"%s/%s\" because it did not match the File Filter Regex \"%s\"",
+                         parent_path.c_str(),
+                         filename.c_str(),
+                         file_filter_regex_);
+      return false;
+    }
+  }
+
+  return true;
+}
+
+/**
+ * Decides whether a listed directory passes the directory-level filters.
+ * Returns false when recursive search is disabled or when a Path Filter Regex is set
+ * and the full directory path ("<parent_path>/<filename>") does not match it.
+ * The attrs parameter is not used.
+ */
+bool ListSFTP::filterDirectory(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs) {
+  if (!search_recursively_) {
+    return false;
+  }
+
+  /* Without a Path Filter Regex every directory is accepted */
+  if (!path_filter_regex_set_) {
+    return true;
+  }
+
+  /* Path Filter Regex, matched against the full path of the directory */
+  const std::string dir_path = parent_path + "/" + filename;
+#ifndef WIN32
+  const bool matches = regexec(&compiled_path_filter_regex_, dir_path.c_str(), static_cast<size_t>(0), nullptr, 0) == 0;
+#else
+  const bool matches = std::regex_match(dir_path, compiled_path_filter_regex_);
+#endif
+  if (matches) {
+    return true;
+  }
+
+  logger_->log_debug("Not recursing into \"%s\" because it did not match the Path Filter Regex \"%s\"",
+                     dir_path.c_str(),
+                     path_filter_regex_);
+  return false;
+}
+
+/**
+ * Creates a FlowFile describing a listed child, fills in its attributes and
+ * transfers it to the Success relationship.
+ *
+ * If the modification time cannot be converted to a string, an error is logged,
+ * no FlowFile is created and true is still returned.
+ *
+ * @return false only if the session failed to create a FlowFile, true otherwise
+ */
+bool ListSFTP::createAndTransferFlowFileFromChild(
+    const std::shared_ptr<core::ProcessSession>& session,
+    const std::string& hostname,
+    uint16_t port,
+    const std::string& username,
+    const ListSFTP::Child& child) {
+  /* The modification time is formatted first, so that no FlowFile is created if that fails */
+  if (child.attrs.mtime > std::numeric_limits<int64_t>::max()) {
+    logger_->log_error("Modification date %lu of \"%s/%s\" larger than int64_t max", child.attrs.mtime, child.parent_path.c_str(), child.filename.c_str());
+    return true;
+  }
+  std::string last_modified;
+  const bool converted = getDateTimeStr(static_cast<int64_t>(child.attrs.mtime), last_modified);
+  if (!converted) {
+    logger_->log_error("Failed to convert modification date %lu of \"%s/%s\" to string", child.attrs.mtime, child.parent_path.c_str(), child.filename.c_str());
+    return true;
+  }
+
+  /* Create FlowFile */
+  auto flow_file = std::static_pointer_cast<FlowFileRecord>(session->create());
+  if (!flow_file) {
+    logger_->log_error("Failed to create FlowFileRecord");
+    return false;
+  }
+
+  /* Connection details */
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_HOST, hostname);
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_PORT, std::to_string(port));
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_LISTING_USER, username);
+
+  /* Owner and group are stored as their numeric ids */
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_OWNER, std::to_string(child.attrs.uid));
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_GROUP, std::to_string(child.attrs.gid));
+
+  /* Permissions: the lowest nine bits, printed in octal and zero-padded to four digits */
+  std::stringstream permissions_stream;
+  permissions_stream << std::setfill('0') << std::setw(4) << std::oct << (child.attrs.permissions & 0777);
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_PERMISSIONS, permissions_stream.str());
+
+  /* Size and modification time */
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_SIZE, std::to_string(child.attrs.filesize));
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_LASTMODIFIEDTIME, last_modified);
+
+  /* Filename and path */
+  flow_file->updateKeyedAttribute(FILENAME, child.filename);
+  flow_file->updateKeyedAttribute(PATH, child.parent_path);
+
+  session->transfer(flow_file, Success);
+
+  return true;
+}
+
+/** Creates an entity with a timestamp of 0 and a size of 0. */
+ListSFTP::ListedEntity::ListedEntity()
+    : ListedEntity(0U, 0U) {
+}
+
+/** Creates an entity with the given timestamp and size. */
+ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, uint64_t size_)
+    : timestamp(timestamp_), size(size_) {
+}
+
+/**
+ * Writes the Tracking Timestamps state to the state file as "key=value" lines:
+ * the connection settings, the listing and processed timestamps and one
+ * "id.<n>" line for each identifier in latest_identifiers_processed_.
+ *
+ * @return false if the file could not be opened or if writing or closing it failed, true otherwise
+ */
+bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ofstream file(tracking_timestamps_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to store state to Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    return false;
+  }
+  file << "hostname=" << hostname << "\n";
+  file << "username=" << username << "\n";
+  file << "remote_path=" << remote_path << "\n";
+  file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
+  file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << "\n";
+  size_t i = 0;
+  for (const auto& identifier : latest_identifiers_processed_) {
+    file << "id." << i << "=" << identifier << "\n";
+    ++i;
+  }
+  /*
+   * close() flushes the buffered output. Any earlier write error or a failing close leaves
+   * the stream in a failed state, which must not be reported as a successful store.
+   */
+  file.close();
+  if (!file) {
+    logger_->log_error("Failed to store state to Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    return false;
+  }
+  return true;
+}
+
+/**
+ * Loads the Tracking Timestamps state from the state file.
+ *
+ * The file is read as "key=value" lines. The state is only applied if the hostname,
+ * username and remote path stored in the file are equal to the ones passed in.
+ *
+ * @return false if the file cannot be opened, a timestamp cannot be parsed or the stored
+ *         settings differ from the current ones; true if the state has been applied
+ */
+bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ifstream file(tracking_timestamps_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    return false;
+  }
+  std::string state_hostname;
+  std::string state_username;
+  std::string state_remote_path;
+  /* Zero-initialized, so that a state file without these keys does not lead to reading indeterminate values */
+  uint64_t state_listing_timestamp = 0U;
+  uint64_t state_processed_timestamp = 0U;
+  std::set<std::string> state_ids;
+
+  std::string line;
+  while (std::getline(file, line)) {
+    size_t separator_pos = line.find('=');
+    if (separator_pos == std::string::npos) {
+      logger_->log_warn("None key-value line found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), line.c_str());
+      /* Without a separator there is no key and no value to extract, the line is skipped */
+      continue;
+    }
+    std::string key = line.substr(0, separator_pos);
+    std::string value = line.substr(separator_pos + 1);
+    if (key == "hostname") {
+      state_hostname = std::move(value);
+    } else if (key == "username") {
+      state_username = std::move(value);
+    } else if (key == "remote_path") {
+      state_remote_path = std::move(value);
+    } else if (key == "listing.timestamp") {
+      try {
+        state_listing_timestamp = std::stoull(value);
+      } catch (...) {
+        logger_->log_error("listing.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+        return false;
+      }
+    } else if (key == "processed.timestamp") {
+      try {
+        state_processed_timestamp = std::stoull(value);
+      } catch (...) {
+        logger_->log_error("processed.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+        return false;
+      }
+    } else if (key.compare(0, strlen("id."), "id.") == 0) {
+      /* Only the value is kept, the index after "id." is not interpreted */
+      state_ids.emplace(std::move(value));
+    } else {
+      logger_->log_warn("Unknown key found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), key.c_str());
+    }
+  }
+  file.close();
+
+  /* State recorded for a different connection or directory must not be applied */
+  if (state_hostname != hostname ||
+      state_username != username ||
+      state_remote_path != remote_path) {
+    logger_->log_error("Tracking Timestamps state file \"%s\" was created with different settings than the current ones, ignoring. "
+                       "Hostname: \"%s\" vs. \"%s\", "
+                       "Username: \"%s\" vs. \"%s\", "
+                       "Remote Path: \"%s\" vs. \"%s\"",
+                       tracking_timestamps_state_filename_.c_str(),
+                       state_hostname.c_str(), hostname.c_str(),
+                       state_username.c_str(), username.c_str(),
+                       state_remote_path.c_str(), remote_path.c_str());
+    return false;
+  }
+
+  last_listed_latest_entry_timestamp_ = state_listing_timestamp;
+  last_processed_latest_entry_timestamp_ = state_processed_timestamp;
+  latest_identifiers_processed_ = std::move(state_ids);
+
+  return true;
+}
+
+void ListSFTP::listByTrackingTimestamps(
+    const std::shared_ptr<core::ProcessContext>& context,
+    const std::shared_ptr<core::ProcessSession>& session,
+    const std::string& hostname,
+    uint16_t port,
+    const std::string& username,
+    const std::string& remote_path,
+    std::vector<Child>&& files) {
+  uint64_t min_timestamp_to_list = last_listed_latest_entry_timestamp_;
+
+  /* Load state from cache file if needed */
+  if (!already_loaded_from_cache_ && !tracking_timestamps_state_filename_.empty())
{
+    if (updateFromTrackingTimestampsCache(hostname, username, remote_path)) {
+      logger_->log_debug("Successfully loaded Tracking Timestamps state file \"%s\"",
tracking_timestamps_state_filename_.c_str());
+    } else {
+      logger_->log_debug("Failed to load Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    }
+    already_loaded_from_cache_ = true;
+  }
+
+  std::chrono::time_point<std::chrono::steady_clock> current_run_time = std::chrono::steady_clock::now();
+  time_t now = time(nullptr);
+
+  /* Order children by timestamp and try to detect timestamp precision if needed  */
+  std::map<uint64_t /*timestamp*/, std::list<Child>> ordered_files;
+  bool target_system_has_seconds = false;
+  for (auto&& file : files) {
+    uint64_t timestamp = file.attrs.mtime * 1000;
+    target_system_has_seconds |= timestamp % 60000 != 0;
+
+    bool new_file = min_timestamp_to_list == 0U || (timestamp >= min_timestamp_to_list
&& timestamp >= last_processed_latest_entry_timestamp_);
+    if (new_file) {
+      auto& files_for_timestamp = ordered_files[timestamp];
+      files_for_timestamp.emplace_back(std::move(file));
+    } else {
+      logger_->log_trace("Skipping \"%s\", because it is not new.", file.getPath().c_str());
+    }
+  }
+
+  uint64_t latest_listed_entry_timestamp_this_cycle = 0U;
+  size_t flow_files_created = 0U;
+  if (ordered_files.size() > 0) {
+    latest_listed_entry_timestamp_this_cycle = std::prev(ordered_files.end())->first;
+
+    std::string remote_system_timestamp_precision;
+    if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT)
{
+      if (target_system_has_seconds) {
+        logger_->log_debug("Precision auto detection detected second precision");
+        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
+      } else {
+        logger_->log_debug("Precision auto detection detected minute precision");
+        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
+      }
+    } else if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES)
{
+        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
+    } else {
+      /*
+       * We only have seconds-precision timestamps, TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS
makes no real sense here,
+       * so we will treat it as TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS.
+       */
+      remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
+    }
+    uint64_t listing_lag = LISTING_LAG_MAP.at(remote_system_timestamp_precision);
+    logger_->log_debug("The listing lag is %lu ms", listing_lag);
+
+    /* If the latest listing time is equal to the last listing time, there are no entries
with a newer timestamp than previously seen */
+    if (latest_listed_entry_timestamp_this_cycle == last_listed_latest_entry_timestamp_)
{
+      const auto& latest_files = ordered_files.at(latest_listed_entry_timestamp_this_cycle);
+      uint64_t elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(current_run_time
- last_run_time_).count();
+      /* If a precision-specific listing lag has not yet elapsed since out last execution,
we wait. */
+      if (elapsed_time < listing_lag) {
+        logger_->log_debug("The latest listed entry timestamp is the same as the last
listed entry timestamp (%lu) "
+                           "and the listing lag has not yet elapsed (%lu ms < % lu ms).
Yielding.",
+                           latest_listed_entry_timestamp_this_cycle,
+                           elapsed_time,
+                           listing_lag);
+        context->yield();
+        return;
+      }
+      /*
+       * If we have already processed the entities with the newest timestamp,
+       * and there are no new entities with that timestamp, there is nothing to do.
+       */
+      if (latest_listed_entry_timestamp_this_cycle == last_processed_latest_entry_timestamp_
&&
+          std::all_of(latest_files.begin(), latest_files.end(), [this](const Child& child)
{
+            return latest_identifiers_processed_.count(child.getPath()) == 1U;
+          })) {
+        logger_->log_debug("The latest listed entry timestamp is the same as the last
listed entry timestamp (%lu) "
+                           "and all files for that timestamp has been processed. Yielding.",
latest_listed_entry_timestamp_this_cycle);
+        context->yield();
+        return;
+      }
+    } else {
+      /* Determine the minimum reliable timestamp based on precision */
+      uint64_t minimum_reliable_timestamp = now * 1000 - listing_lag;
+      if (remote_system_timestamp_precision == TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS)
{
 
 Review comment:
   You are right, many parts of the code (this not being an exception) use unix timestamps
in milliseconds stored in uint64_t's/int64_t's. It would be better to use a proper type system
for timestamps and durations everywhere.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message