beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/3] incubator-beam git commit: Fix overflow in ReduceFnRunner garbage collection times
Date Thu, 16 Jun 2016 18:05:38 GMT
Fix overflow in ReduceFnRunner garbage collection times


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc60dc7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc60dc7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc60dc7c

Branch: refs/heads/master
Commit: dc60dc7ce22f95e12c99bb65f258931f330444c9
Parents: 925264e
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Jun 14 16:10:09 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Jun 16 11:04:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/ReduceFnRunner.java | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc60dc7c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index 34208da..864e8e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -936,16 +936,21 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
   }
 
   /**
-   * Return when {@code window} should be garbage collected. If the window is the GlobalWindow,
-   * that will be the end of the window. Otherwise, add the allowed lateness to the end of
-   * the window.
+   * Return when {@code window} should be garbage collected. If the window's expiration time is on
+   * or after the end of the global window, it will be truncated to the end of the global window.
    */
   private Instant garbageCollectionTime(W window) {
-    Instant maxTimestamp = window.maxTimestamp();
-    if (maxTimestamp.isBefore(GlobalWindow.INSTANCE.maxTimestamp())) {
-      return maxTimestamp.plus(windowingStrategy.getAllowedLateness());
+
+    // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the
+    // global window, then we truncate it. The conditional is phrased like it is because the
+    // addition of EOW + allowed lateness might even overflow the maximum allowed Instant
+    if (GlobalWindow.INSTANCE
+        .maxTimestamp()
+        .minus(windowingStrategy.getAllowedLateness())
+        .isBefore(window.maxTimestamp())) {
+      return GlobalWindow.INSTANCE.maxTimestamp();
     } else {
-      return maxTimestamp;
+      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     }
   }
 


Mime
View raw message