flink-issues mailing list archives

From "yuemeng (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window
Date Wed, 09 May 2018 02:13:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468219#comment-16468219 ]

yuemeng edited comment on FLINK-9201 at 5/9/18 2:12 AM:
--------------------------------------------------------

[~aljoscha]

Thanks for your suggestion.

I will add a new test case to EventTimeTriggerTest, but still keep the old test in WindowOperatorTest,
because currently only merging windows run into this problem.


was (Author: yuemeng):
[~aljoscha]

Thanks for your suggestion.

I will move the test to EventTimeTriggerTest.

> same merge window will be fired twice if watermark already passed the merge window
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-9201
>                 URL: https://issues.apache.org/jira/browse/FLINK-9201
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.3.3
>            Reporter: yuemeng
>            Assignee: yuemeng
>            Priority: Blocker
>             Fix For: 1.6.0
>
>
> Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds.
>  * w1, TimeWindow[1,9], holds the elements 1, 2, 3, 6 and is fired when the watermark reaches 9.
>  * A late element then arrives (w2, TimeWindow[7,10]) when the watermark is already at 11.
>  * w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by the call to triggerContext.onMerge(mergedWindows). w3 is fired a first time by the call to triggerContext.onElement(element), because the watermark has already passed w3. w3 is then fired a second time, because the newly registered timer is earlier than the current watermark.
> This means w3 is fired twice, because the watermark has already passed the new merged window w3.
> Examples
> {code:java}
> @Test
> @SuppressWarnings("unchecked")
> public void testSessionWindowsFiredTwice() throws Exception {
>  closeCalled.set(0);
>  final int sessionSize = 3;
>  TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
>  ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
>  inputType.createSerializer(new ExecutionConfig()));
>  WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
>  EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
>  new TimeWindow.Serializer(),
>  new TupleKeySelector(),
>  BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
>  stateDesc,
>  new InternalIterableWindowFunction<>(new SessionWindowFunction()),
>  EventTimeTrigger.create(),
>  60000,
>  null /* late data output tag */);
>  OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
>  createTestHarness(operator);
>  ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
>  testHarness.open();
>  // add elements out-of-order
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
>  testHarness.processWatermark(new Watermark(5500));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 3999));
>  expectedOutput.add(new Watermark(5500));
>  // do a snapshot, close and restore again
>  OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
>  testHarness = createTestHarness(operator);
>  testHarness.setup();
>  testHarness.initializeState(snapshot);
>  testHarness.open();
>  expectedOutput.clear();
>  // suppose the watermark has already reached 10000
>  testHarness.processWatermark(new Watermark(10000));
>  // a late element with timestamp 4500 arrives; the new session window [0, 7500] is still
>  // a valid window because its maxTimestamp < cleanup time, so it is fired immediately
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));
>  expectedOutput.add(new Watermark(10000));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
>  // when a new watermark arrives, the same TimeWindow[0, 7500] is fired again, because
>  // a new timer was registered by the onMerge call
>  testHarness.processWatermark(new Watermark(11000));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
>  expectedOutput.add(new Watermark(11000));
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
> }
> {code}
>  
>  
>  
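
The firing sequence described in the quoted issue can be illustrated without Flink. The following is only a minimal sketch under stated assumptions: TriggerModel, its methods, and the guardOnMerge flag are hypothetical names made up for illustration, not Flink classes. It assumes that onElement fires at once when the window's max timestamp is at or behind the watermark, that onMerge registers a timer for the merged window, and that a timer behind the watermark fires on the next watermark advance. The numbers are taken from the test above (merged window max timestamp 7499, watermarks 10000 and 11000).

```java
import java.util.TreeSet;

public class MergedWindowDoubleFire {

    /** Hypothetical, simplified stand-in for an event-time trigger on one merged window. */
    static final class TriggerModel {
        private final boolean guardOnMerge;            // assumption: one possible fix, see onMerge
        private final TreeSet<Long> timers = new TreeSet<>();
        private long watermark = Long.MIN_VALUE;
        private int fireCount = 0;

        TriggerModel(boolean guardOnMerge) {
            this.guardOnMerge = guardOnMerge;
        }

        /** On merge: register a timer; with the guard, skip it if the watermark already passed the window. */
        void onMerge(long windowMaxTimestamp) {
            if (!guardOnMerge || windowMaxTimestamp > watermark) {
                timers.add(windowMaxTimestamp);
            }
        }

        /** On element: fire immediately if the watermark already passed the window, else wait for a timer. */
        void onElement(long windowMaxTimestamp) {
            if (windowMaxTimestamp <= watermark) {
                fireCount++;
            } else {
                timers.add(windowMaxTimestamp);
            }
        }

        /** Advance the watermark and fire every timer at or behind it that matches the window. */
        void advanceWatermark(long newWatermark, long windowMaxTimestamp) {
            watermark = newWatermark;
            while (!timers.isEmpty() && timers.first() <= watermark) {
                long timer = timers.pollFirst();
                if (timer == windowMaxTimestamp) {
                    fireCount++;
                }
            }
        }

        int fireCount() {
            return fireCount;
        }
    }

    /** Replays the late-element scenario and returns how often the merged window fired. */
    static int simulate(boolean guardOnMerge) {
        TriggerModel trigger = new TriggerModel(guardOnMerge);
        long mergedMaxTimestamp = 7499L;                       // merged session window [0, 7500)
        trigger.advanceWatermark(10000L, mergedMaxTimestamp);  // watermark already past the window
        trigger.onMerge(mergedMaxTimestamp);                   // late element causes a merge
        trigger.onElement(mergedMaxTimestamp);                 // first firing
        trigger.advanceWatermark(11000L, mergedMaxTimestamp);  // stale timer fires, if registered
        return trigger.fireCount();
    }

    public static void main(String[] args) {
        System.out.println("unguarded onMerge fires: " + simulate(false));
        System.out.println("guarded onMerge fires: " + simulate(true));
    }
}
```

In this model the unguarded variant fires the merged window twice and the guarded variant fires it once. Skipping the timer when the watermark has already passed the merged window is shown only as one possible guard, not as the agreed fix for this issue.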



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
