flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-3024] Fix TimestampExtractor.getCurrentWatermark() Behaviour
Date Tue, 17 Nov 2015 13:38:43 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.10 7c4cde3a6 -> 42bef80a8


[FLINK-3024] Fix TimestampExtractor.getCurrentWatermark() Behaviour

Previously the internal currentWatermark would be updated even if the
value returned from getCurrentWatermark was lower than the current
watermark.

This can lead to problems with chaining because the watermark is
directly forwarded without going through the watermark logic that
ensures correct behaviour (monotonically increasing).

This adds a test that verifies that the timestamp extractor does not
emit decreasing watermarks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42bef80a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42bef80a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42bef80a

Branch: refs/heads/release-0.10
Commit: 42bef80a857ff37a1b06429374bc8647fa5fac1a
Parents: 7c4cde3
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Nov 17 11:40:22 2015 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Nov 17 14:38:22 2015 +0100

----------------------------------------------------------------------
 .../operators/ExtractTimestampsOperator.java    |  6 +-
 .../streaming/timestamp/TimestampITCase.java    | 76 ++++++++++++++++++++
 2 files changed, 79 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/42bef80a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 6e51a49..9c27c6d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -79,10 +79,10 @@ public class ExtractTimestampsOperator<T>
 	public void trigger(long timestamp) throws Exception {
 		// register next timer
 		registerTimer(System.currentTimeMillis() + watermarkInterval, this);
-		long lastWatermark = currentWatermark;
-		currentWatermark = userFunction.getCurrentWatermark();
+		long newWatermark = userFunction.getCurrentWatermark();
 
-		if (currentWatermark > lastWatermark) {
+		if (newWatermark > currentWatermark) {
+			currentWatermark = newWatermark;
 			// emit watermark
 			output.emitWatermark(new Watermark(currentWatermark));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/42bef80a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 749e1dd..5113b45 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -379,6 +380,73 @@ public class TimestampITCase {
 	}
 
 	/**
+	 * This test verifies that the timestamp extractor does not emit decreasing watermarks even
+	 *
+	 */
+	@Test
+	public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception {
+		final int NUM_ELEMENTS = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost",
cluster.getLeaderRPCPort());
+		env.setParallelism(1);
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().enableTimestamps();
+		env.getConfig().setAutoWatermarkInterval(1);
+
+
+		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				int index = 0;
+				while (index < NUM_ELEMENTS) {
+					ctx.collect(index);
+					Thread.sleep(100);
+					ctx.collect(index - 1);
+					latch.await();
+					index++;
+				}
+			}
+
+			@Override
+			public void cancel() {
+
+			}
+		});
+
+		source1.assignTimestamps(new TimestampExtractor<Integer>() {
+			@Override
+			public long extractTimestamp(Integer element, long currentTimestamp) {
+				return element;
+			}
+
+			@Override
+			public long extractWatermark(Integer element, long currentTimestamp) {
+				return element - 1;
+			}
+
+			@Override
+			public long getCurrentWatermark() {
+				return Long.MIN_VALUE;
+			}
+		})
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
+
+
+		env.execute();
+
+		// verify that we get NUM_ELEMENTS watermarks
+		for (int j = 0; j < NUM_ELEMENTS; j++) {
+			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
+				Assert.fail("Wrong watermark.");
+			}
+		}
+		if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE)))
{
+			Assert.fail("Wrong watermark.");
+		}
+	}
+
+	/**
 	 * This tests whether the program throws an exception when an event-time source tries
 	 * to emit without timestamp.
 	 */
@@ -442,6 +510,10 @@ public class TimestampITCase {
 		public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
 		private long oldTimestamp;
 
+		public CustomOperator() {
+			setChainingStrategy(ChainingStrategy.ALWAYS);
+		}
+
 		@Override
 		public void processElement(StreamRecord<Integer> element) throws Exception {
 			if (element.getTimestamp() != element.getValue()) {
@@ -473,6 +545,10 @@ public class TimestampITCase {
 
 	public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer>
implements OneInputStreamOperator<Integer, Integer> {
 
+		public TimestampCheckingOperator() {
+			setChainingStrategy(ChainingStrategy.ALWAYS);
+		}
+
 		@Override
 		public void processElement(StreamRecord<Integer> element) throws Exception {
 			if (element.getTimestamp() != element.getValue()) {


Mime
View raw message