beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [2/3] beam git commit: Ignore processing time timers in expired windows
Date Mon, 10 Jul 2017 20:05:22 GMT
Ignore processing time timers in expired windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d7b0098
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d7b0098
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d7b0098

Branch: refs/heads/release-2.1.0
Commit: 3d7b00983d2ef215639c3fefc3d8df487aac7b2e
Parents: 53b372b
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Jun 22 18:09:11 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Jul 6 14:38:58 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunner.java       | 10 ++++++
 .../beam/runners/core/ReduceFnRunnerTest.java   | 32 ++++++++++++++++++++
 2 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3d7b0098/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index ef33bef..0632c05 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -693,6 +693,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       @SuppressWarnings("unchecked")
         WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
       W window = windowNamespace.getWindow();
+
+      if (TimeDomain.PROCESSING_TIME == timer.getDomain() && windowIsExpired(window)) {
+        continue;
+      }
+
       ReduceFn<K, InputT, OutputT, W>.Context directContext =
           contextFactory.base(window, StateStyle.DIRECT);
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
@@ -1090,4 +1095,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     }
   }
 
+  private boolean windowIsExpired(BoundedWindow w) {
+    return timerInternals
+        .currentInputWatermarkTime()
+        .isAfter(w.maxTimestamp().plus(windowingStrategy.getAllowedLateness()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3d7b0098/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 3a2c220..79ee91b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -286,6 +286,38 @@ public class ReduceFnRunnerTest {
 
   /**
    * Tests that when a processing time timer comes in after a window is expired
+   * it is just ignored.
+   */
+  @Test
+  public void testLateProcessingTimeTimer() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.ZERO)
+            .withTrigger(
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceProcessingTime(new Instant(5000));
+    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+    injectElement(tester, 5);
+
+    // After this advancement, the window is expired and only the GC process
+    // should be allowed to touch it
+    tester.advanceInputWatermarkNoTimers(new Instant(100));
+
+    // This should not output
+    tester.advanceProcessingTime(new Instant(6000));
+
+    assertThat(tester.extractOutput(), emptyIterable());
+  }
+
+  /**
+   * Tests that when a processing time timer comes in after a window is expired
    * but in the same bundle it does not cause a spurious output.
    */
   @Test


Mime
View raw message