beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [6/9] beam git commit: Do not GC windows based on processing time timer!
Date Thu, 22 Jun 2017 23:03:58 GMT
Do not GC windows based on processing time timer!


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50c43d96
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50c43d96
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50c43d96

Branch: refs/heads/master
Commit: 50c43d96adb8c2523cf38c09f32e241eacc47823
Parents: 412fd7e
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Jun 22 12:56:34 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunner.java       |  3 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   | 35 +++++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/50c43d96/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index b5c3e3e..75b6acd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -663,7 +663,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
       this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
           && timer.getTimestamp().equals(window.maxTimestamp());
       Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
-      this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
+      this.isGarbageCollection =
+          TimeDomain.EVENT_TIME == timer.getDomain() && !timer.getTimestamp().isBefore(cleanupTime);
     }
 
     // Has this window had its trigger finish?

http://git-wip-us.apache.org/repos/asf/beam/blob/50c43d96/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 9e71300..2b66162 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -140,7 +140,40 @@ public class ReduceFnRunnerTest {
       }
     })
     .when(mockTrigger).onFire(anyTriggerContext());
- }
+  }
+
+  /**
+   * Tests that a processing time timer does not cause window GC.
+   */
+  @Test
+  public void testProcessingTimeTimerDoesNotGc() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.ZERO)
+            .withTrigger(
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceProcessingTime(new Instant(5000));
+    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+    injectElement(tester, 5);
+
+    tester.advanceProcessingTime(new Instant(10000));
+
+    tester.assertHasOnlyGlobalAndStateFor(
+        new IntervalWindow(new Instant(0), new Instant(100)));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(
+                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, false, Timing.EARLY, 0,
0))));
+  }
 
   @Test
   public void testOnElementBufferingDiscarding() throws Exception {


Mime
View raw message