flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Till Rohrmann <till.rohrm...@gmail.com>
Subject Re: Flink CEP Pattern Matching
Date Thu, 03 Mar 2016 10:29:32 GMT
Hi Jerry,

at the moment it is not yet possible to access previous elements in the
filter function of an individual element. Therefore, you have to check for
the condition “B is 5 days after A” in the final select statement. Giving
this context to the where clause would be indeed a nice addition to the CEP
library. If you want, then you could file a JIRA ticket for it.

Here is a simple example how you could solve your problem with the current
means:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

// Tuple3(Key, Timestamp, Payload)
DataStream<Tuple3<Integer, Long, String>> input =
env.fromElements(Tuple3.of(1, 1000L, "first event"), Tuple3.of(1,
2000L, "second event"), Tuple3.of(1, 20000L, "third event"));

Pattern<Tuple3<Integer, Long, String>, ?> pattern =
Pattern.<Tuple3<Integer, Long,
String>>begin("A").followedBy("B").next("C");

DataStream<String> result = CEP.pattern(input.keyBy(0),
pattern).flatSelect(new PatternFlatSelectFunction<Tuple3<Integer,
Long, String>, String>() {
    @Override
    public void flatSelect(Map<String, Tuple3<Integer, Long, String>>
map, Collector<String> collector) throws Exception {
        Tuple3<Integer, Long, String> a = map.get("A");
        Tuple3<Integer, Long, String> b = map.get("B");

        // check that a and b have at least 1000 ms in between
        if (b.f1 - a.f1 >= 1000) {
            collector.collect(a.f2);
        }
    }
});

result.print();

env.execute("CEP example");

Cheers,
Till
​

On Thu, Mar 3, 2016 at 1:46 AM, Vitor Vieira <vitorsv.vieira@gmail.com>
wrote:

> Hi Jerry,
>
> I'm currently evaluating the CEP library too, probably doing something
> similar.
>
> Something like... comparing the 'offset' of the last event in different
> time windows, each window, based on the event type, occurring like
> realtime, with this same day/hour/minute a week ago/15d/1month/etc...
>
> I plan to share some CEP examples once I finish this engine.
>
> -@notvitor
>
>
> 2016-03-02 19:28 GMT-03:00 Fabian Hueske <fhueske@gmail.com>:
>
>> Hi Jerry,
>>
>> I haven't used the CEP features yet, so I cannot comment on your
>> requirements.
>> In case you are looking for the CEP documentation, here it is:
>>
>> -->
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>>
>> The CEP features will be included in the upcoming 1.0.0 release (which we
>> currently vote on).
>> I think you would be one of the first persons to use it. Please let us
>> know, if you find any problems.
>>
>> Thanks, Fabian
>>
>>
>> 2016-03-02 23:12 GMT+01:00 Jerry Lam <chilinglam@gmail.com>:
>>
>>> Hi Flink users and developers,
>>>
>>> I'm trying to learn the CEP library. How can I express
>>> A-followBy->B-next->C where B is 5 days after A occurs. What I'm trying
to
>>> get a hold of is the events that matches A when I'm processing B.
>>>
>>> Is this supported?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>
>>
>

Mime
View raw message