flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-4651] Ensure processing-time timers are set on restore
Date Mon, 09 Jan 2017 17:18:01 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 8c21fa748 -> 7348424d9


[FLINK-4651] Ensure processing-time timers are set on restore

This test ensures that we set a low-level processing time timer in case
we have processing-time timers set.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7348424d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7348424d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7348424d

Branch: refs/heads/release-1.2
Commit: 7348424d9a6ddb578d8d15216997711f90667563
Parents: 8c21fa7
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Jan 9 16:01:23 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Jan 9 18:16:09 2017 +0100

----------------------------------------------------------------------
 .../operators/AbstractStreamOperatorTest.java   | 51 ++++++++++++++++++++
 1 file changed, 51 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7348424d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 2fb0089..2844fbb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -141,6 +141,57 @@ public class AbstractStreamOperatorTest {
 	}
 
 	/**
+	 * Verify that a low-level timer is set for processing-time timers in case of restore.
+	 */
+	@Test
+	public void testEnsureProcessingTimeTimerRegisteredOnRestore() throws Exception {
+		TestOperator testOperator = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.open();
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
+		testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0);
+
+		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+		TestOperator testOperator1 = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness1 =
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						testOperator1,
+						new TestKeySelector(),
+						BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness1.setProcessingTime(0L);
+
+		testHarness1.setup();
+		testHarness1.initializeState(snapshot);
+		testHarness1.open();
+
+		testHarness1.setProcessingTime(10L);
+
+		assertThat(
+				extractResult(testHarness1),
+				contains("ON_PROC_TIME:HELLO"));
+
+		testHarness1.setProcessingTime(20L);
+
+		assertThat(
+				extractResult(testHarness1),
+				contains("ON_PROC_TIME:CIAO"));
+	}
+
+
+	/**
 	 * Verify that timers for the different time domains don't clash.
 	 */
 	@Test


Mime
View raw message