flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hequn Cheng <chenghe...@gmail.com>
Subject Re: Restore state from save point with add new flink sql
Date Tue, 26 Jun 2018 12:35:16 GMT
Hi

I'm not sure about the answer. I have a feeling that if we only add new
code below the old code(i.e., append new code after old code), the uid will
not be changed.

On Tue, Jun 26, 2018 at 3:06 PM, Till Rohrmann <trohrmann@apache.org> wrote:

> I think so. Maybe Fabian or Timo can correct me if I'm wrong here.
>
> On Mon, Jun 25, 2018 at 9:17 AM James (Jian Wu) [FDS Data Platform] <
> james.wu@coupang.com> wrote:
>
>> Hi Till:
>>
>>
>>
>> Thanks for your answer, so if I just add new sql and not modified old sql
>> then use `/`--allowNonRestoredState option to restart job can resume old
>> sql state from savepoints?
>>
>>
>>
>> Regards
>>
>>
>>
>> James
>>
>>
>>
>> *From: *Till Rohrmann <trohrmann@apache.org>
>> *Date: *Friday, June 15, 2018 at 8:13 PM
>> *To: *"James (Jian Wu) [FDS Data Platform]" <james.wu@coupang.com>
>> *Cc: *user <user@flink.apache.org>, Fabian Hueske <fhueske@apache.org>,
>> Timo Walther <twalthr@apache.org>
>> *Subject: *Re: Restore state from save point with add new flink sql
>>
>>
>>
>> Hi James,
>>
>>
>>
>> as long as you do not change anything for `sql1`, it should work to
>> recover from a savepoint if you pass the `-n`/`--allowNonRestoredState`
>> option to the CLI when resuming your program from the savepoint. The reason
>> is that an operators generated uid depends on the operator and on its
>> inputs.
>>
>>
>>
>> I've also pulled in Fabian and Timo who will be able to tell you a little
>> bit more about the job modification support for stream SQL.
>>
>>
>>
>> Cheers,
>> Till
>>
>>
>>
>> On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform] <
>> james.wu@coupang.com> wrote:
>>
>> *Hi:*
>>
>>
>>
>> *   My application use flink sql, I want to add new sql to the
>> application, *
>>
>>
>>
>> *For example first version is*
>>
>>
>>
>> DataStream<AggregatedOrderItems> paymentCompleteStream = *getKafkaStream*(env,
>> bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
>>         .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap()).
>> assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
>>         .returns(TypeInformation.*of*(AggregatedOrderItems.class));
>>
>> tableEnv.registerDataStream("AggregatedOrderItems",
>> paymentCompleteStream, *concatFieldsName*(AggregatedOrderItems.class,
>> true, "eventTs"));
>>
>> tableEnv.registerFunction("group_concat", new GroupConcatFunction());
>>
>> Table resultTable = tableEnv.sqlQuery(*sql1*);
>> tableEnv.toAppendStream(resultTable, Row.class, qConfig)
>>         .flatMap(new E5FlatmapFunction(resultSampleRate)).
>> setParallelism(30)
>>         .filter(new FilterFunction<DetectionResult>() {
>>             @Override
>>             public boolean filter(DetectionResult value) throws Exception
>> {
>>                return (value.getViolationCount() >= 5);
>>             }
>>         }).addSink(new DetectionResultMySqlSink());
>>
>>
>>
>> *Then second version, I add new sql*
>>
>>
>>
>> Table resultTable2 = tableEnv.sqlQuery(*sql2*);
>> tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
>>         .flatMap(new A2FlatmapFunction(resultSampleRate)).
>> setParallelism(30)
>>         .filter(new FilterFunction<DetectionResult>() {
>>             @Override
>>             public boolean filter(DetectionResult value) throws Exception
>> {
>>                 return (value.getViolationCount() >= 5);
>>             }
>>         }).addSink(new DetectionResultMySqlSink());
>>
>>
>>
>> *After restart job with savepoints, whether the original flink sql can be
>> restore success? Whether the flink will assign a new UID to original sql
>> operator? (I will not change the original sql)*
>>
>>
>>
>> *Regards*
>>
>>
>>
>> *James*
>>
>>
>>
>>

Mime
View raw message