flink-commits mailing list archives

From se...@apache.org
Subject [1/3] flink git commit: [FLINK-3379] [FLINK-3415] [streaming] Refactor TimestampExtractor into two separate classes
Date Wed, 17 Feb 2016 11:19:47 GMT
Repository: flink
Updated Branches:
  refs/heads/master c1412dde3 -> e08d7a6f3
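
In short, this change splits the old TimestampExtractor interface into two assigner
interfaces and deprecates DataStream#assignTimestamps(...) in favour of
DataStream#assignTimestampsAndWatermarks(...):

  - AssignerWithPeriodicWatermarks: extractTimestamp(element, previousTimestamp) plus
    getCurrentWatermark(), which is polled at the configured auto-watermark interval.
  - AssignerWithPunctuatedWatermarks: extractTimestamp(element, previousTimestamp) plus
    checkAndGetNextWatermark(lastElement, extractedTimestamp), evaluated per element.

As a rough illustration of the periodic variant, here is a sketch modelled on the
Tuple2TimestampExtractor in the test diff below; the class name is illustrative, while
the method signatures are taken from this commit:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;

    public class PeriodicTupleAssigner
            implements AssignerWithPeriodicWatermarks<Tuple2<String, Integer>> {

        // last timestamp seen by this parallel instance
        private long currentTimestamp = -1L;

        @Override
        public long extractTimestamp(Tuple2<String, Integer> element, long previousTimestamp) {
            currentTimestamp = element.f1;
            return element.f1;
        }

        @Override
        public long getCurrentWatermark() {
            // let the watermark trail the last seen timestamp by one
            return currentTimestamp - 1;
        }
    }

    // attached with: stream.assignTimestampsAndWatermarks(new PeriodicTupleAssigner());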


http://git-wip-us.apache.org/repos/asf/flink/blob/74c2b80b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
index fb7142b..5cc75d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
@@ -1,37 +1,39 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -40,6 +42,7 @@ import java.util.concurrent.TimeUnit;
  * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions
  * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
  */
+@SuppressWarnings("serial")
 public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 
 	private static List<String> testResults;
@@ -47,7 +50,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 	@Test
 	public void testFoldWindow() throws Exception {
 
-		testResults = Lists.newArrayList();
+		testResults = new ArrayList<>();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -74,7 +77,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 			@Override
 			public void cancel() {
 			}
-		}).assignTimestamps(new Tuple2TimestampExtractor());
+		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
 
 		source1
 				.keyBy(0)
@@ -97,7 +100,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 
 		env.execute("Fold Window Test");
 
-		List<String> expectedResult = Lists.newArrayList(
+		List<String> expectedResult = Arrays.asList(
 				"(R:aaa,3)",
 				"(R:aaa,21)",
 				"(R:bbb,12)");
@@ -111,14 +114,13 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 	@Test
 	public void testFoldAllWindow() throws Exception {
 
-		testResults = Lists.newArrayList();
+		testResults = new ArrayList<>();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(1);
 
 		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
-			private static final long serialVersionUID = 1L;
 
 			@Override
 			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
@@ -138,7 +140,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 			@Override
 			public void cancel() {
 			}
-		}).assignTimestamps(new Tuple2TimestampExtractor());
+		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
 
 		source1
 				.windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
@@ -160,7 +162,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 
 		env.execute("Fold All-Window Test");
 
-		List<String> expectedResult = Lists.newArrayList(
+		List<String> expectedResult = Arrays.asList(
 				"(R:aaa,3)",
 				"(R:bababa,24)");
 
@@ -170,22 +172,19 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 		Assert.assertEquals(expectedResult, testResults);
 	}
 
-	private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
+	private static class Tuple2TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Integer>> {
 
+		private long currentTimestamp = -1;
+		
 		@Override
-		public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) {
+		public long extractTimestamp(Tuple2<String, Integer> element, long previousTimestamp) {
+			currentTimestamp = element.f1;
 			return element.f1;
 		}
 
 		@Override
-		public long extractWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
-			return element.f1 - 1;
-		}
-
-		@Override
 		public long getCurrentWatermark() {
-			return Long.MIN_VALUE;
+			return currentTimestamp - 1;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74c2b80b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index eed455c..677636a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.timestamp;
 
 import org.apache.flink.api.common.functions.MapFunction;
@@ -27,7 +28,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -38,6 +40,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.NoOpSink;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -245,8 +248,8 @@ public class TimestampITCase {
 		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
 			@Override
 			public void run(SourceContext<Integer> ctx) throws Exception {
-				int index = 0;
-				while (index < NUM_ELEMENTS) {
+				int index = 1;
+				while (index <= NUM_ELEMENTS) {
 					ctx.collect(index);
 					latch.await();
 					index++;
@@ -254,9 +257,7 @@ public class TimestampITCase {
 			}
 
 			@Override
-			public void cancel() {
-
-			}
+			public void cancel() {}
 		});
 
 		DataStream<Integer> extractOp = source1.assignTimestamps(
@@ -280,8 +281,9 @@ public class TimestampITCase {
 
 		// verify that we get NUM_ELEMENTS watermarks
 		for (int j = 0; j < NUM_ELEMENTS; j++) {
-			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
-				Assert.fail("Wrong watermark.");
+			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
+				long wm = CustomOperator.finalWatermarks[0].get(j).getTimestamp();
+				Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
 			}
 		}
 		if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
@@ -307,8 +309,8 @@ public class TimestampITCase {
 		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
 			@Override
 			public void run(SourceContext<Integer> ctx) throws Exception {
-				int index = 0;
-				while (index < NUM_ELEMENTS) {
+				int index = 1;
+				while (index <= NUM_ELEMENTS) {
 					ctx.collect(index);
 					latch.await();
 					index++;
@@ -316,27 +318,22 @@ public class TimestampITCase {
 			}
 
 			@Override
-			public void cancel() {
-
-			}
+			public void cancel() {}
 		});
 
-		source1.assignTimestamps(new TimestampExtractor<Integer>() {
-			@Override
-			public long extractTimestamp(Integer element, long currentTimestamp) {
-				return element;
-			}
-
-			@Override
-			public long extractWatermark(Integer element, long currentTimestamp) {
-				return element - 1;
-			}
+		source1
+				.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
+					
+					@Override
+					public long extractTimestamp(Integer element, long currentTimestamp) {
+						return element;
+					}
 
-			@Override
-			public long getCurrentWatermark() {
-				return Long.MIN_VALUE;
-			}
-		})
+					@Override
+					public long checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
+						return extractedTimestamp - 1;
+					}
+				})
 				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
 				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
 
@@ -345,7 +342,7 @@ public class TimestampITCase {
 
 		// verify that we get NUM_ELEMENTS watermarks
 		for (int j = 0; j < NUM_ELEMENTS; j++) {
-			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
+			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
 				Assert.fail("Wrong watermark.");
 			}
 		}
@@ -372,8 +369,8 @@ public class TimestampITCase {
 		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
 			@Override
 			public void run(SourceContext<Integer> ctx) throws Exception {
-				int index = 0;
-				while (index < NUM_ELEMENTS) {
+				int index = 1;
+				while (index <= NUM_ELEMENTS) {
 					ctx.collect(index);
 					Thread.sleep(100);
 					ctx.collect(index - 1);
@@ -383,27 +380,22 @@ public class TimestampITCase {
 			}
 
 			@Override
-			public void cancel() {
-
-			}
+			public void cancel() {}
 		});
 
-		source1.assignTimestamps(new TimestampExtractor<Integer>() {
-			@Override
-			public long extractTimestamp(Integer element, long currentTimestamp) {
-				return element;
-			}
+		source1
+				.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
 
-			@Override
-			public long extractWatermark(Integer element, long currentTimestamp) {
-				return element - 1;
-			}
+					@Override
+					public long extractTimestamp(Integer element, long previousTimestamp) {
+						return element;
+					}
 
-			@Override
-			public long getCurrentWatermark() {
-				return Long.MIN_VALUE;
-			}
-		})
+					@Override
+					public long checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
+						return extractedTimestamp - 1;
+					}
+				})
 				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
 				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
 
@@ -412,7 +404,7 @@ public class TimestampITCase {
 
 		// verify that we get NUM_ELEMENTS watermarks
 		for (int j = 0; j < NUM_ELEMENTS; j++) {
-			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
+			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
 				Assert.fail("Wrong watermark.");
 			}
 		}
@@ -453,30 +445,84 @@ public class TimestampITCase {
 			}
 
 			@Override
-			public void cancel() {
-
-			}
+			public void cancel() {}
 		});
 
-		source1.assignTimestamps(new TimestampExtractor<Integer>() {
-			@Override
-			public long extractTimestamp(Integer element, long currentTimestamp) {
-				return element;
-			}
+		source1
+				.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
+
+					@Override
+					public long extractTimestamp(Integer element, long currentTimestamp) {
+						return element;
+					}
+
+					@Override
+					public long checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
+						return -1L;
+					}
+				})
+			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
+
 
+		env.execute();
+
+		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
+		Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE);
+	}
+
+	/**
+	 * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks.
+	 * 
+	 * Same test as before, but using a different timestamp extractor
+	 */
+	@Test
+	public void testTimestampExtractorWithLongMaxWatermarkFromSource2() throws Exception {
+		final int NUM_ELEMENTS = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		
+		env.setParallelism(2);
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().enableTimestamps();
+		env.getConfig().setAutoWatermarkInterval(10);
+
+		DataStream<Integer> source1 = env.addSource(new EventTimeSourceFunction<Integer>() {
 			@Override
-			public long extractWatermark(Integer element, long currentTimestamp) {
-				return Long.MIN_VALUE;
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				int index = 0;
+				while (index < NUM_ELEMENTS) {
+					ctx.collectWithTimestamp(index, index);
+					ctx.collectWithTimestamp(index - 1, index - 1);
+					index++;
+					ctx.emitWatermark(new Watermark(index-2));
+				}
+
+				// emit the final Long.MAX_VALUE watermark, do it twice and verify that
+				// we only see one in the result
+				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
 			}
 
 			@Override
-			public long getCurrentWatermark() {
-				return Long.MIN_VALUE;
-			}
-		})
-			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
+			public void cancel() {}
+		});
+
+		source1
+				.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Integer>() {
 
+					@Override
+					public long extractTimestamp(Integer element, long currentTimestamp) {
+						return element;
+					}
 
+					@Override
+					public long getCurrentWatermark() {
+						return -1L;
+					}
+				})
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
+		
 		env.execute();
 
 		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
@@ -625,8 +671,7 @@ public class TimestampITCase {
 		}
 
 		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-		}
+		public void processWatermark(Watermark mark) throws Exception {}
 	}
 
 	public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
@@ -640,8 +685,7 @@ public class TimestampITCase {
 		}
 
 		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-		}
+		public void processWatermark(Watermark mark) throws Exception {}
 	}
 
 	public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {
@@ -682,9 +726,7 @@ public class TimestampITCase {
 		}
 
 		@Override
-		public void cancel() {
-
-		}
+		public void cancel() {}
 	}
 
 	public static class MyNonWatermarkingSource implements SourceFunction<Integer> {
@@ -703,9 +745,7 @@ public class TimestampITCase {
 		}
 
 		@Override
-		public void cancel() {
-
-		}
+		public void cancel() {}
 	}
 
 	// This is a event-time source. This should only emit elements with timestamps. The test should
@@ -720,9 +760,7 @@ public class TimestampITCase {
 		}
 
 		@Override
-		public void cancel() {
-
-		}
+		public void cancel() {}
 	}
 
 	// This is a normal source. This should only emit elements without timestamps. The test should
@@ -737,9 +775,7 @@ public class TimestampITCase {
 		}
 
 		@Override
-		public void cancel() {
-
-		}
+		public void cancel() {}
 	}
 
 	// This is a normal source. This should only emit elements without timestamps. This also
@@ -755,8 +791,6 @@ public class TimestampITCase {
 		}
 
 		@Override
-		public void cancel() {
-
-		}
+		public void cancel() {}
 	}
 }

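The TimestampITCase changes above replace the removed TimestampExtractor with punctuated
assigners: the separate extractWatermark(...) and getCurrentWatermark() methods collapse
into a single checkAndGetNextWatermark(...) call that is evaluated once per element, and a
watermark is emitted only if the returned value advances past the previous watermark.
Roughly, the inline assigners used in these tests have this shape (signatures as they
appear in this diff; the local variable name is illustrative):

    AssignerWithPunctuatedWatermarks<Integer> punctuatedAssigner =
            new AssignerWithPunctuatedWatermarks<Integer>() {

        @Override
        public long extractTimestamp(Integer element, long previousTimestamp) {
            // the element value doubles as its event-time timestamp
            return element;
        }

        @Override
        public long checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
            // suggest a watermark one below the element's timestamp
            return extractedTimestamp - 1;
        }
    };
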
http://git-wip-us.apache.org/repos/asf/flink/blob/74c2b80b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 1e2d3ec..c484eae 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.mockito.invocation.InvocationOnMock;
@@ -42,6 +43,7 @@ import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -67,15 +69,17 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	final Object checkpointLock;
 
 	StreamTask<?, ?> mockTask;
-
-	AbstractStateBackend stateBackend;
 	
 	
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
+		this(operator, new ExecutionConfig());
+	}
+	
+	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig) {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<Object>();
 		this.config = new StreamConfig(new Configuration());
-		this.executionConfig = new ExecutionConfig();
+		this.executionConfig = executionConfig;
 		this.checkpointLock = new Object();
 
 		final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
@@ -100,6 +104,35 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		} catch (Exception e) {
 			e.printStackTrace();
 		}
+		
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				final long execTime = (Long) invocation.getArguments()[0];
+				final Triggerable target = (Triggerable) invocation.getArguments()[1];
+				
+				Thread caller = new Thread() {
+					@Override
+					public void run() {
+						final long delay = execTime - System.currentTimeMillis();
+						if (delay > 0) {
+							try {
+								Thread.sleep(delay);
+							} catch (InterruptedException ignored) {}
+						}
+						
+						synchronized (checkpointLock) {
+							try {
+								target.trigger(execTime);
+							} catch (Exception ignored) {}
+						}
+					}
+				};
+				caller.start();
+				
+				return null;
+			}
+		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
 	}
 
 	public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
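
The harness now accepts an ExecutionConfig and mocks StreamTask#registerTimer(...) so that
registered Triggerable callbacks fire from a background thread under the checkpoint lock.
A hypothetical use of the new constructor (operatorUnderTest stands in for whatever operator
a test exercises; the interval value is illustrative):

    ExecutionConfig executionConfig = new ExecutionConfig();
    executionConfig.setAutoWatermarkInterval(50);

    OneInputStreamOperatorTestHarness<String, String> harness =
            new OneInputStreamOperatorTestHarness<>(operatorUnderTest, executionConfig);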

http://git-wip-us.apache.org/repos/asf/flink/blob/74c2b80b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index cb84187..4b019f9 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -30,7 +30,7 @@ import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
+import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, AssignerWithPeriodicWatermarks, AscendingTimestampExtractor, TimestampExtractor}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
@@ -629,6 +629,7 @@ class DataStream[T](stream: JavaStream[T]) {
   def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = {
     new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](stream, assigner))
   }
+  
   /**
    * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
    * The internal timestamps are, for example, used to to event-time window operations.
@@ -640,21 +641,82 @@ class DataStream[T](stream: JavaStream[T]) {
    *
    * @see org.apache.flink.streaming.api.watermark.Watermark
    */
-  @PublicEvolving
+  @deprecated
   def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
     stream.assignTimestamps(clean(extractor))
   }
 
   /**
-   * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
-   * The internal timestamps are, for example, used to to event-time window operations.
+   * Assigns timestamps to the elements in the data stream and periodically creates
+   * watermarks to signal event time progress.
    *
-   * If you know that the timestamps are strictly increasing you can use an
-   * [[org.apache.flink.streaming.api.functions.AscendingTimestampExtractor]]. Otherwise,
-   * you should provide a [[TimestampExtractor]] that also implements
-   * [[TimestampExtractor#getCurrentWatermark]] to keep track of watermarks.
+   * This method creates watermarks periodically (for example every second), based
+   * on the watermarks indicated by the given watermark generator. Even when no new elements
+   * in the stream arrive, the given watermark generator will be periodically checked for
+   * new watermarks. The interval in which watermarks are generated is defined in
+   * [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)]].
    *
-   * @see org.apache.flink.streaming.api.watermark.Watermark
+   * Use this method for the common cases, where some characteristic over all elements
+   * should generate the watermarks, or where watermarks are simply trailing behind the
+   * wall clock time by a certain amount.
+   *
+   * For cases where watermarks should be created in an irregular fashion, for example
+   * based on certain markers that some element carry, use the
+   * [[AssignerWithPunctuatedWatermarks]].
+   *
+   * @see AssignerWithPeriodicWatermarks
+   * @see AssignerWithPunctuatedWatermarks
+   * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks) 
+   */
+  @PublicEvolving
+  def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]) 
+      : DataStream[T] = {
+    
+    stream.assignTimestampsAndWatermarks(assigner)
+  }
+
+  /**
+   * Assigns timestamps to the elements in the data stream and periodically creates
+   * watermarks to signal event time progress.
+   *
+   * This method creates watermarks based purely on stream elements. For each element
+   * that is handled via [[AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)]],
+   * the [[AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark()]] method is called,
+   * and a new watermark is emitted, if the returned watermark value is larger than the previous
+   * watermark.
+   *
+   * This method is useful when the data stream embeds watermark elements, or certain elements
+   * carry a marker that can be used to determine the current event time watermark. 
+   * This operation gives the programmer full control over the watermark generation. Users
+   * should be aware that too aggressive watermark generation (i.e., generating hundreds of
+   * watermarks every second) can cost some performance.
+   *
+   * For cases where watermarks should be created in a regular fashion, for example
+   * every x milliseconds, use the [[AssignerWithPeriodicWatermarks]].
+   *
+   * @see AssignerWithPunctuatedWatermarks
+   * @see AssignerWithPeriodicWatermarks
+   * @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks) 
+   */
+  @PublicEvolving
+  def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T])
+      : DataStream[T] = {
+    
+    stream.assignTimestampsAndWatermarks(assigner)
+  }
+
+  /**
+   * Assigns timestamps to the elements in the data stream and periodically creates
+   * watermarks to signal event time progress.
+   * 
+   * This method is a shortcut for data streams where the element timestamp are known
+   * to be monotonously ascending within each parallel stream.
+   * In that case, the system can generate watermarks automatically and perfectly
+   * by tracking the ascending timestamps.
+   * 
+   * For cases where the timestamps are not monotonously increasing, use the more
+   * general methods [[assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)]]
+   * and [[assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)]].
    */
   @PublicEvolving
   def assignAscendingTimestamps(extractor: T => Long): DataStream[T] = {

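As the new scaladoc notes, a periodic assigner does not produce a watermark per element;
getCurrentWatermark() is polled at the auto-watermark interval configured on the
ExecutionConfig. A rough Java sketch of wiring that up (the interval value and the source
variable are illustrative; PeriodicTupleAssigner refers to the sketch near the top of this
message):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // poll getCurrentWatermark() every 10 ms
    env.getConfig().setAutoWatermarkInterval(10);

    DataStream<Tuple2<String, Integer>> withTimestampsAndWatermarks =
            source.assignTimestampsAndWatermarks(new PeriodicTupleAssigner());
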
http://git-wip-us.apache.org/repos/asf/flink/blob/74c2b80b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index 3c1e9c3..5f10eac 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.TimestampExtractor
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
@@ -55,9 +55,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
         ctx.collect(("a", 8))
       }
 
-      def cancel() {
-      }
-    }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor)
+      def cancel() {}
+      
+    }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple2TimestampExtractor)
 
     val source2 = env.addSource(new SourceFunction[(String, Int)]() {
       def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
@@ -71,7 +71,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 
       def cancel() {
       }
-    }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor)
+    }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple2TimestampExtractor)
 
     source1.coGroup(source2)
       .where(_._1)
@@ -119,9 +119,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
         ctx.collect(("a", "k", 8))
       }
 
-      def cancel() {
-      }
-    }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+      def cancel() {}
+      
+    }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple3TimestampExtractor)
 
     val source2 = env.addSource(new SourceFunction[(String, String, Int)]() {
       def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
@@ -135,9 +135,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
         ctx.collect(("a", "z", 8))
       }
 
-      def cancel() {
-      }
-    }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+      def cancel() {}
+      
+    }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple3TimestampExtractor)
 
     source1.join(source2)
       .where(_._1)
@@ -195,9 +195,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
         ctx.collect(("a", "k", 8))
       }
 
-      def cancel() {
-      }
-    }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+      def cancel() {}
+      
+    }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple3TimestampExtractor)
 
     source1.join(source1)
       .where(_._1)
@@ -245,31 +245,25 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 object CoGroupJoinITCase {
   private var testResults: mutable.MutableList[String] = null
 
-  private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] {
-    def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = {
+  private class Tuple2TimestampExtractor extends AssignerWithPunctuatedWatermarks[(String, Int)] {
+    
+    override def extractTimestamp(element: (String, Int), previousTimestamp: Long): Long = {
       element._2
     }
 
-    def extractWatermark(element: (String, Int), currentTimestamp: Long): Long = {
-      element._2 - 1
-    }
-
-    def getCurrentWatermark: Long = {
-      Long.MinValue
-    }
+    override def checkAndGetNextWatermark(
+        lastElement: (String, Int),
+        extractedTimestamp: Long): Long = extractedTimestamp - 1
   }
 
-  private class Tuple3TimestampExtractor extends TimestampExtractor[(String, String, Int)] {
-    def extractTimestamp(element: (String, String, Int), currentTimestamp: Long): Long = {
-      element._3
-    }
+  private class Tuple3TimestampExtractor extends 
+        AssignerWithPunctuatedWatermarks[(String, String, Int)] {
+    
+    override def extractTimestamp(element: (String, String, Int), previousTimestamp: Long): Long
+         = element._3
 
-    def extractWatermark(element: (String, String, Int), currentTimestamp: Long): Long = {
-      element._3 - 1
-    }
-
-    def getCurrentWatermark: Long = {
-      Long.MinValue
-    }
+    override def checkAndGetNextWatermark(
+        lastElement: (String, String, Int),
+        extractedTimestamp: Long): Long = extractedTimestamp - 1
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74c2b80b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index d4e8bb2..f358ac6 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.TimestampExtractor
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
@@ -61,7 +61,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 
       def cancel() {
       }
-    }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor)
+    }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
 
     source1
       .keyBy(0)
@@ -106,7 +106,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 
       def cancel() {
       }
-    }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor)
+    }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
 
     source1
       .windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
@@ -132,17 +132,17 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 object WindowFoldITCase {
   private var testResults: mutable.MutableList[String] = null
 
-  private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] {
-    def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = {
-      element._2
+  private class Tuple2TimestampExtractor extends AssignerWithPeriodicWatermarks[(String, Int)] {
+    
+    private var currentTimestamp = -1L
+    
+    override def extractTimestamp(element: (String, Int), previousTimestamp: Long): Long = {
+      currentTimestamp = element._2
+      currentTimestamp
     }
 
-    def extractWatermark(element: (String, Int), currentTimestamp: Long): Long = {
-      element._2 - 1
-    }
-
-    def getCurrentWatermark: Long = {
-      Long.MinValue
+    override def getCurrentWatermark(): Long = {
+      currentTimestamp - 1
     }
   }
 }

