flink-commits mailing list archives

From se...@apache.org
Subject flink git commit: [FLINK-3520] [streaming] Periodic watermark operator emits current watermark in close()
Date Fri, 26 Feb 2016 12:33:01 GMT
Repository: flink
Updated Branches:
  refs/heads/master 725f7467e -> de9a878e9


[FLINK-3520] [streaming] Periodic watermark operator emits current watermark in close()

This makes sure that for bounded data sets with watermarks, the final elements get properly
reflected in window results.
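
The effect can be sketched without any Flink dependencies. The following is only an illustrative model, not the operator's actual code: `FinalWatermarkSketch`, `MaxTimestampAssigner`, `PeriodicOperator`, and `run` are hypothetical names, watermarks are modeled as plain `long` values, and the assigner is assumed to report the highest timestamp seen so far. It shows that an element arriving after the last periodic timer tick only advances the watermark if `close()` emits a final one.

```java
import java.util.ArrayList;
import java.util.List;

public class FinalWatermarkSketch {

    /** Hypothetical stand-in for a user-defined periodic watermark assigner. */
    static final class MaxTimestampAssigner {
        private long maxSeen = Long.MIN_VALUE;

        void observe(long timestamp) {
            maxSeen = Math.max(maxSeen, timestamp);
        }

        long currentWatermark() {
            return maxSeen;
        }
    }

    /** Simplified model of an operator that emits watermarks only on timer ticks. */
    static final class PeriodicOperator {
        private final MaxTimestampAssigner assigner = new MaxTimestampAssigner();
        private final List<Long> emitted = new ArrayList<>();
        private final boolean emitOnClose;
        private long currentWatermark = Long.MIN_VALUE;

        PeriodicOperator(boolean emitOnClose) {
            this.emitOnClose = emitOnClose;
        }

        void processElement(long timestamp) {
            assigner.observe(timestamp);
        }

        /** Called by the (simulated) periodic timer. */
        void onTimer() {
            emitIfAdvanced();
        }

        /** Called once when the bounded input is exhausted. */
        void close() {
            if (emitOnClose) {
                emitIfAdvanced();
            }
        }

        /** Emits the assigner's watermark only if it moves the watermark forward. */
        private void emitIfAdvanced() {
            long candidate = assigner.currentWatermark();
            if (candidate > currentWatermark) {
                currentWatermark = candidate;
                emitted.add(candidate);
            }
        }

        List<Long> emitted() {
            return emitted;
        }
    }

    /** Runs a tiny bounded input and returns the watermarks that were emitted. */
    public static List<Long> run(boolean emitOnClose) {
        PeriodicOperator op = new PeriodicOperator(emitOnClose);
        op.processElement(1000L);
        op.processElement(2000L);
        op.onTimer();             // the periodic timer fires once
        op.processElement(5000L); // arrives after the last timer tick
        op.close();               // the bounded input ends here
        return op.emitted();
    }

    public static void main(String[] args) {
        System.out.println("without final watermark: " + run(false));
        System.out.println("with final watermark:    " + run(true));
    }
}
```

In this model, a downstream window covering the element at 5000 would never see a watermark past 2000 unless `close()` emits the final one.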


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de9a878e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de9a878e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de9a878e

Branch: refs/heads/master
Commit: de9a878e9555023e1c63f9c41eb281650f4840df
Parents: 725f746
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Feb 26 12:01:44 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Feb 26 13:32:27 2016 +0100

----------------------------------------------------------------------
 .../TimestampsAndPeriodicWatermarksOperator.java         | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de9a878e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index fbd46ef..a1614c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -88,4 +88,15 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
 			output.emitWatermark(mark);
 		}
 	}
+
+	@Override
+	public void close() throws Exception {
+		// emit a final watermark
+		Watermark newWatermark = userFunction.getCurrentWatermark();
+		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
+			currentWatermark = newWatermark.getTimestamp();
+			// emit watermark
+			output.emitWatermark(newWatermark);
+		}
+	}
 }

