flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vipul singh <neoea...@gmail.com>
Subject Re: Questions about checkpoints/savepoints
Date Wed, 25 Oct 2017 06:53:30 GMT
Thanks Aljoscha for the explanations. I was able to recover from the last
externalized checkpoint, by using flink run -s <metadata file> <options>

I am curious, are there any options to save the metadata file name to some
other place like dynamo etc at the moment? The reason why I am asking is,
for the end launcher code we are writing, we want to ensure if a flink job
crashes, we can just start it from last known externalized checkpoint.
In the present senario, we have to list the contents of the s3 bucket which
saves the metadata, to see the last metadata before failure, and there
might a window where
we might run into read after write consistency of s3. Thoughts?

On Tue, Oct 24, 2017 at 2:13 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
>
> That distinction with externalised checkpoints is a bit of a pitfall and
> I'm hoping that we can actually get rid of that distinction in the next
> version or the version after that. With that change, all checkpoints would
> always be externalised, since it's not really any noticeable overhead.
>
> Regarding read-after-write consistency, you should be fine since an the
> "externalised checkpoint", i.e. the metadata, is only one file. If you know
> the file-path (either from the Flink dashboard or by looking at the S3
> bucket) you can restore from it.
>
> Best,
> Aljoscha
>
>
> On 24. Oct 2017, at 08:22, vipul singh <neoeahit@gmail.com> wrote:
>
> Thanks Tony, that was the issue. I was thinking that when we use Rocksdb
> and provide an s3 path, it uses externalized checkpoints by default. Thanks
> so much!
>
> I have one followup question. Say in above case, I terminate the cluster,
> and since the metadata is on s3, and not on local storage, does flink avoid
> read after write consistency of s3? Would it be a valid concern, or we
> handle that case in externalized checkpoints as well, and dont deal with
> file system operations while dealing with retrieving externalized
> checkpoints on s3.
>
> Thanks,
> Vipul
>
>
>
> On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei <tony19920430@gmail.com> wrote:
>
>> Hi,
>>
>> Did you enable externalized checkpoints? [1]
>>
>> Best,
>> Tony Wei
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-
>> 1.3/setup/checkpoints.html#externalized-checkpoints
>>
>> 2017-10-24 13:07 GMT+08:00 vipul singh <neoeahit@gmail.com>:
>>
>>> Thanks Aljoscha for the answer above.
>>>
>>> I am experimenting with savepoints and checkpoints on my end, so that we
>>> built fault tolerant application with exactly once semantics.
>>>
>>> I have been able to test various scenarios, but have doubts about one
>>> use case.
>>>
>>> My app is running on an emr cluster, and I am trying to test the case
>>> when a emr cluster is terminated. I have read that
>>> *state.checkpoints.dir *is responsible for storing metadata
>>> information, and links to data files in
>>> *state.backend.fs.checkpointdir.*
>>>
>>> For my application I have configured both
>>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir*
>>>
>>> Also I have the following in my main app:
>>>
>>> env.enableCheckpointing(CHECKPOINT_TIME_MS)
>>>
>>> val CHECKPOINT_LOCATION = s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>>>
>>> val backend:RocksDBStateBackend =
>>>   new RocksDBStateBackend(CHECKPOINT_LOCATION)
>>>
>>> env.setStateBackend(backend)
>>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
>>> env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
>>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)
>>>
>>>
>>> In the application startup logs I can see
>>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir, *values
>>> being loaded. However when the checkpoint happens I dont see any content in
>>> the metadata dir. Is there something I am missing? Please let me know. I am
>>> using flink version 1.3
>>>
>>> Thanks,
>>> Vipul
>>>
>>>
>>>
>>> On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljoscha@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink does not rely on file system operations to list contents, all
>>>> necessary file paths are stored in the meta data file, as you guessed. This
>>>> is the reason savepoints also work with file systems that "only" have
>>>> read-after-write consistency.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On 10. Oct 2017, at 03:01, vipul singh <neoeahit@gmail.com> wrote:
>>>>
>>>> Thanks Stefan for the answers above. These are really helpful.
>>>>
>>>> I have a few followup questions:
>>>>
>>>>    1. I see my savepoints are created in a folder, which has a
>>>>    _metadata file and another file. Looking at the code
>>>>    <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191>
>>>>    it seems like the metadata file contains tasks states, operator
>>>>    state and master states
>>>>    <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>.
>>>>    What is the purpose of the other file in the savepoint folder? My guess
is
>>>>    it should be a checkpoint file?
>>>>    2. I am planning to use s3 as my state backend, so want to ensure
>>>>    that application restarts are not affected by read-after-write consistency
>>>>    of s3( if I use s3 as a savepoint backend). I am curious how flink restores
>>>>    data from the _metadata file, and the other file? Does the _metadata file
>>>>    contain path to these other files? or would it do a listing on the s3
>>>>    folder?
>>>>
>>>>
>>>> Please let me know,
>>>>
>>>> Thanks,
>>>> Vipul
>>>>
>>>> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter <
>>>> s.richter@data-artisans.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have answered your questions inline:
>>>>>
>>>>>
>>>>>    1. It seems to me that checkpoints can be treated as flink
>>>>>    internal recovery mechanism, and savepoints act more as user-defined
>>>>>    recovery points. Would that be a correct assumption?
>>>>>
>>>>> You could see it that way, but I would describe savepoints more as
>>>>> user-defined *restart* points than *recovery* points. Please take a look
at
>>>>> my answers in this thread, because they cover most of your question:
>>>>>
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>>>> ble.com/difference-between-checkpoints-amp-savepoints-td14787.html .
>>>>>
>>>>>
>>>>>    1. While cancelling an application with -s option, it specifies
>>>>>    the savepoint location. Is there a way during application startup
to
>>>>>    identify the last know savepoint from a folder by itself, and restart
from
>>>>>    there. Since I am saving my savepoints on s3, I want to avoid issues
>>>>>    arising from *ls* command on s3 due to read-after-write
>>>>>    consistency of s3.
>>>>>
>>>>> I don’t think that this feature exists, you have to specify the
>>>>> savepoint.
>>>>>
>>>>>
>>>>>    1. Suppose my application has a checkpoint at point t1, and say i
>>>>>    cancel this application sometime in future before the next available
>>>>>    checkpoint( say t1+x). If I start the application without specifying
the
>>>>>    savepoint, it will start from the last known checkpoint(at t1), which
wont
>>>>>    have the application state saved, since I had cancelled the application.
>>>>>    Would this is a correct assumption?
>>>>>
>>>>> If you restart a canceled application it will not consider
>>>>> checkpoints. They are only considered in recovery on failure. You need
to
>>>>> specify a savepoint or externalized checkpoint for restarts to make
>>>>> explicit that you intend to restart a job, and not to run a new instance
of
>>>>> the job.
>>>>>
>>>>>
>>>>>    1. Would using ExternalizedCheckpointCl
>>>>>    eanup.RETAIN_ON_CANCELLATION be same as manually saving regular
>>>>>    savepoints?
>>>>>
>>>>> Not the same, because checkpoints and savepoints are different in
>>>>> certain aspects, but both methods leave you with something that survives
>>>>> job cancelation and can be used to restart from a certain state.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Vipul
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Vipul
>>>
>>
>>
>
>
> --
> Thanks,
> Vipul
>
>
>


-- 
Thanks,
Vipul

Mime
View raw message