flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-3439] Remove final Long.MAX_VALUE Watermark in StreamSource
Date Mon, 22 Feb 2016 13:57:26 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9691d9524 -> 80c0c65b7


[FLINK-3439] Remove final Long.MAX_VALUE Watermark in StreamSource


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80c0c65b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80c0c65b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80c0c65b

Branch: refs/heads/master
Commit: 80c0c65b77eff388b8a30144fbac2a157a383bd4
Parents: 9691d95
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Feb 17 19:19:05 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Feb 22 10:46:23 2016 +0100

----------------------------------------------------------------------
 .../util/IncrementalLearningSkeletonData.java   |  2 +-
 .../java/org/apache/flink/cep/CEPITCase.java    | 31 +++++++++++---------
 .../streaming/api/operators/StreamSource.java   | 13 --------
 .../operators/windowing/CoGroupJoinITCase.java  | 26 +++++++++++-----
 .../operators/windowing/WindowFoldITCase.java   | 17 ++++++-----
 .../streaming/timestamp/TimestampITCase.java    | 26 +++++-----------
 .../streaming/api/scala/CoGroupJoinITCase.scala | 15 ++++++++++
 .../streaming/api/scala/WindowFoldITCase.scala  | 14 ++++++---
 8 files changed, 79 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/80c0c65b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
index 8a6cd88..1f4bfd8 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.examples.ml.util;
 
 public class IncrementalLearningSkeletonData {
 
-	public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"
+ "1\n" +
+	public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"
+
 			"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" + "0\n"
+
 			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n"
+
 			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n"
+

http://git-wip-us.apache.org/repos/asf/flink/blob/80c0c65b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index b29dd92..8f0aa5d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
 import org.junit.After;
@@ -210,21 +211,22 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			Tuple2.of(new Event(2, "middle", 2.0), 1L),
 			Tuple2.of(new Event(3, "end", 3.0), 3L),
 			Tuple2.of(new Event(4, "end", 4.0), 10L),
-			Tuple2.of(new Event(5, "middle", 5.0), 7L)
-		).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<Event,Long>>()
{
-
-			private long currentMaxTimestamp = -1;
+			Tuple2.of(new Event(5, "middle", 5.0), 7L),
+			// last element for high final watermark
+			Tuple2.of(new Event(5, "middle", 5.0), 100L)
+		).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>()
{
 
 			@Override
 			public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp)
{
-				currentMaxTimestamp = Math.max(currentMaxTimestamp, element.f1);
 				return element.f1;
 			}
 
 			@Override
