flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8360) Implement task-local state recovery
Date Thu, 15 Feb 2018 15:34:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365744#comment-16365744 ]

ASF GitHub Bot commented on FLINK-8360:

Github user tillrohrmann commented on a diff in the pull request:

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
    @@ -19,92 +19,224 @@
     package org.apache.flink.runtime.state;
     import org.apache.flink.api.common.JobID;
    -import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
     import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.util.ExceptionUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import javax.annotation.Nonnegative;
     import javax.annotation.Nonnull;
     import javax.annotation.Nullable;
    +import javax.annotation.concurrent.GuardedBy;
     import java.io.File;
    -import java.util.HashMap;
    +import java.util.Arrays;
    +import java.util.Iterator;
     import java.util.Map;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.Executor;
      * This class will service as a task-manager-level local storage for local checkpointed state. The purpose is to provide
      * access to a state that is stored locally for a faster recovery compared to the state that is stored remotely in a
      * stable store DFS. For now, this storage is only complementary to the stable storage and local state is typically
      * lost in case of machine failures. In such cases (and others), client code of this class must fall back to using the
      * slower but highly available store.
    - *
    - * TODO this is currently a placeholder / mock that still must be implemented!
     public class TaskLocalStateStore {
    -	/** */
    +	/** Logger for this class. */
    +	private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStore.class);
    +	/** Maximum number of retained snapshots. */
    +	private static final int MAX_RETAINED_SNAPSHOTS = 5;
    +	/** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. */
    +	private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
    --- End diff --
    Can this be static?
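
    To illustrate the question above: ConcurrentHashMap rejects null values, so a
    sentinel object can stand in for "no local state". The sketch below shows that
    pattern with the sentinel declared static. The Snapshot and SentinelStore types
    and all method names are hypothetical stand-ins, not the classes from the pull
    request. Sharing one sentinel across instances is only safe if nothing ever
    mutates it; whether that holds for TaskStateSnapshot is for the PR author to
    confirm.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a snapshot type; this is not Flink's TaskStateSnapshot. */
final class Snapshot {
    final String description;

    Snapshot(String description) {
        this.description = description;
    }
}

/** Sketch of a store that records "no local state" with one shared sentinel. */
class SentinelStore {

    /** Shared sentinel; static because it carries no per-instance data and is never mutated. */
    private static final Snapshot NULL_DUMMY = new Snapshot("<none>");

    /** ConcurrentHashMap throws NullPointerException on null values, hence the sentinel. */
    private final Map<Long, Snapshot> byCheckpointId = new ConcurrentHashMap<>();

    /** Stores the snapshot, substituting the sentinel when the caller passes null. */
    void store(long checkpointId, Snapshot snapshotOrNull) {
        byCheckpointId.put(checkpointId, snapshotOrNull != null ? snapshotOrNull : NULL_DUMMY);
    }

    /** Returns the stored snapshot, or null if none was stored or only the sentinel is present. */
    Snapshot retrieve(long checkpointId) {
        Snapshot stored = byCheckpointId.get(checkpointId);
        // Identity comparison is intentional: only the sentinel instance maps back to null.
        return stored == NULL_DUMMY ? null : stored;
    }

    /** True if the checkpoint was registered, even when it was registered without local state. */
    boolean isKnown(long checkpointId) {
        return byCheckpointId.containsKey(checkpointId);
    }
}
```

    With a static sentinel, every store instance compares against the same object,
    which avoids allocating one dummy per store.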

> Implement task-local state recovery
> -----------------------------------
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
> This issue tracks the development of recovery from task-local state. The main idea is to have a secondary, local copy of the checkpointed state, while there is still a primary copy in DFS that we report to the checkpoint coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, to save network bandwidth. This requires that the assignment from tasks to slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and can easily extend it to all other state types (e.g. operator state) later.
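
The recovery order described in the issue (try the secondary local copy, fall back to the primary copy in DFS) could be sketched roughly as follows. This is a minimal illustration under assumptions, not the implementation from the pull request: LocalFirstRestore, restore, and both reader parameters are hypothetical names, and treating any runtime failure of the local read as "no local copy" is an assumption about the policy.

```java
import java.util.function.Supplier;

/** Hypothetical helper; illustrates local-first restore with fallback to the primary copy. */
final class LocalFirstRestore {

    private LocalFirstRestore() {}

    /**
     * Tries the local (secondary) copy first and falls back to the remote (primary) copy.
     *
     * @param localReader returns the local state, or null if no local copy exists
     * @param remoteReader returns the state from the stable store; only called on fallback
     */
    static <S> S restore(Supplier<S> localReader, Supplier<S> remoteReader) {
        try {
            S local = localReader.get();
            if (local != null) {
                return local; // local hit: no network transfer needed
            }
        } catch (RuntimeException e) {
            // Local state is best-effort (e.g. lost after a machine failure),
            // so a failed local read is not fatal; fall through to the remote copy.
        }
        return remoteReader.get();
    }
}
```

The remote reader is passed as a Supplier so that the slower read from the stable store only happens when the local attempt yields nothing.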

This message was sent by Atlassian JIRA
