flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Date Thu, 15 Feb 2018 15:33:25 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168502352
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
---
    @@ -19,92 +19,224 @@
     package org.apache.flink.runtime.state;
     
     import org.apache.flink.api.common.JobID;
    -import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
     import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.util.ExceptionUtils;
     
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnegative;
     import javax.annotation.Nonnull;
     import javax.annotation.Nullable;
    +import javax.annotation.concurrent.GuardedBy;
     
     import java.io.File;
    -import java.util.HashMap;
    +import java.util.Arrays;
    +import java.util.Iterator;
     import java.util.Map;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.Executor;
     
     /**
     * This class will serve as a task-manager-level local storage for local checkpointed state. The purpose is to provide
     * access to a state that is stored locally for a faster recovery compared to the state that is stored remotely in a
     * stable store DFS. For now, this storage is only complementary to the stable storage and local state is typically
     * lost in case of machine failures. In such cases (and others), client code of this class must fall back to using the
     * slower but highly available store.
    - *
    - * TODO this is currently a placeholder / mock that still must be implemented!
      */
     public class TaskLocalStateStore {
     
    -	/** */
    +	/** Logger for this class. */
    +	private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStore.class);
    +
    +	/** Maximum number of retained snapshots. */
    +	private static final int MAX_RETAINED_SNAPSHOTS = 5;
    +
    +	/** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. */
    +	private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
    +
    +	/** JobID from the owning subtask. */
     	private final JobID jobID;
     
    -	/** */
    +	/** JobVertexID of the owning subtask. */
     	private final JobVertexID jobVertexID;
     
    -	/** */
    +	/** Subtask index of the owning subtask. */
     	private final int subtaskIndex;
     
    -	/** */
    -	private final Map<Long, TaskStateSnapshot> storedTaskStateByCheckpointID;
    -
     	/** The root directories for all local state of this {@link TaskLocalStateStore}. */
     	private final File[] localStateRootDirectories;
     
    +	/** Executor that runs the discarding of released state objects. */
    +	private final Executor discardExecutor;
    +
    +	/** Lock for synchronisation on the storage map and the discarded status. */
    +	private final Object lock;
    +
    +	/** Status flag if this store was already discarded. */
    +	@GuardedBy("lock")
    +	private boolean discarded;
    +
    +	/** Maps checkpoint ids to local TaskStateSnapshots. */
    +	@GuardedBy("lock")
    +	private final SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID;
    +
     	public TaskLocalStateStore(
    -		JobID jobID,
    -		JobVertexID jobVertexID,
    -		int subtaskIndex,
    -		File[] localStateRootDirectories) {
    +		@Nonnull JobID jobID,
    +		@Nonnull JobVertexID jobVertexID,
    +		@Nonnegative int subtaskIndex,
    +		@Nonnull File[] localStateRootDirectories,
    +		@Nonnull Executor discardExecutor) {
     
     		this.jobID = jobID;
     		this.jobVertexID = jobVertexID;
     		this.subtaskIndex = subtaskIndex;
    -		this.storedTaskStateByCheckpointID = new HashMap<>();
     		this.localStateRootDirectories = localStateRootDirectories;
    +		this.discardExecutor = discardExecutor;
    +		this.lock = new Object();
    +		this.storedTaskStateByCheckpointID = new TreeMap<>();
    +		this.discarded = false;
     	}
     
    +	@Nonnull
     	protected String createSubtaskPath() {
     		return jobID + File.separator + jobVertexID + File.separator + subtaskIndex;
     	}
     
    +	/**
    +	 * Stores the local state for the given checkpoint id.
    +	 *
    +	 * @param checkpointId id for the checkpoint that created the local state that will be stored.
    +	 * @param localState the local state to store.
    +	 */
     	public void storeLocalState(
    -		@Nonnull CheckpointMetaData checkpointMetaData,
    +		@Nonnegative long checkpointId,
     		@Nullable TaskStateSnapshot localState) {
     
    -		TaskStateSnapshot previous =
    -			storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), localState);
    +		if (localState == null) {
    +			localState = NULL_DUMMY;
    +		}
     
    -		if (previous != null) {
    -			throw new IllegalStateException("Found previously registered local state for checkpoint with id " +
    -				checkpointMetaData.getCheckpointId() + "! This indicated a problem.");
    +		LOG.info("Storing local state for checkpoint {}.", checkpointId);
    +		LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
    +
    +		synchronized (lock) {
    +			if (discarded) {
    +				// we ignore late stores and simply discard the state.
    +				discardStateObject(localState, checkpointId);
    +			} else {
    +				TaskStateSnapshot previous =
    +					storedTaskStateByCheckpointID.put(checkpointId, localState);
    +
    +				if (previous != null) {
    +					// this should never happen.
    +					discardStateObject(previous, checkpointId);
    +					throw new IllegalStateException("Found previously registered local state for checkpoint with id " +
    +						checkpointId + "! This indicated a problem.");
    +				}
    +
    +				// prune history.
    +				while (storedTaskStateByCheckpointID.size() > MAX_RETAINED_SNAPSHOTS) {
    +					Long removeCheckpointID = storedTaskStateByCheckpointID.firstKey();
    +					TaskStateSnapshot snapshot =
    +						storedTaskStateByCheckpointID.remove(storedTaskStateByCheckpointID.firstKey());
    --- End diff --
    
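    I think we can pass `removeCheckpointID` directly to `storedTaskStateByCheckpointID.remove(...)` here, instead of calling `storedTaskStateByCheckpointID.firstKey()` a second time.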


---

Mime
View raw message