flink-commits mailing list archives

From aljos...@apache.org
Subject flink git commit: [hotfix] Fix Long.MAX_VALUE watermark emit
Date Tue, 15 Dec 2015 16:45:17 GMT
Repository: flink
Updated Branches:
  refs/heads/master d9a061c67 -> e20c6390d


[hotfix] Fix Long.MAX_VALUE watermark emit

Before, the Kafka Source would emit a final Long.MAX_VALUE
watermark to signal the end of elements (i.e. when no partition is
assigned). This tripped up the AutomaticWatermarkContext and the
NonWatermarkContext, which threw an UnsupportedOperationException.

Now, a Long.MAX_VALUE watermark is allowed through.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e20c6390
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e20c6390
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e20c6390

Branch: refs/heads/master
Commit: e20c6390d6b19a6d3a5053c8aa0a892586f427f9
Parents: d9a061c
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Dec 14 18:49:45 2015 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Dec 15 17:44:32 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/operators/StreamSource.java | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e20c6390/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 996e32c..63271c5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -151,9 +151,15 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 
 		@Override
 		public void emitWatermark(Watermark mark) {
-			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
+			if (mark.getTimestamp() == Long.MAX_VALUE) {
+				// allow it since this is the special end-watermark that for example the Kafka
+				// source emits
+				output.emitWatermark(mark);
+			} else {
+				throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
 					" elements with a timestamp. See interface EventTimeSourceFunction" +
 					" if you want to manually assign timestamps to elements.");
+			}
 		}
 
 		@Override
@@ -236,9 +242,15 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 
 		@Override
 		public void emitWatermark(Watermark mark) {
-			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
+			if (mark.getTimestamp() == Long.MAX_VALUE) {
+				// allow it since this is the special end-watermark that for example the Kafka
+				// source emits
+				output.emitWatermark(mark);
+			} else {
+				throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
 					" elements with a timestamp. See interface EventTimeSourceFunction" +
 					" if you want to manually assign timestamps to elements.");
+			}
 		}
 
 		@Override
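
The behavior introduced by both hunks can be tried in isolation with the
minimal sketch below. It is not the Flink code: Watermark and
AutoTimestampContext are hypothetical stand-ins written for this
illustration, the list named forwarded replaces the real output, and the
exception message is made up. Only the Long.MAX_VALUE check mirrors the
patch.

```java
import java.util.ArrayList;
import java.util.List;

public class EndWatermarkSketch {

    /** Stand-in for Flink's Watermark (assumption: it only carries a timestamp). */
    public static final class Watermark {
        private final long timestamp;

        public Watermark(long timestamp) {
            this.timestamp = timestamp;
        }

        public long getTimestamp() {
            return timestamp;
        }
    }

    /** Hypothetical context that mirrors the patched emitWatermark check. */
    public static final class AutoTimestampContext {
        // Stands in for the operator output; records what was let through.
        public final List<Watermark> forwarded = new ArrayList<>();

        public void emitWatermark(Watermark mark) {
            if (mark.getTimestamp() == Long.MAX_VALUE) {
                // The special end-watermark is allowed through.
                forwarded.add(mark);
            } else {
                // Any other manually emitted watermark is still rejected.
                throw new UnsupportedOperationException(
                        "only the Long.MAX_VALUE end-watermark is forwarded in this sketch");
            }
        }
    }

    /** Returns true if the context rejects a watermark with the given timestamp. */
    public static boolean rejects(AutoTimestampContext ctx, long timestamp) {
        try {
            ctx.emitWatermark(new Watermark(timestamp));
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        AutoTimestampContext ctx = new AutoTimestampContext();
        ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
        boolean rejected = rejects(ctx, 42L);
        // prints "forwarded=1 rejected=true"
        System.out.println("forwarded=" + ctx.forwarded.size() + " rejected=" + rejected);
    }
}
```

The check uses equality with Long.MAX_VALUE rather than a range test, so
only the exact end-of-stream marker passes and ordinary watermarks from
such sources keep failing fast.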

