flink-user mailing list archives

From an0 <an0...@gmail.com>
Subject Re: assignTimestampsAndWatermarks not work after KeyedStream.process
Date Thu, 02 May 2019 15:48:12 GMT
This explanation is exactly what I'm looking for, thanks! Is such an important rule documented
anywhere in the official documentation?

On 2019/04/30 08:47:29, Fabian Hueske <fhueske@gmail.com> wrote: 
> An operator task broadcasts its current watermark to all downstream tasks
> that might receive its records.
> If you have the following code:
> 
> DataStream<X> a = ...
> a.map(A).map(B).keyBy(....).window(C)
> 
> and execute it with parallelism 2, your plan looks like this:
> 
> A.1 -- B.1 --\--/-- C.1
>               X
> A.2 -- B.2 --/--\-- C.2
> 
> A.1 will propagate its watermarks to B.1 because only B.1 will receive its
> output events.
> However, B.1 will propagate its watermarks to C.1 and C.2 because the
> output of B.1 is partitioned and all C tasks might receive output events
> from B.1.
> 
> Best, Fabian
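Fabian's rule can be modelled in a few lines of plain Java. The sketch below is a toy simulation, not Flink's runtime: the class and method names are hypothetical, the key routing is a simplified modulo (Flink actually hashes keys into key groups), and it assumes, as Flink's documentation describes, that a task's event time is the minimum of the latest watermarks on its input channels. It shows that with a single key all records reach C.1, yet C.2 still advances because B.1 and B.2 broadcast their watermarks to it.

```java
import java.util.Arrays;

/**
 * Toy model (not Flink code) of the plan A -> B -> keyBy -> C with parallelism 2.
 * A.i -> B.i is a forward connection (one input channel per B task);
 * B -> C is all-to-all, so every C task has one input channel per B task.
 */
class KeyByWatermarkBroadcast {

    /** Simplified stand-in for Flink's key routing (Flink hashes keys into key groups). */
    static int route(long key, int parallelism) {
        return (int) Math.floorMod(key, (long) parallelism);
    }

    /** A task's event time: the minimum of the latest watermark on each input channel. */
    static long eventTime(long... channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        int parallelism = 2;

        // Forward connection: B.1 only hears from A.1, B.2 only from A.2.
        long b1 = eventTime(100L); // A.1's watermark
        long b2 = eventTime(80L);  // A.2's watermark

        // Every record carries the same key, so all of them are routed to C.1.
        int[] recordsPerC = new int[parallelism];
        for (long key : new long[] {0L, 0L, 0L}) {
            recordsPerC[route(key, parallelism)]++;
        }

        // All-to-all connection: B.1 and B.2 broadcast their watermarks to both C tasks.
        long c1 = eventTime(b1, b2);
        long c2 = eventTime(b1, b2);

        System.out.println(Arrays.toString(recordsPerC)); // [3, 0]
        System.out.println(c1 + " " + c2);                  // 80 80
    }
}
```

C.2 reaches event time 80 without receiving a single record, because its clock is driven by watermarks rather than by data.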
> 
> On Mon, Apr 29, 2019 at 20:06, an0 <an00na@gmail.com> wrote:
> 
> > Thanks very much. It definitely explains the problem I'm seeing. However,
> > there is something I need to confirm:
> > You say "Watermarks are broadcasted/forwarded anyway." Do you mean that in
> > assignTimestampsAndWatermarks.keyBy.window, it doesn't matter what data
> > flows through a specific key's stream, and all key streams have the same
> > watermarks? So time-wise, `window` behaves as if `keyBy` is not there at
> > all?
> >
> > On 2019/04/26 06:34:10, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
> > > Hi,
> > >
> > > Watermarks are meta events that travel independently of data events.
> > >
> > > 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> > > instances of trips have some data (this is my assumption), so Watermarks
> > > can be generated. Afterwards, even if some of the keyed partitions have
> > > no data, Watermarks are broadcasted/forwarded anyway. In other words, if
> > > at some point Watermarks were generated for all partitions of a single
> > > stage, they will be forwarded beyond this point.
> > >
> > > 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> > > watermarks for an empty partition, which produces no Watermarks at all
> > > for this partition; therefore there is no progress beyond this point.
> > >
> > > I hope this clarifies it a bit.
> > >
> > > Best,
> > >
> > > Dawid
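Dawid's two cases can be contrasted with a small plain-Java simulation. It is a sketch under simplifying assumptions, not Flink code: two source subtasks, a single key that routes every trip to extractor instance 1 after `keyBy`, a hypothetical out-of-orderness bound of 10 ms, and an extractor that reports `Long.MIN_VALUE` until it has seen an element (the behaviour Guowei describes further down the thread). `timeWindowAll` is assumed to run with parallelism 1 and to take the minimum over its two inputs.

```java
import java.util.Arrays;

/** Toy comparison (not Flink code) of placing the watermark extractor before vs. after keyBy. */
class ExtractorPlacement {
    static final long MAX_OUT_OF_ORDERNESS = 10L; // hypothetical bound, in ms

    /** Watermark of one extractor instance: max timestamp seen minus the bound, or MIN_VALUE if idle. */
    static long extractorWatermark(long... timestampsSeen) {
        if (timestampsSeen.length == 0) {
            return Long.MIN_VALUE; // an instance that never receives data never advances
        }
        return Arrays.stream(timestampsSeen).max().getAsLong() - MAX_OUT_OF_ORDERNESS;
    }

    /** timeWindowAll (parallelism 1) takes the minimum over its input channels. */
    static long windowEventTime(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        long[] source1 = {100L, 120L};
        long[] source2 = {110L, 130L};

        // (A) extractors chained to the sources: both instances see trips.
        long caseA = windowEventTime(extractorWatermark(source1), extractorWatermark(source2));

        // (B) extractors after keyBy with one key: instance 1 gets every trip, instance 2 none.
        long caseB = windowEventTime(
                extractorWatermark(100L, 120L, 110L, 130L),
                extractorWatermark());

        System.out.println(caseA);                   // 110
        System.out.println(caseB == Long.MIN_VALUE); // true
    }
}
```

In case (A) the window operator's clock moves to 110; in case (B) the idle instance pins it at `Long.MIN_VALUE`, so no window can ever fire.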
> > >
> > > On 25/04/2019 16:49, an0 wrote:
> > > > If my understanding is correct, then why does
> > > > `assignTimestampsAndWatermarks` before `keyBy` work? The `timeWindowAll`
> > > > stream's input streams are task 1 and task 2, with task 2 idling, no matter
> > > > whether `assignTimestampsAndWatermarks` is before or after `keyBy`, because
> > > > whether task 2 receives elements only depends on the key distribution and has
> > > > nothing to do with timestamp assignment, right?
> > > >
> > > >                                                    /key 1 trips\
> > > >                                                   /             \
> > > > (A) trips--> assignTimestampsAndWatermarks-->keyBy               timeWindowAll
> > > >                                                   \     idle    /
> > > >                                                    \key 2 trips/
> > > >
> > > >                   /key 1 trips--> assignTimestampsAndWatermarks\
> > > >                  /                                              \
> > > > (B) trips-->keyBy                                                timeWindowAll
> > > >                  \                     idle                     /
> > > >                   \key 2 trips--> assignTimestampsAndWatermarks/
> > > >
> > > > How are things different between A and B from `timeWindowAll`'s
> > > > perspective?
> > > >
> > > > BTW, thanks for the webinar link, I'll check it later.
> > > >
> > > > On 2019/04/25 08:30:20, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
> > > >> Hi,
> > > >>
> > > >> Yes, I think your explanation is correct. I can also recommend Seth's
> > > >> webinar where he talks about debugging Watermarks [1].
> > > >>
> > > >> Best,
> > > >>
> > > >> Dawid
> > > >>
> > > >> [1] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > > >>
> > > >> On 22/04/2019 22:55, an0 wrote:
> > > >>> Thanks, I feel I'm getting closer to the truth.
> > > >>>
> > > >>> So parallelism is the cause? Say my parallelism is 2. Does that mean
> > > >>> I get 2 tasks running after `keyBy` even if all elements have the same key
> > > >>> and so go to 1 downstream task (say task 1)? And is it the other task (task 2),
> > > >>> with no incoming data, that keeps the `timeWindowAll` stream from making progress?
> > > >>> Because both task 1 and task 2 are its input streams and one is idling, so
> > > >>> its event time cannot make progress?
> > > >>>
> > > >>> On 2019/04/22 01:57:39, Guowei Ma <guowei.mgw@gmail.com> wrote:
> > > >>>> Hi,
> > > >>>>
> > > >>>> A BoundedOutOfOrdernessTimestampExtractor can only send a WM after it
> > > >>>> has received at least one element.
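The rule that an extractor only emits a watermark after its first element follows from how the bounded out-of-orderness watermark is computed. The class below is a simplified, hypothetical re-implementation of that logic as we understand it from Flink 1.x's `BoundedOutOfOrdernessTimestampExtractor` (track the largest timestamp seen, report it minus the bound, never move backwards). It is not the Flink class, and it works on raw `long` timestamps instead of `Trip` objects.

```java
/**
 * Simplified sketch (not the Flink class) of bounded out-of-orderness watermarking:
 * watermark = highest timestamp seen so far - maxOutOfOrderness, never decreasing.
 */
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so that "currentMaxTimestamp - maxOutOfOrderness" starts at Long.MIN_VALUE.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called for every element: remembers the largest timestamp seen. */
    long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    /** Called periodically: stays at Long.MIN_VALUE until at least one element arrived. */
    long getCurrentWatermark() {
        long potential = currentMaxTimestamp - maxOutOfOrderness;
        if (potential >= lastEmittedWatermark) {
            lastEmittedWatermark = potential;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        long oneDay = 24L * 60 * 60 * 1000; // like Time.days(1)
        BoundedOutOfOrdernessSketch busy = new BoundedOutOfOrdernessSketch(oneDay);
        BoundedOutOfOrdernessSketch idle = new BoundedOutOfOrdernessSketch(oneDay);
        busy.extractTimestamp(10 * oneDay);
        System.out.println(busy.getCurrentWatermark() == 9 * oneDay);     // true
        System.out.println(idle.getCurrentWatermark() == Long.MIN_VALUE); // true
    }
}
```

An idle instance stays at `Long.MIN_VALUE`, which is exactly the value that holds back any downstream operator taking the minimum over its inputs.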
> > > >>>>
> > > >>>> After keyBy:
> > > >>>> Flink uses the hash code of the key and the parallelism of the downstream
> > > >>>> operator to decide which subtask receives the element. This means that if
> > > >>>> your key is always the same, all the sources will only send the elements to
> > > >>>> the same downstream task, for example only to the no. 3
> > > >>>> BoundedOutOfOrdernessTimestampExtractor.
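The routing Guowei describes can be sketched as follows. In Flink, a key's hash code is murmur-hashed into one of `maxParallelism` key groups, and contiguous ranges of key groups are assigned to subtasks. This hypothetical sketch keeps that two-step shape but replaces the murmur hash with `Math.floorMod`, so its concrete subtask indices will not match a real job; the maximum parallelism of 128 is just an example value.

```java
import java.util.Arrays;

/** Illustrative sketch (not Flink's KeyGroupRangeAssignment) of key -> key group -> subtask. */
class KeyRoutingSketch {

    /** Key -> key group. Flink murmur-hashes key.hashCode() here; floorMod is a simplification. */
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Key group -> subtask index: contiguous ranges of key groups share a subtask. */
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    /** Counts how many records each downstream subtask would receive. */
    static int[] distribution(Object[] keys, int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (Object key : keys) {
            counts[subtaskFor(key, maxParallelism, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        Object[] sameKey = {0L, 0L, 0L, 0L}; // keyBy(trip -> 0L): every trip gets key 0
        // All records land on one subtask; the other subtask's extractor sees nothing.
        System.out.println(Arrays.toString(distribution(sameKey, 128, 2))); // [4, 0]
    }
}
```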
> > > >>>>
> > > >>>> Before keyBy:
> > > >>>> In your case, the Source and the BoundedOutOfOrdernessTimestampExtractors
> > > >>>> would be chained together, which means every
> > > >>>> BoundedOutOfOrdernessTimestampExtractor will receive elements.
> > > >>>>
> > > >>>> Best,
> > > >>>> Guowei
> > > >>>>
> > > >>>>
> > > >>>> On Fri, Apr 19, 2019 at 10:41 PM, an0 <an00na@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > >>>>> First of all, thank you for the `shuffle()` tip. It works. However, I
> > > >>>>> still don't understand why it doesn't work without calling `shuffle()`.
> > > >>>>>
> > > >>>>> Why wouldn't all BoundedOutOfOrdernessTimestampExtractors receive trips?
> > > >>>>> All the trips have keys and timestamps. As I said in my reply to Paul, I see
> > > >>>>> the same watermarks being extracted.
> > > >>>>>
> > > >>>>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> > > >>>>> matter? My understanding is that any specific window for a specific key always
> > > >>>>> receives exactly the same data, and the calling order of
> > > >>>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> > > >>>>>
> > > >>>>> To make `keyBy` as irrelevant as possible, I tried letting it always
> > > >>>>> return the same key, so that there is only 1 keyed stream and it is exactly
> > > >>>>> the same as the original unkeyed stream. It still doesn't trigger windows:
> > > >>>>> ```java
> > > >>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > > >>>>> DataStream<Trip> featurizedUserTrips =
> > > >>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > >>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>                     @Override
> > > >>>>>                     public long extractTimestamp(Trip trip) {
> > > >>>>>                         return trip.endTime.getTime();
> > > >>>>>                     }
> > > >>>>>                 });
> > > >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>> ```
> > > >>>>>
> > > >>>>> It makes no sense to me. Please help me understand why it doesn't work.
> > > >>>>> Thanks!
> > > >>>>>
> > > >>>>> On 2019/04/19 04:14:31, Guowei Ma <guowei.mgw@gmail.com> wrote:
> > > >>>>>> Hi,
> > > >>>>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> > > >>>>>> receive elements (trips). If that is the case, a
> > > >>>>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> > > >>>>>> will not send a WM, so the timeWindowAll operator cannot be
> > > >>>>>> triggered.
> > > >>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in
> > > >>>>>> your second case and check whether the window is triggered. If it is
> > > >>>>>> triggered, you could then check the distribution of the elements generated
> > > >>>>>> by the source.
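Why `shuffle()` helps can be shown with the same kind of toy model: Flink's `shuffle()` redistributes records randomly across the downstream subtasks, so with any realistic volume of trips every extractor instance receives elements and starts emitting watermarks. The sketch below is a plain-Java simulation with a fixed seed (an assumption, for reproducibility), not Flink's partitioner.

```java
import java.util.Arrays;
import java.util.Random;

/** Toy model (not Flink code) of random re-partitioning in front of the extractors. */
class ShuffleDemo {

    /** Counts how many of n records each of `parallelism` subtasks receives under random partitioning. */
    static int[] shuffle(int n, int parallelism, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[parallelism];
        for (int i = 0; i < n; i++) {
            counts[random.nextInt(parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Even if every trip had the same key, shuffling spreads them across both extractors.
        int[] counts = shuffle(1000, 2, 42L);
        System.out.println(Arrays.toString(counts));
        boolean everyExtractorFed = Arrays.stream(counts).allMatch(c -> c > 0);
        System.out.println(everyExtractorFed); // true
    }
}
```

Because no extractor instance stays idle, none of them is stuck at `Long.MIN_VALUE`, and the downstream window operator's event time can advance.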
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Guowei
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Fri, Apr 19, 2019 at 4:10 AM, an00na@gmail.com <an00na@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> I don't think it is the watermark. I see the same watermarks from the
> > > >>>>>>> two versions of code.
> > > >>>>>>>
> > > >>>>>>> The processing on the keyed stream doesn't change event time at all. I can
> > > >>>>>>> simply change my code to use `map` on the keyed stream to return the
> > > >>>>>>> input data unchanged, so that the window operator receives exactly the same
> > > >>>>>>> data. The only difference is when I do `assignTimestampsAndWatermarks`. The
> > > >>>>>>> result is the same: `assignTimestampsAndWatermarks` before `keyBy` works:
> > > >>>>>>> ```java
> > > >>>>>>> DataStream<Trip> trips =
> > > >>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>>>                     @Override
> > > >>>>>>>                     public long extractTimestamp(Trip trip) {
> > > >>>>>>>                         return trip.endTime.getTime();
> > > >>>>>>>                     }
> > > >>>>>>>                 });
> > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>> ```
> > > >>>>>>>
> > > >>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > >>>>>>> ```java
> > > >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>> DataStream<Trip> featurizedUserTrips =
> > > >>>>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>>>                     @Override
> > > >>>>>>>                     public long extractTimestamp(Trip trip) {
> > > >>>>>>>                         return trip.endTime.getTime();
> > > >>>>>>>                     }
> > > >>>>>>>                 });
> > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>> ```
> > > >>>>>>>
> > > >>>>>>> It feels like a bug to me, but I want to confirm it before I file the
> > > >>>>>>> bug report.
> > > >>>>>>>
> > > >>>>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3280@gmail.com> wrote:
> > > >>>>>>>> Hi,
> > > >>>>>>>>
> > > >>>>>>>> Could you check the watermark of the window operator? One possible
> > > >>>>>>>> situation would be that some of the keys are not getting enough inputs,
> > > >>>>>>>> so their watermarks remain below the window end time and hold the window
> > > >>>>>>>> operator's watermark back. IMO, it’s good practice to assign watermarks
> > > >>>>>>>> earlier in the data pipeline.
> > > >>>>>>>> Best,
> > > >>>>>>>> Paul Lam
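Paul's point, that a window only fires once the operator's watermark passes the window end, can be made concrete for `timeWindowAll(Time.days(7), Time.days(1))`. The sketch below is a plain-Java approximation of event-time sliding windows with zero offset: a window [start, start + 7 days) is assumed to fire when the watermark reaches its last millisecond, start + 7 days - 1. The class and method names are hypothetical, and non-negative epoch-millisecond timestamps are assumed.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (not Flink code) of 7-day windows sliding by 1 day, and when they fire. */
class SlidingWindowFiring {
    static final long DAY = 24L * 60 * 60 * 1000;
    static final long SIZE = 7 * DAY; // like Time.days(7)
    static final long SLIDE = DAY;    // like Time.days(1)

    /** Start times of every window [start, start + SIZE) that contains the timestamp, newest first. */
    static List<Long> windowStarts(long timestamp) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, SLIDE); // align to the slide
        for (long start = lastStart; start > timestamp - SIZE; start -= SLIDE) {
            starts.add(start);
        }
        return starts;
    }

    /** An event-time window fires once the watermark reaches its last millisecond. */
    static boolean fires(long windowStart, long watermark) {
        long maxTimestamp = windowStart + SIZE - 1;
        return watermark >= maxTimestamp;
    }

    public static void main(String[] args) {
        long tripEnd = 10 * DAY + 5; // hypothetical trip end, 5 ms into day 10
        List<Long> starts = windowStarts(tripEnd);
        System.out.println(starts.size()); // 7
        long oldestStart = starts.get(starts.size() - 1); // window [4 days, 11 days)
        System.out.println(fires(oldestStart, Long.MIN_VALUE)); // false: a stuck watermark never fires it
        System.out.println(fires(oldestStart, 11 * DAY - 1));   // true
    }
}
```

Each trip lands in seven overlapping windows, and none of them fires while the window operator's watermark is held at `Long.MIN_VALUE` by an idle input.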
> > > >>>>>>>>
> > > >>>>>>>> On Apr 17, 2019, at 23:04, an00na@gmail.com wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> > > >>>>>>>>> ```java
> > > >>>>>>>>> DataStream<Trip> trips =
> > > >>>>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>>>>>                     @Override
> > > >>>>>>>>>                     public long extractTimestamp(Trip trip) {
> > > >>>>>>>>>                         return trip.endTime.getTime();
> > > >>>>>>>>>                     }
> > > >>>>>>>>>                 });
> > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
> > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>>>> ```
> > > >>>>>>>>>
> > > >>>>>>>>> But not after `keyBy` and `process`:
> > > >>>>>>>>> ```java
> > > >>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > >>>>>>>>>         userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > >>>>>>>>>                     @Override
> > > >>>>>>>>>                     public long extractTimestamp(FeaturizedTrip trip) {
> > > >>>>>>>>>                         return trip.endTime.getTime();
> > > >>>>>>>>>                     }
> > > >>>>>>>>>                 });
> > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>>>> ```
> > > >>>>>>>>> Windows are never triggered.
> > > >>>>>>>>>
> > > >>>>>>>>> Is it a bug or expected behavior? If the latter, where is it documented?
> > > >>
> > >
> > >
> >
> 
