flink-user mailing list archives

From Vishal Santoshi <vishal.santo...@gmail.com>
Subject Re: Failing to recover once checkpoint fails
Date Tue, 23 Jan 2018 16:38:51 GMT
Thank you for considering this. If I understand you correctly:

* A CHK pointer on ZK for a CHK state on hdfs was written successfully.
* Some issue restarted the pipeline.
* The NN was unfortunately down and Flink could not retrieve the CHK state
from the CHK pointer on ZK.

Before:

* The CHK pointer was removed and the job started from a brand new
slate.

After (this fix on 1.4+):

* The CHK pointer is not deleted (it has to be subsumed to be deleted).
* Flink keeps using this CHK pointer to restore state (if retries are > 0
and until we hit any retry limit).
* The NN comes back.
* Flink restores state on the next retry.

I would hope that is the sequence to follow.
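
A minimal sketch of the kind of restart configuration this relies on; the
attempt count and delay are placeholders, not a recommendation:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetryUntilStoreIsBack {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keep retrying (and re-attempting the checkpoint restore) while the NN is down:
        // up to 60 attempts, 60 seconds apart, i.e. roughly an hour of grace.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.seconds(60)));
        // ... build the job graph and call env.execute(...) here
    }
}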

Regards.








On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi Vishal,
>
> I think you might be right. We fixed the problem of checkpoints being
> dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we
> still have the problem that if the DFS is not up at all, it will look
> as if the job is starting from scratch. The alternative, though, is failing
> the job, in which case you will also never be able to restore from a
> checkpoint. What do you think?
>
> Best,
> Aljoscha
>
>
> On 23. Jan 2018, at 10:15, Fabian Hueske <fhueske@gmail.com> wrote:
>
> Sorry for the late reply.
>
> I created FLINK-8487 [1] to track this problem.
>
> @Vishal, can you have a look and check if I forgot some details? I logged
> the issue for Flink 1.3.2, is that correct?
> Please add more information if you think it is relevant.
>
> Thanks,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8487
>
> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
>
>> Or this one
>>
>> https://issues.apache.org/jira/browse/FLINK-4815
>>
>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> ping.
>>>
>>>     This happened again on production and it seems reasonable to abort
>>> when a checkpoint is not found rather than behave as if it is a brand new
>>> pipeline.
>>>
>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> Folks, sorry for being late on this. Can somebody with knowledge of
>>>> this code base create a JIRA issue for the above? We have seen this more
>>>> than once in production.
>>>>
>>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> Some relevant Jira issues for you are:
>>>>>
>>>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
>>>>> failed checkpoints
>>>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic
>>>>> fallback to earlier checkpoint when checkpoint restore fails
>>>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always
>>>>> remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> it would be great if you could create a JIRA ticket with Blocker
>>>>> priority.
>>>>> Please add all relevant information of your detailed analysis, add a
>>>>> link to this email thread (see [1] for the web archive of the mailing
>>>>> list), and post the id of the JIRA issue here.
>>>>>
>>>>> Thanks for looking into this!
>>>>>
>>>>> Best regards,
>>>>> Fabian
>>>>>
>>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>>
>>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
>>>>>
>>>>>> Thank you for confirming.
>>>>>>
>>>>>>
>>>>>>  I think this is a critical bug. In essence any checkpoint store
>>>>>> (hdfs/S3/File) will lose state if it is unavailable at resume. This
>>>>>> becomes all the more painful with your confirming that "failed
>>>>>> checkpoints kill the job", because essentially it means that if the remote
>>>>>> store is unavailable during a checkpoint then you have lost state (unless of
>>>>>> course you have a retry of none or an unbounded retry delay, a delay that
>>>>>> you *hope* the store revives in). Remember, the first retry failure
>>>>>> will cause new state, according to the code as written, iff the remote
>>>>>> store is down. We would rather have a configurable property that
>>>>>> establishes our desire to abort, something like
>>>>>> "abort_retry_on_chkretrevalfailure".
>>>>>>
>>>>>>
>>>>>> In our case it is very important that we do not undercount a window
>>>>>> (one reason we use Flink and its awesome failure guarantees), as various
>>>>>> alarms sound (we do anomaly detection on the time series).
>>>>>>
>>>>>> Please create a JIRA ticket for us to follow, or we could do it.
>>>>>>
>>>>>>
>>>>>> PS: Not aborting on checkpoint failure, up to a configurable limit, is
>>>>>> very important too.
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>>>
>>>>>>> With your last mail you're basically saying that the checkpoint could
>>>>>>> not be restored because your HDFS was temporarily down. If Flink had not
>>>>>>> deleted that checkpoint it might have been possible to restore it at a
>>>>>>> later point, right?
>>>>>>>
>>>>>>> Regarding failed checkpoints killing the job: yes, this is currently
>>>>>>> the expected behaviour but there are plans to change this.
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santoshi@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> I think this is the offending piece. There is a catch-all Exception,
>>>>>>> which IMHO should distinguish a recoverable exception from an
>>>>>>> unrecoverable one.
>>>>>>>
>>>>>>>
>>>>>>> try {
>>>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>>     if (completedCheckpoint != null) {
>>>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>>>     }
>>>>>>> } catch (Exception e) {
>>>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>>         "checkpoint store.", e);
>>>>>>>     // remove the checkpoint with broken state handle
>>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>>> }
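
Purely as an illustration of the distinction argued for above, a rough sketch
against the same snippet (treating IOException as the recoverable case is an
assumption for the example, not the actual FLINK-7783 change):

try {
    completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
    if (completedCheckpoint != null) {
        completedCheckpoints.add(completedCheckpoint);
    }
} catch (IOException transientFailure) {
    // The storage (e.g. an NN in safe mode) is temporarily unreachable: keep the
    // pointer in ZK and fail this recovery attempt so that a later retry can
    // still find and restore the checkpoint.
    throw new FlinkException(
        "Could not retrieve checkpoint from " + checkpointStateHandle.f1 +
        "; keeping its pointer for a later retry.", transientFailure);
} catch (Exception e) {
    // Only a non-IO failure is treated as a genuinely broken handle.
    LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
        "checkpoint store.", e);
    removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
}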
>>>>>>>
>>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>
>>>>>>>> So this is the issue; tell us if this is wrong. ZK had some
>>>>>>>> state (backed by hdfs) that referred to a checkpoint (the same exact
>>>>>>>> last successful checkpoint that was successful before the NN screwed us). When
>>>>>>>> the JM tried to recreate the state, it failed to retrieve the CHK handle
>>>>>>>> from hdfs because the NN was down, and conveniently (and I think very
>>>>>>>> wrongly) removed the CHK from being considered, and cleaned up the pointer
>>>>>>>> (though that failed too, as the NN was down, which is obvious from the
>>>>>>>> dangling file in recovery). The metadata itself was on hdfs, and a failure
>>>>>>>> in retrieving it should have been a stop-all, not an attempt at magic
>>>>>>>> exception handling that starts from a blank state.
>>>>>>>>
>>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint
>>>>>>>> 44286 from state handle under /0000000000000044286. This indicates that the
>>>>>>>> retrieved state handle is broken. Try cleaning the state handle store.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Also note that the zookeeper recovery (sadly on the same
>>>>>>>>> hdfs cluster) showed the same behavior. It had the pointers to the
>>>>>>>>> chk point (I think that is what it does, keeps metadata of where the
>>>>>>>>> checkpoint is, etc.). It too decided to keep the recovery file from the
>>>>>>>>> failed state.
>>>>>>>>>
>>>>>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>>>>>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>>>
>>>>>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>>>>>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>>
>>>>>>>>> This is getting a little interesting. What say you :)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Another thing I noted was this thing
>>>>>>>>>>
>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>>>>
>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Generally what Flink does, IMHO, is replace the chk point
>>>>>>>>>> directory with a new one. I see it happening now: every minute it replaces
>>>>>>>>>> the old directory. In this job's case, however, it did not delete the
>>>>>>>>>> 2017-10-04 13:54 one, and hence the chk-44286 directory remains. This was
>>>>>>>>>> the last checkpoint (I think) successfully created before the NN had issues,
>>>>>>>>>> but unlike the usual case it was not deleted. It looks as if the job started
>>>>>>>>>> with a blank slate ???????? Does this strike a chord ?????
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Fabian,
>>>>>>>>>>>                       First of all, congratulations on this
>>>>>>>>>>> fabulous framework. I have worked with GDF, and though GDF has some natural
>>>>>>>>>>> pluses, Flink's state management is far more advanced. With kafka as a
>>>>>>>>>>> source it negates issues GDF has (GDF integration with pub/sub is organic
>>>>>>>>>>> and that is to be expected, but non-FIFO pub/sub is an issue with windows
>>>>>>>>>>> on event time, etc.).
>>>>>>>>>>>
>>>>>>>>>>>                    Coming back to this issue: we have that same
>>>>>>>>>>> kafka topic feeding a streaming druid datasource and we do not see any
>>>>>>>>>>> issue there, so data loss at the source (kafka) is not applicable. I am
>>>>>>>>>>> totally certain that the "retention" time was not an issue. It
>>>>>>>>>>> is 4 days of retention and we fixed this issue within 30 minutes. We could
>>>>>>>>>>> replay kafka with a new consumer group.id and that worked fine.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>>
>>>>>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) setting for the kafka
>>>>>>>>>>> consumers is at its default of true. I bring this up to see whether flink
>>>>>>>>>>> will in any circumstance drive consumption from the kafka-perceived offset
>>>>>>>>>>> rather than the one in the checkpoint (see the sketch after this list).
>>>>>>>>>>>
>>>>>>>>>>> * The state.backend.fs.memory-threshold: 0 has not been set.
>>>>>>>>>>> The state is big enough, though, so IMHO there is no way the state is stored
>>>>>>>>>>> along with the metadata in the JM (or ZK?). The reason I bring this up is
>>>>>>>>>>> to make sure that when you say the size has to be less than 1024 bytes, you
>>>>>>>>>>> are talking about the cumulative state of the pipeline.
>>>>>>>>>>>
>>>>>>>>>>> * We have a good sense of SP (save point) and CP (checkpoint)
>>>>>>>>>>> and certainly understand that they actually are not dissimilar. However,
>>>>>>>>>>> in this case there were multiple attempts to restart the pipe before it
>>>>>>>>>>> finally succeeded.
>>>>>>>>>>>
>>>>>>>>>>> * Other hdfs-related properties:
>>>>>>>>>>>
>>>>>>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>>>>>>>
>>>>>>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>>>>>
>>>>>>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
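
A sketch of the consumer wiring the first bullet above refers to; the broker,
group, and topic names are placeholders, and FlinkKafkaConsumer010 stands in
for whichever connector version is actually in use:

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSourceSketch {
    static FlinkKafkaConsumer010<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");    // placeholder

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

        // Offsets are committed back to Kafka only when a checkpoint completes
        // (the default), and on restore Flink uses the offsets stored in the
        // checkpoint rather than the offsets Kafka has recorded for the group.
        consumer.setCommitOffsetsOnCheckpoints(true);
        return consumer;
    }
}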
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Do these make sense? Is there anything else I should look at?
>>>>>>>>>>> Please also note that this is the second time this has happened. The first
>>>>>>>>>>> time I was vacationing and was not privy to the state of the flink
>>>>>>>>>>> pipeline, but the net effect was similar: the counts for the first window
>>>>>>>>>>> after an internal restart dropped.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thank you for your patience, and regards,
>>>>>>>>>>>
>>>>>>>>>>> Vishal
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>
>>>>>>>>>>>> window operators are always stateful because the operator needs
>>>>>>>>>>>> to remember previously received events (WindowFunction) or intermediate
>>>>>>>>>>>> results (ReduceFunction).
>>>>>>>>>>>> Given the program you described, a checkpoint should include
>>>>>>>>>>>> the Kafka consumer offset and the state of the window operator. If the
>>>>>>>>>>>> program eventually successfully (i.e., without an error) recovered from the
>>>>>>>>>>>> last checkpoint, all its state should have been restored. Since the last
>>>>>>>>>>>> checkpoint was before HDFS went into safe mode, the program would have been
>>>>>>>>>>>> reset to that point. If the Kafka retention time is less than the time it
>>>>>>>>>>>> took to fix HDFS you would have lost data because it would have been
>>>>>>>>>>>> removed from Kafka. If that's not the case, we need to investigate this
>>>>>>>>>>>> further because a checkpoint recovery must not result in state loss.
>>>>>>>>>>>>
>>>>>>>>>>>> Restoring from a savepoint is not so much different from
>>>>>>>>>>>> automatic checkpoint recovery. Given that you have a completed savepoint,
>>>>>>>>>>>> you can restart the job from that point. The main difference is that
>>>>>>>>>>>> checkpoints are only used for internal recovery and usually discarded once
>>>>>>>>>>>> the job is terminated, while savepoints are retained.
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding your question of whether a failed checkpoint should cause the
>>>>>>>>>>>> job to fail and recover, I'm not sure what the current status is.
>>>>>>>>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>>>>
>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>
>>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>>
>>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>>     .reduce(new ReduceFunction(), new WindowFunction())
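
A self-contained sketch of what that reduce/window pair could look like;
Tuple2<String, Long> stands in for the real event type, which is an assumption
for illustration:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowedCountSketch {
    static DataStream<Tuple2<String, Long>> counts(DataStream<Tuple2<String, Long>> events,
                                                   long windowSizeMinutes, long lateBySeconds) {
        return events
            .keyBy(0)
            .timeWindow(Time.minutes(windowSizeMinutes))
            .allowedLateness(Time.seconds(lateBySeconds))
            .reduce(
                new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                        // Keep a running count per key inside the window.
                        return new Tuple2<>(a.f0, a.f1 + b.f1);
                    }
                },
                new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple key, TimeWindow window,
                                      Iterable<Tuple2<String, Long>> reduced,
                                      Collector<Tuple2<String, Long>> out) {
                        // The pre-aggregated count is the single element in 'reduced'.
                        out.collect(reduced.iterator().next());
                    }
                });
    }
}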
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As far as I know, a checkpoint failure should be ignored and
>>>>>>>>>>>>>> retried with potentially larger state. I had this situation:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * hdfs went into safe mode because of Name Node issues
>>>>>>>>>>>>>> * exception was thrown
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>>>>>>>>>>>     Operation category WRITE is not supported in state standby. Visit
>>>>>>>>>>>>>>     https://s.apache.org/sbnn-error
>>>>>>>>>>>>>>     ..................
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>>>>>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint
>>>>>>>>>>>>>> failures, once the hdfs issues were resolved.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would not have worried about the restart, but it was
>>>>>>>>>>>>>> evident that I lost my operator state. Either it was my kafka consumer that
>>>>>>>>>>>>>> kept on advancing its offset between a start and the next checkpoint
>>>>>>>>>>>>>> failure (a minute's worth), or the operator that had the partial
>>>>>>>>>>>>>> aggregates was lost. I have a 15 minute window of counts on a keyed operator.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am using RocksDB and of course have checkpointing turned on.
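
A sketch of the backend and checkpoint setup being described; the checkpoint
URI and the 60-second interval are placeholders:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB holds the keyed window state and snapshots it to the checkpoint URI.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));
        // Checkpoint every minute with exactly-once guarantees.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // ... sources, the keyed time window, sinks, and env.execute(...) would follow
    }
}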
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The questions thus are:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>>>>>>>>> * Why, on restart, was the operator state not recreated?
>>>>>>>>>>>>>> * Does the nature of the exception thrown have anything to do with
>>>>>>>>>>>>>> this, given that suspend and resume from a save point work as expected?
>>>>>>>>>>>>>> * And though I am pretty sure, are operators like the Window operator
>>>>>>>>>>>>>> stateful by default, and thus if I have
>>>>>>>>>>>>>> timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()),
>>>>>>>>>>>>>> is the state managed by flink?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
