This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new cd90b1d NIFI-6773 This closes #3820. Adding lz4-framed option for compression/decompression
NIFI-6773 Adding two unit tests for lz4-framed compression/decompression NIFI-6773 Adding
compressed lz4 file for decompression test
cd90b1d is described below
commit cd90b1d3e1b623556f17106c637b60eb1eaa37f0
Author: Andrew Rodriguez <rodriguez.andrewt@gmail.com>
AuthorDate: Wed Oct 16 15:09:58 2019 -0500
NIFI-6773 This closes #3820. Adding lz4-framed option for compression/decompression
NIFI-6773 Adding two unit tests for lz4-framed compression/decompression
NIFI-6773 Adding compressed lz4 file for decompression test
Signed-off-by: Joe Witt <joewitt@apache.org>
---
.../nifi/processors/standard/CompressContent.java | 20 +++++++++++--
.../processors/standard/TestCompressContent.java | 32 +++++++++++++++++++++
.../resources/CompressedData/SampleFile.txt.lz4 | Bin 0 -> 217 bytes
3 files changed, 49 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index f292705..ef76155 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -37,6 +37,7 @@ import lzma.streams.LzmaOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -73,7 +74,7 @@ import org.xerial.snappy.SnappyOutputStream;
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed"})
+@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed", "lz4-framed"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
+ "attribute as appropriate")
@ReadsAttribute(attribute = "mime.type", description = "If the Compression Format is set to use mime.type attribute, this attribute is used to "
@@ -90,15 +91,17 @@ public class CompressContent extends AbstractProcessor {
public static final String COMPRESSION_FORMAT_LZMA = "lzma";
public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
+ public static final String COMPRESSION_FORMAT_LZ4_FRAMED ="lz4-framed";
public static final String MODE_COMPRESS = "compress";
public static final String MODE_DECOMPRESS = "decompress";
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
.name("Compression Format")
- .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
+ .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Framed, and LZ4-Framed")
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2,
- COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED)
+ COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED,
+ COMPRESSION_FORMAT_LZ4_FRAMED)
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
.required(true)
.build();
@@ -161,6 +164,7 @@ public class CompressContent extends AbstractProcessor {
mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
mimeTypeMap.put("application/x-snappy", COMPRESSION_FORMAT_SNAPPY);
mimeTypeMap.put("application/x-snappy-framed", COMPRESSION_FORMAT_SNAPPY_FRAMED);
+ mimeTypeMap.put("application/x-lz4-framed", COMPRESSION_FORMAT_LZ4_FRAMED);
this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap);
}
@@ -227,6 +231,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
fileExtension = ".sz";
break;
+ case COMPRESSION_FORMAT_LZ4_FRAMED:
+ fileExtension = ".lz4";
+ break;
default:
fileExtension = "";
break;
@@ -268,6 +275,10 @@ public class CompressContent extends AbstractProcessor {
compressionOut = new SnappyFramedOutputStream(bufferedOut);
mimeTypeRef.set("application/x-snappy-framed");
break;
+ case COMPRESSION_FORMAT_LZ4_FRAMED:
+ mimeTypeRef.set("application/x-lz4-framed");
+ compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
+ break;
case COMPRESSION_FORMAT_BZIP2:
default:
mimeTypeRef.set("application/x-bzip2");
@@ -296,6 +307,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
compressionIn = new SnappyFramedInputStream(bufferedIn);
break;
+ case COMPRESSION_FORMAT_LZ4_FRAMED:
+ compressionIn = new FramedLZ4CompressorInputStream(bufferedIn, true);
+ break;
default:
compressionIn = new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 50c4cd5..9ed971d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@ -253,4 +253,36 @@ public class TestCompressContent {
runner.getFlowFilesForRelationship(CompressContent.REL_FAILURE).get(0).assertContentEquals(data);
}
+
+ @Test
+ public void testLz4FramedCompress() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_LZ4_FRAMED);
+ runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+ runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-lz4-framed");
+ flowFile.assertAttributeEquals("filename", "SampleFile.txt.lz4");
+ }
+
+ @Test
+ public void testLz4FramedDecompress() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS);
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_LZ4_FRAMED);
+ runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+ runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.lz4"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.lz4 b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.lz4
new file mode 100644
index 0000000..760ecd0
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.lz4 differ
|