flink-user mailing list archives

From Piotr Nowojski <pi...@ververica.com>
Subject Re: Communication between two queries
Date Tue, 17 Mar 2020 13:43:47 GMT

Yes, you are looking in the right direction with the watermarks.

First of all, you would have to use event time semantics to get consistent results. With processing
time everything would be simpler, but it would be more difficult to reason about the results
(your choice). Secondly, you would have to hook up the logic of enabling query1/query2 to
the event time/watermarks. Thirdly, you need to somehow sync the input switching with the
window boundaries. On top of that, watermarks express a lower bound on the event time that you can
expect. However, in order to guarantee consistency of the windows, you would like to control
the upper bound. For example:

1. If you want to enable Query2, you would need to check what's the largest/latest event
time that was processed by the input splitter; let's say that's TS1.
2. That means records with event time <= TS1 have already been processed by Query1, starting
some windows.
3. The earliest point at which you could enable Query2 is thus TS1 + 1.
4. You would have to move the Query2 start time forward to the start of the next time window; let's say
that would be TS2 = start of the first window beginning at or after TS1 + 1.
5. The input splitter must now keep sending records with event time < TS2 to Query1, but should
already redirect records with event time >= TS2 to Query2.
6. Once the watermark for the input splitter advances past TS2, it can finally stop
sending records to Query1, and the Query1 logic can be considered "completed".

So Query1 would be responsible for all of the data before TS2, and Query2 for all of the data from TS2 onwards.
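
The switching steps above can be sketched in plain Java, without any Flink dependency. This is only an illustration under stated assumptions: the class `QuerySwitchSketch`, its method names, and the use of fixed-size tumbling windows aligned to multiples of `windowSizeMs` are all hypothetical and not part of the original discussion. In a real job the same logic would live inside the splitter operator and react to Flink's own watermarks.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical input splitter that hands records over from Query1 to Query2 at TS2. */
final class QuerySwitchSketch {
    private final long windowSizeMs;                 // assumed tumbling window size
    private boolean seenAnyRecord = false;
    private long maxSeenTimestamp = Long.MIN_VALUE;  // TS1
    private long switchTimestamp = Long.MAX_VALUE;   // TS2; MAX_VALUE means no switch requested
    private boolean query1Completed = false;

    final List<Long> toQuery1 = new ArrayList<>();   // stand-ins for the two downstream queries
    final List<Long> toQuery2 = new ArrayList<>();

    QuerySwitchSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Rounds ts up to the nearest window start (a multiple of windowSizeMs). */
    static long alignToWindowStart(long ts, long windowSizeMs) {
        long offset = Math.floorMod(ts, windowSizeMs);
        return offset == 0 ? ts : ts - offset + windowSizeMs;
    }

    /** Steps 1-4: take TS1, add 1, and align to the next window start to get TS2. */
    long requestSwitch() {
        switchTimestamp = seenAnyRecord
                ? alignToWindowStart(maxSeenTimestamp + 1, windowSizeMs)
                : Long.MIN_VALUE; // nothing processed yet, so Query2 can take everything
        return switchTimestamp;
    }

    /** Step 5: records before TS2 still go to Query1, records at or after TS2 go to Query2. */
    void onRecord(long eventTime) {
        seenAnyRecord = true;
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTime);
        if (eventTime >= switchTimestamp) {
            toQuery2.add(eventTime);
        } else if (!query1Completed) {
            toQuery1.add(eventTime);
        }
        // Otherwise the record is late for an already completed Query1 and is dropped here.
    }

    /** Step 6: once the watermark reaches TS2, Query1 is considered completed. */
    void onWatermark(long watermark) {
        if (watermark >= switchTimestamp) {
            query1Completed = true;
        }
    }

    boolean isQuery1Completed() {
        return query1Completed;
    }
}
```

With a window size of 10 and TS1 = 12, `requestSwitch()` picks TS2 = 20, so a later record with event time 15 or 19 still goes to Query1 while 20 and 25 go to Query2.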

Alternatively, your input splitter could also buffer some records, so that you could enable
Query2 faster, by re-sending the buffered records. But in that case, both Query1 and Query2
would be responsible for some portion of the data.
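
A rough sketch of the buffering variant, again in plain Java and under assumptions that are not in the original message: `BufferingSwitchSketch` is a hypothetical name, windows are tumbling and aligned to multiples of `windowSizeMs`, and a window is treated as closed once the watermark reaches its end. The splitter keeps the records of still-open windows and replays them to Query2 on the switch, so those records are seen by both queries.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical splitter that buffers open-window records so Query2 can start earlier. */
final class BufferingSwitchSketch {
    private final long windowSizeMs;                     // assumed tumbling window size
    private final Deque<Long> buffer = new ArrayDeque<>();
    private long lastWatermark = Long.MIN_VALUE;
    private long switchTimestamp = Long.MAX_VALUE;       // MAX_VALUE means Query2 not enabled yet

    final List<Long> toQuery1 = new ArrayList<>();       // stand-ins for the two downstream queries
    final List<Long> toQuery2 = new ArrayList<>();

    BufferingSwitchSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    private long windowStart(long ts) {
        return ts - Math.floorMod(ts, windowSizeMs);
    }

    void onRecord(long eventTime) {
        if (eventTime >= switchTimestamp) {
            toQuery2.add(eventTime);
            return;
        }
        toQuery1.add(eventTime);
        if (switchTimestamp == Long.MAX_VALUE) {
            buffer.add(eventTime);                       // keep a copy for a possible replay
        }
    }

    /** Evicts buffered records whose window is already closed by the watermark. */
    void onWatermark(long watermark) {
        lastWatermark = Math.max(lastWatermark, watermark);
        if (lastWatermark == Long.MIN_VALUE) {
            return;
        }
        long firstOpenWindowStart = windowStart(lastWatermark);
        buffer.removeIf(ts -> ts < firstOpenWindowStart);
    }

    /** Enables Query2 from the first still-open window and replays the buffered records to it. */
    long enableQuery2() {
        switchTimestamp = lastWatermark == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : windowStart(lastWatermark);
        for (long ts : buffer) {
            if (ts >= switchTimestamp) {
                toQuery2.add(ts);                        // these records were also sent to Query1
            }
        }
        buffer.clear();
        return switchTimestamp;
    }
}
```

In this sketch Query1 has already received the replayed records, so its partial results for the windows at or after the switch point would have to be discarded or reconciled downstream; how to do that is left open here.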


> On 17 Mar 2020, at 10:35, Mikael Gordani <mi.gordani@gmail.com> wrote:
> Hi Piotr!
> Continuing with my scenario: since both of the queries will share the same sink, I've
> realized that some issues will appear when I switch queries, especially with regard to stateful
> operators, e.g. aggregation.
> Let me provide an example:
> So, let's say that both of the queries ingest a sequence of integers and compute
> the average of these integers over some time.
> E.g. say that query1 ingests the sequence 1,2,3,4....
> The windows for query1 will be [1,2,3] [2,3,4] [3,4].
> If I later "activate" query2, I need to have both of the queries accepting tuples
> for a while, in order to allow the aggregation to finish in query1 before denying it input.
> But there is a possibility that query2 might receive the tuples 3,4, which will result
> in the windows: [3] [3,4] [3,4]
> Later on, the output of the respective queries will be:
> Query1: 2, 3, 3.5
> Query2: 3, 3.5, 3.5
> As one can see, the first two outputs will be different.
> I'm thinking of using watermarks somehow to make sure that both queries have processed
> the same amount of data before writing to the sink, but I'm a bit unsure how to do it.
> Do you have any suggestions or thoughts?
> Cheers,
> On Mon, 16 Mar 2020 at 08:43, Piotr Nowojski <piotr@ververica.com> wrote:
> Hi,
> Let us know if something doesn’t work :)
> Piotrek
>> On 16 Mar 2020, at 08:42, Mikael Gordani <mi.gordani@gmail.com> wrote:
>> Hi,
>> I'll try it out =) 
>> Cheers!
>> On Mon, 16 Mar 2020 at 08:32, Piotr Nowojski <piotr@ververica.com> wrote:
>> Hi,
>> In that case you could try to implement your `FilterFunction` as a two-input operator
>> with a broadcast control input that would set the `global_var`. The broadcast control input
>> can originate from some source or from some operator.
>> Piotrek
>>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gordani@gmail.com> wrote:
>>> Hi Piotr!
>>> Thanks for your response, I'll try to explain what I'm trying to achieve in more detail.
>>> Essentially, if I have two queries which have the same operators and run in
>>> the same task, I would want to figure out some way of controlling the ingestion from a source
>>> to the respective queries in such a way that only one of the queries receives data, based on
>>> a condition.
>>> For more context, the second query (query2) is equipped with instrumented operators,
>>> which are standard operators extended with some extra functionality; in my case, they enrich
>>> the tuples with metadata.
>>> Source --> Filter1 ---> rest of query1
>>>    |
>>>    v
>>>    Filter2 ---> rest of query2
>>> The filters placed before the queries allow records to pass depending on
>>> a condition, let's say a global boolean variable (which is initially set to false).
>>> If it's set to false, Filter1 will accept every record and Filter2 will disregard
>>> every record.
>>> If it's set to true, Filter2 will accept every record and Filter1 will disregard
>>> every record.
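>>> So the filter operators look something like this:
>>> import org.apache.flink.api.common.functions.FilterFunction;
>>> // Shared flag; only visible within a single JVM.
>>> static boolean global_var = false;
>>> // Lets records through to query1 while the flag is false.
>>> private static class filter1 implements FilterFunction<Tuple> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return !global_var;
>>>     }
>>> }
>>> // Lets records through to query2 while the flag is true.
>>> private static class filter2 implements FilterFunction<Tuple> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return global_var;
>>>     }
>>> }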
>>> Then later on, in the respective queries, there is some processing logic
>>> which changes the value of the global variable, thus enabling and disabling the flow of data
>>> from the source to the respective queries.
>>> The problem is that this global variable is problematic in distributed deployments,
>>> and I'm having a hard time figuring out how to solve that.
>>> Is it a bit more clear? =)
>> -- 
>> Kind regards,
>> Mikael Gordani
> -- 
> Kind regards,
> Mikael Gordani
