flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8360) Implement task-local state recovery
Date Thu, 01 Feb 2018 12:19:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348492#comment-16348492 ]

ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r165340082
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java ---
    @@ -46,26 +52,63 @@
     	/** */
     	private final int subtaskIndex;
     
    +	/** */
    +	private final Map<Long, TaskStateSnapshot> storedTaskStateByCheckpointID;
    +
    +	/** This is the base directory for all local state of the subtask that owns this {@link TaskLocalStateStore}. */
    +	private final File subtaskLocalStateBaseDirectory;
    +
     	public TaskLocalStateStore(
     		JobID jobID,
     		JobVertexID jobVertexID,
    -		int subtaskIndex) {
    +		int subtaskIndex,
    +		File localStateRootDir) {
     
     		this.jobID = jobID;
     		this.jobVertexID = jobVertexID;
     		this.subtaskIndex = subtaskIndex;
    +		this.storedTaskStateByCheckpointID = new HashMap<>();
    +		this.subtaskLocalStateBaseDirectory =
    +			new File(localStateRootDir, createSubtaskPath(jobID, jobVertexID, subtaskIndex));
    +	}
    +
    +	static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, int subtaskIndex) {
    +		return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + subtaskIndex;
     	}
     
     	public void storeLocalState(
     		@Nonnull CheckpointMetaData checkpointMetaData,
     		@Nullable TaskStateSnapshot localState) {
     
    -		if (localState != null) {
    -			throw new UnsupportedOperationException("Implement this before actually providing local state!");
    +		TaskStateSnapshot previous =
    +			storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), localState);
    +
    +		if (previous != null) {
    +			throw new IllegalStateException("Found previously registered local state for checkpoint with id " +
    +				checkpointMetaData.getCheckpointId() + "! This indicated a problem.");
     		}
     	}
     
    -	public void dispose() {
    -		//TODO
    +	public void dispose() throws Exception {
    +
    +		Exception collectedException = null;
    +
    +		for (TaskStateSnapshot snapshot : storedTaskStateByCheckpointID.values()) {
    +			try {
    +				snapshot.discardState();
    +			} catch (Exception discardEx) {
    +				collectedException = ExceptionUtils.firstOrSuppressed(discardEx, collectedException);
    +			}
    +		}
    +
    +		if (collectedException != null) {
    +			throw collectedException;
    +		}
    +
    +		FileUtils.deleteDirectoryQuietly(subtaskLocalStateBaseDirectory);
    --- End diff --
    
    Is there a way to retry discarding the non-discarded state handles based on this directory? If not, then we could also delete it in case of a failure.
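
    The suggestion above can be illustrated with a minimal, self-contained sketch. It is not the code from the pull request: `DisposeSketch`, `Discardable`, and `deleteQuietly` are hypothetical stand-ins (for the store, `TaskStateSnapshot`, and `FileUtils.deleteDirectoryQuietly` respectively), and it assumes that nothing can retry a failed discard from the directory contents. Under that assumption, the directory is removed before the collected exception is rethrown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

public class DisposeSketch {

    /** Hypothetical stand-in for a snapshot whose state can be discarded. */
    interface Discardable {
        void discardState() throws Exception;
    }

    /** Discards all snapshots, then deletes the directory even if a discard failed. */
    static void dispose(List<Discardable> snapshots, Path baseDir) throws Exception {
        Exception collected = null;

        for (Discardable snapshot : snapshots) {
            try {
                snapshot.discardState();
            } catch (Exception e) {
                // Keep the first failure and attach later ones as suppressed.
                if (collected == null) {
                    collected = e;
                } else {
                    collected.addSuppressed(e);
                }
            }
        }

        // Clean up unconditionally, before reporting any failure.
        deleteQuietly(baseDir);

        if (collected != null) {
            throw collected;
        }
    }

    /** Hypothetical helper: best-effort recursive delete that swallows I/O errors. */
    static void deleteQuietly(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException ignored) {
                    // best effort
                }
            });
        } catch (IOException | UncheckedIOException ignored) {
            // best effort
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("local-state");
        Files.createFile(dir.resolve("chk-1"));

        Discardable ok = () -> { };
        Discardable failing = () -> { throw new IOException("discard failed"); };

        try {
            dispose(Arrays.asList(ok, failing), dir);
        } catch (Exception e) {
            System.out.println("dispose reported: " + e.getMessage());
        }
        System.out.println("directory still exists: " + Files.exists(dir));
    }
}
```

    The trade-off is that any file a failed discard left behind is removed along with the directory, which is only acceptable if no retry mechanism depends on it.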


> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main idea is to have a secondary, local copy of the checkpointed state, while there is still a primary copy in DFS that we report to the checkpoint coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, to save network bandwidth. This requires that the assignment from tasks to slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and can easily enhance it to all other state types (e.g. operator state) later.
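
The recovery order described in the issue (local copy first, primary copy in DFS otherwise) might be sketched roughly as follows. This is an illustration only, not Flink's implementation: `LocalFirstRestoreSketch`, `storeLocal`, `restore`, and the `primaryLoader` function are hypothetical names, and state is modeled as a plain `String`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

public class LocalFirstRestoreSketch {

    // Hypothetical secondary copies kept on local disk, keyed by checkpoint ID.
    private final Map<Long, String> localCopies = new HashMap<>();

    // Hypothetical loader for the primary copy in DFS.
    private final LongFunction<String> primaryLoader;

    LocalFirstRestoreSketch(LongFunction<String> primaryLoader) {
        this.primaryLoader = primaryLoader;
    }

    void storeLocal(long checkpointId, String state) {
        localCopies.put(checkpointId, state);
    }

    /** Prefers the local copy; falls back to the primary copy when none exists. */
    String restore(long checkpointId) {
        String local = localCopies.get(checkpointId);
        return local != null ? local : primaryLoader.apply(checkpointId);
    }

    public static void main(String[] args) {
        LocalFirstRestoreSketch store =
            new LocalFirstRestoreSketch(id -> "dfs-state-" + id);
        store.storeLocal(42L, "local-state-42");

        System.out.println(store.restore(42L)); // local copy is available
        System.out.println(store.restore(43L)); // no local copy, read from DFS
    }
}
```

The local copy only helps if the restarted task lands on the machine that holds it, which is why the issue asks for sticky task-to-slot assignment.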



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
