nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mose...@apache.org
Subject nifi git commit: NIFI-2611: Fixing bugs in UnpackContent with flow file unpackers
Date Mon, 22 Aug 2016 19:02:11 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 087622ead -> 17dec0493


NIFI-2611: Fixing bugs in UnpackContent with flow file unpackers

Signed-off-by: Mike Moser <mosermw@apache.org>

This closes #905


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/17dec049
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/17dec049
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/17dec049

Branch: refs/heads/master
Commit: 17dec04939efda59345f5c2de49b575fd7d8dfcc
Parents: 087622e
Author: Joe Gresock <joseph.gresock@lmco.com>
Authored: Sun Aug 21 16:35:39 2016 +0000
Committer: Mike Moser <mosermw@apache.org>
Committed: Mon Aug 22 14:47:24 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/UnpackContent.java | 52 +++++++++---------
 .../processors/standard/TestUnpackContent.java  | 56 +++++++++++---------
 2 files changed, 57 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/17dec049/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index 36cd4ad..d2de33c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -102,6 +102,13 @@ public class UnpackContent extends AbstractProcessor {
     public static final String FRAGMENT_COUNT = "fragment.count";
     public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
 
+    public static final String AUTO_DETECT_FORMAT_NAME = "use mime.type attribute";
+    public static final String TAR_FORMAT_NAME = "tar";
+    public static final String ZIP_FORMAT_NAME = "zip";
+    public static final String FLOWFILE_STREAM_FORMAT_V3_NAME = "flowfile-stream-v3";
+    public static final String FLOWFILE_STREAM_FORMAT_V2_NAME = "flowfile-stream-v2";
+    public static final String FLOWFILE_TAR_FORMAT_NAME = "flowfile-tar-v1";
+
     public static final String OCTET_STREAM = "application/octet-stream";
 
     public static final PropertyDescriptor PACKAGING_FORMAT = new PropertyDescriptor.Builder()
@@ -144,9 +151,6 @@ public class UnpackContent extends AbstractProcessor {
 
     private Unpacker tarUnpacker;
     private Unpacker zipUnpacker;
-    private Unpacker flowFileStreamV3Unpacker;
-    private Unpacker flowFileStreamV2Unpacker;
-    private Unpacker flowFileTarUnpacker;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -184,14 +188,6 @@ public class UnpackContent extends AbstractProcessor {
             fileFilter = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
             tarUnpacker = new TarUnpacker(fileFilter);
             zipUnpacker = new ZipUnpacker(fileFilter);
-            flowFileStreamV3Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
-            flowFileStreamV2Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
-            flowFileTarUnpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
-        }
-
-        PackageFormat format = PackageFormat.getFormat(context.getProperty(PACKAGING_FORMAT).getValue());
-        if (format != PackageFormat.AUTO_DETECT_FORMAT && unpacker == null) {
-            initUnpacker(format);
         }
     }
 
@@ -207,15 +203,15 @@ public class UnpackContent extends AbstractProcessor {
                 addFragmentAttrs = true;
                 break;
             case FLOWFILE_STREAM_FORMAT_V2:
-                unpacker = flowFileStreamV2Unpacker;
+                unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
                 addFragmentAttrs = false;
                 break;
             case FLOWFILE_STREAM_FORMAT_V3:
-                unpacker = flowFileStreamV3Unpacker;
+                unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
                 addFragmentAttrs = false;
                 break;
             case FLOWFILE_TAR_FORMAT:
-                unpacker = flowFileTarUnpacker;
+                unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
                 addFragmentAttrs = false;
                 break;
             case AUTO_DETECT_FORMAT:
@@ -254,6 +250,8 @@ public class UnpackContent extends AbstractProcessor {
             } else {
                 initUnpacker(packagingFormat);
             }
+        } else {
+            initUnpacker(packagingFormat);
         }
 
         final List<FlowFile> unpacked = new ArrayList<>();
@@ -495,13 +493,13 @@ public class UnpackContent extends AbstractProcessor {
     }
 
     protected enum PackageFormat {
-        AUTO_DETECT_FORMAT("use mime.type attribute"),
-        TAR_FORMAT("tar", "application/tar"),
-        X_TAR_FORMAT("tar", "application/x-tar"),
-        ZIP_FORMAT("zip", "application/zip"),
-        FLOWFILE_STREAM_FORMAT_V3("flowfile-stream-v3", "application/flowfile-v3"),
-        FLOWFILE_STREAM_FORMAT_V2("flowfile-stream-v2", "application/flowfile-v2"),
-        FLOWFILE_TAR_FORMAT("flowfile-tar-v1", "application/flowfile-v1");
+        AUTO_DETECT_FORMAT(AUTO_DETECT_FORMAT_NAME),
+        TAR_FORMAT(TAR_FORMAT_NAME, "application/tar"),
+        X_TAR_FORMAT(TAR_FORMAT_NAME, "application/x-tar"),
+        ZIP_FORMAT(ZIP_FORMAT_NAME, "application/zip"),
+        FLOWFILE_STREAM_FORMAT_V3(FLOWFILE_STREAM_FORMAT_V3_NAME, "application/flowfile-v3"),
+        FLOWFILE_STREAM_FORMAT_V2(FLOWFILE_STREAM_FORMAT_V2_NAME, "application/flowfile-v2"),
+        FLOWFILE_TAR_FORMAT(FLOWFILE_TAR_FORMAT_NAME, "application/flowfile-v1");
 
 
         private final String textValue;
@@ -526,17 +524,17 @@ public class UnpackContent extends AbstractProcessor {
 
         public static PackageFormat getFormat(String textValue) {
             switch (textValue) {
-            case "use mime.type attribute":
+            case AUTO_DETECT_FORMAT_NAME:
                 return AUTO_DETECT_FORMAT;
-            case "tar":
+            case TAR_FORMAT_NAME:
                 return TAR_FORMAT;
-            case "zip":
+            case ZIP_FORMAT_NAME:
                 return ZIP_FORMAT;
-            case "flowfile-stream-v3":
+            case FLOWFILE_STREAM_FORMAT_V3_NAME:
                 return FLOWFILE_STREAM_FORMAT_V3;
-            case "flowfile-stream-v2":
+            case FLOWFILE_STREAM_FORMAT_V2_NAME:
                 return FLOWFILE_STREAM_FORMAT_V2;
-            case "flowfile-stream-v1":
+            case FLOWFILE_TAR_FORMAT_NAME:
                 return FLOWFILE_TAR_FORMAT;
             }
             return null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/17dec049/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
index acebdea..c107e95 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
@@ -45,17 +45,18 @@ public class TestUnpackContent {
         unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString());
         autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
         unpackRunner.enqueue(dataPath.resolve("data.tar"));
+        unpackRunner.enqueue(dataPath.resolve("data.tar"));
         Map<String, String> attributes = new HashMap<>(1);
         Map<String, String> attributes2 = new HashMap<>(1);
         attributes.put("mime.type", UnpackContent.PackageFormat.TAR_FORMAT.getMimeType());
         attributes2.put("mime.type", UnpackContent.PackageFormat.X_TAR_FORMAT.getMimeType());
         autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes);
         autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2);
-        unpackRunner.run();
+        unpackRunner.run(2);
         autoUnpackRunner.run(2);
 
-        unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
-        unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+        unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
+        unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
         unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
 
         autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
@@ -82,17 +83,18 @@ public class TestUnpackContent {
         autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
         autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$");
         unpackRunner.enqueue(dataPath.resolve("data.tar"));
+        unpackRunner.enqueue(dataPath.resolve("data.tar"));
         Map<String, String> attributes = new HashMap<>(1);
         Map<String, String> attributes2 = new HashMap<>(1);
         attributes.put("mime.type", "application/x-tar");
         attributes2.put("mime.type", "application/tar");
         autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes);
         autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2);
-        unpackRunner.run();
+        unpackRunner.run(2);
         autoUnpackRunner.run(2);
 
-        unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1);
-        unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+        unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
+        unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
         unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
 
         autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
