From: Mikael Gordani
Date: Tue, 17 Mar 2020 16:14:07 +0100
Subject: Re: Communication between two queries
To: Piotr Nowojski
Cc: user@flink.apache.org
In-Reply-To: <5DC81624-6D80-4D15-A98D-42C27FC0B363@ververica.com>

No worries and great idea!
I will play around with it and see what I manage to do.
Cheers!

On Tue, 17 Mar 2020 at 15:59, Piotr Nowojski <piotr@ververica.com> wrote:
Oops, sorry, there was a misleading typo/autocorrection in my previous e-mail. The second sentence should have been:

> First of all you would have to use event time semantics for consistent results

Piotrek
On 17 Mar 2020, at 14:43, Piotr Nowojski <piotr@ververica.com> wrote:

Hi,

Yes, you are looking in the right direction with the watermarks.

First of all you would have to use event time semantics for constant results. With processing time everything would be simpler, but it would be more difficult to reason about the results (your choice). Secondly, you would have to hook up the logic of enabling query1/query2 to the event time/watermarks. Thirdly, you need to somehow sync the input switching with the window boundaries. On top of that, watermarks express a lower bound on the event time that you can expect. However, in order to guarantee consistency of the windows, you would like to control the upper bound. For example:

1. If you want to enable Query2, you would need to check what’s the largest/latest event time that was processed by the input splitter; let’s say that’s TS1.
2. That means records with event time < TS1 have already been processed by Query1, starting some windows.
3. The earliest point for which you could enable Query2 is thus TS1 + 1.
4. You would have to adjust the Query2 start time to the start of the next time window; let’s say that would be TS2 = TS1 + 1 + start of next window (that is, TS1 + 1 moved forward to the next window boundary).
5. The input splitter now must keep sending records with event time < TS2 to Query1, but should already redirect records with event time >= TS2 to Query2.
6. Once the watermark for the input splitter advances past TS2, that’s when it can finally stop sending records to Query1, and the query1 logic could be considered “completed”.

So Query1 would be responsible for all of the data before TS2, and Query2 for the data from TS2 onwards.
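
The six steps above can be sketched in plain Java, without any Flink API, to make the routing rule concrete. This is only a sketch under assumptions: `SwitchingSplitter`, `requestSwitch`, `route` and `onWatermark` are hypothetical names, windows are assumed to be tumbling and aligned to multiples of `windowSize`, and timestamps are plain `long` values.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical input splitter that hands over from Query1 to Query2 at a window boundary. */
class SwitchingSplitter {
    private final long windowSize;                    // assumed tumbling window size
    private long maxSeenTimestamp = Long.MIN_VALUE;   // TS1: latest event time routed so far
    private long switchTimestamp = Long.MAX_VALUE;    // TS2: first event time owned by Query2
    private boolean query1Completed = false;

    final List<Long> toQuery1 = new ArrayList<>();    // stand-in for the Query1 input
    final List<Long> toQuery2 = new ArrayList<>();    // stand-in for the Query2 input

    SwitchingSplitter(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Steps 1-4: take TS1, then move TS1 + 1 forward to the next window start to get TS2. */
    long requestSwitch() {
        long earliest = maxSeenTimestamp + 1;         // TS1 + 1
        switchTimestamp = Math.floorDiv(earliest + windowSize - 1, windowSize) * windowSize;
        return switchTimestamp;
    }

    /** Step 5: records before TS2 still go to Query1, records at or after TS2 go to Query2. */
    void route(long eventTime) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTime);
        if (eventTime < switchTimestamp) {
            toQuery1.add(eventTime);
        } else {
            toQuery2.add(eventTime);
        }
    }

    /** Step 6: once the watermark has reached TS2, Query1 is considered completed. */
    void onWatermark(long watermark) {
        if (watermark >= switchTimestamp) {
            query1Completed = true;
        }
    }

    boolean isQuery1Completed() {
        return query1Completed;
    }
}
```

With a window size of 10 and records 3 and 17 already routed, `requestSwitch()` yields TS2 = 20; a later record with event time 19 still goes to Query1, while 20 and 25 go to Query2.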

Alternatively, your input splitter could also buffer some records, so that you could enable Query2 faster, by re-sending the buffered records. But in that case, both Query1 and Query2 would be responsible for some portion of the data.
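
One possible reading of the buffering alternative, again as a plain-Java sketch rather than Flink code: the splitter remembers the records of the currently open window and replays them to Query2 when it is enabled. `BufferingSplitter` and its methods are hypothetical names, and the sketch assumes tumbling windows aligned to multiples of `windowSize` and records arriving roughly in event-time order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical splitter that buffers the records of the currently open window. */
class BufferingSplitter {
    private final long windowSize;                        // assumed tumbling window size
    private final Deque<Long> buffer = new ArrayDeque<>();
    private long bufferedWindowStart = Long.MIN_VALUE;    // start of the window held in the buffer
    private boolean query2Enabled = false;

    final List<Long> toQuery1 = new ArrayList<>();        // stand-in for the Query1 input
    final List<Long> toQuery2 = new ArrayList<>();        // stand-in for the Query2 input

    BufferingSplitter(long windowSize) {
        this.windowSize = windowSize;
    }

    void route(long eventTime) {
        if (query2Enabled) {
            toQuery2.add(eventTime);
            return;
        }
        long windowStart = Math.floorDiv(eventTime, windowSize) * windowSize;
        if (windowStart > bufferedWindowStart) {
            // A newer window has opened: the older buffered records are no longer needed.
            buffer.clear();
            bufferedWindowStart = windowStart;
        }
        if (windowStart == bufferedWindowStart) {
            buffer.add(eventTime);
        }
        toQuery1.add(eventTime);
    }

    /** Enables Query2 immediately and re-sends the buffered records of the open window to it. */
    void enableQuery2() {
        query2Enabled = true;
        toQuery2.addAll(buffer);
        buffer.clear();
    }
}
```

The replayed records have been seen by both queries, which is the overlap mentioned above; in this sketch the partial Query1 result for the open window would have to be discarded downstream.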

Piotrek

On 17 Mar 2020, at 10:35, Mikael Gordani <mi.gordani@gmail.com> wrote:
Hi Piotr!

Continuing with my scenario: since both of the queries will share the same sink, I've realized that some issues will appear when I switch queries, especially with regard to stateful operators, e.g. aggregation.

Let me provide an example:
So, let's say that both of the queries ingest a sequence of integers and compute the average of these integers over some time.
E.g. say that query1 ingests the sequence 1,2,3,4,...
The windows for query1 will be [1,2,3] [2,3,4] [3,4].

If I later "activate" query2, I need to have both of the queries accepting tuples for a while, in order to allow the aggregation in query1 to finish before denying it input.
But there is a possibility that query2 might receive only the tuples 3,4, which will result in the windows [3] [3,4] [3,4].
Later on, the output of the respective queries will be:
Query1: 2, 3, 3.5
Query2: 3, 3.5, 3.5

As one can see, the outputs differ for the first two windows.
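
To make the mismatch easy to check, here is a small plain-Java sketch that computes the mean of each window exactly as the windows are listed in the example. `WindowAverages` and `averages` are hypothetical names used only for illustration; nothing here depends on Flink.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class WindowAverages {
    /** Returns the arithmetic mean of every window, in order. */
    static List<Double> averages(List<List<Integer>> windows) {
        return windows.stream()
                .map(w -> w.stream().mapToInt(Integer::intValue).average().orElse(Double.NaN))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Windows as seen by query1, which received the whole sequence 1,2,3,4.
        List<List<Integer>> query1 = Arrays.asList(
                Arrays.asList(1, 2, 3), Arrays.asList(2, 3, 4), Arrays.asList(3, 4));
        // Windows as seen by query2, which only received 3 and 4.
        List<List<Integer>> query2 = Arrays.asList(
                Arrays.asList(3), Arrays.asList(3, 4), Arrays.asList(3, 4));

        System.out.println("Query1: " + averages(query1)); // prints "Query1: [2.0, 3.0, 3.5]"
        System.out.println("Query2: " + averages(query2)); // prints "Query2: [3.0, 3.5, 3.5]"
    }
}
```

Only the last window, which both queries saw in full, produces the same value.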
I'm thinking of using watermarks somehow to make sure that both queries have processed the same amount of data before writing to the sink, but I'm a bit unsure how to do it.
Do you have any suggestions or thoughts?
Cheers,

On Mon, 16 Mar 2020 at 08:43, Piotr Nowojski <piotr@ververica.com> wrote:
Hi,

Let us know if something doesn’t work :)

Piotrek

On 16 Mar 2020, at 08:42, Mikael Gordani <mi.gordani@gmail.com> wrote:

Hi,
I'll try it out =)

Cheers!

On Mon, 16 Mar 2020 at 08:32, Piotr Nowojski <piotr@ververica.com> wrote:
Hi,

In that case you could try to implement your `FilterFunction` as a two-input operator with a broadcast control input that would set the `global_var`. The broadcast control input can originate from some source or from some operator.
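
The idea can be illustrated with a framework-free Java sketch: every parallel filter instance keeps its own local copy of the flag, and a control record is delivered to all instances, while data records go to only one of them. `ControlledFilter` and `BroadcastDemo` are hypothetical names and this is not Flink code; in Flink's DataStream API the same shape might be built by connecting the data stream to a broadcast stream and using a `BroadcastProcessFunction`, but that mapping is an assumption left to the reader.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one parallel instance of a two-input filter operator. */
class ControlledFilter {
    private boolean flag = false;            // local copy of `global_var`, one per instance
    private final boolean passWhenFlagIs;    // false for Filter1, true for Filter2
    final List<Integer> output = new ArrayList<>();

    ControlledFilter(boolean passWhenFlagIs) {
        this.passWhenFlagIs = passWhenFlagIs;
    }

    /** Control input: every instance receives every control record. */
    void processControl(boolean newFlag) {
        flag = newFlag;
    }

    /** Data input: a record passes only if the local flag matches this filter's setting. */
    void processElement(int record) {
        if (flag == passWhenFlagIs) {
            output.add(record);
        }
    }
}

class BroadcastDemo {
    /** Delivers one control record to all instances, as a broadcast partitioning would. */
    static void broadcast(List<ControlledFilter> instances, boolean newFlag) {
        for (ControlledFilter instance : instances) {
            instance.processControl(newFlag);
        }
    }
}
```

Because the flag travels as data instead of living in a shared variable, each instance could run in a different process and still observe the same switch.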

Piotrek

On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gordani@gmail.com> wrote:

Hi Piotr!
Thanks for your response, I'll try to explain what I'm trying to achieve in more detail:

Essentially, if I have two queries that have the same operators and run in the same task, I would like to find some way of controlling the ingestion from a source to the respective queries so that only one of the queries receives data, based on a condition.
For more context, the second query (query2) is equipped with instrumented operators, which are standard operators extended with some extra functionality; in my case, they enrich the tuples with metadata.

Source --> Filter1 ---> rest of query1
   |
   v
   Filter2 ---> rest of query2

By using filters prior to the queries, records are allowed to pass depending on a condition, let's say a global boolean variable (which is initially set to false).
If it's set to false, Filter1 will accept every record and Filter2 will disregard every record.
If it's set to true, Filter2 will accept every record and Filter1 will disregard every record.
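
So the filter operators look something like this:

    // Assumes Flink's FilterFunction and Tuple types.
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.tuple.Tuple;

    // Shared flag; static so that the static nested classes can read it.
    // It is only visible inside a single JVM.
    static volatile boolean global_var = false;

    // Lets records through to query1 while the flag is false.
    private static class filter1 implements FilterFunction<Tuple> {
        @Override
        public boolean filter(Tuple t) throws Exception {
            return !global_var;
        }
    }

    // Lets records through to query2 while the flag is true.
    private static class filter2 implements FilterFunction<Tuple> {
        @Override
        public boolean filter(Tuple t) throws Exception {
            return global_var;
        }
    }
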
Then later on, in the respective queries, there is some processing logic that changes the value of the global variable, thus enabling and disabling the flow of data from the source to the respective queries.
The problem is that this global variable does not work in distributed deployments, and I'm having a hard time figuring out how to solve that.
Is it a bit clearer? =)



--

Best regards,
Mikael Gordani




