beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [3/4] beam git commit: Properly deal with late processing-time timers
Date Fri, 10 Mar 2017 21:02:21 GMT
Properly deal with late processing-time timers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dbfcf4b4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dbfcf4b4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dbfcf4b4

Branch: refs/heads/master
Commit: dbfcf4b4a63b38653adc21d1cf37d6c4cfd955ad
Parents: 1a8e1f7
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Mar 10 15:25:26 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Mar 10 15:25:26 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/core/StatefulDoFnRunner.java   | 40 ++++++++++++--------
 1 file changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dbfcf4b4/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index c672902..d27193c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -76,33 +76,31 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
   }
 
   @Override
-  public void processElement(WindowedValue<InputT> compressedElem) {
+  public void processElement(WindowedValue<InputT> input) {
 
     // StatefulDoFnRunner always observes windows, so we need to explode
-    for (WindowedValue<InputT> value : compressedElem.explodeWindows()) {
+    for (WindowedValue<InputT> value : input.explodeWindows()) {
 
       BoundedWindow window = value.getWindows().iterator().next();
 
-      if (!dropLateData(window)) {
+      if (isLate(window)) {
+        // The element is too late for this window.
+        droppedDueToLateness.addValue(1L);
+        WindowTracing.debug(
+            "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
+                + "since too far behind inputWatermark:{}",
+            input.getTimestamp(), window, cleanupTimer.currentInputWatermarkTime());
+      } else {
         cleanupTimer.setForWindow(window);
         doFnRunner.processElement(value);
       }
     }
   }
 
-  private boolean dropLateData(BoundedWindow window) {
+  private boolean isLate(BoundedWindow window) {
     Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     Instant inputWM = cleanupTimer.currentInputWatermarkTime();
-    if (gcTime.isBefore(inputWM)) {
-      // The element is too late for this window.
-      droppedDueToLateness.addValue(1L);
-      WindowTracing.debug(
-          "StatefulDoFnRunner.processElement/onTimer: Dropping element for window:{} "
-              + "since too far behind inputWatermark:{}", window, inputWM);
-      return true;
-    } else {
-      return false;
-    }
+    return gcTime.isBefore(inputWM);
   }
 
   @Override
@@ -112,8 +110,18 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
       stateCleaner.clearForWindow(window);
       // There should invoke the onWindowExpiration of DoFn
     } else {
-      // a timer can never be late because we don't allow setting timers after GC time
-      doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+      // An event-time timer can never be late because we don't allow setting timers after GC time.
+      // It can happen that a processing-time timer fires for a late window; we need to ignore
+      // this.
+      if (!timeDomain.equals(TimeDomain.EVENT_TIME) && isLate(window)) {
+        // don't increment the dropped counter, only do that for elements
+        WindowTracing.debug(
+            "StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} "
+                + "since window is too far behind inputWatermark:{}",
+            timestamp, window, cleanupTimer.currentInputWatermarkTime());
+      } else {
+        doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+      }
     }
   }
 


Mime
View raw message