flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Vitor Vieira <vitorsv.vie...@gmail.com>
Subject Re: Flink CEP Pattern Matching
Date Thu, 03 Mar 2016 13:03:26 GMT
Hi Till,

Idk if the windowing package should provide functions to operate on the
internal elements.

What is the easiest way, or is it possible to get, for example, the last
event of a window, lets say a 5 second window?

Rgds,

Vitor Vieira
@notvitor

2016-03-03 7:29 GMT-03:00 Till Rohrmann <till.rohrmann@gmail.com>:

> Hi Jerry,
>
> at the moment it is not yet possible to access previous elements in the
> filter function of an individual element. Therefore, you have to check for
> the condition “B is 5 days after A” in the final select statement. Giving
> this context to the where clause would be indeed a nice addition to the
> CEP library. If you want, then you could file a JIRA ticket for it.
>
> Here is a simple example how you could solve your problem with the current
> means:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> // Tuple3(Key, Timestamp, Payload)
> DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(Tuple3.of(1,
1000L, "first event"), Tuple3.of(1, 2000L, "second event"), Tuple3.of(1, 20000L, "third event"));
>
> Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer,
Long, String>>begin("A").followedBy("B").next("C");
>
> DataStream<String> result = CEP.pattern(input.keyBy(0), pattern).flatSelect(new
PatternFlatSelectFunction<Tuple3<Integer, Long, String>, String>() {
>     @Override
>     public void flatSelect(Map<String, Tuple3<Integer, Long, String>> map,
Collector<String> collector) throws Exception {
>         Tuple3<Integer, Long, String> a = map.get("A");
>         Tuple3<Integer, Long, String> b = map.get("B");
>
>         // check that a and b have at least 1000 ms in between
>         if (b.f1 - a.f1 >= 1000) {
>             collector.collect(a.f2);
>         }
>     }
> });
>
> result.print();
>
> env.execute("CEP example");
>
> Cheers,
> Till
> ​
>
> On Thu, Mar 3, 2016 at 1:46 AM, Vitor Vieira <vitorsv.vieira@gmail.com>
> wrote:
>
>> Hi Jerry,
>>
>> I'm currently evaluating the CEP library too, probably doing something
>> similar.
>>
>> Something like... comparing the 'offset' of the last event in different
>> time windows, each window, based on the event type, occurring like
>> realtime, with this same day/hour/minute a week ago/15d/1month/etc...
>>
>> I plan to share some CEP examples once I finish this engine.
>>
>> -@notvitor
>>
>>
>> 2016-03-02 19:28 GMT-03:00 Fabian Hueske <fhueske@gmail.com>:
>>
>>> Hi Jerry,
>>>
>>> I haven't used the CEP features yet, so I cannot comment on your
>>> requirements.
>>> In case you are looking for the CEP documentation, here it is:
>>>
>>> -->
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>>>
>>> The CEP features will be included in the upcoming 1.0.0 release (which
>>> we currently vote on).
>>> I think you would be one of the first persons to use it. Please let us
>>> know, if you find any problems.
>>>
>>> Thanks, Fabian
>>>
>>>
>>> 2016-03-02 23:12 GMT+01:00 Jerry Lam <chilinglam@gmail.com>:
>>>
>>>> Hi Flink users and developers,
>>>>
>>>> I'm trying to learn the CEP library. How can I express
>>>> A-followBy->B-next->C where B is 5 days after A occurs. What I'm trying
to
>>>> get a hold of is the events that matches A when I'm processing B.
>>>>
>>>> Is this supported?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>
>>>
>>
>

Mime
View raw message