flink-user mailing list archives

From David Koch <ogd...@googlemail.com>
Subject Re: Listening to timed-out patterns in Flink CEP
Date Fri, 11 Nov 2016 22:56:08 GMT
Hi Till,

Excellent - I'll check out the current snapshot version! Thank you for
taking the time to look into this.

Regards,

David

On Tue, Nov 8, 2016 at 3:25 PM, Till Rohrmann <trohrmann@apache.org> wrote:

> Hi David,
>
> sorry for my late reply. I just found time to look into the problem. You
> were right with your observation that the CEP operator did not behave as
> I've described it. The problem was that the time of the underlying NFA was
> not advanced if there were no events buffered in the CEP operator when a
> new watermark arrived. This was not intended and I opened a PR [1] to fix
> this problem. I've tested the fix with your example program and it seems to
> solve the problem that you don't see timeouts after the timeout interval
> has passed. Thanks for reporting this problem and please excuse my long
> response time.
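
For readers following along, here is a minimal, Flink-free sketch of the behaviour Till describes above. It is a toy model, not the actual NFA or CEP operator code: the class name CepTimeoutSketch, the advanceTimeWhenIdle flag and the "watermark >= start + timeout" comparison are assumptions made purely for illustration. Events are buffered until a watermark arrives. With the flag off, time is only advanced when buffered events were processed, so a watermark reaching an idle operator never triggers the timeout.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of watermark-driven timeouts; hypothetical, not Flink code. */
final class CepTimeoutSketch {
    private final long timeoutMillis;
    // false models the reported behaviour, true models the intended one.
    private final boolean advanceTimeWhenIdle;
    // Events waiting for a watermark that covers their timestamp.
    private final List<Long> bufferedEvents = new ArrayList<>();
    // Start timestamps of partial matches ("A seen, still waiting for B").
    private final List<Long> partialMatchStarts = new ArrayList<>();

    CepTimeoutSketch(long timeoutMillis, boolean advanceTimeWhenIdle) {
        this.timeoutMillis = timeoutMillis;
        this.advanceTimeWhenIdle = advanceTimeWhenIdle;
    }

    /** Buffers an event; in this sketch every event starts a partial match. */
    void onEvent(long timestamp) {
        bufferedEvents.add(timestamp);
    }

    /** Handles a watermark and returns the start timestamps that timed out. */
    List<Long> onWatermark(long watermark) {
        boolean processedAny = false;
        Iterator<Long> events = bufferedEvents.iterator();
        while (events.hasNext()) {
            long ts = events.next();
            if (ts <= watermark) {
                partialMatchStarts.add(ts);
                events.remove();
                processedAny = true;
            }
        }
        List<Long> timedOut = new ArrayList<>();
        // Time is advanced only if something was processed, unless the flag is set.
        if (processedAny || advanceTimeWhenIdle) {
            Iterator<Long> starts = partialMatchStarts.iterator();
            while (starts.hasNext()) {
                long start = starts.next();
                if (watermark >= start + timeoutMillis) {
                    timedOut.add(start);
                    starts.remove();
                }
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        for (boolean fixed : new boolean[] {false, true}) {
            CepTimeoutSketch op = new CepTimeoutSketch(4000L, fixed);
            op.onEvent(1000L);     // Event A at t = 1000 ms
            op.onWatermark(1500L); // A is processed, no timeout yet
            // No further events arrive; the next watermark is past the timeout.
            System.out.println("advanceTimeWhenIdle=" + fixed
                    + " -> timed out: " + op.onWatermark(6000L));
        }
    }
}
```

With the flag off the second watermark reports nothing, and with it on the partial match started at 1000 is reported as timed out.
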
>
> Btw, I'll merge the PR this evening. So it should be included in the
> current snapshot version by the end of tomorrow.
>
> [1] https://github.com/apache/flink/pull/2771
>
> Cheers,
> Till
>
> On Fri, Oct 14, 2016 at 11:40 AM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> Hi guys,
>>
>> I'll try to come up with an example illustrating the behaviour over the
>> weekend.
>>
>> Cheers,
>> Till
>>
>> On Fri, Oct 14, 2016 at 11:16 AM, David Koch <ogdude@googlemail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> Thanks for the code Sameer. Unfortunately, it didn't solve the issue.
>>> Compared to what I did the principle is the same - make sure that the
>>> watermark advances even without events present to trigger timeouts in CEP
>>> patterns.
>>>
>>> If Till or anyone else could provide a minimal example illustrating the
>>> supposed behaviour of:
>>>
>>>> [CEP] timeout will be detected when the first watermark exceeding the
>>>> timeout value is received
>>>
>>>
>>> I'd very much appreciate it.
>>>
>>> Regards,
>>>
>>> David
>>>
>>>
>>> On Wed, Oct 12, 2016 at 1:54 AM, Sameer W <sameer@axiomine.com> wrote:
>>>
>>>> Try this. Your WMs need to move forward. Also, don't use the system
>>>> timestamp. Use the timestamp of the element seen as the reference, as the
>>>> elements are most likely lagging the system timestamp.
>>>>
>>>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>>>
>>>>             long waterMarkTmst;
>>>>             long lastEmittedWM = 0;
>>>>
>>>>             @Override
>>>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>>>                 if (element.tmst > lastEmittedWM) {
>>>>                     // Assumes increasing timestamps. Subtract 1 because more
>>>>                     // elements with the same timestamp might arrive.
>>>>                     waterMarkTmst = element.tmst - 1;
>>>>                 }
>>>>                 return element.tmst;
>>>>             }
>>>>
>>>>             @Override
>>>>             public Watermark getCurrentWatermark() {
>>>>                 if (lastEmittedWM == waterMarkTmst) {
>>>>                     // No new event seen: move the WM forward by the auto
>>>>                     // watermark interval (watermarks only move forward in time).
>>>>                     waterMarkTmst = waterMarkTmst + 1000L;
>>>>                 }
>>>>                 lastEmittedWM = waterMarkTmst;
>>>>
>>>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>>>                 // Until an event is seen, the WM starts at 0 and advances
>>>>                 // by 1000 ms per call.
>>>>                 return new Watermark(waterMarkTmst);
>>>>             }
>>>>         }).keyBy("key");
>>>>
>>>> On Tue, Oct 11, 2016 at 7:29 PM, David Koch <ogdude@googlemail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I tried setting the watermark to System.currentTimeMillis() - 5000L;
>>>>> event timestamps are System.currentTimeMillis(). I do not observe the
>>>>> expected behaviour of the PatternTimeoutFunction firing once the watermark
>>>>> moves past the timeout "anchored" by a pattern match.
>>>>>
>>>>> Here is the complete test class source <http://pastebin.com/9WxGq2wv>,
>>>>> in case someone is interested. The timestamp/watermark assigner looks
>>>>> like this:
>>>>>
>>>>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>>>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>>>>
>>>>>             long waterMarkTmst;
>>>>>
>>>>>             @Override
>>>>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>>>>                 return element.tmst;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public Watermark getCurrentWatermark() {
>>>>>                 waterMarkTmst = System.currentTimeMillis() - 5000L;
>>>>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>>>>                 return new Watermark(waterMarkTmst);
>>>>>             }
>>>>>         }).keyBy("key");
>>>>>
>>>>> withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
>>>>>
>>>>> // Apply pattern filtering on stream.
>>>>> PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
>>>>>
>>>>> Any idea what's wrong?
>>>>>
>>>>> David
>>>>>
>>>>>
>>>>> On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sameer@axiomine.com>
>>>>> wrote:
>>>>>
>>>>>> Assuming an element with a timestamp which is later than the last
>>>>>> emitted watermark arrives, would it just be dropped because the
>>>>>> PatternStream does not have a max allowed lateness method? In that
>>>>>> case it appears that CEP cannot handle late events out of the box yet.
>>>>>>
>>>>>> If we do want to support late events, can we chain a
>>>>>> keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
>>>>>> again before handing it to the CEP operator? This way we may have the
>>>>>> patterns fired multiple times, but it allows an event to be late and
>>>>>> out of order. It looks like it will work, but is there a less
>>>>>> convoluted way?
>>>>>>
>>>>>> Thanks,
>>>>>> Sameer
>>>>>>
>>>>>> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <
>>>>>> till.rohrmann@gmail.com> wrote:
>>>>>>
>>>>>>> But then no element later than the last emitted watermark must be
>>>>>>> issued by the sources. If that is the case, then this solution
>>>>>>> should work.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sameer@axiomine.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> If you know that the events are arriving in order and with a
>>>>>>>> consistent lag, why not just increment the watermark time every time
>>>>>>>> the getCurrentWatermark() method is invoked, based on the
>>>>>>>> autoWatermarkInterval (or less, to be conservative)?
>>>>>>>>
>>>>>>>> You can check whether the watermark has changed since the arrival of
>>>>>>>> the last event and, if not, increment it in the getCurrentWatermark()
>>>>>>>> method. Otherwise the watermark will never increase until an element
>>>>>>>> arrives, and if the stream partition stalls for some reason the whole
>>>>>>>> pipeline freezes.
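
A rough, Flink-free sketch of the suggestion above, to make the watermark arithmetic concrete. The class and method names are made up for illustration and only stand in for extractTimestamp() and getCurrentWatermark(); the 1000 ms step mirrors the auto watermark interval used elsewhere in this thread. The sketch also counts events that arrive at or below the last emitted watermark, which is the situation Till warns about in his reply.

```java
/** Toy periodic watermark generator that keeps moving while idle; hypothetical. */
final class IdleAdvancingWatermarkSketch {
    private final long intervalMillis;
    private long pendingWatermark = 0L;     // candidate for the next emission
    private long lastEmittedWatermark = 0L; // last value handed downstream
    private int lateEvents = 0;             // events at or below the emitted watermark

    IdleAdvancingWatermarkSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /** Stand-in for extractTimestamp(): tracks the event time seen so far. */
    void onEvent(long timestamp) {
        if (timestamp <= lastEmittedWatermark) {
            lateEvents++; // the watermark already passed this element
            return;
        }
        // Subtract 1 because more elements with the same timestamp may follow.
        pendingWatermark = Math.max(pendingWatermark, timestamp - 1);
    }

    /** Stand-in for getCurrentWatermark(), called once per interval. */
    long onPeriodicEmit() {
        if (pendingWatermark == lastEmittedWatermark) {
            // No event moved the watermark since the last call: advance it ourselves.
            pendingWatermark += intervalMillis;
        }
        lastEmittedWatermark = pendingWatermark;
        return lastEmittedWatermark;
    }

    int lateEvents() {
        return lateEvents;
    }

    public static void main(String[] args) {
        IdleAdvancingWatermarkSketch wm = new IdleAdvancingWatermarkSketch(1000L);
        wm.onEvent(10_000L);
        System.out.println(wm.onPeriodicEmit()); // 9999, driven by the event
        System.out.println(wm.onPeriodicEmit()); // 10999, idle advance
        System.out.println(wm.onPeriodicEmit()); // 11999, idle advance
        wm.onEvent(11_500L);                     // behind the emitted watermark
        System.out.println(wm.lateEvents());     // 1
        wm.onEvent(13_000L);
        System.out.println(wm.onPeriodicEmit()); // 12999, driven by the event
    }
}
```

The trade-off is visible in the run: the watermark keeps moving during the idle period, but the element with timestamp 11500 is already behind it when it arrives.
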
>>>>>>>>
>>>>>>>> Sameer
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <
>>>>>>>> till.rohrmann@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> the problem is still that there is no corresponding watermark
>>>>>>>>> saying that 4 seconds have now passed. With your code, watermarks
>>>>>>>>> will be periodically emitted, but the same watermark will be emitted
>>>>>>>>> until a new element arrives which will reset the watermark. Thus,
>>>>>>>>> the system can never know until this watermark is seen whether there
>>>>>>>>> will be an earlier event or not. I fear that this is a fundamental
>>>>>>>>> problem with stream processing.
>>>>>>>>>
>>>>>>>>> You're right that the negation operator won't solve the problem.
>>>>>>>>> It will indeed suffer from the same problem.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lgfmt@yahoo.com> wrote:
>>>>>>>>>
>>>>>>>>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320>
>>>>>>>>>> (CEP "not" operator) does not address this because again, how
>>>>>>>>>> would the "not match" be triggered if no event at all occurs?
>>>>>>>>>>
>>>>>>>>>> Good question.
>>>>>>>>>>
>>>>>>>>>> I'm not sure whether the following will work:
>>>>>>>>>>
>>>>>>>>>> This could be done by creating a CEP matching pattern that uses
>>>>>>>>>> both the "notNext" (or "notFollowedBy") and "within" constructs.
>>>>>>>>>> Something like this:
>>>>>>>>>>
>>>>>>>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>>>>>>     .notNext("second")
>>>>>>>>>>     .within(Time.seconds(3));
>>>>>>>>>>
>>>>>>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>>>>>>
>>>>>>>>>> Note: I have requested these negation patterns to be implemented
>>>>>>>>>> in Flink CEP, but notNext/notFollowedBy are not yet implemented
>>>>>>>>>> in Flink.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> - LF
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* David Koch <ogdude@googlemail.com>
>>>>>>>>>> *To:* user@flink.apache.org; lgfmt@yahoo.com
>>>>>>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>>>>>>>
>>>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> Thank you for the explanation as well as the link to the other
>>>>>>>>>> post. Interesting to learn about some of the open JIRAs.
>>>>>>>>>>
>>>>>>>>>> Indeed, I was not using event time, but processing time. However,
>>>>>>>>>> even when using event time I only get notified of timeouts upon
>>>>>>>>>> subsequent events.
>>>>>>>>>>
>>>>>>>>>> The link <http://pastebin.com/x4m3RHQz> contains an example
>>>>>>>>>> where I read <key> <value> from a socket, wrap this in a custom
>>>>>>>>>> "event" with timestamp, key the resultant stream by <key> and
>>>>>>>>>> attempt to detect <key> instances no further than 3 seconds apart
>>>>>>>>>> using CEP.
>>>>>>>>>>
>>>>>>>>>> Apart from the fact that results are only printed when I close
>>>>>>>>>> the socket (normal?), I don't observe any change in behaviour.
>>>>>>>>>>
>>>>>>>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>>>>>>>> timeout to be triggered.
>>>>>>>>>>
>>>>>>>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320>
>>>>>>>>>> (CEP "not" operator) does not address this because again, how
>>>>>>>>>> would the "not match" be triggered if no event at all occurs?
>>>>>>>>>>
>>>>>>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lgfmt@yahoo.com> wrote:
>>>>>>>>>>
>>>>>>>>>> The following is a better link:
>>>>>>>>>>
>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> - LF
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* "lgfmt@yahoo.com" <lgfmt@yahoo.com>
>>>>>>>>>> *To:* "user@flink.apache.org" <user@flink.apache.org>
>>>>>>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>>>>>>>
>>>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>>>
>>>>>>>>>> Doesn't the upcoming CEP negation (absence of an event) feature
>>>>>>>>>> solve this issue?
>>>>>>>>>>
>>>>>>>>>> See this discussion thread:
>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> //  Atul
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* Till Rohrmann <trohrmann@apache.org>
>>>>>>>>>> *To:* user@flink.apache.org
>>>>>>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>>>
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> in case of event time, the timeout will be detected when the
>>>>>>>>>> first watermark exceeding the timeout value is received. Thus, it
>>>>>>>>>> depends a little bit on how you generate watermarks (e.g.
>>>>>>>>>> periodically, watermark per event).
>>>>>>>>>>
>>>>>>>>>> In case of processing time, the time is only updated whenever a
>>>>>>>>>> new element arrives. Thus, if you have an element arriving 4
>>>>>>>>>> seconds after Event A, it should detect the timeout. If the next
>>>>>>>>>> event arrives 20 seconds later, then you won't see the timeout
>>>>>>>>>> until then.
>>>>>>>>>>
>>>>>>>>>> In the case of processing time, we could think about registering
>>>>>>>>>> timeout timers for processing time. However, I would highly
>>>>>>>>>> recommend you to use event time, because with processing time,
>>>>>>>>>> Flink cannot guarantee meaningful computations, because the events
>>>>>>>>>> might arrive out of order.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <ogdude@googlemail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> With Flink CEP, is there a way to actively listen to pattern
>>>>>>>>>> matches that time out? I am under the impression that this is not
>>>>>>>>>> possible.
>>>>>>>>>>
>>>>>>>>>> In my case I partition a stream containing user web navigation by
>>>>>>>>>> "userId" to look for sequences of Event A, followed by B within 4
>>>>>>>>>> seconds for each user.
>>>>>>>>>>
>>>>>>>>>> I registered a PatternTimeoutFunction which, assuming a non-match,
>>>>>>>>>> only fires upon the first event after the specified timeout. For
>>>>>>>>>> example, given user X: Event A, 20 seconds later Event B (or any
>>>>>>>>>> other type of event).
>>>>>>>>>>
>>>>>>>>>> I'd rather have a notification fire directly upon the 4 second
>>>>>>>>>> interval expiring, since passive invalidation is not really
>>>>>>>>>> applicable in my case.
>>>>>>>>>>
>>>>>>>>>> How, if at all, can this be achieved with Flink CEP?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> David
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
