From: Oytun Tez
Date: Fri, 2 Aug 2019 10:18:06 -0400
Subject: Re: Best pattern to signal a watermark > t across all tasks?
To: Eduardo Winpenny Tejedor
Cc: Fabian Hueske, user
Delivered-To: mailing list user@flink.apache.org

This bit of info is very useful, Fabian, thank you:
> You can get the parallel task id from the RuntimeContext.getIndexOfThisSubtask().
> RuntimeContext.getNumberOfParallelSubtasks() gives the total number of tasks.

---
Oytun Tez

M O T A W O R D
The World's Fastest Human Translation Platform.
oytun@motaword.com - www.motaword.com

On Fri, Aug 2, 2019 at 9:20 AM Eduardo Winpenny Tejedor <eduardo.winpenny@gmail.com> wrote:
awesome, thanks

On Fri, 2 Aug 2019, 10:56 Fabian Hueske, <fhueske@gmail.com> wrote:
Hi,

Regarding step 3, it is sufficient to check that you got one message from each parallel task of the previous operator. That's because a task processes the timers of all keys before moving forward.
Timers are always processed per key, but you could deduplicate on the parallel task id and check that you got a message from each task.

You can get the parallel task id from the RuntimeContext.getIndexOfThisSubtask().
RuntimeContext.getNumberOfParallelSubtasks() gives the total number of tasks.

Fabian
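Fabian's deduplication idea can be sketched in plain Java, independent of Flink, as state that operator #2 might keep: one set of upstream subtask indexes per timer timestamp. The class and method names are hypothetical. The sketch assumes two things. First, each marker carries the values of getIndexOfThisSubtask() and getNumberOfParallelSubtasks() as read in operator #1; operator #2 runs with parallelism 1, so its own RuntimeContext would report a single subtask. Second, a subtask's marker is taken to mean that subtask has fired all of its timers for that timestamp.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical state for operator #2 (parallelism 1): records which upstream
 * subtasks have reported for each timer timestamp.
 */
final class SubtaskCompletionTracker {
    // timer timestamp -> indexes of upstream subtasks that have reported for it
    private final Map<Long, Set<Integer>> seen = new HashMap<>();

    /**
     * Records one marker. subtaskIndex and numSubtasks are assumed to be read in
     * operator #1 (getIndexOfThisSubtask / getNumberOfParallelSubtasks) and
     * carried in the marker.
     *
     * @return true exactly once per timestamp, when the last missing subtask reports
     */
    boolean report(long timerTimestamp, int subtaskIndex, int numSubtasks) {
        Set<Integer> reported = seen.computeIfAbsent(timerTimestamp, t -> new HashSet<>());
        // Repeated markers from the same subtask (e.g. one per key) are deduplicated.
        if (!reported.add(subtaskIndex)) {
            return false;
        }
        // The set grows by exactly one per new subtask, so equality is hit only once.
        return reported.size() == numSubtasks;
    }
}
```

The plain HashMap is only for illustration. Inside a Flink job this would belong in Flink-managed state so it survives failures, and entries for completed timestamps would need to be cleared.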

On Fri, 2 Aug 2019 at 10:55, Eduardo Winpenny Tejedor <eduardo.winpenny@gmail.com> wrote:
Hi Oytun, that sounds like a great idea, thanks! Just wanted to confirm a couple of things:

- In step 2, by merging do you mean anything other than setting the operator parallelism to 1? Forcing a parallelism of 1 should ensure all items go to the same task.

- In step 3, I don't think I could check that an item has been received for each key: I would need to know how many keys I have on my stream (or could I!? That's exactly what I'm trying to solve). But I could definitely rely on Flink's watermarking mechanism: if the watermark > t (t being the time of the first operator's trigger), it must mean all streams have finished.
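Eduardo's watermark argument rests on two Flink behaviors. First, a task forwards a watermark only after firing its event-time timers up to that watermark. Second, a downstream task advances its own watermark to the minimum of the latest watermarks received on its input channels. The plain-Java model below illustrates only the second rule. CombinedWatermark and hasPassed are hypothetical names, and Flink's handling of idle channels is deliberately left out.

```java
import java.util.Arrays;

/**
 * Hypothetical model of the min-over-inputs watermark rule. It mirrors the
 * behavior described above; it is not Flink code and ignores idle channels.
 */
final class CombinedWatermark {
    private final long[] perChannel;

    CombinedWatermark(int numChannels) {
        perChannel = new long[numChannels];
        // No watermark has been received on any channel yet.
        Arrays.fill(perChannel, Long.MIN_VALUE);
    }

    /** Records a watermark from one upstream subtask; a channel's watermark never moves back. */
    void onWatermark(int channel, long watermark) {
        perChannel[channel] = Math.max(perChannel[channel], watermark);
    }

    /** The task-level watermark: the minimum over all input channels. */
    long current() {
        long min = Long.MAX_VALUE;
        for (long w : perChannel) {
            min = Math.min(min, w);
        }
        return min;
    }

    /** Eduardo's check: true only once every upstream subtask has moved past t. */
    boolean hasPassed(long t) {
        return current() > t;
    }
}
```

In a real job, operator #2 would not track channels itself. Registering an event-time timer through its timer service gives the same guarantee, because that timer fires only once the task-level watermark reaches its timestamp. A timer at t + 1 matches the strict "> t" above.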

Thanks again

On Thu, 1 Aug 2019, 18:34 Oytun Tez, <oytun@motaword.com> wrote:
Perhaps:
  1. collect() an item inside onTimer() in operator #1
  2. merge the resulting streams from all keys
  3. process the combined stream in operator #2 to see whether all keys were processed. You will probably want to keep state in operator #2 to track whether you have received items from all keys.
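Oytun's three steps can be modelled in plain Java in two parts: a marker type that operator #1 emits from onTimer(), and a tracker that operator #2 keeps. Both names are hypothetical. The tracker assumes the full set of keys is known in advance, which is exactly the assumption Eduardo questions in his reply.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical marker that operator #1 would collect() from onTimer() (step 1). */
final class TimerFiredMarker {
    final String key;
    final long timerTimestamp;

    TimerFiredMarker(String key, long timerTimestamp) {
        this.key = key;
        this.timerTimestamp = timerTimestamp;
    }
}

/**
 * Hypothetical state for operator #2 (step 3): the keys seen per timer timestamp,
 * compared against a key set that is ASSUMED to be known up front.
 */
final class KeyCompletionTracker {
    private final Set<String> expectedKeys;
    private final Map<Long, Set<String>> firedKeys = new HashMap<>();

    KeyCompletionTracker(Set<String> expectedKeys) {
        this.expectedKeys = Set.copyOf(expectedKeys);
    }

    /** Reports whether every expected key has fired for this marker's timestamp. */
    boolean onMarker(TimerFiredMarker marker) {
        Set<String> fired = firedKeys.computeIfAbsent(marker.timerTimestamp, t -> new HashSet<>());
        fired.add(marker.key);
        return fired.containsAll(expectedKeys);
    }
}
```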

On Thu, Aug 1, 2019 at 1:06 PM Eduardo Winpenny Tejedor <eduardo.winpenny@gmail.com> wrote:
Hi all,

I have a keyed operator with an hourly event-time trigger. When a timer fires, the operator simply persists some state to a table.
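An hourly event-time trigger like this usually registers a timer at the end of the hour that contains the element's timestamp. The arithmetic is sketched below in plain Java, and endOfHour is a hypothetical helper. In Flink, the result would be passed to ctx.timerService().registerEventTimeTimer(...) from a KeyedProcessFunction. Flink keeps at most one timer per key and timestamp, so registering the same boundary for every element of an hour is harmless.

```java
import java.time.Duration;

/** Hypothetical helper: the next full-hour boundary for an event timestamp. */
final class HourlyTimers {
    private static final long HOUR_MS = Duration.ofHours(1).toMillis();

    /**
     * Returns the first full-hour boundary strictly after eventTimeMs
     * (an event at 10:15 maps to 11:00; one at exactly 11:00 maps to 12:00).
     * floorMod keeps the rounding correct for negative timestamps too.
     */
    static long endOfHour(long eventTimeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, HOUR_MS) + HOUR_MS;
    }
}
```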

I'd like to know when the triggers for all keys have finished so I can send a further signal to the data warehouse indicating that it has all the data it needs to start producing a report.

How can I achieve this? If my operator is distributed across tasks on different machines, I need to make sure I don't send the signal to the data warehouse before the timers for every key have fired.

Thanks,
Eduardo