flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [4/4] flink git commit: [hotfix] Expose current watermark to Triggers
Date Wed, 24 Feb 2016 21:02:01 GMT
[hotfix] Expose current watermark to Triggers

This closes #1706


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b94d99d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b94d99d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b94d99d8

Branch: refs/heads/master
Commit: b94d99d8aca25c2b3e8a2a42d155e5442a956371
Parents: ceb6424
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Feb 24 16:05:31 2016 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed Feb 24 22:01:14 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/api/windowing/triggers/Trigger.java | 5 +++++
 .../runtime/operators/windowing/NonKeyedWindowOperator.java    | 4 ++++
 .../streaming/runtime/operators/windowing/WindowOperator.java  | 6 +++++-
 3 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b94d99d8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index d68d8fa..db78558 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -95,6 +95,11 @@ public abstract class Trigger<T, W extends Window> implements Serializable
{
 	 * callbacks and deal with state.
 	 */
 	public interface TriggerContext {
+
+		/**
+		 * Returns the current watermark time.
+		 */
+		long getCurrentWatermark();
 	
 		/**
 		 * Register a system time callback. When the current system time passes the specified

http://git-wip-us.apache.org/repos/asf/flink/blob/b94d99d8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 223afa8..e42d7b4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -378,6 +378,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 			this.processingTimeTimer = -1;
 		}
 
+		@Override
+		public long getCurrentWatermark() {
+			return currentWatermark;
+		}
 
 		@SuppressWarnings("unchecked")
 		protected Context(DataInputView in, ClassLoader userClassloader) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b94d99d8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 85cc93c..289492b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -337,6 +337,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			this.window = window;
 		}
 
+		public long getCurrentWatermark() {
+			return currentWatermark;
+		}
+
 		@Override
 		public <S extends Serializable> ValueState<S> getKeyValueState(String name,
 			Class<S> stateType,
@@ -396,7 +400,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			if (time <= currentWatermark) {
 				// immediately schedule a trigger, so that we don't wait for the next
 				// watermark update to fire the watermark trigger
-				getRuntimeContext().registerTimer(time, WindowOperator.this);
+				getRuntimeContext().registerTimer(System.currentTimeMillis(), WindowOperator.this);
 			}
 		}
 


Mime
View raw message