flink-user mailing list archives

From David Koch <ogd...@googlemail.com>
Subject Re: Listening to timed-out patterns in Flink CEP
Date Tue, 11 Oct 2016 16:36:12 GMT
I will give it a try. My current timestamp/watermark assigner extends
AscendingTimestampExtractor, so I can't override the way it sets the watermark
to the last seen event timestamp.

Thanks for your replies.

/David

On Tue, Oct 11, 2016 at 6:17 PM, Till Rohrmann <till.rohrmann@gmail.com>
wrote:

> But then the sources must not emit any element with a timestamp earlier
> than the last emitted watermark. If that is the case, this solution should work.
>
> Cheers,
> Till
>
> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sameer@axiomine.com> wrote:
>
>> Hi,
>>
>> If you know that the events arrive in order and with a consistent lag,
>> why not just advance the watermark every time the getCurrentWatermark()
>> method is invoked, by the autoWatermarkInterval (or less, to be
>> conservative)?
>>
>> You can check whether the watermark has changed since the arrival of the
>> last event and, if not, increment it in the getCurrentWatermark() method.
>> Otherwise the watermark will never advance until an element arrives, and if
>> the stream partition stalls for some reason, the whole pipeline freezes.
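A minimal plain-Java sketch of this suggestion, assuming in-order events. It does not use Flink: `IdleAdvancingWatermark`, `onElement` and `currentWatermark` are hypothetical names, `currentWatermark()` plays the role of the periodically invoked getCurrentWatermark(), and `stepMillis` stands in for the autoWatermarkInterval (or a smaller value).

```java
/**
 * Hypothetical sketch (plain Java, not Flink's API): a periodic watermark
 * source that advances the watermark by a fixed step whenever no element
 * arrived since the previous call, instead of repeating the same value.
 * Assumes elements arrive in ascending timestamp order.
 */
class IdleAdvancingWatermark {
    private final long stepMillis;          // e.g. the auto-watermark interval, or less
    private long maxTimestamp = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;
    private boolean sawElementSinceLastCall = false;

    IdleAdvancingWatermark(long stepMillis) {
        this.stepMillis = stepMillis;
    }

    /** Called for every element. */
    void onElement(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        sawElementSinceLastCall = true;
    }

    /** Called periodically, in the role of getCurrentWatermark(). */
    long currentWatermark() {
        if (sawElementSinceLastCall) {
            // Elements arrived: follow the highest timestamp seen, minus 1 ms.
            watermark = Math.max(watermark, maxTimestamp - 1);
        } else if (watermark != Long.MIN_VALUE) {
            // Stalled partition: push the watermark forward by one step.
            watermark += stepMillis;
        }
        sawElementSinceLastCall = false;
        return watermark;
    }
}
```

The trade-off: once the watermark has been pushed forward during a stall, any element that then shows up with a timestamp at or below it is late, so this only works if the sources never emit such elements.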
>>
>> Sameer
>>
>>
>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <till.rohrmann@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> the problem is still that there is no corresponding watermark saying
>>> that 4 seconds have now passed. With your code, watermarks will be
>>> emitted periodically, but the same watermark will be repeated until a new
>>> element arrives and updates it. Thus, until such a watermark is seen, the
>>> system cannot know whether an earlier event will still arrive. I fear that
>>> this is a fundamental problem with stream processing.
>>>
>>> You're right that the negation operator won't solve the problem. It will
>>> indeed suffer from the same problem.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, Oct 9, 2016 at 7:37 PM, <lgfmt@yahoo.com> wrote:
>>>
>>>> >> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>> >> "not" operator) does not address this because again, how would the "not
>>>> >> match" be triggered if no event at all occurs?
>>>>
>>>> Good question.
>>>>
>>>> I'm not sure whether the following will work:
>>>>
>>>> This could be done by creating a CEP pattern that uses both the
>>>> "notNext" (or "notFollowedBy") and "within" constructs. Something like this:
>>>>
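>>>> import org.apache.flink.cep.pattern.Pattern;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>> // Proposal only: notNext() is not yet implemented; Event is the user's event type.
>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>     .notNext("second")
>>>>     .within(Time.seconds(3));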
>>>>
>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>
>>>> Note: I have requested that these negation patterns be implemented in
>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>>
>>>>
>>>> - LF
>>>>
>>>>
>>>>
>>>>
>>>> ------------------------------
>>>> *From:* David Koch <ogdude@googlemail.com>
>>>> *To:* user@flink.apache.org; lgfmt@yahoo.com
>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>
>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>
>>>> Hello,
>>>>
>>>> Thank you for the explanation as well as the link to the other post.
>>>> Interesting to learn about some of the open JIRAs.
>>>>
>>>> Indeed, I was not using event time, but processing time. However, even
>>>> when using event time I only get notified of timeouts upon subsequent
>>>> events.
>>>>
>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I
>>>> read <key> <value> from a socket, wrap this in a custom "event" with a
>>>> timestamp, key the resulting stream by <key>, and attempt to detect <key>
>>>> instances no further than 3 seconds apart using CEP.
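For illustration only, the detection logic described here can be sketched in plain Java without Flink (this is not the pastebin code, which is not reproduced here). `KeyedGapDetector` is a hypothetical name; it keeps the last timestamp per key and reports whether a new event lies at most `maxGapMillis` after the previous one. Like the CEP job, it can only say anything when an event actually arrives.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (plain Java, no Flink): for each key, report whether an
 * event arrives at most maxGapMillis after the previous event with that key.
 */
class KeyedGapDetector {
    private final long maxGapMillis;
    private final Map<String, Long> lastSeen = new HashMap<>(); // key -> last timestamp

    KeyedGapDetector(long maxGapMillis) {
        this.maxGapMillis = maxGapMillis;
    }

    /** Records the event; returns true if it is within maxGapMillis of the previous one. */
    boolean onEvent(String key, long timestamp) {
        Long previous = lastSeen.put(key, timestamp);
        return previous != null && timestamp - previous <= maxGapMillis;
    }
}
```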
>>>>
>>>> Apart from the fact that results are only printed when I close the
>>>> socket (normal?), I don't observe any change in behaviour.
>>>>
>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>> timeout to be triggered.
>>>>
>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>> "not" operator) does not address this because again, how would the "not
>>>> match" be triggered if no event at all occurs?
>>>>
>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lgfmt@yahoo.com> wrote:
>>>>
>>>> The following is a better link:
>>>>
>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E>
>>>>
>>>>
>>>> - LF
>>>>
>>>>
>>>>
>>>>
>>>> ------------------------------
>>>> *From:* "lgfmt@yahoo.com" <lgfmt@yahoo.com>
>>>> *To:* "user@flink.apache.org" <user@flink.apache.org>
>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>
>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>
>>>> Wouldn't the upcoming CEP negation (absence of an event) feature solve
>>>> this issue?
>>>>
>>>> See this discussion thread:
>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E>
>>>>
>>>>
>>>>
>>>> //  Atul
>>>>
>>>>
>>>> ------------------------------
>>>> *From:* Till Rohrmann <trohrmann@apache.org>
>>>> *To:* user@flink.apache.org
>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>
>>>> Hi David,
>>>>
>>>> in the case of event time, the timeout will be detected when the first
>>>> watermark exceeding the timeout value is received. Thus, it depends a
>>>> little on how you generate watermarks (e.g., periodically or one watermark
>>>> per event).
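A minimal plain-Java sketch of this rule (a hypothetical class, not Flink's CEP internals): partial matches opened by Event A are only checked against the window when a watermark arrives. The boundary condition `watermark - start >= window` is an assumption here; Flink's exact comparison may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (plain Java, not Flink): event-time timeouts that can
 * only be detected when a watermark arrives.
 */
class EventTimeTimeouts {
    private final long windowMillis;
    private final Map<String, Long> pendingStart = new HashMap<>(); // key -> timestamp of A

    EventTimeTimeouts(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Event A opens a partial match (the earliest pending A is kept). */
    void onEventA(String key, long timestamp) {
        pendingStart.putIfAbsent(key, timestamp);
    }

    /** Event B completes the match if it falls inside the window. */
    void onEventB(String key, long timestamp) {
        Long start = pendingStart.get(key);
        if (start != null && timestamp - start < windowMillis) {
            pendingStart.remove(key);
        }
    }

    /** Returns the keys whose partial match timed out at this watermark. */
    List<String> onWatermark(long watermark) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pendingStart.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (watermark - entry.getValue() >= windowMillis) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        return timedOut;
    }
}
```

Feeding it the same watermark repeatedly, as a periodic assigner does while no elements arrive, never produces a timeout; only a watermark that reaches the start timestamp plus the window does.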
>>>>
>>>> In the case of processing time, the time is only updated whenever a new
>>>> element arrives. Thus, if an element arrives 4 seconds after Event A, the
>>>> timeout should be detected. If the next event arrives 20 seconds later,
>>>> then you won't see the timeout until then.
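To make the processing-time behaviour concrete, here is a hypothetical single-key sketch in plain Java (not Flink's implementation): the clock is only consulted when an element arrives, so an A that expired long ago is reported only on the next arrival. `ArrivalDrivenTimeout` and its return convention (-1 for "no timeout") are assumptions for illustration.

```java
/**
 * Hypothetical sketch (plain Java, not Flink): time only "moves" when an
 * element arrives, so an expired A is noticed on the next arrival.
 */
class ArrivalDrivenTimeout {
    private final long windowMillis;
    private Long pendingSince = null; // arrival time of an unmatched A, if any

    ArrivalDrivenTimeout(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Returns the arrival time of an A that this arrival reveals as timed out, else -1. */
    long onElement(String type, long arrivalMillis) {
        long timedOutStart = -1;
        if (pendingSince != null && arrivalMillis - pendingSince >= windowMillis) {
            timedOutStart = pendingSince; // noticed only now, at this later arrival
            pendingSince = null;
        }
        if ("A".equals(type) && pendingSince == null) {
            pendingSince = arrivalMillis;
        } else if ("B".equals(type) && pendingSince != null) {
            pendingSince = null;          // matched within the window
        }
        return timedOutStart;
    }
}
```

With a 4-second window, an A at 0 s followed by the next event at 20 s reports the timeout only at 20 s, which is exactly the delay described in this thread.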
>>>>
>>>> In the case of processing time, we could think about registering
>>>> timeout timers. However, I would highly recommend using event time:
>>>> with processing time, Flink cannot guarantee meaningful computations
>>>> because events might arrive out of order.
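The timer idea can be illustrated in plain Java with a `ScheduledExecutorService`. This is a hypothetical stand-in, not how Flink CEP works: Event A schedules a wall-clock timer, Event B cancels it, and the timeout fires even if no further event arrives. It assumes one pending A per user; a repeated A while a timer is pending is ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch (plain Java, not Flink): processing-time timeout timers
 * that fire on their own, without waiting for another event.
 */
class ProcessingTimeTimeouts {
    private final long timeoutMillis;
    private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
    private final List<String> timedOut = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService timers =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "timeout-timers");
                thread.setDaemon(true); // do not keep the JVM alive
                return thread;
            });

    ProcessingTimeTimeouts(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Event A: start a timer unless one is already pending for this user. */
    void onEventA(String userId) {
        pending.computeIfAbsent(userId, id ->
                timers.schedule(() -> fire(id), timeoutMillis, TimeUnit.MILLISECONDS));
    }

    /** Event B: the pattern matched, so cancel the pending timer (if any). */
    void onEventB(String userId) {
        ScheduledFuture<?> timer = pending.remove(userId);
        if (timer != null) {
            timer.cancel(false);
        }
    }

    private void fire(String userId) {
        pending.remove(userId);
        timedOut.add(userId); // a real job would emit the timeout notification here
    }

    List<String> timedOut() {
        return new ArrayList<>(timedOut);
    }

    void close() {
        timers.shutdownNow();
    }
}
```

This only shows the mechanism; it inherits the weakness of processing time noted above, since the outcome depends on when events happen to arrive rather than on their timestamps.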
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <ogdude@googlemail.com>
>>>> wrote:
>>>>
>>>> Hello,
>>>>
>>>> With Flink CEP, is there a way to actively listen to pattern matches
>>>> that time out? I am under the impression that this is not possible.
>>>>
>>>> In my case I partition a stream containing user web navigation by
>>>> "userId" to look for sequences of Event A, followed by B within 4 seconds
>>>> for each user.
>>>>
>>>> I registered a PatternTimeoutFunction which, assuming a non-match, only
>>>> fires upon the first event after the specified timeout. For example, for
>>>> user X: Event A, then 20 seconds later Event B (or any other type of event).
>>>>
>>>> I'd rather have a notification fire as soon as the 4-second interval
>>>> expires, since passive invalidation is not really applicable in my case.
>>>>
>>>> How, if at all, can this be achieved with Flink CEP?
>>>>
>>>> Thanks,
>>>>
>>>> David
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
