beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/4] beam git commit: Adds tests for the watermark hold (previously untested)
Date Sat, 08 Apr 2017 00:34:49 GMT
Repository: beam
Updated Branches:
  refs/heads/master 4a694cebb -> 1594849da


Adds tests for the watermark hold (previously untested)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/29c28021
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/29c28021
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/29c28021

Branch: refs/heads/master
Commit: 29c280211c2431f29c5552c35bd3435c65e4975b
Parents: dad7ace
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Fri Apr 7 14:00:05 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Fri Apr 7 17:06:13 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SplittableParDoTest.java  | 56 +++++++++++++++++++-
 1 file changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/29c28021/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index f8d6095..d301113 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
 
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -36,6 +37,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -208,7 +210,7 @@ public class SplittableParDoTest {
     private Instant currentProcessingTime;
 
     private InMemoryTimerInternals timerInternals;
-    private InMemoryStateInternals<String> stateInternals;
+    private TestInMemoryStateInternals<String> stateInternals;
 
     ProcessFnTester(
         Instant currentProcessingTime,
@@ -223,7 +225,7 @@ public class SplittableParDoTest {
               fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
       this.tester = DoFnTester.of(processFn);
       this.timerInternals = new InMemoryTimerInternals();
-      this.stateInternals = InMemoryStateInternals.forKey("dummy");
+      this.stateInternals = new TestInMemoryStateInternals<>("dummy");
       processFn.setStateInternalsFactory(
           new StateInternalsFactory<String>() {
             @Override
@@ -335,6 +337,9 @@ public class SplittableParDoTest {
       return tester.takeOutputElements();
     }
 
+    public Instant getWatermarkHold() {
+      return stateInternals.earliestWatermarkHold();
+    }
   }
 
   private static class OutputWindowedValueToDoFnTester<OutputT>
@@ -425,6 +430,53 @@ public class SplittableParDoTest {
     }
   }
 
+  private static class WatermarkUpdateFn extends DoFn<Instant, String> {
+    @ProcessElement
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
+        c.updateWatermark(c.element().plus(Duration.standardSeconds(i)));
+        c.output(String.valueOf(i));
+      }
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(Instant elem) {
+      throw new IllegalStateException("Expected to be supplied explicitly in this test");
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(OffsetRange range) {
+      return new OffsetRangeTracker(range);
+    }
+  }
+
+  @Test
+  public void testUpdatesWatermark() throws Exception {
+    DoFn<Instant, String> fn = new WatermarkUpdateFn();
+    Instant base = Instant.now();
+
+    ProcessFnTester<Instant, String, OffsetRange, OffsetRangeTracker> tester =
+        new ProcessFnTester<>(
+            base,
+            fn,
+            InstantCoder.of(),
+            SerializableCoder.of(OffsetRange.class),
+            3,
+            MAX_BUNDLE_DURATION);
+
+    tester.startElement(base, new OffsetRange(0, 8));
+    assertThat(tester.takeOutputElements(), hasItems("0", "1", "2"));
+    assertEquals(base.plus(Duration.standardSeconds(2)), tester.getWatermarkHold());
+
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), hasItems("3", "4", "5"));
+    assertEquals(base.plus(Duration.standardSeconds(5)), tester.getWatermarkHold());
+
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), hasItems("6", "7"));
+    assertEquals(null, tester.getWatermarkHold());
+  }
+
   /**
    * A splittable {@link DoFn} that generates the sequence [init, init + total).
    */


Mime
View raw message