flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [hotfix] Fix processing time triggering on Window Operator
Date Fri, 23 Oct 2015 09:15:40 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.10 c72eff4af -> 85b73e0fd


[hotfix] Fix processing time triggering on Window Operator

Previously, a processing-time timer would only trigger if
expectedTime < time. Now it triggers if expectedTime <= time.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85b73e0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85b73e0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85b73e0f

Branch: refs/heads/release-0.10
Commit: 85b73e0fd255608494fa9ae74a9a505a4989d6ab
Parents: c72eff4
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Oct 23 11:13:37 2015 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Oct 23 11:15:21 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/windowing/NonKeyedWindowOperator.java    | 6 +++---
 .../streaming/runtime/operators/windowing/WindowOperator.java  | 5 +++--
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/85b73e0f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 101c818..a002b23 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -304,15 +304,15 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		Set<Long> toRemove = new HashSet<>();
 
 		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet())
{
-			if (triggers.getKey() < time) {
+			long actualTime = triggers.getKey();
+			if (actualTime <= time) {
 				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
+					Trigger.TriggerResult triggerResult = context.onProcessingTime(actualTime);
 					processTriggerResult(triggerResult, context.window);
 				}
 				toRemove.add(triggers.getKey());
 			}
 		}
-
 		for (Long l: toRemove) {
 			processingTimeTimers.remove(l);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/85b73e0f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 04c393c..a80f971 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -365,9 +365,10 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		Set<Long> toRemove = new HashSet<>();
 
 		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet())
{
-			if (triggers.getKey() < time) {
+			long actualTime = triggers.getKey();
+			if (actualTime <= time) {
 				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
+					Trigger.TriggerResult triggerResult = context.onProcessingTime(actualTime);
 					processTriggerResult(triggerResult, context.key, context.window);
 				}
 				toRemove.add(triggers.getKey());


Mime
View raw message