beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: Add DEFLATE to CompressedSource and TextIO
Date Mon, 27 Feb 2017 17:38:01 GMT
Repository: beam
Updated Branches:
  refs/heads/master aaf0884d5 -> 0b7515e5e


Add DEFLATE to CompressedSource and TextIO

* add DEFLATE to CompressedSource

* update docs in CompressedSource.java

* update CompressedSourceTest

* add DEFLATE to TextIO

* update TextIOTest

* add DEFLATE to FileBasedSink

* update FileBasedSinkTest


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63035de8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63035de8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63035de8

Branch: refs/heads/master
Commit: 63035de82d5e484804e0cf3160a788041e6fe3d3
Parents: aaf0884
Author: Neville Li <neville@spotify.com>
Authored: Tue Feb 21 11:34:22 2017 -0500
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Feb 27 09:37:19 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/io/CompressedSource.java    | 32 +++++++--
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 11 +++
 .../java/org/apache/beam/sdk/io/TextIO.java     | 10 ++-
 .../beam/sdk/io/CompressedSourceTest.java       | 72 ++++++++++++++++++++
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 17 ++++-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 29 ++++++++
 6 files changed, 162 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index e3bd32e..6de22f9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 
 /**
@@ -54,14 +55,17 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
  *     .withDecompression(CompressedSource.CompressionMode.GZIP)));
  * } </pre>
  *
- * <p>Supported compression algorithms are {@link CompressionMode#GZIP} and
- * {@link CompressionMode#BZIP2}. User-defined compression types are supported by implementing
+ * <p>Supported compression algorithms are {@link CompressionMode#GZIP},
+ * {@link CompressionMode#BZIP2}, {@link CompressionMode#ZIP} and {@link CompressionMode#DEFLATE}.
+ * User-defined compression types are supported by implementing
  * {@link DecompressingChannelFactory}.
  *
  * <p>By default, the compression algorithm is selected from those supported in
  * {@link CompressionMode} based on the file name provided to the source, namely
- * {@code ".bz2"} indicates {@link CompressionMode#BZIP2} and {@code ".gz"} indicates
- * {@link CompressionMode#GZIP}. If the file name does not match any of the supported
+ * {@code ".bz2"} indicates {@link CompressionMode#BZIP2}, {@code ".gz"} indicates
+ * {@link CompressionMode#GZIP}, {@code ".zip"} indicates {@link CompressionMode#ZIP} and
+ * {@code ".deflate"} indicates {@link CompressionMode#DEFLATE}. If the file name does not match
+ * any of the supported
  * algorithms, it is assumed to be uncompressed data.
  *
  * @param <T> The type to read from the compressed file.
@@ -165,6 +169,22 @@ public class CompressedSource<T> extends FileBasedSource<T> {
         FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel));
         return Channels.newChannel(zip);
       }
+    },
+
+    /**
+     * Reads a byte channel assuming it is compressed with deflate.
+     */
+    DEFLATE {
+      @Override
+      public boolean matches(String fileName) {
+        return fileName.toLowerCase().endsWith(".deflate");
+      }
+
+      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
+          throws IOException {
+        return Channels.newChannel(
+            new DeflateCompressorInputStream(Channels.newInputStream(channel)));
+      }
     };
 
     /**
@@ -385,8 +405,8 @@ public class CompressedSource<T> extends FileBasedSource<T> {
           .withLabel("Read Source"));
 
     if (channelFactory instanceof Enum) {
-      // GZIP and BZIP are implemented as enums; Enum classes are anonymous, so use the .name()
-      // value instead
+      // GZIP, BZIP2, ZIP and DEFLATE are implemented as enums; Enum classes are anonymous, so use
+      // the .name() value instead
       builder.add(DisplayData.item("compressionMode", ((Enum) channelFactory).name())
         .withLabel("Compression Mode"));
     } else {

http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index e14ba59..ae28b62 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.joda.time.Instant;
 import org.joda.time.format.DateTimeFormat;
 import org.slf4j.Logger;
@@ -107,6 +108,16 @@ public abstract class FileBasedSink<T> extends Sink<T> {
         return Channels
             .newChannel(new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
       }
+    },
+    /**
+     * Provides deflate output transformation.
+     */
+    DEFLATE(".deflate", MimeTypes.BINARY) {
+      @Override
+      public WritableByteChannel create(WritableByteChannel channel) throws IOException {
+        return Channels
+            .newChannel(new DeflateCompressorOutputStream(Channels.newOutputStream(channel)));
+      }
     };
 
     private String filenameSuffix;

