flink-user mailing list archives

From Vishal Santoshi <vishal.santo...@gmail.com>
Subject Re: Failing to recover once checkpoint fails
Date Thu, 05 Oct 2017 12:56:25 GMT
Hello Fabian,
First of all, congratulations on this fabulous framework. I have worked
with GDF, and though GDF has some natural pluses, Flink's state management
is far more advanced. With kafka as a source it negates issues GDF has
(GDF integration with pub/sub is organic, and that is to be expected, but
non-FIFO pub/sub is an issue with windows on event time, etc.).

Coming back to this issue: we have that same kafka topic feeding a
streaming druid datasource and we do not see any issue there, so data loss
at the source (kafka) is not applicable. I am totally certain that the
"retention" time was not an issue. It is 4 days of retention and we fixed
this issue within 30 minutes. We could replay kafka with a new consumer
group.id and that worked fine.

Note these properties and see if they strike a chord.

* setCommitOffsetsOnCheckpoints(boolean) for the kafka consumers is left at
its default of true. I bring this up to see whether flink will in any
circumstance drive consumption from the kafka perceived offset rather than
the one in the

* state.backend.fs.memory-threshold: 0 has not been set. The state is big
enough though, so IMHO there is no way the state is stored along with the
metadata in the JM (or ZK?). The reason I bring this up is to make sure
that when you say the size has to be less than 1024 bytes, you are talking
about the cumulative state of the pipeline.

* We have a good sense of SP (savepoint) and CP (checkpoint) and certainly
understand that they are actually not dissimilar. However, in this case
there were multiple attempts to restart the pipe before it finally

* Other hdfs related properties.

 state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>

 state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>

 recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>

Do these make sense? Is there anything else I should look at? Please also
note that this is the second time this has happened. The first time I was
on vacation and was not privy to the state of the flink pipeline, but the
net effect was similar: the counts for the first window after an internal
restart dropped.
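
To make the offset question in the first bullet concrete, here is a toy
Java model of the behaviour I would expect. It uses no Flink or Kafka
classes; OffsetRestoreSketch and startOffset are names made up purely for
illustration. The assumption is that on recovery the consumer resumes from
the offsets stored in the checkpoint, and that the offsets committed back
to kafka are only consulted when there is no restored state. Please treat
it as a sketch of my understanding, not as a description of what the
connector does in every code path.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: this is NOT Flink code and uses no Flink or Kafka classes.
final class OffsetRestoreSketch {

    // Hypothetical helper: pick the offset a partition should resume from.
    static long startOffset(Map<Integer, Long> checkpointed,
                            Map<Integer, Long> committed,
                            int partition) {
        Long fromState = checkpointed.get(partition);
        if (fromState != null) {
            // Restored checkpoint state wins over whatever kafka has recorded.
            return fromState;
        }
        // No restored state (fresh start): fall back to the committed group
        // offset, or 0 in this toy model if nothing was ever committed.
        return committed.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        Map<Integer, Long> checkpointed = new HashMap<>();
        Map<Integer, Long> committed = new HashMap<>();
        checkpointed.put(0, 120L); // offset held in the last completed checkpoint
        committed.put(0, 100L);    // offset kafka has recorded for the group

        // Recovery with state: prints 120
        System.out.println(startOffset(checkpointed, committed, 0));
        // Fresh start without state: prints 100
        System.out.println(startOffset(new HashMap<>(), committed, 0));
    }
}
```

If the real consumer ever took the second branch during an internal
restart, that would explain the dropped counts, which is why I ask.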

Thank you for your patience and regards,


On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Vishal,
> window operators are always stateful because the operator needs to
> remember previously received events (WindowFunction) or intermediate
> results (ReduceFunction).
> Given the program you described, a checkpoint should include the Kafka
> consumer offset and the state of the window operator. If the program
> eventually successfully (i.e., without an error) recovered from the last
> checkpoint, all its state should have been restored. Since the last
> checkpoint was before HDFS went into safe mode, the program would have been
> reset to that point. If the Kafka retention time is less than the time it
> took to fix HDFS you would have lost data because it would have been
> removed from Kafka. If that's not the case, we need to investigate this
> further because a checkpoint recovery must not result in state loss.
> Restoring from a savepoint is not so much different from automatic
> checkpoint recovery. Given that you have a completed savepoint, you can
> restart the job from that point. The main difference is that checkpoints
> are only used for internal recovery and usually discarded once the job is
> terminated while savepoints are retained.
> Regarding your question if a failed checkpoint should cause the job to
> fail and recover I'm not sure what the current status is.
> Stefan (in CC) should know what happens if a checkpoint fails.
> Best, Fabian
> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
>> To add to it, my pipeline is a simple
>> keyBy(0)
>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>         .reduce(new ReduceFunction(), new WindowFunction())
>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>> Hello folks,
>>> As far as I know checkpoint failure should be ignored and retried with
>>> potentially larger state. I had this situation
>>> * hdfs went into a safe mode b'coz of Name Node issues
>>> * exception was thrown
>>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>> Operation category WRITE is not supported in state standby. Visit
>>> https://s.apache.org/sbnn-error
>>>     ..................
>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>> * The pipeline came back after a few restarts and checkpoint failures,
>>> after the hdfs issues were resolved.
>>> I would not have worried about the restart, but it was evident that I
>>> lost my operator state. Either my kafka consumer kept on advancing its
>>> offset between a start and the next checkpoint failure ( a minute's
>>> worth ) or the operator that had partial aggregates was lost.
>>> I have a 15 minute window of counts on a keyed operator
>>> I am using ROCKS DB and of course have checkpointing turned on.
>>> The questions thus are
>>> * Should a pipeline be restarted if checkpoint fails ?
>>> * Why was the operator state not recreated on restart ?
>>> * Does the nature of the exception thrown have anything to do with any
>>> of this, b'coz suspend and resume from a save point work as expected ?
>>> * And though I am pretty sure, are operators like the Window operator
>>> stateful by default and thus if I have timeWindow(Time.of(window_size,
>>> TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the
>>> state is managed by flink ?
>>> Thanks.
