flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-4651] Ensure processing-time timers are set on restore
Date Mon, 09 Jan 2017 17:15:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9cff8c92b -> 36cf7eb2d


[FLINK-4651] Ensure processing-time timers are set on restore

This test ensures that we set a low-level processing time timer in case
we have processing-time timers set.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36cf7eb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36cf7eb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36cf7eb2

Branch: refs/heads/master
Commit: 36cf7eb2de5be992f1716360b7ca4f010a7c2a58
Parents: 9cff8c9
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Jan 9 16:01:23 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Jan 9 18:14:16 2017 +0100

----------------------------------------------------------------------
 .../operators/AbstractStreamOperatorTest.java   | 51 ++++++++++++++++++++
 1 file changed, 51 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36cf7eb2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 2fb0089..2844fbb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -141,6 +141,57 @@ public class AbstractStreamOperatorTest {
 	}
 
 	/**
+	 * Verify that a low-level timer is set for processing-time timers in case of restore.
+	 */
+	@Test
+	public void testEnsureProcessingTimeTimerRegisteredOnRestore() throws Exception {
+		TestOperator testOperator = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.open();
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
+		testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0);
+
+		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+		TestOperator testOperator1 = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						testOperator1,
+						new TestKeySelector(),
+						BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness1.setProcessingTime(0L);
+
+		testHarness1.setup();
+		testHarness1.initializeState(snapshot);
+		testHarness1.open();
+
+		testHarness1.setProcessingTime(10L);
+
+		assertThat(
+				extractResult(testHarness1),
+				contains("ON_PROC_TIME:HELLO"));
+
+		testHarness1.setProcessingTime(20L);
+
+		assertThat(
+				extractResult(testHarness1),
+				contains("ON_PROC_TIME:CIAO"));
+	}
+
+
+	/**
 	 * Verify that timers for the different time domains don't clash.
 	 */
 	@Test


Mime
View raw message