flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: Failing to recover once checkpoint fails
Date Thu, 05 Oct 2017 09:01:50 GMT
Hi Vishal,

window operators are always stateful because the operator needs to remember
previously received events (WindowFunction) or intermediate results
Given the program you described, a checkpoint should include the Kafka
consumer offset and the state of the window operator. If the program
eventually successfully (i.e., without an error) recovered from the last
checkpoint, all its state should have been restored. Since the last
checkpoint was before HDFS went into safe mode, the program would have been
reset to that point. If the Kafka retention time is less than the time it
took to fix HDFS you would have lost data because it would have been
removed from Kafka. If that's not the case, we need to investigate this
further because a checkpoint recovery must not result in state loss.

Restoring from a savepoint is not so much different from automatic
checkpoint recovery. Given that you have a completed savepoint, you can
restart the job from that point. The main difference is that checkpoints
are only used for internal recovery and usually discarded once the job is
terminated while savepoints are retained.

Regarding your question if a failed checkpoint should cause the job to fail
and recover I'm not sure what the current status is.
Stefan (in CC) should know what happens if a checkpoint fails.

Best, Fabian

2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santoshi@gmail.com>:

> To add to it, my pipeline is a simple
> keyBy(0)
>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>         .reduce(new ReduceFunction(), new WindowFunction())
> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santoshi@gmail.com
> > wrote:
>> Hello folks,
>> As far as I know checkpoint failure should be ignored and retried with
>> potentially larger state. I had this situation
>> * hdfs went into a safe mode b'coz of Name Node issues
>> * exception was thrown
>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>> Operation category WRITE is not supported in state standby. Visit
>> https://s.apache.org/sbnn-error
>>     ..................
>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(Had
>> oopFileSystem.java:453)
>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(S
>> afetyNetWrapperFileSystem.java:111)
>>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStream
>> Factory.createBasePath(FsCheckpointStreamFactory.java:132)
>> * The pipeline came back after a few restarts and checkpoint failures,
>> after the hdfs issues were resolved.
>> I would not have worried about the restart, but it was evident that I
>> lost my operator state. Either it was my kafka consumer that kept on
>> advancing it's offset between a start and the next checkpoint failure ( a
>> minute's worth ) or the the operator that had partial aggregates was lost.
>> I have a 15 minute window of counts on a keyed operator
>> I am using ROCKS DB and of course have checkpointing turned on.
>> The questions thus are
>> * Should a pipeline be restarted if checkpoint fails ?
>> * Why on restart did the operator state did not recreate ?
>> * Is the nature of the exception thrown have to do with any of this b'coz
>> suspend and resume from a save point work as expected ?
>> * And though I am pretty sure, are operators like the Window operator
>> stateful by drfault and thus if I have timeWindow(Time.of(window_size,
>> TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the
>> state is managed by flink ?
>> Thanks.

View raw message