nifi-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #915: MINIFICPP-1376 Create PutS3Object processor
Date Mon, 05 Oct 2020 07:59:15 GMT

lordgamez commented on a change in pull request #915:
URL: https://github.com/apache/nifi-minifi-cpp/pull/915#discussion_r499407872



##########
File path: extensions/aws/processors/PutS3Object.cpp
##########
@@ -0,0 +1,487 @@
+/**
+ * @file PutS3Object.cpp
+ * PutS3Object class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutS3Object.h"
+
+#include <string>
+#include <regex>
+#include <set>
+#include <memory>
+
+#include "AWSCredentialsService.h"
+#include "properties/Properties.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+namespace {
+
+template<typename T, typename U>
+std::set<T> getMapKeys(const std::map<T,U>& m) {
+  std::set<T> keys;
+  for (const auto& pair : m) {
+    keys.insert(pair.first);
+  }
+  return keys;
+}
+
+}  // namespace
+
+const std::set<std::string> PutS3Object::CANNED_ACLS(getMapKeys(minifi::aws::s3::CANNED_ACL_MAP));
+const std::set<std::string> PutS3Object::REGIONS({region::US_GOV_WEST_1, region::US_EAST_1, region::US_EAST_2, region::US_WEST_1,
+  region::US_WEST_2, region::EU_WEST_1, region::EU_WEST_2, region::EU_CENTRAL_1, region::AP_SOUTH_1,
+  region::AP_SOUTHEAST_1, region::AP_SOUTHEAST_2, region::AP_NORTHEAST_1, region::AP_NORTHEAST_2,
+  region::SA_EAST_1, region::CN_NORTH_1, region::CA_CENTRAL_1});
+const std::set<std::string> PutS3Object::STORAGE_CLASSES(getMapKeys(minifi::aws::s3::STORAGE_CLASS_MAP));
+const std::set<std::string> PutS3Object::SERVER_SIDE_ENCRYPTIONS(getMapKeys(minifi::aws::s3::SERVER_SIDE_ENCRYPTION_MAP));
+
+const core::Property PutS3Object::ObjectKey(
+  core::PropertyBuilder::createProperty("Object Key")
+    ->withDescription("The key of the S3 object")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${filename}")
+    ->build());
+const core::Property PutS3Object::Bucket(
+  core::PropertyBuilder::createProperty("Bucket")
+    ->withDescription("The S3 bucket")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::ContentType(
+  core::PropertyBuilder::createProperty("Content Type")
+    ->withDescription("Sets the Content-Type HTTP header indicating the type of content stored in "
+                      "the associated object. The value of this header is a standard MIME type. "
+                      "If no content type is provided the default content type "
+                      "\"application/octet-stream\" will be used.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("application/octet-stream")
+    ->build());
+const core::Property PutS3Object::AccessKey(
+  core::PropertyBuilder::createProperty("Access Key")
+    ->withDescription("AWS account access key")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::SecretKey(
+  core::PropertyBuilder::createProperty("Secret Key")
+    ->withDescription("AWS account secret key")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::CredentialsFile(
+  core::PropertyBuilder::createProperty("Credentials File")
+    ->withDescription("Path to a file containing AWS access key and secret key in properties file format. Properties used: accessKey and secretKey")
+    ->build());
+const core::Property PutS3Object::AWSCredentialsProviderService(
+  core::PropertyBuilder::createProperty("AWS Credentials Provider service")
+    ->withDescription("The name of the AWS Credentials Provider controller service that is used to obtain AWS credentials.")
+    ->build());
+const core::Property PutS3Object::StorageClass(
+  core::PropertyBuilder::createProperty("Storage Class")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>("Standard")
+    ->withAllowableValues<std::string>(PutS3Object::STORAGE_CLASSES)
+    ->withDescription("AWS S3 Storage Class")
+    ->build());
+const core::Property PutS3Object::Region(
+  core::PropertyBuilder::createProperty("Region")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(region::US_WEST_2)
+    ->withAllowableValues<std::string>(PutS3Object::REGIONS)
+    ->withDescription("AWS Region")
+    ->build());
+const core::Property PutS3Object::CommunicationsTimeout(
+  core::PropertyBuilder::createProperty("Communications Timeout")
+    ->isRequired(true)
+    ->withDefaultValue<core::TimePeriodValue>("30 sec")
+    ->withDescription("")
+    ->build());
+const core::Property PutS3Object::FullControlUserList(
+  core::PropertyBuilder::createProperty("FullControl User List")
+    ->withDescription("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Full Control for an object.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${s3.permissions.full.users}")
+    ->build());
+const core::Property PutS3Object::ReadPermissionUserList(
+  core::PropertyBuilder::createProperty("Read Permission User List")
+    ->withDescription("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Read Access for an object.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${s3.permissions.read.users}")
+    ->build());
+const core::Property PutS3Object::ReadACLUserList(
+  core::PropertyBuilder::createProperty("Read ACL User List")
+    ->withDescription("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to read the Access Control List for an object.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${s3.permissions.readacl.users}")
+    ->build());
+const core::Property PutS3Object::WriteACLUserList(
+  core::PropertyBuilder::createProperty("Write ACL User List")
+    ->withDescription("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to change the Access Control List for an object.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${s3.permissions.writeacl.users}")
+    ->build());
+const core::Property PutS3Object::CannedACL(
+  core::PropertyBuilder::createProperty("Canned ACL")
+    ->withDescription("Amazon Canned ACL for an object. Allowed values: BucketOwnerFullControl, BucketOwnerRead, AuthenticatedRead, PublicReadWrite, PublicRead, Private, AwsExecRead; will be ignored if any other ACL/permission property is specified.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${s3.permissions.cannedacl}")
+    ->build());
+const core::Property PutS3Object::EndpointOverrideURL(
+  core::PropertyBuilder::createProperty("Endpoint Override URL")
+    ->withDescription("Endpoint URL to use instead of the AWS default including scheme, host, "
+                      "port, and path. The AWS libraries select an endpoint URL based on the AWS "
+                      "region, but this property overrides the selected endpoint URL, allowing use "
+                      "with other S3-compatible endpoints.")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::ServerSideEncryption(
+  core::PropertyBuilder::createProperty("Server Side Encryption")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>("None")
+    ->withAllowableValues<std::string>(PutS3Object::SERVER_SIDE_ENCRYPTIONS)
+    ->withDescription("Specifies the algorithm used for server side encryption.")
+    ->build());
+const core::Property PutS3Object::ProxyHost(
+  core::PropertyBuilder::createProperty("Proxy Host")
+    ->withDescription("Proxy host name or IP")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::ProxyPort(
+  core::PropertyBuilder::createProperty("Proxy Port")
+    ->withDescription("The port number of the proxy host")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::ProxyUsername(
+    core::PropertyBuilder::createProperty("Proxy Username")
+    ->withDescription("Username to set when authenticating against proxy")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::ProxyPassword(
+  core::PropertyBuilder::createProperty("Proxy Password")
+    ->withDescription("Password to set when authenticating against proxy")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+const core::Relationship PutS3Object::Success("success", "FlowFiles are routed to success relationship");
+const core::Relationship PutS3Object::Failure("failure", "FlowFiles are routed to failure relationship");
+
+void PutS3Object::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(ObjectKey);
+  properties.insert(Bucket);
+  properties.insert(ContentType);
+  properties.insert(AccessKey);
+  properties.insert(SecretKey);
+  properties.insert(CredentialsFile);
+  properties.insert(AWSCredentialsProviderService);
+  properties.insert(StorageClass);
+  properties.insert(Region);
+  properties.insert(CommunicationsTimeout);
+  properties.insert(FullControlUserList);
+  properties.insert(ReadPermissionUserList);
+  properties.insert(ReadACLUserList);
+  properties.insert(WriteACLUserList);
+  properties.insert(CannedACL);
+  properties.insert(EndpointOverrideURL);
+  properties.insert(ServerSideEncryption);
+  properties.insert(ProxyHost);
+  properties.insert(ProxyPort);
+  properties.insert(ProxyUsername);
+  properties.insert(ProxyPassword);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Failure);
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials> PutS3Object::getAWSCredentialsFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const {
+  std::string service_name;
+  if (context->getProperty(AWSCredentialsProviderService.getName(), service_name) && !service_name.empty()) {
+    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(service_name);
+    if (nullptr != service) {
+      auto aws_credentials_service = std::static_pointer_cast<minifi::aws::controllers::AWSCredentialsService>(service);
+      return minifi::utils::make_optional<Aws::Auth::AWSCredentials>(aws_credentials_service->getAWSCredentials());
+    }
+  }
+  return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials> PutS3Object::getAWSCredentialsFromProperties(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file) const {
+  std::string access_key;
+  context->getProperty(AccessKey, access_key, flow_file);
+  std::string secret_key;
+  context->getProperty(SecretKey, secret_key, flow_file);
+  if (!access_key.empty() && !secret_key.empty()) {
+    Aws::Auth::AWSCredentials creds(access_key, secret_key);
+    return minifi::utils::make_optional<Aws::Auth::AWSCredentials>(creds);
+  }
+  return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials> PutS3Object::getAWSCredentialsFromFile(const std::shared_ptr<core::ProcessContext> &context) const {
+  std::string credential_file;
+  if (context->getProperty(CredentialsFile.getName(), credential_file) && !credential_file.empty()) {
+    auto properties = std::make_shared<minifi::Properties>();
+    properties->loadConfigureFile(credential_file.c_str());
+    std::string access_key;
+    std::string secret_key;
+    if (properties->get("accessKey", access_key) && !access_key.empty() && properties->get("secretKey", secret_key) && !secret_key.empty()) {
+      Aws::Auth::AWSCredentials creds(access_key, secret_key);
+      return minifi::utils::make_optional<Aws::Auth::AWSCredentials>(creds);
+    }
+  }
+  return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials> PutS3Object::getAWSCredentials(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file) const {
+  auto prop_cred = getAWSCredentialsFromProperties(context, flow_file);
+  if (prop_cred) {
+    logger_->log_info("AWS Credentials successfully set from properties");
+    return prop_cred.value();
+  }
+
+  auto file_cred = getAWSCredentialsFromFile(context);
+  if (file_cred) {
+    logger_->log_info("AWS Credentials successfully set from file");
+    return file_cred.value();
+  }
+
+  auto service_cred = getAWSCredentialsFromControllerService(context);
+  if (service_cred) {
+    logger_->log_info("AWS Credentials successfully set from controller service");
+    return service_cred.value();
+  }
+
+  return minifi::utils::nullopt;
+}
+
+void PutS3Object::fillUserMetadata(const std::shared_ptr<core::ProcessContext> &context) {
+  const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
+  bool first_property = true;
+  for (const auto &prop_key : dynamic_prop_keys) {
+    std::string prop_value = "";
+    if (context->getDynamicProperty(prop_key, prop_value) && !prop_value.empty()) {
+      logger_->log_debug("PutS3Object: DynamicProperty: [%s] -> [%s]", prop_key, prop_value);
+      put_s3_request_params_.user_metadata_map.emplace(prop_key, prop_value);
+      if (first_property) {
+        user_metadata_ = prop_key + "=" + prop_value;
+        first_property = false;
+      } else {
+        user_metadata_ += "," + prop_key + "=" + prop_value;
+      }
+    }
+  }
+  logger_->log_debug("PutS3Object: User metadata [%s]", user_metadata_);
+}
+
+bool PutS3Object::setProxy(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file) {
+  aws::s3::ProxyOptions proxy;
+  context->getProperty(ProxyHost, proxy.host, flow_file);
+  std::string port_str;
+  if (context->getProperty(ProxyPort, port_str, flow_file) && !port_str.empty() && !core::Property::StringToInt(port_str, proxy.port)) {
+    logger_->log_error("Proxy port invalid");
+    return false;
+  }
+  context->getProperty(ProxyUsername, proxy.username, flow_file);
+  context->getProperty(ProxyPassword, proxy.password, flow_file);
+  if (!proxy.host.empty()) {
+    s3_wrapper_->setProxy(proxy);
+    logger_->log_info("Proxy for PutS3Object was set.");
+  }
+  return true;
+}
+
+void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  if (!context->getProperty(Bucket.getName(), put_s3_request_params_.bucket) || put_s3_request_params_.bucket.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or invalid");
+  }
+  logger_->log_debug("PutS3Object: Bucket [%s]", put_s3_request_params_.bucket);
+
+  if (!context->getProperty(StorageClass.getName(), put_s3_request_params_.storage_class)
+      || put_s3_request_params_.storage_class.empty()
+      || STORAGE_CLASSES.find(put_s3_request_params_.storage_class) == STORAGE_CLASSES.end()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Storage Class property missing or invalid");
+  }
+  logger_->log_debug("PutS3Object: Storage Class [%s]", put_s3_request_params_.storage_class);
+
+  std::string value;
+  if (!context->getProperty(Region.getName(), value) || value.empty() || REGIONS.find(value) == REGIONS.end()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Region property missing or invalid");
+  }
+  s3_wrapper_->setRegion(value);
+  logger_->log_debug("PutS3Object: Region [%s]", value);
+
+  uint64_t timeout_val;
+  if (context->getProperty(CommunicationsTimeout.getName(), value) && !value.empty() && core::Property::getTimeMSFromString(value, timeout_val)) {
+    s3_wrapper_->setTimeout(timeout_val);
+    logger_->log_debug("PutS3Object: Communications Timeout [%d]", timeout_val);
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid");
+  }
+
+  if (!context->getProperty(ServerSideEncryption.getName(), put_s3_request_params_.server_side_encryption)
+      || put_s3_request_params_.server_side_encryption.empty()
+      || SERVER_SIDE_ENCRYPTIONS.find(put_s3_request_params_.server_side_encryption) == SERVER_SIDE_ENCRYPTIONS.end()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Server Side Encryption property missing or invalid");
+  }
+  logger_->log_debug("PutS3Object: Server Side Encryption [%s]", put_s3_request_params_.server_side_encryption);
+
+  fillUserMetadata(context);
+}
+
+std::string PutS3Object::parseAccessControlList(const std::string &comma_separated_list) const {
+  std::string result_list;
+  bool is_first = true;
+  for (const auto& user : minifi::utils::StringUtils::split(comma_separated_list, ",")) {
+    if (is_first) {
+      is_first = false;
+    } else {
+      result_list += ", ";
+    }
+
+    auto trimmed_user = minifi::utils::StringUtils::trim(user);
+    static const std::regex email_pattern("(\\w+)(\\.|_)?(\\w*)@(\\w+)(\\.(\\w+))+");
+    if (std::regex_match(trimmed_user, email_pattern)) {

Review comment:
       Since we only accept email addresses and AWS IDs here, I currently only check whether
the entry is an email address; if it is not, we assume it is an AWS ID. The canonical user
ID format is not really specified; it is only described [here](https://docs.aws.amazon.com/general/latest/gr/acct-identifiers.html)
as "An alpha-numeric identifier, such as 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be,
that is an obfuscated form of the AWS account ID.". It looks to me like a 64-character hex
string, but I am not sure the length is fixed, as there are some questions about it [here,
for example](https://forums.aws.amazon.com/thread.jspa?threadID=286019).
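
For illustration only, a stricter classification could be sketched roughly as below. This is a hypothetical sketch, not code from the PR: `GranteeKind`, `looksLikeEmail`, `looksLikeCanonicalId` and `classifyGrantee` are made-up names, and the "exactly 64 hex characters" rule is only the unconfirmed assumption mentioned in this comment, not a documented AWS guarantee. The email pattern is the one used in the diff.

```cpp
#include <algorithm>
#include <cctype>
#include <regex>
#include <string>

// Hypothetical result type; not part of the PR.
enum class GranteeKind { Email, CanonicalId, Unknown };

// Same email pattern as in parseAccessControlList in the diff.
bool looksLikeEmail(const std::string& s) {
  static const std::regex email_pattern("(\\w+)(\\.|_)?(\\w*)@(\\w+)(\\.(\\w+))+");
  return std::regex_match(s, email_pattern);
}

// ASSUMPTION: a canonical user ID is exactly 64 hexadecimal characters.
// The AWS documentation only calls it "an alpha-numeric identifier",
// so this check may be too strict.
bool looksLikeCanonicalId(const std::string& s) {
  return s.size() == 64 &&
         std::all_of(s.begin(), s.end(),
                     [](unsigned char c) { return std::isxdigit(c) != 0; });
}

// Classify one trimmed entry of the comma-separated user list.
GranteeKind classifyGrantee(const std::string& s) {
  if (looksLikeEmail(s)) return GranteeKind::Email;
  if (looksLikeCanonicalId(s)) return GranteeKind::CanonicalId;
  return GranteeKind::Unknown;
}
```

The trade-off is that an `Unknown` result would reject IDs if the real format turns out to differ from the assumed one, which is why the current approach of treating every non-email entry as an ID is the more permissive choice.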




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


