flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Operations dependencies between values with different key in a ConnectedStreams
Date Wed, 09 Aug 2017 12:44:34 GMT
Hi Gabriele,

Yes, something like this is possible with Flink. However, you have to implement a two-stage
approach for this that I would roughly call "scatter-gather". You have three operators:

input -> Scatter -> State -> Gather -> output

Where the "Scatter" analyses what state you need for which key, then sends requests for
that state downstream, along with the original message. All of these messages must be tagged with
a unique ID. The "State" operator holds the actual state: it receives the "request" events and
sends the state that it has for the given key downstream, again with the unique ID that was
generated earlier. The "Gather" operator receives the original message and all the different
bits of state that were emitted by the "State" operator. Here you need to buffer and wait
until you have received all the messages for a given unique ID. Once you have them, you can process.
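To make the three stages concrete, here is a single-process Java simulation. It has no Flink dependency, so it runs standalone. All names (`ScatterGatherSketch`, `Original`, `StateRequest`, `StatePart`) are hypothetical. The processing step hard-codes the comparison from Gabriele's question and ignores time windows. In a real job each stage would be a separate operator. Keying "State" by the data key and "Gather" by the request ID is an assumption about how one might wire it, not something prescribed above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Single-process simulation of Scatter -> State -> Gather (plain Java, not Flink API code).
 * All type and method names are hypothetical.
 */
public class ScatterGatherSketch {

    // Hypothetical message types; every message carries the request ID assigned by Scatter.
    record Original(long requestId, int allKey, int meanKey) {} // "ALL of allKey > MEAN of meanKey"
    record StateRequest(long requestId, int key) {}
    record StatePart(long requestId, int key, List<Double> values) {}

    // --- Scatter: tag the message with a fresh ID and request each piece of state it needs.
    private long nextRequestId = 0;

    List<Object> scatter(int allKey, int meanKey) {
        long id = nextRequestId++;
        return List.of(new StateRequest(id, allKey), new StateRequest(id, meanKey),
                new Original(id, allKey, meanKey));
    }

    // --- State: per-key values (keyed state in a real job); replies carry the request ID.
    private final Map<Integer, List<Double>> valuesByKey = new HashMap<>();

    void addValue(int key, double value) {
        valuesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    StatePart answer(StateRequest req) {
        List<Double> values = valuesByKey.getOrDefault(req.key(), List.of());
        return new StatePart(req.requestId(), req.key(), List.copyOf(values));
    }

    // --- Gather: buffer per request ID until the original and all its state parts arrived.
    private final Map<Long, Original> pendingOriginals = new HashMap<>();
    private final Map<Long, Map<Integer, List<Double>>> pendingParts = new HashMap<>();
    final Map<Long, Boolean> results = new HashMap<>();

    void gather(Object msg) {
        long id;
        if (msg instanceof Original orig) {
            id = orig.requestId();
            pendingOriginals.put(id, orig);
        } else if (msg instanceof StatePart part) {
            id = part.requestId();
            pendingParts.computeIfAbsent(id, k -> new HashMap<>()).put(part.key(), part.values());
        } else {
            throw new IllegalArgumentException("unexpected message: " + msg);
        }
        Original original = pendingOriginals.get(id);
        Map<Integer, List<Double>> parts = pendingParts.getOrDefault(id, Map.of());
        if (original == null || !parts.containsKey(original.allKey())
                || !parts.containsKey(original.meanKey())) {
            return; // something is still missing for this ID: keep buffering
        }
        // All pieces are here: evaluate, then drop the buffered data for this ID.
        double mean = parts.get(original.meanKey()).stream()
                .mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
        boolean allBigger = parts.get(original.allKey()).stream().allMatch(v -> v > mean);
        results.put(id, allBigger);
        pendingOriginals.remove(id);
        pendingParts.remove(id);
    }

    public static void main(String[] args) {
        ScatterGatherSketch job = new ScatterGatherSketch();
        job.addValue(17, 10.0);
        job.addValue(17, 20.0); // mean of key 17 = 15.0
        job.addValue(22, 16.0);
        job.addValue(22, 30.0); // every value of key 22 is > 15.0

        List<Object> scattered = job.scatter(22, 17);
        for (Object msg : scattered) {        // the original reaches Gather first...
            if (msg instanceof Original) job.gather(msg);
        }
        for (Object msg : scattered) {        // ...then the state parts, via the State stage
            if (msg instanceof StateRequest req) job.gather(job.answer(req));
        }
        System.out.println(job.results); // prints {0=true}
    }
}
```

Gather fires only once every piece for an ID is buffered, so delivering the state parts before the original gives the same result. In Flink the buffers would presumably live in keyed state rather than plain `HashMap`s.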

This is a presentation from ING who implemented that approach (along with a DSL for specifying
calculations): https://www.slideshare.net/FlinkForward/flink-forward-sf-2017-erik-de-nooij-streaming-models-how-ing-adds-models-at-runtime-to-catch-fraudsters
It has nice pictures and might be more helpful than my description.


> On 28. Jul 2017, at 22:14, Chao Wang <chaowang@wustl.edu> wrote:
> Hi Gabriele,
> I think CEP may be able to deal with this kind of expression in general, although I
> am not sure how to deal with different time windows (5s and 3s, in your case). Take
> a look at the available patterns in the CEP library doc: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html#the-pattern-api
> Chao
> On 07/28/2017 10:08 AM, Gabriele Di Bernardo wrote:
>> Hi guys,
>> I have a question for you. I have an application with two keyed data streams: one
>> for control and the other one for the data. Each control message represents an operation to
>> be performed on the data values marked with a certain identifier. I connected the two streams
>> and I process the data with a CoProcessFunction.
>> The operations I do are really simple, like collecting the MAX or the MEAN value of
>> the last n seconds. Now, I would like to create more complex operations where the result value
>> of a key might depend on the result value of another key. To be more clear, I would like to
>> evaluate expressions like: if {ALL the values of data marked with id 22 in the last 5s} are
>> BIGGER THAN {The MEAN value of data marked with id 17 in the last 3s}. In this example, I
>> should defer the evaluation of the expression until I have the MEAN value of the right part
>> of the expression and check it against ALL the data keyed with key 22 from the last 5 seconds.
>> I'd like to ask you whether something like this is doable in Flink and what the best way to
>> do it is, in your opinion. I am also checking how the CEP library works (can it be helpful?).
>> Thank you so much in advance.
>> Best,
>> Gabriele
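The comparison itself, independent of how the state is routed, could be sketched in plain Java as follows. `WindowedComparisonSketch`, `Event` and `allBiggerThanMean` are hypothetical names. Timestamps are assumed to be in milliseconds, and a window of length w ending at `nowMs` is taken to be (nowMs - w, nowMs]. Returning `Optional.empty()` models the "defer the evaluation" case while no value for the right-hand side exists yet.

```java
import java.util.List;
import java.util.Optional;
import java.util.OptionalDouble;

/**
 * Plain-Java sketch (no Flink) of the predicate from the question:
 * "ALL values of key 22 in the last 5s are BIGGER THAN the MEAN of key 17 in the last 3s".
 */
public class WindowedComparisonSketch {

    record Event(int key, long timestampMs, double value) {}

    // True if the event belongs to `key` and falls in the window (nowMs - windowMs, nowMs].
    static boolean inWindow(Event e, int key, long nowMs, long windowMs) {
        return e.key() == key && e.timestampMs() > nowMs - windowMs && e.timestampMs() <= nowMs;
    }

    /**
     * Returns Optional.empty() while the right-hand side (the mean) cannot be computed yet,
     * i.e. the evaluation has to be deferred; otherwise the result of the comparison.
     */
    static Optional<Boolean> allBiggerThanMean(List<Event> events,
                                               int allKey, long allWindowMs,
                                               int meanKey, long meanWindowMs,
                                               long nowMs) {
        OptionalDouble mean = events.stream()
                .filter(e -> inWindow(e, meanKey, nowMs, meanWindowMs))
                .mapToDouble(Event::value)
                .average();
        if (mean.isEmpty()) {
            return Optional.empty(); // no data for meanKey in its window yet: defer
        }
        double threshold = mean.getAsDouble();
        // Note: allMatch over an empty window is vacuously true.
        boolean result = events.stream()
                .filter(e -> inWindow(e, allKey, nowMs, allWindowMs))
                .allMatch(e -> e.value() > threshold);
        return Optional.of(result);
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(17, 6_500, 100.0), // outside the 3s window for key 17
                new Event(17, 8_000, 10.0),
                new Event(17, 9_000, 20.0),  // mean of key 17 in (7000, 10000] = 15.0
                new Event(22, 4_000, 12.0),  // outside the 5s window for key 22
                new Event(22, 6_000, 16.0),
                new Event(22, 9_500, 30.0));
        System.out.println(allBiggerThanMean(events, 22, 5_000, 17, 3_000, 10_000)); // Optional[true]
    }
}
```

Inside a Flink job, this check could plausibly run in the "Gather" operator once both buffered pieces have arrived. The window bookkeeping (evicting old events, registering timers to re-evaluate deferred expressions) is not shown.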
