Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BE997200C3D for ; Mon, 27 Feb 2017 18:38:03 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id BD036160B60; Mon, 27 Feb 2017 17:38:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B9417160B5B for ; Mon, 27 Feb 2017 18:38:02 +0100 (CET) Received: (qmail 85359 invoked by uid 500); 27 Feb 2017 17:38:02 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 85341 invoked by uid 99); 27 Feb 2017 17:38:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Feb 2017 17:38:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D4606DFDA9; Mon, 27 Feb 2017 17:38:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.apache.org Date: Mon, 27 Feb 2017 17:38:01 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: Add DEFLATE to CompressedSource and TextIO archived-at: Mon, 27 Feb 2017 17:38:03 -0000 Repository: beam Updated Branches: refs/heads/master aaf0884d5 -> 0b7515e5e Add DEFLATE to CompressedSource and TextIO * add DEFLATE to CompressedSource * update docs in CompressedSource.java * update CompressedSourceTest * add DEFLATE to TextIO * update TextIOTest * add DEFLATE to FileBasedSink * update FileBasedSinkTest Project: 
http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63035de8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63035de8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63035de8 Branch: refs/heads/master Commit: 63035de82d5e484804e0cf3160a788041e6fe3d3 Parents: aaf0884 Author: Neville Li Authored: Tue Feb 21 11:34:22 2017 -0500 Committer: Dan Halperin Committed: Mon Feb 27 09:37:19 2017 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/io/CompressedSource.java | 32 +++++++-- .../org/apache/beam/sdk/io/FileBasedSink.java | 11 +++ .../java/org/apache/beam/sdk/io/TextIO.java | 10 ++- .../beam/sdk/io/CompressedSourceTest.java | 72 ++++++++++++++++++++ .../apache/beam/sdk/io/FileBasedSinkTest.java | 17 ++++- .../java/org/apache/beam/sdk/io/TextIOTest.java | 29 ++++++++ 6 files changed, 162 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index e3bd32e..6de22f9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; /** @@ -54,14 
+55,17 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; * .withDecompression(CompressedSource.CompressionMode.GZIP))); * } * - *

Supported compression algorithms are {@link CompressionMode#GZIP} and - * {@link CompressionMode#BZIP2}. User-defined compression types are supported by implementing + *

Supported compression algorithms are {@link CompressionMode#GZIP}, + * {@link CompressionMode#BZIP2}, {@link CompressionMode#ZIP} and {@link CompressionMode#DEFLATE}. + * User-defined compression types are supported by implementing * {@link DecompressingChannelFactory}. * *

By default, the compression algorithm is selected from those supported in * {@link CompressionMode} based on the file name provided to the source, namely - * {@code ".bz2"} indicates {@link CompressionMode#BZIP2} and {@code ".gz"} indicates - * {@link CompressionMode#GZIP}. If the file name does not match any of the supported + * {@code ".bz2"} indicates {@link CompressionMode#BZIP2}, {@code ".gz"} indicates + * {@link CompressionMode#GZIP}, {@code ".zip"} indicates {@link CompressionMode#ZIP} and + * {@code ".deflate"} indicates {@link CompressionMode#DEFLATE}. If the file name does not match + * any of the supported * algorithms, it is assumed to be uncompressed data. * * @param The type to read from the compressed file. @@ -165,6 +169,22 @@ public class CompressedSource extends FileBasedSource { FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel)); return Channels.newChannel(zip); } + }, + + /** + * Reads a byte channel assuming it is compressed with deflate. 
+ */ + DEFLATE { + @Override + public boolean matches(String fileName) { + return fileName.toLowerCase().endsWith(".deflate"); + } + + public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) + throws IOException { + return Channels.newChannel( + new DeflateCompressorInputStream(Channels.newInputStream(channel))); + } }; /** @@ -385,8 +405,8 @@ public class CompressedSource extends FileBasedSource { .withLabel("Read Source")); if (channelFactory instanceof Enum) { - // GZIP and BZIP are implemented as enums; Enum classes are anonymous, so use the .name() - // value instead + // GZIP, BZIP, ZIP and DEFLATE are implemented as enums; Enum classes are anonymous, so use + // the .name() value instead builder.add(DisplayData.item("compressionMode", ((Enum) channelFactory).name()) .withLabel("Compression Mode")); } else { http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index e14ba59..ae28b62 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.joda.time.Instant; import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; @@ -107,6 +108,16 @@ public abstract class FileBasedSink extends Sink { return Channels .newChannel(new BZip2CompressorOutputStream(Channels.newOutputStream(channel))); } + }, + 
/** + * Provides deflate output transformation. + */ + DEFLATE(".deflate", MimeTypes.BINARY) { + @Override + public WritableByteChannel create(WritableByteChannel channel) throws IOException { + return Channels + .newChannel(new DeflateCompressorOutputStream(Channels.newOutputStream(channel))); + } }; private String filenameSuffix; http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 86e6989..6e23a28 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -281,6 +281,10 @@ public class TextIO { return CompressedSource.from(new TextSource(filepattern)) .withDecompression(CompressedSource.CompressionMode.ZIP); + case DEFLATE: + return + CompressedSource.from(new TextSource(filepattern)) + .withDecompression(CompressedSource.CompressionMode.DEFLATE); default: throw new IllegalArgumentException("Unknown compression type: " + compressionType); } @@ -762,7 +766,11 @@ public class TextIO { /** * Zipped. */ - ZIP(".zip"); + ZIP(".zip"), + /** + * Deflate compressed. 
+ */ + DEFLATE(".deflate"); private String filenameSuffix; http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java index 1574559..5cf3ada 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java @@ -46,6 +46,8 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Random; import java.util.zip.GZIPOutputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -63,6 +65,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; import org.hamcrest.Matchers; import org.junit.Rule; @@ -118,6 +121,18 @@ public class CompressedSourceTest { source = CompressedSource.from(new ByteSource("input.BZ2", 1)); assertFalse(source.isSplittable()); + // ZIP files are not splittable + source = CompressedSource.from(new ByteSource("input.zip", 1)); + assertFalse(source.isSplittable()); + source = CompressedSource.from(new ByteSource("input.ZIP", 1)); + assertFalse(source.isSplittable()); + + // DEFLATE files are not splittable + source = CompressedSource.from(new ByteSource("input.deflate", 1)); + 
assertFalse(source.isSplittable()); + source = CompressedSource.from(new ByteSource("input.DEFLATE", 1)); + assertFalse(source.isSplittable()); + // Other extensions are assumed to be splittable. source = CompressedSource.from(new ByteSource("input.txt", 1)); assertTrue(source.isSplittable()); @@ -160,6 +175,26 @@ public class CompressedSourceTest { } /** + * Test reading nonempty input with zip. + */ + @Test + @Category(NeedsRunner.class) + public void testReadZip() throws Exception { + byte[] input = generateInput(5000); + runReadTest(input, CompressionMode.ZIP); + } + + /** + * Test reading nonempty input with deflate. + */ + @Test + @Category(NeedsRunner.class) + public void testReadDeflate() throws Exception { + byte[] input = generateInput(5000); + runReadTest(input, CompressionMode.DEFLATE); + } + + /** * Test reading empty input with gzip. */ @Test @@ -445,12 +480,49 @@ public class CompressedSourceTest { return new GzipCompressorOutputStream(stream); case BZIP2: return new BZip2CompressorOutputStream(stream); + case ZIP: + return new TestZipOutputStream(stream); + case DEFLATE: + return new DeflateCompressorOutputStream(stream); default: throw new RuntimeException("Unexpected compression mode"); } } /** + * Wrapper around {@link ZipOutputStream} that splits up bytes into multiple entries.
+ */ + private static class TestZipOutputStream extends OutputStream { + + private ZipOutputStream zipOutputStream; + private long offset = 0; + private int entry = 0; + + public TestZipOutputStream(OutputStream stream) throws IOException { + super(); + zipOutputStream = new ZipOutputStream(stream); + zipOutputStream.putNextEntry(new ZipEntry(String.format("entry-%05d", entry))); + } + + @Override + public void write(int b) throws IOException { + zipOutputStream.write(b); + offset++; + if (offset % 100 == 0) { + entry++; + zipOutputStream.putNextEntry(new ZipEntry(String.format("entry-%05d", entry))); + } + } + + @Override + public void close() throws IOException { + zipOutputStream.closeEntry(); + super.close(); + } + + } + + /** * Writes a single output file. */ private void writeFile(File file, byte[] input, CompressionMode mode) throws IOException { http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 9f0c424..d2c1968 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -418,7 +419,7 @@ public class FileBasedSinkTest { } /** - * {@link CompressionType#BZIP2} correctly writes Gzipped data. 
+ * {@link CompressionType#BZIP2} correctly writes BZip2 data. */ @Test public void testCompressionTypeBZIP2() throws FileNotFoundException, IOException { @@ -443,7 +444,19 @@ public class FileBasedSinkTest { } /** - * {@link CompressionType#GZIP} correctly writes Gzipped data. + * {@link CompressionType#DEFLATE} correctly writes deflate data. + */ + @Test + public void testCompressionTypeDEFLATE() throws FileNotFoundException, IOException { + final File file = writeValuesWithWritableByteChannelFactory( + CompressionType.DEFLATE, "abc", "123"); + // Read deflate data back in using standard API. + assertReadValues(new BufferedReader(new InputStreamReader(new DeflateCompressorInputStream( + new FileInputStream(file)), StandardCharsets.UTF_8.name())), "abc", "123"); + } + + /** + * {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data. */ @Test public void testCompressionTypeUNCOMPRESSED() throws FileNotFoundException, IOException { http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 6304603..cd94dc5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.io.TextIO.CompressionType.AUTO; import static org.apache.beam.sdk.io.TextIO.CompressionType.BZIP2; +import static org.apache.beam.sdk.io.TextIO.CompressionType.DEFLATE; import static org.apache.beam.sdk.io.TextIO.CompressionType.GZIP; import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED; import static 
org.apache.beam.sdk.io.TextIO.CompressionType.ZIP; @@ -100,6 +101,7 @@ import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; @@ -141,6 +143,9 @@ public class TextIOTest { private static File emptyZip; private static File tinyZip; private static File largeZip; + private static File emptyDeflate; + private static File tinyDeflate; + private static File largeDeflate; @Rule public TestPipeline p = TestPipeline.create(); @@ -166,6 +171,9 @@ public class TextIOTest { zipOutput.putNextEntry(new ZipEntry("entry")); output = zipOutput; break; + case DEFLATE: + output = new DeflateCompressorOutputStream(output); + break; default: throw new UnsupportedOperationException(compression.toString()); } @@ -182,16 +190,19 @@ public class TextIOTest { emptyGz = writeToFile(EMPTY, "empty.gz", GZIP); emptyBzip2 = writeToFile(EMPTY, "empty.bz2", BZIP2); emptyZip = writeToFile(EMPTY, "empty.zip", ZIP); + emptyDeflate = writeToFile(EMPTY, "empty.deflate", DEFLATE); // tiny files tinyTxt = writeToFile(TINY, "tiny.txt", CompressionType.UNCOMPRESSED); tinyGz = writeToFile(TINY, "tiny.gz", GZIP); tinyBzip2 = writeToFile(TINY, "tiny.bz2", BZIP2); tinyZip = writeToFile(TINY, "tiny.zip", ZIP); + tinyDeflate = writeToFile(TINY, "tiny.deflate", DEFLATE); // large files largeTxt = writeToFile(LARGE, "large.txt", CompressionType.UNCOMPRESSED); largeGz = writeToFile(LARGE, "large.gz", GZIP); largeBzip2 = writeToFile(LARGE, "large.bz2", BZIP2); largeZip = writeToFile(LARGE, "large.zip", ZIP); + largeDeflate = writeToFile(LARGE, "large.deflate", DEFLATE); } @AfterClass @@ -796,6 +807,24 @@ public class TextIOTest { assertReadingCompressedFileMatchesExpected(zipFile, 
ZIP, TINY); } + @Test + @Category(NeedsRunner.class) + public void testDeflateCompressedRead() throws Exception { + // Files with the right extensions should work in AUTO and DEFLATE modes. + for (CompressionType type : new CompressionType[]{AUTO, DEFLATE}) { + assertReadingCompressedFileMatchesExpected(emptyDeflate, type, EMPTY); + assertReadingCompressedFileMatchesExpected(tinyDeflate, type, TINY); + assertReadingCompressedFileMatchesExpected(largeDeflate, type, LARGE); + } + + // Sanity check that we're properly testing compression. + assertThat(largeTxt.length(), greaterThan(largeDeflate.length())); + + // Deflate files with non-deflate extension should work in DEFLATE mode. + File deflateFile = writeToFile(TINY, "tiny_deflate_no_extension", DEFLATE); + assertReadingCompressedFileMatchesExpected(deflateFile, DEFLATE, TINY); + } + /** * Tests a zip file with no entries. This is a corner case not tested elsewhere as the default * test zip files have a single entry.