From: Fabian Hueske
Date: Fri, 3 May 2019 09:32:07 +0200
Subject: Re: assignTimestampsAndWatermarks not work after KeyedStream.process
To: an0
Cc: user

Hi,

this should be covered here:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams

Best, Fabian

On Thu, 2 May 2019 at 17:48, an0 <an00na@gmail.com> wrote:
This explanation is exactly what I'm looking for, thanks! Is such an important rule documented anywhere in the official documentation?

On 2019/04/30 08:47:29, Fabian Hueske <fhueske@gmail.com> wrote:
> An operator task broadcasts its current watermark to all downstream tasks
> that might receive its records.
> If you have the following code:
>
> DataStream<X> a = ...
> a.map(A).map(B).keyBy(....).window(C)
>
> and execute this with parallelism 2, your plan looks like this:
>
> A.1 -- B.1 --\--/-- C.1
>               X
> A.2 -- B.2 --/--\-- C.2
>
> A.1 will propagate its watermarks to B.1 because only B.1 will receive its
> output events.
> However, B.1 will propagate its watermarks to C.1 and C.2 because the
> output of B.1 is partitioned and all C tasks might receive output events
> from B.1.
>
> Best, Fabian
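
To make the rule above concrete, here is a rough sketch in plain Java. It is a toy model, not Flink code: the class `WatermarkPropagationSketch` and the method `combinedWatermark` are made-up names, and the only behaviour it models is that a task's event-time clock is the minimum of the watermarks it has received on its input channels.

```java
import java.util.Arrays;

/** Toy model of watermark propagation between parallel tasks; not Flink code. */
public class WatermarkPropagationSketch {

    /** A task's event time is the minimum watermark over all of its input channels. */
    public static long combinedWatermark(long[] inputChannelWatermarks) {
        return Arrays.stream(inputChannelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // B.1 and B.2 each broadcast their current watermark to both C.1 and C.2,
        // because the keyBy partitioning means either C task might receive their records.
        long b1 = 1_000L;
        long b2 = 1_500L;
        long[] inputsOfC1 = {b1, b2};
        long[] inputsOfC2 = {b1, b2};
        System.out.println("C.1 watermark: " + combinedWatermark(inputsOfC1));
        System.out.println("C.2 watermark: " + combinedWatermark(inputsOfC2));

        // If B.2 has never emitted a watermark, both C tasks are held back.
        long[] stalledInputs = {b1, Long.MIN_VALUE};
        System.out.println("stalled: " + (combinedWatermark(stalledInputs) == Long.MIN_VALUE));
    }
}
```

In this model both C tasks see the same pair of input watermarks, which is why the watermark downstream of `keyBy` does not depend on which keys actually carry data.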
>
> On Mon, 29 Apr 2019 at 20:06, an0 <an00na@gmail.com> wrote:
>
> > Thanks very much. It definitely explains the problem I'm seeing. However,
> > I need to confirm something:
> > You say "Watermarks are broadcasted/forwarded anyway." Do you mean that, in
> > assignTimestampsAndWatermarks.keyBy.window, it doesn't matter what data
> > flows through a specific key's stream, all key streams have the same
> > watermarks? So time-wise, `window` behaves as if `keyBy` is not there at
> > all?
> >
> > On 2019/04/26 06:34:10, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
> > > Hi,
> > >
> > > Watermarks are meta events that travel independently of data events.
> > >
> > > 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> > > instances of trips have some data (this is my assumption), so Watermarks
> > > can be generated. Afterwards, even if some of the keyed partitions have
> > > no data, Watermarks are broadcasted/forwarded anyway. In other words, if
> > > at some point Watermarks were generated for all partitions of a single
> > > stage, they will be forwarded beyond this point.
> > >
> > > 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> > > watermarks for an empty partition, which produces no Watermarks at all
> > > for this partition; therefore there is no progress beyond this point.
> > >
> > > I hope this clarifies it a bit.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 25/04/2019 16:49, an0 wrote:
> > > > If my understanding is correct, then why does
> > > > `assignTimestampsAndWatermarks` before `keyBy` work? The `timeWindowAll`
> > > > stream's input streams are task 1 and task 2, with task 2 idling, no matter
> > > > whether `assignTimestampsAndWatermarks` is before or after `keyBy`, because
> > > > whether task 2 receives elements depends only on the key distribution and has
> > > > nothing to do with timestamp assignment, right?
> > > >
> > > >                                                    /key 1 trips\
> > > >                                                   /             \
> > > > (A) trips--> assignTimestampsAndWatermarks-->keyBy               timeWindowAll
> > > >                                                   \     idle    /
> > > >                                                    \key 2 trips/
> > > >
> > > >                   /key 1 trips--> assignTimestampsAndWatermarks\
> > > >                  /                                              \
> > > > (B) trips-->keyBy                                                timeWindowAll
> > > >                  \     idle                                     /
> > > >                   \key 2 trips--> assignTimestampsAndWatermarks/
> > > >
> > > > How are things different between A and B from `timeWindowAll`'s
> > > > perspective?
> > > >
> > > > BTW, thanks for the webinar link, I'll check it later.
> > > >
> > > > On 2019/04/25 08:30:20, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
> > > >> Hi,
> > > >>
> > > >> Yes, I think your explanation is correct. I can also recommend Seth's
> > > >> webinar where he talks about debugging Watermarks [1].
> > > >>
> > > >> Best,
> > > >>
> > > >> Dawid
> > > >>
> > > >> [1] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > > >>
> > > >> On 22/04/2019 22:55, an0 wrote:
> > > >>> Thanks, I feel I'm getting closer to the truth.
> > > >>>
> > > >>> So parallelism is the cause? Say my parallelism is 2. Does that mean
> > > >>> I get 2 tasks running after `keyBy` even if all elements have the same key
> > > >>> and so go to 1 downstream task (say task 1)? And is it the other task (task 2),
> > > >>> with no incoming data, that makes the `timeWindowAll` stream unable to progress?
> > > >>> Because both task 1 and task 2 are its input streams and one is idling, so
> > > >>> its event time cannot make progress?
> > > >>>
> > > >>> On 2019/04/22 01:57:39, Guowei Ma <guowei.mgw@gmail.com> wrote:
> > > >>>> Hi,
> > > >>>>
> > > >>>> A BoundedOutOfOrdernessTimestampExtractor can send a WM only after it
> > > >>>> has received at least one element.
> > > >>>>
> > > >>>> For after keyBy:
> > > >>>> Flink uses the hash code of the key and the parallelism of the downstream
> > > >>>> operator to decide which subtask receives the element. This means that if
> > > >>>> your key is always the same, all the sources will send their elements to the
> > > >>>> same downstream task, for example only the no. 3
> > > >>>> BoundedOutOfOrdernessTimestampExtractor.
> > > >>>>
> > > >>>> For before keyBy:
> > > >>>> In your case, the Source and BoundedOutOfOrdernessTimestampExtractors would
> > > >>>> be chained together, which means every
> > > >>>> BoundedOutOfOrdernessTimestampExtractor will receive elements.
> > > >>>>
> > > >>>> Best,
> > > >>>> Guowei
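
A small sketch of the "after keyBy" case described above, in plain Java rather than Flink. Everything in it is an assumption made for illustration: the class `KeyedWatermarkSketch`, both method names, and in particular the routing function, which is a plain `hashCode` modulo the parallelism (Flink's real routing goes through key groups and a murmur hash, but the consequence for a constant key is the same). Each subtask's watermark is modelled as the largest timestamp it has seen minus the allowed out-of-orderness, and as `Long.MIN_VALUE` if it has seen nothing.

```java
import java.util.Arrays;

/** Toy model of per-subtask watermarks after a keyBy; not Flink code. */
public class KeyedWatermarkSketch {

    /** Simplified routing (assumption): hash of the key modulo the downstream parallelism. */
    public static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /**
     * Routes each (key, timestamp) pair to a subtask and returns one watermark per subtask:
     * max timestamp seen minus maxOutOfOrderness, or Long.MIN_VALUE for a subtask with no input.
     */
    public static long[] watermarksAfterKeyBy(long[] keys, long[] timestamps,
                                              int parallelism, long maxOutOfOrderness) {
        long[] watermarks = new long[parallelism];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        for (int i = 0; i < keys.length; i++) {
            int subtask = subtaskFor(keys[i], parallelism);
            watermarks[subtask] = Math.max(watermarks[subtask], timestamps[i] - maxOutOfOrderness);
        }
        return watermarks;
    }

    public static void main(String[] args) {
        long[] keys = {0L, 0L, 0L};              // constant key, as in trips.keyBy(trip -> 0L)
        long[] timestamps = {100L, 250L, 200L};
        long[] perSubtask = watermarksAfterKeyBy(keys, timestamps, 2, 50L);
        System.out.println(Arrays.toString(perSubtask));

        // A downstream operator such as timeWindowAll takes the minimum over its inputs,
        // so the subtask that never received an element holds it back.
        long downstream = Arrays.stream(perSubtask).min().getAsLong();
        System.out.println("downstream stuck: " + (downstream == Long.MIN_VALUE));
    }
}
```

With a constant key and parallelism 2, only one entry of the returned array ever advances, which matches the idle-subtask explanation in this thread.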
> > > >>>>
> > > >>>>
> > > >>>> On Fri, 19 Apr 2019 at 10:41 PM, an0 <an00na@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > >>>>> First of all, thank you for the `shuffle()` tip. It works. However, I
> > > >>>>> still don't understand why it doesn't work without calling `shuffle()`.
> > > >>>>>
> > > >>>>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
> > > >>>>> All the trips have keys and timestamps. As I said in my reply to Paul, I see
> > > >>>>> the same watermarks being extracted.
> > > >>>>>
> > > >>>>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> > > >>>>> matter? My understanding is that any specific window for a specific key always
> > > >>>>> receives exactly the same data, and the calling order of
> > > >>>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> > > >>>>>
> > > >>>>> To make `keyBy` as irrelevant as possible, I tried letting it always
> > > >>>>> return the same key so that there is only 1 keyed stream and it is exactly
> > > >>>>> the same as the original unkeyed stream. It still doesn't trigger windows:
> > > >>>>> ```java
> > > >>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > > >>>>> DataStream<Trip> featurizedUserTrips =
> > > >>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > >>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>     @Override
> > > >>>>>     public long extractTimestamp(Trip trip) {
> > > >>>>>         return trip.endTime.getTime();
> > > >>>>>     }
> > > >>>>> });
> > > >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>> ```
> > > >>>>>
> > > >>>>> It makes no sense to me. Please help me understand why it doesn't work.
> > > >>>>> Thanks!
> > > >>>>>
> > > >>>>> On 2019/04/19 04:14:31, Guowei Ma <guowei.mgw@gmail.com> wrote:
> > > >>>>>> Hi,
> > > >>>>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> > > >>>>>> receive the elements (trips). If that is the case, a
> > > >>>>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> > > >>>>>> would not send the WM, so the timeWindowAll operator could not be
> > > >>>>>> triggered.
> > > >>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> > > >>>>>> second case and check whether the window is triggered. If it is
> > > >>>>>> triggered, you could check the distribution of elements generated by the
> > > >>>>>> source.
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Guowei
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Fri, 19 Apr 2019 at 4:10 AM, an00na@gmail.com <an00na@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> I don't think it is the watermark. I see the same watermarks from the two
> > > >>>>>>> versions of code.
> > > >>>>>>>
> > > >>>>>>> The processing on the keyed stream doesn't change event time at all. I can
> > > >>>>>>> simply change my code to use `map` on the keyed stream to return the
> > > >>>>>>> input data, so that the window operator receives exactly the same data. The
> > > >>>>>>> only difference is when I do `assignTimestampsAndWatermarks`. The result is
> > > >>>>>>> the same. `assignTimestampsAndWatermarks` before `keyBy` works:
> > > >>>>>>> ```java
> > > >>>>>>> DataStream<Trip> trips =
> > > >>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>>>     @Override
> > > >>>>>>>     public long extractTimestamp(Trip trip) {
> > > >>>>>>>         return trip.endTime.getTime();
> > > >>>>>>>     }
> > > >>>>>>> });
> > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>> ```
> > > >>>>>>>
> > > >>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > >>>>>>> ```java
> > > >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>> DataStream<Trip> featurizedUserTrips =
> > > >>>>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>>>     @Override
> > > >>>>>>>     public long extractTimestamp(Trip trip) {
> > > >>>>>>>         return trip.endTime.getTime();
> > > >>>>>>>     }
> > > >>>>>>> });
> > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>> ```
> > > >>>>>>>
> > > >>>>>>> It feels like a bug to me, but I want to confirm it before I file the bug
> > > >>>>>>> report.
> > > >>>>>>>
> > > >>>>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3280@gmail.com> wrote:
> > > >>>>>>>> Hi,
> > > >>>>>>>>
> > > >>>>>>>> Could you check the watermark of the window operator? One possible
> > > >>>>>>>> situation would be that some of the keys are not getting enough inputs, so
> > > >>>>>>>> their watermarks remain below the window end time and hold the window
> > > >>>>>>>> operator watermark back. IMO, it's good practice to assign watermarks
> > > >>>>>>>> earlier in the data pipeline.
> > > >>>>>>>> Best,
> > > >>>>>>>> Paul Lam
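
For the "watermarks remain below the window end time" point, a minimal plain-Java sketch of the firing condition. The class `WindowFiringSketch` and the method `canFire` are made-up names, and the model assumes the usual event-time rule that a window covering [start, end) fires once the operator's watermark has reached end - 1.

```java
import java.util.concurrent.TimeUnit;

/** Toy model of when an event-time window can fire; not Flink code. */
public class WindowFiringSketch {

    /** A window covering [start, end) can fire once the watermark reaches end - 1. */
    public static boolean canFire(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long sevenDays = TimeUnit.DAYS.toMillis(7);
        long windowEnd = sevenDays;               // first 7-day window: [0, 7 days)

        // Watermark one millisecond before the window end: the window fires.
        System.out.println(canFire(windowEnd, sevenDays - 1));

        // An operator whose watermark never advanced (one idle input) never fires.
        System.out.println(canFire(windowEnd, Long.MIN_VALUE));
    }
}
```

So a window operator whose watermark is pinned at its initial value by one silent input never satisfies the condition, no matter how much data the other inputs deliver.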
> > > >>>>>>>>
> > > >>>>>>>>> On 17 Apr 2019, at 23:04, an00na@gmail.com wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> > > >>>>>>>>> ```java
> > > >>>>>>>>> DataStream<Trip> trips =
> > > >>>>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>>>>>             @Override
> > > >>>>>>>>>             public long extractTimestamp(Trip trip) {
> > > >>>>>>>>>                 return trip.endTime.getTime();
> > > >>>>>>>>>             }
> > > >>>>>>>>>         });
> > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > >>>>>>>>>         userTrips.process(new Featurization());
> > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>>>> ```
> > > >>>>>>>>>
> > > >>>>>>>>> But not after `keyBy` and `process`:
> > > >>>>>>>>> ```java
> > > >>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > >>>>>>>>>         userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > >>>>>>>>>             @Override
> > > >>>>>>>>>             public long extractTimestamp(FeaturizedTrip trip) {
> > > >>>>>>>>>                 return trip.endTime.getTime();
> > > >>>>>>>>>             }
> > > >>>>>>>>>         });
> > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>>>> ```
> > > >>>>>>>>> Windows are never triggered.
> > > >>>>>>>>>
> > > >>>>>>>>> Is it a bug or expected behavior? If the latter, where is it
> > > >>>>>>>>> documented?
> > > >>
> > >
> > >
> >
>