http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 86e6989..6e23a28 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -281,6 +281,10 @@ public class TextIO {
             return
                 CompressedSource.from(new TextSource(filepattern))
                     .withDecompression(CompressedSource.CompressionMode.ZIP);
+          case DEFLATE:
+            return
+                CompressedSource.from(new TextSource(filepattern))
+                    .withDecompression(CompressedSource.CompressionMode.DEFLATE);
           default:
             throw new IllegalArgumentException("Unknown compression type: " + compressionType);
         }
@@ -762,7 +766,11 @@ public class TextIO {
     /**
      * Zipped.
      */
-    ZIP(".zip");
+    ZIP(".zip"),
+    /**
+     * Deflate compressed.
+     */
+    DEFLATE(".deflate");
 
     private String filenameSuffix;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 1574559..5cf3ada 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -46,6 +46,8 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -63,6 +65,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
@@ -118,6 +121,18 @@ public class CompressedSourceTest {
     source = CompressedSource.from(new ByteSource("input.BZ2", 1));
     assertFalse(source.isSplittable());
 
+    // ZIP files are not splittable
+    source = CompressedSource.from(new ByteSource("input.zip", 1));
+    assertFalse(source.isSplittable());
+    source = CompressedSource.from(new ByteSource("input.ZIP", 1));
+    assertFalse(source.isSplittable());
+
+    // DEFLATE files are not splittable
+    source = CompressedSource.from(new ByteSource("input.deflate", 1));
+    assertFalse(source.isSplittable());
+    source = CompressedSource.from(new ByteSource("input.DEFLATE", 1));
+    assertFalse(source.isSplittable());
+
     // Other extensions are assumed to be splittable.
     source = CompressedSource.from(new ByteSource("input.txt", 1));
     assertTrue(source.isSplittable());
@@ -160,6 +175,26 @@ public class CompressedSourceTest {
   }
 
   /**
+   * Test reading nonempty input with zip.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadZip() throws Exception {
+    byte[] input = generateInput(5000);
+    runReadTest(input, CompressionMode.ZIP);
+  }
+
+  /**
+   * Test reading nonempty input with deflate.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadDeflate() throws Exception {
+    byte[] input = generateInput(5000);
+    runReadTest(input, CompressionMode.DEFLATE);
+  }
+
+  /**
    * Test reading empty input with gzip.
    */
   @Test
@@ -445,12 +480,49 @@ public class CompressedSourceTest {
         return new GzipCompressorOutputStream(stream);
       case BZIP2:
         return new BZip2CompressorOutputStream(stream);
+      case ZIP:
+        return new TestZipOutputStream(stream);
+      case DEFLATE:
+        return new DeflateCompressorOutputStream(stream);
       default:
         throw new RuntimeException("Unexpected compression mode");
     }
   }
 
   /**
+   * Wraps a {@link ZipOutputStream}, starting a new zip entry after every 100 bytes written.
+   */
+  private static class TestZipOutputStream extends OutputStream {
+
+    private ZipOutputStream zipOutputStream;
+    private long offset = 0;
+    private int entry = 0;
+
+    public TestZipOutputStream(OutputStream stream) throws IOException {
+      super();
+      zipOutputStream = new ZipOutputStream(stream);
+      zipOutputStream.putNextEntry(new ZipEntry(String.format("entry-%05d", entry)));
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      zipOutputStream.write(b);
+      offset++;
+      if (offset % 100 == 0) {
+        entry++;
+        zipOutputStream.putNextEntry(new ZipEntry(String.format("entry-%05d", entry)));
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      zipOutputStream.closeEntry();
+      super.close();
+    }
+
+  }
+
+  /**
    * Writes a single output file.
    */
   private void writeFile(File file, byte[] input, CompressionMode mode) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 9f0c424..d2c1968 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -418,7 +419,7 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * {@link CompressionType#BZIP2} correctly writes Gzipped data.
+   * {@link CompressionType#BZIP2} correctly writes BZip2 data.
    */
   @Test
   public void testCompressionTypeBZIP2() throws FileNotFoundException, IOException {
@@ -443,7 +444,19 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * {@link CompressionType#GZIP} correctly writes Gzipped data.
+   * {@link CompressionType#DEFLATE} correctly writes deflate data.
+   */
+  @Test
+  public void testCompressionTypeDEFLATE() throws FileNotFoundException, IOException {
+    final File file = writeValuesWithWritableByteChannelFactory(
+        CompressionType.DEFLATE, "abc", "123");
+    // Read deflate-compressed data back in using DeflateCompressorInputStream.
+    assertReadValues(new BufferedReader(new InputStreamReader(new DeflateCompressorInputStream(
+        new FileInputStream(file)), StandardCharsets.UTF_8.name())), "abc", "123");
+  }
+
+  /**
+   * {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data.
    */
   @Test
   public void testCompressionTypeUNCOMPRESSED() throws FileNotFoundException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/63035de8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 6304603..cd94dc5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.AUTO;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.BZIP2;
+import static org.apache.beam.sdk.io.TextIO.CompressionType.DEFLATE;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.GZIP;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.ZIP;
@@ -100,6 +101,7 @@ import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -141,6 +143,9 @@ public class TextIOTest {
   private static File emptyZip;
   private static File tinyZip;
   private static File largeZip;
+  private static File emptyDeflate;
+  private static File tinyDeflate;
+  private static File largeDeflate;
 
   @Rule
   public TestPipeline p = TestPipeline.create();
@@ -166,6 +171,9 @@ public class TextIOTest {
         zipOutput.putNextEntry(new ZipEntry("entry"));
         output = zipOutput;
         break;
+      case DEFLATE:
+        output = new DeflateCompressorOutputStream(output);
+        break;
       default:
         throw new UnsupportedOperationException(compression.toString());
     }
@@ -182,16 +190,19 @@ public class TextIOTest {
     emptyGz = writeToFile(EMPTY, "empty.gz", GZIP);
     emptyBzip2 = writeToFile(EMPTY, "empty.bz2", BZIP2);
     emptyZip = writeToFile(EMPTY, "empty.zip", ZIP);
+    emptyDeflate = writeToFile(EMPTY, "empty.deflate", DEFLATE);
     // tiny files
     tinyTxt = writeToFile(TINY, "tiny.txt", CompressionType.UNCOMPRESSED);
     tinyGz = writeToFile(TINY, "tiny.gz", GZIP);
     tinyBzip2 = writeToFile(TINY, "tiny.bz2", BZIP2);
     tinyZip = writeToFile(TINY, "tiny.zip", ZIP);
+    tinyDeflate = writeToFile(TINY, "tiny.deflate", DEFLATE);
     // large files
     largeTxt = writeToFile(LARGE, "large.txt", CompressionType.UNCOMPRESSED);
     largeGz = writeToFile(LARGE, "large.gz", GZIP);
     largeBzip2 = writeToFile(LARGE, "large.bz2", BZIP2);
     largeZip = writeToFile(LARGE, "large.zip", ZIP);
+    largeDeflate = writeToFile(LARGE, "large.deflate", DEFLATE);
   }
 
   @AfterClass
@@ -796,6 +807,24 @@ public class TextIOTest {
     assertReadingCompressedFileMatchesExpected(zipFile, ZIP, TINY);
   }
 
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDeflateCompressedRead() throws Exception {
+    // Files with the right extensions should work in AUTO and DEFLATE modes.
+    for (CompressionType type : new CompressionType[]{AUTO, DEFLATE}) {
+      assertReadingCompressedFileMatchesExpected(emptyDeflate, type, EMPTY);
+      assertReadingCompressedFileMatchesExpected(tinyDeflate, type, TINY);
+      assertReadingCompressedFileMatchesExpected(largeDeflate, type, LARGE);
+    }
+
+    // Sanity check that we're properly testing compression.
+    assertThat(largeTxt.length(), greaterThan(largeDeflate.length()));
+
+    // Deflate files with non-deflate extension should work in DEFLATE mode.
+    File deflateFile = writeToFile(TINY, "tiny_deflate_no_extension", DEFLATE);
+    assertReadingCompressedFileMatchesExpected(deflateFile, DEFLATE, TINY);
+  }
+
   /**
    * Tests a zip file with no entries. This is a corner case not tested elsewhere as the default
    * test zip files have a single entry.


Mime
View raw message