@@ -126,18 +128,20 @@ public class TestUnpackContent {
         unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
         autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
         unpackRunner.enqueue(dataPath.resolve("data.zip"));
+        unpackRunner.enqueue(dataPath.resolve("data.zip"));
         Map<String, String> attributes = new HashMap<>(1);
         attributes.put("mime.type", "application/zip");
         autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
-        unpackRunner.run();
-        autoUnpackRunner.run();
+        autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
+        unpackRunner.run(2);
+        autoUnpackRunner.run(2);
 
-        unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
-        unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+        unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
+        unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
         unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
 
-        autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
-        autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+        autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
+        autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
         autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
 
         final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@@ -160,18 +164,20 @@ public class TestUnpackContent {
         autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
         autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$");
         unpackRunner.enqueue(dataPath.resolve("data.zip"));
+        unpackRunner.enqueue(dataPath.resolve("data.zip"));
         Map<String, String> attributes = new HashMap<>(1);
         attributes.put("mime.type", "application/zip");
         autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
-        unpackRunner.run();
-        autoUnpackRunner.run();
+        autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
+        unpackRunner.run(2);
+        autoUnpackRunner.run(2);
 
-        unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1);
-        unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+        unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
+        unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
         unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
 
-        autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1);
-        autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+        autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
+        autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
         autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
 
         List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@@ -199,11 +205,12 @@ public class TestUnpackContent {
         final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
         runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString());
         runner.enqueue(dataPath.resolve("data.flowfilev3"));
+        runner.enqueue(dataPath.resolve("data.flowfilev3"));
 
-        runner.run();
+        runner.run(2);
 
-        runner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
-        runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+        runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
+        runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
         runner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
 
         final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@@ -222,11 +229,12 @@ public class TestUnpackContent {
         final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
         runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString());
         runner.enqueue(dataPath.resolve("data.flowfilev2"));
+        runner.enqueue(dataPath.resolve("data.flowfilev2"));
 
-        runner.run();
+        runner.run(2);
 
-        runner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
-        runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+        runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
+        runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
         runner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
 
         final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);


Mime
View raw message