flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re:
Date Fri, 10 May 2019 10:38:35 GMT
Hi,

Again answers below ;-)

On Thu, 9 May 2019 at 17:12, an0 <an00na@gmail.com> wrote:

> You are right, thanks. But something is still not totally clear to me.
> I'll reuse your diagram with a little modification:
>
> DataStream<X> a = ...
> a.map(A).map(B).keyBy(....).timeWindow(C)
>
> and execute this with parallelism 2. However, keyBy only generates one
> single key value, and assume they all go to C.1. Does the data flow look
> like this?
>
> A.1 -- B.1 ---------- C.1
>                    /
> A.2 -- B.2 -------/   C.2
>
> Will the lack of data into C.2 prevent C.1's windows from firing? Will the
> location of assignTimestampsAndWatermarks (after a, after map(A), after
> map(B), after keyBy) matter for the firing of C.1's windows?
>
> By my understanding, the answers are "no" and "no". Correct?

Q1: No. Watermarks are propagated even in case of a skewed key distribution.
C.2 will also advance its event-time clock (based on the WMs) and forward
new WMs.
Q2: Assigning watermarks after a, map(A), or map(B) would work fine. Assigning
watermarks immediately after a keyBy() is not a good idea, because 1) the
records are shuffled and it's hard to reason about their ordering, and 2) you
lose the KeyedStream property and would have to keyBy() again (unless you use
DataStreamUtils.reinterpretAsKeyedStream()).
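A toy model may make the Q1 answer concrete. The plain-Java sketch below is not Flink code; the class `KeyedEdge` and its methods are hypothetical. Two upstream tasks (B.1, B.2) feed two downstream tasks (C.1, C.2) over a keyBy() edge. Every record carries the same key and is routed to C.1 only, while each B task sends its watermarks to both C tasks, and each C task's clock is the minimum over its input channels.

```java
import java.util.Arrays;

// Toy model of one keyBy() edge between upstream tasks B.* and downstream tasks C.*
// (0-based indices, same parallelism on both sides). Illustrative only, not Flink's runtime.
final class KeyedEdge {
    private final long[][] latestWm; // latestWm[c][b]: last watermark C.(c+1) got from B.(b+1)
    private final int[] records;     // records[c]: number of records delivered to C.(c+1)

    KeyedEdge(int parallelism) {
        latestWm = new long[parallelism][parallelism];
        for (long[] row : latestWm) Arrays.fill(row, Long.MIN_VALUE);
        records = new int[parallelism];
    }

    // A record goes to exactly one downstream task, chosen from its key.
    void sendRecord(int targetTask) {
        records[targetTask]++;
    }

    // A watermark from upstream task `from` goes to every downstream task.
    void broadcastWatermark(int from, long wm) {
        for (long[] row : latestWm) row[from] = Math.max(row[from], wm);
    }

    // A downstream task's event-time clock is the minimum over its input channels.
    long clockOf(int task) {
        return Arrays.stream(latestWm[task]).min().getAsLong();
    }

    int recordsAt(int task) {
        return records[task];
    }

    public static void main(String[] args) {
        KeyedEdge edge = new KeyedEdge(2);
        for (int i = 0; i < 5; i++) edge.sendRecord(0); // single key: every record lands on C.1
        edge.broadcastWatermark(0, 100L);               // B.1 reaches event time 100
        edge.broadcastWatermark(1, 90L);                // B.2 reaches event time 90
        System.out.println("C.2 records = " + edge.recordsAt(1) + ", C.2 clock = " + edge.clockOf(1));
    }
}
```

Running `main` prints `C.2 records = 0, C.2 clock = 90`: C.2 never sees a record, yet its clock moves with the upstream watermarks.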


> Now comes the *silly* question: does C.2 exist at all? Since there is only
> one key value, only one C instance is needed. I could see that C.2 as a
> physical instance may exist, but as a logical instance it shouldn't exist
> in the diagram because it is unused. I feel the answer to this silly
> question may be the most important in understanding my (and perhaps many
> others') misunderstanding of situations like this.
>
> If C.2 exists just because parallelism is set to 2, even though it is not
> logically needed, and it also serves as an input to the next operator if
> there is one, then the mystery is completely solved for me.
>
C.2 exists. Flink does not create the flow topology based on data values. As
soon as a record arrives whose key needs to go to C.2, that task is needed.


> Use a concrete example:
>
> DataStream<X> a = ...
>
> a.map(A).map(B).keyBy(....).assignTimestampsAndWatermarks(C).timeWindowAll(D)
>
> A.1 -- B.1 ---------- C.1 --\
>                    /         D
> A.2 -- B.2 -------/   C.2 --/
>
> D's watermark cannot progress because C.2's watermark cannot progress,
> because C.2 doesn't have any input data. C.2 is not logically needed, but it
> does exist and it ruins everything :p. Is this understanding correct?
>

Although C.2 does not receive data, it receives watermarks because WMs are
broadcast. They flow to every task that can be reached by any event. The case
that all keys fall into C.1 is not important, because a record for C.2 might
arrive at any point in time. Also, Flink does not give any guarantees about
how keys (or rather key groups) are assigned to tasks. If you rescale the
application to a parallelism of 3, the active key group might be scheduled to
C.2 or C.3.
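As a rough illustration of the rescaling point, the sketch below maps a key group to a task index with `keyGroup * parallelism / maxParallelism`, which, as far as I know, is how Flink's key-group range assignment spreads key groups over parallel tasks. The class name, the default max parallelism of 128, and the key group number 100 are assumptions. In Flink the key group itself comes from a murmur hash of `key.hashCode()` modulo the max parallelism; this sketch takes it as given.

```java
// Maps a key group to the index of the parallel task that owns it (illustrative sketch).
// Assumption: the formula keyGroup * parallelism / maxParallelism; the key group is given.
final class KeyGroupRouting {
    static int operatorIndex(int maxParallelism, int parallelism, int keyGroup) {
        if (keyGroup < 0 || keyGroup >= maxParallelism) {
            throw new IllegalArgumentException("key group out of range: " + keyGroup);
        }
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed default max parallelism for a small job
        int activeKeyGroup = 100; // hypothetical key group of the single active key
        for (int parallelism = 2; parallelism <= 3; parallelism++) {
            int index = operatorIndex(maxParallelism, parallelism, activeKeyGroup);
            System.out.println("parallelism " + parallelism + " -> C." + (index + 1));
        }
    }
}
```

With these numbers the same key group lands on C.2 at parallelism 2 and on C.3 at parallelism 3, which is why a job should not rely on which task stays idle.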

Long story short, D makes progress in event time because watermarks are
broadcast.


>
> On 2019/05/09 10:01:44, Fabian Hueske <fhueske@gmail.com> wrote:
> > Hi,
> >
> > Please find my response below.
> >
> > On Fri, 3 May 2019 at 16:16, an0 <an00na@gmail.com> wrote:
> >
> > > Thanks, but it doesn't seem to cover this rule:
> > > --- Quote
> > > Watermarks are generated at, or directly after, source functions. Each
> > > parallel subtask of a source function usually generates its watermarks
> > > independently. These watermarks define the event time at that particular
> > > parallel source.
> > >
> > > As the watermarks flow through the streaming program, they advance the
> > > event time at the operators where they arrive. Whenever an operator
> > > advances its event time, it generates a new watermark downstream for its
> > > successor operators.
> > >
> > > Some operators consume multiple input streams; a union, for example, or
> > > operators following a keyBy(…) or partition(…) function. Such an operator’s
> > > current event time is the minimum of its input streams’ event times. As its
> > > input streams update their event times, so does the operator.
> > > --- End Quote
> > >
> > > The most relevant part, I believe, is this:
> > > "Some operators consume multiple input streams…operators following a
> > > keyBy(…) function. Such an operator’s current event time is the minimum of
> > > its input streams’ event times."
> > >
> > > But the wording of "current event time is the minimum of its input
> > > streams’ event times" actually implies that the input streams (produced by
> > > keyBy) have different watermarks, the exact opposite of what you just
> > > explained.
> > >
> > >
> > IMO, the description in the documentation is correct, but looks at the
> > issue from a different angle.
> > An operator task typically has many inputs from which it receives records.
> > Depending on the number of input operators (one or more) and the
> > connection between the operator and its input operators (forward,
> > partition, broadcast), an operator task might have a connection to one,
> > some, or all tasks of its input operators. Each input task can send a
> > different watermark, but each task will also send the same watermark to all
> > its output tasks.
> >
> > So, this is a matter of distinguishing between receiving (different)
> > watermarks and emitting (the same) watermarks.
> >
> > Best, Fabian
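The distinction between receiving different watermarks and emitting the same one can be sketched as a small state machine. This is a simplified plain-Java model with a hypothetical class name, not Flink's internal implementation: per input channel the task remembers the highest watermark seen, its own event time is the minimum across channels, and it forwards a new watermark (the same value to every output) only when that minimum advances.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Simplified model of one operator task combining watermarks from several input channels.
final class WatermarkCombiner {
    private final long[] channelWm;          // highest watermark seen per input channel
    private long lastEmitted = Long.MIN_VALUE;

    WatermarkCombiner(int numInputChannels) {
        channelWm = new long[numInputChannels];
        Arrays.fill(channelWm, Long.MIN_VALUE);
    }

    // Returns the watermark to send to ALL outputs, or empty if the task's clock did not move.
    OptionalLong onWatermark(int channel, long wm) {
        channelWm[channel] = Math.max(channelWm[channel], wm); // ignore per-channel regressions
        long combined = Arrays.stream(channelWm).min().getAsLong();
        if (combined > lastEmitted) {
            lastEmitted = combined;
            return OptionalLong.of(combined);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        WatermarkCombiner task = new WatermarkCombiner(2);
        System.out.println(task.onWatermark(0, 50)); // OptionalLong.empty (channel 1 unknown)
        System.out.println(task.onWatermark(1, 30)); // OptionalLong[30]
        System.out.println(task.onWatermark(1, 70)); // OptionalLong[50]
    }
}
```

The task receives 50 and 30 (different values) but emits a single sequence, 30 then 50, identically to every downstream task.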
> >
> >
> > > On 2019/05/03 07:32:07, Fabian Hueske <fhueske@gmail.com> wrote:
> > > > Hi,
> > > >
> > > > this should be covered here:
> > > >
> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams
> > > >
> > > > Best, Fabian
> > > >
> > > > On Thu, 2 May 2019 at 17:48, an0 <an00na@gmail.com> wrote:
> > > >
> > > > > This explanation is exactly what I'm looking for, thanks! Is such an
> > > > > important rule documented anywhere in the official documentation?
> > > > >
> > > > > On 2019/04/30 08:47:29, Fabian Hueske <fhueske@gmail.com> wrote:
> > > > > > An operator task broadcasts its current watermark to all downstream tasks
> > > > > > that might receive its records.
> > > > > > If you have the following code:
> > > > > >
> > > > > > DataStream<X> a = ...
> > > > > > a.map(A).map(B).keyBy(....).window(C)
> > > > > >
> > > > > > and execute this with parallelism 2, your plan looks like this:
> > > > > >
> > > > > > A.1 -- B.1 --\--/-- C.1
> > > > > >               X
> > > > > > A.2 -- B.2 --/--\-- C.2
> > > > > >
> > > > > > A.1 will propagate its watermarks to B.1 because only B.1 will receive its
> > > > > > output events.
> > > > > > However, B.1 will propagate its watermarks to C.1 and C.2 because the
> > > > > > output of B.1 is partitioned and all C tasks might receive output events
> > > > > > from B.1.
> > > > > >
> > > > > > Best, Fabian
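The rule above (a task sends its watermark to every downstream task that might receive its records) can be modeled by asking which downstream task indices an upstream task is connected to. In this hypothetical sketch, a forward edge (A.1 to B.1) connects task i only to task i, while a hash-partitioned keyBy() edge (B.1 to C.*) connects it to all downstream tasks. Indices are 0-based.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Which downstream tasks receive watermarks from one upstream task (illustrative model).
final class WatermarkFanOut {
    enum Edge { FORWARD, HASH }

    static List<Integer> receivers(Edge edge, int from, int downstreamParallelism) {
        switch (edge) {
            case FORWARD:
                // one-to-one: upstream task i only ever sends to downstream task i
                return List.of(from);
            case HASH:
                // keyBy(): any downstream task might get a record, so all get the watermark
                return IntStream.range(0, downstreamParallelism)
                        .boxed()
                        .collect(Collectors.toList());
            default:
                throw new IllegalArgumentException("unknown edge: " + edge);
        }
    }

    public static void main(String[] args) {
        System.out.println("A.1 sends watermarks to B tasks " + receivers(Edge.FORWARD, 0, 2)); // [0]
        System.out.println("B.1 sends watermarks to C tasks " + receivers(Edge.HASH, 0, 2));    // [0, 1]
    }
}
```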
> > > > > >
> > > > > > On Mon, 29 Apr 2019 at 20:06, an0 <an00na@gmail.com> wrote:
> > > > > >
> > > > > > > Thanks very much. It definitely explains the problem I'm seeing. However,
> > > > > > > something I need to confirm:
> > > > > > > You say "Watermarks are broadcasted/forwarded anyway." Do you mean that, in
> > > > > > > assignTimestampsAndWatermarks.keyBy.window, it doesn't matter what data
> > > > > > > flows through a specific key's stream, and all key streams have the same
> > > > > > > watermarks? So time-wise, `window` behaves as if `keyBy` is not there at
> > > > > > > all?
> > > > > > >
> > > > > > > On 2019/04/26 06:34:10, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Watermarks are meta events that travel independently of data events.
> > > > > > > >
> > > > > > > > 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> > > > > > > > instances of trips have some data (this is my assumption), so Watermarks
> > > > > > > > can be generated. Afterwards, even if some of the keyed partitions have
> > > > > > > > no data, Watermarks are broadcasted/forwarded anyway. In other words, if
> > > > > > > > at some point Watermarks were generated for all partitions of a single
> > > > > > > > stage, they will be forwarded beyond this point.
> > > > > > > >
> > > > > > > > 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> > > > > > > > watermarks for an empty partition, which produces no Watermarks at all
> > > > > > > > for this partition; therefore there is no progress beyond this point.
> > > > > > > >
> > > > > > > > I hope this clarifies it a bit.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > >
> > > > > > > > Dawid
> > > > > > > >
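Dawid's two cases can be replayed with a toy model. Assumptions, all hypothetical: two parallel instances, an extractor that reports `maxTimestamp - maxOutOfOrderness` and `Long.MIN_VALUE` before its first element (roughly what `BoundedOutOfOrdernessTimestampExtractor` reports), and a single key that hashes to partition 1. The intermediate keyed tasks of case A are skipped because they would forward the same minimum.

```java
import java.util.Arrays;

// Toy comparison of the two extractor placements (illustrative only, not Flink code).
final class ExtractorPlacement {
    static final class Extractor {
        private final long maxOutOfOrderness;
        private long maxTs = Long.MIN_VALUE;
        private boolean seenAny = false;

        Extractor(long maxOutOfOrderness) { this.maxOutOfOrderness = maxOutOfOrderness; }

        void onElement(long ts) { maxTs = Math.max(maxTs, ts); seenAny = true; }

        // No element yet -> no usable watermark.
        long currentWatermark() { return seenAny ? maxTs - maxOutOfOrderness : Long.MIN_VALUE; }
    }

    // Event time of the timeWindowAll task = min over the watermarks of its parallel inputs.
    static long windowAllClock(Extractor... inputs) {
        return Arrays.stream(inputs).mapToLong(Extractor::currentWatermark).min().getAsLong();
    }

    // Case A: one extractor per source subtask; both source subtasks read data.
    static long caseA(long[] tsAtSource1, long[] tsAtSource2, long bound) {
        Extractor e1 = new Extractor(bound), e2 = new Extractor(bound);
        for (long ts : tsAtSource1) e1.onElement(ts);
        for (long ts : tsAtSource2) e2.onElement(ts);
        return windowAllClock(e1, e2);
    }

    // Case B: one extractor per keyed partition; with a single key, partition 2 gets nothing.
    static long caseB(long[] allTs, long bound) {
        Extractor e1 = new Extractor(bound), e2 = new Extractor(bound);
        for (long ts : allTs) e1.onElement(ts); // every record hashes to partition 1
        return windowAllClock(e1, e2);
    }

    public static void main(String[] args) {
        System.out.println("case A clock: " + caseA(new long[] {100, 120}, new long[] {110, 130}, 10));
        System.out.println("case B clock: " + caseB(new long[] {100, 110, 120, 130}, 10));
    }
}
```

With these inputs, case A yields a clock of 110, while case B stays at `Long.MIN_VALUE` because the second extractor never sees an element, so no event-time window downstream can fire.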
> > > > > > > > On 25/04/2019 16:49, an0 wrote:
> > > > > > > > > If my understanding is correct, then why does
> > > > > > > > > `assignTimestampsAndWatermarks` before `keyBy` work? The `timeWindowAll`
> > > > > > > > > stream's input streams are task 1 and task 2, with task 2 idling, no matter
> > > > > > > > > whether `assignTimestampsAndWatermarks` is before or after `keyBy`, because
> > > > > > > > > whether task 2 receives elements only depends on the key distribution and
> > > > > > > > > has nothing to do with timestamp assignment, right?
> > > > > > > > >
> > > > > > > > > (A) trips --> assignTimestampsAndWatermarks --> keyBy -+-> key 1 trips ---------+-> timeWindowAll
> > > > > > > > >                                                        +-> key 2 trips (idle) --+
> > > > > > > > >
> > > > > > > > > (B) trips --> keyBy -+-> key 1 trips --> assignTimestampsAndWatermarks ---------+-> timeWindowAll
> > > > > > > > >                      +-> key 2 trips (idle) --> assignTimestampsAndWatermarks --+
> > > > > > > > >
> > > > > > > > > How are things different between A and B from `timeWindowAll`'s
> > > > > > > > > perspective?
> > > > > > > > >
> > > > > > > > > BTW, thanks for the webinar link, I'll check it later.
> > > > > > > > >
> > > > > > > > > On 2019/04/25 08:30:20, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
> > > > > > > > >> Hi,
> > > > > > > > >>
> > > > > > > > > >> Yes, I think your explanation is correct. I can also recommend Seth's
> > > > > > > > >> webinar where he talks about debugging Watermarks[1]
> > > > > > > > >>
> > > > > > > > >> Best,
> > > > > > > > >>
> > > > > > > > >> Dawid
> > > > > > > > >>
> > > > > > > > >> [1]
> > > > > > > > > >> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > > > > > > > >>
> > > > > > > > >> On 22/04/2019 22:55, an0 wrote:
> > > > > > > > > >>> Thanks, I feel I'm getting closer to the truth.
> > > > > > > > >>>
> > > > > > > > > >>> So parallelism is the cause? Say my parallelism is 2. Does that mean I
> > > > > > > > > >>> get 2 tasks running after `keyBy` even if all elements have the same key
> > > > > > > > > >>> and so go to 1 downstream task (say task 1)? And is it the other task
> > > > > > > > > >>> (task 2), with no incoming data, that causes the `timeWindowAll` stream to
> > > > > > > > > >>> be unable to progress? Because both task 1 and task 2 are its input streams
> > > > > > > > > >>> and one is idling, so its event time cannot make progress?
> > > > > > > > >>>
> > > > > > > > > >>> On 2019/04/22 01:57:39, Guowei Ma <guowei.mgw@gmail.com> wrote:
> > > > > > > > > >>>> Hi,
> > > > > > > > > >>>>
> > > > > > > > > >>>> A BoundedOutOfOrdernessTimestampExtractor can only send a WM after it has
> > > > > > > > > >>>> received at least one element.
> > > > > > > > > >>>>
> > > > > > > > > >>>> After keyBy:
> > > > > > > > > >>>> Flink uses the hash code of the key and the parallelism of the downstream
> > > > > > > > > >>>> operator to decide which subtask receives an element. This means that if
> > > > > > > > > >>>> your key is always the same, all the sources will only send their elements
> > > > > > > > > >>>> to the same downstream task, for example only to
> > > > > > > > > >>>> BoundedOutOfOrdernessTimestampExtractor no. 3.
> > > > > > > > > >>>>
> > > > > > > > > >>>> Before keyBy:
> > > > > > > > > >>>> In your case, the Source and the BoundedOutOfOrdernessTimestampExtractors
> > > > > > > > > >>>> would be chained together, which means every
> > > > > > > > > >>>> BoundedOutOfOrdernessTimestampExtractor will receive elements.
> > > > > > > > >>>>
> > > > > > > > >>>> Best,
> > > > > > > > >>>> Guowei
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > > >>>> On Fri, 19 Apr 2019 at 22:41, an0 <an00na@gmail.com> wrote:
> > > > > > > > >>>>
> > > > > > > > >>>>> Hi,
> > > > > > > > >>>>>
> > > > > > > > > >>>>> First of all, thank you for the `shuffle()` tip. It works. However, I
> > > > > > > > > >>>>> still don't understand why it doesn't work without calling `shuffle()`.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
> > > > > > > > > >>>>> All the trips have keys and timestamps. As I said in my reply to Paul, I
> > > > > > > > > >>>>> see the same watermarks being extracted.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> > > > > > > > > >>>>> matter? My understanding is that any specific window for a specific key
> > > > > > > > > >>>>> always receives exactly the same data, and the calling order of
> > > > > > > > > >>>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> To make `keyBy` as irrelevant as possible, I tried letting it always
> > > > > > > > > >>>>> return the same key so that there is only 1 keyed stream and it is exactly
> > > > > > > > > >>>>> the same as the original unkeyed stream. It still doesn't trigger windows:
> > > > > > > > > >>>>> ```java
> > > > > > > > > >>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > > > > > >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > > > > > > > > >>>>> DataStream<Trip> featurizedUserTrips =
> > > > > > > > > >>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > > > > > > > >>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > >>>>>                     @Override
> > > > > > > > > >>>>>                     public long extractTimestamp(Trip trip) {
> > > > > > > > > >>>>>                         return trip.endTime.getTime();
> > > > > > > > > >>>>>                     }
> > > > > > > > > >>>>>                 });
> > > > > > > > > >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > > > > > >>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > >>>>> ```
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> It makes no sense to me. Please help me understand why it doesn't work.
> > > > > > > > > >>>>> Thanks!
> > > > > > > > >>>>>
> > > > > > > > > >>>>> On 2019/04/19 04:14:31, Guowei Ma <guowei.mgw@gmail.com> wrote:
> > > > > > > > >>>>>> Hi,
> > > > > > > > > >>>>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> > > > > > > > > >>>>>> receive elements (trips). If that is the case, a
> > > > > > > > > >>>>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> > > > > > > > > >>>>>> will not send a WM, and therefore the timeWindowAll operator cannot be
> > > > > > > > > >>>>>> triggered.
> > > > > > > > > >>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> > > > > > > > > >>>>>> second case and check whether the window is triggered. If it is triggered,
> > > > > > > > > >>>>>> you could check the distribution of elements generated by the source.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Best,
> > > > > > > > >>>>>> Guowei
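Guowei's `shuffle()` suggestion works because the redistribution no longer depends on the key. As a simplified illustration (hypothetical class, not Flink code), the sketch compares a constant key, which always routes to one task, with round-robin distribution, which is what `rebalance()` does; `shuffle()` picks a random channel instead, but with enough records it is very likely to reach every task as well. The round-robin here starts at task 0 for simplicity.

```java
import java.util.Arrays;

// Record counts per downstream extractor task under two partitioning schemes (toy model).
final class Redistribution {
    // keyBy() with a constant key: every record goes to the same task.
    static int[] byConstantKey(int numRecords, int parallelism, int targetTask) {
        int[] counts = new int[parallelism];
        counts[targetTask] += numRecords;
        return counts;
    }

    // Round-robin (rebalance-style): records cycle over all tasks.
    static int[] roundRobin(int numRecords, int parallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < numRecords; i++) counts[i % parallelism]++;
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(byConstantKey(5, 2, 0))); // [5, 0]
        System.out.println(Arrays.toString(roundRobin(5, 2)));       // [3, 2]
    }
}
```

Only in the second case does every extractor see at least one element, and therefore emit a watermark.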
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > > >>>>>> On Fri, 19 Apr 2019 at 04:10, an00na@gmail.com <an00na@gmail.com> wrote:
> > > > > > > > >>>>>>
> > > > > > > > > >>>>>>> I don't think it is the watermark. I see the same watermarks from the
> > > > > > > > > >>>>>>> two versions of code.
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> The processing on the keyed stream doesn't change event time at all. I can
> > > > > > > > > >>>>>>> simply change my code to use `map` on the keyed stream to return the input
> > > > > > > > > >>>>>>> data unchanged, so that the window operator receives exactly the same data.
> > > > > > > > > >>>>>>> The only difference is when I call `assignTimestampsAndWatermarks`. The
> > > > > > > > > >>>>>>> result is the same: `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > > > > > > >>>>>>> ```java
> > > > > > > > > >>>>>>> DataStream<Trip> trips =
> > > > > > > > > >>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > > > > > > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > >>>>>>>                     @Override
> > > > > > > > > >>>>>>>                     public long extractTimestamp(Trip trip) {
> > > > > > > > > >>>>>>>                         return trip.endTime.getTime();
> > > > > > > > > >>>>>>>                     }
> > > > > > > > > >>>>>>>                 });
> > > > > > > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > > > > > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > > > > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > >>>>>>> ```
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > > > > > > > >>>>>>> ```java
> > > > > > > > > >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > > > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > >>>>>>> DataStream<Trip> featurizedUserTrips =
> > > > > > > > > >>>>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > > > > > > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > >>>>>>>                     @Override
> > > > > > > > > >>>>>>>                     public long extractTimestamp(Trip trip) {
> > > > > > > > > >>>>>>>                         return trip.endTime.getTime();
> > > > > > > > > >>>>>>>                     }
> > > > > > > > > >>>>>>>                 });
> > > > > > > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > > > > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > >>>>>>> ```
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> It feels like a bug to me, but I want to confirm it before I file the bug
> > > > > > > > > >>>>>>> report.
> > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3280@gmail.com> wrote:
> > > > > > > > >>>>>>>> Hi,
> > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> Could you check the watermark of the window operator? One possible
> > > > > > > > > >>>>>>>> situation would be that some of the keys are not getting enough inputs, so
> > > > > > > > > >>>>>>>> their watermarks remain below the window end time and hold the window
> > > > > > > > > >>>>>>>> operator's watermark back. IMO, it's a good practice to assign watermarks
> > > > > > > > > >>>>>>>> earlier in the data pipeline.
> > > > > > > > >>>>>>>> Best,
> > > > > > > > >>>>>>>> Paul Lam
> > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> On 17 Apr 2019, at 23:04, an00na@gmail.com wrote:
> > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > > > > > > >>>>>>>>> ```java
> > > > > > > > > >>>>>>>>> DataStream<Trip> trips =
> > > > > > > > > >>>>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > > > > > > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > >>>>>>>>>                     @Override
> > > > > > > > > >>>>>>>>>                     public long extractTimestamp(Trip trip) {
> > > > > > > > > >>>>>>>>>                         return trip.endTime.getTime();
> > > > > > > > > >>>>>>>>>                     }
> > > > > > > > > >>>>>>>>>                 });
> > > > > > > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > > > > > > > >>>>>>>>>         userTrips.process(new Featurization());
> > > > > > > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > > > > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > >>>>>>>>> ```
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> But not after `keyBy` and `process`:
> > > > > > > > > >>>>>>>>> ```java
> > > > > > > > > >>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > > > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > > > > > > > >>>>>>>>>         userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> > > > > > > > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > > > > > > > >>>>>>>>>                     @Override
> > > > > > > > > >>>>>>>>>                     public long extractTimestamp(FeaturizedTrip trip) {
> > > > > > > > > >>>>>>>>>                         return trip.endTime.getTime();
> > > > > > > > > >>>>>>>>>                     }
> > > > > > > > > >>>>>>>>>                 });
> > > > > > > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > > > > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > >>>>>>>>> ```
> > > > > > > > > >>>>>>>>> Windows are never triggered.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> Is it a bug or expected behavior? If the latter, where is it
> > > > > > > > > >>>>>>>>> documented?
> > > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
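The original question uses `timeWindowAll(Time.days(7), Time.days(1))`, i.e. sliding windows, which are event-time windows assuming the job runs with event time. As a rough sketch of why a stuck watermark means no output, the Java below assigns a timestamp to its 7 overlapping windows (offset 0) and applies the firing condition `watermark >= windowEnd - 1`. The class and method names are made up; the arithmetic follows my understanding of Flink's sliding event-time window assigner and event-time trigger, not their actual source.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of sliding event-time windows (size 7 days, slide 1 day, offset 0) and their firing rule.
final class SlidingWindowSketch {
    static final long DAY = 86_400_000L;
    static final long SIZE = 7 * DAY, SLIDE = DAY;

    // Start timestamps of all windows [start, start + SIZE) that contain ts (ts >= 0 assumed).
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts + SLIDE) % SLIDE; // latest slide boundary <= ts
        for (long start = lastStart; start > ts - SIZE; start -= SLIDE) starts.add(start);
        return starts;
    }

    // A window fires once the operator's watermark reaches its last millisecond.
    static boolean fires(long windowStart, long watermark) {
        return watermark >= windowStart + SIZE - 1;
    }

    public static void main(String[] args) {
        long ts = 10 * DAY + 5;                              // an element on day 10
        System.out.println(windowStarts(ts).size());          // 7 overlapping windows
        System.out.println(fires(4 * DAY, 11 * DAY - 1));     // true: watermark reached day 11
        System.out.println(fires(4 * DAY, Long.MIN_VALUE));   // false: a stuck watermark never fires
    }
}
```

If the `timeWindowAll` task's watermark is held at `Long.MIN_VALUE` by an idle input, `fires` never returns true for any window, which matches the "windows are never triggered" symptom.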
