From: Kostas Kloudas
Subject: Re: Regarding Late Elements
Date: Tue, 4 Oct 2016 17:04:52 +0200
To: user@flink.apache.org
Message-Id: <2979A1E0-25B8-4175-A0B0-0A144F3E935F@data-artisans.com>
References: <476155053.4379932.1475507131317@mail.yahoo.com> <1445783729.4442656.1475508486819@mail.yahoo.com> <562747B3-C8A4-44A8-AE27-391E36CF2E4C@data-artisans.com>
Mime-Version: 1.0 (Mac OS X Mail 9.3 \(3124\))
Content-Type: multipart/alternative; boundary="Apple-Mail=_2FCB83E5-DE3D-46D9-ABED-7836F1CA5C71"

--Apple-Mail=_2FCB83E5-DE3D-46D9-ABED-7836F1CA5C71
Content-Transfer-Encoding: quoted-printable
Content-Type: text/html; charset=utf-8

Hi Vinay,

From what I understand from your code, the only difference between your trigger and the
one shipping with Flink is that for late elements, instead of firing and keeping the elements,
you fire and purge, i.e. clean the window state.
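
To make the difference concrete, here is a minimal sketch in plain Java. It is a toy model of a single window buffer, not Flink code: the class `WindowBufferSketch` and the method `addAndFire` are invented for illustration, and only the names `FIRE` and `FIRE_AND_PURGE` mirror Flink's `TriggerResult`.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one window's buffer (hypothetical, not part of Flink). */
final class WindowBufferSketch {
    /** Mirrors two of the values of Flink's TriggerResult for illustration only. */
    enum TriggerResult { FIRE, FIRE_AND_PURGE }

    private final List<Integer> buffer = new ArrayList<>();

    /** Adds an element, emits the current window contents, and purges if asked to. */
    List<Integer> addAndFire(int element, TriggerResult result) {
        buffer.add(element);
        List<Integer> emitted = new ArrayList<>(buffer);
        if (result == TriggerResult.FIRE_AND_PURGE) {
            buffer.clear(); // purge: the window state is discarded after emitting
        }
        return emitted;
    }

    public static void main(String[] args) {
        WindowBufferSketch keep = new WindowBufferSketch();
        keep.addAndFire(1, TriggerResult.FIRE);
        // The late element fires together with what was already in the window.
        System.out.println(keep.addAndFire(2, TriggerResult.FIRE)); // prints [1, 2]

        WindowBufferSketch purge = new WindowBufferSketch();
        purge.addAndFire(1, TriggerResult.FIRE_AND_PURGE);
        // The earlier firing purged the state, so the late element fires alone.
        System.out.println(purge.addAndFire(2, TriggerResult.FIRE_AND_PURGE)); // prints [2]
    }
}
```

With fire-and-purge, each late firing carries only the elements that arrived since the previous firing, so downstream code has to combine the partial results itself.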

This does not solve the problem of dropping the super late elements if their window has expired
(currentWatermark >= window.maxTimestamp + allowedLateness).

Cheers,
Kostas

On Oct 4, 2016, at 3:25 PM, vinay patil <vinay18.patil@gmail.com> wrote:

Hi Kostas,

Thank you for your reply. Yes, that would be good functionality to have, but for now a custom trigger that stays close to the 1.0.3 behavior works for me.
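public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // The watermark has already passed the end of the window, so the element is late: fire immediately and purge.
        return TriggerResult.FIRE_AND_PURGE;
    } else {
        // On-time element: fire once the watermark reaches the end of the window.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    // The event-time timer registered above has fired: emit the window and clear its state.
    return TriggerResult.FIRE_AND_PURGE;
}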

This is the change I have made in my custom trigger, which is similar to the 1.0.3 one. Does this look good to you?
(I am not using the canMerge and onMerge methods since I am not familiar with them.)


Regards,
Vinay Patil

On Tue, Oct 4, 2016 at 3:58 AM, Kostas Kloudas [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Hello LF and Vinay,

With the introduction of “allowed lateness”, elements and windows are kept around until the watermark
passes window.maxTimestamp + allowed_lateness, and then they are cleaned up (garbage collected).

Every element that comes in and belongs to a window that has been garbage collected is dropped as super-late.
Elements that are late, but by no more than the allowed lateness, are kept, and the window fires as before.
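
The three cases described above can be sketched as a small classification function. This is plain Java written for illustration, not Flink's internal code: `LatenessSketch`, `Lateness`, and `classify` are hypothetical names, the "late" test reuses the comparison from the custom trigger quoted in this thread, and the "super-late" test assumes the expiry condition is currentWatermark >= window.maxTimestamp + allowedLateness.

```java
/** Hypothetical helper that classifies an arriving element; not part of Flink. */
final class LatenessSketch {
    enum Lateness { ON_TIME, LATE, SUPER_LATE }

    /**
     * @param windowMaxTimestamp end of the window the element belongs to
     * @param allowedLateness    how long the window is kept after its end
     * @param currentWatermark   watermark at the moment the element arrives
     */
    static Lateness classify(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        if (currentWatermark >= windowMaxTimestamp + allowedLateness) {
            return Lateness.SUPER_LATE; // window already cleaned up: element is dropped
        }
        if (currentWatermark >= windowMaxTimestamp) {
            return Lateness.LATE; // window still alive: element is kept and the window fires
        }
        return Lateness.ON_TIME;
    }

    public static void main(String[] args) {
        long windowMaxTimestamp = 99;
        long allowedLateness = 10;
        System.out.println(classify(windowMaxTimestamp, allowedLateness, 50));  // prints ON_TIME
        System.out.println(classify(windowMaxTimestamp, allowedLateness, 105)); // prints LATE
        System.out.println(classify(windowMaxTimestamp, allowedLateness, 109)); // prints SUPER_LATE
    }
}
```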

If you know the maximum latency, the best approach is to set the allowed lateness to that value.

Currently Flink drops super-late elements and does not provide any mechanism to handle them manually,
but there are discussions about adding functionality that would (probably) let you obtain
a separate stream containing only these elements.
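
As a rough picture of what such a separate stream could look like, here is a plain-Java simulation. It does not use or predict any Flink API: `SuperLateRoutingSketch`, `Event`, `advanceWatermark`, and `process` are all hypothetical names, and the routing rule assumes the expiry condition currentWatermark >= window.maxTimestamp + allowedLateness.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of routing super-late elements aside instead of dropping them. */
final class SuperLateRoutingSketch {
    /** An element together with the end timestamp of the window it belongs to. */
    static final class Event {
        final String payload;
        final long windowMaxTimestamp;

        Event(String payload, long windowMaxTimestamp) {
            this.payload = payload;
            this.windowMaxTimestamp = windowMaxTimestamp;
        }
    }

    private final long allowedLateness;
    private long currentWatermark = Long.MIN_VALUE;
    final List<Event> accepted = new ArrayList<>();  // on-time and late elements
    final List<Event> superLate = new ArrayList<>(); // elements whose window has expired

    SuperLateRoutingSketch(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    /** Watermarks only move forward. */
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Routes the event to the side collection if its window has already expired. */
    void process(Event event) {
        if (currentWatermark >= event.windowMaxTimestamp + allowedLateness) {
            superLate.add(event);
        } else {
            accepted.add(event);
        }
    }

    public static void main(String[] args) {
        SuperLateRoutingSketch op = new SuperLateRoutingSketch(10);
        op.process(new Event("a", 99)); // on time
        op.advanceWatermark(105);
        op.process(new Event("b", 99)); // late, but within the allowed lateness
        op.advanceWatermark(120);
        op.process(new Event("c", 99)); // window expired at watermark 109
        System.out.println(op.accepted.size() + " accepted, " + op.superLate.size() + " super-late");
        // prints "2 accepted, 1 super-late"
    }
}
```

Nothing is lost in this model: a job with the "cannot discard a single record" requirement could persist or reprocess the side collection separately.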

Thanks,
Kostas

On Oct 3, 2016, at 5:28 PM, [hidden email] wrote:

Not yet.
I'm hoping a Flink expert on this mailing list will reply.


- LF



From: vinay patil <[hidden email]>
To: [hidden email]
Sent: Monday, October 3, 2016 8:09 AM
Subject: Re: Regarding Late Elements

Hi LF,

So did you manage to find a workaround for it?

I am using a custom trigger that is similar to the 1.0.3 one, with a few changes.

Regards,
Vinay Patil

On Mon, Oct 3, 2016 at 10:02 AM, lgfmt [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
We have the same requirement - we cannot discard any data even if it arrives late.
 

- LF
 



From: Vinay Patil <[hidden email]>
To: [hidden email]

Sent: Sunday, October 2, 2016 8:21 PM
Subject: Regarding Late Elements

Hi Guys,

I just wanted to understand why Flink decided to completely discard late elements in the latest version. This was not the case in 1.0.3.

P.S. In our case the data is critical, so we cannot discard a single record even if it is late. I have written a custom trigger (as suggested by Aljoscha) so that late elements are still accepted.


Regards,
Vinay Patil



If you reply to this email, your message will be added to the discussion below:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Late-Elements-tp9284p9292.html
To start a new topic under Apache Flink User Mailing List archive., email [hidden email]
To unsubscribe from Apache Flink User Mailing List archive., click here.
NAML

View this message in context: Re: Regarding Late Elements
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.





If you reply to this email, your message will be added to the discussion below:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Late-Elements-tp9284p9307.html
--Apple-Mail=_2FCB83E5-DE3D-46D9-ABED-7836F1CA5C71--