flink-user mailing list archives

From "wanglei2@geekplus.com.cn" <wangl...@geekplus.com.cn>
Subject Re: Re: Unable to restore state value after job failed using RocksDBStateBackend
Date Tue, 25 Jun 2019 13:43:04 GMT

I start and cancel it directly in my IntelliJ IDEA development environment.

First I click the run button, then the red stop button, and then the run button again.

Let me read up on savepoints.

Thanks,
Lei Wang




wanglei2@geekplus.com.cn
 
From: Stephan Ewen
Date: 2019-06-25 20:36
To: user
Subject: Re: Unable to restore state value after job failed using RocksDBStateBackend
If you manually cancel and restart the job, state is only carried forward if you use a savepoint.
Can you check if that is what you are doing?
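
To illustrate the point above, here is a minimal plain-Java sketch. It does not use Flink. ToyJob, startFresh, startFrom and snapshotTo are hypothetical names made up for this illustration. A fresh start begins with empty state, and state only comes back if it was written out before stopping and explicitly loaded on the next start.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

public class SavepointSketch {

    /** Toy stand-in for a job's keyed state; this is NOT a Flink class. */
    static class ToyJob {
        final Map<String, Long> state = new HashMap<>();

        /** Fresh start: state is empty, like clicking "run" again in the IDE. */
        static ToyJob startFresh() {
            return new ToyJob();
        }

        /** Start from a snapshot file, analogous to resuming from a savepoint. */
        static ToyJob startFrom(Path snapshot) throws IOException {
            ToyJob job = new ToyJob();
            for (String line : Files.readAllLines(snapshot, StandardCharsets.UTF_8)) {
                String[] kv = line.split("=", 2);
                job.state.put(kv[0], Long.parseLong(kv[1]));
            }
            return job;
        }

        /** Write the state out before stopping, analogous to taking a savepoint. */
        void snapshotTo(Path snapshot) throws IOException {
            StringBuilder sb = new StringBuilder();
            state.forEach((k, v) -> sb.append(k).append('=').append(v).append('\n'));
            Files.write(snapshot, sb.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        Path snapshot = Files.createTempFile("toy-savepoint", ".txt");

        ToyJob first = ToyJob.startFresh();
        first.state.put("key-a", 34L);
        first.snapshotTo(snapshot);

        // Plain restart: nothing is carried over.
        System.out.println(ToyJob.startFresh().state.get("key-a"));
        // Restart from the snapshot: the value is available again.
        System.out.println(ToyJob.startFrom(snapshot).state.get("key-a"));

        Files.delete(snapshot);
    }
}
```

In Flink the corresponding steps are to take a savepoint before cancelling and to pass its path when resubmitting the job (for example with flink run -s <savepointPath>).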

On Tue, Jun 25, 2019 at 2:21 PM Simon Su <barley_ss@163.com> wrote:

Hi wanglei

Can you post how you restart the job?

Thanks,
Simon
On 06/25/2019 20:11, wanglei2@geekplus.com.cn <wanglei2@geekplus.com.cn> wrote:
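import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    // Assumed SLF4J logger; the original snippet did not show how `log` was declared.
    private static final Logger log = LoggerFactory.getLogger(StateProcessTest.class);

    // Per-key state handle, obtained in open().
    private transient ValueState<Tuple2<Long, Long>> state;

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
        Tuple2<Long, Long> stateValue = state.value();

        // value() returns null when no state exists yet for the current key.
        if (stateValue == null) {
            log.info("##########  initialize");
            stateValue = Tuple2.of(34L, 56L);
        }
        state.update(stateValue);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                "avg", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}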



Every time I restart the job, the stateValue is still null.




wanglei2@geekplus.com.cn
 