Repository: incubator-nifi
Updated Branches:
refs/heads/develop 594262a84 -> 14e73bc24
NIFI-374: Route ProcessException's to failure
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ab6794b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ab6794b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ab6794b2
Branch: refs/heads/develop
Commit: ab6794b29ebd4e7205e9aa0a7e8c227374a4b6d3
Parents: 6e5469c
Author: Mark Payne <markap14@hotmail.com>
Authored: Wed Apr 29 17:02:08 2015 -0400
Committer: Mark Payne <markap14@hotmail.com>
Committed: Wed Apr 29 17:02:08 2015 -0400
----------------------------------------------------------------------
.../processors/standard/EncryptContent.java | 100 ++++++++++---------
.../processors/standard/TestEncryptContent.java | 21 ++++
2 files changed, 74 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ab6794b2/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
index c0f6301..7fe9fbc 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
@@ -74,35 +74,35 @@ public class EncryptContent extends AbstractProcessor {
public static final int DEFAULT_SALT_SIZE = 8;
public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
- .name("Mode")
- .description("Specifies whether the content should be encrypted or decrypted")
- .required(true)
- .allowableValues(ENCRYPT_MODE, DECRYPT_MODE)
- .defaultValue(ENCRYPT_MODE)
- .build();
+ .name("Mode")
+ .description("Specifies whether the content should be encrypted or decrypted")
+ .required(true)
+ .allowableValues(ENCRYPT_MODE, DECRYPT_MODE)
+ .defaultValue(ENCRYPT_MODE)
+ .build();
public static final PropertyDescriptor ENCRYPTION_ALGORITHM = new PropertyDescriptor.Builder()
- .name("Encryption Algorithm")
- .description("The Encryption Algorithm to use")
- .required(true)
- .allowableValues(EncryptionMethod.values())
- .defaultValue(EncryptionMethod.MD5_256AES.name())
- .build();
+ .name("Encryption Algorithm")
+ .description("The Encryption Algorithm to use")
+ .required(true)
+ .allowableValues(EncryptionMethod.values())
+ .defaultValue(EncryptionMethod.MD5_256AES.name())
+ .build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
- .name("Password")
- .description("The Password to use for encrypting or decrypting the data")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .sensitive(true)
- .build();
+ .name("Password")
+ .description("The Password to use for encrypting or decrypting the data")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Any FlowFile that is successfully encrypted or decrypted will be
routed to success")
- .build();
+ .name("success")
+ .description("Any FlowFile that is successfully encrypted or decrypted will be routed
to success")
+ .build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("Any FlowFile that cannot be encrypted or decrypted will be routed
to failure")
- .build();
+ .name("failure")
+ .description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure")
+ .build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
@@ -172,34 +172,40 @@ public class EncryptContent extends AbstractProcessor {
final int saltSize = (algorithmBlockSize > 0) ? algorithmBlockSize : DEFAULT_SALT_SIZE;
final StopWatch stopWatch = new StopWatch(true);
- if (context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE)) {
- final byte[] salt = new byte[saltSize];
- secureRandom.nextBytes(salt);
+ final String mode = context.getProperty(MODE).getValue();
+ try {
+ if (mode.equalsIgnoreCase(ENCRYPT_MODE)) {
+ final byte[] salt = new byte[saltSize];
+ secureRandom.nextBytes(salt);
+
+ final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
+ try {
+ cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
+ } catch (final InvalidKeyException | InvalidAlgorithmParameterException e)
{
+ logger.error("unable to encrypt {} due to {}", new Object[]{flowFile,
e});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
- final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
- try {
- cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
- } catch (final InvalidKeyException | InvalidAlgorithmParameterException e) {
- logger.error("unable to encrypt {} due to {}", new Object[]{flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
+ flowFile = session.write(flowFile, new EncryptCallback(cipher, salt));
+ logger.info("Successfully encrypted {}", new Object[]{flowFile});
+ } else {
+ if (flowFile.getSize() <= saltSize) {
+ logger.error("Cannot decrypt {} because its file size is not greater
than the salt size", new Object[]{flowFile});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
- flowFile = session.write(flowFile, new EncryptCallback(cipher, salt));
- logger.info("Successfully encrypted {}", new Object[]{flowFile});
- } else {
- if (flowFile.getSize() <= saltSize) {
- logger.error("Cannot decrypt {} because its file size is not greater than
the salt size", new Object[]{flowFile});
- session.transfer(flowFile, REL_FAILURE);
- return;
+ flowFile = session.write(flowFile, new DecryptCallback(cipher, secretKey,
saltSize));
+ logger.info("successfully decrypted {}", new Object[]{flowFile});
}
- flowFile = session.write(flowFile, new DecryptCallback(cipher, secretKey, saltSize));
- logger.info("successfully decrypted {}", new Object[]{flowFile});
+ session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (final ProcessException pe) {
+ getLogger().error("Failed to {} {} due to {}; routing to failure", new Object[]
{mode, flowFile, pe});
+ session.transfer(flowFile, REL_FAILURE);
}
-
- session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(flowFile, REL_SUCCESS);
}
private static class DecryptCallback implements StreamCallback {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ab6794b2/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
index 7340e0f..59ac975 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
@@ -61,4 +61,25 @@ public class TestEncryptContent {
}
}
+ @Test
+ public void testDecryptNonEncryptedFile() throws IOException {
+ final TestRunner testRunner = TestRunners.newTestRunner(new EncryptContent());
+ testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
+
+ for (final EncryptionMethod method : EncryptionMethod.values()) {
+ if (method.isUnlimitedStrength()) {
+ continue; // cannot test unlimited strength in unit tests because it's
not enabled by the JVM by default.
+ }
+
+ testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, method.name());
+ testRunner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
+
+ testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+ testRunner.clearTransferState();
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1);
+ }
+ }
+
}
|