-			public long getCurrentWatermark() {
-				return currentMaxTimestamp - 5;
+			public long checkAndGetNextWatermark(Tuple2<Event, Long> lastElement,
+					long extractedTimestamp) {
+				return lastElement.f1 - 5;
 			}
+
 		}).map(new MapFunction<Tuple2<Event, Long>, Event>() {
 
 			@Override
@@ -295,21 +297,22 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			Tuple2.of(new Event(2, "end", 2.0), 8L),
 			Tuple2.of(new Event(1, "middle", 5.0), 7L),
 			Tuple2.of(new Event(3, "middle", 6.0), 9L),
-			Tuple2.of(new Event(3, "end", 7.0), 7L)
-		).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<Event,Long>>()
{
-
-			private long currentMaxTimestamp = -1L;
+			Tuple2.of(new Event(3, "end", 7.0), 7L),
+			// last element for high final watermark
+			Tuple2.of(new Event(3, "end", 7.0), 100L)
+		).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>()
{
 
 			@Override
 			public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp)
{
-				currentMaxTimestamp = Math.max(element.f1, currentMaxTimestamp);
 				return element.f1;
 			}
 
 			@Override
-			public long getCurrentWatermark() {
-				return currentMaxTimestamp - 5;
+			public long checkAndGetNextWatermark(Tuple2<Event, Long> lastElement,
+					long extractedTimestamp) {
+				return lastElement.f1 - 5;
 			}
+
 		}).map(new MapFunction<Tuple2<Event, Long>, Event>() {
 
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/80c0c65b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index b0f933a..ccf0edd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -63,15 +63,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
extends Abstract
 
 		userFunction.run(ctx);
 
-		// This will mostly emit a final +Inf Watermark to make the Watermark logic work
-		// when some sources finish before others do
 		ctx.close();
-
-		if (executionConfig.areTimestampsEnabled()) {
-			synchronized (lockingObject) {
-				output.emitWatermark(new Watermark(Long.MAX_VALUE));
-			}
-		}
 	}
 
 	public void cancel() {
@@ -268,11 +260,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
extends Abstract
 		public void close() {
 			watermarkTimer.cancel(true);
 			scheduleExecutor.shutdownNow();
-			// emit one last +Inf watermark to make downstream watermark processing work
-			// when some sources close early
-			synchronized (lockingObject) {
-				output.emitWatermark(new Watermark(Long.MAX_VALUE));
-			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/80c0c65b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
index 72ea8fb..86dbb94 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -73,6 +72,9 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase
{
 				ctx.collect(Tuple2.of("a", 6));
 				ctx.collect(Tuple2.of("a", 7));
 				ctx.collect(Tuple2.of("a", 8));
+
+				// so we get a final big watermark
+				ctx.collect(Tuple2.of("a", 20));
 			}
 
 			@Override
@@ -92,6 +94,9 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase
{
 				ctx.collect(Tuple2.of("c", 6));
 				ctx.collect(Tuple2.of("c", 7));
 				ctx.collect(Tuple2.of("c", 8));
+
+				// so we get a final big watermark
+				ctx.collect(Tuple2.of("a", 20));
 			}
 
 			@Override
@@ -165,6 +170,9 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase
{
 				ctx.collect(Tuple3.of("a", "i", 6));
 				ctx.collect(Tuple3.of("a", "j", 7));
 				ctx.collect(Tuple3.of("a", "k", 8));
+
+				// so we get a final big watermark
+				ctx.collect(Tuple3.of("a", "k", 20));
 			}
 
 			@Override
@@ -184,6 +192,9 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase
{
 
 				ctx.collect(Tuple3.of("a", "x", 6));
 				ctx.collect(Tuple3.of("a", "z", 8));
+
+				// so we get a final high watermark
+				ctx.collect(Tuple3.of("a", "z", 20));
 			}
 
 			@Override
@@ -259,6 +270,9 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase
{
 				ctx.collect(Tuple3.of("a", "i", 6));
 				ctx.collect(Tuple3.of("a", "j", 7));
 				ctx.collect(Tuple3.of("a", "k", 8));
+
+				// so we get a final high watermark
+				ctx.collect(Tuple3.of("a", "k", 20));
 			}
 
 			@Override
@@ -328,19 +342,17 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase
{
 		}
 	}
 
-	private static class Tuple3TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple3<String,
String, Integer>> {
+	private static class Tuple3TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple3<String,
String, Integer>> {
 
-		private long currentTimestamp;
-		
 		@Override
 		public long extractTimestamp(Tuple3<String, String, Integer> element, long previousTimestamp)
{
-			currentTimestamp = element.f2;
 			return element.f2;
 		}
 
 		@Override
-		public long getCurrentWatermark() {
-			return currentTimestamp - 1;
+		public long checkAndGetNextWatermark(Tuple3<String, String, Integer> lastElement,
+				long extractedTimestamp) {
+			return lastElement.f2 - 1;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/80c0c65b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
index 5cc75d5..592b98a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
@@ -72,6 +72,9 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase
{
 				ctx.collect(Tuple2.of("a", 6));
 				ctx.collect(Tuple2.of("a", 7));
 				ctx.collect(Tuple2.of("a", 8));
+
+				// so that we get a high final watermark to process the previously sent elements
+				ctx.collect(Tuple2.of("a", 20));
 			}
 
 			@Override
@@ -135,6 +138,8 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase
{
 				ctx.collect(Tuple2.of("b", 5));
 				ctx.collect(Tuple2.of("a", 5));
 
+				// so that we get a high final watermark to process the previously sent elements
+				ctx.collect(Tuple2.of("a", 20));
 			}
 
 			@Override
@@ -172,19 +177,17 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase
{
 		Assert.assertEquals(expectedResult, testResults);
 	}
 
-	private static class Tuple2TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String,
Integer>> {
+	private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String,
Integer>> {
 
-		private long currentTimestamp = -1;
-		
 		@Override
 		public long extractTimestamp(Tuple2<String, Integer> element, long previousTimestamp)
{
-			currentTimestamp = element.f1;
 			return element.f1;
 		}
 
 		@Override
-		public long getCurrentWatermark() {
-			return currentTimestamp - 1;
+		public long checkAndGetNextWatermark(Tuple2<String, Integer> lastElement,
+				long extractedTimestamp) {
+			return lastElement.f1 - 1;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/80c0c65b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 1416104..0d56c5c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -50,6 +50,7 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -144,9 +145,8 @@ public class TimestampITCase {
 
 		// verify that all the watermarks arrived at the final custom operator
 		for (int i = 0; i < PARALLELISM; i++) {
-			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks in order, because
-			// after that source2 emits Long.MAX_VALUE which could match with an arbitrary
-			// mark from source 1, for example, we could see 0,1,2,4,5,7,MAX
+			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
+			// other source stops emitting after that
 			for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
 				if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j)))
{
 					System.err.println("All Watermarks: ");
@@ -157,13 +157,7 @@ public class TimestampITCase {
 					Assert.fail("Wrong watermark.");
 				}
 			}
-			if (!CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size() -
1).equals(new Watermark(Long.MAX_VALUE))) {
-				System.err.println("All Watermarks: ");
-				for (int k = 0; k <= NUM_WATERMARKS; k++) {
-					System.err.println(CustomOperator.finalWatermarks[i].get(k));
-				}
-				Assert.fail("Wrong watermark.");
-			}
+			assertFalse(CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1).equals(new
Watermark(Long.MAX_VALUE)));
 		}
 	}
 
@@ -286,9 +280,7 @@ public class TimestampITCase {
 				Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
 			}
 		}
-		if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE)))
{
-			Assert.fail("Wrong watermark.");
-		}
+		assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new
Watermark(Long.MAX_VALUE)));
 	}
 
 	/**
@@ -346,9 +338,7 @@ public class TimestampITCase {
 				Assert.fail("Wrong watermark.");
 			}
 		}
-		if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE)))
{
-			Assert.fail("Wrong watermark.");
-		}
+		assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new
Watermark(Long.MAX_VALUE)));
 	}
 
 	/**
@@ -408,9 +398,7 @@ public class TimestampITCase {
 				Assert.fail("Wrong watermark.");
 			}
 		}
-		if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE)))
{
-			Assert.fail("Wrong watermark.");
-		}
+		assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new
Watermark(Long.MAX_VALUE)));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/80c0c65b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index 5f10eac..416c1de 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -53,6 +53,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
         ctx.collect(("a", 6))
         ctx.collect(("a", 7))
         ctx.collect(("a", 8))
+
+        // so that we get a high final watermark to process the previously sent elements
+        ctx.collect(("a", 20))
       }
 
       def cancel() {}
@@ -67,6 +70,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
         ctx.collect(("c", 6))
         ctx.collect(("c", 7))
         ctx.collect(("c", 8))
+
+        // so that we get a high final watermark to process the previously sent elements
+        ctx.collect(("c", 20))
       }
 
       def cancel() {
@@ -117,6 +123,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
         ctx.collect(("a", "i", 6))
         ctx.collect(("a", "j", 7))
         ctx.collect(("a", "k", 8))
+
+        // so that we get a high final watermark to process the previously sent elements
+        ctx.collect(("a", "k", 20))
       }
 
       def cancel() {}
@@ -133,6 +142,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 
         ctx.collect(("a", "x", 6))
         ctx.collect(("a", "z", 8))
+
+        // so that we get a high final watermark to process the previously sent elements
+        ctx.collect(("a", "z", 20))
       }
 
       def cancel() {}
@@ -193,6 +205,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
         ctx.collect(("a", "i", 6))
         ctx.collect(("a", "j", 7))
         ctx.collect(("a", "k", 8))
+
+        // so that we get a high final watermark to process the previously sent elements
+        ctx.collect(("a", "k", 20))
       }
 
       def cancel() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/80c0c65b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index f358ac6..ae7b61f 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, AssignerWithPeriodicWatermarks}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
@@ -57,6 +57,9 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
         ctx.collect(("a", 6))
         ctx.collect(("a", 7))
         ctx.collect(("a", 8))
+
+        // so we get a big watermark to trigger processing of the previous elements
+        ctx.collect(("a", 20))
       }
 
       def cancel() {
@@ -102,6 +105,9 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
         ctx.collect(("a", 4))
         ctx.collect(("b", 5))
         ctx.collect(("a", 5))
+
+        // so we get a big watermark to trigger processing of the previous elements
+        ctx.collect(("a", 20))
       }
 
       def cancel() {
@@ -132,7 +138,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 object WindowFoldITCase {
   private var testResults: mutable.MutableList[String] = null
 
-  private class Tuple2TimestampExtractor extends AssignerWithPeriodicWatermarks[(String,
Int)] {
+  private class Tuple2TimestampExtractor extends AssignerWithPunctuatedWatermarks[(String,
Int)] {
     
     private var currentTimestamp = -1L
     
@@ -141,8 +147,8 @@ object WindowFoldITCase {
       currentTimestamp
     }
 
-    override def getCurrentWatermark(): Long = {
-      currentTimestamp - 1
+    def checkAndGetNextWatermark(lastElement: (String, Int), extractedTimestamp: Long): Long
= {
+      lastElement._2 - 1
     }
   }
 }


Mime
View raw message