flink-user mailing list archives

From an0 <an0...@gmail.com>
Subject Re: assignTimestampsAndWatermarks not work after KeyedStream.process
Date Thu, 25 Apr 2019 14:49:00 GMT
If my understanding is correct, then why does `assignTimestampsAndWatermarks` before `keyBy` work?
The `timeWindowAll` stream's input streams are task 1 and task 2, with task 2 idling, no matter
whether `assignTimestampsAndWatermarks` is before or after `keyBy`, because whether task 2
receives elements depends only on the key distribution and has nothing to do with timestamp
assignment, right?

                                                    /key 1 trips\
                                                   /             \
(A) trips--> assignTimestampsAndWatermarks-->keyBy                timeWindowAll
                                                   \    idle     /
                                                    \key 2 trips/

                   /key 1 trips--> assignTimestampsAndWatermarks\
                  /                                              \
(B) trips-->keyBy                                                 timeWindowAll
                  \                     idle                     /
                   \key 2 trips--> assignTimestampsAndWatermarks/

How are things different between A and B from `timeWindowAll`'s perspective?
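
One way to reason about (A) versus (B) is a small plain-Java simulation. This is only a sketch under stated assumptions, not Flink code: `WatermarkSketch`, `route`, and the round-robin spread of elements over source subtasks are hypothetical stand-ins. It models two behaviors: an operator's watermark is the minimum over its input channels, and watermarks are broadcast to every downstream subtask across `keyBy`, whereas elements go only to the subtask chosen by their key. Under that model, in (A) the idle task 2 still receives watermarks from the assigners upstream of `keyBy`; in (B) the assigner on task 2 never sees an element, so it never emits a watermark and holds `timeWindowAll` back.

```java
import java.util.Arrays;

/** Plain-Java model of pipelines (A) and (B); every name here is hypothetical. */
class WatermarkSketch {
    // A channel that has not forwarded any watermark yet reports Long.MIN_VALUE.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** An operator's event-time clock is the minimum over all of its input channels. */
    static long min(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(NO_WATERMARK);
    }

    /** Stand-in for keyBy routing. Flink's real hash differs, but it is also fixed per key. */
    static int route(long key, int parallelism) {
        return Math.floorMod(Long.hashCode(key), parallelism);
    }

    /** Per-subtask assigner watermark: highest timestamp seen minus the out-of-orderness bound. */
    static long[] assignerWatermarks(long[] timestamps, int[] subtaskOf, int parallelism, long bound) {
        long[] maxSeen = new long[parallelism];
        Arrays.fill(maxSeen, Long.MIN_VALUE);
        for (int i = 0; i < timestamps.length; i++) {
            maxSeen[subtaskOf[i]] = Math.max(maxSeen[subtaskOf[i]], timestamps[i]);
        }
        long[] watermarks = new long[parallelism];
        for (int s = 0; s < parallelism; s++) {
            // An assigner that saw no element has nothing to emit.
            watermarks[s] = maxSeen[s] == Long.MIN_VALUE ? NO_WATERMARK : maxSeen[s] - bound;
        }
        return watermarks;
    }

    /** (A): assigners are chained to the sources, upstream of keyBy. */
    static long windowWatermarkA(long[] timestamps, int parallelism, long bound) {
        int[] sourceSubtask = new int[timestamps.length];
        for (int i = 0; i < timestamps.length; i++) {
            sourceSubtask[i] = i % parallelism; // assumption: every source subtask reads data
        }
        long[] assigners = assignerWatermarks(timestamps, sourceSubtask, parallelism, bound);
        // Watermarks are broadcast across keyBy, so each keyed subtask sees all assigners.
        long[] keyedSubtasks = new long[parallelism];
        Arrays.fill(keyedSubtasks, min(assigners));
        return min(keyedSubtasks);
    }

    /** (B): assigners sit downstream of keyBy and only see the elements routed to them. */
    static long windowWatermarkB(long[] keys, long[] timestamps, int parallelism, long bound) {
        int[] keyedSubtask = new int[timestamps.length];
        for (int i = 0; i < timestamps.length; i++) {
            keyedSubtask[i] = route(keys[i], parallelism);
        }
        return min(assignerWatermarks(timestamps, keyedSubtask, parallelism, bound));
    }

    public static void main(String[] args) {
        long[] keys = {0L, 0L, 0L, 0L};               // constant key, as in the `trip -> 0L` test
        long[] timestamps = {1000L, 2000L, 3000L, 4000L};
        System.out.println("A: " + windowWatermarkA(timestamps, 2, 500L));
        System.out.println("B: " + windowWatermarkB(keys, timestamps, 2, 500L));
    }
}
```

With a constant key and parallelism 2, the sketch reports a watermark of 2500 for (A) and `Long.MIN_VALUE` for (B), which would match windows firing in (A) and never firing in (B).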

BTW, thanks for the webinar link, I'll check it later.

On 2019/04/25 08:30:20, Dawid Wysakowicz <dwysakowicz@apache.org> wrote: 
> Hi,
> 
> Yes I think your explanation is correct. I can also recommend Seth's
> webinar where he talks about debugging Watermarks[1]
> 
> Best,
> 
> Dawid
> 
> [1]
> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> 
> On 22/04/2019 22:55, an0 wrote:
> > Thanks, I feel I'm getting closer to the truth. 
> >
> > So parallelism is the cause? Say my parallelism is 2. Does that mean I get 2 tasks
> > running after `keyBy` even if all elements have the same key and so go to 1 downstream
> > task (say task 1)? And is it the other task (task 2), with no incoming data, that leaves
> > the `timeWindowAll` stream unable to progress? Because both task 1 and task 2 are its
> > input streams and one is idling, so its event time cannot make progress?
> >
> > On 2019/04/22 01:57:39, Guowei Ma <guowei.mgw@gmail.com> wrote: 
> >> Hi,
> >>
> >> A BoundedOutOfOrdernessTimestampExtractor can send a WM only after it has
> >> received at least one element.
> >>
> >> After keyBy:
> >> Flink uses the hash code of the key and the parallelism of the downstream
> >> operator to decide which subtask receives the element. This means that if your
> >> key is always the same, all the sources send their elements to the same
> >> downstream task, for example only BoundedOutOfOrdernessTimestampExtractor no. 3.
> >>
> >> Before keyBy:
> >> In your case, the source and the BoundedOutOfOrdernessTimestampExtractors would
> >> be chained together, which means every
> >> BoundedOutOfOrdernessTimestampExtractor will receive elements.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Fri, Apr 19, 2019 at 10:41 PM, an0 <an00na@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> First of all, thank you for the `shuffle()` tip. It works. However, I
> >>> still don't understand why it doesn't work without calling `shuffle()`.
> >>>
> >>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
> >>> All the trips have keys and timestamps. As I said in my reply to Paul, I
> >>> see the same watermarks being extracted.
> >>>
> >>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> >>> matter? My understanding is that any specific window for a specific key
> >>> always receives exactly the same data, and the calling order of
> >>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> >>>
> >>> To make `keyBy` as irrelevant as possible, I tried letting it always
> >>> return the same key so that there is only 1 keyed stream and it is exactly
> >>> the same as the original unkeyed stream. It still doesn't trigger windows:
> >>> ```java
> >>> DataStream<Trip> trips = env.addSource(consumer);
> >>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> >>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip)
> >>>         .assignTimestampsAndWatermarks(
> >>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>                     @Override
> >>>                     public long extractTimestamp(Trip trip) {
> >>>                         return trip.endTime.getTime();
> >>>                     }
> >>>                 });
> >>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>> ```
> >>>
> >>> It makes no sense to me. Please help me understand why it doesn't work.
> >>> Thanks!
> >>>
> >>> On 2019/04/19 04:14:31, Guowei Ma <guowei.mgw@gmail.com> wrote:
> >>>> Hi,
> >>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> >>>> receive elements (trips). If that is the case, a
> >>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> >>>> would not send a WM, so the timeWindowAll operator could not be triggered.
> >>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> >>>> second case and check if the window is triggered. If it is triggered,
> >>>> you could check the distribution of elements generated by the source.
> >>>>
> >>>> Best,
> >>>> Guowei
> >>>>
> >>>>
> >>>> On Fri, Apr 19, 2019 at 4:10 AM, an00na@gmail.com <an00na@gmail.com> wrote:
> >>>>
> >>>>> I don't think it is the watermark. I see the same watermarks from the
> >>>>> two versions of the code.
> >>>>>
> >>>>> The processing on the keyed stream doesn't change event time at all. I
> >>>>> can simply change my code to use `map` on the keyed stream to return the
> >>>>> input data unchanged, so that the window operator receives exactly the
> >>>>> same data. The only difference is where I call
> >>>>> `assignTimestampsAndWatermarks`. The result is the same:
> >>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> >>>>> ```java
> >>>>> DataStream<Trip> trips = env.addSource(consumer)
> >>>>>         .assignTimestampsAndWatermarks(
> >>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>                     @Override
> >>>>>                     public long extractTimestamp(Trip trip) {
> >>>>>                         return trip.endTime.getTime();
> >>>>>                     }
> >>>>>                 });
> >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>> ```
> >>>>>
> >>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> >>>>> ```java
> >>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip)
> >>>>>         .assignTimestampsAndWatermarks(
> >>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>                     @Override
> >>>>>                     public long extractTimestamp(Trip trip) {
> >>>>>                         return trip.endTime.getTime();
> >>>>>                     }
> >>>>>                 });
> >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>> ```
> >>>>>
> >>>>> It feels like a bug to me, but I want to confirm it before I file the
> >>>>> bug report.
> >>>>>
> >>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3280@gmail.com> wrote:
> >>>>>> Hi,
> >>>>>>
> >>>>>> Could you check the watermark of the window operator? One possible
> >>>>>> situation would be that some of the keys are not getting enough inputs,
> >>>>>> so their watermarks remain below the window end time and hold the window
> >>>>>> operator watermark back. IMO, it's good practice to assign watermarks
> >>>>>> earlier in the data pipeline.
> >>>>>>
> >>>>>> Best,
> >>>>>> Paul Lam
> >>>>>>
> >>>>>>> On Apr 17, 2019, at 23:04, an00na@gmail.com wrote:
> >>>>>>>
> >>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> >>>>>>> ```java
> >>>>>>> DataStream<Trip> trips = env.addSource(consumer)
> >>>>>>>         .assignTimestampsAndWatermarks(
> >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>>>                     @Override
> >>>>>>>                     public long extractTimestamp(Trip trip) {
> >>>>>>>                         return trip.endTime.getTime();
> >>>>>>>                     }
> >>>>>>>                 });
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> >>>>>>>         userTrips.process(new Featurization());
> >>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>> ```
> >>>>>>>
> >>>>>>> But not after `keyBy` and `process`:
> >>>>>>> ```java
> >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> >>>>>>>         userTrips.process(new Featurization())
> >>>>>>>                 .assignTimestampsAndWatermarks(
> >>>>>>>                         new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> >>>>>>>                             @Override
> >>>>>>>                             public long extractTimestamp(FeaturizedTrip trip) {
> >>>>>>>                                 return trip.endTime.getTime();
> >>>>>>>                             }
> >>>>>>>                         });
> >>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>> ```
> >>>>>>> Windows are never triggered.
> >>>>>>>
> >>>>>>> Is it a bug or expected behavior? If the latter, where is it
> >>>>>>> documented?
> >>>>>>
> 
> 
