flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StefanRRichter <...@git.apache.org>
Subject [GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Date Thu, 15 Feb 2018 15:42:40 GMT
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168514624
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
---
    @@ -20,35 +20,47 @@
     
     import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.util.ExceptionUtils;
    -import org.apache.flink.util.Preconditions;
     
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnegative;
     import javax.annotation.Nonnull;
     
     import java.io.File;
    -import java.util.HashMap;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.Executor;
     
     /**
      * This class holds the all {@link TaskLocalStateStore} objects for a task executor (manager).
    - *
    - * TODO: this still still work in progress and partially still acts as a placeholder.
      */
     public class TaskExecutorLocalStateStoresManager {
     
    +	/** Logger for this class. */
    +	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
    +
     	/**
     	 * This map holds all local state stores for tasks running on the task manager / executor
that own the instance of
    -	 * this.
    +	 * this. Maps from allocation id to all the subtask's local state stores.
     	 */
    -	private final Map<JobID, Map<JobVertexSubtaskKey, TaskLocalStateStore>>
taskStateStoresByJobID;
    +	private final Map<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStore>>
taskStateStoresByAllocationID;
     
     	/** This is the root directory for all local state of this task manager / executor.
*/
     	private final File[] localStateRootDirectories;
     
    -	public TaskExecutorLocalStateStoresManager(File[] localStateRootDirectories) {
    -		this.taskStateStoresByJobID = new HashMap<>();
    -		this.localStateRootDirectories = Preconditions.checkNotNull(localStateRootDirectories);
    +	/** Executor that runs the discarding of released state objects. */
    +	private final Executor discardExecutor;
    +
    +	public TaskExecutorLocalStateStoresManager(
    +		@Nonnull File[] localStateRootDirectories,
    +		@Nonnull Executor discardExecutor) {
    --- End diff --
    
    At least it highlights potential problems in the IDE, and you can instruct the compiler
to introduce runtime checks if you wish (when compiling with IntelliJ, this is the default).


---

Mime
View raw message