flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Elias Levy <fearsome.lucid...@gmail.com>
Subject CEP join across events
Date Wed, 26 Apr 2017 05:48:24 GMT
There doesn't appear to be a way to join events across conditions using the
CEP library.

Consider events of the form (type, value_a, value_b) on a stream keyed by
the value_a field.

Under 1.2 you can create a pattern that for a given value_a, as specified
by the stream key, there is a match if an event of type 1 is followed by an
event of type 2 (e.g.
begin("foo").where(_.type==1).followedBy("bar").where(_.type==2).  But this
will return a match regardless of whether value_b in the first event
matches value_b in the second event.

1.3 snapshot introduces iterative conditions, but this is insufficient.  In
1.3 you can do:

begin("foo").where(_.type==1).followedBy("bar").where(
    (v, ctx) => {
       v.type == 2 &&
       ctx.getEventsForPattern("foo").asScala.exists(prev => prev.value_b
== v.value_b)
    })

This will accept the current event if any if any previously had a value_b
that matches the current event. But the matches will include all previous
events, even those that did not match the current event at value_b, instead
of only matching the previous event where value_b equals the current event.

Is there a way to only output the match there previous event matches the
current event value_b (e.g. foo == (type=1, value_a=K, value_b=X) and bar
== (type=2, value_a=K, value_b=X)?

Mime
View raw message