beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] beam git commit: Speeds up CompressedSourceTest
Date Mon, 18 Sep 2017 20:52:55 GMT
Repository: beam
Updated Branches:
  refs/heads/master 5ea2537ec -> fdd99650d


Speeds up CompressedSourceTest


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fecfbc11
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fecfbc11
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fecfbc11

Branch: refs/heads/master
Commit: fecfbc112840d355ade8923b3dbf70486e4f0ed8
Parents: 5ea2537
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Thu Sep 14 17:51:28 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Mon Sep 18 13:47:08 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/CompressedSourceTest.java       | 79 ++++++--------------
 1 file changed, 23 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fecfbc11/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 352d38a..f932d43 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
@@ -29,6 +28,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
@@ -60,24 +60,16 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
-import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -90,9 +82,6 @@ import org.junit.runners.JUnit4;
 public class CompressedSourceTest {
 
   @Rule
-  public TestPipeline p = TestPipeline.create();
-
-  @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
   @Rule
@@ -102,7 +91,6 @@ public class CompressedSourceTest {
    * Test reading nonempty input with gzip.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testReadGzip() throws Exception {
     byte[] input = generateInput(5000);
     runReadTest(input, CompressionMode.GZIP);
@@ -174,7 +162,6 @@ public class CompressedSourceTest {
    * Test reading nonempty input with bzip2.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testReadBzip2() throws Exception {
     byte[] input = generateInput(5000);
     runReadTest(input, CompressionMode.BZIP2);
@@ -184,7 +171,6 @@ public class CompressedSourceTest {
    * Test reading nonempty input with zip.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testReadZip() throws Exception {
     byte[] input = generateInput(5000);
     runReadTest(input, CompressionMode.ZIP);
@@ -194,7 +180,6 @@ public class CompressedSourceTest {
    * Test reading nonempty input with deflate.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testReadDeflate() throws Exception {
     byte[] input = generateInput(5000);
     runReadTest(input, CompressionMode.DEFLATE);
@@ -204,7 +189,6 @@ public class CompressedSourceTest {
    * Test reading empty input with gzip.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testEmptyReadGzip() throws Exception {
     byte[] input = generateInput(0);
     runReadTest(input, CompressionMode.GZIP);
@@ -232,7 +216,6 @@ public class CompressedSourceTest {
    * to be the concatenation of those individual files.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testReadConcatenatedGzip() throws IOException {
     byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8);
     byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8);
@@ -246,10 +229,8 @@ public class CompressedSourceTest {
     CompressedSource<Byte> source =
         CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1))
             .withDecompression(CompressionMode.GZIP);
-    PCollection<Byte> output = p.apply(Read.from(source));
-
-    PAssert.that(output).containsInAnyOrder(Bytes.asList(expected));
-    p.run();
+    List<Byte> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create());
+    assertEquals(Bytes.asList(expected), actual);
   }
 
   /**
@@ -259,7 +240,6 @@ public class CompressedSourceTest {
    * those streams.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testReadMultiStreamBzip2() throws IOException {
     CompressionMode mode = CompressionMode.BZIP2;
     byte[] input1 = generateInput(5, 587973);
@@ -289,7 +269,6 @@ public class CompressedSourceTest {
    * Test reading empty input with bzip2.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testCompressedReadBzip2() throws Exception {
     byte[] input = generateInput(0);
     runReadTest(input, CompressionMode.BZIP2);
@@ -299,7 +278,6 @@ public class CompressedSourceTest {
    * Test reading according to filepattern when the file is bzipped.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testCompressedAccordingToFilepatternGzip() throws Exception {
     byte[] input = generateInput(100);
     File tmpFile = tmpFolder.newFile("test.gz");
@@ -311,7 +289,6 @@ public class CompressedSourceTest {
    * Test reading according to filepattern when the file is gzipped.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testCompressedAccordingToFilepatternBzip2() throws Exception {
     byte[] input = generateInput(100);
     File tmpFile = tmpFolder.newFile("test.bz2");
@@ -323,7 +300,6 @@ public class CompressedSourceTest {
    * Test reading multiple files with different compression.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testHeterogeneousCompression() throws Exception {
     String baseName = "test-input";
 
@@ -333,28 +309,26 @@ public class CompressedSourceTest {
 
     // Every sort of compression
     File uncompressedFile = tmpFolder.newFile(baseName + ".bin");
-    generated = generateInput(1000);
+    generated = generateInput(1000, 1);
     Files.write(generated, uncompressedFile);
     expected.addAll(Bytes.asList(generated));
 
     File gzipFile = tmpFolder.newFile(baseName + ".gz");
-    generated = generateInput(1000);
+    generated = generateInput(1000, 2);
     writeFile(gzipFile, generated, CompressionMode.GZIP);
     expected.addAll(Bytes.asList(generated));
 
     File bzip2File = tmpFolder.newFile(baseName + ".bz2");
-    generated = generateInput(1000);
-    writeFile(bzip2File, generateInput(1000), CompressionMode.BZIP2);
+    generated = generateInput(1000, 3);
+    writeFile(bzip2File, generated, CompressionMode.BZIP2);
     expected.addAll(Bytes.asList(generated));
 
     String filePattern = new File(tmpFolder.getRoot().toString(), baseName + ".*").toString();
 
     CompressedSource<Byte> source =
         CompressedSource.from(new ByteSource(filePattern, 1));
-    PCollection<Byte> output = p.apply(Read.from(source));
-
-    PAssert.that(output).containsInAnyOrder(expected);
-    p.run();
+    List<Byte> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create());
+    assertEquals(HashMultiset.create(actual), HashMultiset.create(expected));
   }
 
   @Test
@@ -414,7 +388,6 @@ public class CompressedSourceTest {
    * this due to properties of services that we read from.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testFalseGzipStream() throws Exception {
     byte[] input = generateInput(1000);
     File tmpFile = tmpFolder.newFile("test.gz");
@@ -427,15 +400,11 @@ public class CompressedSourceTest {
    * we fail.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testFalseBzip2Stream() throws Exception {
     byte[] input = generateInput(1000);
     File tmpFile = tmpFolder.newFile("test.bz2");
     Files.write(input, tmpFile);
-    thrown.expectCause(Matchers.allOf(
-        instanceOf(IOException.class),
-        ThrowableMessageMatcher.hasMessage(
-            containsString("Stream is not in the BZip2 format"))));
+    thrown.expectMessage("Stream is not in the BZip2 format");
     verifyReadContents(input, tmpFile, CompressionMode.BZIP2);
   }
 
@@ -444,7 +413,6 @@ public class CompressedSourceTest {
    * the gzip header is two bytes.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testEmptyReadGzipUncompressed() throws Exception {
     byte[] input = generateInput(0);
     File tmpFile = tmpFolder.newFile("test.gz");
@@ -457,7 +425,6 @@ public class CompressedSourceTest {
    * the gzip header is two bytes.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testOneByteReadGzipUncompressed() throws Exception {
     byte[] input = generateInput(1);
     File tmpFile = tmpFolder.newFile("test.gz");
@@ -469,15 +436,14 @@ public class CompressedSourceTest {
    * Test reading multiple files.
    */
   @Test
-  @Category(NeedsRunner.class)
   public void testCompressedReadMultipleFiles() throws Exception {
-    int numFiles = 10;
+    int numFiles = 3;
     String baseName = "test_input-";
     String filePattern = new File(tmpFolder.getRoot().toString(), baseName + "*").toString();
     List<Byte> expected = new ArrayList<>();
 
     for (int i = 0; i < numFiles; i++) {
-      byte[] generated = generateInput(1000);
+      byte[] generated = generateInput(100);
       File tmpFile = tmpFolder.newFile(baseName + i);
       writeFile(tmpFile, generated, CompressionMode.GZIP);
       expected.addAll(Bytes.asList(generated));
@@ -486,10 +452,8 @@ public class CompressedSourceTest {
     CompressedSource<Byte> source =
         CompressedSource.from(new ByteSource(filePattern, 1))
             .withDecompression(CompressionMode.GZIP);
-    PCollection<Byte> output = p.apply(Read.from(source));
-
-    PAssert.that(output).containsInAnyOrder(expected);
-    p.run();
+    List<Byte> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create());
+    assertEquals(HashMultiset.create(expected), HashMultiset.create(actual));
   }
 
   @Test
@@ -607,20 +571,23 @@ public class CompressedSourceTest {
   }
 
   private void verifyReadContents(byte[] expected, File inputFile,
-      @Nullable DecompressingChannelFactory decompressionFactory) {
+      @Nullable DecompressingChannelFactory decompressionFactory) throws IOException {
     CompressedSource<Byte> source =
         CompressedSource.from(new ByteSource(inputFile.toPath().toString(), 1));
     if (decompressionFactory != null) {
       source = source.withDecompression(decompressionFactory);
     }
-    PCollection<KV<Long, Byte>> output = p.apply(Read.from(source))
-        .apply(ParDo.of(new ExtractIndexFromTimestamp()));
-    ArrayList<KV<Long, Byte>> expectedOutput = new ArrayList<>();
+    List<KV<Long, Byte>> actualOutput = Lists.newArrayList();
+    try (BoundedReader<Byte> reader = source.createReader(PipelineOptionsFactory.create())) {
+      for (boolean more = reader.start(); more; more = reader.advance()) {
+        actualOutput.add(KV.of(reader.getCurrentTimestamp().getMillis(), reader.getCurrent()));
+      }
+    }
+    List<KV<Long, Byte>> expectedOutput = Lists.newArrayList();
     for (int i = 0; i < expected.length; i++) {
       expectedOutput.add(KV.of((long) i, expected[i]));
     }
-    PAssert.that(output).containsInAnyOrder(expectedOutput);
-    p.run();
+    assertEquals(expectedOutput, actualOutput);
   }
 
   /**


Mime
View raw message