From: Aljoscha Krettek
Date: Wed, 01 Jun 2016 12:54:44 +0000
Subject: Re: Maintaining watermarks per key, instead of per operator instance
To: leon_mclare@tutanota.com, User
Reply-To: user@flink.apache.org

Hi,
Yeah, in that case per-key watermarks would be useful for you. It won't be possible to add such a feature, though, due to the (possibly) dynamic nature of the key space and how watermark tracking works.

You should be able to implement it with relatively low overhead using a RichFlatMapFunction and keyed state. This is the relevant section of the doc: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#using-the-keyvalue-state-interface.
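A minimal, framework-free sketch of that idea follows. It assumes, as Leon describes below, that each meter delivers its readings in timestamp order, and it uses tumbling windows. In a Flink job this logic would sit in the `flatMap` of a `RichFlatMapFunction` applied to a stream keyed by meter ID, with the per-meter state held in keyed state; here a `HashMap` stands in for that state, and `PerKeyWindowAverager`, `WindowState` and `WindowAverage` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: per-meter tumbling-window averages driven by each meter's
 * own event time instead of a shared watermark. In Flink this would be the body of
 * a RichFlatMapFunction on a keyed stream, with WindowState kept in keyed state;
 * here a HashMap keyed by meter ID stands in for that state.
 * Assumes every meter delivers its readings in timestamp order.
 */
final class PerKeyWindowAverager {

    /** Result: the average of one meter's readings in one window. */
    static final class WindowAverage {
        final String meterId;
        final long windowStart;
        final double average;

        WindowAverage(String meterId, long windowStart, double average) {
            this.meterId = meterId;
            this.windowStart = windowStart;
            this.average = average;
        }

        @Override
        public String toString() {
            return meterId + "@" + windowStart + " avg=" + average;
        }
    }

    /** Per-key state: the currently open window and its running sum and count. */
    private static final class WindowState {
        long windowStart;
        double sum;
        long count;
    }

    private final long windowSize;
    private final Map<String, WindowState> stateByMeter = new HashMap<>();

    PerKeyWindowAverager(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Start of the tumbling window containing a (non-negative) timestamp. */
    private long windowStartFor(long timestamp) {
        return timestamp - (timestamp % windowSize);
    }

    /**
     * Handles one reading. A reading that falls into a later window than the
     * meter's open one proves (given in-order delivery) that the open window is
     * complete for that meter, so its average is emitted to {@code out}.
     */
    void flatMap(String meterId, long timestamp, double value, List<WindowAverage> out) {
        long start = windowStartFor(timestamp);
        WindowState state = stateByMeter.get(meterId);
        if (state == null) {
            state = new WindowState();
            state.windowStart = start;
            stateByMeter.put(meterId, state);
        } else if (start < state.windowStart) {
            throw new IllegalArgumentException("out-of-order reading for meter " + meterId);
        } else if (start > state.windowStart) {
            // This meter's own event time has moved past its open window: emit and reset.
            out.add(new WindowAverage(meterId, state.windowStart, state.sum / state.count));
            state.windowStart = start;
            state.sum = 0;
            state.count = 0;
        }
        state.sum += value;
        state.count++;
    }
}
```

Because a meter's window is only closed by that meter's next reading, the last open window of each meter is never emitted in this sketch; a real job would need some additional flush mechanism, which is not shown here.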

We are also in the process of improving our windowing system, especially when it comes to late data, cleanup and trigger semantics. You can have a look here if you're interested: https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing.

Best,
Aljoscha

On Tue, 31 May 2016 at 14:36 <leon_mclare@tutanota.com> wrote:
Hi Aljoscha,

Thanks for the speedy reply.

I am processing measurements delivered by smart meters. I use windows to gather measurements and calculate values such as average consumption. The key is simply the meter ID.

The challenge is that meters may undergo network partitioning, during which they fall back to local buffering. The data is then transmitted once connectivity has been re-established. I am using event time to obtain accurate calculations.

If a specific meter goes offline and the watermark progresses to the next window for an operator instance, then all of that meter's late data will be discarded once it is online again, until it has caught up to the event time. This is because I am using a custom EventTimeTrigger implementation that discards late elements. The reason is that Flink would otherwise immediately evaluate the window upon receiving a late element, which is a problem since my calculations (e.g. the average consumption) depend on multiple elements. I cannot calculate averages with that single late element.
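To make the lateness rule concrete, the sketch below treats an element as late when the watermark has already reached the last timestamp of the tumbling window the element falls into (the window end minus one, which is how Flink's `TimeWindow.maxTimestamp()` is defined). It contrasts a default-style decision, where the late element causes the window to be evaluated immediately, with the discarding variant Leon describes. This is a simplified model, not Flink's `Trigger` API; `TriggerDecision` and `LateElementPolicy` are hypothetical names.

```java
/** Hypothetical, simplified trigger outcomes; not Flink's TriggerResult. */
enum TriggerDecision { WAIT, FIRE, DROP }

/**
 * Simplified model of how an event-time trigger on tumbling windows can treat
 * a late element. All names are hypothetical.
 */
final class LateElementPolicy {

    private LateElementPolicy() {}

    /** Last timestamp belonging to the tumbling window that contains ts (window end minus one). */
    static long windowMaxTimestamp(long ts, long windowSize) {
        return ts - (ts % windowSize) + windowSize - 1;
    }

    /** Late means the watermark has already reached the end of the element's window. */
    static boolean isLate(long ts, long watermark, long windowSize) {
        return windowMaxTimestamp(ts, windowSize) <= watermark;
    }

    /** Default-style behaviour: a late element causes its window to be evaluated right away. */
    static TriggerDecision fireOnLate(long ts, long watermark, long windowSize) {
        return isLate(ts, watermark, windowSize) ? TriggerDecision.FIRE : TriggerDecision.WAIT;
    }

    /** Discarding variant, as described in the thread: late elements are dropped. */
    static TriggerDecision dropOnLate(long ts, long watermark, long windowSize) {
        return isLate(ts, watermark, windowSize) ? TriggerDecision.DROP : TriggerDecision.WAIT;
    }
}
```

For example, with a window size of 10 and a watermark of 15, a reading at timestamp 3 belongs to window [0, 10) and is late (FIRE under the default-style rule, DROP under the discarding one), while a reading at timestamp 12 is not late and simply waits for its window to close.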

Each individual meter guarantees in-order transmission of measurements. If watermarks progressed per key, I would never have late elements because of that guarantee. I would be able to calculate averages accurately, with the trade-off that my results would arrive sporadically from the same operator instance.
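The difference between a shared and a per-key watermark can be reproduced with a toy model. Its assumptions are all simplifications: the watermark is "largest timestamp seen so far minus one" and is updated after every element (real Flink watermarks come from a timestamp/watermark assigner), windows are tumbling, and meter B's readings were buffered while it was offline, so they reach the operator only after meter A's. `WatermarkScopeDemo` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy comparison: one watermark shared by all keys of an operator instance
 * versus one watermark per key. Simplification: the watermark is "largest
 * timestamp seen so far minus 1" and is updated after every element.
 * All names are hypothetical.
 */
final class WatermarkScopeDemo {

    /** A reading as it arrives at the operator instance. */
    static final class Reading {
        final String meterId;
        final long timestamp;

        Reading(String meterId, long timestamp) {
            this.meterId = meterId;
            this.timestamp = timestamp;
        }
    }

    private WatermarkScopeDemo() {}

    /** Late if the watermark has reached the last timestamp of the element's tumbling window. */
    private static boolean isLate(long ts, long watermark, long windowSize) {
        long windowMaxTimestamp = ts - (ts % windowSize) + windowSize - 1;
        return windowMaxTimestamp <= watermark;
    }

    /** Watermark derived from the largest timestamp seen; MIN_VALUE if nothing was seen yet. */
    private static long watermarkFor(Long maxSeen) {
        return maxSeen == null ? Long.MIN_VALUE : maxSeen - 1;
    }

    /** Counts late readings when a single watermark covers every meter. */
    static int lateWithSharedWatermark(List<Reading> arrivals, long windowSize) {
        Long maxSeen = null;
        int late = 0;
        for (Reading r : arrivals) {
            if (isLate(r.timestamp, watermarkFor(maxSeen), windowSize)) late++;
            maxSeen = (maxSeen == null) ? r.timestamp : Math.max(maxSeen, r.timestamp);
        }
        return late;
    }

    /** Counts late readings when each meter advances its own watermark. */
    static int lateWithPerKeyWatermark(List<Reading> arrivals, long windowSize) {
        Map<String, Long> maxSeenByMeter = new HashMap<>();
        int late = 0;
        for (Reading r : arrivals) {
            if (isLate(r.timestamp, watermarkFor(maxSeenByMeter.get(r.meterId)), windowSize)) late++;
            maxSeenByMeter.merge(r.meterId, r.timestamp, Long::max);
        }
        return late;
    }
}
```

With meter A's readings at 1, 5, 12, 18 and 25 arriving before meter B's buffered readings at 3, 8 and 14 (window size 10), the shared watermark marks all three of B's readings as late, whereas per-key watermarks mark none, because each meter's own readings never go backwards in time.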

I suppose I could bypass the use of windows by implementing a stateful map function that mimics windows to a certain degree. I implemented something similar in Storm, but the amount of application logic required is substantial.

I completely understand why Flink evaluates a window on a late element, since there is no other way to know when to evaluate the window as event time has already progressed.

Perhaps there is a way to gather/redirect late elements?

Regards
Leon

31. May 2016 13:37 by aljoscha@apache.org:


Hi,
I'm afraid this is impossible with the current design of Flink. Might I ask what you want to achieve with this? Maybe we can come up with a solution.

-Aljoscha

On Tue, 31 May 2016 at 13:24 <leon_mclare@tutanota.com> wrote:
My use case primarily concerns applying transformations per key, with the keys remaining fixed throughout the topology. I am using event time for my windows.

The problem I am currently facing is that watermarks in windows propagate per operator instance, meaning the operator's event time increases for all keys that the operator is in charge of. I wish for watermarks to progress per key, not per operator instance.
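The per-instance behaviour follows from how watermarks are tracked: an operator instance that receives input from several parallel channels keeps the latest watermark seen on each channel and advances its own event time to the minimum of them. Keys play no part in that bookkeeping, so the resulting clock applies to every key the instance handles. The sketch below models only this min-over-channels rule; `ChannelWatermarkTracker` is a hypothetical name, not a Flink class.

```java
import java.util.Arrays;

/**
 * Sketch of how an operator instance derives its single event-time clock from
 * several input channels: it remembers the last watermark per channel and uses
 * the minimum. No per-key information is involved. Names are hypothetical.
 */
final class ChannelWatermarkTracker {
    private final long[] lastWatermarkPerChannel;
    private long currentWatermark = Long.MIN_VALUE;

    ChannelWatermarkTracker(int numChannels) {
        lastWatermarkPerChannel = new long[numChannels];
        // No watermark received yet on any channel.
        Arrays.fill(lastWatermarkPerChannel, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; returns the instance's (possibly advanced) watermark. */
    long onWatermark(int channel, long watermark) {
        lastWatermarkPerChannel[channel] = Math.max(lastWatermarkPerChannel[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : lastWatermarkPerChannel) {
            min = Math.min(min, w);
        }
        // Event time of the instance never moves backwards.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

For two channels, a watermark of 10 on channel 0 leaves the instance at its initial value because channel 1 has reported nothing yet; a 7 on channel 1 then moves it to 7; a later 20 on channel 0 keeps it at 7 until channel 1 reports 15, at which point it moves to 15.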

Is this easily possible? I was unable to find an appropriate solution based on existing code recipes.

Greetings
Leon