Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 51282200BA8 for ; Mon, 24 Oct 2016 15:55:08 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4FB6D160AEB; Mon, 24 Oct 2016 13:55:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6DC13160AE1 for ; Mon, 24 Oct 2016 15:55:07 +0200 (CEST) Received: (qmail 58928 invoked by uid 500); 24 Oct 2016 13:55:06 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 58919 invoked by uid 99); 24 Oct 2016 13:55:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Oct 2016 13:55:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6A5F7DFB78; Mon, 24 Oct 2016 13:55:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mxm@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-4862] fix Timer register in ContinuousEventTimeTrigger Date: Mon, 24 Oct 2016 13:55:06 +0000 (UTC) archived-at: Mon, 24 Oct 2016 13:55:08 -0000 Repository: flink Updated Branches: refs/heads/release-1.1 5731672e5 -> 05a5f460b [FLINK-4862] fix Timer register in ContinuousEventTimeTrigger Backported to release-1.1. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05a5f460 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05a5f460 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05a5f460 Branch: refs/heads/release-1.1 Commit: 05a5f460b33828cc8a1e6a45d37b555facc7133f Parents: 5731672 Author: manuzhang Authored: Thu Oct 20 15:06:01 2016 +0800 Committer: Maximilian Michels Committed: Mon Oct 24 15:54:16 2016 +0200 ---------------------------------------------------------------------- .../triggers/ContinuousEventTimeTrigger.java | 14 ++-- .../operators/windowing/WindowOperatorTest.java | 76 ++++++++++++++++++++ 2 files changed, 86 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/05a5f460/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index 02613f6..cb8cdf5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -89,9 +89,11 @@ public class ContinuousEventTimeTrigger extends Trigger fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); + } } @Override @@ -100,8 +102,12 @@ public class ContinuousEventTimeTrigger extends Trigger> inputType = TypeInfoParser.parse("Tuple2"); + + ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator, Iterable>, Tuple3, TimeWindow> operator = new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new SessionWindowFunction()), + ContinuousEventTimeTrigger.of(Time.seconds(2)), + 0); + + operator.setInputType(TypeInfoParser.>parse("Tuple2"), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness, Tuple3> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.open(); + + // add elements out-of-order and first trigger time is 2000 + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 1500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000)); + + // triggers emit and next trigger time is 4000 + testHarness.processWatermark(new Watermark(2500)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-1", 1500L, 4500L), 4499)); + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499)); + expectedOutput.add(new Watermark(2500)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 4000)); + testHarness.processWatermark(new Watermark(3000)); + expectedOutput.add(new Watermark(3000)); + + // do a snapshot, close and restore again + StreamTaskState snapshot = testHarness.snapshot(0L, 0L); + testHarness.close(); + testHarness.setup(); + testHarness.restore(snapshot, 0); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 4000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500)); + // triggers emit and next trigger time is 6000 + testHarness.processWatermark(new Watermark(4000)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 1500L, 7000L), 6999)); + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-15", 0L, 7000L), 6999)); + expectedOutput.add(new Watermark(4000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); + + testHarness.close(); + } + @Test public void testMergeAndEvictor() throws Exception { // verify that merging WindowAssigner and Evictor cannot be used together