flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Failing to recover once checkpoint fails
Date Tue, 23 Jan 2018 09:15:11 GMT
Sorry for the late reply.

I created FLINK-8487 [1] to track this problem.

@Vishal, can you have a look and check if I forgot some details? I logged
the issue for Flink 1.3.2, is that correct?
Please add more information if you think it is relevant.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8487

2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santoshi@gmail.com>:

> Or this one
>
> https://issues.apache.org/jira/browse/FLINK-4815
>
> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> ping.
>>
>>     This happened again in production, and it seems reasonable to abort
>> when a checkpoint is not found rather than behave as if it is a brand-new
>> pipeline.
>>
>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> Folks, sorry for being late on this. Can somebody with knowledge of
>>> this code base create a JIRA issue for the above? We have seen this more
>>> than once in production.
>>>
>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljoscha@apache.org>
>>> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> Some relevant Jira issues for you are:
>>>>
>>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
>>>> failed checkpoints
>>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic
>>>> fallback to earlier checkpoint when checkpoint restore fails
>>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always
>>>> remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>
>>>> Hi Vishal,
>>>>
>>>> it would be great if you could create a JIRA ticket with Blocker
>>>> priority.
>>>> Please add all relevant information of your detailed analysis, add a
>>>> link to this email thread (see [1] for the web archive of the mailing
>>>> list), and post the id of the JIRA issue here.
>>>>
>>>> Thanks for looking into this!
>>>>
>>>> Best regards,
>>>> Fabian
>>>>
>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>
>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
>>>>
>>>>> Thank you for confirming.
>>>>>
>>>>>
>>>>>  I think this is a critical bug. In essence, any checkpoint store
>>>>> (HDFS/S3/file) will lose state if it is unavailable at resume. This
>>>>> becomes all the more painful with your confirming that "failed
>>>>> checkpoints kill the job", b'coz essentially it means that if the remote
>>>>> store is unavailable during a checkpoint then you have lost state (unless of
>>>>> course you have a retry of none or an unbounded retry delay, a delay that
>>>>> you *hope* the store revives within). Remember, the first retry
>>>>> failure will create new state, according to the code as written, if the remote
>>>>> store is down. We would rather have a configurable property that
>>>>> establishes our desire to abort, something like an
>>>>> "abort_retry_on_chkretrevalfailure".
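>>>>>
>>>>> ( For illustration: the "retry of none or an unbounded retry delay" above
>>>>> maps to Flink's restart strategy configuration. A minimal sketch of a bounded
>>>>> fixed-delay restart strategy, with a made-up class name and illustrative
>>>>> values, so that a persistently unavailable store eventually fails the job
>>>>> visibly instead of looping: )
>>>>>
>>>>> import java.util.concurrent.TimeUnit;
>>>>>
>>>>> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>>>>> import org.apache.flink.api.common.time.Time;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>
>>>>> public class RestartStrategyExample {
>>>>>     public static void main(String[] args) {
>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         // Bound the number of restart attempts instead of retrying forever:
>>>>>         // after 3 attempts, 30 seconds apart, the job fails visibly.
>>>>>         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
>>>>>                 3, Time.of(30, TimeUnit.SECONDS)));
>>>>>     }
>>>>> }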
>>>>>
>>>>>
>>>>> In our case it is very important that we do not undercount a window,
>>>>> which is one reason we use Flink and its awesome failure guarantees, as various
>>>>> alarms sound ( we do anomaly detection on the time series ).
>>>>>
>>>>> Please create a JIRA ticket for us to follow, or we could do it.
>>>>>
>>>>>
>>>>> PS: Not aborting on checkpoint failure, up to a configurable limit, is very
>>>>> important too.
>>>>>
>>>>>
>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>>
>>>>>> With your last mail you're basically saying that the checkpoint could
>>>>>> not be restored because your HDFS was temporarily down. If Flink had not
>>>>>> deleted that checkpoint it might have been possible to restore it at a
>>>>>> later point, right?
>>>>>>
>>>>>> Regarding failed checkpoints killing the job: yes, this is currently
>>>>>> the expected behaviour but there are plans to change this.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santoshi@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> I think this is the offending piece. There is a catch-all Exception,
>>>>>> which IMHO should distinguish a recoverable exception from an unrecoverable
>>>>>> one.
>>>>>>
>>>>>>
>>>>>> try {
>>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>     if (completedCheckpoint != null) {
>>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>>     }
>>>>>> } catch (Exception e) {
>>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>         "checkpoint store.", e);
>>>>>>     // remove the checkpoint with broken state handle
>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>> }
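>>>>>>
>>>>>> ( For illustration only, not the actual Flink fix (FLINK-7783 / FLINK-8487
>>>>>> track the real change): one way the catch block above could separate a
>>>>>> transient store outage from a genuinely broken handle is to inspect the cause
>>>>>> chain for an I/O failure and rethrow instead of removing the handle. A sketch,
>>>>>> assuming the surrounding recovery method is allowed to propagate the
>>>>>> exception: )
>>>>>>
>>>>>> try {
>>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>     if (completedCheckpoint != null) {
>>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>>     }
>>>>>> } catch (Exception e) {
>>>>>>     boolean storeUnavailable = false;
>>>>>>     for (Throwable t = e; t != null; t = t.getCause()) {
>>>>>>         if (t instanceof java.io.IOException) {
>>>>>>             storeUnavailable = true;
>>>>>>             break;
>>>>>>         }
>>>>>>     }
>>>>>>     if (storeUnavailable) {
>>>>>>         // Transient storage outage (e.g. HDFS in safe mode): keep the handle
>>>>>>         // and fail this recovery attempt instead of silently dropping state.
>>>>>>         throw new FlinkException("Could not retrieve checkpoint from state handle under "
>>>>>>                 + checkpointStateHandle.f1 + " because the checkpoint store is unavailable.", e);
>>>>>>     }
>>>>>>     // Only a genuinely broken (e.g. undeserializable) handle is removed.
>>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>         "checkpoint store.", e);
>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>> }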
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>
>>>>>>> So this is the issue, and tell us if this is wrong. ZK had some state
>>>>>>> ( backed by HDFS ) that referred to a checkpoint ( the same exact last
>>>>>>> successful checkpoint that succeeded before the NN screwed us ). When the
>>>>>>> JM tried to recreate the state, it failed to retrieve the CHK handle from
>>>>>>> HDFS b'coz the NN was down, and conveniently ( and I think very wrongly )
>>>>>>> removed the CHK from being considered and cleaned the pointer ( though that
>>>>>>> failed too as the NN was down, which is obvious from the dangling file in
>>>>>>> recovery ). The metadata itself was on HDFS, and a failure in retrieving it
>>>>>>> should have been a stop-all, not an exception we try to work magic around
>>>>>>> that ends up starting from a blank state.
>>>>>>>
>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint
>>>>>>> 44286 from state handle under /0000000000000044286. This indicates that the
>>>>>>> retrieved state handle is broken. Try cleaning the state handle store.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>
>>>>>>>> Also note that the ZooKeeper recovery data ( sadly on the same
>>>>>>>> HDFS cluster ) showed the same behavior. It had the pointers to the
>>>>>>>> chk point ( I think that is what it does, keeps metadata of where the
>>>>>>>> checkpoint is, etc. ). It too decided to keep the recovery file from the
>>>>>>>> failed state.
>>>>>>>>
>>>>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>>
>>>>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>
>>>>>>>> This is getting a little interesting. What say you :)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Another thing I noted was this:
>>>>>>>>>
>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>>>
>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Generally what Flink does, IMHO, is replace the chk point
>>>>>>>>> directory with a new one. I see it happening now: every minute it replaces
>>>>>>>>> the old directory. In this job's case, however, it did not delete the
>>>>>>>>> 2017-10-04 13:54 directory and hence the chk-44286 directory. This was
>>>>>>>>> ( I think ) the last checkpoint successfully created before the NN had
>>>>>>>>> issues, but unlike the usual behavior it did not delete this chk-44286.
>>>>>>>>> It looks as if it started with a blank slate? Does this strike a chord?
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Fabian,
>>>>>>>>>>                       First of all, congratulations on this
>>>>>>>>>> fabulous framework. I have worked with GDF, and though GDF has some natural
>>>>>>>>>> pluses, Flink's state management is far more advanced. With Kafka as a
>>>>>>>>>> source it negates issues GDF has ( GDF integration with pub/sub is organic
>>>>>>>>>> and that is to be expected, but non-FIFO pub/sub is an issue with windows on
>>>>>>>>>> event time etc. ).
>>>>>>>>>>
>>>>>>>>>>                    Coming back to this issue. We have that same
>>>>>>>>>> Kafka topic feeding a streaming Druid datasource and we do not see any
>>>>>>>>>> issue there, so data loss at the source, Kafka, is not applicable. I am
>>>>>>>>>> totally certain that the "retention" time was not an issue. It
>>>>>>>>>> is 4 days of retention and we fixed this issue within 30 minutes. We could
>>>>>>>>>> replay Kafka with a new consumer group.id and that worked fine.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>
>>>>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for Kafka consumers
>>>>>>>>>> defaults to true. I bring this up to see whether Flink will in any
>>>>>>>>>> circumstance drive consumption from the Kafka-perceived offset rather than
>>>>>>>>>> the one in the checkpoint ( see the sketch after this list ).
>>>>>>>>>>
>>>>>>>>>> * The state.backend.fs.memory-threshold: 0 has not been set.
>>>>>>>>>> The state is big enough, though, therefore IMHO there is no way the state
>>>>>>>>>> is stored along with the metadata in the JM ( or ZK ? ). The reason I bring
>>>>>>>>>> this up is to make sure that when you say the size has to be less than
>>>>>>>>>> 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>>>>>>>
>>>>>>>>>> * We have a good sense of SP ( save point )  and
CP ( checkpoint
>>>>>>>>>> ) and certainly understand that they actually are
not dissimilar. However
>>>>>>>>>> in this case there were multiple attempts to restart
the pipe before it
>>>>>>>>>> finally succeeded.
>>>>>>>>>>
>>>>>>>>>> * Other HDFS-related properties:
>>>>>>>>>>
>>>>>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>>>>>>
>>>>>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>>>>
>>>>>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
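>>>>>>>>>>
>>>>>>>>>> ( Sketch for the first bullet above: with checkpointing enabled, the
>>>>>>>>>> consumer commits offsets back to Kafka on completed checkpoints, but on
>>>>>>>>>> restore it starts from the offsets stored in the checkpoint, not the
>>>>>>>>>> committed group offsets. Assumes a Flink 1.3-era FlinkKafkaConsumer010;
>>>>>>>>>> the class name, topic, broker, group id, and interval are illustrative: )
>>>>>>>>>>
>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>
>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>>>>>>>>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>>>>>>>>>
>>>>>>>>>> public class KafkaOffsetExample {
>>>>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>         env.enableCheckpointing(60000);  // illustrative interval
>>>>>>>>>>
>>>>>>>>>>         Properties props = new Properties();
>>>>>>>>>>         props.setProperty("bootstrap.servers", "kafka:9092");  // illustrative
>>>>>>>>>>         props.setProperty("group.id", "my-group");             // illustrative
>>>>>>>>>>
>>>>>>>>>>         FlinkKafkaConsumer010<String> consumer =
>>>>>>>>>>                 new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
>>>>>>>>>>         // Default is true: offsets are committed to Kafka on completed checkpoints.
>>>>>>>>>>         // These commits are informational; recovery uses the checkpointed offsets.
>>>>>>>>>>         consumer.setCommitOffsetsOnCheckpoints(true);
>>>>>>>>>>
>>>>>>>>>>         env.addSource(consumer).print();
>>>>>>>>>>         env.execute("kafka-offset-example");
>>>>>>>>>>     }
>>>>>>>>>> }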
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Do these make sense? Is there anything else I should look at?
>>>>>>>>>> Please also note that this is the second time this has happened. The first
>>>>>>>>>> time I was vacationing and was not privy to the state of the Flink
>>>>>>>>>> pipeline, but the net effect was similar: the counts for the first window
>>>>>>>>>> after an internal restart dropped.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thank you for your patience, and regards,
>>>>>>>>>>
>>>>>>>>>> Vishal
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>
>>>>>>>>>>> window operators are always stateful because the operator needs
>>>>>>>>>>> to remember previously received events (WindowFunction) or intermediate
>>>>>>>>>>> results (ReduceFunction).
>>>>>>>>>>> Given the program you described, a checkpoint should include the
>>>>>>>>>>> Kafka consumer offset and the state of the window operator. If the program
>>>>>>>>>>> eventually successfully (i.e., without an error) recovered from the last
>>>>>>>>>>> checkpoint, all its state should have been restored. Since the last
>>>>>>>>>>> checkpoint was before HDFS went into safe mode, the program would have been
>>>>>>>>>>> reset to that point. If the Kafka retention time is less than the time it
>>>>>>>>>>> took to fix HDFS you would have lost data because it would have been
>>>>>>>>>>> removed from Kafka. If that's not the case, we need to investigate this
>>>>>>>>>>> further because a checkpoint recovery must not result in state loss.
>>>>>>>>>>>
>>>>>>>>>>> Restoring from a savepoint is not so much different from
>>>>>>>>>>> automatic checkpoint recovery. Given that you have a completed savepoint,
>>>>>>>>>>> you can restart the job from that point. The main difference is that
>>>>>>>>>>> checkpoints are only used for internal recovery and usually discarded once
>>>>>>>>>>> the job is terminated while savepoints are retained.
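>>>>>>>>>>>
>>>>>>>>>>> ( A minimal sketch of how a job can opt into retaining checkpoints after
>>>>>>>>>>> termination so they can be restored manually like savepoints, using the
>>>>>>>>>>> externalized-checkpoint setting; the class name and interval are
>>>>>>>>>>> illustrative: )
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
>>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>>
>>>>>>>>>>> public class RetainedCheckpointsExample {
>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>         env.enableCheckpointing(60000);  // illustrative interval
>>>>>>>>>>>         // Keep the checkpoint metadata around even if the job is cancelled,
>>>>>>>>>>>         // so the checkpoint can be restored manually like a savepoint.
>>>>>>>>>>>         env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>>>>>>>>>                 ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>>>>>>>>>     }
>>>>>>>>>>> }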
>>>>>>>>>>>
>>>>>>>>>>> Regarding your question whether a failed checkpoint should cause the
>>>>>>>>>>> job to fail and recover, I'm not sure what the current status is.
>>>>>>>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <
>>>>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>
>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
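>>>>>>>>>>>>
>>>>>>>>>>>> ( For illustration, a self-contained sketch of such a pipeline; the
>>>>>>>>>>>> bounded source, the key/count types, the class name, and the window and
>>>>>>>>>>>> lateness values are all assumptions made up for the example, with event
>>>>>>>>>>>> time enabled since the thread is about event-time windows: )
>>>>>>>>>>>>
>>>>>>>>>>>> import java.util.concurrent.TimeUnit;
>>>>>>>>>>>>
>>>>>>>>>>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>>>>>>>>>>> import org.apache.flink.api.java.tuple.Tuple;
>>>>>>>>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>>>>>>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>>>>>>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>>> import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
>>>>>>>>>>>> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
>>>>>>>>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>>>>>>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>>>>>>>>>> import org.apache.flink.util.Collector;
>>>>>>>>>>>>
>>>>>>>>>>>> public class KeyedWindowCountExample {
>>>>>>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>>>>>>         final long windowSize = 15;  // illustrative, minutes
>>>>>>>>>>>>         final long lateBy = 30;      // illustrative, seconds
>>>>>>>>>>>>
>>>>>>>>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>>>>>>>>         env.enableCheckpointing(60000);  // illustrative interval
>>>>>>>>>>>>
>>>>>>>>>>>>         // Illustrative bounded source; in the thread the source is a Kafka consumer.
>>>>>>>>>>>>         DataStream<Tuple2<String, Long>> events = env
>>>>>>>>>>>>                 .fromElements(Tuple2.of("key-a", 1L), Tuple2.of("key-a", 2L), Tuple2.of("key-b", 1L))
>>>>>>>>>>>>                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
>>>>>>>>>>>>                     @Override
>>>>>>>>>>>>                     public long extractAscendingTimestamp(Tuple2<String, Long> element) {
>>>>>>>>>>>>                         return System.currentTimeMillis();  // illustrative; real jobs use an event-time field
>>>>>>>>>>>>                     }
>>>>>>>>>>>>                 });
>>>>>>>>>>>>
>>>>>>>>>>>>         events
>>>>>>>>>>>>                 .keyBy(0)
>>>>>>>>>>>>                 .timeWindow(Time.of(windowSize, TimeUnit.MINUTES))
>>>>>>>>>>>>                 .allowedLateness(Time.of(lateBy, TimeUnit.SECONDS))
>>>>>>>>>>>>                 .reduce(
>>>>>>>>>>>>                         new ReduceFunction<Tuple2<String, Long>>() {
>>>>>>>>>>>>                             @Override
>>>>>>>>>>>>                             public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
>>>>>>>>>>>>                                 // Partial per-key aggregate held in windowed keyed state between checkpoints.
>>>>>>>>>>>>                                 return Tuple2.of(a.f0, a.f1 + b.f1);
>>>>>>>>>>>>                             }
>>>>>>>>>>>>                         },
>>>>>>>>>>>>                         new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
>>>>>>>>>>>>                             @Override
>>>>>>>>>>>>                             public void apply(Tuple key, TimeWindow window,
>>>>>>>>>>>>                                               Iterable<Tuple2<String, Long>> reduced,
>>>>>>>>>>>>                                               Collector<Tuple2<String, Long>> out) {
>>>>>>>>>>>>                                 // Emit the single reduced value per key and window.
>>>>>>>>>>>>                                 for (Tuple2<String, Long> value : reduced) {
>>>>>>>>>>>>                                     out.collect(value);
>>>>>>>>>>>>                                 }
>>>>>>>>>>>>                             }
>>>>>>>>>>>>                         })
>>>>>>>>>>>>                 .print();
>>>>>>>>>>>>
>>>>>>>>>>>>         env.execute("keyed-window-count");
>>>>>>>>>>>>     }
>>>>>>>>>>>> }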
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As far as I know, a checkpoint failure should be ignored and
>>>>>>>>>>>>> retried with potentially larger state. I had this situation:
>>>>>>>>>>>>>
>>>>>>>>>>>>> * HDFS went into safe mode b'coz of NameNode issues
>>>>>>>>>>>>> * an exception was thrown
>>>>>>>>>>>>>
>>>>>>>>>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>>>>>>>>>>     ..................
>>>>>>>>>>>>>
>>>>>>>>>>>>>         at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>>>>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>>>>>>>
>>>>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint
>>>>>>>>>>>>> failures, after the HDFS issues were resolved.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would not have worried about the restart, but it was evident
>>>>>>>>>>>>> that I lost my operator state. Either it was my Kafka consumer that kept on
>>>>>>>>>>>>> advancing its offset between a start and the next checkpoint failure ( a
>>>>>>>>>>>>> minute's worth ), or the operator that had partial aggregates was lost.
>>>>>>>>>>>>> I have a 15-minute window of counts on a keyed operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am using RocksDB and of course have checkpointing turned on.
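>>>>>>>>>>>>>
>>>>>>>>>>>>> ( A minimal sketch of that setup, assuming the flink-statebackend-rocksdb
>>>>>>>>>>>>> dependency is on the classpath; the class name, checkpoint path, and
>>>>>>>>>>>>> interval are illustrative, the path modeled on the directories mentioned
>>>>>>>>>>>>> earlier in the thread: )
>>>>>>>>>>>>>
>>>>>>>>>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>>>>
>>>>>>>>>>>>> public class RocksDbCheckpointingExample {
>>>>>>>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>         // Snapshot state roughly once a minute.
>>>>>>>>>>>>>         env.enableCheckpointing(60000);
>>>>>>>>>>>>>         // RocksDB keeps keyed/window state locally and checkpoints it
>>>>>>>>>>>>>         // to the given HDFS directory.
>>>>>>>>>>>>>         env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> }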
>>>>>>>>>>>>>
>>>>>>>>>>>>> The questions thus are:
>>>>>>>>>>>>>
>>>>>>>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>>>>>>>> * Why, on restart, was the operator state not recreated?
>>>>>>>>>>>>> * Does the nature of the exception thrown have anything to do with
>>>>>>>>>>>>> this, b'coz suspend and resume from a savepoint work as expected?
>>>>>>>>>>>>> * And though I am pretty sure, are operators like the window
>>>>>>>>>>>>> operator stateful by default, and thus if I have
>>>>>>>>>>>>> timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(),
>>>>>>>>>>>>> new WindowFunction()), is the state managed by Flink?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
