From issues-return-153201-archive-asf-public=cust-asf.ponee.io@flink.apache.org Thu Feb 15 16:33:24 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 5C22C18072F for ; Thu, 15 Feb 2018 16:33:23 +0100 (CET) Received: (qmail 52028 invoked by uid 500); 15 Feb 2018 15:33:20 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 51916 invoked by uid 99); 15 Feb 2018 15:33:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Feb 2018 15:33:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2522EE0910; Thu, 15 Feb 2018 15:33:20 +0000 (UTC) From: tillrohrmann To: issues@flink.apache.org Reply-To: issues@flink.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery Content-Type: text/plain Message-Id: <20180215153320.2522EE0910@git1-us-west.apache.org> Date: Thu, 15 Feb 2018 15:33:20 +0000 (UTC) Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r168502943 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java --- @@ -19,92 +19,224 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; 
+ +import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.File; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Iterator; import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; /** * This class will service as a task-manager-level local storage for local checkpointed state. The purpose is to provide * access to a state that is stored locally for a faster recovery compared to the state that is stored remotely in a * stable store DFS. For now, this storage is only complementary to the stable storage and local state is typically * lost in case of machine failures. In such cases (and others), client code of this class must fall back to using the * slower but highly available store. - * - * TODO this is currently a placeholder / mock that still must be implemented! */ public class TaskLocalStateStore { - /** */ + /** Logger for this class. */ + private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStore.class); + + /** Maximum number of retained snapshots. */ + private static final int MAX_RETAINED_SNAPSHOTS = 5; + + /** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. */ + private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(); + + /** JobID from the owning subtask. */ private final JobID jobID; - /** */ + /** JobVertexID of the owning subtask. */ private final JobVertexID jobVertexID; - /** */ + /** Subtask index of the owning subtask. */ private final int subtaskIndex; - /** */ - private final Map storedTaskStateByCheckpointID; - /** The root directories for all local state of this {@link TaskLocalStateStore}. */ private final File[] localStateRootDirectories; + /** Executor that runs the discarding of released state objects. 
*/ + private final Executor discardExecutor; + + /** Lock for synchronisation on the storage map and the discarded status. */ + private final Object lock; + + /** Status flag if this store was already discarded. */ + @GuardedBy("lock") + private boolean discarded; + + /** Maps checkpoint ids to local TaskStateSnapshots. */ + @GuardedBy("lock") + private final SortedMap storedTaskStateByCheckpointID; + public TaskLocalStateStore( - JobID jobID, - JobVertexID jobVertexID, - int subtaskIndex, - File[] localStateRootDirectories) { + @Nonnull JobID jobID, + @Nonnull JobVertexID jobVertexID, + @Nonnegative int subtaskIndex, + @Nonnull File[] localStateRootDirectories, + @Nonnull Executor discardExecutor) { this.jobID = jobID; this.jobVertexID = jobVertexID; this.subtaskIndex = subtaskIndex; - this.storedTaskStateByCheckpointID = new HashMap<>(); this.localStateRootDirectories = localStateRootDirectories; + this.discardExecutor = discardExecutor; + this.lock = new Object(); + this.storedTaskStateByCheckpointID = new TreeMap<>(); + this.discarded = false; } + @Nonnull protected String createSubtaskPath() { return jobID + File.separator + jobVertexID + File.separator + subtaskIndex; } + /** + * Stores the local state for the given checkpoint id. + * + * @param checkpointId id for the checkpoint that created the local state that will be stored. + * @param localState the local state to store. + */ public void storeLocalState( - @Nonnull CheckpointMetaData checkpointMetaData, + @Nonnegative long checkpointId, @Nullable TaskStateSnapshot localState) { - TaskStateSnapshot previous = - storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), localState); + if (localState == null) { + localState = NULL_DUMMY; + } - if (previous != null) { - throw new IllegalStateException("Found previously registered local state for checkpoint with id " + - checkpointMetaData.getCheckpointId() + "! 
This indicated a problem."); + LOG.info("Storing local state for checkpoint {}.", checkpointId); + LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState); + + synchronized (lock) { + if (discarded) { + // we ignore late stores and simply discard the state. + discardStateObject(localState, checkpointId); + } else { + TaskStateSnapshot previous = + storedTaskStateByCheckpointID.put(checkpointId, localState); + + if (previous != null) { + // this should never happen. + discardStateObject(previous, checkpointId); + throw new IllegalStateException("Found previously registered local state for checkpoint with id " + + checkpointId + "! This indicated a problem."); + } + + // prune history. + while (storedTaskStateByCheckpointID.size() > MAX_RETAINED_SNAPSHOTS) { + Long removeCheckpointID = storedTaskStateByCheckpointID.firstKey(); + TaskStateSnapshot snapshot = + storedTaskStateByCheckpointID.remove(storedTaskStateByCheckpointID.firstKey()); + discardStateObject(snapshot, removeCheckpointID); + } + } } } - public void dispose() throws Exception { - - Exception collectedException = null; - - for (TaskStateSnapshot snapshot : storedTaskStateByCheckpointID.values()) { - try { - snapshot.discardState(); - } catch (Exception discardEx) { - collectedException = ExceptionUtils.firstOrSuppressed(discardEx, collectedException); - } + /** + * Returns the local state that is stored under the given checkpoint id or null if nothing was stored under the id. + * + * @param checkpointID the checkpoint id by which we search for local state. + * @return the local state found for the given checkpoint id. Can be null + */ + @Nullable + public TaskStateSnapshot retrieveLocalState(long checkpointID) { + synchronized (lock) { + TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID); + return snapshot != NULL_DUMMY ? 
snapshot : null; } + } - if (collectedException != null) { - throw collectedException; + /** + * Disposes the state of all local snapshots managed by this object. + */ + public void dispose() { + synchronized (lock) { + for (Map.Entry entry : storedTaskStateByCheckpointID.entrySet()) { + discardStateObject(entry.getValue(), entry.getKey()); + } + discarded = true; --- End diff -- Shall we rename `discarded` to `disposed`? ---