flink-user mailing list archives

From Mikael Gordani <mi.gord...@gmail.com>
Subject Re: Communication between two queries
Date Tue, 17 Mar 2020 15:14:07 GMT
No worries and great idea!
I will play around with it and see what I manage to do.
Cheers!

On Tue, 17 Mar 2020 at 15:59, Piotr Nowojski <piotr@ververica.com> wrote:

> Oops, sorry, there was a misleading typo/autocorrection in my previous
> e-mail. The second sentence should have been:
>
> > First of all you would have to use event time semantics for consistent
> > results
>
> Piotrek
>
> On 17 Mar 2020, at 14:43, Piotr Nowojski <piotr@ververica.com> wrote:
>
> Hi,
>
> Yes, you are looking in the right direction with the watermarks.
>
> First of all you would have to use event time semantics for constant
> results. With processing time everything would be simpler, but it would be
> more difficult to reason about the results (your choice). Secondly, you
> would have to hook up the logic of enabling query1/query2 to the event
> time/watermarks. Thirdly, you need to somehow sync the input switching
> with the window boundaries. On top of that, watermarks express a lower bound
> on the event time that you can expect. However, in order to guarantee
> consistency of the windows, you would like to control the upper bound. For
> example:
>
> 1. If you want to enable Query2, you would need to check the
> largest/latest event time that was processed by the input splitter; let’s
> say that’s TS1.
> 2. That means records with event time <= TS1 have already been processed
> by Query1, starting some windows.
> 3. The earliest point at which you could enable Query2 is thus TS1 + 1.
> 4. You would have to align the Query2 start time with the start of the next
> time window; let’s say that would be TS2 = the start of the first window
> that begins at or after TS1 + 1.
> 5. The input splitter must now keep sending records with event time < TS2 to
> Query1, but should already redirect records with event time >= TS2 to
> Query2.
> 6. Once the watermark for the input splitter advances past TS2, it
> can finally stop sending records to Query1, and the Query1 logic can be
> considered “completed”.
>
> So Query1 would be responsible for all of the data before TS2, and Query2
> for all of the data from TS2 onwards.
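
The steps above can be sketched in plain Java. This is only an illustration under stated assumptions: tumbling event-time windows of a fixed size aligned to timestamp 0, millisecond timestamps, and a hypothetical `SwitchPlanner` class whose method names are not part of Flink or of the thread.

```java
/** Hypothetical helper (not a Flink API) for the TS1/TS2 switch-over described above. */
class SwitchPlanner {

    /** Start of the first tumbling window that begins at or after ts (windows aligned to 0). */
    static long nextWindowStart(long ts, long windowSizeMs) {
        long rem = Math.floorMod(ts, windowSizeMs);
        return rem == 0 ? ts : ts - rem + windowSizeMs;
    }

    /** Steps 1-4: TS2 is the first window boundary at or after TS1 + 1. */
    static long switchTimestamp(long ts1, long windowSizeMs) {
        return nextWindowStart(ts1 + 1, windowSizeMs);
    }

    /** Step 5: records before TS2 still go to Query1, records from TS2 onwards go to Query2. */
    static String route(long eventTime, long ts2) {
        return eventTime < ts2 ? "Query1" : "Query2";
    }

    /**
     * Step 6: Query1 can be retired once the watermark has reached TS2.
     * This is conservative: a Flink watermark W already asserts that no
     * more records with timestamp <= W are expected.
     */
    static boolean query1Completed(long watermark, long ts2) {
        return watermark >= ts2;
    }

    public static void main(String[] args) {
        long windowSizeMs = 1_000L;   // assumed window size
        long ts1 = 1_234L;            // assumed latest event time seen by the splitter
        long ts2 = switchTimestamp(ts1, windowSizeMs);
        System.out.println("TS2 = " + ts2);
        System.out.println("record@1999 -> " + route(1_999L, ts2));
        System.out.println("record@2000 -> " + route(2_000L, ts2));
        System.out.println("Query1 done at watermark 2000? " + query1Completed(2_000L, ts2));
    }
}
```

With sliding windows or a window offset the alignment rule would differ, so `nextWindowStart` would need to be adapted to the window assigner actually in use.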
>
> Alternatively, your input splitter could also buffer some records, so that
> you could enable Query2 faster, by re-sending the buffered records. But in
> that case, both Query1 and Query2 would be responsible for some portion of
> the data.
>
> Piotrek
>
> On 17 Mar 2020, at 10:35, Mikael Gordani <mi.gordani@gmail.com> wrote:
>
> Hi Piotr!
>
> Continuing with my scenario: since both of the queries will share the same
> sink, I've realized that some issues will appear when I switch queries,
> especially with regard to stateful operators, e.g. aggregation.
>
> Let me provide an example:
> So, let's say that both of the queries ingest a sequence of integers and
> compute the average of these integers over some time.
> E.g. say that *query1* ingests the sequence *1,2,3,4,...*
> The windows for *query1* will be *[1,2,3] [2,3,4] [3,4]*.
>
> If I later "activate" *query2*, I need both of the queries to
> accept tuples for a while, in order to allow the aggregation in
> *query1* to finish before denying it input.
> But there is a possibility that *query2* might receive the tuples *3,4*,
> which will result in the windows: *[3] [3,4] [3,4]*
> Later on, the output of the respective queries will be:
> Query 1: 3, *4.5*, 3.5
> Query 2: 3, *3.5*, 3.5
>
> As one can see, the second output will be different.
> I'm thinking of using watermarks somehow to make sure that both queries
> have processed the same amount of data before writing to the sink, but I'm a
> bit unsure about how to do it.
> Do you have any suggestions or thoughts?
> Cheers,
>
> On Mon, 16 Mar 2020 at 08:43, Piotr Nowojski <piotr@ververica.com> wrote:
>
>> Hi,
>>
>> Let us know if something doesn’t work :)
>>
>> Piotrek
>>
>> On 16 Mar 2020, at 08:42, Mikael Gordani <mi.gordani@gmail.com> wrote:
>>
>> Hi,
>> I'll try it out =)
>>
>> Cheers!
>>
>> On Mon, 16 Mar 2020 at 08:32, Piotr Nowojski <piotr@ververica.com> wrote:
>>
>>> Hi,
>>>
>>> In that case you could try to implement your `FilterFunction` as a
>>> two-input operator with a broadcast control input that sets the
>>> `global_var`. The broadcast control input can originate from some source
>>> or from some operator.
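
A dependency-free sketch of that idea follows. It is not Flink code: `GatedFilter`, `processControl` and `GatedFilterDemo` are hypothetical names used only for illustration. In Flink itself, the data stream would typically be connected to a broadcast stream and the two methods would correspond roughly to `processElement` and `processBroadcastElement` of a `BroadcastProcessFunction`. The point is that each operator instance keeps its own copy of the flag and updates it only from the control input, instead of reading a JVM-global variable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a two-input operator: one data input, one broadcast control input. */
class GatedFilter<T> {
    private final boolean passWhenFlagIs; // flag value for which this filter lets records through
    private final Consumer<T> downstream; // the "rest of the query"
    private boolean flag = false;         // local copy of `global_var`, initially false

    GatedFilter(boolean passWhenFlagIs, Consumer<T> downstream) {
        this.passWhenFlagIs = passWhenFlagIs;
        this.downstream = downstream;
    }

    /** Control input: every parallel instance receives the same broadcast update. */
    void processControl(boolean newFlag) {
        flag = newFlag;
    }

    /** Data input: forward the record only while this filter is enabled. */
    void processElement(T record) {
        if (flag == passWhenFlagIs) {
            downstream.accept(record);
        }
    }
}

class GatedFilterDemo {
    public static void main(String[] args) {
        List<Integer> query1 = new ArrayList<>();
        List<Integer> query2 = new ArrayList<>();
        GatedFilter<Integer> filter1 = new GatedFilter<>(false, query1::add);
        GatedFilter<Integer> filter2 = new GatedFilter<>(true, query2::add);

        filter1.processElement(1);     // flag is false: only query1 receives the record
        filter2.processElement(1);
        filter1.processControl(true);  // broadcast control message reaches both filters
        filter2.processControl(true);
        filter1.processElement(2);     // flag is true: only query2 receives the record
        filter2.processElement(2);

        System.out.println("query1 received " + query1 + ", query2 received " + query2);
    }
}
```

In a real job the flag would probably also need to live in Flink's broadcast state so that it survives failures and restarts.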
>>>
>>> Piotrek
>>>
>>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gordani@gmail.com> wrote:
>>>
>>> Hi Piotr!
>>> Thanks for your response, I'll try to explain what I'm trying to achieve
>>> in more detail:
>>>
>>> Essentially, if I have two queries that have the same operators and
>>> run in the same task, I want to figure out some way of controlling
>>> the ingestion from *a source* to the respective queries in such a way
>>> that only one of the queries receives data, based on a condition.
>>> For more context, the second query (query2) is equipped with
>>> instrumented operators, which are standard operators extended with some
>>> extra functionality; in my case, they enrich the tuples with metadata.
>>>
>>> Source --> *Filter1* ---> rest of query1
>>>    |
>>>    v
>>>    *Filter2* ---> rest of query2
>>>
>>> By using *filters* prior to the queries, records are allowed to pass
>>> depending on a condition, let's say a global boolean variable (which
>>> is initially set to false).
>>> If it's set to *false*, Filter1 will accept every record and Filter2 will
>>> disregard every record.
>>> If it's set to *true*, Filter2 will accept every record and Filter1 will
>>> disregard every record.
>>>
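>>> *So the filter operators look something like this:*
>>>
>>> import org.apache.flink.api.common.functions.FilterFunction;
>>>
>>> // Shared flag; only visible to code running in the same JVM.
>>> static boolean global_var = false;
>>>
>>> // Lets records through to query1 while the flag is false.
>>> private static class Filter1 implements FilterFunction<Tuple> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return !global_var;
>>>     }
>>> }
>>>
>>> // Lets records through to query2 once the flag is true.
>>> private static class Filter2 implements FilterFunction<Tuple> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return global_var;
>>>     }
>>> }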
>>>
>>>
>>> Then later on, in the respective queries, there is some processing
>>> logic that changes the value of the global variable, thus enabling and
>>> disabling the flow of data from the source to the respective queries.
>>> The problem is that this global variable is problematic in
>>> distributed deployments, and I'm having a hard time figuring out how
>>> to solve that.
>>> Is it a bit clearer? =)
>>>
>>>
>>>
>>
>> --
>> Best regards,
>> Mikael Gordani
>>
>>
>>
>
> --
> Best regards,
> Mikael Gordani
>
>
>
>

-- 
Best regards,
Mikael Gordani
