flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4512) Add option for persistent checkpoints
Date Wed, 12 Oct 2016 09:49:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568226#comment-15568226 ]

ASF GitHub Bot commented on FLINK-4512:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82971361
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -282,29 +279,71 @@ public boolean isShutdown() {
     	//  Handling checkpoints and messages
     	// --------------------------------------------------------------------------------------------
     
    -	public Future<String> triggerSavepoint(long timestamp) throws Exception {
    -		CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint());
    +	/**
    +	 * Triggers a savepoint with the default savepoint directory as a target.
    +	 *
    +	 * @param timestamp The timestamp for the savepoint.
    +	 * @return A future to the completed checkpoint
    +	 * @throws IllegalStateException If no default savepoint directory has been configured
    +	 * @throws Exception Failures during triggering are forwarded
    +	 */
    +	public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception {
    +		return triggerSavepoint(timestamp, null);
    +	}
    +
    +	/**
    +	 * Triggers a savepoint with the given savepoint directory as a target.
    +	 *
    +	 * @param timestamp The timestamp for the savepoint.
    +	 * @param savepointDirectory Target directory for the savepoint.
    +	 * @return A future to the completed checkpoint
    +	 * @throws IllegalStateException If no savepoint directory has been
    +	 *                               specified and no default savepoint directory has been
    +	 *                               configured
    +	 * @throws Exception             Failures during triggering are forwarded
    +	 */
    +	public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String savepointDirectory) throws Exception {
    +		String targetDirectory;
    +		if (savepointDirectory != null) {
    +			targetDirectory = savepointDirectory;
    +		} else if (this.savepointDirectory != null) {
    +			targetDirectory = this.savepointDirectory;
    +		} else {
    +			throw new IllegalStateException("No savepoint directory configured. " +
    +					"You can either specify a directory when triggering this savepoint or " +
    +					"configure a cluster-wide default via key '" +
    +					ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
    +		}
    +
    +		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
    +		CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
     
     		if (result.isSuccess()) {
    -			PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint();
    -			return savepoint.getCompletionFuture();
    -		}
    -		else {
    -			return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
    +			return result.getPendingCheckpoint().getCompletionFuture();
    +		} else {
    +			CompletableFuture<CompletedCheckpoint> failed = new FlinkCompletableFuture<>();
    +			failed.completeExceptionally(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
    --- End diff --
    
    `CheckpointDeclineReason` is not an `Exception`, but an enum of decline reasons.
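
To illustrate the point above: because the decline reason is an enum value rather than a `Throwable`, it cannot be passed to the future directly or used as a cause. Its message has to be wrapped in a new exception first, as the diff does. The following is a minimal sketch only. `DeclineReason`, its constants and messages, and `failedSavepoint` are hypothetical stand-ins, and `java.util.concurrent.CompletableFuture` is used in place of the Flink future types that appear in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class DeclineReasonSketch {

    // Hypothetical stand-in for CheckpointDeclineReason: an enum that carries
    // a human-readable message. It is not a Throwable.
    enum DeclineReason {
        COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."),
        TASKS_NOT_READY("Some tasks are not ready to be checkpointed.");

        private final String message;

        DeclineReason(String message) {
            this.message = message;
        }

        String message() {
            return message;
        }
    }

    // Wraps the enum's message in an Exception and returns an already-failed future.
    // Assumption: the JDK CompletableFuture stands in for Flink's own future type.
    static <T> CompletableFuture<T> failedSavepoint(DeclineReason reason) {
        CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally(
                new Exception("Failed to trigger savepoint: " + reason.message()));
        return failed;
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = failedSavepoint(DeclineReason.COORDINATOR_SHUTDOWN);
        try {
            future.join();
        } catch (CompletionException e) {
            // The wrapped Exception is available as the cause.
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

The caller only ever sees the wrapping `Exception`, so the enum's message is the one piece of the decline reason that survives into the failed future.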


> Add option for persistent checkpoints
> -------------------------------------
>
>                 Key: FLINK-4512
>                 URL: https://issues.apache.org/jira/browse/FLINK-4512
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their metadata. This is what
> we currently do for savepoints, but in the future checkpoints and savepoints are likely to
> diverge with respect to the guarantees they give for updatability, etc.
> This means that, in the long term, the difference between persistent checkpoints and
> savepoints will be that persistent checkpoints can only be restored with the same job
> settings (like parallelism, etc.).
> Regular and persistent checkpoints should behave differently with respect to disposal
> in *globally* terminal job states (FINISHED, CANCELLED, FAILED): regular checkpoints are
> cleaned up in all of these cases, whereas persistent checkpoints are cleaned up only on
> FINISHED, possibly with an option to customize the behaviour on CANCELLED or FAILED.
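
The disposal rule described in the issue can be sketched as a small decision function. This is an illustration under stated assumptions, not Flink code: `CheckpointDisposalSketch`, `JobStatus`, and `shouldDiscard` are hypothetical names, and the optional customization for CANCELLED or FAILED mentioned in the issue is not modelled.

```java
class CheckpointDisposalSketch {

    // The globally terminal job states named in the issue description.
    enum JobStatus { FINISHED, CANCELLED, FAILED }

    // Returns true if a checkpoint should be cleaned up once the job has
    // reached the given globally terminal state.
    static boolean shouldDiscard(boolean persistent, JobStatus terminalStatus) {
        if (!persistent) {
            // Regular checkpoints are cleaned up in every terminal state.
            return true;
        }
        // Persistent checkpoints are cleaned up only when the job finished.
        return terminalStatus == JobStatus.FINISHED;
    }

    public static void main(String[] args) {
        for (JobStatus status : JobStatus.values()) {
            System.out.println(status
                    + ": regular discarded=" + shouldDiscard(false, status)
                    + ", persistent discarded=" + shouldDiscard(true, status));
        }
    }
}
```

Under this rule a persistent checkpoint survives a cancelled or failed job, which is what would allow it to be used for a later restore.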



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
