From: Arvid Heise
Date: Wed, 25 Mar 2020 19:52:53 +0100
Subject: Re: Dynamic Flink SQL
To: Krzysztof Zarzycki
Cc: user, dev

I saw that requirement, but I'm not sure you really need to modify the query
at runtime.

Unless you need reprocessing for newly added rules, I'd probably just cancel
with a savepoint and restart the application with the new rules. Of course,
whether a restart is viable depends on the rules themselves and how much
state they require. That's up to a POC.

> They will consume the same high-volume source(s), therefore I want to
> optimize for that by consuming the messages in Flink only once.

That's why I proposed to run one big query instead of 500 small ones. Have a
POC where you add two of your rules manually to a Table and see what the
optimized logical plan looks like. I'd bet that the source is only tapped
once.

On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki wrote:

> Hello Arvid,
> Thanks for joining the thread!
> First, did you take into consideration that I would like to dynamically add
> queries on the same source? That means first define one query, later in the
> day add another one, then another one, and so on. A week later kill one of
> those, start yet another one, etc. There will be hundreds of these queries
> running at once, but the set of queries changes several times a day.
> They will consume the same high-volume source(s), therefore I want to
> optimize for that by consuming the messages in Flink only once.
>
> Regarding the temporary tables: AFAIK they hold only metadata (let's say the
> Kafka topic details), stored in the scope of a SQL session. Therefore
> multiple queries against such a temporary table behave the same way as
> queries against a normal table, that is, they read the data source multiple
> times.
>
> It looks like the feature I want, or could use, is described by FLIP-36 on
> Interactive Programming, more precisely caching a stream table [1].
> Still, I wouldn't like to limit the discussion to that not-yet-existing
> feature. Maybe there are other ways of achieving this dynamic querying
> capability.
>
> Kind Regards,
> Krzysztof
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable
>
>> * You want to use primarily the Table API, as that allows you to
>> programmatically introduce structural variance (changing rules).
>> * You start by registering the source as a temporary table.
>> * Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
>> * Lastly you unionAll the results.
>>
>> Then I'd run some experiments to check whether the optimizer indeed
>> figured out that it needs to read the source only once. The resulting code
>> would be minimal and easy to maintain. If the performance is not
>> satisfying, you can always make it more complicated.
>>
>> Best,
>>
>> Arvid
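A minimal sketch of the approach in the quoted suggestion above (register the
shared source once, add each rule through sqlQuery, unionAll the results). It
assumes a Flink 1.10-era Table API (package and method names differ in later
releases), a made-up Event schema and placeholder rule queries; the real job
would read from Kafka instead of fromElements:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class DynamicRulesSketch {

        // Simple POJO standing in for the real event schema (hypothetical).
        public static class Event {
            public String userId;
            public String action;
            public long ts;

            public Event() {}

            public Event(String userId, String action, long ts) {
                this.userId = userId;
                this.action = action;
                this.ts = ts;
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Stand-in for the shared high-volume source; in the real job this
            // would be a single Kafka consumer reading the topic once.
            DataStream<Event> events = env.fromElements(
                    new Event("user-1", "login", 1L),
                    new Event("user-2", "purchase", 2L));

            // Register the stream once; all rules query the same table.
            tEnv.createTemporaryView("events", events);

            // Each "rule" is just a SQL string; this list could come from
            // configuration and change between deployments.
            List<String> ruleSqls = Arrays.asList(
                    "SELECT userId, 'rule_1' AS ruleId FROM events WHERE action = 'login'",
                    "SELECT userId, 'rule_2' AS ruleId FROM events WHERE action = 'purchase'");

            // Add every rule via sqlQuery and unionAll the results into one pipeline.
            Table result = null;
            for (String sql : ruleSqls) {
                Table ruleTable = tEnv.sqlQuery(sql);
                result = (result == null) ? ruleTable : result.unionAll(ruleTable);
            }

            // Print the plan to check whether the source is read only once.
            System.out.println(tEnv.explain(result));

            tEnv.toAppendStream(result, Row.class).print();
            env.execute("dynamic-rules-sketch");
        }
    }

Inspecting the output of explain() is the quickest way to see whether the
planner taps the Kafka source once or once per rule.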
>> On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki wrote:
>>
>>> Dear Flink community!
>>>
>>> In our company we have implemented a system that realizes the dynamic
>>> business rules pattern. We spoke about it during Flink Forward 2019:
>>> https://www.youtube.com/watch?v=CyrQ5B0exqU
>>> The system is a great success and we would like to improve it. Let me
>>> shortly describe what the system does:
>>> * We have a Flink job with an engine that applies business rules to
>>> multiple data streams. These rules find patterns in the data and produce
>>> complex events for those patterns.
>>> * The engine is built on top of CoProcessFunction; the rules are
>>> pre-implemented using state and timers.
>>> * The engine accepts control messages that deliver the configuration of
>>> the rules and start rule instances. There may be many rule instances with
>>> different configurations running in parallel.
>>> * Data streams are routed to those rules, to all instances.
>>>
>>> The *advantages* of this design are:
>>> * *The performance is superb.* The key to it is that we read data from
>>> the Kafka topic once, deserialize once, shuffle it once (thankfully we
>>> have one partitioning key) and then apply over 100 rule instances that
>>> need the same data.
>>> * We are able to deploy multiple rule instances dynamically without
>>> starting/stopping the job.
>>>
>>> The performance is especially crucial: we have up to 500K events/s
>>> processed by 100 rules on fewer than 100 cores. I can't imagine having
>>> 100 Flink SQL queries, each consuming these streams from Kafka, on such a
>>> cluster.
>>>
>>> The main *pain points* of the design are:
>>> * To deploy a new kind of business rule, we need to pre-develop the rule
>>> template using our SDK. *We can't use the great Flink CEP and Flink SQL
>>> libraries*, which are getting stronger every day. Flink SQL with
>>> MATCH_RECOGNIZE would fit our cases perfectly.
>>> * The isolation of the rules is weak. Many rules run per job; if one
>>> fails, the whole job fails.
>>> * There is one set of Kafka offsets, one watermark, one checkpoint for
>>> all the rules.
>>> * We have just one distribution key, although that can be overcome.
>>>
>>> I would like to focus on solving the *first point*. We can live with the
>>> rest.
>>>
>>> *Question to the community*: Do you have ideas how to make it possible to
>>> develop such rules with Flink SQL and MATCH_RECOGNIZE?
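For illustration only (not part of the original message): a rule of the kind
the post would like to express directly in SQL might look like the following
Flink MATCH_RECOGNIZE query. The events table, its columns (userId, action,
eventTime as a time attribute) and the pattern itself are hypothetical.

    -- Hypothetical rule: a login followed by three failed payments from the
    -- same user within 10 minutes.
    SELECT *
    FROM events
        MATCH_RECOGNIZE (
            PARTITION BY userId
            ORDER BY eventTime
            MEASURES
                A.eventTime AS loginTime,
                LAST(B.eventTime) AS lastFailureTime
            ONE ROW PER MATCH
            AFTER MATCH SKIP PAST LAST ROW
            PATTERN (A B{3}) WITHIN INTERVAL '10' MINUTE
            DEFINE
                A AS A.action = 'login',
                B AS B.action = 'payment_failed'
        );

Whether hundreds of such queries can share a single scan of the Kafka topic is
exactly the open question of this thread.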
>>> My current ideas are:
>>> 1. *A possibility to dynamically modify the job topology.*
>>> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
>>> sources.
>>>
>>> 2. *A possibility to save data streams internally to Flink,
>>> predistributed.* Then Flink SQL queries should be able to read these
>>> streams.
>>>
>>> The ideal imaginary solution would be as simple to use as:
>>> CREATE TABLE my_stream(...) WITH (<kafka properties>,
>>> cached = 'true')
>>> PARTITIONED BY my_partition_key
>>>
>>> (The cached table could also be the result of a CREATE TABLE plus INSERT
>>> INTO my_stream_cached SELECT ... FROM my_stream.)
>>>
>>> Then I could run multiple parallel Flink SQL queries reading from that
>>> cached table in Flink.
>>>
>>> Technical implementation: Ideally, I imagine saving events in Flink state
>>> before they are consumed, and then implementing a Flink source that can
>>> read the Flink state of the state-filling job. It's a different job, I
>>> know! Of course it would need to run on the same Flink cluster.
>>> A lot of options are possible: building on top of Flink, modifying Flink
>>> (even keeping our own fork for the time being), or using an external
>>> component.
>>>
>>> In my opinion the keys to maximized performance are:
>>> * avoiding pulling data through the network from Kafka,
>>> * avoiding deserialization of the messages for each of the
>>> queries/processors.
>>>
>>> Comments, ideas - any feedback is welcome!
>>> Thank you!
>>> Krzysztof
>>>
>>> P.S. I'm writing to both the dev and user groups because I suspect I
>>> would need to modify Flink to achieve what I wrote above.