flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Vishal Santoshi <vishal.santo...@gmail.com>
Subject Re: Failing to recover once checkpoint fails
Date Thu, 05 Oct 2017 15:40:52 GMT
I think this is the offending piece. There is a catch all Exception, which
IMHO should understand a recoverable exception from an unrecoverable on.


try {
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
if (completedCheckpoint != null) {
completedCheckpoints.add(completedCheckpoint);
}
} catch (Exception e) {
LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
"checkpoint store.", e);
// remove the checkpoint with broken state handle
removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
}

On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santoshi@gmail.com>
wrote:

> So this is the issue and tell us that it is wrong. ZK had some state (
> backed by hdfs ) that referred to a checkpoint ( the same exact last
> successful checkpoint that was successful before NN screwed us ). When the
> JM tried to recreate the state and b'coz NN was down failed to retrieve the
> CHK handle from hdfs and conveniently ( and I think very wrongly ) removed
> the CHK from being considered and cleaned the pointer ( though failed as
> was NN was down and is obvious from the dangling file in recovery ) . The
> metadata itself was on hdfs and failure in retrieving should have been a
> stop all, not going to trying doing magic exception rather than starting
> from a blank state.
>
> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286
> from state handle under /0000000000000044286. This indicates that the
> retrieved state handle is broken. Try cleaning the state handle store.
>
>
>
>
>
>
> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> Also note that  the zookeeper recovery did  ( sadly on the same hdfs
>> cluster ) also showed the same behavior. It had the pointers to the chk
>> point  ( I  think that is what it does, keeps metadata of where the
>> checkpoint etc  ) .  It too decided to keep the recovery file from the
>> failed state.
>>
>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>
>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>
>> This is getting a little interesting. What say you :)
>>
>>
>>
>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> Another thing I noted was this thing
>>>
>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>
>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>
>>>
>>> Generally what Flink does IMHO is that it replaces the chk point
>>> directory with a new one. I see it happening now. Every minute it replaces
>>> the old directory.  In this job's case however, it did not delete the
>>> 2017-10-04 13:54  and hence the chk-44286 directory.  This was the last
>>> chk-44286 (  I think  )  successfully created before NN had issues but as
>>> is usual did not delete this  chk-44286. It looks as if it started with a
>>> blank slate ???????? Does this strike a chord ?????
>>>
>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> Hello Fabian,
>>>>                       First of all congratulations on this fabulous
>>>> framework. I have worked with GDF and though GDF has some natural pluses
>>>> Flink's state management is far more advanced. With kafka as a source it
>>>> negates issues GDF has ( GDF integration with pub/sub is organic and that
>>>> is to be expected but non FIFO pub/sub is an issue with windows on event
>>>> time etc )
>>>>
>>>>                    Coming back to this issue. We have that same kafka
>>>> topic feeding a streaming druid datasource and we do not see any issue
>>>> there, so so data loss on the source, kafka is not applicable. I am totally
>>>> certain that the "retention" time was not an issue. It is 4 days of
>>>> retention and we fixed this issue within 30 minutes. We could replay kafka
>>>> with a new consumer group.id and that worked fine.
>>>>
>>>>
>>>> Note these properties and see if they strike a chord.
>>>>
>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is
>>>> the default true. I bring this up to see whether flink will in any
>>>> circumstance drive consumption on the kafka perceived offset rather than
>>>> the one in the checkpoint.
>>>>
>>>> * The state.backend.fs.memory-threshold: 0 has not been set.  The
>>>> state is big enough though therefore IMHO no way the state is stored along
>>>> with the meta data in JM ( or ZK ? ) . The reason I bring this up is to
>>>> make sure when you say that the size has to be less than 1024bytes , you
>>>> are talking about cumulative state of the pipeine.
>>>>
>>>> * We have a good sense of SP ( save point )  and CP ( checkpoint ) and
>>>> certainly understand that they actually are not dissimilar. However in this
>>>> case there were multiple attempts to restart the pipe before it finally
>>>> succeeded.
>>>>
>>>> * Other hdfs related poperties.
>>>>
>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%=
>>>> flink_hdfs_root %>
>>>>
>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>
>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root
%>
>>>>
>>>>
>>>>
>>>> Do these make sense ? Is there anything else I should look at.  Please
>>>> also note that it is the second time this has happened. The first time I
>>>> was vacationing and was not privy to the state of the flink pipeline, but
>>>> the net effect were similar. The counts for the first window after an
>>>> internal restart dropped.
>>>>
>>>>
>>>>
>>>>
>>>> Thank you for you patience and regards,
>>>>
>>>> Vishal
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhueske@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> window operators are always stateful because the operator needs to
>>>>> remember previously received events (WindowFunction) or intermediate
>>>>> results (ReduceFunction).
>>>>> Given the program you described, a checkpoint should include the Kafka
>>>>> consumer offset and the state of the window operator. If the program
>>>>> eventually successfully (i.e., without an error) recovered from the last
>>>>> checkpoint, all its state should have been restored. Since the last
>>>>> checkpoint was before HDFS went into safe mode, the program would have
been
>>>>> reset to that point. If the Kafka retention time is less than the time
it
>>>>> took to fix HDFS you would have lost data because it would have been
>>>>> removed from Kafka. If that's not the case, we need to investigate this
>>>>> further because a checkpoint recovery must not result in state loss.
>>>>>
>>>>> Restoring from a savepoint is not so much different from automatic
>>>>> checkpoint recovery. Given that you have a completed savepoint, you can
>>>>> restart the job from that point. The main difference is that checkpoints
>>>>> are only used for internal recovery and usually discarded once the job
is
>>>>> terminated while savepoints are retained.
>>>>>
>>>>> Regarding your question if a failed checkpoint should cause the job to
>>>>> fail and recover I'm not sure what the current status is.
>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
>>>>>
>>>>>> To add to it, my pipeline is a simple
>>>>>>
>>>>>> keyBy(0)
>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>
>>>>>>
>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>
>>>>>>> Hello folks,
>>>>>>>
>>>>>>> As far as I know checkpoint failure should be ignored and retried
>>>>>>> with potentially larger state. I had this situation
>>>>>>>
>>>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>>>> * exception was thrown
>>>>>>>
>>>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>>>> Operation category WRITE is not supported in state standby. Visit
>>>>>>> https://s.apache.org/sbnn-error
>>>>>>>     ..................
>>>>>>>
>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(Had
>>>>>>> oopFileSystem.java:453)
>>>>>>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.
>>>>>>> mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>         at org.apache.flink.runtime.state.filesystem.
>>>>>>> FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.
>>>>>>> java:132)
>>>>>>>
>>>>>>> * The pipeline came back after a few restarts and checkpoint
>>>>>>> failures, after the hdfs issues were resolved.
>>>>>>>
>>>>>>> I would not have worried about the restart, but it was evident
that
>>>>>>> I lost my operator state. Either it was my kafka consumer that
kept on
>>>>>>> advancing it's offset between a start and the next checkpoint
failure ( a
>>>>>>> minute's worth ) or the the operator that had partial aggregates
was lost.
>>>>>>> I have a 15 minute window of counts on a keyed operator
>>>>>>>
>>>>>>> I am using ROCKS DB and of course have checkpointing turned on.
>>>>>>>
>>>>>>> The questions thus are
>>>>>>>
>>>>>>> * Should a pipeline be restarted if checkpoint fails ?
>>>>>>> * Why on restart did the operator state did not recreate ?
>>>>>>> * Is the nature of the exception thrown have to do with any of
this
>>>>>>> b'coz suspend and resume from a save point work as expected ?
>>>>>>> * And though I am pretty sure, are operators like the Window
>>>>>>> operator stateful by drfault and thus if I have timeWindow(Time.of(window_
>>>>>>> size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new
>>>>>>> WindowFunction()), the state is managed by flink ?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message