flink-user mailing list archives

From Vitor Vieira <vitorsv.vie...@gmail.com>
Subject Re: Flink CEP Pattern Matching
Date Thu, 03 Mar 2016 19:22:06 GMT
I believe that most of the functionality regarding transformation and
projection of windowed events will only be implemented in upcoming releases.

I'm looking forward to contributing!


2016-03-03 15:29 GMT-03:00 Jerry Lam <chilinglam@gmail.com>:

> Hi Till,
>
> The idea of having CEP functionality in Flink is very exciting. I really
> appreciate your work on this.
> Would you consider adding, in the future, functionality similar to what is
> described in this standard (
> http://web.cs.ucla.edu/classes/fall15/cs240A/notes/temporal/row-pattern-recogniton-11.pdf)?
> This document describes many use cases that are very interesting for
> CEP applications. I have experience with Esper and WSO2 Siddhi; they
> provide a subset of the functionality described in the standard.
>
> Having this pattern-matching CEP functionality in Flink is a killer
> feature IMHO.
>
> Best Regards,
>
> Jerry
>
>
> On Thu, Mar 3, 2016 at 8:47 AM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> Hi Vitor,
>>
>> the CEP operators do not work on real windows. Instead, they use
>> an NFA to track the state of multiple ongoing sequences. To store
>> the elements efficiently, a kind of shared buffer with versioning is used.
>> Once a sequence has reached a final state, its elements are reconstructed
>> by backtracking through the shared buffer to produce the final result.
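A minimal sketch of the backtracking idea described above, in plain Java rather than Flink code. `BufferEntry`, `extend` and `backtrack` are hypothetical names, and the sketch swaps the versioned shared buffer for immutable predecessor pointers, which is already enough to let several partial sequences share a common prefix:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical buffer entry (not Flink's SharedBuffer). Partial sequences that
// started with the same elements point at the same entries, so a common
// prefix is stored only once.
final class BufferEntry<T> {
    final String state;            // name of the pattern state this element matched
    final T value;                 // the buffered element
    final BufferEntry<T> previous; // null for the first element of a sequence

    BufferEntry(String state, T value, BufferEntry<T> previous) {
        this.state = state;
        this.value = value;
        this.previous = previous;
    }

    // Extends this partial sequence without modifying it, so other branches
    // that share this entry are unaffected.
    BufferEntry<T> extend(String nextState, T nextValue) {
        return new BufferEntry<>(nextState, nextValue, this);
    }

    // Walks the predecessor chain back to the start and returns the elements
    // in the order they were matched (the backtracking step).
    List<T> backtrack() {
        List<T> out = new ArrayList<>();
        for (BufferEntry<T> e = this; e != null; e = e.previous) {
            out.add(e.value);
        }
        Collections.reverse(out);
        return out;
    }
}
```

Calling `backtrack()` on an entry whose sequence has not reached a final state returns the elements matched so far; that walk is linear in the sequence length, which is the extra cost of exposing previous elements to a filter.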
>>
>> So, to get access to the previous elements of an unfinished
>> sequence, we could simply apply the same mechanism without removing
>> the sequence from the shared buffer. This would of course be somewhat more
>> costly, since for every state you would have to retrieve the sequence of
>> elements that led to it.
>>
>> But we could offer two filter conditions: a light-weight one that
>> only gives access to the current element, and another one that also
>> gives access to the previous elements. The second variant might
>> make sense if it lets you prune many false sequences early.
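The two proposed variants could look roughly like this. This is a hypothetical plain-Java sketch, not a Flink API: `FilterCondition`, `simple`, `withHistory` and `FilterEval` are illustrative names, and the `Supplier` stands in for backtracking through the shared buffer, so the prefix is only rebuilt when a condition actually asks for it:

```java
import java.util.Collections;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical condition type (illustration only, not a Flink interface).
interface FilterCondition<T> {
    // prefix: elements already matched by this partial sequence, oldest first.
    boolean test(List<T> prefix, T current);

    // Whether evaluating this condition requires rebuilding the prefix.
    boolean needsPrefix();

    // Light-weight variant: looks only at the current element.
    static <T> FilterCondition<T> simple(Predicate<T> p) {
        return new FilterCondition<T>() {
            @Override public boolean test(List<T> prefix, T current) { return p.test(current); }
            @Override public boolean needsPrefix() { return false; }
        };
    }

    // Variant with access to the previously matched elements.
    static <T> FilterCondition<T> withHistory(BiPredicate<List<T>, T> p) {
        return new FilterCondition<T>() {
            @Override public boolean test(List<T> prefix, T current) { return p.test(prefix, current); }
            @Override public boolean needsPrefix() { return true; }
        };
    }
}

final class FilterEval {
    // Calls the (potentially expensive) prefix supplier only when needed.
    static <T> boolean accept(FilterCondition<T> cond, Supplier<List<T>> prefix, T current) {
        List<T> seen = cond.needsPrefix() ? prefix.get() : Collections.<T>emptyList();
        return cond.test(seen, current);
    }
}
```

For the "B at least some time after A" case, a `withHistory` condition on B could reject candidates that are too close to A, so those partial sequences are dropped before C is ever considered.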
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 3, 2016 at 2:03 PM, Vitor Vieira <vitorsv.vieira@gmail.com>
>> wrote:
>>
>>> Hi Till,
>>>
>>> I don't know whether the windowing package should provide functions that
>>> operate on the elements inside a window.
>>>
>>> Is it possible, and if so what is the easiest way, to get, for example, the
>>> last event of a window, let's say a 5-second window?
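One way to approach this outside CEP, assuming each event carries an event-time timestamp: assign events to 5-second tumbling windows and keep the element with the largest timestamp, the same rule a `reduce` over a 5-second window could apply. The sketch below is plain Java, not Flink code; `TimedEvent`, `LastPerWindow` and `lastEventPerWindow` are hypothetical names, and it assumes non-negative millisecond timestamps:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical timestamped event; field names are illustrative.
final class TimedEvent {
    final long timestampMs;
    final String payload;

    TimedEvent(long timestampMs, String payload) {
        this.timestampMs = timestampMs;
        this.payload = payload;
    }
}

final class LastPerWindow {
    // Assigns each event to the tumbling window [start, start + sizeMs) and
    // keeps, per window, the event with the largest timestamp.
    // Assumes timestamps are non-negative.
    static Map<Long, TimedEvent> lastEventPerWindow(Iterable<TimedEvent> events, long sizeMs) {
        Map<Long, TimedEvent> last = new TreeMap<>();
        for (TimedEvent e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % sizeMs);
            // Keep whichever of the stored and the new event is later.
            last.merge(windowStart, e, (old, cur) -> cur.timestampMs >= old.timestampMs ? cur : old);
        }
        return last;
    }
}
```

Comparing timestamps, rather than keeping whichever element arrived last, makes the result independent of arrival order.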
>>>
>>> Rgds,
>>>
>>> Vitor Vieira
>>> @notvitor
>>>
>>> 2016-03-03 7:29 GMT-03:00 Till Rohrmann <till.rohrmann@gmail.com>:
>>>
>>>> Hi Jerry,
>>>>
>>>> at the moment it is not yet possible to access previous elements in the
>>>> filter function of an individual element. Therefore, you have to check
>>>> the condition “B is 5 days after A” in the final select statement.
>>>> Giving this context to the where clause would indeed be a nice
>>>> addition to the CEP library. If you like, you could file a JIRA ticket
>>>> for it.
>>>>
>>>> Here is a simple example of how you could solve your problem with what is
>>>> currently available:
>>>>
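>>>> import java.util.Map;
>>>>
>>>> import org.apache.flink.api.java.tuple.Tuple3;
>>>> import org.apache.flink.cep.CEP;
>>>> import org.apache.flink.cep.PatternFlatSelectFunction;
>>>> import org.apache.flink.cep.pattern.Pattern;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> // Tuple3(Key, Timestamp, Payload)
>>>> DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
>>>>     Tuple3.of(1, 1000L, "first event"),
>>>>     Tuple3.of(1, 2000L, "second event"),
>>>>     Tuple3.of(1, 20000L, "third event"));
>>>>
>>>> // The pattern has no where() conditions; the timing constraint between
>>>> // A and B is checked in flatSelect instead.
>>>> Pattern<Tuple3<Integer, Long, String>, ?> pattern =
>>>>     Pattern.<Tuple3<Integer, Long, String>>begin("A").followedBy("B").next("C");
>>>>
>>>> DataStream<String> result = CEP.pattern(input.keyBy(0), pattern).flatSelect(
>>>>     new PatternFlatSelectFunction<Tuple3<Integer, Long, String>, String>() {
>>>>         @Override
>>>>         public void flatSelect(Map<String, Tuple3<Integer, Long, String>> map,
>>>>                                Collector<String> collector) throws Exception {
>>>>             Tuple3<Integer, Long, String> a = map.get("A");
>>>>             Tuple3<Integer, Long, String> b = map.get("B");
>>>>
>>>>             // check that a and b have at least 1000 ms in between
>>>>             if (b.f1 - a.f1 >= 1000) {
>>>>                 collector.collect(a.f2);
>>>>             }
>>>>         }
>>>>     });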
>>>>
>>>> result.print();
>>>>
>>>> env.execute("CEP example");
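The timing check inside `flatSelect` is plain arithmetic on field `f1`. Below is a hedged, self-contained sketch of that check with Jerry's 5-day threshold; `GapCheck`, `FIVE_DAYS_MS` and `farEnoughApart` are hypothetical names, and it assumes `f1` holds millisecond timestamps, as the example data suggests:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper mirroring the select-time check in the example above.
final class GapCheck {
    // Jerry's requirement: B occurs at least 5 days after A.
    static final long FIVE_DAYS_MS = TimeUnit.DAYS.toMillis(5);

    // True if B's timestamp is at least minGapMs after A's.
    static boolean farEnoughApart(long aTimestampMs, long bTimestampMs, long minGapMs) {
        return bTimestampMs - aTimestampMs >= minGapMs;
    }
}
```

In the example job, replacing `b.f1 - a.f1 >= 1000` with `GapCheck.farEnoughApart(a.f1, b.f1, GapCheck.FIVE_DAYS_MS)` would express "B at least 5 days after A". Because the check only runs in `flatSelect`, all A/B/C combinations are still buffered until they complete.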
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Mar 3, 2016 at 1:46 AM, Vitor Vieira <vitorsv.vieira@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jerry,
>>>>>
>>>>> I'm currently evaluating the CEP library too, probably doing something
>>>>> similar.
>>>>>
>>>>> Something like comparing the 'offset' of the last event in different
>>>>> time windows, per event type: the current, real-time window against the
>>>>> same day/hour/minute a week ago, 15 days ago, a month ago, etc.
>>>>>
>>>>> I plan to share some CEP examples once I finish this engine.
>>>>>
>>>>> -@notvitor
>>>>>
>>>>>
>>>>> 2016-03-02 19:28 GMT-03:00 Fabian Hueske <fhueske@gmail.com>:
>>>>>
>>>>>> Hi Jerry,
>>>>>>
>>>>>> I haven't used the CEP features yet, so I cannot comment on your
>>>>>> requirements.
>>>>>> In case you are looking for the CEP documentation, here it is:
>>>>>>
>>>>>> -->
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>>>>>>
>>>>>> The CEP features will be included in the upcoming 1.0.0 release
>>>>>> (which we are currently voting on).
>>>>>> I think you would be one of the first people to use it. Please let
>>>>>> us know if you find any problems.
>>>>>>
>>>>>> Thanks, Fabian
>>>>>>
>>>>>>
>>>>>> 2016-03-02 23:12 GMT+01:00 Jerry Lam <chilinglam@gmail.com>:
>>>>>>
>>>>>>> Hi Flink users and developers,
>>>>>>>
>>>>>>> I'm trying to learn the CEP library. How can I express
>>>>>>> A -followedBy-> B -next-> C, where B occurs 5 days after A?
>>>>>>> What I'm trying to get hold of is the events that match A when
>>>>>>> I'm processing B.
>>>>>>>
>>>>>>> Is this supported?
>>>>>>>
>>>>>>> Best Regards,
>>>>>>>
>>>>>>> Jerry
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
