flink-user mailing list archives

From David Anderson <da...@ververica.com>
Subject Re: Multiple trigger events on keyed window
Date Thu, 22 Aug 2019 11:13:14 GMT
If you still need help diagnosing the cause of the misbehavior, please
share more of the code with us.

On Wed, Aug 21, 2019 at 6:24 PM Eric Isling <out.code.org@gmail.com> wrote:
>
> I should add that the behaviour persists, even when I force parallelism to 1.
>
> On Wed, Aug 21, 2019 at 5:19 PM Eric Isling <out.code.org@gmail.com> wrote:
>>
>> Dear list-members,
>>
>> I have a question regarding window firing and element accumulation for a sliding window on a DataStream (Flink 1.8.1-2.12).
>>
>> My DataStream is derived from a custom SourceFunction, which emits string sequences of WINDOW size, in a deterministic sequence.
>> The aim is to create sliding windows over the keyed stream for processing on the accumulated strings, based on EventTime.
>> To assign EventTime and Watermarks, I attach an AssignerWithPeriodicWatermarks to the stream.
>> The sliding window is processed with a custom ProcessWindowFunction.
>>
>> env.setStreamTimeCharacteristic(EventTime)
>> val seqStream = env.addSource(Seqstream)
>>     .assignTimestampsAndWatermarks(SeqTimeStampExtractor())
>>     .keyBy(getEventtimeKey)
>>     .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), Time.milliseconds(slideSize)))
>>
>> val result = seqStream.process(ProcessSeqWindow(target1))
>>
>> My AssignerWithPeriodicWatermarks looks like this.
>> class FASTATimeStampExtractor : AssignerWithPeriodicWatermarks<FASTAstring> {
>>     var waterMark  = 9999L
>>     override fun extractTimestamp(element: FASTAstring, previousElementTimestamp: Long): Long {
>>         return element.f1
>>     }
>>
>>     override fun getCurrentWatermark(): Watermark? {
>>         waterMark += 1
>>         return Watermark(waterMark)
>>     }
>> }
>>
>> In other words, each element emitted by the source should have its own EventTime, and the Watermark should be emitted allowing no further events for that time.
>> Stepping through the stream in a debugger indicates that EventTime / Watermarks are generated as I would expect.
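One thing that may be worth checking: in the extractor quoted above, getCurrentWatermark() advances the watermark by one on every periodic call, starting from 9999, independently of element.f1. The watermark can therefore run ahead of or behind the actual event times, depending on how often it is polled. A minimal plain-Kotlin sketch of the more common pattern, where the watermark follows the largest timestamp seen so far, is shown here. MaxTimestampWatermark and maxDelayMs are hypothetical names used for illustration only; they are not Flink types, and this is not necessarily the fix for the behaviour described.

```kotlin
// Plain-Kotlin sketch with no Flink dependency.
// MaxTimestampWatermark is a hypothetical class, not part of the Flink API.
class MaxTimestampWatermark(private val maxDelayMs: Long = 0L) {
    private var maxSeen = Long.MIN_VALUE

    // Called once per element: remember the largest event time seen so far.
    fun onEvent(timestamp: Long) {
        if (timestamp > maxSeen) maxSeen = timestamp
    }

    // Called periodically: report the time up to which no further events are expected.
    // Before any element has been seen, the watermark stays at Long.MIN_VALUE.
    fun currentWatermark(): Long =
        if (maxSeen == Long.MIN_VALUE) Long.MIN_VALUE else maxSeen - maxDelayMs
}

fun main() {
    val wm = MaxTimestampWatermark()
    listOf(10000L, 10001L, 10005L).forEach { wm.onEvent(it) }
    println("watermark = ${wm.currentWatermark()}")
}
```

In an AssignerWithPeriodicWatermarks, the same idea would mean updating the maximum inside extractTimestamp and returning a Watermark built from it in getCurrentWatermark.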
>>
>> My expectation is that ProcessSeqWindow.run() ought to be called with a number of elements proportional to the time window (e.g. 10 ms), over EventTime. However, what I observe is that run() is called multiple times with single elements, and in an arbitrary sequence with respect to EventTime.
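For reference when debugging, it may help to recall that a sliding window assigner places each element into roughly size / slide overlapping windows, and that a keyed window function is invoked once per key and window. If the key returned by getEventtimeKey is different for every element (an assumption; that code is not shown), each invocation would see a single element. The following plain-Kotlin sketch shows the window assignment arithmetic as I understand it, with a zero offset. windowsFor is a hypothetical helper, not a Flink function.

```kotlin
// Plain-Kotlin sketch with no Flink dependency.
// windowsFor is a hypothetical helper, not part of the Flink API.
fun windowsFor(timestamp: Long, size: Long, slide: Long): List<LongRange> {
    require(size > 0 && slide > 0) { "size and slide must be positive" }
    // Start of the latest window that contains the timestamp (offset 0).
    val lastStart = timestamp - Math.floorMod(timestamp, slide)
    val windows = mutableListOf<LongRange>()
    var start = lastStart
    // Walk backwards one slide at a time while the window still covers the timestamp.
    while (start > timestamp - size) {
        windows.add(start until start + size) // half-open interval [start, start + size)
        start -= slide
    }
    return windows
}

fun main() {
    // size = 10 ms, slide = 5 ms: every element lands in two overlapping windows.
    for (ts in listOf(10000L, 10003L, 10007L)) {
        println("$ts -> ${windowsFor(ts, 10L, 5L)}")
    }
}
```

Logging the key, the window bounds, and the element count inside the window function, and comparing them with a calculation like this, is one way to tell repeated trigger firings apart from elements simply being spread over many keys and windows.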
>>
>> My question is whether this is likely to be caused by multiple trigger events on each window, or are there other possible explanations? How can I debug the cause?
>>
>> Thanks,
>>
>> Eric
