flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject flink git commit: [FLINK-4862] fix Timer register in ContinuousEventTimeTrigger
Date Mon, 24 Oct 2016 13:55:06 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 5731672e5 -> 05a5f460b


[FLINK-4862] fix Timer register in ContinuousEventTimeTrigger

Backported to release-1.1.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05a5f460
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05a5f460
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05a5f460

Branch: refs/heads/release-1.1
Commit: 05a5f460b33828cc8a1e6a45d37b555facc7133f
Parents: 5731672
Author: manuzhang <owenzhang1990@gmail.com>
Authored: Thu Oct 20 15:06:01 2016 +0800
Committer: Maximilian Michels <mxm@apache.org>
Committed: Mon Oct 24 15:54:16 2016 +0200

----------------------------------------------------------------------
 .../triggers/ContinuousEventTimeTrigger.java    | 14 ++--
 .../operators/windowing/WindowOperatorTest.java | 76 ++++++++++++++++++++
 2 files changed, 86 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05a5f460/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 02613f6..cb8cdf5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -89,9 +89,11 @@ public class ContinuousEventTimeTrigger<W extends Window> extends
Trigger<Object
 	@Override
 	public void clear(W window, TriggerContext ctx) throws Exception {
 		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-		long timestamp = fireTimestamp.get();
-		ctx.deleteEventTimeTimer(timestamp);
-		fireTimestamp.clear();
+		Long timestamp = fireTimestamp.get();
+		if (timestamp != null) {
+			ctx.deleteEventTimeTimer(timestamp);
+			fireTimestamp.clear();
+		}
 	}
 
 	@Override
@@ -100,8 +102,12 @@ public class ContinuousEventTimeTrigger<W extends Window> extends
Trigger<Object
 	}
 
 	@Override
-	public TriggerResult onMerge(W window, OnMergeContext ctx) {
+	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
 		ctx.mergePartitionedState(stateDesc);
+		Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
+		if (nextFireTimestamp != null) {
+			ctx.registerEventTimeTimer(nextFireTimestamp);
+		}
 		return TriggerResult.CONTINUE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05a5f460/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 62266c4..dfa353c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -595,6 +596,81 @@ public class WindowOperatorTest {
 		testHarness.close();
 	}
 
+	/**
+	 * This tests whether merging works correctly with the ContinuousEventTimeTrigger.
+	 * @throws Exception
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testSessionWindowsWithContinuousEventTimeTrigger() throws Exception {
+		closeCalled.set(0);
+
+		final int SESSION_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+			inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String,
Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+			EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+			new TimeWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new SessionWindowFunction()),
+			ContinuousEventTimeTrigger.of(Time.seconds(2)),
+			0);
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long,
Long>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order and first trigger time is 2000
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 1500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+
+		// triggers emit and next trigger time is 4000
+		testHarness.processWatermark(new Watermark(2500));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-1", 1500L, 4500L),
4499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+		expectedOutput.add(new Watermark(2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 4000));
+		testHarness.processWatermark(new Watermark(3000));
+		expectedOutput.add(new Watermark(3000));
+
+		// do a snapshot, close and restore again
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.restore(snapshot, 0);
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 4000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500));
+		// triggers emit and next trigger time is 6000
+		testHarness.processWatermark(new Watermark(4000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 1500L, 7000L),
6999));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-15", 0L, 7000L), 6999));
+		expectedOutput.add(new Watermark(4000));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
 	@Test
 	public void testMergeAndEvictor() throws Exception {
 		// verify that merging WindowAssigner and Evictor cannot be used together


Mime
View raw message