flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: assignTimestampsAndWatermarks not work after KeyedStream.process
Date Fri, 03 May 2019 07:32:07 GMT
Hi,

this should be covered here:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams

Best, Fabian

On Thu, 2 May 2019 at 17:48, an0 <an00na@gmail.com> wrote:

> This explanation is exactly what I'm looking for, thanks! Is such an
> important rule documented anywhere in the official documentation?
>
> On 2019/04/30 08:47:29, Fabian Hueske <fhueske@gmail.com> wrote:
> > An operator task broadcasts its current watermark to all downstream tasks
> > that might receive its records.
> > If you have the following code:
> >
> > DataStream<X> a = ...
> > a.map(A).map(B).keyBy(....).window(C)
> >
> > and execute it with parallelism 2, your plan looks like this:
> >
> > A.1 -- B.1 --\--/-- C.1
> >               X
> > A.2 -- B.2 --/--\-- C.2
> >
> > A.1 will propagate its watermarks to B.1 because only B.1 will receive its
> > output events.
> > However, B.1 will propagate its watermarks to C.1 and C.2 because the
> > output of B.1 is partitioned and all C tasks might receive output events
> > from B.1.
> >
> > Best, Fabian
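Fabian's routing rule can be sketched as a toy Java model. This is a hypothetical illustration, not Flink's runtime code: `WatermarkRouting` and its methods are made-up names, subtask index 0 stands for A.1/B.1/C.1 and index 1 for A.2/B.2/C.2, and it assumes a task's event time is the minimum of the latest watermarks on its input channels (idleness handling is ignored).

```java
import java.util.Arrays;

// Toy model of a.map(A).map(B).keyBy(...).window(C) at parallelism 2.
class WatermarkRouting {

    // Forward (one-to-one) edge such as A -> B: subtask i only sends its
    // records, and therefore its watermarks, to downstream subtask i.
    static int[] forwardTargets(int upstream) {
        return new int[] {upstream};
    }

    // Hash-partitioned edge such as B -> C: any downstream subtask may get
    // records from this upstream subtask, so its watermark goes to all of them.
    static int[] partitionedTargets(int upstream, int parallelism) {
        int[] targets = new int[parallelism];
        for (int j = 0; j < parallelism; j++) {
            targets[j] = j;
        }
        return targets;
    }

    // A task's event time: minimum of the latest watermark per input channel.
    static long combinedWatermark(long[] latestPerChannel) {
        return Arrays.stream(latestPerChannel).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        int parallelism = 2;
        System.out.println(Arrays.toString(forwardTargets(0)));                  // [0]
        System.out.println(Arrays.toString(partitionedTargets(0, parallelism))); // [0, 1]
        // C.1 has seen watermark 100 from B.1 and 80 from B.2.
        System.out.println(combinedWatermark(new long[] {100L, 80L}));           // 80
    }
}
```

In this model C.1 stays at 80 even though B.1 has reached 100, so a single lagging upstream task holds back every task it broadcasts to.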
> >
> > On Mon, 29 Apr 2019 at 20:06, an0 <an00na@gmail.com> wrote:
> >
> > > Thanks very much. It definitely explains the problem I'm seeing. However,
> > > there is something I need to confirm:
> > > You say "Watermarks are broadcasted/forwarded anyway." Do you mean that, in
> > > assignTimestampsAndWatermarks.keyBy.window, it doesn't matter what data
> > > flows through a specific key's stream, and all key streams have the same
> > > watermarks? So time-wise, `window` behaves as if `keyBy` is not there at
> > > all?
> > >
> > > On 2019/04/26 06:34:10, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
> > > > Hi,
> > > >
> > > > Watermarks are meta events that travel independently of data events.
> > > >
> > > > 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> > > > instances of trips have some data (this is my assumption), so Watermarks
> > > > can be generated. Afterwards, even if some of the keyed partitions have
> > > > no data, Watermarks are broadcasted/forwarded anyway. In other words, if
> > > > at some point Watermarks were generated for all partitions of a single
> > > > stage, they will be forwarded beyond this point.
> > > >
> > > > 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> > > > watermarks for an empty partition, which produces no Watermarks at all
> > > > for this partition; therefore there is no progress beyond this point.
> > > >
> > > > I hope this clarifies it a bit.
> > > >
> > > > Best,
> > > >
> > > > Dawid
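Dawid's two cases can be made concrete with a toy Java model (not Flink code). `BoundedAssigner` is a hypothetical simplification of one parallel BoundedOutOfOrdernessTimestampExtractor instance: its watermark is the largest timestamp seen minus the allowed out-of-orderness, or Long.MIN_VALUE until it has seen an element. The sketch collapses the keyed operator in between and assumes the downstream `timeWindowAll` task takes the minimum over its inputs.

```java
import java.util.Arrays;

// Hypothetical stand-in for one parallel bounded-out-of-orderness assigner.
class BoundedAssigner {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;
    private boolean seenElement = false;

    BoundedAssigner(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    void onElement(long timestamp) {
        seenElement = true;
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    // No element seen yet: no usable watermark (stays at Long.MIN_VALUE).
    long currentWatermark() {
        return seenElement ? maxTimestamp - maxOutOfOrderness : Long.MIN_VALUE;
    }
}

class IdlePartitionDemo {
    // The downstream task only advances to the minimum of its inputs.
    static long downstreamWatermark(BoundedAssigner... inputs) {
        return Arrays.stream(inputs)
                .mapToLong(BoundedAssigner::currentWatermark)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;

        // Case 1: assigners before keyBy; both source subtasks see trips.
        BoundedAssigner s1 = new BoundedAssigner(day);
        BoundedAssigner s2 = new BoundedAssigner(day);
        s1.onElement(10 * day);
        s2.onElement(12 * day);
        System.out.println(downstreamWatermark(s1, s2)); // 777600000 (9 days)

        // Case 2: assigners after keyBy; every trip hashes to the first subtask.
        BoundedAssigner k1 = new BoundedAssigner(day);
        BoundedAssigner k2 = new BoundedAssigner(day);
        k1.onElement(10 * day);
        k1.onElement(12 * day);
        System.out.println(downstreamWatermark(k1, k2) == Long.MIN_VALUE); // true
    }
}
```

In case 2 the empty assigner never produces a watermark, so the minimum is pinned at Long.MIN_VALUE no matter how much data the other subtask sees.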
> > > >
> > > > On 25/04/2019 16:49, an0 wrote:
> > > > > If my understanding is correct, then why does
> > > > > `assignTimestampsAndWatermarks` before `keyBy` work? The `timeWindowAll`
> > > > > stream's input streams are task 1 and task 2, with task 2 idling, no matter
> > > > > whether `assignTimestampsAndWatermarks` is before or after `keyBy`, because
> > > > > whether task 2 receives elements only depends on the key distribution and
> > > > > has nothing to do with timestamp assignment, right?
> > > > >
> > > > >                                                    /key 1 trips\
> > > > >                                                   /             \
> > > > > (A) trips--> assignTimestampsAndWatermarks-->keyBy               timeWindowAll
> > > > >                                                   \     idle    /
> > > > >                                                    \key 2 trips/
> > > > >
> > > > >                   /key 1 trips--> assignTimestampsAndWatermarks\
> > > > >                  /                                              \
> > > > > (B) trips-->keyBy                                                timeWindowAll
> > > > >                  \                     idle                     /
> > > > >                   \key 2 trips--> assignTimestampsAndWatermarks/
> > > > >
> > > > > How are things different between A and B from `timeWindowAll`'s
> > > > > perspective?
> > > > >
> > > > > BTW, thanks for the webinar link, I'll check it later.
> > > > >
> > > > > On 2019/04/25 08:30:20, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
> > > > >> Hi,
> > > > >>
> > > > >> Yes, I think your explanation is correct. I can also recommend Seth's
> > > > >> webinar, where he talks about debugging Watermarks [1].
> > > > >>
> > > > >> Best,
> > > > >>
> > > > >> Dawid
> > > > >>
> > > > >> [1] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > > > >>
> > > > >> On 22/04/2019 22:55, an0 wrote:
> > > > >>> Thanks, I feel I'm getting closer to the truth.
> > > > >>>
> > > > >>> So parallelism is the cause? Say my parallelism is 2. Does that mean I
> > > > >>> get 2 tasks running after `keyBy` even if all elements have the same key
> > > > >>> and so go to 1 downstream task (say task 1)? And is it the other task
> > > > >>> (task 2), with no incoming data, that keeps the `timeWindowAll` stream
> > > > >>> from making progress? Because both task 1 and task 2 are its input
> > > > >>> streams and one is idling, its event time cannot make progress?
> > > > >>>
> > > > >>> On 2019/04/22 01:57:39, Guowei Ma <guowei.mgw@gmail.com> wrote:
> > > > >>>> Hi,
> > > > >>>>
> > > > >>>> A BoundedOutOfOrdernessTimestampExtractor can only send a WM after it
> > > > >>>> has received at least one element.
> > > > >>>>
> > > > >>>> After keyBy:
> > > > >>>> Flink uses the hash code of the key and the parallelism of the downstream
> > > > >>>> operator to decide which subtask receives an element. This means that if
> > > > >>>> your key is always the same, all the sources will send their elements only
> > > > >>>> to the same downstream task, for example only to
> > > > >>>> BoundedOutOfOrdernessTimestampExtractor no. 3.
> > > > >>>>
> > > > >>>> Before keyBy:
> > > > >>>> In your case, the Source and the BoundedOutOfOrdernessTimestampExtractors
> > > > >>>> would be chained together, which means every
> > > > >>>> BoundedOutOfOrdernessTimestampExtractor instance will receive elements.
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Guowei
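Guowei's point about routing can be sketched in plain Java: count how many elements each parallel assigner instance would receive when routing by key versus round-robin. Both rules are simplifications and the names are hypothetical. The keyed rule `floorMod(hashCode, parallelism)` is an assumption; Flink actually maps keys to key groups via a murmur hash and the max parallelism, so concrete indices differ, but a single key still always lands on a single subtask. Round-robin is used as a deterministic stand-in for the `shuffle()` suggested elsewhere in this thread, which distributes elements randomly.

```java
import java.util.Arrays;
import java.util.List;

class PartitioningSketch {

    // Simplified keyBy routing (assumption, not Flink's key-group logic).
    static int keyedTarget(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Elements per downstream subtask when routing by key.
    static int[] countByKey(List<?> keys, int parallelism) {
        int[] counts = new int[parallelism];
        for (Object key : keys) {
            counts[keyedTarget(key, parallelism)]++;
        }
        return counts;
    }

    // Elements per downstream subtask under round-robin routing.
    static int[] countRoundRobin(int elements, int parallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < elements; i++) {
            counts[i % parallelism]++;
        }
        return counts;
    }

    // Subtasks that receive nothing never advance their assigner's watermark.
    static long idleSubtasks(int[] counts) {
        return Arrays.stream(counts).filter(c -> c == 0).count();
    }

    public static void main(String[] args) {
        // keyBy(trip -> 0L) with parallelism 4: every element has key 0L.
        List<Long> constantKeys = List.of(0L, 0L, 0L, 0L, 0L, 0L);
        int[] keyed = countByKey(constantKeys, 4);
        int[] roundRobin = countRoundRobin(constantKeys.size(), 4);
        System.out.println(Arrays.toString(keyed));      // [6, 0, 0, 0]
        System.out.println(Arrays.toString(roundRobin)); // [2, 2, 1, 1]
        System.out.println(idleSubtasks(keyed) + " vs " + idleSubtasks(roundRobin)); // 3 vs 0
    }
}
```

With a constant key, three of the four assigner instances sit idle; spreading elements across all subtasks gives each assigner data to derive a watermark from.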
> > > > >>>>
> > > > >>>>
> > > > >>>> On Fri, 19 Apr 2019 at 22:41, an0 <an00na@gmail.com> wrote:
> > > > >>>>
> > > > >>>>> Hi,
> > > > >>>>>
> > > > >>>>> First of all, thank you for the `shuffle()` tip. It works. However, I
> > > > >>>>> still don't understand why it doesn't work without calling `shuffle()`.
> > > > >>>>>
> > > > >>>>> Why wouldn't all BoundedOutOfOrdernessTimestampExtractors receive trips?
> > > > >>>>> All the trips have keys and timestamps. As I said in my reply to Paul, I
> > > > >>>>> see the same watermarks being extracted.
> > > > >>>>>
> > > > >>>>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> > > > >>>>> matter? My understanding is that any specific window for a specific key
> > > > >>>>> always receives exactly the same data, and the calling order of
> > > > >>>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> > > > >>>>>
> > > > >>>>> To make `keyBy` as irrelevant as possible, I tried letting it always
> > > > >>>>> return the same key so that there is only 1 keyed stream and it is exactly
> > > > >>>>> the same as the original unkeyed stream. It still doesn't trigger windows:
> > > > >>>>> ```java
> > > > >>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > > > >>>>> DataStream<Trip> featurizedUserTrips = userTrips
> > > > >>>>>         .map(trip -> trip)
> > > > >>>>>         .assignTimestampsAndWatermarks(
> > > > >>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > >>>>>                     @Override
> > > > >>>>>                     public long extractTimestamp(Trip trip) {
> > > > >>>>>                         return trip.endTime.getTime();
> > > > >>>>>                     }
> > > > >>>>>                 });
> > > > >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > >>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > >>>>> ```
> > > > >>>>>
> > > > >>>>> It makes no sense to me. Please help me understand why it doesn't work.
> > > > >>>>> Thanks!
> > > > >>>>>
> > > > >>>>> On 2019/04/19 04:14:31, Guowei Ma <guowei.mgw@gmail.com> wrote:
> > > > >>>>>> Hi,
> > > > >>>>>> After keyBy, maybe only some of the
> > > > >>>>>> BoundedOutOfOrdernessTimestampExtractors receive elements (trips). If
> > > > >>>>>> that is the case, a BoundedOutOfOrdernessTimestampExtractor that does not
> > > > >>>>>> receive any element will not send a WM, so the timeWindowAll operator
> > > > >>>>>> cannot be triggered.
> > > > >>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> > > > >>>>>> second case and check whether the window is triggered. If it is, you
> > > > >>>>>> could then check the distribution of the elements generated by the source.
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> Guowei
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Fri, 19 Apr 2019 at 04:10, an00na@gmail.com <an00na@gmail.com> wrote:
> > > > >>>>>>
> > > > >>>>>>> I don't think it is the watermark. I see the same watermarks from the two
> > > > >>>>>>> versions of the code.
> > > > >>>>>>>
> > > > >>>>>>> The processing on the keyed stream doesn't change event time at all. I can
> > > > >>>>>>> simply change my code to use `map` on the keyed stream to return the input
> > > > >>>>>>> data unchanged, so that the window operator receives exactly the same
> > > > >>>>>>> data. The only difference is where I call `assignTimestampsAndWatermarks`.
> > > > >>>>>>> The result is the same: `assignTimestampsAndWatermarks` before `keyBy`
> > > > >>>>>>> works:
> > > > >>>>>>> ```java
> > > > >>>>>>> DataStream<Trip> trips =
> > > > >>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > >>>>>>>                     @Override
> > > > >>>>>>>                     public long extractTimestamp(Trip trip) {
> > > > >>>>>>>                         return trip.endTime.getTime();
> > > > >>>>>>>                     }
> > > > >>>>>>>                 });
> > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > >>>>>>> ```
> > > > >>>>>>>
> > > > >>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > > >>>>>>> ```java
> > > > >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips
> > > > >>>>>>>         .map(trip -> trip)
> > > > >>>>>>>         .assignTimestampsAndWatermarks(
> > > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > >>>>>>>                     @Override
> > > > >>>>>>>                     public long extractTimestamp(Trip trip) {
> > > > >>>>>>>                         return trip.endTime.getTime();
> > > > >>>>>>>                     }
> > > > >>>>>>>                 });
> > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > >>>>>>> ```
> > > > >>>>>>>
> > > > >>>>>>> It feels like a bug to me, but I want to confirm it before I file the bug
> > > > >>>>>>> report.
> > > > >>>>>>>
> > > > >>>>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3280@gmail.com> wrote:
> > > > >>>>>>>> Hi,
> > > > >>>>>>>>
> > > > >>>>>>>> Could you check the watermark of the window operator? One possible
> > > > >>>>>>>> situation would be that some of the keys are not getting enough inputs,
> > > > >>>>>>>> so their watermarks remain below the window end time and hold the window
> > > > >>>>>>>> operator's watermark back. IMO, it's good practice to assign watermarks
> > > > >>>>>>>> earlier in the data pipeline.
> > > > >>>>>>>> Best,
> > > > >>>>>>>> Paul Lam
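Paul's point about watermarks staying below the window end time can be illustrated with a sketch of how sliding event-time windows, like those created by `timeWindowAll(Time.days(7), Time.days(1))`, are assigned and fired. It assumes a zero window offset and non-negative timestamps, and follows our understanding of Flink's sliding assigner and event-time trigger (a window fires once the watermark reaches its last timestamp, end - 1). `Window` and `SlidingWindows` are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// A half-open event-time window [start, end).
final class Window {
    final long start;
    final long end;

    Window(long start, long end) {
        this.start = start;
        this.end = end;
    }

    long maxTimestamp() {
        return end - 1;
    }
}

class SlidingWindows {
    // Every window of length `size` whose start is a multiple of `slide`
    // and which contains the timestamp (offset 0 assumed).
    static List<Window> assign(long timestamp, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    // Event-time firing rule: the watermark must reach the window's last timestamp.
    static boolean fires(Window w, long watermark) {
        return watermark >= w.maxTimestamp();
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        List<Window> ws = assign(10 * day + 5, 7 * day, day);
        System.out.println(ws.size());                     // 7
        Window earliest = ws.get(ws.size() - 1);           // [4 days, 11 days)
        System.out.println(fires(earliest, Long.MIN_VALUE)); // false
        System.out.println(fires(earliest, earliest.end));   // true
    }
}
```

Each trip lands in seven overlapping windows, and none of them can fire while the operator's watermark is stuck at Long.MIN_VALUE, which matches the "windows are never triggered" symptom in the original question.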
> > > > >>>>>>>>
> > > > >>>>>>>> On 17 Apr 2019, at 23:04, an00na@gmail.com wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > >>>>>>>>> ```java
> > > > >>>>>>>>> DataStream<Trip> trips =
> > > > >>>>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > >>>>>>>>>                     @Override
> > > > >>>>>>>>>                     public long extractTimestamp(Trip trip) {
> > > > >>>>>>>>>                         return trip.endTime.getTime();
> > > > >>>>>>>>>                     }
> > > > >>>>>>>>>                 });
> > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > > >>>>>>>>>         userTrips.process(new Featurization());
> > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > >>>>>>>>> ```
> > > > >>>>>>>>>
> > > > >>>>>>>>> But not after `keyBy` and `process`:
> > > > >>>>>>>>> ```java
> > > > >>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips = userTrips
> > > > >>>>>>>>>         .process(new Featurization())
> > > > >>>>>>>>>         .assignTimestampsAndWatermarks(
> > > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > > >>>>>>>>>                     @Override
> > > > >>>>>>>>>                     public long extractTimestamp(FeaturizedTrip trip) {
> > > > >>>>>>>>>                         return trip.endTime.getTime();
> > > > >>>>>>>>>                     }
> > > > >>>>>>>>>                 });
> > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > >>>>>>>>> ```
> > > > >>>>>>>>> Windows are never triggered.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Is it a bug or expected behavior? If the latter, where is it documented?
> > > > >>
> > > >
> > > >
> > >
> >
>
