From: Stephan Epping
Subject: Re: Maintaining watermarks per key, instead of per operator instance
Date: Tue, 15 Nov 2016 09:09:29 +0100
To: user@flink.apache.org

Hey Aljoscha,

that sounds very promising, awesome! Though, I would still need to implement my own window management logic (window assignment and window state purging), right? I was thinking about reusing some of the existing components (TimeWindow, WindowAssigner) but running my own WindowOperator (as a ProcessFunction). I am not sure whether that can be done easily, though. I would love to hear your opinion on that, and on what the tricky parts will be, for example common mistakes you encountered while developing the windowing mechanism.

best Stephan



On 14 Nov 2016, at 19:05, Aljoscha Krettek wrote:

Hi Stephan,
I was going to suggest that using a flatMap and tracking the timestamp of each key yourself is a bit like having a per-key watermark. I wanted to wait a bit before answering because I'm currently working on a new type of function that will be released with Flink 1.2: ProcessFunction. This is somewhat like a FlatMap, but it also lets you access the element timestamp, query the current processing time/event time, and set (per-key) timers for processing time and event time. With this you should be able to implement your per-key tracking quite easily, I hope.
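The per-key tracking Aljoscha describes can be made concrete with a small, framework-free sketch. This is plain Python rather than the actual Flink API; `PerKeyTracker`, `process_element`, and `advance_watermark` are illustrative names, not Flink's, and the timer model is deliberately simplified.

```python
# Framework-free sketch of per-key tracking in the style Aljoscha
# describes: for each element we read its timestamp, keep per-key
# state, and register a per-key timer. Names are illustrative, not
# the actual Flink API.

class PerKeyTracker:
    def __init__(self, timeout):
        self.timeout = timeout   # event-time gap after which a key counts as silent
        self.last_seen = {}      # key -> max event timestamp seen so far
        self.timers = {}         # key -> event time at which the timer fires

    def process_element(self, key, timestamp):
        # Track the highest timestamp per key: a "per-key watermark".
        self.last_seen[key] = max(self.last_seen.get(key, timestamp), timestamp)
        # (Re-)register a timer that fires if the key stays silent.
        self.timers[key] = self.last_seen[key] + self.timeout

    def advance_watermark(self, watermark):
        # Fire every timer whose target time has passed; return the keys.
        fired = [k for k, t in self.timers.items() if t <= watermark]
        for k in fired:
            del self.timers[k]
        return fired
```

Each element refreshes its key's timer, so a timer only fires for keys that have genuinely gone quiet, which is the dead-sensor detection discussed later in this thread.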

Cheers,
Aljoscha

P.S. ProcessFunction is already in the Flink repository, but it's called TimelyFlatMapFunction right now because I was working on it under that working title.

On Mon, 14 Nov 2016 at 15:47 kaelumania <stephan.epping@zweitag.de> wrote:
Hey Fabian,

thank you very much. 

- yes, I would window by event time and fire/purge = by processing time
- Cheaper in the end meant that having too much state in the Flink cluster would be more expensive, as we store all the data in Cassandra too. I think the fault tolerance would be okay, as we would do a compare-and-set with Cassandra.

With the flatMap operator, wouldn't it be like running my own windowing mechanism? I need to keep the aggregate window per sensor open (with checkpointing and state management) until I receive an element for a sensor that is later than the window's end time, and then purge the state and emit a new event (which is like having a watermark per sensor). Further, I need a timer that fires after, say, 24 hours in case a sensor dies and doesn't send any more data, which might be possible with a window assigner/trigger, right? But not inside normal functions, e.g. flatMap? We can guarantee that the data for each sensor arrives almost in order (it might be out of order within a few seconds), but there might be gaps of several hours after network partitions.
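The hand-rolled windowing described in the paragraph above can be sketched without any framework: an aggregate stays open per sensor until an element arrives whose timestamp lies beyond the open window, which then acts as that sensor's private watermark. The window size and all names here are illustrative assumptions, not anything from Flink.

```python
# Hand-rolled per-sensor windowing: a sensor's aggregate stays open
# until an element arrives past the open window, at which point the
# old window is emitted and its state purged.

WINDOW = 60  # window length, e.g. in seconds (illustrative)

def window_start(ts):
    return ts - (ts % WINDOW)

class PerSensorWindows:
    def __init__(self):
        self.open = {}  # sensor -> [window_start, running_sum]

    def on_event(self, sensor, ts, value):
        """Fold value into the sensor's open window; return the closed
        (sensor, window_start, sum) tuple when a later element purges
        it, else None. Elements older than the open window are folded
        in too, acceptable while out-of-orderness stays within seconds."""
        start = window_start(ts)
        if sensor not in self.open:
            self.open[sensor] = [start, value]
            return None
        cur_start, cur_sum = self.open[sensor]
        if start > cur_start:
            # Element is later than the open window: emit and purge it.
            self.open[sensor] = [start, value]
            return (sensor, cur_start, cur_sum)
        self.open[sensor] = [cur_start, cur_sum + value]
        return None
```

What this sketch does not cover is exactly the gap raised above: a sensor that dies leaves its last window open forever, so a timeout timer is still needed on top.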

Is there no way to define/redefine the watermark per keyed stream? Or to adjust the window assigner + trigger to achieve the desired behaviour? I am a bit hesitant to implement the whole state management myself. Do you plan to support such use cases on keyed streams? Maybe the WatermarkAssigner could also receive information about the key for which the watermark should be calculated, etc.

best, Stephan


On 14 Nov 2016, at 15:17, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:

Hi Stephan,

I'm skeptical about two things:
- using processing time will result in inaccurately bounded aggregates (or do you want to group by event time in a processing-time window?)
- writing to and reading from Cassandra might be expensive (not sure what you mean by cheaper in the end), and it is not integrated with Flink's checkpointing mechanism for fault tolerance.

To me, the stateful FlatMapOperator looks like the best approach. There is an upcoming feature for registering timers in user functions, i.e., a function that is called after the timer expires. This could be helpful for closing the window when no new data arrives.

Best, 
Fabian

2016-11-14 8:39 GMT+01:00 Stephan Epping <[hidden email]>:
Hello Fabian,

Thank you very much. What is your opinion on the following solution:

- window data per time window, e.g. 15 minutes
- use processing time as the trigger, e.g. every 15 minutes
- which results in an aggregate over sensor values
- then use Cassandra to select the previous aggregate (as there can be multiple aggregates for the time window due to processing time)
- then update the aggregate and put it into a Cassandra sink again
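The select / merge / write-back cycle in the steps above, combined with the compare-and-set Stephan mentions elsewhere in the thread, can be sketched against an in-memory stand-in for Cassandra. The dict, `upsert_aggregate`, and the versioning scheme are all illustrative assumptions; real Cassandra would use a lightweight transaction (an `UPDATE ... IF` condition) for the conditional write.

```python
# Sketch of: select previous aggregate, merge in the new partial
# result, write back conditionally. A dict stands in for the
# Cassandra table; the version column makes the write a
# compare-and-set, so concurrent writers retry instead of clobbering.

table = {}  # (sensor, window_start) -> (version, aggregate)

def upsert_aggregate(sensor, window_start, partial_sum):
    """Merge a partial aggregate into the stored one; retry if a
    concurrent writer bumped the version in between (optimistic CAS)."""
    while True:
        version, current = table.get((sensor, window_start), (0, 0.0))
        merged = current + partial_sum
        # Compare-and-set: write only if the row is still at `version`.
        if table.get((sensor, window_start), (0, 0.0))[0] == version:
            table[(sensor, window_start)] = (version + 1, merged)
            return merged
```

Because each processing-time firing only adds its partial sum, multiple firings for the same event-time window converge to the same total, which is the property the design above relies on.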

The Cassandra select will be a bit slower than using in-memory/Flink state, but it will be cheaper in the end. Further, what consequences does this have? For example, replaying events will be more difficult, right? Also, what about snapshots? Will they work with the mentioned design?

kind regards,
Stephan
On 11 Nov 2016, at 00:39, Fabian Hueske <[hidden email]> wrote:

Hi Stephan,

I just wrote an answer to your SO question.

Best, Fabian

2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:

Hello,

I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html) but was unable to / don't know how to reply.

Here is my question regarding the mentioned thread:

Hello, 

I have similar requirements (see Stack Overflow: http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data). I am pretty new to Flink, could you elaborate on a possible solution? We can guarantee good ordering by sensor_id, so watermarking by key would be the only reasonable way for us (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do my own watermarking after sensorData.keyBy('id').overwriteWatermarking()... per key? Or maybe use custom state plus a custom trigger? What happens if a sensor dies or is removed completely? How can this be detected, given that its watermarks would be ignored for window garbage collection? Or could we dynamically schedule a job for each sensor? That would result in 1000 jobs.
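To pin down what the requested semantics would mean, here is a sketch of per-key watermarking for the pipeline above (keyBy(id).timeWindow(1 minute).sum(value)): each key's watermark is its own maximum timestamp minus an out-of-orderness allowance, and a window fires once that key's watermark passes the window end. The constants and names are illustrative assumptions, not Flink API.

```python
# Per-key watermark semantics for keyBy(id).timeWindow(1m).sum(value):
# every key advances its own watermark, so a slow or dead sensor only
# holds back its own windows, never those of other sensors.

WINDOW = 60       # 1-minute tumbling windows, in seconds
MAX_LATENESS = 5  # sensors are at most a few seconds out of order

def run(events):
    """events: iterable of (key, timestamp, value); returns a list of
    fired (key, window_start, sum) tuples in firing order."""
    sums = {}       # (key, window_start) -> running sum
    watermark = {}  # key -> per-key watermark
    fired = []
    for key, ts, value in events:
        start = ts - (ts % WINDOW)
        sums[(key, start)] = sums.get((key, start), 0.0) + value
        watermark[key] = max(watermark.get(key, float("-inf")), ts - MAX_LATENESS)
        # Fire (and garbage-collect) this key's windows that its own
        # watermark has passed; other keys are unaffected.
        for k, s in sorted(w for w in sums if w[0] == key):
            if s + WINDOW <= watermark[key]:
                fired.append((k, s, sums.pop((k, s))))
    return fired
```

Note the downside raised in the question: if a sensor stops sending, its watermark never advances and its open window state is never garbage-collected, so some timeout mechanism is still required.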

Thanks,
Stephan











