beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [13/50] [abbrv] beam git commit: [BEAM-2290] Fix issue where timestamps weren't set when using CompressedSource
Date Thu, 18 May 2017 20:08:13 GMT
[BEAM-2290] Fix issue where timestamps weren't set when using CompressedSource


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/504bd6a8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/504bd6a8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/504bd6a8

Branch: refs/heads/gearpump-runner
Commit: 504bd6a8036946a796a7290853e45bf20d32d07d
Parents: 9a43da7
Author: Rune Fevang <fevang@exabel.com>
Authored: Fri May 12 22:24:32 2017 +0200
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon May 15 09:52:00 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/CompressedSource.java    |  6 +++++
 .../beam/sdk/io/CompressedSourceTest.java       | 25 ++++++++++++++++++--
 2 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/504bd6a8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index f2fc37b..6ab8dec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.joda.time.Instant;
 
 /**
  * A Source that reads from compressed files. A {@code CompressedSources} wraps a delegate
@@ -579,5 +580,10 @@ public class CompressedSource<T> extends FileBasedSource<T> {
         return channel.getCount();
       }
     }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return readerDelegate.getCurrentTimestamp();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/504bd6a8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 014e16e..3fff319 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -64,12 +64,16 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
 import org.hamcrest.Matchers;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -552,8 +556,13 @@ public class CompressedSourceTest {
     if (decompressionFactory != null) {
       source = source.withDecompression(decompressionFactory);
     }
-    PCollection<Byte> output = p.apply(Read.from(source));
-    PAssert.that(output).containsInAnyOrder(Bytes.asList(expected));
+    PCollection<KV<Long, Byte>> output = p.apply(Read.from(source))
+        .apply(ParDo.of(new ExtractIndexFromTimestamp()));
+    ArrayList<KV<Long, Byte>> expectedOutput = new ArrayList<>();
+    for (int i = 0; i < expected.length; i++) {
+      expectedOutput.add(KV.of((long) i, expected[i]));
+    }
+    PAssert.that(output).containsInAnyOrder(expectedOutput);
     p.run();
   }
 
@@ -632,6 +641,18 @@ public class CompressedSourceTest {
       protected long getCurrentOffset() {
         return offset;
       }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        return new Instant(getCurrentOffset());
+      }
+    }
+  }
+
+  private static class ExtractIndexFromTimestamp extends DoFn<Byte, KV<Long, Byte>> {
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      context.output(KV.of(context.timestamp().getMillis(), context.element()));
     }
   }
 


Mime
View raw message