From: Till Rohrmann <trohrmann@apache.org>
Date: Tue, 26 Jun 2018 09:06:32 +0200
Subject: Re: Restore state from save point with add new flink sql
To: james.wu@coupang.com
Cc: user <user@flink.apache.org>, Fabian Hueske <fhueske@apache.org>, Timo Walther <twalthr@apache.org>
I think so. Maybe Fabian or Timo can correct me if I'm wrong here.

On Mon, Jun 25, 2018 at 9:17 AM James (Jian Wu) [FDS Data Platform] <james.wu@coupang.com> wrote:

> Hi Till,
>
> Thanks for your answer. So if I only add the new SQL and do not modify the old SQL, then restarting the job with the `-n`/`--allowNonRestoredState` option can resume the old SQL state from the savepoint?
>
> Regards,
>
> James
>
> From: Till Rohrmann <trohrmann@apache.org>
> Date: Friday, June 15, 2018 at 8:13 PM
> To: "James (Jian Wu) [FDS Data Platform]" <james.wu@coupang.com>
> Cc: user <user@flink.apache.org>, Fabian Hueske <fhueske@apache.org>, Timo Walther <twalthr@apache.org>
> Subject: Re: Restore state from save point with add new flink sql
>
> Hi James,
>
> as long as you do not change anything for `sql1`, it should work to recover from a savepoint if you pass the `-n`/`--allowNonRestoredState` option to the CLI when resuming your program from the savepoint. The reason is that an operator's generated uid depends on the operator and on its inputs.
>
> I've also pulled in Fabian and Timo, who will be able to tell you a little bit more about the job modification support for stream SQL.
>
> Cheers,
> Till
>
> On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform] <james.wu@coupang.com> wrote:
>
> Hi,
>
> My application uses Flink SQL, and I want to add a new SQL query to the application.
>
> For example, the first version is:
>
> DataStream<AggregatedOrderItems> paymentCompleteStream = getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
>         .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap())
>         .assignTimestampsAndWatermarks(wmAssigner2)
>         .setParallelism(30)
>         .returns(TypeInformation.of(AggregatedOrderItems.class));
>
> tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream,
>         concatFieldsName(AggregatedOrderItems.class, true, "eventTs"));
> tableEnv.registerFunction("group_concat", new GroupConcatFunction());
>
> Table resultTable = tableEnv.sqlQuery(sql1);
> tableEnv.toAppendStream(resultTable, Row.class, qConfig)
>         .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
>         .filter(new FilterFunction<DetectionResult>() {
>             @Override
>             public boolean filter(DetectionResult value) throws Exception {
>                 return (value.getViolationCount() >= 5);
>             }
>         }).addSink(new DetectionResultMySqlSink());
>
> Then in the second version I add a new SQL query:
>
> Table resultTable2 = tableEnv.sqlQuery(sql2);
> tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
>         .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30)
>         .filter(new FilterFunction<DetectionResult>() {
>             @Override
>             public boolean filter(DetectionResult value) throws Exception {
>                 return (value.getViolationCount() >= 5);
>             }
>         }).addSink(new DetectionResultMySqlSink());
>
> After restarting the job from a savepoint, can the original Flink SQL be restored successfully? Will Flink assign a new UID to the original SQL operators? (I will not change the original SQL.)
>
> Regards,
>
> James
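For reference, the savepoint-and-resume cycle discussed above can be sketched with the Flink CLI. This is a minimal sketch, not taken from the thread: the job id, savepoint directory, and jar name are placeholders you would replace with your own values.

```shell
# Trigger a savepoint for the running job (job id and target directory
# are placeholders).
flink savepoint <jobId> hdfs:///flink/savepoints

# Resume the modified job from that savepoint. The -n /
# --allowNonRestoredState flag tells Flink to skip savepoint state that
# no longer maps to any operator in the new job graph, while state for
# unchanged operators (e.g. the sql1 pipeline) is restored as usual.
flink run -s hdfs:///flink/savepoints/savepoint-xxxx -n my-flink-sql-job.jar
```

Without `-n`, the resume fails if any savepoint state cannot be matched to an operator, which is why the flag matters when the job graph has changed.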