Mailing-List: contact user-help@flink.apache.org; run by ezmlm
Reply-To: user@flink.apache.org
MIME-Version: 1.0
References: <1601957435.325972.1475879769850@mail.yahoo.com>
 <876823201.354079.1475880651860@mail.yahoo.com>
 <1934632319.819559.1476034630845@mail.yahoo.com>
From: Sameer W
Date: Tue, 11 Oct 2016 19:54:44 -0400
Subject: Re: Listening to timed-out patterns in Flink CEP
To: user@flink.apache.org
Content-Type: multipart/alternative; boundary=001a114e2932d8ffd5053e9f99e4
archived-at: Tue, 11 Oct 2016 23:55:15 -0000

--001a114e2932d8ffd5053e9f99e4
Content-Type: text/plain; charset=UTF-8

Try this. Your watermarks (WMs) need to move forward. Also, don't use the
system timestamp. Use the timestamp of the last element seen as the
reference, since the elements are most likely lagging behind the system time.

DataStream<Event> withTimestampsAndWatermarks = tuples
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {

            long waterMarkTmst;
            long lastEmittedWM = 0;

            @Override
            public long extractTimestamp(Event element, long previousElementTimestamp) {
                if (element.tmst > lastEmittedWM) {
                    // Assumes increasing timestamps. Subtract 1 because more
                    // elements with the same timestamp might still arrive.
                    waterMarkTmst = element.tmst - 1;
                }
                return element.tmst;
            }

            @Override
            public Watermark getCurrentWatermark() {
                if (lastEmittedWM == waterMarkTmst) {
                    // No new event seen: move the WM forward by the auto watermark
                    // interval (watermarks only move forward in time).
                    waterMarkTmst = waterMarkTmst + 1000L;
                }
                lastEmittedWM = waterMarkTmst;

                System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
                // Until an event is seen, the WM starts at 0 and advances by 1000 ms per call.
                return new Watermark(waterMarkTmst);
            }
        }).keyBy("key");

On Tue, Oct 11, 2016 at 7:29 PM, David Koch <ogdude@googlemail.com> wrote:

> Hello,
>
> I tried setting the watermark to System.currentTimeMillis() - 5000L; event
> timestamps are System.currentTimeMillis(). I do not observe the expected
> behaviour of the PatternTimeoutFunction firing once the watermark moves
> past the timeout "anchored" by a pattern match.
>
> Here is the complete test class source, in case someone is interested.
> The timestamp/watermark assigner looks like this:
>
> DataStream<Event> withTimestampsAndWatermarks = tuples
>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>
>             long waterMarkTmst;
>
>             @Override
>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>                 return element.tmst;
>             }
>
>             @Override
>             public Watermark getCurrentWatermark() {
>                 waterMarkTmst = System.currentTimeMillis() - 5000L;
>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>                 return new Watermark(waterMarkTmst);
>             }
>         }).keyBy("key");
>
> withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
>
> // Apply pattern filtering on stream.
> PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
>
> Any idea what's wrong?
>
> David
>
>
> On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sameer@axiomine.com> wrote:
>
>> Assuming an element with timestamp which is later than the last emitted
>> watermark arrives, would it just be dropped because the PatternStream does
>> not have a max allowed lateness method? In that case it appears that CEP
>> cannot handle late events yet out of the box.
>>
>> If we do want to support late events, can we chain a
>> keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
>> again before handing it to the CEP operator? This way we may have the
>> patterns fired multiple times but it allows an event to be late and out of
>> order. It looks like it will work, but is there a less convoluted way?
>>
>> Thanks,
>> Sameer
>>
>> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <till.rohrmann@gmail.com>
>> wrote:
>>
>>> But then no element later than the last emitted watermark must be issued
>>> by the sources. If that is the case, then this solution should work.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sameer@axiomine.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> If you know that the events are arriving in order and with a consistent
>>>> lag, why not just increment the watermark time every time the
>>>> getCurrentWatermark() method is invoked, based on the autoWatermarkInterval
>>>> (or less, to be conservative)?
>>>>
>>>> You can check if the watermark has changed since the arrival of the
>>>> last event and, if not, increment it in the getCurrentWatermark() method.
>>>> Otherwise the watermark will never increase until an element arrives, and
>>>> if the stream partition stalls for some reason the whole pipeline freezes.
>>>>
>>>> Sameer
>>>>
>>>>
>>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <till.rohrmann@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> the problem is still that there is no corresponding watermark saying
>>>>> that 4 seconds have now passed. With your code, watermarks will be
>>>>> periodically emitted but the same watermark will be emitted until a new
>>>>> element arrives which will reset the watermark. Thus, the system can never
>>>>> know until this watermark is seen whether there will be an earlier event or
>>>>> not. I fear that this is a fundamental problem with stream processing.
>>>>>
>>>>> You're right that the negation operator won't solve the problem. It
>>>>> will indeed suffer from the same problem.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lgfmt@yahoo.com> wrote:
>>>>>
>>>>>> >>FLINK-3320 (CEP
>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>> match" be triggered if no event at all occurs?
>>>>>>
>>>>>> Good question.
>>>>>>
>>>>>> I'm not sure whether the following will work:
>>>>>>
>>>>>> This could be done by creating a CEP matching pattern that uses both
>>>>>> of "notNext" (or "notFollowedBy") and "within" constructs. Something like
>>>>>> this:
>>>>>>
>>>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>>     .notNext("second")
>>>>>>     .within(Time.seconds(3));
>>>>>>
>>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>>
>>>>>> Note: I have requested these negation patterns to be implemented in
>>>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>>>>
>>>>>>
>>>>>> - LF
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* David Koch <ogdude@googlemail.com>
>>>>>> *To:* user@flink.apache.org; lgfmt@yahoo.com
>>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Thank you for the explanation as well as the link to the other post.
>>>>>> Interesting to learn about some of the open JIRAs.
>>>>>>
>>>>>> Indeed, I was not using event time, but processing time. However,
>>>>>> even when using event time I only get notified of timeouts upon subsequent
>>>>>> events.
>>>>>>
>>>>>> The link contains an example where I
>>>>>> read <key> <value> from a socket, wrap this in a custom "event" with
>>>>>> timestamp, key the resultant stream by <key> and attempt to detect <key>
>>>>>> instances no further than 3 seconds apart using CEP.
>>>>>>
>>>>>> Apart from the fact that results are only printed when I close the
>>>>>> socket (normal?) I don't observe any change in behaviour.
>>>>>>
>>>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>>>> timeout to be triggered.
>>>>>>
>>>>>> FLINK-3320 (CEP
>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>> match" be triggered if no event at all occurs?
>>>>>>
>>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lgfmt@yahoo.com> wrote:
>>>>>>
>>>>>> The following is a better link:
>>>>>>
>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E
>>>>>>
>>>>>>
>>>>>> - LF
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* "lgfmt@yahoo.com" <lgfmt@yahoo.com>
>>>>>> *To:* "user@flink.apache.org" <user@flink.apache.org>
>>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>
>>>>>> Doesn't the upcoming CEP negation (absence of an event) feature solve
>>>>>> this issue?
>>>>>>
>>>>>> See this discussion thread:
>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E
>>>>>>
>>>>>>
>>>>>> // Atul
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* Till Rohrmann <trohrmann@apache.org>
>>>>>> *To:* user@flink.apache.org
>>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> in case of event time, the timeout will be detected when the first
>>>>>> watermark exceeding the timeout value is received. Thus, it depends a
>>>>>> little bit how you generate watermarks (e.g. periodically, watermark per
>>>>>> event).
>>>>>>
>>>>>> In case of processing time, the time is only updated whenever a new
>>>>>> element arrives. Thus, if you have an element arriving 4 seconds after
>>>>>> Event A, it should detect the timeout. If the next event arrives 20 seconds
>>>>>> later, then you won't see the timeout until then.
>>>>>>
>>>>>> In the case of processing time, we could think about registering
>>>>>> timeout timers for processing time. However, I would highly recommend you
>>>>>> to use event time, because with processing time, Flink cannot guarantee
>>>>>> meaningful computations, because the events might arrive out of order.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <ogdude@googlemail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> With Flink CEP, is there a way to actively listen to pattern matches
>>>>>> that time out? I am under the impression that this is not possible.
>>>>>>
>>>>>> In my case I partition a stream containing user web navigation by
>>>>>> "userId" to look for sequences of Event A, followed by B within 4 seconds
>>>>>> for each user.
>>>>>>
>>>>>> I registered a PatternTimeoutFunction which, assuming a non-match, only
>>>>>> fires upon the first event after the specified timeout. For example, given
>>>>>> user X: Event A, 20 seconds later Event B (or any other type of event).
>>>>>>
>>>>>> I'd rather have a notification fire directly upon the 4 second
>>>>>> interval expiring since passive invalidation is not really applicable in my
>>>>>> case.
>>>>>>
>>>>>> How, if at all, can this be achieved with Flink CEP?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> David
>>>>>>
>>>>>
>>>>
>>>
>>
>

--001a114e2932d8ffd5053e9f99e4
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: quoted-printable
--001a114e2932d8ffd5053e9f99e4--
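
The thread's central point can be checked without a Flink cluster. What follows is a minimal plain-Java sketch, not Flink code and not the CEP operator's actual implementation. It models the periodic assigner from the reply at the top of the thread (a watermark that keeps advancing while the stream is idle) and contrasts it with a generator that keeps re-emitting the same watermark until a new element arrives. The class name WatermarkTimeoutSketch, its methods, and the advanceWhenIdle flag are hypothetical names introduced only for this illustration. The timeout rule "a watermark greater than start + timeout" is an assumption taken from Till's wording ("the first watermark exceeding the timeout value"); the exact boundary condition inside Flink may differ. The isLate() rule (timestamp at or below the last emitted watermark) is likewise an assumption about the general watermark contract, used to show the caveat Till raises.

```java
/** Flink-free model of a periodic watermark generator plus a CEP-style timeout check. */
class WatermarkTimeoutSketch {

    private final long intervalMs;          // assumed to equal the auto-watermark interval
    private final boolean advanceWhenIdle;  // false models a generator that stalls when idle
    private long watermark = 0L;            // candidate watermark
    private long lastEmitted = 0L;          // watermark returned by the previous tick

    WatermarkTimeoutSketch(long intervalMs, boolean advanceWhenIdle) {
        this.intervalMs = intervalMs;
        this.advanceWhenIdle = advanceWhenIdle;
    }

    /** Mirrors extractTimestamp(): a newer element pulls the watermark to just below its timestamp. */
    void onElement(long timestamp) {
        if (timestamp > lastEmitted) {
            watermark = timestamp - 1;
        }
    }

    /** Mirrors getCurrentWatermark(): if no element moved the watermark, advance it by one interval. */
    long onPeriodicTick() {
        if (advanceWhenIdle && lastEmitted == watermark) {
            watermark += intervalMs;
        }
        lastEmitted = watermark;
        return watermark;
    }

    /** Assumed lateness rule: an element at or below the last emitted watermark arrives too late. */
    boolean isLate(long timestamp) {
        return timestamp <= lastEmitted;
    }

    /**
     * Feeds one element at startTs, then ticks the generator. Returns the 1-based tick at which
     * a watermark first exceeds startTs + timeoutMs, or -1 if that never happens within maxTicks.
     */
    static int ticksUntilTimeout(long startTs, long timeoutMs, long intervalMs,
                                 boolean advanceWhenIdle, int maxTicks) {
        WatermarkTimeoutSketch generator = new WatermarkTimeoutSketch(intervalMs, advanceWhenIdle);
        generator.onElement(startTs);
        for (int tick = 1; tick <= maxTicks; tick++) {
            if (generator.onPeriodicTick() > startTs + timeoutMs) {
                return tick;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Event A at t = 10 000 ms, pattern timeout 4 000 ms, tick interval 1 000 ms.
        System.out.println("idle-advancing generator: tick "
                + ticksUntilTimeout(10_000L, 4_000L, 1_000L, true, 20));
        System.out.println("stalled generator:        tick "
                + ticksUntilTimeout(10_000L, 4_000L, 1_000L, false, 20));
    }
}
```

With the idle-advancing variant the deadline is crossed after a bounded number of ticks even though no second event ever arrives. The stalled variant never crosses it, which matches the behaviour David reports. The price is the caveat Till names: once the watermark has been pushed forward during an idle period, any element whose timestamp lies at or below it counts as late under the rule assumed here, so this approach only fits sources that are known not to emit such elements.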