flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-2577] [streaming] Fix Stalling Watermarks when Sources Close
Date Thu, 10 Sep 2015 16:55:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9c2eaa8dc -> aec4b1575


[FLINK-2577] [streaming] Fix Stalling Watermarks when Sources Close

Previously, when one source closed early, it stopped emitting watermarks.
Downstream operators were not aware of this and kept waiting for more
watermarks from that input. Because of this, watermarks were no longer
forwarded.

Now, when a source closes, it emits a final watermark with timestamp
Long.MAX_VALUE. This lets the watermarks from the remaining sources
propagate through, because an operator's watermark is defined as the
minimum over all of its inputs.

The Long.MAX_VALUE watermark has the added benefit of notifying
operations that no more elements will arrive in the future.

This closes #1060


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c438cff8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c438cff8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c438cff8

Branch: refs/heads/master
Commit: c438cff86d5ef65a753f439b8dbca6f897611a7c
Parents: 9c2eaa8
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Aug 26 15:46:20 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Sep 10 18:27:05 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/operators/StreamSource.java   | 18 ++++++++++-
 .../api/operators/co/CoStreamFlatMap.java       |  8 ++---
 .../streaming/api/operators/co/CoStreamMap.java |  8 ++---
 .../api/operators/co/CoStreamReduce.java        |  8 ++---
 .../api/operators/co/CoStreamWindow.java        |  8 ++---
 .../streaming/api/watermark/Watermark.java      |  5 +++
 .../streaming/timestamp/TimestampITCase.java    | 32 +++++++++++++++++---
 7 files changed, 66 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c438cff8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 7fad295..e100fa6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -44,6 +44,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T,
SourceFunction
 	public void run(Object lockingObject, Output<StreamRecord<T>> collector) throws
Exception {
 
 		SourceFunction.SourceContext<T> ctx;
+
 		if (userFunction instanceof EventTimeSourceFunction) {
 			ctx = new ManualWatermarkContext<T>(lockingObject, collector);
 		} else if (executionConfig.getAutoWatermarkInterval() > 0) {
@@ -55,6 +56,10 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T,
SourceFunction
 		}
 
 		userFunction.run(ctx);
+
+		// This will mostly emit a final +Inf Watermark to make the Watermark logic work
+		// when some sources finish before others do
+		ctx.close();
 	}
 
 	public void cancel() {
@@ -235,6 +240,11 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T,
SourceFunction
 		public void close() {
 			watermarkTimer.cancel(true);
 			scheduleExecutor.shutdownNow();
+			// emit one last +Inf watermark to make downstream watermark processing work
+			// when some sources close early
+			synchronized (lockingObject) {
+				output.emitWatermark(new Watermark(Long.MAX_VALUE));
+			}
 		}
 	}
 
@@ -278,6 +288,12 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T,
SourceFunction
 		}
 
 		@Override
