flink-issues mailing list archives

From "Seth Wiesman (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
Date Wed, 19 Apr 2017 14:47:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974811#comment-15974811 ]

Seth Wiesman commented on FLINK-6315:

[~aljoscha] Apologies, I didn't see this comment. I might have jumped the gun a little by sending
in this PR.

In an eventually consistent file system, PUTs of new files are the only operations that can be
considered consistent. That means invalid data can never be reliably deleted. So instead, valid
data is explicitly marked as valid once it has been fully checkpointed.

Each sink will buffer data locally. Then, on checkpoint, that data will be copied to the final
FS (say S3) into a bucket whose path combines the checkpoint id and the timestamp at which the
checkpoint was initiated, so that it is always unique. When the checkpoint is completed, a single
flag file is written to the bucket, marking it valid.
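A minimal sketch of the bucket scheme described above, in plain Java with no Flink or AWS dependencies. `PutOnlyStore`, `CheckpointBuckets`, the `chk-<id>-<timestamp>` naming, and the `_VALID` flag-file name are all hypothetical; the point is only that the writer issues PUTs and never deletes, and that a reader trusts a bucket only once its flag file exists.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for an eventually consistent store such as S3.
// The scheme below only ever issues PUTs; it never deletes or renames objects.
class PutOnlyStore {
    private final Map<String, byte[]> objects = new TreeMap<>();

    void put(String key, byte[] data) {
        objects.put(key, data);
    }

    Set<String> keys() {
        return objects.keySet();
    }
}

class CheckpointBuckets {
    static final String VALID_MARKER = "_VALID"; // hypothetical flag-file name

    // The bucket path combines the checkpoint id and the checkpoint's start
    // timestamp, so a retried checkpoint id never reuses an existing path.
    static String bucketFor(String basePath, long checkpointId, long timestamp) {
        return basePath + "/chk-" + checkpointId + "-" + timestamp;
    }

    // On checkpoint: copy the locally buffered files into the checkpoint's bucket.
    static void upload(PutOnlyStore store, String bucket, Map<String, byte[]> buffered) {
        buffered.forEach((name, data) -> store.put(bucket + "/" + name, data));
    }

    // On checkpoint completion: write one empty flag file marking the bucket valid.
    static void markValid(PutOnlyStore store, String bucket) {
        store.put(bucket + "/" + VALID_MARKER, new byte[0]);
    }

    // Readers only trust buckets that contain the flag file.
    static Set<String> validBuckets(PutOnlyStore store) {
        Set<String> valid = new HashSet<>();
        for (String key : store.keys()) {
            if (key.endsWith("/" + VALID_MARKER)) {
                valid.add(key.substring(0, key.length() - VALID_MARKER.length() - 1));
            }
        }
        return valid;
    }
}
```

Data from a checkpoint that never completes simply stays in a bucket without a flag file, so nothing ever has to be deleted.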

In practice, every operator can attempt to write this file to the bucket; even though overwriting
a file is considered an inconsistent operation, all operators write identical empty files,
so it's not actually an issue. This means that I only rely on a single operator ever receiving
the checkpoint-complete message. I was not aware of the possibility that no operator would
receive the message, so I may need to rethink this part.
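The idempotence argument can be illustrated with a small simulation (hypothetical names, with an in-memory map standing in for the object store): each subtask that receives the completion notification PUTs the same empty flag file, so the bucket becomes valid as long as at least one subtask is notified, and stays invalid if none are, which is the gap mentioned above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: every parallel sink subtask tries to write the same empty
// flag file when told that the checkpoint completed. Because every writer PUTs
// identical (empty) content under the same key, repeated overwrites are harmless.
class MarkerWriters {
    static final byte[] EMPTY = new byte[0];

    // Simulates one notification round: received[i] says whether subtask i got
    // the checkpoint-complete message. Returns the resulting store contents.
    static Map<String, byte[]> notifyRound(String bucket, boolean[] received) {
        Map<String, byte[]> store = new HashMap<>();
        for (int subtask = 0; subtask < received.length; subtask++) {
            if (received[subtask]) {
                store.put(bucket + "/_VALID", EMPTY); // same key, same bytes every time
            }
        }
        return store;
    }

    // A bucket counts as valid only if its (empty) flag file is present.
    static boolean isValid(Map<String, byte[]> store, String bucket) {
        byte[] marker = store.get(bucket + "/_VALID");
        return marker != null && marker.length == 0;
    }
}
```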

As far as notifyCheckpointTimeout goes, each operator keeps track of which files it has
uploaded for each checkpoint. If checkpoint A times out, then those files must be re-uploaded
on checkpoint B; there is a diagram on FLINK-6306 showing this process. If the maximum number
of concurrent checkpoints is set to 1, this is trivial: if a new checkpoint begins before the
previous one completed, the previous one must have timed out. This becomes more difficult when
concurrent checkpoints are thrown into the mix; I need a way to signal that a previous checkpoint
has timed out and that it is up to the next one to upload those files.
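For the single-concurrent-checkpoint case, the per-operator bookkeeping could look roughly like the following sketch. `PendingUploads` and its methods are hypothetical; the sketch assumes at most one concurrent checkpoint and, following the reasoning above, treats any checkpoint still pending when the next one starts as timed out, carrying its files over for re-upload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical per-subtask bookkeeping, assuming max concurrent checkpoints = 1.
class PendingUploads {
    // Files uploaded for each checkpoint that has not yet been confirmed complete.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    // Called when a new checkpoint starts. Any checkpoint still pending at this
    // point is treated as timed out: its files are carried over and re-uploaded
    // together with the newly buffered files.
    List<String> onCheckpointStart(long checkpointId, List<String> newFiles) {
        List<String> toUpload = new ArrayList<>();
        for (List<String> files : pending.values()) {
            toUpload.addAll(files);
        }
        pending.clear();
        toUpload.addAll(newFiles);
        pending.put(checkpointId, new ArrayList<>(toUpload));
        return toUpload;
    }

    // Completion confirmed: this checkpoint's files no longer need tracking.
    void onCheckpointComplete(long checkpointId) {
        pending.remove(checkpointId);
    }

    boolean hasPending() {
        return !pending.isEmpty();
    }
}
```

With concurrent checkpoints this inference no longer holds, because a second checkpoint can legitimately start while the first is still in flight; that is where an explicit timeout signal is needed.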

> Notify on checkpoint timeout 
> -----------------------------
>                 Key: FLINK-6315
>                 URL: https://issues.apache.org/jira/browse/FLINK-6315
>             Project: Flink
>          Issue Type: New Feature
>          Components: Core
>            Reporter: Seth Wiesman
>            Assignee: Seth Wiesman
> A common use case when writing a custom operator that outputs data to some third-party
> location is to partially output on checkpoint and then commit on notifyCheckpointComplete.
> If that external system does not gracefully handle rollbacks (such as Amazon S3 not allowing
> consistent delete operations), then that data needs to be handled by the next checkpoint.
> The idea is to add a new interface, similar to CheckpointListener, that provides a callback
> when the CheckpointCoordinator times out a checkpoint:
> {code:java}
> /**
>  * This interface must be implemented by functions/operations that want to receive
>  * a notification if a checkpoint has been timed out by the
>  * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator}.
>  */
> public interface CheckpointTimeoutListener {
> 	/**
> 	 * This method is called as a notification if a distributed checkpoint has timed out.
> 	 *
> 	 * @param checkpointId The ID of the checkpoint that has timed out.
> 	 * @throws Exception If handling the notification fails.
> 	 */
> 	void notifyCheckpointTimeout(long checkpointId) throws Exception;
> }
> {code}
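With concurrent checkpoints, a callback like the proposed one would let each sink hand a timed-out checkpoint's files over to the next snapshot. The sketch below is hypothetical: `CheckpointTimeoutListener` is a local copy of the proposed interface (not an existing Flink API), `CheckpointListener` is a minimal local mirror of the completion callback so the example compiles on its own, and `StagingSink` only tracks file names rather than uploading anything.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal local mirror of the completion callback, so the sketch is self-contained.
interface CheckpointListener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

// Local copy of the interface proposed in this issue.
interface CheckpointTimeoutListener {
    void notifyCheckpointTimeout(long checkpointId) throws Exception;
}

// Hypothetical sink bookkeeping with concurrent checkpoints allowed.
class StagingSink implements CheckpointListener, CheckpointTimeoutListener {
    private final Map<Long, List<String>> uploadedPerCheckpoint = new HashMap<>();
    private final List<String> carryOver = new ArrayList<>();

    // On snapshot: upload the newly buffered files plus anything a timed-out
    // checkpoint left behind, and remember them under this checkpoint id.
    List<String> snapshot(long checkpointId, List<String> buffered) {
        List<String> files = new ArrayList<>(carryOver);
        carryOver.clear();
        files.addAll(buffered);
        uploadedPerCheckpoint.put(checkpointId, files);
        return files;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // The checkpoint's bucket gets its flag file; stop tracking its files.
        uploadedPerCheckpoint.remove(checkpointId);
    }

    @Override
    public void notifyCheckpointTimeout(long checkpointId) {
        // The timed-out checkpoint's bucket will never be marked valid, so its
        // files become the responsibility of the next snapshot.
        List<String> orphaned = uploadedPerCheckpoint.remove(checkpointId);
        if (orphaned != null) {
            carryOver.addAll(orphaned);
        }
    }

    int tracked() {
        return uploadedPerCheckpoint.size();
    }
}
```

In this scenario checkpoints 1 and 2 overlap and checkpoint 1 times out, so checkpoint 3 re-uploads `f1` alongside its own data while checkpoint 2 is unaffected.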

This message was sent by Atlassian JIRA
