flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8360) Implement task-local state recovery
Date Thu, 15 Feb 2018 15:34:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365744#comment-16365744
] 

ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168501441
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
---
    @@ -19,92 +19,224 @@
     package org.apache.flink.runtime.state;
     
     import org.apache.flink.api.common.JobID;
    -import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
     import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.util.ExceptionUtils;
     
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnegative;
     import javax.annotation.Nonnull;
     import javax.annotation.Nullable;
    +import javax.annotation.concurrent.GuardedBy;
     
     import java.io.File;
    -import java.util.HashMap;
    +import java.util.Arrays;
    +import java.util.Iterator;
     import java.util.Map;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.Executor;
     
     /**
      * This class will service as a task-manager-level local storage for local checkpointed
state. The purpose is to provide
      * access to a state that is stored locally for a faster recovery compared to the state
that is stored remotely in a
      * stable store DFS. For now, this storage is only complementary to the stable storage
and local state is typically
      * lost in case of machine failures. In such cases (and others), client code of this
class must fall back to using the
      * slower but highly available store.
    - *
    - * TODO this is currently a placeholder / mock that still must be implemented!
      */
     public class TaskLocalStateStore {
     
    -	/** */
    +	/** Logger for this class. */
    +	private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStore.class);
    +
    +	/** Maximum number of retained snapshots. */
    +	private static final int MAX_RETAINED_SNAPSHOTS = 5;
    +
    +	/** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. */
    +	private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
    --- End diff --
    
    Can this be static?


> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main idea is
to have a secondary, local copy of the checkpointed state, while there is still a primary
copy in DFS that we report to the checkpoint coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, to save
network bandwidth. This requires that the assignment from tasks to slots is as sticky is possible.
> For starters, we will implement this feature for all managed keyed states and can easily
enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message