flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5897) Untie Checkpoint Externalization from FileSystems
Date Mon, 27 Feb 2017 12:07:46 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885661#comment-15885661 ]

ASF GitHub Bot commented on FLINK-5897:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3411#discussion_r103192397
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---
    @@ -203,48 +208,67 @@ void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
     		return onCompletionPromise;
     	}
     
    -	public CompletedCheckpoint finalizeCheckpoint() {
    +	public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
    +
     		synchronized (lock) {
    -			Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
    -
    -			// Persist if required
    -			String externalPath = null;
    -			if (props.externalizeCheckpoint()) {
    -				try {
    -					Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
    -					externalPath = SavepointStore.storeSavepoint(
    -							targetDirectory,
    -							savepoint
    -					);
    -				} catch (IOException e) {
    -					LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
    -				}
    -			}
    +			checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
     
    -			CompletedCheckpoint completed = new CompletedCheckpoint(
    -					jobId,
    -					checkpointId,
    -					checkpointTimestamp,
    -					System.currentTimeMillis(),
    -					new HashMap<>(taskStates),
    -					props,
    -					externalPath);
    +			// externalize the metadata
    +			final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
     
    -			onCompletionPromise.complete(completed);
    +			// TEMP FIX - The savepoint store is strictly typed to file systems currently
    +			//            but the checkpoints think more generic. we need to work with file handles
    +			//            here until the savepoint serializer accepts a generic stream factory
     
    -			if (statsCallback != null) {
    -				// Finalize the statsCallback and give the completed checkpoint a
    -				// callback for discards.
    -				CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath);
    -				completed.setDiscardCallback(discardCallback);
    -			}
    +			final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
    +			final String externalPointer = metadataHandle.getFilePath().getParent().toString();
     
    -			dispose(false);
    +			return finalizeInternal(metadataHandle, externalPointer);
    +		}
    +	}
    +
    +	public CompletedCheckpoint finalizeCheckpointNonExternalized() {
    +		synchronized (lock) {
    +			checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
     
    -			return completed;
    +			// finalize without external metadata
    +			return finalizeInternal(null, null);
     		}
     	}
     
    +	@GuardedBy("lock")
    +	private CompletedCheckpoint finalizeInternal(
    +			@Nullable StreamStateHandle externalMetadata,
    +			@Nullable String externalPointer) {
    +
    +		assert(Thread.holdsLock(lock));
    +
    +		CompletedCheckpoint completed = new CompletedCheckpoint(
    +				jobId,
    +				checkpointId,
    +				checkpointTimestamp,
    +				System.currentTimeMillis(),
    +				new HashMap<>(taskStates),
    +				props,
    +				externalMetadata,
    +				externalPointer);
    +
    +		onCompletionPromise.complete(completed);
    --- End diff --
    
    If the creation of the `CompletedCheckpoint` fails (for example, because the external metadata
    is null although the properties say the checkpoint should have been externalized), the promise
    is never completed. I think we should wrap this in a try/catch and fail the promise in that case.
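
A minimal sketch of the suggested try/catch, not the actual patch. It assumes `CompletableFuture` as a stand-in for the promise type behind `onCompletionPromise`; `PromiseFailureSketch`, `Completed`, and the `externalized` flag are hypothetical names used only for illustration. The point is that the promise is failed whenever constructing the result throws:

```java
import java.util.concurrent.CompletableFuture;

final class PromiseFailureSketch {

    /** Hypothetical stand-in for CompletedCheckpoint; not the Flink class. */
    static final class Completed {
        final long checkpointId;
        final String externalPointer;

        Completed(long checkpointId, boolean externalized, String externalPointer) {
            // Assumed consistency check of the kind that could make construction throw.
            if (externalized && externalPointer == null) {
                throw new IllegalArgumentException(
                        "Properties require external metadata, but none was given.");
            }
            this.checkpointId = checkpointId;
            this.externalPointer = externalPointer;
        }
    }

    /** Settles the promise on both the success path and the failure path. */
    static Completed finalizeInternal(
            CompletableFuture<Completed> promise,
            long checkpointId,
            boolean externalized,
            String externalPointer) {
        try {
            Completed completed = new Completed(checkpointId, externalized, externalPointer);
            promise.complete(completed);
            return completed;
        } catch (RuntimeException e) {
            // Without this branch, callers waiting on the promise are never notified.
            promise.completeExceptionally(e);
            throw e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Completed> promise = new CompletableFuture<>();
        try {
            // Externalization requested, but no pointer supplied: construction throws.
            finalizeInternal(promise, 42L, true, null);
        } catch (IllegalArgumentException e) {
            System.out.println("finalize failed: " + e.getMessage());
        }
        System.out.println("promise failed: " + promise.isCompletedExceptionally());
    }
}
```

Rethrowing after failing the promise keeps the direct caller informed as well; whether the real method should rethrow or only fail the promise is a design choice for the pull request.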


> Untie Checkpoint Externalization from FileSystems
> -------------------------------------------------
>
>                 Key: FLINK-5897
>                 URL: https://issues.apache.org/jira/browse/FLINK-5897
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> Currently, externalizing checkpoint metadata and storing savepoints depends strictly on FileSystems.
> Since state backends are more general, storing and cleaning up checkpoints with state backend hooks requires untying savepoints and externalized checkpoints from filesystems.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
