flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject flink git commit: [FLINK-5149] let ContinuousEventTimeTrigger fire at the end of the window
Date Thu, 24 Nov 2016 10:38:12 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 424fb24c3 -> fc24c30ee


[FLINK-5149] let ContinuousEventTimeTrigger fire at the end of the window

This changes the ContinuousEventTimeTrigger to behave like the
EventTimeTrigger in the sense that it also triggers at the end of the
window.

This ensures the trigger fires at least once, even if the first
interval-based firing time falls after the window end.

This closes #2860.

[typo] fix toString() of ContinuousEventTimeTrigger

This closes #2854.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc24c30e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc24c30e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc24c30e

Branch: refs/heads/release-1.1
Commit: fc24c30ee0edb3a66196629cece942bb8a0b155c
Parents: 424fb24
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Nov 23 16:01:35 2016 +0100
Committer: Maximilian Michels <mxm@apache.org>
Committed: Thu Nov 24 11:38:42 2016 +0100

----------------------------------------------------------------------
 .../triggers/ContinuousEventTimeTrigger.java    | 22 +++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc24c30e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index cb8cdf5..c562fa9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -53,31 +53,39 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 	@Override
 	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext
ctx) throws Exception {
 
-		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
+			// if the watermark is already past the window fire immediately
+			return TriggerResult.FIRE;
+		} else {
+			ctx.registerEventTimeTimer(window.maxTimestamp());
+		}
 
+		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
 		if (fireTimestamp.get() == null) {
 			long start = timestamp - (timestamp % interval);
 			long nextFireTimestamp = start + interval;
-
 			ctx.registerEventTimeTimer(nextFireTimestamp);
-
 			fireTimestamp.add(nextFireTimestamp);
-			return TriggerResult.CONTINUE;
 		}
+
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
 	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception
{
-		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
 
+		if (time == window.maxTimestamp()){
+			return TriggerResult.FIRE;
+		}
+
+		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
 		if (fireTimestamp.get().equals(time)) {
 			fireTimestamp.clear();
 			fireTimestamp.add(time + interval);
 			ctx.registerEventTimeTimer(time + interval);
 			return TriggerResult.FIRE;
-
 		}
+
 		return TriggerResult.CONTINUE;
 	}
 
@@ -113,7 +121,7 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 
 	@Override
 	public String toString() {
-		return "ContinuousProcessingTimeTrigger(" + interval + ")";
+		return "ContinuousEventTimeTrigger(" + interval + ")";
 	}
 
 	@VisibleForTesting


Mime
View raw message