nifi-commits mailing list archives

From alopre...@apache.org
Subject [nifi] branch master updated: NIFI-6734: Fixed S3 multipart upload in case of SSE S3 and CSE* encryptions. Removed unnecessary code from S3 CSE* encryptions. S3 Encryption Service documentation fixes and improvements. Renamed region property of StandardS3EncryptionService to kms-region. Renamed Client-side Customer Master Key in StandardS3EncryptionService. Use Client-side Customer Key on the GUI / documentation (similar to Server-side Customer Key). Use C suffix in constants and class names (similar to SSE_C). Fixed / [...]
Date Fri, 04 Oct 2019 22:57:30 GMT
This is an automated email from the ASF dual-hosted git repository.

alopresto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new ba14169  NIFI-6734: Fixed S3 multipart upload in case of SSE S3 and CSE* encryptions. Removed unnecessary code from S3 CSE* encryptions. S3 Encryption Service documentation fixes and improvements. Renamed region property of StandardS3EncryptionService to kms-region. Renamed Client-side Customer Master Key in StandardS3EncryptionService. Use Client-side Customer Key on the GUI / documentation (similar to Server-side Customer Key). Use C suffix in constants and class names (similar [...]
ba14169 is described below

commit ba141690c50a019939cb12af17b3bcbecf577b36
Author: Peter Turcsanyi <turcsanyi@cloudera.com>
AuthorDate: Tue Oct 1 12:08:57 2019 +0200

    NIFI-6734: Fixed S3 multipart upload in case of SSE S3 and CSE* encryptions.
    Removed unnecessary code from S3 CSE* encryptions.
    S3 Encryption Service documentation fixes and improvements.
    Renamed region property of StandardS3EncryptionService to kms-region.
    Renamed Client-side Customer Master Key in StandardS3EncryptionService.
    Use Client-side Customer Key on the GUI / documentation (similar to
    Server-side Customer Key).
    Use C suffix in constants and class names (similar to SSE_C).
    Fixed / extended StandardS3EncryptionService validation.
    FetchS3Object encryption strategy changes.
    Disable SSE S3 and SSE KMS for FetchS3Object. In case of fetching the
    S3 object, these strategies are handled implicitly / automatically.
    Set the encryption strategy on the fetched FF that was used to store
    the S3 object, instead of the one that is used to read the object (e.g.
    non-encrypted or SSE S3 encrypted objects can be fetched with a CSE client).
    Typo fix.
    
    This closes #3787.
    
    Signed-off-by: Andy LoPresto <alopresto@apache.org>
---
 .../processors/aws/s3/AbstractS3Processor.java     |   5 +-
 .../nifi/processors/aws/s3/FetchS3Object.java      |  36 ++-
 .../apache/nifi/processors/aws/s3/PutS3Object.java |  13 +-
 ...egy.java => ClientSideCEncryptionStrategy.java} |  60 ++--
 .../ClientSideKMSEncryptionStrategy.java           |  14 +-
 .../aws/s3/encryption/S3EncryptionStrategy.java    |   2 +-
 ...egy.java => ServerSideCEncryptionStrategy.java} |  39 ++-
 .../encryption/ServerSideS3EncryptionStrategy.java |   6 +
 .../s3/encryption/StandardS3EncryptionService.java | 120 +++++---
 .../additionalDetails.html                         |  21 +-
 .../nifi/processors/aws/s3/ITPutS3Object.java      | 168 +++++++----
 .../aws/s3/encryption/S3EncryptionTestUtil.java}   |  30 +-
 ...ClientSideCEncryptionStrategyKeyValidation.java |  98 +++++++
 .../s3/encryption/TestS3EncryptionStrategies.java  |  26 +-
 ...ServerSideCEncryptionStrategyKeyValidation.java |  80 ++++++
 .../TestStandardS3EncryptionService.java           |  13 +-
 .../TestStandardS3EncryptionServiceValidation.java | 307 +++++++++++++++++++++
 .../aws/s3/AmazonS3EncryptionService.java          |  16 +-
 18 files changed, 878 insertions(+), 176 deletions(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 5e8ff32..3f01543 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -130,8 +130,9 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
     public static final PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder()
             .name("encryption-service")
             .displayName("Encryption Service")
-            .description("Specifies the Encryption Service Controller used configure requests.  "
-                    + "For backward compatibility, this value is ignored when 'Server Side Encryption' is set.")
+            .description("Specifies the Encryption Service Controller used to configure requests. " +
+                    "PutS3Object: For backward compatibility, this value is ignored when 'Server Side Encryption' is set. " +
+                    "FetchS3Object: Only needs to be configured in case of Server-side Customer Key, Client-side KMS and Client-side Customer Key encryptions.")
             .required(false)
             .identifiesControllerService(AmazonS3EncryptionService.class)
             .build();
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index aa6233d..4f68a98 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -17,13 +17,16 @@
 package org.apache.nifi.processors.aws.s3;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import com.amazonaws.services.s3.model.SSEAlgorithm;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -35,6 +38,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -67,7 +72,7 @@ import com.amazonaws.services.s3.model.S3Object;
     @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
     @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
     @WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),
-    @WritesAttribute(attribute = "s3.encryptionStrategy", description = "The name of the encryption strategy, if any was set"),})
+    @WritesAttribute(attribute = "s3.encryptionStrategy", description = "The name of the encryption strategy that was used to store the S3 object (if it is encrypted)"),})
 public class FetchS3Object extends AbstractS3Processor {
 
     public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
@@ -101,6 +106,27 @@ public class FetchS3Object extends AbstractS3Processor {
     }
 
     @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+        AmazonS3EncryptionService encryptionService = validationContext.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+        if (encryptionService != null) {
+            String strategyName = encryptionService.getStrategyName();
+            if (strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3) || strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS)) {
+                problems.add(new ValidationResult.Builder()
+                        .subject(ENCRYPTION_SERVICE.getDisplayName())
+                        .valid(false)
+                        .explanation(encryptionService.getStrategyDisplayName() + " is not a valid encryption strategy for fetching objects. Decryption will be handled automatically " +
+                                "during the fetch of S3 objects encrypted with " + encryptionService.getStrategyDisplayName())
+                        .build()
+                );
+            }
+        }
+
+        return problems;
+    }
+
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
@@ -169,7 +195,13 @@ public class FetchS3Object extends AbstractS3Processor {
                 attributes.putAll(metadata.getUserMetadata());
             }
             if (metadata.getSSEAlgorithm() != null) {
-                attributes.put("s3.sseAlgorithm", metadata.getSSEAlgorithm());
+                String sseAlgorithmName = metadata.getSSEAlgorithm();
+                attributes.put("s3.sseAlgorithm", sseAlgorithmName);
+                if (sseAlgorithmName.equals(SSEAlgorithm.AES256.getAlgorithm())) {
+                    attributes.put("s3.encryptionStrategy", AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3);
+                } else if (sseAlgorithmName.equals(SSEAlgorithm.KMS.getAlgorithm())) {
+                    attributes.put("s3.encryptionStrategy", AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS);
+                }
             }
             if (metadata.getVersionId() != null) {
                 attributes.put("s3.version", metadata.getVersionId());
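Note on the FetchS3Object hunk above: the fetched FlowFile's s3.encryptionStrategy attribute is now derived from the SSE algorithm reported in the object metadata, not from the client that read the object. A minimal sketch of that mapping follows, without the AWS SDK on the classpath. The class and method names are hypothetical, and the literal values "AES256" and "aws:kms" are assumed to be what SSEAlgorithm.AES256.getAlgorithm() and SSEAlgorithm.KMS.getAlgorithm() return in the AWS SDK for Java v1.

```java
import java.util.Optional;

// Hypothetical stand-alone illustration; not part of the commit.
class SseStrategyMapping {
    // Assumed algorithm strings of SSEAlgorithm.AES256 and SSEAlgorithm.KMS.
    static final String AES256 = "AES256";
    static final String AWS_KMS = "aws:kms";

    // Strategy names as declared in the diff (STRATEGY_NAME_SSE_S3 / STRATEGY_NAME_SSE_KMS).
    static final String STRATEGY_NAME_SSE_S3 = "SSE_S3";
    static final String STRATEGY_NAME_SSE_KMS = "SSE_KMS";

    // Returns the strategy name for a reported SSE algorithm, or empty when
    // the algorithm is absent or not one of the two server-managed kinds.
    static Optional<String> strategyFor(String sseAlgorithm) {
        if (sseAlgorithm == null) {
            return Optional.empty();
        }
        switch (sseAlgorithm) {
            case AES256:
                return Optional.of(STRATEGY_NAME_SSE_S3);
            case AWS_KMS:
                return Optional.of(STRATEGY_NAME_SSE_KMS);
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        for (String algorithm : new String[]{"AES256", "aws:kms", "other"}) {
            System.out.println(algorithm + " -> " + strategyFor(algorithm).orElse("(attribute not set)"));
        }
    }
}
```

As in the hunk, an unrecognised algorithm leaves the attribute unset instead of guessing a strategy.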
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 81c55b6..1c2bc01 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -664,6 +664,7 @@ public class PutS3Object extends AbstractS3Processor {
                             // upload parts
                             //------------------------------------------------------------
                             long thisPartSize;
+                            boolean isLastPart;
                             for (int part = currentState.getPartETags().size() + 1;
                                  currentState.getFilePosition() < currentState.getContentLength(); part++) {
                                 if (!PutS3Object.this.isScheduled()) {
@@ -672,13 +673,15 @@ public class PutS3Object extends AbstractS3Processor {
                                 }
                                 thisPartSize = Math.min(currentState.getPartSize(),
                                         (currentState.getContentLength() - currentState.getFilePosition()));
+                                isLastPart = currentState.getContentLength() == currentState.getFilePosition() + thisPartSize;
                                 UploadPartRequest uploadRequest = new UploadPartRequest()
                                         .withBucketName(bucket)
                                         .withKey(key)
                                         .withUploadId(currentState.getUploadId())
                                         .withInputStream(in)
                                         .withPartNumber(part)
-                                        .withPartSize(thisPartSize);
+                                        .withPartSize(thisPartSize)
+                                        .withLastPart(isLastPart);
                                 if (encryptionService != null) {
                                     encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
                                 }
@@ -692,8 +695,14 @@ public class PutS3Object extends AbstractS3Processor {
                                         getLogger().info("Exception saving cache state processing flow file: " +
                                                 e.getMessage());
                                     }
+                                    int available = 0;
+                                    try {
+                                        available = in.available();
+                                    } catch (IOException e) {
+                                        // in case of the last part, the stream is already closed
+                                    }
                                     getLogger().info("Success uploading part flowfile={} part={} available={} " +
-                                            "etag={} uploadId={}", new Object[]{ffFilename, part, in.available(),
+                                            "etag={} uploadId={}", new Object[]{ffFilename, part, available,
                                             uploadPartResult.getETag(), currentState.getUploadId()});
                                 } catch (AmazonClientException e) {
                                     getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " +
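Note on the PutS3Object hunks above: each UploadPartRequest now carries an explicit last-part flag, computed from the current file position and the part size. The commit message does not spell out why the flag is needed; a plausible reading is that the client-side encryption client has to know which part closes the stream. The sketch below reproduces only the arithmetic of the loop, with no AWS calls. MultipartPlan, Part and plan are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone illustration of the part-size arithmetic; not part of the commit.
class MultipartPlan {
    static final class Part {
        final int number;
        final long size;
        final boolean last;

        Part(int number, long size, boolean last) {
            this.number = number;
            this.size = size;
            this.last = last;
        }
    }

    // Splits contentLength bytes into parts of at most partSize bytes and
    // flags the part that ends exactly at contentLength as the last one.
    static List<Part> plan(long contentLength, long partSize) {
        if (partSize <= 0) {
            throw new IllegalArgumentException("partSize must be positive");
        }
        List<Part> parts = new ArrayList<>();
        long filePosition = 0;
        for (int part = 1; filePosition < contentLength; part++) {
            long thisPartSize = Math.min(partSize, contentLength - filePosition);
            boolean isLastPart = contentLength == filePosition + thisPartSize;
            parts.add(new Part(part, thisPartSize, isLastPart));
            filePosition += thisPartSize;
        }
        return parts;
    }

    public static void main(String[] args) {
        for (Part p : plan(25, 10)) {
            System.out.println("part=" + p.number + " size=" + p.size + " last=" + p.last);
        }
    }
}
```

The same comparison also holds when the content length is an exact multiple of the part size: the final full-sized part is the one that is flagged.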
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCEncryptionStrategy.java
similarity index 64%
rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java
rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCEncryptionStrategy.java
index d157ea5..45fdc5e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCEncryptionStrategy.java
@@ -18,11 +18,8 @@ package org.apache.nifi.processors.aws.s3.encryption;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.regions.Region;
-import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.AmazonS3EncryptionClient;
-import com.amazonaws.services.s3.model.CryptoConfiguration;
 import com.amazonaws.services.s3.model.EncryptionMaterials;
 import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
 import org.apache.commons.codec.binary.Base64;
@@ -38,58 +35,65 @@ import javax.crypto.spec.SecretKeySpec;
  * See https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingClientSideEncryption.html#client-side-encryption-client-side-master-key-intro
  *
  */
-public class ClientSideCMKEncryptionStrategy implements S3EncryptionStrategy {
+public class ClientSideCEncryptionStrategy implements S3EncryptionStrategy {
     /**
      * Create an encryption client.
      *
      * @param credentialsProvider AWS credentials provider.
      * @param clientConfiguration Client configuration
-     * @param region AWS region
+     * @param kmsRegion not used by this encryption strategy
      * @param keyIdOrMaterial client master key, always base64 encoded
      * @return AWS S3 client
      */
     @Override
-    public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) throws SecurityException {
-        if (!validateKey(keyIdOrMaterial).isValid()) {
-            throw new SecurityException("Invalid client key; ensure key material is base64 encoded.");
+    public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String kmsRegion, String keyIdOrMaterial) {
+        ValidationResult keyValidationResult = validateKey(keyIdOrMaterial);
+        if (!keyValidationResult.isValid()) {
+            throw new IllegalArgumentException("Invalid client key; " + keyValidationResult.getExplanation());
         }
 
         byte[] keyMaterial = Base64.decodeBase64(keyIdOrMaterial);
         SecretKeySpec symmetricKey = new SecretKeySpec(keyMaterial, "AES");
         StaticEncryptionMaterialsProvider encryptionMaterialsProvider = new StaticEncryptionMaterialsProvider(new EncryptionMaterials(symmetricKey));
-        boolean haveRegion = StringUtils.isNotBlank(region);
-        CryptoConfiguration cryptoConfig = new CryptoConfiguration();
-        Region awsRegion = null;
 
-        if (haveRegion) {
-            awsRegion = Region.getRegion(Regions.fromName(region));
-            cryptoConfig.setAwsKmsRegion(awsRegion);
-        }
-
-        AmazonS3EncryptionClient client = new AmazonS3EncryptionClient(credentialsProvider, encryptionMaterialsProvider, cryptoConfig);
-        if (haveRegion && awsRegion != null) {
-            client.setRegion(awsRegion);
-        }
+        AmazonS3EncryptionClient client = new AmazonS3EncryptionClient(credentialsProvider, encryptionMaterialsProvider);
 
         return client;
     }
 
+    @Override
     public ValidationResult validateKey(String keyValue) {
-        if (StringUtils.isBlank(keyValue) || !Base64.isBase64(keyValue)) {
-            return new ValidationResult.Builder().valid(false).build();
+        if (StringUtils.isBlank(keyValue)) {
+            return new ValidationResult.Builder()
+                    .subject("Key Material")
+                    .valid(false)
+                    .explanation("it is empty")
+                    .build();
         }
 
-        boolean decoded = false;
-        boolean sized = false;
         byte[] keyMaterial;
 
         try {
+            if (!Base64.isBase64(keyValue)) {
+                throw new Exception();
+            }
             keyMaterial = Base64.decodeBase64(keyValue);
-            decoded = true;
-            sized = keyMaterial.length == 32 || keyMaterial.length == 24 || keyMaterial.length == 16;
-        } catch (final Exception ignored) {
+        } catch (Exception e) {
+            return new ValidationResult.Builder()
+                    .subject("Key Material")
+                    .valid(false)
+                    .explanation("it is not in Base64 encoded form")
+                    .build();
+        }
+
+        if (!(keyMaterial.length == 32 || keyMaterial.length == 24 || keyMaterial.length == 16)) {
+            return new ValidationResult.Builder()
+                    .subject("Key Material")
+                    .valid(false)
+                    .explanation("it is not a Base64 encoded AES-256, AES-192 or AES-128 key")
+                    .build();
         }
 
-        return new ValidationResult.Builder().valid(decoded && sized).build();
+        return new ValidationResult.Builder().valid(true).build();
     }
 }
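Note on the validateKey rewrite above: the method now reports which of three checks failed (empty, not Base64, wrong length) instead of a bare invalid result. The sketch below walks through the same three steps using only the JDK. It is an approximation under stated assumptions: ClientKeyCheck and explain are hypothetical names, a plain String stands in for ValidationResult, and java.util.Base64 replaces the commons-codec Base64 used in the commit, so the two may disagree on edge cases such as embedded whitespace.

```java
import java.util.Base64;

// Hypothetical stand-alone illustration; not part of the commit.
class ClientKeyCheck {
    // Returns null when the key is acceptable, otherwise the explanation
    // text used in the diff for the first check that fails.
    static String explain(String keyValue) {
        if (keyValue == null || keyValue.trim().isEmpty()) {
            return "it is empty";
        }

        byte[] keyMaterial;
        try {
            // Assumption: the JDK decoder stands in for commons-codec here.
            keyMaterial = Base64.getDecoder().decode(keyValue);
        } catch (IllegalArgumentException e) {
            return "it is not in Base64 encoded form";
        }

        // AES keys are 128, 192 or 256 bits long, i.e. 16, 24 or 32 bytes.
        int length = keyMaterial.length;
        if (length != 16 && length != 24 && length != 32) {
            return "it is not a Base64 encoded AES-256, AES-192 or AES-128 key";
        }
        return null;
    }

    public static void main(String[] args) {
        String valid = Base64.getEncoder().encodeToString(new byte[32]);
        System.out.println("32-byte key: " + explain(valid));
        System.out.println("10-byte key: " + explain(Base64.getEncoder().encodeToString(new byte[10])));
        System.out.println("garbage:     " + explain("not base64!"));
    }
}
```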
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java
index e6d75e4..1da5dbb 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java
@@ -39,26 +39,22 @@ public class ClientSideKMSEncryptionStrategy implements S3EncryptionStrategy {
      *
      * @param credentialsProvider AWS credentials provider.
      * @param clientConfiguration Client configuration
-     * @param region AWS region
+     * @param kmsRegion AWS KMS region
      * @param keyIdOrMaterial KMS key id
      * @return AWS S3 client
      */
     @Override
-    public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) {
+    public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String kmsRegion, String keyIdOrMaterial) {
         KMSEncryptionMaterialsProvider materialProvider = new KMSEncryptionMaterialsProvider(keyIdOrMaterial);
-        boolean haveRegion = StringUtils.isNotBlank(region);
-        Region awsRegion = null;
+        boolean haveKmsRegion = StringUtils.isNotBlank(kmsRegion);
 
         CryptoConfiguration cryptoConfig = new CryptoConfiguration();
-        if (haveRegion) {
-            awsRegion = Region.getRegion(Regions.fromName(region));
+        if (haveKmsRegion) {
+            Region awsRegion = Region.getRegion(Regions.fromName(kmsRegion));
             cryptoConfig.setAwsKmsRegion(awsRegion);
         }
 
         AmazonS3EncryptionClient client = new AmazonS3EncryptionClient(credentialsProvider, materialProvider, cryptoConfig);
-        if (haveRegion) {
-            client.setRegion(awsRegion);
-        }
 
         return client;
     }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java
index 677fc0e..a0012cb 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java
@@ -76,7 +76,7 @@ public interface S3EncryptionStrategy {
      * @param clientConfiguration Client configuration.
      * @return {@link AmazonS3Client}, perhaps an {@link com.amazonaws.services.s3.AmazonS3EncryptionClient}
      */
-    default AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) throws SecurityException {
+    default AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String kmsRegion, String keyIdOrMaterial) {
         return null;
     }
 
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEncryptionStrategy.java
similarity index 69%
rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java
rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEncryptionStrategy.java
index 231a5c8..574675c 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEncryptionStrategy.java
@@ -22,8 +22,9 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.SSECustomerKey;
 import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.ValidationResult;
-import org.bouncycastle.util.encoders.Base64;
 
 /**
  * This strategy uses a customer key to perform server-side encryption.  Use this strategy when you want the server to perform the encryption,
@@ -32,7 +33,7 @@ import org.bouncycastle.util.encoders.Base64;
  * See https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
  *
  */
-public class ServerSideCEKEncryptionStrategy implements S3EncryptionStrategy {
+public class ServerSideCEncryptionStrategy implements S3EncryptionStrategy {
     @Override
     public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
         SSECustomerKey customerKey = new SSECustomerKey(keyValue);
@@ -59,17 +60,37 @@ public class ServerSideCEKEncryptionStrategy implements S3EncryptionStrategy {
 
     @Override
     public ValidationResult validateKey(String keyValue) {
-        boolean decoded = false;
-        boolean sized = false;
+        if (StringUtils.isBlank(keyValue)) {
+            return new ValidationResult.Builder()
+                    .subject("Key Material")
+                    .valid(false)
+                    .explanation("it is empty")
+                    .build();
+        }
+
         byte[] keyMaterial;
 
         try {
-            keyMaterial = Base64.decode(keyValue);
-            decoded = true;
-            sized = (keyMaterial.length > 0) && (keyMaterial.length % 32) == 0;
-        } catch (final Exception ignored) {
+            if (!org.apache.commons.codec.binary.Base64.isBase64(keyValue)) {
+                throw new Exception();
+            }
+            keyMaterial = Base64.decodeBase64(keyValue);
+        } catch (Exception e) {
+            return new ValidationResult.Builder()
+                    .subject("Key Material")
+                    .valid(false)
+                    .explanation("it is not in Base64 encoded form")
+                    .build();
+        }
+
+        if (keyMaterial.length != 32) {
+            return new ValidationResult.Builder()
+                    .subject("Key Material")
+                    .valid(false)
+                    .explanation("it is not a Base64 encoded AES-256 key")
+                    .build();
         }
 
-        return new ValidationResult.Builder().valid(decoded && sized).build();
+        return new ValidationResult.Builder().valid(true).build();
     }
 }
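Note on the ServerSideCEncryptionStrategy validation above: the check is tightened from "any non-zero multiple of 32 bytes" to exactly 32 bytes, i.e. a Base64 encoded AES-256 key. One way to produce a value that should pass this check is sketched below with the JDK's KeyGenerator. SseCKeyGen and newBase64Aes256Key are hypothetical names, and nothing here is taken from the commit or from the NiFi documentation.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

// Hypothetical stand-alone illustration; not part of the commit.
class SseCKeyGen {
    // Generates a random 256-bit AES key and returns it Base64 encoded,
    // which is the form the Key Material check in the diff expects.
    static String newBase64Aes256Key() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(256);
            SecretKey key = generator.generateKey();
            return Base64.getEncoder().encodeToString(key.getEncoded());
        } catch (NoSuchAlgorithmException e) {
            // AES is a required algorithm on every Java platform implementation.
            throw new IllegalStateException("AES key generation is unavailable", e);
        }
    }

    public static void main(String[] args) {
        String encoded = newBase64Aes256Key();
        int decodedLength = Base64.getDecoder().decode(encoded).length;
        System.out.println("decoded length in bytes: " + decodedLength);
    }
}
```

A key generated this way should be treated as a secret: with a customer-supplied key, the same key has to be presented again to read the object back.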
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
index 0845e12..89f1637 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.aws.s3.encryption;
 
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 
@@ -33,4 +34,9 @@ public class ServerSideS3EncryptionStrategy implements S3EncryptionStrategy {
     public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
         objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
     }
+
+    @Override
+    public void configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, ObjectMetadata objectMetadata, String keyValue) {
+        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
index 21d8d9e..90c50e8 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
@@ -30,17 +30,18 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.util.StandardValidators;
 
 import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService;
 import org.apache.nifi.processors.aws.s3.AbstractS3Processor;
 
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,39 +53,41 @@ import java.util.List;
 import java.util.Map;
 
 
-@Tags({"service", "encryption", "encrypt", "decryption", "decrypt", "key"})
+@Tags({"service", "aws", "s3", "encryption", "encrypt", "decryption", "decrypt", "key"})
 @CapabilityDescription("Adds configurable encryption to S3 Put and S3 Fetch operations.")
 public class StandardS3EncryptionService extends AbstractControllerService implements AmazonS3EncryptionService {
     private static final Logger logger = LoggerFactory.getLogger(StandardS3EncryptionService.class);
 
-    public static final String STRATEGY_NAME_NONE = "NONE";
-    public static final String STRATEGY_NAME_SSE_S3 = "SSE_S3";
-    public static final String STRATEGY_NAME_SSE_KMS = "SSE_KMS";
-    public static final String STRATEGY_NAME_SSE_C = "SSE_C";
-    public static final String STRATEGY_NAME_CSE_KMS = "CSE_KMS";
-    public static final String STRATEGY_NAME_CSE_CMK = "CSE_CMK";
-
-    private static final Map<String, S3EncryptionStrategy> namedStrategies = new HashMap<String, S3EncryptionStrategy>() {{
+    private static final Map<String, S3EncryptionStrategy> NAMED_STRATEGIES = new HashMap<String, S3EncryptionStrategy>() {{
         put(STRATEGY_NAME_NONE, new NoOpEncryptionStrategy());
         put(STRATEGY_NAME_SSE_S3, new ServerSideS3EncryptionStrategy());
         put(STRATEGY_NAME_SSE_KMS, new ServerSideKMSEncryptionStrategy());
-        put(STRATEGY_NAME_SSE_C, new ServerSideCEKEncryptionStrategy());
+        put(STRATEGY_NAME_SSE_C, new ServerSideCEncryptionStrategy());
         put(STRATEGY_NAME_CSE_KMS, new ClientSideKMSEncryptionStrategy());
-        put(STRATEGY_NAME_CSE_CMK, new ClientSideCMKEncryptionStrategy());
+        put(STRATEGY_NAME_CSE_C, new ClientSideCEncryptionStrategy());
     }};
 
     private static final AllowableValue NONE = new AllowableValue(STRATEGY_NAME_NONE, "None","No encryption.");
     private static final AllowableValue SSE_S3 = new AllowableValue(STRATEGY_NAME_SSE_S3, "Server-side S3","Use server-side, S3-managed encryption.");
     private static final AllowableValue SSE_KMS = new AllowableValue(STRATEGY_NAME_SSE_KMS, "Server-side KMS","Use server-side, KMS key to perform encryption.");
-    private static final AllowableValue SSE_C = new AllowableValue(STRATEGY_NAME_SSE_C, "Server-side Customer Key","Use server-side, customer-supplied key for encryption.");
+    private static final AllowableValue SSE_C = new AllowableValue(STRATEGY_NAME_SSE_C, "Server-side Customer Key","Use server-side, customer-supplied key to perform encryption.");
     private static final AllowableValue CSE_KMS = new AllowableValue(STRATEGY_NAME_CSE_KMS, "Client-side KMS","Use client-side, KMS key to perform encryption.");
-    private static final AllowableValue CSE_CMK = new AllowableValue(STRATEGY_NAME_CSE_CMK, "Client-side Customer Master Key","Use client-side, customer-supplied master key to perform encryption.");
+    private static final AllowableValue CSE_C = new AllowableValue(STRATEGY_NAME_CSE_C, "Client-side Customer Key","Use client-side, customer-supplied key to perform encryption.");
+
+    public static final Map<String, AllowableValue> ENCRYPTION_STRATEGY_ALLOWABLE_VALUES = new HashMap<String, AllowableValue>() {{
+        put(STRATEGY_NAME_NONE, NONE);
+        put(STRATEGY_NAME_SSE_S3, SSE_S3);
+        put(STRATEGY_NAME_SSE_KMS, SSE_KMS);
+        put(STRATEGY_NAME_SSE_C, SSE_C);
+        put(STRATEGY_NAME_CSE_KMS, CSE_KMS);
+        put(STRATEGY_NAME_CSE_C, CSE_C);
+    }};
 
     public static final PropertyDescriptor ENCRYPTION_STRATEGY = new PropertyDescriptor.Builder()
             .name("encryption-strategy")
             .displayName("Encryption Strategy")
             .description("Strategy to use for S3 data encryption and decryption.")
-            .allowableValues(NONE, SSE_S3, SSE_KMS, SSE_C, CSE_KMS, CSE_CMK)
+            .allowableValues(NONE, SSE_S3, SSE_KMS, SSE_C, CSE_KMS, CSE_C)
             .required(true)
             .defaultValue(NONE.getValue())
             .build();
@@ -92,34 +95,38 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
     public static final PropertyDescriptor ENCRYPTION_VALUE = new PropertyDescriptor.Builder()
             .name("key-id-or-key-material")
             .displayName("Key ID or Key Material")
-            .description("For Server-side CEK and Client-side CMK, this is base64-encoded Key Material.  For all others (except 'None'), it is the KMS Key ID.")
+            .description("For None and Server-side S3: not used. For Server-side KMS and Client-side KMS: the KMS Key ID must be configured. " +
+                    "For Server-side Customer Key and Client-side Customer Key: the Key Material must be specified in Base64 encoded form. " +
+                    "In case of Server-side Customer Key, the key must be an AES-256 key. In case of Client-side Customer Key, it can be an AES-256, AES-192 or AES-128 key.")
             .required(false)
             .sensitive(true)
-            .addValidator(new StandardValidators.StringLengthValidator(0, 4096))
+            .addValidator((subject, input, context) -> new ValidationResult.Builder().valid(true).build()) // will be validated in customValidate()
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
-            .name("region")
+    public static final PropertyDescriptor KMS_REGION = new PropertyDescriptor.Builder()
+            .name("kms-region")
+            .displayName("KMS Region")
+            .description("The Region of the AWS Key Management Service. Only used in case of Client-side KMS.")
             .required(false)
             .allowableValues(AbstractS3Processor.getAvailableRegions())
             .defaultValue(AbstractS3Processor.createAllowableValue(Regions.DEFAULT_REGION).getValue())
             .build();
 
     private String keyValue = "";
-    private String region = "";
+    private String kmsRegion = "";
     private S3EncryptionStrategy encryptionStrategy = new NoOpEncryptionStrategy();
     private String strategyName = STRATEGY_NAME_NONE;
 
     @OnEnabled
     public void onConfigured(final ConfigurationContext context) throws InitializationException {
         final String newStrategyName = context.getProperty(ENCRYPTION_STRATEGY).getValue();
-        final String newKeyValue = context.getProperty(ENCRYPTION_VALUE).getValue();
-        final S3EncryptionStrategy newEncryptionStrategy = namedStrategies.get(newStrategyName);
-        String newRegion = null;
+        final String newKeyValue = context.getProperty(ENCRYPTION_VALUE).evaluateAttributeExpressions().getValue();
+        final S3EncryptionStrategy newEncryptionStrategy = NAMED_STRATEGIES.get(newStrategyName);
+        String newKmsRegion = null;
 
-        if (context.getProperty(REGION) != null ) {
-            newRegion = context.getProperty(REGION).getValue();
+        if (context.getProperty(KMS_REGION) != null ) {
+            newKmsRegion = context.getProperty(KMS_REGION).getValue();
         }
 
         if (newEncryptionStrategy == null) {
@@ -131,13 +138,59 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
         strategyName = newStrategyName;
         encryptionStrategy = newEncryptionStrategy;
         keyValue = newKeyValue;
-        region = newRegion;
+        kmsRegion = newKmsRegion;
     }
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
         Collection<ValidationResult> validationResults = new ArrayList<>();
-        validationResults.add(encryptionStrategy.validateKey(validationContext.getProperty(ENCRYPTION_VALUE).getValue()));
+
+        String encryptionStrategyName = validationContext.getProperty(ENCRYPTION_STRATEGY).getValue();
+        String encryptionStrategyDisplayName = ENCRYPTION_STRATEGY_ALLOWABLE_VALUES.get(encryptionStrategyName).getDisplayName();
+        PropertyValue encryptionValueProperty = validationContext.getProperty(ENCRYPTION_VALUE);
+        String encryptionValue = encryptionValueProperty.evaluateAttributeExpressions().getValue();
+
+        switch (encryptionStrategyName) {
+            case STRATEGY_NAME_NONE:
+            case STRATEGY_NAME_SSE_S3:
+                if (encryptionValueProperty.isSet()) {
+                    validationResults.add(new ValidationResult.Builder()
+                            .subject(ENCRYPTION_VALUE.getDisplayName())
+                            .valid(false)
+                            .explanation("the property cannot be specified for encryption strategy " + encryptionStrategyDisplayName)
+                            .build()
+                    );
+                }
+                break;
+            case STRATEGY_NAME_SSE_KMS:
+            case STRATEGY_NAME_CSE_KMS:
+                if (StringUtils.isEmpty(encryptionValue)) {
+                    validationResults.add(new ValidationResult.Builder()
+                            .subject(ENCRYPTION_VALUE.getDisplayName())
+                            .valid(false)
+                            .explanation("a non-empty Key ID must be specified for encryption strategy " + encryptionStrategyDisplayName)
+                            .build()
+                    );
+                }
+                break;
+            case STRATEGY_NAME_SSE_C:
+            case STRATEGY_NAME_CSE_C:
+                if (StringUtils.isEmpty(encryptionValue)) {
+                    validationResults.add(new ValidationResult.Builder()
+                            .subject(ENCRYPTION_VALUE.getDisplayName())
+                            .valid(false)
+                            .explanation("a non-empty Key Material must be specified for encryption strategy " + encryptionStrategyDisplayName)
+                            .build()
+                    );
+                } else {
+                    S3EncryptionStrategy encryptionStrategy = NAMED_STRATEGIES.get(encryptionStrategyName);
+                    String keyIdOrMaterial = validationContext.getProperty(ENCRYPTION_VALUE).evaluateAttributeExpressions().getValue();
+
+                    validationResults.add(encryptionStrategy.validateKey(keyIdOrMaterial));
+                }
+                break;
+        }
+
         return validationResults;
     }
 
@@ -146,7 +199,7 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(ENCRYPTION_STRATEGY);
         properties.add(ENCRYPTION_VALUE);
-        properties.add(REGION);
+        properties.add(KMS_REGION);
         return Collections.unmodifiableList(properties);
     }
 
@@ -172,18 +225,23 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
 
     @Override
     public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration) {
-        return encryptionStrategy.createEncryptionClient(credentialsProvider, clientConfiguration, region, keyValue);
+        return encryptionStrategy.createEncryptionClient(credentialsProvider, clientConfiguration, kmsRegion, keyValue);
     }
 
     @Override
-    public String getRegion() {
-        return region;
+    public String getKmsRegion() {
+        return kmsRegion;
     }
 
     @Override
     public String getStrategyName() {
         return strategyName;
     }
+
+    @Override
+    public String getStrategyDisplayName() {
+        return ENCRYPTION_STRATEGY_ALLOWABLE_VALUES.get(strategyName).getDisplayName();
+    }
 }
 
 
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService/additionalDetails.html
similarity index 70%
rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html
rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService/additionalDetails.html
index 108a0b7..b53cffe 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService/additionalDetails.html
@@ -21,15 +21,15 @@
     <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
 </head>
 <body>
-
+<h2>Description</h2>
 <div>
-    The <code>S3EncryptionService</code> manages an encryption strategy and applies that strategy to various S3 operations.
+    The <code>StandardS3EncryptionService</code> manages an encryption strategy and applies that strategy to various S3 operations.
 
     <br>
 
-    <b>Note:</b> this service has no effect when a processor has the <code>SERVER_SIDE_ENCRYPTION</code> property set.  To use
+    <b>Note:</b> This service has no effect when a processor has the <code>Server Side Encryption</code> property set. To use
     this service with processors so configured, first create a service instance, set the <code>Encryption Strategy</code> to <code>Server-side S3</code>,
-    disable the <code>SERVER_SIDE_ENCRYPTION</code> processor setting, and finally, associate the processor with the service.
+    disable the <code>Server Side Encryption</code> processor setting, and finally, associate the processor with the service.
 </div>
 
 
@@ -44,27 +44,26 @@
         <li><code>Server-side S3</code> - encryption and decryption is managed by S3; no keys are required.</li>
         <li><code>Server-side KMS</code> - encryption and decryption are performed by S3 using the configured KMS key.</li>
         <li><code>Server-side Customer Key</code> - encryption and decryption are performed by S3 using the supplied customer key.</li>
-        <li><code>Client-side KMS</code> - like the server-side KMS strategy, with the encryption and decryption performed by the client.</li>
-        <li><code>Client-side Customer Master Key</code> - like the server-side CEK strategy, with the encryption and decryption performed by the client.</li>
+        <li><code>Client-side KMS</code> - like the Server-side KMS strategy, with the encryption and decryption performed by the client.</li>
+        <li><code>Client-side Customer Key</code> - like the Server-side Customer Key strategy, with the encryption and decryption performed by the client.</li>
     </ul>
 </div>
 
 <h3>Key ID or Key Material</h3>
 <p>
-    When configured for either the server-side or client-side KMS strategy, this field should contain the ID or alias
-    of that key.
+    When configured for either the Server-side or Client-side KMS strategies, this field should contain the KMS Key ID.
 </p>
 <p>
-    When configured for either the server-side or client-side customer key strategies, this field should contain the key
+    When configured for either the Server-side or Client-side Customer Key strategies, this field should contain the key
     material, and that material must be base64 encoded.
 </p>
 <p>
     All other encryption strategies ignore this field.
 </p>
 
-<h3>Region</h3>
+<h3>KMS Region</h3>
 <div>
-    KMS key region, if any.  This value must match the actual region of the KMS key if supplied.
+    KMS key region, if any. This value must match the actual region of the KMS key if supplied.
 </div>
 
 </body>
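
The Key ID or Key Material rules in the documentation above, together with the new customValidate() switch, can be sketched without any NiFi dependency. This is only an illustration under stated assumptions: the class KeyValueRules, its method names and its messages are hypothetical, the strategy strings are assumed to match the STRATEGY_NAME_* constants (the value of STRATEGY_NAME_CSE_C is not shown in this diff), a null value stands in for "property not set", and the key-size check is inferred from the property description rather than from the actual validateKey() implementations.

```java
import java.util.Base64;
import java.util.Optional;

// Hypothetical, dependency-free sketch; not the NiFi implementation.
final class KeyValueRules {
    private KeyValueRules() {}

    /** Returns an explanation when the combination is invalid, otherwise empty. */
    static Optional<String> check(String strategy, String keyValue) {
        boolean empty = keyValue == null || keyValue.isEmpty();
        switch (strategy) {
            case "NONE":
            case "SSE_S3":
                // No key is involved, so any configured value is reported.
                return keyValue == null
                        ? Optional.empty()
                        : Optional.of("the property cannot be specified for " + strategy);
            case "SSE_KMS":
            case "CSE_KMS":
                return empty
                        ? Optional.of("a non-empty Key ID must be specified for " + strategy)
                        : Optional.empty();
            case "SSE_C":
                // Assumption from the property description: AES-256 only.
                return empty ? missingMaterial(strategy) : checkAesKey(keyValue, 256);
            case "CSE_C":
                // Assumption from the property description: AES-128, AES-192 or AES-256.
                return empty ? missingMaterial(strategy) : checkAesKey(keyValue, 128, 192, 256);
            default:
                return Optional.of("unknown encryption strategy " + strategy);
        }
    }

    private static Optional<String> missingMaterial(String strategy) {
        return Optional.of("a non-empty Key Material must be specified for " + strategy);
    }

    private static Optional<String> checkAesKey(String base64, int... allowedBits) {
        final byte[] key;
        try {
            key = Base64.getDecoder().decode(base64);
        } catch (IllegalArgumentException e) {
            return Optional.of("Key Material is not valid Base64");
        }
        int bits = key.length * 8;
        for (int allowed : allowedBits) {
            if (bits == allowed) {
                return Optional.empty();
            }
        }
        return Optional.of("Key Material is " + bits + " bits, which is not an accepted AES key size");
    }
}
```

For example, KeyValueRules.check("SSE_KMS", "") yields an explanation, while KeyValueRules.check("NONE", null) yields an empty Optional.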
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index 7a2cf3c..dd5f990 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -946,12 +946,23 @@ public class ITPutS3Object extends AbstractS3IT {
     }
 
     @Test
-    public void testEncryptionServiceWithServerSideS3ManagedEncryptionStrategy() throws IOException, InitializationException {
-        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, "");
+    public void testEncryptionServiceWithServerSideS3EncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
+        byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+        testEncryptionServiceWithServerSideS3EncryptionStrategy(smallData);
+    }
+
+    @Test
+    public void testEncryptionServiceWithServerSideS3EncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
+        byte[] largeData = new byte[51 * 1024 * 1024];
+        testEncryptionServiceWithServerSideS3EncryptionStrategy(largeData);
+    }
+
+    private void testEncryptionServiceWithServerSideS3EncryptionStrategy(byte[] data) throws IOException, InitializationException {
+        TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3, null);
 
         Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "test.txt");
-        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.enqueue(data, attrs);
         runner.assertValid();
         runner.run();
         runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
@@ -960,21 +971,32 @@ public class ITPutS3Object extends AbstractS3IT {
         Assert.assertEquals(1, flowFiles.size());
         Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
         MockFlowFile putSuccess = flowFiles.get(0);
-        Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_SSE_S3);
+        Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3);
 
-        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, "");
-        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3, null);
+        flowFile.assertContentEquals(data);
         flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
-        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_S3);
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3);
     }
 
     @Test
-    public void testEncryptionServiceWithServerSideKMSEncryptionStrategy() throws IOException, InitializationException {
-        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
+    public void testEncryptionServiceWithServerSideKMSEncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
+        byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+        testEncryptionServiceWithServerSideKMSEncryptionStrategy(smallData);
+    }
+
+    @Test
+    public void testEncryptionServiceWithServerSideKMSEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
+        byte[] largeData = new byte[51 * 1024 * 1024];
+        testEncryptionServiceWithServerSideKMSEncryptionStrategy(largeData);
+    }
+
+    private void testEncryptionServiceWithServerSideKMSEncryptionStrategy(byte[] data) throws IOException, InitializationException {
+        TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "test.txt");
-        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.enqueue(data, attrs);
         runner.assertValid();
         runner.run();
         runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
@@ -983,21 +1005,32 @@ public class ITPutS3Object extends AbstractS3IT {
         Assert.assertEquals(1, flowFiles.size());
         Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
         MockFlowFile putSuccess = flowFiles.get(0);
-        Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS);
+        Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS);
 
-        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
-        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
+        flowFile.assertContentEquals(data);
         flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, "aws:kms");
-        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS);
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS);
     }
 
     @Test
-    public void testEncryptionServiceWithServerSideCPEKEncryptionStrategy() throws IOException, InitializationException {
-        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
+    public void testEncryptionServiceWithServerSideCEncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
+        byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+        testEncryptionServiceWithServerSideCEncryptionStrategy(smallData);
+    }
+
+    @Test
+    public void testEncryptionServiceWithServerSideCEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
+        byte[] largeData = new byte[51 * 1024 * 1024];
+        testEncryptionServiceWithServerSideCEncryptionStrategy(largeData);
+    }
+
+    private void testEncryptionServiceWithServerSideCEncryptionStrategy(byte[] data) throws IOException, InitializationException {
+        TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "test.txt");
-        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.enqueue(data, attrs);
         runner.assertValid();
         runner.run();
         runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
@@ -1006,23 +1039,34 @@ public class ITPutS3Object extends AbstractS3IT {
         Assert.assertEquals(1, flowFiles.size());
         Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
         MockFlowFile putSuccess = flowFiles.get(0);
-        Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_SSE_C);
+        Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_C);
 
-        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
-        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
+        flowFile.assertContentEquals(data);
         // successful fetch does not indicate type of original encryption:
         flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, null);
         // but it does indicate it via our specific attribute:
-        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_C);
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_SSE_C);
+    }
+
+    @Test
+    public void testEncryptionServiceWithClientSideKMSEncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
+        byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+        testEncryptionServiceWithClientSideKMSEncryptionStrategy(smallData);
     }
 
     @Test
-    public void testEncryptionServiceWithClientSideKMSEncryptionStrategy() throws InitializationException, IOException {
-        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
+    public void testEncryptionServiceWithClientSideKMSEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
+        byte[] largeData = new byte[51 * 1024 * 1024];
+        testEncryptionServiceWithClientSideKMSEncryptionStrategy(largeData);
+    }
+
+    private void testEncryptionServiceWithClientSideKMSEncryptionStrategy(byte[] data) throws InitializationException, IOException {
+        TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "test.txt");
-        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.enqueue(data, attrs);
         runner.assertValid();
         runner.run();
         runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
@@ -1031,21 +1075,32 @@ public class ITPutS3Object extends AbstractS3IT {
         Assert.assertEquals(1, flowFiles.size());
         Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
         MockFlowFile putSuccess = flowFiles.get(0);
-        Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS);
+        Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS);
 
-        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
-        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
+        flowFile.assertContentEquals(data);
         flowFile.assertAttributeEquals("x-amz-wrap-alg", "kms");
-        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS);
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS);
     }
 
     @Test
-    public void testEncryptionServiceWithClientSideCMKEncryptionStrategy() throws InitializationException, IOException {
-        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, randomKeyMaterial);
+    public void testEncryptionServiceWithClientSideCEncryptionStrategyUsingSingleUpload() throws InitializationException, IOException {
+        byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+        testEncryptionServiceWithClientSideCEncryptionStrategy(smallData);
+    }
+
+    @Test
+    public void testEncryptionServiceWithClientSideCEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
+        byte[] largeData = new byte[51 * 1024 * 1024];
+        testEncryptionServiceWithClientSideCEncryptionStrategy(largeData);
+    }
+
+    private void testEncryptionServiceWithClientSideCEncryptionStrategy(byte[] data) throws InitializationException, IOException {
+        TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_CSE_C, randomKeyMaterial);
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "test.txt");
-        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.enqueue(data, attrs);
         runner.assertValid();
         runner.run();
         runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
@@ -1054,11 +1109,11 @@ public class ITPutS3Object extends AbstractS3IT {
         Assert.assertEquals(1, flowFiles.size());
         Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
         MockFlowFile putSuccess = flowFiles.get(0);
-        Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK);
+        Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_CSE_C);
 
-        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, randomKeyMaterial);
-        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK);
-        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_CSE_C, randomKeyMaterial);
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_CSE_C);
+        flowFile.assertContentEquals(data);
 
         flowFile.assertAttributeExists("x-amz-key");
         flowFile.assertAttributeNotEquals("x-amz-key", "");
@@ -1068,11 +1123,24 @@ public class ITPutS3Object extends AbstractS3IT {
     }
 
     private static TestRunner createPutEncryptionTestRunner(String strategyName, String keyIdOrMaterial) throws InitializationException {
-        return createEncryptionTestRunner(new PutS3Object(), strategyName, keyIdOrMaterial);
+        TestRunner runner = createEncryptionTestRunner(new PutS3Object(), strategyName, keyIdOrMaterial);
+
+        runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, "50 MB");
+        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, "50 MB");
+
+        return runner;
+    }
+
+    private static TestRunner createFetchEncryptionTestRunner(String strategyName, String keyIdOrMaterial) throws InitializationException {
+        if (strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3) || strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS)) {
+            strategyName = null;
+        }
+
+        return createEncryptionTestRunner(new FetchS3Object(), strategyName, keyIdOrMaterial);
     }
 
     private static MockFlowFile fetchEncryptedFlowFile(Map<String, String> attributes, String strategyName, String keyIdOrMaterial) throws InitializationException {
-        final TestRunner runner = createEncryptionTestRunner(new FetchS3Object(), strategyName, keyIdOrMaterial);
+        final TestRunner runner = createFetchEncryptionTestRunner(strategyName, keyIdOrMaterial);
         runner.enqueue(new byte[0], attributes);
         runner.run(1);
         runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
@@ -1082,24 +1150,28 @@ public class ITPutS3Object extends AbstractS3IT {
 
     private static TestRunner createEncryptionTestRunner(Processor processor, String strategyName, String keyIdOrMaterial) throws InitializationException {
         final TestRunner runner = TestRunners.newTestRunner(processor);
-        final StandardS3EncryptionService service = new StandardS3EncryptionService();
         final ConfigurationContext context = mock(ConfigurationContext.class);
 
-        runner.addControllerService(PutS3Object.ENCRYPTION_SERVICE.getName(), service);
         runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
         runner.setProperty(PutS3Object.REGION, REGION);
         runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
-        runner.setProperty(PutS3Object.ENCRYPTION_SERVICE, service.getIdentifier());
-        runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_STRATEGY, strategyName);
-        runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_VALUE, keyIdOrMaterial);
-        runner.setProperty(service, StandardS3EncryptionService.REGION, REGION);
 
-        when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
-        when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
-        when(context.getProperty(StandardS3EncryptionService.REGION)).thenReturn(new MockPropertyValue(REGION));
+        if (strategyName != null) {
+            final StandardS3EncryptionService service = new StandardS3EncryptionService();
+            runner.addControllerService(PutS3Object.ENCRYPTION_SERVICE.getName(), service);
+            runner.setProperty(PutS3Object.ENCRYPTION_SERVICE, service.getIdentifier());
+
+            runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_STRATEGY, strategyName);
+            runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_VALUE, keyIdOrMaterial);
+            runner.setProperty(service, StandardS3EncryptionService.KMS_REGION, REGION);
 
-        service.onConfigured(context);
-        runner.enableControllerService(service);
+            when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
+            when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
+            when(context.getProperty(StandardS3EncryptionService.KMS_REGION)).thenReturn(new MockPropertyValue(REGION));
+
+            service.onConfigured(context);
+            runner.enableControllerService(service);
+        }
 
         return runner;
     }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionTestUtil.java
similarity index 53%
copy from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
copy to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionTestUtil.java
index 0845e12..26e2c80 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionTestUtil.java
@@ -16,21 +16,23 @@
  */
 package org.apache.nifi.processors.aws.s3.encryption;
 
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
+import java.util.Base64;
+import java.util.Random;
 
+final class S3EncryptionTestUtil {
 
-/**
- * This strategy uses S3-managed keys to perform server-side encryption.  Use this strategy when you want the server to
- * perform the encryption (meaning you pay the cost of processing) and you want AWS to completely manage the key.
- *
- *
- * See https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
- *
- */
-public class ServerSideS3EncryptionStrategy implements S3EncryptionStrategy {
-    @Override
-    public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
-        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+    private static Random RANDOM = new Random();
+
+    private S3EncryptionTestUtil() {
+    }
+
+    static String createKey(int keySize) {
+        if (keySize % 8 != 0) {
+            throw new IllegalArgumentException("Invalid test data");
+        }
+
+        byte[] keyMaterial = new byte[keySize / 8];
+        RANDOM.nextBytes(keyMaterial);
+        return new String(Base64.getEncoder().encode(keyMaterial));
     }
 }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestClientSideCEncryptionStrategyKeyValidation.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestClientSideCEncryptionStrategyKeyValidation.java
new file mode 100644
index 0000000..5d716f5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestClientSideCEncryptionStrategyKeyValidation.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.nifi.processors.aws.s3.encryption.S3EncryptionTestUtil.createKey;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestClientSideCEncryptionStrategyKeyValidation {
+
+    private ClientSideCEncryptionStrategy strategy;
+
+    @Before
+    public void setUp() {
+        strategy = new ClientSideCEncryptionStrategy();
+    }
+
+    @Test
+    public void testValid256BitKey() {
+        String key = createKey(256);
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertTrue(result.isValid());
+    }
+
+    @Test
+    public void testValid192BitKey() {
+        String key = createKey(192);
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertTrue(result.isValid());
+    }
+
+    @Test
+    public void testValid128BitKey() {
+        String key = createKey(128);
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertTrue(result.isValid());
+    }
+
+    @Test
+    public void testNotSupportedKeySize() {
+        String key = createKey(512);
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertFalse(result.isValid());
+    }
+
+    @Test
+    public void testNullKey() {
+        String key = null;
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertFalse(result.isValid());
+    }
+
+    @Test
+    public void testEmptyKey() {
+        String key = "";
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertFalse(result.isValid());
+    }
+
+    @Test
+    public void testNotBase64EncodedKey() {
+        String key = "NotBase64EncodedKey";
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertFalse(result.isValid());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java
index 3261288..ec12230 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java
@@ -33,7 +33,7 @@ public class TestS3EncryptionStrategies {
 
     private String randomKeyMaterial = "";
     private String randomKeyId = "mock-key-id";
-    private String region = "us-west-1";
+    private String kmsRegion = "us-west-1";
 
     private ObjectMetadata metadata = null;
     private PutObjectRequest putObjectRequest = null;
@@ -60,7 +60,7 @@ public class TestS3EncryptionStrategies {
         S3EncryptionStrategy strategy = new ClientSideKMSEncryptionStrategy();
 
         // This shows that the strategy builds a client:
-        Assert.assertNotNull(strategy.createEncryptionClient(null, null, region, randomKeyMaterial));
+        Assert.assertNotNull(strategy.createEncryptionClient(null, null, kmsRegion, randomKeyMaterial));
 
         // This shows that the strategy does not modify the metadata or any of the requests:
         Assert.assertNull(metadata.getSSEAlgorithm());
@@ -76,11 +76,11 @@ public class TestS3EncryptionStrategies {
     }
 
     @Test
-    public void testClientSideCMKEncryptionStrategy() {
-        S3EncryptionStrategy strategy = new ClientSideCMKEncryptionStrategy();
+    public void testClientSideCEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ClientSideCEncryptionStrategy();
 
         // This shows that the strategy builds a client:
-        Assert.assertNotNull(strategy.createEncryptionClient(null, null, region, randomKeyMaterial));
+        Assert.assertNotNull(strategy.createEncryptionClient(null, null, null, randomKeyMaterial));
 
         // This shows that the strategy does not modify the metadata or any of the requests:
         Assert.assertNull(metadata.getSSEAlgorithm());
@@ -96,11 +96,11 @@ public class TestS3EncryptionStrategies {
     }
 
     @Test
-    public void testServerSideCEKEncryptionStrategy() {
-        S3EncryptionStrategy strategy = new ServerSideCEKEncryptionStrategy();
+    public void testServerSideCEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ServerSideCEncryptionStrategy();
 
         // This shows that the strategy does *not* build a client:
-        Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+        Assert.assertNull(strategy.createEncryptionClient(null, null, null, ""));
 
         // This shows that the strategy sets the SSE customer key as expected:
         strategy.configurePutObjectRequest(putObjectRequest, metadata, randomKeyMaterial);
@@ -130,7 +130,7 @@ public class TestS3EncryptionStrategies {
         S3EncryptionStrategy strategy = new ServerSideKMSEncryptionStrategy();
 
         // This shows that the strategy does *not* build a client:
-        Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+        Assert.assertNull(strategy.createEncryptionClient(null, null, null, null));
 
         // This shows that the strategy sets the SSE KMS key id as expected:
         strategy.configurePutObjectRequest(putObjectRequest, metadata, randomKeyId);
@@ -150,10 +150,14 @@ public class TestS3EncryptionStrategies {
         S3EncryptionStrategy strategy = new ServerSideS3EncryptionStrategy();
 
         // This shows that the strategy does *not* build a client:
-        Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+        Assert.assertNull(strategy.createEncryptionClient(null, null, null, null));
 
         // This shows that the strategy sets the SSE algorithm field as expected:
-        strategy.configurePutObjectRequest(putObjectRequest, metadata, "");
+        strategy.configurePutObjectRequest(putObjectRequest, metadata, null);
+        Assert.assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, metadata.getSSEAlgorithm());
+
+        // Same for InitiateMultipartUploadRequest:
+        strategy.configureInitiateMultipartUploadRequest(initUploadRequest, metadata, null);
         Assert.assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, metadata.getSSEAlgorithm());
     }
 
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestServerSideCEncryptionStrategyKeyValidation.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestServerSideCEncryptionStrategyKeyValidation.java
new file mode 100644
index 0000000..77d64ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestServerSideCEncryptionStrategyKeyValidation.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.nifi.processors.aws.s3.encryption.S3EncryptionTestUtil.createKey;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestServerSideCEncryptionStrategyKeyValidation {
+
+    private ServerSideCEncryptionStrategy strategy;
+
+    @Before
+    public void setUp() {
+        strategy = new ServerSideCEncryptionStrategy();
+    }
+
+    @Test
+    public void testValid256BitKey() {
+        String key = createKey(256);
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertTrue(result.isValid());
+    }
+
+    @Test
+    public void testNotSupportedKeySize() {
+        String key = createKey(512);
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertFalse(result.isValid());
+    }
+
+    @Test
+    public void testNullKey() {
+        String key = null;
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertFalse(result.isValid());
+    }
+
+    @Test
+    public void testEmptyKey() {
+        String key = "";
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertFalse(result.isValid());
+    }
+
+    @Test
+    public void testNotBase64EncodedKey() {
+        String key = "NotBase64EncodedKey";
+
+        ValidationResult result = strategy.validateKey(key);
+
+        assertFalse(result.isValid());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
index d040d34..1a6c02e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
@@ -23,6 +23,7 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockPropertyValue;
 import org.junit.Assert;
@@ -37,26 +38,26 @@ public class TestStandardS3EncryptionService {
     private ConfigurationContext context;
     private String strategyName;
     private String keyIdOrMaterial;
-    private String region;
+    private String kmsRegion;
 
     @Before
     public void setup() throws InitializationException {
         service = new StandardS3EncryptionService();
         context = Mockito.mock(ConfigurationContext.class);
 
-        strategyName = StandardS3EncryptionService.STRATEGY_NAME_NONE;
+        strategyName = AmazonS3EncryptionService.STRATEGY_NAME_NONE;
         keyIdOrMaterial = "test-key-id";
-        region = "us-west-1";
+        kmsRegion = "us-west-1";
 
         Mockito.when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
         Mockito.when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
-        Mockito.when(context.getProperty(StandardS3EncryptionService.REGION)).thenReturn(new MockPropertyValue(region));
+        Mockito.when(context.getProperty(StandardS3EncryptionService.KMS_REGION)).thenReturn(new MockPropertyValue(kmsRegion));
         service.onConfigured(context);
     }
 
     @Test
     public void testServiceProperties() {
-        Assert.assertEquals(service.getRegion(), region);
+        Assert.assertEquals(service.getKmsRegion(), kmsRegion);
         Assert.assertEquals(service.getStrategyName(), strategyName);
     }
 
@@ -97,6 +98,6 @@ public class TestStandardS3EncryptionService {
 
         Assert.assertEquals(properties.get(0).getName(), StandardS3EncryptionService.ENCRYPTION_STRATEGY.getName());
         Assert.assertEquals(properties.get(1).getName(), StandardS3EncryptionService.ENCRYPTION_VALUE.getName());
-        Assert.assertEquals(properties.get(2).getName(), StandardS3EncryptionService.REGION.getName());
+        Assert.assertEquals(properties.get(2).getName(), StandardS3EncryptionService.KMS_REGION.getName());
     }
 }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionServiceValidation.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionServiceValidation.java
new file mode 100644
index 0000000..2bccc17
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionServiceValidation.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_CSE_C;
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS;
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_NONE;
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_SSE_C;
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS;
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3;
+import static org.apache.nifi.processors.aws.s3.encryption.S3EncryptionTestUtil.createKey;
+
+public class TestStandardS3EncryptionServiceValidation {
+
+    private TestRunner runner;
+    private StandardS3EncryptionService service;
+
+    @Before
+    public void setUp() throws InitializationException {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        service = new StandardS3EncryptionService();
+        runner.addControllerService("s3-encryption-service", service);
+    }
+
+
+    // NoOpEncryptionStrategy
+
+    @Test
+    public void testValidNoOpEncryptionStrategy() {
+        configureService(STRATEGY_NAME_NONE, null);
+
+        runner.assertValid(service);
+    }
+
+    @Test
+    public void testInvalidNoOpEncryptionStrategyBecauseKeyIdOrMaterialSpecified() {
+        configureService(STRATEGY_NAME_NONE, "key-id");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidNoOpEncryptionStrategyBecauseKeyIdOrMaterialSpecifiedAsEmptyString() {
+        configureService(STRATEGY_NAME_NONE, "");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidNoOpEncryptionStrategyBecauseKeyIdOrMaterialSpecifiedUsingEL() {
+        configureService(STRATEGY_NAME_NONE, "${key-id-var}");
+
+        runner.assertNotValid(service);
+    }
+
+
+    // ServerSideS3EncryptionStrategy
+
+    @Test
+    public void testValidServerSideS3EncryptionStrategy() {
+        configureService(STRATEGY_NAME_SSE_S3, null);
+
+        runner.assertValid(service);
+    }
+
+    @Test
+    public void testInvalidServerSideS3EncryptionStrategyBecauseKeyIdOrMaterialSpecified() {
+        configureService(STRATEGY_NAME_SSE_S3, "key-id");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidServerSideS3EncryptionStrategyBecauseKeyIdOrMaterialSpecifiedAsEmptyString() {
+        configureService(STRATEGY_NAME_SSE_S3, "");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidServerSideS3EncryptionStrategyBecauseKeyIdOrMaterialSpecifiedUsingEL() {
+        configureService(STRATEGY_NAME_SSE_S3, "${key-id-var}");
+
+        runner.assertNotValid(service);
+    }
+
+
+    // ServerSideKMSEncryptionStrategy
+
+    @Test
+    public void testValidServerSideKMSEncryptionStrategy() {
+        configureService(STRATEGY_NAME_SSE_KMS, "key-id");
+
+        runner.assertValid(service);
+    }
+
+    @Test
+    public void testValidServerSideKMSEncryptionStrategyUsingEL() {
+        configureService(STRATEGY_NAME_SSE_KMS, "${key-id-var}");
+        configureVariable("key-id-var", "key-id");
+
+        runner.assertValid(service);
+    }
+
+    @Test
+    public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdNotSpecified() {
+        configureService(STRATEGY_NAME_SSE_KMS, null);
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdSpecifiedAsEmptyString() {
+        configureService(STRATEGY_NAME_SSE_KMS, "");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToNull() {
+        configureService(STRATEGY_NAME_SSE_KMS, "${key-id-var}");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToEmptyString() {
+        configureService(STRATEGY_NAME_SSE_KMS, "${key-id-var}");
+        configureVariable("key-id-var", "");
+
+        runner.assertNotValid(service);
+    }
+
+
+    // ServerSideCEncryptionStrategy
+
+    @Test
+    public void testValidServerSideCEncryptionStrategy() {
+        configureService(STRATEGY_NAME_SSE_C, createKey(256));
+
+        runner.assertValid(service);
+    }
+
+    @Test
+    public void testValidServerSideCEncryptionStrategyUsingEL() {
+        configureService(STRATEGY_NAME_SSE_C, "${key-material-var}");
+        configureVariable("key-material-var", createKey(256));
+
+        runner.assertValid(service);
+    }
+
+    @Test
+    public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialNotSpecified() {
+        configureService(STRATEGY_NAME_SSE_C, null);
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialSpecifiedAsEmptyString() {
+        configureService(STRATEGY_NAME_SSE_C, "");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToNull() {
+        configureService(STRATEGY_NAME_SSE_C, "${key-material-var}");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToEmptyString() {
+        configureService(STRATEGY_NAME_SSE_C, "${key-material-var}");
+        configureVariable("key-material-var", "");
+
+        runner.assertNotValid(service);
+    }
+
+
+    // ClientSideKMSEncryptionStrategy
+
+    @Test
+    public void testValidClientSideKMSEncryptionStrategy() {
+        configureService(STRATEGY_NAME_CSE_KMS, "key-id");
+
+        runner.assertValid(service);
+    }
+
+    @Test
+    public void testValidClientSideKMSEncryptionStrategyUsingEL() {
+        configureService(STRATEGY_NAME_CSE_KMS, "${key-id-var}");
+        configureVariable("key-id-var", "key-id");
+
+        runner.assertValid(service);
+    }
+
+    @Test
+    public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdNotSpecified() {
+        configureService(STRATEGY_NAME_CSE_KMS, null);
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdSpecifiedAsEmptyString() {
+        configureService(STRATEGY_NAME_CSE_KMS, "");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToNull() {
+        configureService(STRATEGY_NAME_CSE_KMS, "${key-id-var}");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToEmptyString() {
+        configureService(STRATEGY_NAME_CSE_KMS, "${key-id-var}");
+        configureVariable("key-id-var", "");
+
+        runner.assertNotValid(service);
+    }
+
+
+    // ClientSideCEncryptionStrategy
+
+    @Test
+    public void testValidClientSideCEncryptionStrategy() {
+        configureService(STRATEGY_NAME_CSE_C, createKey(256));
+
+        runner.assertValid(service);
+    }
+
+    @Test
+    public void testValidClientSideCEncryptionStrategyUsingEL() {
+        configureService(STRATEGY_NAME_CSE_C, "${key-material-var}");
+        configureVariable("key-material-var", createKey(256));
+
+        runner.assertValid(service);
+    }
+
+    @Test
+    public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialNotSpecified() {
+        configureService(STRATEGY_NAME_CSE_C, null);
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialSpecifiedAsEmptyString() {
+        configureService(STRATEGY_NAME_CSE_C, "");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToNull() {
+        configureService(STRATEGY_NAME_CSE_C, "${key-material-var}");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToEmptyString() {
+        configureService(STRATEGY_NAME_CSE_C, "${key-material-var}");
+        configureVariable("key-material-var", "");
+
+        runner.assertNotValid(service);
+    }
+
+
+    private void configureService(String encryptionStrategy, String keyIdOrMaterial) {
+        runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_STRATEGY, encryptionStrategy);
+        if (keyIdOrMaterial != null) {
+            runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_VALUE, keyIdOrMaterial);
+        }
+    }
+
+    private void configureVariable(String name, String value) {
+        runner.setVariable(name, value);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java
index 946a1cc..81d52e6 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java
@@ -31,6 +31,13 @@ import org.apache.nifi.controller.ControllerService;
  */
 public interface AmazonS3EncryptionService extends ControllerService {
 
+    String STRATEGY_NAME_NONE = "NONE";
+    String STRATEGY_NAME_SSE_S3 = "SSE_S3";
+    String STRATEGY_NAME_SSE_KMS = "SSE_KMS";
+    String STRATEGY_NAME_SSE_C = "SSE_C";
+    String STRATEGY_NAME_CSE_KMS = "CSE_KMS";
+    String STRATEGY_NAME_CSE_C = "CSE_C";
+
     /**
      * Configure a {@link PutObjectRequest} for encryption.
      * @param request the request to configure.
@@ -69,12 +76,17 @@ public interface AmazonS3EncryptionService extends ControllerService {
     AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration);
 
     /**
-     * @return The region associated with the service, as a String.
+     * @return The KMS region associated with the service, as a String.
      */
-    String getRegion();
+    String getKmsRegion();
 
     /**
      * @return The name of the encryption strategy associated with the service.
      */
     String getStrategyName();
+
+    /**
+     * @return The display name of the encryption strategy associated with the service.
+     */
+    String getStrategyDisplayName();
 }
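
The key-validation tests in this commit (TestClientSideCEncryptionStrategyKeyValidation and TestServerSideCEncryptionStrategyKeyValidation) exercise a validateKey method whose implementation is not part of this excerpt. The following is a minimal standalone sketch of the kind of check those tests imply, not the actual NiFi code. The class name KeyValidationSketch, the method isValidKey, and the allowed-size parameter are hypothetical. The accepted sizes (128/192/256 bits for the client-side case, 256 bits for the server-side case) are inferred only from which test cases expect a valid result.

```java
import java.security.SecureRandom;
import java.util.Base64;

// Hypothetical helper; it only illustrates the behaviour the tests above expect.
final class KeyValidationSketch {

    private static final SecureRandom RANDOM = new SecureRandom();

    private KeyValidationSketch() {
    }

    // Creates Base64-encoded random key material of the given size in bits,
    // in the same spirit as S3EncryptionTestUtil.createKey in the diff.
    static String createKey(int keySizeBits) {
        if (keySizeBits % 8 != 0) {
            throw new IllegalArgumentException("Key size must be a multiple of 8 bits");
        }
        byte[] keyMaterial = new byte[keySizeBits / 8];
        RANDOM.nextBytes(keyMaterial);
        return Base64.getEncoder().encodeToString(keyMaterial);
    }

    // Returns true only if the key is non-empty, decodes as Base64, and the
    // decoded length matches one of the allowed sizes (given in bits).
    static boolean isValidKey(String key, int... allowedSizesBits) {
        if (key == null || key.isEmpty()) {
            return false;
        }
        final byte[] decoded;
        try {
            decoded = Base64.getDecoder().decode(key);
        } catch (IllegalArgumentException e) {
            // Input contains characters outside the Base64 alphabet or is malformed.
            return false;
        }
        int bits = decoded.length * 8;
        for (int allowed : allowedSizesBits) {
            if (bits == allowed) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isValidKey(createKey(256), 128, 192, 256));
        System.out.println(isValidKey(createKey(512), 128, 192, 256));
    }
}
```

Note that a string such as "NotBase64EncodedKey" may still decode with a lenient decoder, so in this sketch it is the length check, not only the decoding step, that makes the result invalid.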

