nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [nifi] branch master updated: NIFI-6773 This closes #3820. Adding lz4-framed option for compression/decompression NIFI-6773 Adding two unit tests for lz4-framed compression/decompression NIFI-6773 Adding compressed lz4 file for decompression test
Date Wed, 16 Oct 2019 22:19:25 GMT
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new cd90b1d  NIFI-6773 This closes #3820. Adding lz4-framed option for compression/decompression
NIFI-6773 Adding two unit tests for lz4-framed compression/decompression NIFI-6773 Adding
compressed lz4 file for decompression test
cd90b1d is described below

commit cd90b1d3e1b623556f17106c637b60eb1eaa37f0
Author: Andrew Rodriguez <rodriguez.andrewt@gmail.com>
AuthorDate: Wed Oct 16 15:09:58 2019 -0500

    NIFI-6773 This closes #3820. Adding lz4-framed option for compression/decompression
    NIFI-6773 Adding two unit tests for lz4-framed compression/decompression
    NIFI-6773 Adding compressed lz4 file for decompression test
    
    Signed-off-by: Joe Witt <joewitt@apache.org>
---
 .../nifi/processors/standard/CompressContent.java  |  20 +++++++++++--
 .../processors/standard/TestCompressContent.java   |  32 +++++++++++++++++++++
 .../resources/CompressedData/SampleFile.txt.lz4    | Bin 0 -> 217 bytes
 3 files changed, 49 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index f292705..ef76155 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -37,6 +37,7 @@ import lzma.streams.LzmaOutputStream;
 import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -73,7 +74,7 @@ import org.xerial.snappy.SnappyOutputStream;
 @SideEffectFree
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed"})
+@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed", "lz4-framed"})
 @CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
     + "attribute as appropriate")
 @ReadsAttribute(attribute = "mime.type", description = "If the Compression Format is set to use mime.type attribute, this attribute is used to "
@@ -90,15 +91,17 @@ public class CompressContent extends AbstractProcessor {
     public static final String COMPRESSION_FORMAT_LZMA = "lzma";
     public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
     public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
+    public static final String COMPRESSION_FORMAT_LZ4_FRAMED ="lz4-framed";
 
     public static final String MODE_COMPRESS = "compress";
     public static final String MODE_DECOMPRESS = "decompress";
 
     public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
     .name("Compression Format")
-    .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
+    .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Framed, and LZ4-Framed")
     .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2,
-            COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED)
+            COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED,
+            COMPRESSION_FORMAT_LZ4_FRAMED)
     .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
     .required(true)
     .build();
@@ -161,6 +164,7 @@ public class CompressContent extends AbstractProcessor {
         mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
         mimeTypeMap.put("application/x-snappy", COMPRESSION_FORMAT_SNAPPY);
         mimeTypeMap.put("application/x-snappy-framed", COMPRESSION_FORMAT_SNAPPY_FRAMED);
+        mimeTypeMap.put("application/x-lz4-framed", COMPRESSION_FORMAT_LZ4_FRAMED);
         this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap);
     }
 
@@ -227,6 +231,9 @@ public class CompressContent extends AbstractProcessor {
             case COMPRESSION_FORMAT_SNAPPY_FRAMED:
                 fileExtension = ".sz";
                 break;
+            case COMPRESSION_FORMAT_LZ4_FRAMED:
+                fileExtension = ".lz4";
+                break;
             default:
                 fileExtension = "";
                 break;
@@ -268,6 +275,10 @@ public class CompressContent extends AbstractProcessor {
                                     compressionOut = new SnappyFramedOutputStream(bufferedOut);
                                     mimeTypeRef.set("application/x-snappy-framed");
                                     break;
+                                case COMPRESSION_FORMAT_LZ4_FRAMED:
+                                    mimeTypeRef.set("application/x-lz4-framed");
+                                    compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
+                                    break;
                                 case COMPRESSION_FORMAT_BZIP2:
                                 default:
                                     mimeTypeRef.set("application/x-bzip2");
@@ -296,6 +307,9 @@ public class CompressContent extends AbstractProcessor {
                                 case COMPRESSION_FORMAT_SNAPPY_FRAMED:
                                     compressionIn = new SnappyFramedInputStream(bufferedIn);
                                     break;
+                                case COMPRESSION_FORMAT_LZ4_FRAMED:
+                                    compressionIn = new FramedLZ4CompressorInputStream(bufferedIn, true);
+                                    break;
                                 default:
                                     compressionIn = new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn);
                             }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 50c4cd5..9ed971d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@ -253,4 +253,36 @@ public class TestCompressContent {
 
         runner.getFlowFilesForRelationship(CompressContent.REL_FAILURE).get(0).assertContentEquals(data);
     }
+
+    @Test
+    public void testLz4FramedCompress() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+        runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
+        runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_LZ4_FRAMED);
+        runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+        runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-lz4-framed");
+        flowFile.assertAttributeEquals("filename", "SampleFile.txt.lz4");
+    }
+
+    @Test
+    public void testLz4FramedDecompress() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+        runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS);
+        runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_LZ4_FRAMED);
+        runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+        runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.lz4"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+        flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.lz4 b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.lz4
new file mode 100644
index 0000000..760ecd0
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.lz4 differ


Mime
View raw message