flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-3688] WindowOperator.trigger() does not emit Watermark anymore
Date Fri, 08 Apr 2016 13:56:20 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 43093e3b1 -> 1554c9b42


[FLINK-3688] WindowOperator.trigger() does not emit Watermark anymore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1554c9b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1554c9b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1554c9b4

Branch: refs/heads/release-1.0
Commit: 1554c9b42f9b76c197ed0a8c2144d31ea965080b
Parents: 43093e3
Author: Konstantin Knauf <konstantin.knauf@tngtech.com>
Authored: Sun Apr 3 13:57:35 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Apr 8 15:54:39 2016 +0200

----------------------------------------------------------------------
 .../runtime/operators/windowing/WindowOperator.java    | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1554c9b4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 9b7b347..b370117 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -269,7 +269,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	@Override
 	public final void processWatermark(Watermark mark) throws Exception {
+		processTriggersFor(mark);
 
+		output.emitWatermark(mark);
+
+		this.currentWatermark = mark.getTimestamp();
+	}
+
+	private void processTriggersFor(Watermark mark) throws Exception {
 		boolean fire;
 
 		do {
@@ -289,10 +296,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				fire = false;
 			}
 		} while (fire);
-
-		output.emitWatermark(mark);
-
-		this.currentWatermark = mark.getTimestamp();
 	}
 
 	@Override
@@ -320,7 +323,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		// Also check any watermark timers. We might have some in here since
 		// Context.registerEventTimeTimer sets a trigger if an event-time trigger is registered
 		// that is already behind the watermark.
-		processWatermark(new Watermark(currentWatermark));
+		processTriggersFor(new Watermark(currentWatermark));
 	}
 
 	/**


Mime
View raw message