flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/2] flink git commit: [FLINK-3121] Emit Final Watermark in Kafka Source
Date Fri, 11 Dec 2015 15:26:33 GMT
[FLINK-3121] Emit Final Watermark in Kafka Source

Kafka sources that don't read from any partition never emit a watermark,
thereby blocking the progress of event-time in downstream operations.
This changes the Kafka Source to emit a Long.MAX_VALUE watermark if it
knows that it will never receive data.

This also changes the Timestamp Extraction operator to react to a
Long.MAX_VALUE watermark by emitting a Long.MAX_VALUE watermark itself.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bd5714d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bd5714d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bd5714d

Branch: refs/heads/master
Commit: 6bd5714d2a045e581b1a761830d010598f803de7
Parents: 4b64887
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Dec 9 12:13:22 2015 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Dec 11 10:45:34 2015 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer.java    |  8 ++-
 .../streaming/api/operators/StreamSource.java   | 16 +++--
 .../operators/ExtractTimestampsOperator.java    | 15 ++---
 .../streaming/timestamp/TimestampITCase.java    | 62 ++++++++++++++++++++
 4 files changed, 82 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index c4fd654..b139e95 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
@@ -434,7 +435,12 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 			}
 		}
 		else {
-			// this source never completes
+			// this source never completes, so emit a Long.MAX_VALUE watermark
+			// to not block watermark forwarding
+			if (getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) {
+				sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+			}
+
 			final Object waitLock = new Object();
 			while (running) {
 				// wait until we are canceled

http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 91c846f..996e32c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -60,6 +60,12 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T,
SourceFunction
 		// This will mostly emit a final +Inf Watermark to make the Watermark logic work
 		// when some sources finish before others do
 		ctx.close();
+
+		if (executionConfig.areTimestampsEnabled()) {
+			synchronized (lockingObject) {
+				output.emitWatermark(new Watermark(Long.MAX_VALUE));
+			}
+		}
 	}
 
 	public void cancel() {
@@ -296,14 +302,6 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T,
SourceFunction
 		}
 
 		@Override
-		public void close() {
-			// emit one last +Inf watermark to make downstream watermark processing work
-			// when some sources close early
-			synchronized (lockingObject) {
-				if (watermarkMultiplexingEnabled) {
-					output.emitWatermark(new Watermark(Long.MAX_VALUE));
-				}
-			}
-		}
+		public void close() {}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 9c27c6d..bfd9c8b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -57,14 +57,6 @@ public class ExtractTimestampsOperator<T>
 	}
 
 	@Override
-	public void close() throws Exception {
-		super.close();
-
-		// emit a final +Inf watermark, just like the sources
-		output.emitWatermark(new Watermark(Long.MAX_VALUE));
-	}
-
-	@Override
 	public void processElement(StreamRecord<T> element) throws Exception {
 		long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.getTimestamp());
 		output.collect(element.replace(element.getValue(), newTimestamp));
@@ -90,6 +82,11 @@ public class ExtractTimestampsOperator<T>
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
-		// ignore them, since we are basically a watermark source
+		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
+		// to signal the end of input and to not block watermark progress downstream
+		if (mark.getTimestamp() == Long.MAX_VALUE && mark.getTimestamp() > currentWatermark)
{
+			currentWatermark = Long.MAX_VALUE;
+			output.emitWatermark(mark);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 8e7ada4..6c3ef40 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -448,6 +448,68 @@ public class TimestampITCase {
 	}
 
 	/**
+	 * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks.
+	 */
+	@Test
+	public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception {
+		final int NUM_ELEMENTS = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost",
cluster.getLeaderRPCPort());
+		env.setParallelism(2);
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().enableTimestamps();
+		env.getConfig().setAutoWatermarkInterval(1);
+
+
+		DataStream<Integer> source1 = env.addSource(new EventTimeSourceFunction<Integer>()
{
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				int index = 0;
+				while (index < NUM_ELEMENTS) {
+					ctx.collectWithTimestamp(index, index);
+					ctx.collectWithTimestamp(index - 1, index - 1);
+					index++;
+					ctx.emitWatermark(new Watermark(index-2));
+				}
+
+				// emit the final Long.MAX_VALUE watermark, do it twice and verify that
+				// we only see one in the result
+				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+			}
+
+			@Override
+			public void cancel() {
+
+			}
+		});
+
+		source1.assignTimestamps(new TimestampExtractor<Integer>() {
+			@Override
+			public long extractTimestamp(Integer element, long currentTimestamp) {
+				return element;
+			}
+
+			@Override
+			public long extractWatermark(Integer element, long currentTimestamp) {
+				return Long.MIN_VALUE;
+			}
+
+			@Override
+			public long getCurrentWatermark() {
+				return Long.MIN_VALUE;
+			}
+		})
+			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
+
+
+		env.execute();
+
+		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
+		Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE);
+	}
+
+	/**
 	 * This tests whether the program throws an exception when an event-time source tries
 	 * to emit without timestamp.
 	 */


Mime
View raw message