flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Vishal Santoshi <vishal.santo...@gmail.com>
Subject Re: Failing to recover once checkpoint fails
Date Thu, 18 Jan 2018 21:14:46 GMT
Or this one

https://issues.apache.org/jira/browse/FLINK-4815

On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santoshi@gmail.com>
wrote:

> ping.
>
>     This happened again on production and it seems reasonable to abort
> when a checkpoint is not found rather than behave as if it is a brand new
> pipeline.
>
> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> Folks sorry for being late on this. Can some body with the knowledge of
>> this code base create a jira issue for the above ? We have seen this more
>> than once on production.
>>
>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> Some relevant Jira issues for you are:
>>>
>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
>>> failed checkpoints
>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback
>>> to earlier checkpoint when checkpoint restore fails
>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always
>>> remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhueske@gmail.com> wrote:
>>>
>>> Hi Vishal,
>>>
>>> it would be great if you could create a JIRA ticket with Blocker
>>> priority.
>>> Please add all relevant information of your detailed analysis, add a
>>> link to this email thread (see [1] for the web archive of the mailing
>>> list), and post the id of the JIRA issue here.
>>>
>>> Thanks for looking into this!
>>>
>>> Best regards,
>>> Fabian
>>>
>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>
>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
>>>
>>>> Thank you for confirming.
>>>>
>>>>
>>>>  I think this is a critical bug. In essence any checkpoint store (
>>>> hdfs/S3/File)  will loose state if it is unavailable at resume. This
>>>> becomes all the more painful with your confirming that  "failed
>>>> checkpoints killing the job"  b'coz essentially it mean that if remote
>>>> store in unavailable  during checkpoint than you have lost state ( till of
>>>> course you have a retry of none or an unbounded retry delay, a delay that
>>>> you *hope* the store revives in ) .. Remember  the first retry failure
>>>>  will cause new state according the code as written iff the remote store
is
>>>> down. We would rather have a configurable property that establishes  our
>>>> desire to abort something like a "abort_retry_on_chkretrevalfailure"
>>>>
>>>>
>>>> In our case it is very important that we do not undercount a window,
>>>> one reason we use flink and it's awesome failure guarantees, as various
>>>> alarms sound ( we do anomaly detection on the time series ).
>>>>
>>>> Please create a jira ticket for us to follow or we could do it.
>>>>
>>>>
>>>> PS Not aborting on checkpointing, till a configurable limit is very
>>>> important too.
>>>>
>>>>
>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>
>>>>> With your last mail your basically saying, that the checkpoint could
>>>>> not be restored because your HDFS was temporarily down. If Flink had
not
>>>>> deleted that checkpoint it might have been possible to restore it at
a
>>>>> later point, right?
>>>>>
>>>>> Regarding failed checkpoints killing the job: yes, this is currently
>>>>> the expected behaviour but there are plans to change this.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santoshi@gmail.com>
>>>>> wrote:
>>>>>
>>>>> I think this is the offending piece. There is a catch all Exception,
>>>>> which IMHO should understand a recoverable exception from an unrecoverable
>>>>> on.
>>>>>
>>>>>
>>>>> try {
>>>>> completedCheckpoint = retrieveCompletedCheckpoint(ch
>>>>> eckpointStateHandle);
>>>>> if (completedCheckpoint != null) {
>>>>> completedCheckpoints.add(completedCheckpoint);
>>>>> }
>>>>> } catch (Exception e) {
>>>>> LOG.warn("Could not retrieve checkpoint. Removing it from the
>>>>> completed " +
>>>>> "checkpoint store.", e);
>>>>> // remove the checkpoint with broken state handle
>>>>> removeBrokenStateHandle(checkpointStateHandle.f1,
>>>>> checkpointStateHandle.f0);
>>>>> }
>>>>>
>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>
>>>>>> So this is the issue and tell us that it is wrong. ZK had some state
>>>>>> ( backed by hdfs ) that referred to a checkpoint ( the same exact
last
>>>>>> successful checkpoint that was successful before NN screwed us ).
When the
>>>>>> JM tried to recreate the state and b'coz NN was down failed to retrieve
the
>>>>>> CHK handle from hdfs and conveniently ( and I think very wrongly
) removed
>>>>>> the CHK from being considered and cleaned the pointer ( though failed
as
>>>>>> was NN was down and is obvious from the dangling file in recovery
) . The
>>>>>> metadata itself was on hdfs and failure in retrieving should have
been a
>>>>>> stop all, not going to trying doing magic exception rather than starting
>>>>>> from a blank state.
>>>>>>
>>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint
>>>>>> 44286 from state handle under /0000000000000044286. This indicates
that the
>>>>>> retrieved state handle is broken. Try cleaning the state handle store.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>
>>>>>>> Also note that  the zookeeper recovery did  ( sadly on the same
hdfs
>>>>>>> cluster ) also showed the same behavior. It had the pointers
to the chk
>>>>>>> point  ( I  think that is what it does, keeps metadata of where
the
>>>>>>> checkpoint etc  ) .  It too decided to keep the recovery file
from the
>>>>>>> failed state.
>>>>>>>
>>>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>>>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>
>>>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>>>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>
>>>>>>> This is getting a little interesting. What say you :)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>
>>>>>>>> Another thing I noted was this thing
>>>>>>>>
>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>>
>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>>
>>>>>>>>
>>>>>>>> Generally what Flink does IMHO is that it replaces the chk
point
>>>>>>>> directory with a new one. I see it happening now. Every minute
it replaces
>>>>>>>> the old directory.  In this job's case however, it did not
delete the
>>>>>>>> 2017-10-04 13:54  and hence the chk-44286 directory.  This
was the last
>>>>>>>> chk-44286 (  I think  )  successfully created before NN had
issues but as
>>>>>>>> is usual did not delete this  chk-44286. It looks as if it
started with a
>>>>>>>> blank slate ???????? Does this strike a chord ?????
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Fabian,
>>>>>>>>>                       First of all congratulations on
this
>>>>>>>>> fabulous framework. I have worked with GDF and though
GDF has some natural
>>>>>>>>> pluses Flink's state management is far more advanced.
With kafka as a
>>>>>>>>> source it negates issues GDF has ( GDF integration with
pub/sub is organic
>>>>>>>>> and that is to be expected but non FIFO pub/sub is an
issue with windows on
>>>>>>>>> event time etc )
>>>>>>>>>
>>>>>>>>>                    Coming back to this issue. We have
that same
>>>>>>>>> kafka topic feeding a streaming druid datasource and
we do not see any
>>>>>>>>> issue there, so so data loss on the source, kafka is
not applicable. I am
>>>>>>>>> totally certain that the "retention" time was not an
issue. It is
>>>>>>>>> 4 days of retention and we fixed this issue within 30
minutes. We could
>>>>>>>>> replay kafka with a new consumer group.id and that worked
fine.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>
>>>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka
consumers
>>>>>>>>> is the default true. I bring this up to see whether flink
will in any
>>>>>>>>> circumstance drive consumption on the kafka perceived
offset rather than
>>>>>>>>> the one in the checkpoint.
>>>>>>>>>
>>>>>>>>> * The state.backend.fs.memory-threshold: 0 has not been
set.  The
>>>>>>>>> state is big enough though therefore IMHO no way the
state is stored along
>>>>>>>>> with the meta data in JM ( or ZK ? ) . The reason I bring
this up is to
>>>>>>>>> make sure when you say that the size has to be less than
1024bytes , you
>>>>>>>>> are talking about cumulative state of the pipeine.
>>>>>>>>>
>>>>>>>>> * We have a good sense of SP ( save point )  and CP (
checkpoint )
>>>>>>>>> and certainly understand that they actually are not dissimilar.
However in
>>>>>>>>> this case there were multiple attempts to restart the
pipe before it
>>>>>>>>> finally succeeded.
>>>>>>>>>
>>>>>>>>> * Other hdfs related poperties.
>>>>>>>>>
>>>>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%=
>>>>>>>>> flink_hdfs_root %>
>>>>>>>>>
>>>>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%=
flink_hdfs_root %>
>>>>>>>>>
>>>>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%=
flink_hdfs_root %>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Do these make sense ? Is there anything else I should
look at.
>>>>>>>>> Please also note that it is the second time this has
happened. The first
>>>>>>>>> time I was vacationing and was not privy to the state
of the flink
>>>>>>>>> pipeline, but the net effect were similar. The counts
for the first window
>>>>>>>>> after an internal restart dropped.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thank you for you patience and regards,
>>>>>>>>>
>>>>>>>>> Vishal
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Vishal,
>>>>>>>>>>
>>>>>>>>>> window operators are always stateful because the
operator needs
>>>>>>>>>> to remember previously received events (WindowFunction)
or intermediate
>>>>>>>>>> results (ReduceFunction).
>>>>>>>>>> Given the program you described, a checkpoint should
include the
>>>>>>>>>> Kafka consumer offset and the state of the window
operator. If the program
>>>>>>>>>> eventually successfully (i.e., without an error)
recovered from the last
>>>>>>>>>> checkpoint, all its state should have been restored.
Since the last
>>>>>>>>>> checkpoint was before HDFS went into safe mode, the
program would have been
>>>>>>>>>> reset to that point. If the Kafka retention time
is less than the time it
>>>>>>>>>> took to fix HDFS you would have lost data because
it would have been
>>>>>>>>>> removed from Kafka. If that's not the case, we need
to investigate this
>>>>>>>>>> further because a checkpoint recovery must not result
in state loss.
>>>>>>>>>>
>>>>>>>>>> Restoring from a savepoint is not so much different
from
>>>>>>>>>> automatic checkpoint recovery. Given that you have
a completed savepoint,
>>>>>>>>>> you can restart the job from that point. The main
difference is that
>>>>>>>>>> checkpoints are only used for internal recovery and
usually discarded once
>>>>>>>>>> the job is terminated while savepoints are retained.
>>>>>>>>>>
>>>>>>>>>> Regarding your question if a failed checkpoint should
cause the
>>>>>>>>>> job to fail and recover I'm not sure what the current
status is.
>>>>>>>>>> Stefan (in CC) should know what happens if a checkpoint
fails.
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <
>>>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>
>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi
<
>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>
>>>>>>>>>>>> As far as I know checkpoint failure should
be ignored and
>>>>>>>>>>>> retried with potentially larger state. I
had this situation
>>>>>>>>>>>>
>>>>>>>>>>>> * hdfs went into a safe mode b'coz of Name
Node issues
>>>>>>>>>>>> * exception was thrown
>>>>>>>>>>>>
>>>>>>>>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.
>>>>>>>>>>>> hadoop.ipc.StandbyException): Operation category
WRITE is not
>>>>>>>>>>>> supported in state standby. Visit https://s.apache.org/sbn
>>>>>>>>>>>> n-error
>>>>>>>>>>>>     ..................
>>>>>>>>>>>>
>>>>>>>>>>>>     at org.apache.flink.runtime.fs.hd
>>>>>>>>>>>> fs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.
>>>>>>>>>>>> mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>>>         at org.apache.flink.runtime.state.filesystem.
>>>>>>>>>>>> FsCheckpointStreamFactory.createBasePath(FsCheck
>>>>>>>>>>>> pointStreamFactory.java:132)
>>>>>>>>>>>>
>>>>>>>>>>>> * The pipeline came back after a few restarts
and checkpoint
>>>>>>>>>>>> failures, after the hdfs issues were resolved.
>>>>>>>>>>>>
>>>>>>>>>>>> I would not have worried about the restart,
but it was evident
>>>>>>>>>>>> that I lost my operator state. Either it
was my kafka consumer that kept on
>>>>>>>>>>>> advancing it's offset between a start and
the next checkpoint failure ( a
>>>>>>>>>>>> minute's worth ) or the the operator that
had partial aggregates was lost.
>>>>>>>>>>>> I have a 15 minute window of counts on a
keyed operator
>>>>>>>>>>>>
>>>>>>>>>>>> I am using ROCKS DB and of course have checkpointing
turned on.
>>>>>>>>>>>>
>>>>>>>>>>>> The questions thus are
>>>>>>>>>>>>
>>>>>>>>>>>> * Should a pipeline be restarted if checkpoint
fails ?
>>>>>>>>>>>> * Why on restart did the operator state did
not recreate ?
>>>>>>>>>>>> * Is the nature of the exception thrown have
to do with any of
>>>>>>>>>>>> this b'coz suspend and resume from a save
point work as expected ?
>>>>>>>>>>>> * And though I am pretty sure, are operators
like the Window
>>>>>>>>>>>> operator stateful by drfault and thus if
I have timeWindow(Time.of(window_
>>>>>>>>>>>> size, TimeUnit.MINUTES)).reduce(new ReduceFunction(),
new
>>>>>>>>>>>> WindowFunction()), the state is managed by
flink ?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>

Mime
View raw message