flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Failing to recover once checkpoint fails
Date Tue, 23 Jan 2018 12:25:40 GMT
Hi Vishal,

I think you might be right. We fixed the problem that checkpoints were dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we still have the problem that if the DFS is not up at all then it will look as if the job is starting from scratch. The alternative, however, is failing the job, in which case you will also never be able to restore from a checkpoint. What do you think?

Best,
Aljoscha

> On 23. Jan 2018, at 10:15, Fabian Hueske <fhueske@gmail.com> wrote:
> 
> Sorry for the late reply.
> 
> I created FLINK-8487 [1] to track this problem.
> 
> @Vishal, can you have a look and check if I forgot some details? I logged the issue for Flink 1.3.2, is that correct?
> Please add more information if you think it is relevant.
> 
> Thanks,
> Fabian
> 
> [1] https://issues.apache.org/jira/browse/FLINK-8487
> 
> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
> Or this one 
> 
> https://issues.apache.org/jira/browse/FLINK-4815
> 
> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
> ping.
> 
>     This happened again in production, and it seems reasonable to abort when a checkpoint is not found rather than behave as if it is a brand new pipeline.
> 
> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
> Folks, sorry for being late on this. Can somebody with knowledge of this code base create a JIRA issue for the above? We have seen this more than once in production.
> 
> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi Vishal,
> 
> Some relevant Jira issues for you are:
> 
>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
> 
> Best,
> Aljoscha
> 
> 
>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhueske@gmail.com> wrote:
>> 
>> Hi Vishal,
>> 
>> it would be great if you could create a JIRA ticket with Blocker priority.
>> Please add all relevant information of your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.
>> 
>> Thanks for looking into this!
>> 
>> Best regards,
>> Fabian
>> 
>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>> 
>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
>> Thank you for confirming. 
>> 
>> I think this is a critical bug. In essence, any checkpoint store (HDFS/S3/file) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming that failed checkpoints kill the job, because essentially it means that if the remote store is unavailable during a checkpoint then you have lost state (unless, of course, you have a retry count of none or an unbounded retry delay, a delay you hope the store revives within). Remember, the first retry failure will cause new state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like an "abort_retry_on_chkretrevalfailure".
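For reference, the retry count and delay mentioned above are governed by Flink's restart strategy, configurable for example in flink-conf.yaml (the keys are from the Flink documentation; the values here are only illustrative):

```yaml
# Illustrative only: retry up to 10 times, waiting 30 s between attempts,
# in the hope that the checkpoint store (e.g. HDFS) comes back in time.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30 s
```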
>> 
>> 
>> In our case it is very important that we do not undercount a window, one reason we use Flink and its awesome failure guarantees, as various alarms sound (we do anomaly detection on the time series).
>> 
>> Please create a JIRA ticket for us to follow, or we could do it.
>> 
>> 
>> PS: Not aborting on checkpoint failures, up to a configurable limit, is very important too.
>> 
>> 
>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> Hi Vishal,
>> 
>> I think you're right! And thanks for looking into this so deeply. 
>> 
>> With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint it might have been possible to restore it at a later point, right?
>> 
>> Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour but there are plans to change this.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
>>> 
>>> I think this is the offending piece. There is a catch-all for Exception, which IMHO should distinguish a recoverable exception from an unrecoverable one.
>>> 
>>> 
>>> try {
>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>     if (completedCheckpoint != null) {
>>>         completedCheckpoints.add(completedCheckpoint);
>>>     }
>>> } catch (Exception e) {
>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>         "checkpoint store.", e);
>>> 
>>>     // remove the checkpoint with broken state handle
>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>> }
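A minimal, Flink-free sketch of the distinction being suggested (a hypothetical helper, not the actual fix): treat I/O failures anywhere in the cause chain as a sign the store is unreachable, keeping the handle and failing recovery, and only treat other errors as a genuinely broken handle.

```java
import java.io.IOException;

// Hypothetical sketch: classify the exception thrown while retrieving a
// completed checkpoint. An I/O failure anywhere in the cause chain suggests
// the DFS is down (recoverable: keep the handle), while anything else
// suggests the state handle itself is corrupt (safe to remove).
public class CheckpointErrorClassifier {

    static boolean isRecoverable(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof IOException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isRecoverable(new IOException("NameNode in safe mode")));              // true
        System.out.println(isRecoverable(new RuntimeException(new IOException("conn refused")))); // true
        System.out.println(isRecoverable(new IllegalStateException("corrupt handle")));           // false
    }
}
```

With such a check, the catch block above could rethrow recoverable failures instead of removing the state handle.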
>>> 
>>> 
>>> 
>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
>>> So this is the issue; tell us if it is wrong. ZK had some state (backed by HDFS) that referred to a checkpoint (the same exact checkpoint that was last successful before the NN screwed us). When the JM tried to recreate the state and, because the NN was down, failed to retrieve the CHK handle from HDFS, it conveniently (and I think very wrongly) removed the CHK from being considered and cleaned the pointer (though the cleanup failed, as the NN was down, which is obvious from the dangling file in recovery). The metadata itself was on HDFS, and a failure in retrieving it should have been a stop-everything exception, not an attempt at magic that ends up starting from a blank state.
>>> 
>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>>> 
>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
>>> Also note that the ZooKeeper recovery (sadly on the same HDFS cluster) showed the same behavior. It had the pointers to the checkpoint (I think that is what it does: keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.
>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>> 
>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>> 
>>> This is getting a little interesting. What say you :)
>>> 
>>> 
>>> 
>>> 
>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
>>> Another thing I noted was this thing
>>> 
>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>> 
>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>> 
>>> 
>>> 
>>> Generally, what Flink does IMHO is replace the checkpoint directory with a new one. I see it happening now: every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory and hence the chk-44286 directory. This was (I think) the last checkpoint successfully created before the NN had issues, but, unlike usual, it did not delete this chk-44286. It looks as if it started with a blank slate. Does this strike a chord?
>>> 
>>> 
>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
>>> Hello Fabian,
>>>                       First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With Kafka as a source it negates issues GDF has (GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time etc.).
>>> 
>>>                    Coming back to this issue. We have that same Kafka topic feeding a streaming Druid datasource and we do not see any issue there, so data loss at the source, Kafka, is not applicable. I am totally certain that the "retention" time was not an issue. It is 4 days of retention and we fixed this issue within 30 minutes. We could replay Kafka with a new consumer group.id and that worked fine.
>>> 
>>> 
>>> Note these properties and see if they strike a chord.
>>> 
>>> * The setCommitOffsetsOnCheckpoints(boolean) setting for Kafka consumers defaults to true. I bring this up to see whether Flink will under any circumstance drive consumption from the Kafka-committed offset rather than the one in the checkpoint.
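On this point: when a job is restored from a checkpoint, the offsets stored in the checkpointed state take precedence; the offsets committed back to Kafka only matter on a fresh start without restored state. A hypothetical, Flink-free sketch of that precedence rule (illustrative, not Flink's actual code):

```java
// Illustrative precedence rule for the starting offset of a partition.
// Not Flink's actual code: the real consumer restores offsets from its
// checkpointed state and only falls back to the committed group offsets
// when there is no restored state at all.
public class OffsetPrecedence {

    static long startingOffset(Long checkpointedOffset, Long committedOffset, long earliest) {
        if (checkpointedOffset != null) {
            return checkpointedOffset; // restored checkpoint state always wins
        }
        if (committedOffset != null) {
            return committedOffset;    // fresh start: fall back to group offsets
        }
        return earliest;               // nothing known: start from the earliest offset
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(42L, 100L, 0L));  // 42: checkpoint wins
        System.out.println(startingOffset(null, 100L, 0L)); // 100: committed offset
        System.out.println(startingOffset(null, null, 0L)); // 0: earliest
    }
}
```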
>>> 
>>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>> 
>>> * We have a good sense of SP (savepoint) and CP (checkpoint) and certainly understand that they actually are not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.
>>> 
>>> * Other HDFS-related properties:
>>>  
>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>> 
>>> 
>>> Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the Flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
>>> 
>>> 
>>> 
>>> 
>>> Thank you for your patience and regards,
>>> 
>>> Vishal
>>> 
>>> 
>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>> Hi Vishal,
>>> 
>>> window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
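The partial aggregates a reduce-based window operator keeps can be pictured as one running value per (key, window) pair, and it is exactly this map that a checkpoint must persist. A toy, Flink-free sketch (illustrative only, counting events per key and tumbling window):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of reduce-based window state: one partial aggregate per
// (key, window-start) pair, updated incrementally as events arrive.
public class WindowStateSketch {

    // (key, window start) -> running count (the ReduceFunction, here a count)
    final Map<String, Long> partials = new HashMap<>();

    void onEvent(String key, long eventTimeMillis, long windowSizeMillis) {
        long windowStart = eventTimeMillis - (eventTimeMillis % windowSizeMillis);
        partials.merge(key + "@" + windowStart, 1L, Long::sum);
    }

    public static void main(String[] args) {
        WindowStateSketch op = new WindowStateSketch();
        long fifteenMinutes = 15 * 60 * 1000L;
        op.onEvent("userA", 1_000L, fifteenMinutes);
        op.onEvent("userA", 2_000L, fifteenMinutes);
        op.onEvent("userB", 3_000L, fifteenMinutes);
        System.out.println(op.partials.get("userA@0")); // 2
        System.out.println(op.partials.get("userB@0")); // 1
    }
}
```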
>>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually successfully (i.e., without an error) recovered from the last checkpoint, all its state should have been restored. Since the last checkpoint was before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further, because a checkpoint recovery must not result in state loss.
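The retention condition above is simple arithmetic: the restored offsets are only replayable if the recovery gap (the time between the last checkpoint and the fix) is shorter than the topic's retention. A toy check with illustrative values:

```java
import java.time.Duration;

// Toy illustration of the retention condition: data behind a restored
// offset is only replayable if Kafka still retains it, i.e. the recovery
// gap is shorter than the topic's retention period.
public class RetentionCheck {

    static boolean offsetsStillReplayable(Duration recoveryGap, Duration retention) {
        return recoveryGap.compareTo(retention) < 0;
    }

    public static void main(String[] args) {
        // The numbers from this thread: 4 days of retention, ~30 minutes to fix HDFS.
        System.out.println(offsetsStillReplayable(Duration.ofMinutes(30), Duration.ofDays(4))); // true
        // A two-day outage against one day of retention would lose data.
        System.out.println(offsetsStillReplayable(Duration.ofDays(2), Duration.ofDays(1)));     // false
    }
}
```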
>>> 
>>> Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and usually discarded once the job is terminated, while savepoints are retained.
>>> 
>>> Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is.
>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>> 
>>> Best, Fabian
>>> 
>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
>>> To add to it, my pipeline is a simple 
>>> 
>>> keyBy(0)
>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>> 
>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
>>> Hello folks,
>>> 
>>> As far as I know, a checkpoint failure should be ignored and retried with potentially larger state. I had this situation:
>>> 
>>> * HDFS went into safe mode because of NameNode issues
>>> * this exception was thrown:
>>> 
>>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>     ..................
>>>     ..................
>>> 
>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>> 
>>> * The pipeline came back after a few restarts and checkpoint failures, once the HDFS issues were resolved.
>>> 
>>> I would not have worried about the restart, but it was evident that I lost my operator state. Either my Kafka consumer kept advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that had the partial aggregates was lost. I have a 15 minute window of counts on a keyed operator.
>>> 
>>> I am using RocksDB and of course have checkpointing turned on.
>>> 
>>> The questions thus are:
>>> 
>>> * Should a pipeline be restarted if a checkpoint fails?
>>> * Why, on restart, was the operator state not recreated?
>>> * Does the nature of the exception thrown have anything to do with this, given that suspend and resume from a savepoint work as expected?
>>> * And though I am pretty sure, are operators like the window operator stateful by default, and thus if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), is the state managed by Flink?
>>> 
>>> Thanks.
>>> 