beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchamb...@apache.org
Subject [1/3] incubator-beam git commit: Add test for empty ON_TIME and no empty final pane
Date Tue, 19 Apr 2016 23:03:40 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 0952f4433 -> f1aa490b9


Add test for empty ON_TIME and no empty final pane

Add a test that we get an empty `ON_TIME` pane, and don't get the empty
final pane when using accumulation mode with the only if non-empty
`ClosingBehavior`.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4c5f530
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4c5f530
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4c5f530

Branch: refs/heads/master
Commit: e4c5f530effc591ef56f8d49162a0d82069a9e31
Parents: fa45809
Author: bchambers <bchambers@google.com>
Authored: Tue Apr 19 12:43:52 2016 -0700
Committer: bchambers <bchambers@google.com>
Committed: Tue Apr 19 15:23:53 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/util/ReduceFnRunnerTest.java       | 54 ++++++++++++++++++++
 1 file changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4c5f530/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index 5eccb04..65b5ee6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -548,6 +548,60 @@ public class ReduceFnRunnerTest {
   }
 
   @Test
+  public void noEmptyPanesFinalIfNonEmpty() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+            .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
+                AfterPane.elementCountAtLeast(2),
+                AfterWatermark.pastEndOfWindow())))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100))
+            .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(2, new Instant(2)));
+    tester.advanceInputWatermark(new Instant(20));
+    tester.advanceInputWatermark(new Instant(250));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output, contains(
+        // Trigger with 2 elements
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10),
+        // Trigger for the empty on time pane
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
+  }
+
+  @Test
+  public void noEmptyPanesFinalAlways() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+            .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
+                AfterPane.elementCountAtLeast(2),
+                AfterWatermark.pastEndOfWindow())))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100))
+            .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(2, new Instant(2)));
+    tester.advanceInputWatermark(new Instant(20));
+    tester.advanceInputWatermark(new Instant(250));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output, contains(
+        // Trigger with 2 elements
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10),
+        // Trigger for the empty on time pane
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10),
+        // Trigger for the final pane
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
+  }
+
+  @Test
   public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))


Mime
View raw message