beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/3] incubator-beam git commit: Remove extra timer firings in WatermarkManager
Date Mon, 22 Aug 2016 19:09:23 GMT
Remove extra timer firings in WatermarkManager

These timers should not be fired - the windows should be expired via the
GC timer, and any elements should be emitted if necessary.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a65be9f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a65be9f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a65be9f9

Branch: refs/heads/master
Commit: a65be9f92c26ffbd00a72a644d619e636ba04b6e
Parents: b562072
Author: Thomas Groh <tgroh@google.com>
Authored: Mon Aug 22 10:05:35 2016 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon Aug 22 10:05:35 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/WatermarkManager.java | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a65be9f9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index c8dfa8c..a44fa50 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -1139,17 +1139,10 @@ public class WatermarkManager {
           inputWatermark.extractFiredEventTimeTimers();
       Map<StructuralKey<?>, List<TimerData>> processingTimers;
       Map<StructuralKey<?>, List<TimerData>> synchronizedTimers;
-      if (inputWatermark.get().equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
-        processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
-        synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
-      } else {
-        processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.PROCESSING_TIME, clock.now());
-        synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());
-      }
+      processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
+          TimeDomain.PROCESSING_TIME, clock.now());
+      synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
+          TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());
       Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedTimers
= new HashMap<>();
       groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers);
 


Mime
View raw message