From: Manas Kale <manaskale96@gmail.com>
To: Timo Walther
Cc: user <user@flink.apache.org>
Date: Mon, 6 Apr 2020 10:09:50 +0530
Subject: Re: Perform processing only when watermark updates, buffer data otherwise

Hi Timo,
Thanks for the information.

On Thu, Apr 2, 2020 at 9:30 PM Timo Walther <twalthr@apache.org> wrote:
Hi Manas,

First of all, once watermarks have been assigned at the source, Flink
operators usually take care of forwarding them downstream, so you normally
do not need to assign them again after a union.

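In the case of `union()`, the subsequent operator tracks the latest
watermark of each of its inputs (every parallel instance of every unioned
stream). It advances its internal event-time clock, and emits a new
watermark, only when the minimum of these watermarks increases, i.e. once
all inputs have reached a common event time.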
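
The rule above can be illustrated with a simplified plain-Java model. This is a sketch, not Flink's actual implementation (which also handles details such as idle inputs); the class name `WatermarkCombiner` and the numbering of input channels are assumptions made for illustration. Each channel stands for one parallel instance of one of the unioned streams.

```java
import java.util.Arrays;

// Hypothetical, simplified model of an operator with several input channels.
class WatermarkCombiner {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkCombiner(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel. Returns true if the operator's own
    // event-time clock advanced, i.e. it would emit a new watermark downstream.
    boolean onWatermark(int channel, long watermark) {
        // Watermarks on a single channel never go backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The operator's clock is the minimum over all channels.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return true;
        }
        return false;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}

public class UnionWatermarkDemo {
    public static void main(String[] args) {
        // Two unioned streams with one parallel instance each: two channels.
        WatermarkCombiner op = new WatermarkCombiner(2);
        System.out.println(op.onWatermark(0, 10)); // false: channel 1 has not reported yet
        System.out.println(op.onWatermark(1, 5));  // true: min(10, 5) = 5
        System.out.println(op.onWatermark(1, 20)); // true: min(10, 20) = 10
        System.out.println(op.currentWatermark()); // 10
    }
}
```

The slowest input holds back the combined clock, which is why a lagging stream in a union delays all event-time processing downstream.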

Your sorting use case can easily be implemented with a KeyedProcessFunction
[1]: buffer your events in a list state and process them when a timer fires.
The documentation also explains how to register a timer.

If you want the timer to fire when the next watermark arrives, register an
event-time timer for:

ctx.timerService().currentWatermark() + 1
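
The buffer-and-sort logic for a single key can be sketched in plain Java, assuming a hypothetical `Event` type with a `long` event-time timestamp. It runs without Flink. In an actual KeyedProcessFunction, `buffer` would be a `ListState<Event>`, `add()` would correspond to `processElement()` (which also registers an event-time timer at `ctx.timerService().currentWatermark() + 1`), and `onWatermark()` would correspond to `onTimer()`, reading the new watermark from `ctx.timerService().currentWatermark()`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event type: an event-time timestamp plus a payload.
class Event {
    final long timestamp;
    final String payload;

    Event(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }

    @Override
    public String toString() {
        return payload + "@" + timestamp;
    }
}

// Models the per-key state a KeyedProcessFunction would keep.
class SortingBuffer {
    private final List<Event> buffer = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    // processElement(): buffer the event (the timer registration is implicit here).
    void add(Event e) {
        buffer.add(e);
    }

    // onTimer(): a timer at (old watermark + 1) fires on any watermark advance.
    List<Event> onWatermark(long watermark) {
        List<Event> ready = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return ready; // watermark did not advance, so no timer fires
        }
        currentWatermark = watermark;
        List<Event> pending = new ArrayList<>();
        for (Event e : buffer) {
            if (e.timestamp <= watermark) {
                ready.add(e);
            } else {
                pending.add(e); // still ahead of the watermark; keep buffering
            }
        }
        ready.sort(Comparator.comparingLong(e -> e.timestamp));
        buffer.clear();
        buffer.addAll(pending);
        return ready;
    }
}

public class SortOnWatermarkDemo {
    public static void main(String[] args) {
        SortingBuffer buf = new SortingBuffer();
        // Events from the unioned streams arrive out of event-time order.
        buf.add(new Event(7, "c"));
        buf.add(new Event(3, "a"));
        buf.add(new Event(12, "d"));
        buf.add(new Event(5, "b"));
        System.out.println(buf.onWatermark(10)); // [a@3, b@5, c@7]
        System.out.println(buf.onWatermark(15)); // [d@12]
    }
}
```

Events still ahead of the new watermark stay buffered; in Flink you would re-register a timer for them in `onTimer()`, because no further element may arrive for that key. Late events (timestamp already at or below the watermark on arrival) are released on the next advance, after newer events were emitted. A common alternative is to register the timer at each event's own timestamp, so each event is released exactly when the watermark passes it.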

`union()` is meant for merging streams of the same type into one stream in
which the order of events does not matter. However, watermarks still arrive
in order, so sorting by event time should not be a problem.

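`connect()` is more general than a join: it lets a single operator process
two streams, possibly of different types, with shared state (see also the
answer here [2]).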

I hope I could answer most of your questions. Feel free to ask further
questions.

Regards,
Timo


[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function
[2]
https://stackoverflow.com/questions/52885926/what-is-the-difference-between-flink-join-and-connect



On 02.04.20 12:11, Manas Kale wrote:
> Also
>
>   * What happens to watermarks after a union operation? Do I have to
>     call assignTimestampsAndWatermarks() again? I guess I will have to,
>     since multiple streams are being combined and Flink needs to know how
>     to resolve the individual watermarks.
>   * What is the difference between union() and connect()?
>
> On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <manaskale96@gmail.com> wrote:
>
>     Hi,
>     I want to perform some processing on events only when the watermark
>     is updated. Otherwise, for all other events, I want to keep
>     buffering them until the watermark arrives.
>     The main motivation behind doing this is that I have several
>     operators that emit events/messages to a downstream operator. Since
>     the order in which events arrive at the downstream operator is not
>     guaranteed to be in chronological event time, I want to manually
>     sort events when the watermark arrives and only then proceed.
>
>     Specifically, I want to first combine multiple streams and then do
>     the above. Something like:
>     stream1.union(stream2, stream3)...
>
>     One solution I am exploring is using a global window with a trigger
>     that will fire only when the watermark updates:
>     stream1.union(stream2, stream3)
>         .keyBy(...)
>         .window(GlobalWindows.create())
>         .trigger(new OnWatermarkUpdateTrigger())
>         .process(...)
>
>     I will store the latest watermark in the trigger's state store. In
>     the onElement() method, I will FIRE if the current watermark is
>     different from the stored one.
>
>     Is this the best way to implement the functionality described above?
>
