flink-user mailing list archives

From Piotr Nowojski <pi...@ververica.com>
Subject Re: Communication between two queries
Date Fri, 13 Mar 2020 13:32:32 GMT

Could you explain a bit more what you are trying to achieve? 

One problem that pops into my head: currently in Flink Streaming (it is possible when processing bounded data), there is no way to “not ingest” the data reliably in the general case, as this might deadlock the upstream operator once the output buffers fill up.
However, you can instead for example filter out/ignore records until some condition is met.

BroadcastState works for one single operator (and its parallel instances) - it doesn’t
automatically communicate with any upstream/downstream operators - you have to wire/connect
your operators and distribute the information as you want to. For an example of how it works,
you can take a look at this ITCase [1].
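To make that scoping concrete, here is a plain-Python sketch (not Flink code; all names here are invented for illustration) of the broadcast-state behaviour: every parallel instance of the one operator that receives the broadcast stream sees every broadcast element and updates its own local copy of the state, while no other operator in the job learns about it automatically:

```python
# Plain-Python sketch of broadcast-state semantics (not the Flink API):
# every parallel instance of ONE operator receives every broadcast
# element and applies it to its own local copy of the state.

class OperatorInstance:
    """One parallel instance of an operator that receives a broadcast stream."""

    def __init__(self):
        self.broadcast_state = {}  # local copy, kept identical across instances

    def process_broadcast_element(self, key, value):
        # Called for every broadcast element, on every parallel instance.
        self.broadcast_state[key] = value

    def process_element(self, record):
        # Regular elements may read (but not modify) the broadcast state.
        if self.broadcast_state.get("gate_open", False):
            return record        # pass the record through
        return None              # drop it until the gate opens


# Simulate one operator with parallelism 3.
instances = [OperatorInstance() for _ in range(3)]

# Before any broadcast signal, every instance drops its records.
assert all(inst.process_element(42) is None for inst in instances)

# The broadcast element reaches *all* instances of this one operator --
# but no other operator in the job is informed.
for inst in instances:
    inst.process_broadcast_element("gate_open", True)

assert all(inst.process_element(42) == 42 for inst in instances)
```

In actual Flink code this corresponds to `stream.connect(broadcastStream).process(...)` with a BroadcastProcessFunction, where `processBroadcastElement` may write the broadcast state and `processElement` only gets read-only access to it.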

What you could do is create the following job topology using side outputs [2]:

Src1 -> OP1 -> broadcast_side_output  

And use a BroadcastProcessFunction to read Src1 and broadcast_side_output.

Src1 +  broadcast_side_output -> OP2 -> Sink2

But as I wrote before, you have to be careful in OP2. If both OP1 and OP2 are reading from
the same data stream Src1 and you stop reading records from Src1 in OP2, you will eventually deadlock
Src1 itself. A solution would be to create a second instance of the Src1 operator that
reads the records from the external system a second time:

Src1" +  broadcast_side_output -> OP2 -> Sink2   


[1] https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

> On 12 Mar 2020, at 12:53, Mikael Gordani <mi.gordani@gmail.com> wrote:
> Hello everyone!
> So essentially, I've two identical queries (q1 and q2) running in parallel (Streams).
> I'm trying to activate the ingestion of data to q2 based on what is processed in q1.

> E.g., say that we want to start ingesting data to q2 when a tuple with timestamp > 5000
> appears in q1.
> The queries are constructed in this way (they share the same source):
> q1: Source -> Filter -> Aggregate -> Filter -> Sink
>             | 
>            V
> q2:      Filter -> Filter -> Aggregate -> Filter -> Sink
> The initial idea was to have a global variable which is shared between the two queries.
> When this tuple appears in q1, it will set the variable to true in the first Filter operator.
> While in q2, the first Filter operator returns tuples depending on the value of the global
> variable: when the variable = true, it will let data pass; when set to false, no data is allowed
> to be ingested.
> This works fine when you have all the tasks on the same machine, but of course, it becomes
> troublesome in distributed deployments (tasks in different nodes and such).
> My second approach was to create some sort of "loop" in the query. So let's say that
> we have the processing logic placed in the last Filter operator in q1, and when this "special"
> tuple appears, it can communicate with the first Filter operator in q2, in order to allow
> data to be ingested.
> I've tried playing around with IterativeStreams but I don't really get it to work, and
> I feel like it's the wrong approach..
> How can I achieve this sort of functionality? 
> I'm looking a bit at the BroadcastState part of the DataStream API, but I feel confused
> about how to use it. Is it possible to broadcast from a downstream to an upstream?
> Suggestions would be much appreciated!
> Best Regards,
> Mikael Gordani