-		public void close() {}
+		public void close() {
+			// emit one last +Inf watermark to make downstream watermark processing work
+			// when some sources close early
+			synchronized (lockingObject) {
+				output.emitWatermark(new Watermark(Long.MAX_VALUE));
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c438cff8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index 1448ab8..cbf59c1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -36,8 +36,8 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
 	// We keep track of watermarks from both inputs, the combined input is the minimum
 	// Once the minimum advances we emit a new watermark for downstream operators
 	private long combinedWatermark = Long.MIN_VALUE;
-	private long input1Watermark = Long.MAX_VALUE;
-	private long input2Watermark = Long.MAX_VALUE;
+	private long input1Watermark = Long.MIN_VALUE;
+	private long input2Watermark = Long.MIN_VALUE;
 
 	public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
 		super(flatMapper);
@@ -66,7 +66,7 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
 	public void processWatermark1(Watermark mark) throws Exception {
 		input1Watermark = mark.getTimestamp();
 		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE &&
input2Watermark != Long.MAX_VALUE) {
+		if (newMin > combinedWatermark) {
 			combinedWatermark = newMin;
 			output.emitWatermark(new Watermark(combinedWatermark));
 		}
@@ -76,7 +76,7 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
 	public void processWatermark2(Watermark mark) throws Exception {
 		input2Watermark = mark.getTimestamp();
 		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE &&
input2Watermark != Long.MAX_VALUE) {
+		if (newMin > combinedWatermark) {
 			combinedWatermark = newMin;
 			output.emitWatermark(new Watermark(combinedWatermark));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c438cff8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
index 8d7c7c4..e34e727 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
@@ -32,8 +32,8 @@ public class CoStreamMap<IN1, IN2, OUT>
 	// We keep track of watermarks from both inputs, the combined input is the minimum
 	// Once the minimum advances we emit a new watermark for downstream operators
 	private long combinedWatermark = Long.MIN_VALUE;
-	private long input1Watermark = Long.MAX_VALUE;
-	private long input2Watermark = Long.MAX_VALUE;
+	private long input1Watermark = Long.MIN_VALUE;
+	private long input2Watermark = Long.MIN_VALUE;
 
 	public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
 		super(mapper);
@@ -53,7 +53,7 @@ public class CoStreamMap<IN1, IN2, OUT>
 	public void processWatermark1(Watermark mark) throws Exception {
 		input1Watermark = mark.getTimestamp();
 		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE &&
input2Watermark != Long.MAX_VALUE) {
+		if (newMin > combinedWatermark) {
 			combinedWatermark = newMin;
 			output.emitWatermark(new Watermark(combinedWatermark));
 		}
@@ -63,7 +63,7 @@ public class CoStreamMap<IN1, IN2, OUT>
 	public void processWatermark2(Watermark mark) throws Exception {
 		input2Watermark = mark.getTimestamp();
 		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE &&
input2Watermark != Long.MAX_VALUE) {
+		if (newMin > combinedWatermark) {
 			combinedWatermark = newMin;
 			output.emitWatermark(new Watermark(combinedWatermark));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c438cff8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
index 8609eab..dc6ea34 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
@@ -35,8 +35,8 @@ public class CoStreamReduce<IN1, IN2, OUT>
 	// We keep track of watermarks from both inputs, the combined input is the minimum
 	// Once the minimum advances we emit a new watermark for downstream operators
 	private long combinedWatermark = Long.MIN_VALUE;
-	private long input1Watermark = Long.MAX_VALUE;
-	private long input2Watermark = Long.MAX_VALUE;
+	private long input1Watermark = Long.MIN_VALUE;
+	private long input2Watermark = Long.MIN_VALUE;
 
 	public CoStreamReduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
 		super(coReducer);
@@ -68,7 +68,7 @@ public class CoStreamReduce<IN1, IN2, OUT>
 	public void processWatermark1(Watermark mark) throws Exception {
 		input1Watermark = mark.getTimestamp();
 		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE &&
input2Watermark != Long.MAX_VALUE) {
+		if (newMin > combinedWatermark) {
 			combinedWatermark = newMin;
 			output.emitWatermark(new Watermark(combinedWatermark));
 		}
@@ -78,7 +78,7 @@ public class CoStreamReduce<IN1, IN2, OUT>
 	public void processWatermark2(Watermark mark) throws Exception {
 		input2Watermark = mark.getTimestamp();
 		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE &&
input2Watermark != Long.MAX_VALUE) {
+		if (newMin > combinedWatermark) {
 			combinedWatermark = newMin;
 			output.emitWatermark(new Watermark(combinedWatermark));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c438cff8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
index 40d0a89..4bfe2ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
@@ -52,8 +52,8 @@ public class CoStreamWindow<IN1, IN2, OUT>
 	// We keep track of watermarks from both inputs, the combined input is the minimum
 	// Once the minimum advances we emit a new watermark for downstream operators
 	private long combinedWatermark = Long.MIN_VALUE;
-	private long input1Watermark = Long.MAX_VALUE;
-	private long input2Watermark = Long.MAX_VALUE;
+	private long input1Watermark = Long.MIN_VALUE;
+	private long input2Watermark = Long.MIN_VALUE;
 
 	public CoStreamWindow(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
 			long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2>
timeStamp2) {
@@ -105,7 +105,7 @@ public class CoStreamWindow<IN1, IN2, OUT>
 	public void processWatermark1(Watermark mark) throws Exception {
 		input1Watermark = mark.getTimestamp();
 		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE &&
input2Watermark != Long.MAX_VALUE) {
+		if (newMin > combinedWatermark) {
 			combinedWatermark = newMin;
 			output.emitWatermark(new Watermark(combinedWatermark));
 		}
@@ -115,7 +115,7 @@ public class CoStreamWindow<IN1, IN2, OUT>
 	public void processWatermark2(Watermark mark) throws Exception {
 		input2Watermark = mark.getTimestamp();
 		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE &&
input2Watermark != Long.MAX_VALUE) {
+		if (newMin > combinedWatermark) {
 			combinedWatermark = newMin;
 			output.emitWatermark(new Watermark(combinedWatermark));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c438cff8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
index 163791e..838c24a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
@@ -33,6 +33,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
  * In some cases a watermark is only a heuristic and operators should be able to deal with
  * late elements. They can either discard those or update the result and emit updates/retractions
  * to downstream operations.
+ *
+ * <p>
+ * When a source closes it will emit a final watermark with timestamp {@code Long.MAX_VALUE}.
When
+ * an operator receives this it will know that no more input will be arriving in the future.
+ *
  */
 public class Watermark extends StreamElement {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c438cff8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 523d1e3..5dc6b12 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -91,6 +91,9 @@ public class TimestampITCase {
 	 * arrive at operators throughout a topology.
 	 *
 	 * <p>
+	 * This also checks whether watermarks keep propagating if a source closes early.
+	 *
+	 * <p>
 	 * This only uses map to test the workings of watermarks in a complete, running topology.
All
 	 * tasks and stream operators have dedicated tests that test the watermark propagation
 	 * behaviour.
@@ -109,9 +112,9 @@ public class TimestampITCase {
 
 
 		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
-		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
+		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS
/ 2));
 
-		source1
+		source1.union(source2)
 				.map(new IdentityMap())
 				.connect(source2).map(new IdentityCoMap())
 				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
@@ -121,11 +124,32 @@ public class TimestampITCase {
 
 		// verify that all the watermarks arrived at the final custom operator
 		for (int i = 0; i < PARALLELISM; i++) {
-			for (int j = 0; j < NUM_WATERMARKS; j++) {
-				if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j)))
{
+			// There can be two cases, either we get NUM_WATERMARKS + 1 watermarks or
+			// (NUM_WATERMARKS / 2) + 1 watermarks. This depends on which source get's to run first.
+			// If source1 runs first we jump directly to +Inf and skip all the intermediate
+			// watermarks. If source2 runs first we see the intermediate watermarks from
+			// NUM_WATERMARKS/2 to +Inf.
+			if (CustomOperator.finalWatermarks[i].size() == NUM_WATERMARKS + 1) {
+				for (int j = 0; j < NUM_WATERMARKS; j++) {
+					if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j)))
{
+						Assert.fail("Wrong watermark.");
+					}
+				}
+				if (!CustomOperator.finalWatermarks[i].get(NUM_WATERMARKS).equals(new Watermark(Long.MAX_VALUE)))
{
 					Assert.fail("Wrong watermark.");
 				}
+			} else {
+				for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
+					if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j)))
{
+						Assert.fail("Wrong watermark.");
+					}
+				}
+				if (!CustomOperator.finalWatermarks[i].get(NUM_WATERMARKS / 2).equals(new Watermark(Long.MAX_VALUE)))
{
+					Assert.fail("Wrong watermark.");
+				}
+
 			}
+
 		}
 	}
 


Mime
View raw message