beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: ReduceFnRunner: test when watermark leapfrogs EOW and GC
Date Tue, 25 Jul 2017 18:18:07 GMT
Repository: beam
Updated Branches:
  refs/heads/master 01408c864 -> 73da9cc40


ReduceFnRunner: test when watermark leapfrogs EOW and GC

This is known to fail in older versions; forward porting regression test.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/22b82969
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/22b82969
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/22b82969

Branch: refs/heads/master
Commit: 22b82969828508cfbd45e4f90fe74dfed7914b88
Parents: 01408c8
Author: Kenneth Knowles <klk@google.com>
Authored: Wed Jul 19 15:27:20 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue Jul 25 10:52:55 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunnerTest.java   | 83 ++++++++++++++++++++
 1 file changed, 83 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/22b82969/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 4f13af1..2341502 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -39,6 +39,7 @@ import static org.mockito.Mockito.withSettings;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.metrics.MetricName;
@@ -247,6 +248,88 @@ public class ReduceFnRunnerTest {
   }
 
   /**
+   * When the watermark passes the end of a session window and its expiration time in a single
+   * update, the runner must not crash and must emit exactly one first-and-last ON_TIME pane.
+   */
+  @Test
+  public void testSessionEowAndGcTogether() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            Sessions.withGapDuration(Duration.millis(10)),
+            DefaultTriggerStateMachine.<IntervalWindow>of(),
+            AccumulationMode.ACCUMULATING_FIRED_PANES,
+            Duration.millis(50),
+            ClosingBehavior.FIRE_ALWAYS);
+
+    tester.setAutoAdvanceOutputWatermark(true);
+
+    tester.advanceInputWatermark(new Instant(0));
+    injectElement(tester, 1);
+    tester.advanceInputWatermark(new Instant(100));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(
+                contains(1), 1, 1, 11, PaneInfo.createPane(true, true, Timing.ON_TIME))));
+  }
+
+  /**
+   * When the watermark passes the end of a fixed window and its expiration time in a single
+   * update, the runner must not crash and must emit exactly one first-and-last ON_TIME pane.
+   */
+  @Test
+  public void testFixedWindowsEowAndGcTogether() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            FixedWindows.of(Duration.millis(10)),
+            DefaultTriggerStateMachine.<IntervalWindow>of(),
+            AccumulationMode.ACCUMULATING_FIRED_PANES,
+            Duration.millis(50),
+            ClosingBehavior.FIRE_ALWAYS);
+
+    tester.setAutoAdvanceOutputWatermark(true);
+
+    tester.advanceInputWatermark(new Instant(0));
+    injectElement(tester, 1);
+    tester.advanceInputWatermark(new Instant(100));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(
+                contains(1), 1, 0, 10, PaneInfo.createPane(true, true, Timing.ON_TIME))));
+  }
+
+  /**
+   * With FIRE_IF_NON_EMPTY closing behavior, when the watermark passes a fixed window's end and
+   * expiration time in a single update, exactly one first-and-last ON_TIME pane must be emitted.
+   */
+  @Test
+  public void testFixedWindowsEowAndGcTogetherFireIfNonEmpty() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            FixedWindows.of(Duration.millis(10)),
+            DefaultTriggerStateMachine.<IntervalWindow>of(),
+            AccumulationMode.ACCUMULATING_FIRED_PANES,
+            Duration.millis(50),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    tester.setAutoAdvanceOutputWatermark(true);
+
+    tester.advanceInputWatermark(new Instant(0));
+    injectElement(tester, 1);
+    tester.advanceInputWatermark(new Instant(100));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(
+        output,
+        contains(
+            isSingleWindowedValue(
+                contains(1), 1, 0, 10, PaneInfo.createPane(true, true, Timing.ON_TIME))));
+  }
+
+  /**
    * Tests that with the default trigger we will not produce two ON_TIME panes, even
    * if there are two outputs that are both candidates.
    */


Mime
View raw message