flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-3526] [streaming] Fix Processing Time Window Assigner and Trigger
Date Fri, 26 Feb 2016 20:20:23 GMT
Repository: flink
Updated Branches:
  refs/heads/master 69eeefabf -> 64519e1c1


[FLINK-3526] [streaming] Fix Processing Time Window Assigner and Trigger

This closes #1727


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64519e1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64519e1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64519e1c

Branch: refs/heads/master
Commit: 64519e1c1593471bc21ef78c7c93591fb75a4fcf
Parents: 69eeefa
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Feb 26 20:06:06 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Feb 26 21:10:12 2016 +0100

----------------------------------------------------------------------
 .../api/windowing/assigners/TumblingProcessingTimeWindows.java    | 3 ++-
 .../streaming/api/windowing/triggers/ProcessingTimeTrigger.java   | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64519e1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 01de688..83f3d0c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -52,7 +52,8 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
 
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-		long start = timestamp - (timestamp % size);
+		final long now = System.currentTimeMillis();
+		long start = now - (now % size);
 		return Collections.singletonList(new TimeWindow(start, start + size));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/64519e1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 514885f..387e73b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -33,7 +33,7 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
 
 	@Override
 	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
-		ctx.registerProcessingTimeTimer(window.maxTimestamp());
+		ctx.registerProcessingTimeTimer(window.getEnd());
 		return TriggerResult.CONTINUE;
 	}
 


Mime
View raw message