flink-user mailing list archives

From Tomasz Dobrzycki <dobrzycki.tom...@gmail.com>
Subject Re: Unexpected behaviour of a periodic trigger.
Date Wed, 23 Aug 2017 10:11:18 GMT
Hi Tony,

Won't that increase the amount of processing Flink has to do? It would
have to window twice, right?

Thanks,
Tomasz

On 23 August 2017 at 11:02, Tony Wei <tony19920430@gmail.com> wrote:
> Hi Tomasz,
>
> I would move the .window() call down into each of these two DataStreams
> (rawEvents.window().reduce().map(), and likewise for metrics).
> That makes sure they won't share the same WindowedStream object.
>
> Regards,
> Tony Wei
>
> 2017-08-23 17:51 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tomasz@gmail.com>:
>>
>> Hi Tony,
>>
>> Thank you for your answer, it definitely helps with understanding this
>> situation.
>> Is there any reliable way to split the stream into 2 outputs that
>> avoids this behaviour? Eventually I want to have 2 sinks that output
>> different data (one being just a copy of the stream, but organised in
>> session windows, and the other being metrics which I derive from the
>> data itself).
>>
>> Thanks,
>> Tomasz
>>
>> On 23 August 2017 at 10:32, 魏偉哲 <tony19920430@gmail.com> wrote:
>> > Hi Tomasz,
>> >
>> > I think this is because .window() is a lazy operation.
>> > It only creates a WindowedStream object; it does not create the
>> > corresponding operator.
>> > The operator is created only when you call .reduce() or .apply().
>> >
>> > rawEvents and metrics actually share the same WindowedStream object to
>> > create their own operators.
>> > You can see the details in WindowedStream.trigger(): it only sets
>> > this.trigger = trigger and then returns itself.
>> > Because of this, when you used the same object to create the operator
>> > for rawEvents after setting the trigger for metrics, it picked up the
>> > same settings for both WindowAssigner and Trigger.
>> > That's why the behavior changed when you changed the order.
>> >
>> > Hope this will help you.
>> >
>> > Regards,
>> > Tony Wei
>> >
>> > 2017-08-17 16:25 GMT+08:00 Tomasz Dobrzycki
>> > <dobrzycki.tomasz@gmail.com>:
>> >>
>> >> Hi,
>> >>
>> >> I'm working on a custom trigger that is supposed to fire
>> >> periodically and at the end of a session window. These are the main
>> >> methods from my trigger:
>> >>
>> >> public TriggerResult onElement(Object element, long timestamp,
>> >>         TimeWindow window, TriggerContext ctx) throws Exception {
>> >>     long currentTime = System.currentTimeMillis();
>> >>     if (currentTime - lastTriggerTime >= this.delay) {
>> >>         lastTriggerTime = currentTime;
>> >>         return TriggerResult.FIRE;
>> >>     } else {
>> >>         return TriggerResult.CONTINUE;
>> >>     }
>> >> }
>> >>
>> >> public TriggerResult onEventTime(long time, TimeWindow window,
>> >>         TriggerContext ctx) {
>> >>     return time == window.maxTimestamp() ?
>> >>             TriggerResult.FIRE :
>> >>             TriggerResult.CONTINUE;
>> >> }
>> >>
>> >> When I use this trigger in my main processing method, I'm getting
>> >> unexpected behaviour. This is how I use it:
>> >>
>> >> // MAIN PROCESSING
>> >> WindowedStream<EventTags, Tuple, TimeWindow> sessionWindow = dataStream
>> >>         .map(new ParseEvent())
>> >>         .filter(new Filter())
>> >>         .assignTimestampsAndWatermarks(
>> >>                 new BoundedOutOfOrdernessTimestampExtractor<EventTags>(Time.minutes(5)) {
>> >>                     @Override
>> >>                     public long extractTimestamp(EventTags event) {
>> >>                         return event.receivedAt;
>> >>                     }
>> >>                 })
>> >>         .keyBy("streamKeys")
>> >>         .window(EventTimeSessionWindows.withGap(Time.minutes(5)));
>> >>
>> >> // WARNING! This has to go before periodic triggered metrics as Flink
>> >> // will trigger this as well if it comes second
>> >> DataStream<String> rawEvents = sessionWindow
>> >>         .reduce(new CollectRawData())
>> >>         .map(new ParseRawData());
>> >>
>> >> DataStream<String> metrics = sessionWindow
>> >>         .trigger(SessionTrigger.every(Time.milliseconds(2)))
>> >>         .apply(new ExtractMetrics());
>> >>
>> >>
>> >> This works as expected: rawEvents is calculated when the session
>> >> window is completed, and metrics is calculated periodically and at the
>> >> window's end. But if I change the order of rawEvents and metrics (which,
>> >> to my mind, should behave the same), rawEvents is also triggered
>> >> periodically. Is this expected to work this way? I'm not assigning the
>> >> periodic trigger to rawEvents. Thanks for your help.
>> >>
>> >> Kind Regards,
>> >> Tomasz
>> >
>> >
>
>
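
The sharing described in Tony's reply can be illustrated without Flink. What follows is a minimal plain-Java sketch, not Flink code: WindowedStreamModel, buildOperator() and the string-valued assigner and trigger are hypothetical stand-ins. It assumes only what the thread states, namely that trigger() stores its argument and returns the same object, and that an operator takes its settings at the moment reduce() or apply() is called.

```java
public class SharedWindowBuilderDemo {

    /** Hypothetical stand-in for a WindowedStream: a mutable builder, not an operator. */
    public static final class WindowedStreamModel {
        private final String assigner;
        private String trigger = "default";

        public WindowedStreamModel(String assigner) {
            this.assigner = assigner;
        }

        /** As described in the thread: store the trigger and return the same object. */
        public WindowedStreamModel trigger(String trigger) {
            this.trigger = trigger;
            return this;
        }

        /** Stand-in for reduce()/apply(): snapshot the settings held at call time. */
        public String buildOperator(String name) {
            return name + "[" + assigner + ", " + trigger + "]";
        }
    }

    /** Original order: rawEvents is built before the periodic trigger is set. */
    public static String[] rawEventsFirst() {
        WindowedStreamModel shared = new WindowedStreamModel("session");
        String rawEvents = shared.buildOperator("rawEvents");
        String metrics = shared.trigger("periodic").buildOperator("metrics");
        return new String[] {rawEvents, metrics};
    }

    /** Swapped order: the trigger set for metrics is still on the shared object. */
    public static String[] metricsFirst() {
        WindowedStreamModel shared = new WindowedStreamModel("session");
        String metrics = shared.trigger("periodic").buildOperator("metrics");
        String rawEvents = shared.buildOperator("rawEvents");
        return new String[] {rawEvents, metrics};
    }

    /** Tony's suggestion: one window() call, hence one builder, per output. */
    public static String[] separateWindows() {
        String metrics = new WindowedStreamModel("session")
                .trigger("periodic")
                .buildOperator("metrics");
        String rawEvents = new WindowedStreamModel("session")
                .buildOperator("rawEvents");
        return new String[] {rawEvents, metrics};
    }

    public static void main(String[] args) {
        System.out.println(String.join(" | ", rawEventsFirst()));
        System.out.println(String.join(" | ", metricsFirst()));
        System.out.println(String.join(" | ", separateWindows()));
    }
}
```

In this model only metricsFirst() leaves rawEvents with the periodic trigger, which matches the order dependence reported in the original message. With a separate builder per output, the order of the two definitions no longer matters.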
