flink-user mailing list archives

From Manas Kale <manaskal...@gmail.com>
Subject Re: Perform processing only when watermark updates, buffer data otherwise
Date Mon, 06 Apr 2020 04:39:50 GMT
Hi Timo,
Thanks for the information.

On Thu, Apr 2, 2020 at 9:30 PM Timo Walther <twalthr@apache.org> wrote:

> Hi Manas,
>
> First of all, once watermarks are assigned at the source level, Flink
> operators usually take care of handling and forwarding them.
>
> In the case of a `union()`, the subsequent operator will advance its
> internal event-time clock and emit a new watermark only once all input
> streams (and their parallel instances) have reached a common event time.
>
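To make the union() behaviour described above concrete, here is a toy model in plain Java. It is not Flink code: the class UnionWatermarkModel and its methods are made up for illustration. It assumes the combined watermark is the minimum of the latest watermark seen on each input, which is one way to read "all input streams have reached a common event time".

```java
import java.util.Arrays;

/** Toy model (not Flink code) of how an operator with several inputs combines watermarks. */
public class UnionWatermarkModel {
    // Latest watermark seen on each input; Long.MIN_VALUE means "nothing seen yet".
    private final long[] inputWatermarks;
    private long current = Long.MIN_VALUE;

    public UnionWatermarkModel(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input; returns true if the combined watermark advanced. */
    public boolean onWatermark(int input, long watermark) {
        // Watermarks on a single input never move backwards.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        // The operator can only be as far along as its slowest input.
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        if (min > current) {
            current = min;
            return true;
        }
        return false;
    }

    public long currentWatermark() {
        return current;
    }
}
```

With three inputs, nothing is emitted downstream until every input has reported a watermark, and after that the combined value only moves when the slowest input moves.
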
> Your sorting use case can be easily done with a KeyedProcessFunction
> [1]. You can buffer your events in a list state, and process them when a
> timer fires. The documentation also explains how to set a timer.
>
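> If you want to fire when the next watermark arrives, you can register an
> event-time timer for one millisecond past the current watermark:
>
> ctx.timerService().registerEventTimeTimer(
>     ctx.timerService().currentWatermark() + 1);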
>
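The buffer-then-sort logic suggested above can be sketched in plain Java without any Flink dependency. This is only a model under assumptions: WatermarkSortBuffer, Event, processElement() and advanceWatermark() are hypothetical names, the in-memory list stands in for list state, and advanceWatermark() stands in for the runtime firing a timer once the watermark passes it. In a real job the buffer would live in keyed state and the flush would happen in the process function's timer callback.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java model (not Flink code) of "buffer events, sort and emit when the watermark advances". */
public class WatermarkSortBuffer {

    /** Hypothetical event type: an event-time timestamp plus a payload. */
    public static final class Event {
        public final long timestamp;
        public final String payload;

        public Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    private final List<Event> buffer = new ArrayList<>(); // stands in for list state
    private long currentWatermark = Long.MIN_VALUE;
    private boolean timerRegistered = false;
    private long timer;

    /** Buffers the event and registers a timer for currentWatermark + 1. */
    public void processElement(Event event) {
        buffer.add(event);
        timer = currentWatermark + 1;
        timerRegistered = true;
    }

    /** Advances the watermark; if the timer fires, returns the ready events sorted by timestamp. */
    public List<Event> advanceWatermark(long watermark) {
        List<Event> out = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return out; // the watermark did not move forward, nothing fires
        }
        currentWatermark = watermark;
        if (!timerRegistered || timer > currentWatermark) {
            return out;
        }
        timerRegistered = false;
        // Only events at or before the watermark are complete; later ones stay buffered.
        for (Event e : buffer) {
            if (e.timestamp <= currentWatermark) {
                out.add(e);
            }
        }
        buffer.removeIf(e -> e.timestamp <= currentWatermark);
        out.sort(Comparator.comparingLong(e -> e.timestamp));
        if (!buffer.isEmpty()) {
            // Re-arm so the remaining events are flushed on the next watermark.
            timer = currentWatermark + 1;
            timerRegistered = true;
        }
        return out;
    }
}
```

Emitting only the events at or before the watermark is a design choice in this sketch: anything later could still be preceded by an event that has not arrived yet, so it waits for the next watermark.
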
> The `union()` is meant for combining streams of the same data type into
> one where the order of the events does not matter. However, watermarks
> still arrive in order, so sorting by event time should not be a problem.
>
> connect() is broader than a join (see also the answer here [2]).
>
> I hope this answers most of your questions. Feel free to ask further
> questions.
>
> Regards,
> Timo
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function
> [2]
>
> https://stackoverflow.com/questions/52885926/what-is-the-difference-between-flink-join-and-connect
>
>
>
> On 02.04.20 12:11, Manas Kale wrote:
> > Also
> >
> >   * What happens to watermarks after a union operation? Do I have to
> >     call assignTimestampsAndWatermarks() again? I guess I will have to,
> >     since multiple streams are being combined and Flink needs to know
> >     how to resolve individual watermarks.
> >   * What is the difference between union() and connect()?
> >
> >
> > On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <manaskale96@gmail.com> wrote:
> >
> >     Hi,
> >     I want to perform some processing on events only when the watermark
> >     is updated. Otherwise, for all other events, I want to keep
> >     buffering them till the watermark arrives.
> >     The main motivation behind doing this is that I have several
> >     operators that emit events/messages to a downstream operator. Since
> >     the order in which events arrive at the downstream operator is not
> >     guaranteed to be in chronological event time, I want to manually
> >     sort events when the watermark arrives and only then proceed.
> >
> >     Specifically, I want to first combine multiple streams and then do
> >     the above. Something like:
> >     stream1.union(stream2, stream3)...
> >
> >     One solution I am exploring is using a global window with a trigger
> >     that will fire only when the watermark updates.
> >     stream1.union(stream2, stream3)
> >         .keyBy(...)
> >         .window(GlobalWindows.create())
> >         .trigger(new OnWatermarkUpdateTrigger())
> >         .process(...)
> >
> >     I will store the latest watermark in the trigger's state store. In
> >     the onElement() method, I will FIRE if the current watermark is
> >     different from the stored one.
> >
> >     Is this the best way to implement the functionality described above?
> >
>
>
