flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [13/26] flink git commit: [FLINK-8360][checkpointing] Implement state storage for local recovery and integrate with task lifecycle
Date Sun, 25 Feb 2018 16:11:47 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index 4277348..44ca9d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.RunnableFuture;
@@ -31,8 +33,11 @@ import java.util.concurrent.RunnableFuture;
  * This class is a default implementation for StateSnapshotContext.
  */
 public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext, Closeable {
-	
+
+	/** Checkpoint id of the snapshot. */
 	private final long checkpointId;
+
+	/** Checkpoint timestamp of the snapshot. */
 	private final long checkpointTimestamp;
 	
 	/** Factory for he checkpointing stream */
@@ -47,7 +52,10 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 	 */
 	private final CloseableRegistry closableRegistry;
 
+	/** Output stream for the raw keyed state. */
 	private KeyedStateCheckpointOutputStream keyedStateCheckpointOutputStream;
+
+	/** Output stream for the raw operator state. */
 	private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream;
 
 	@VisibleForTesting
@@ -109,14 +117,23 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 		return operatorStateCheckpointOutputStream;
 	}
 
-	public RunnableFuture<KeyedStateHandle> getKeyedStateStreamFuture() throws IOException {
-		KeyGroupsStateHandle keyGroupsStateHandle = closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream);
-		return new DoneFuture<KeyedStateHandle>(keyGroupsStateHandle);
+	@Nonnull
+	public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateStreamFuture() throws IOException {
+		KeyedStateHandle keyGroupsStateHandle =
+			closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream);
+		return toDoneFutureOfSnapshotResult(keyGroupsStateHandle);
 	}
 
-	public RunnableFuture<OperatorStateHandle> getOperatorStateStreamFuture() throws IOException {
-		OperatorStateHandle operatorStateHandle = closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream);
-		return new DoneFuture<>(operatorStateHandle);
+	@Nonnull
+	public RunnableFuture<SnapshotResult<OperatorStateHandle>> getOperatorStateStreamFuture() throws IOException {
+		OperatorStateHandle operatorStateHandle =
+			closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream);
+		return toDoneFutureOfSnapshotResult(operatorStateHandle);
+	}
+
+	private <T extends StateObject> RunnableFuture<SnapshotResult<T>> toDoneFutureOfSnapshotResult(T handle) {
+		SnapshotResult<T> snapshotResult = SnapshotResult.of(handle);
+		return DoneFuture.of(snapshotResult);
 	}
 
 	private <T extends StreamStateHandle> T closeAndUnregisterStreamToObtainStateHandle(
@@ -130,7 +147,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 	}
 
 	private <T extends StreamStateHandle> void closeAndUnregisterStream(
-		NonClosingCheckpointOutputStream<T> stream) throws IOException {
+		NonClosingCheckpointOutputStream<? extends T> stream) throws IOException {
 
 		Preconditions.checkNotNull(stream);
 
@@ -149,9 +166,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 			try {
 				closeAndUnregisterStream(keyedStateCheckpointOutputStream);
 			} catch (IOException e) {
-				exception = ExceptionUtils.firstOrSuppressed(
-					new IOException("Could not close the raw keyed state checkpoint output stream.", e),
-					exception);
+				exception = new IOException("Could not close the raw keyed state checkpoint output stream.", e);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 326b95c..a940aef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -18,81 +18,241 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.ShutdownHookUtil;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
- * This class holds the all {@link TaskLocalStateStore} objects for a task executor (manager).
- *
- * TODO: this still still work in progress and partially still acts as a placeholder.
+ * This class holds all {@link TaskLocalStateStoreImpl} objects for a task executor (manager).
  */
 public class TaskExecutorLocalStateStoresManager {
 
+	/** Logger for this class. */
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
+
 	/**
 	 * This map holds all local state stores for tasks running on the task manager / executor that own the instance of
-	 * this.
+	 * this. Maps from allocation id to the local state stores of all subtasks in that allocation.
 	 */
-	private final Map<JobID, Map<JobVertexSubtaskKey, TaskLocalStateStore>> taskStateManagers;
+	@GuardedBy("lock")
+	private final Map<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> taskStateStoresByAllocationID;
+
+	/** The configured mode for local recovery on this task manager. */
+	private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+
+	/** These are the root directories for all local state of this task manager / executor. */
+	private final File[] localStateRootDirectories;
+
+	/** Executor that runs the discarding of released state objects. */
+	private final Executor discardExecutor;
+
+	/** Guarding lock for taskStateStoresByAllocationID and closed-flag. */
+	private final Object lock;
+
+	private final Thread shutdownHook;
+
+	@GuardedBy("lock")
+	private boolean closed;
+
+	public TaskExecutorLocalStateStoresManager(
+		@Nonnull LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode,
+		@Nonnull File[] localStateRootDirectories,
+		@Nonnull Executor discardExecutor) throws IOException {
+
+		this.taskStateStoresByAllocationID = new HashMap<>();
+		this.localRecoveryMode = localRecoveryMode;
+		this.localStateRootDirectories = localStateRootDirectories;
+		this.discardExecutor = discardExecutor;
+		this.lock = new Object();
+		this.closed = false;
+
+		for (File localStateRecoveryRootDir : localStateRootDirectories) {
+			if (!localStateRecoveryRootDir.exists()) {
+
+				if (!localStateRecoveryRootDir.mkdirs()) {
+					throw new IOException("Could not create root directory for local recovery: " +
+						localStateRecoveryRootDir);
+				}
+			}
+		}
 
-	public TaskExecutorLocalStateStoresManager() {
-		this.taskStateManagers = new HashMap<>();
+		// register a shutdown hook
+		this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);
 	}
 
-	public TaskLocalStateStore localStateStoreForTask(
-		JobID jobId,
-		JobVertexID jobVertexID,
-		int subtaskIndex) {
+	@Nonnull
+	public TaskLocalStateStore localStateStoreForSubtask(
+		@Nonnull JobID jobId,
+		@Nonnull AllocationID allocationID,
+		@Nonnull JobVertexID jobVertexID,
+		@Nonnegative int subtaskIndex) {
+
+		synchronized (lock) {
+
+			if (closed) {
+				throw new IllegalStateException("TaskExecutorLocalStateStoresManager is already closed and cannot " +
+					"register a new TaskLocalStateStore.");
+			}
+
+			final Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> taskStateManagers =
+				this.taskStateStoresByAllocationID.computeIfAbsent(allocationID, k -> new HashMap<>());
+
+			final JobVertexSubtaskKey taskKey = new JobVertexSubtaskKey(jobVertexID, subtaskIndex);
+
+			// create the allocation base dirs, one inside each root dir.
+			File[] allocationBaseDirectories = allocationBaseDirectories(allocationID);
 
-		Preconditions.checkNotNull(jobId);
-		final JobVertexSubtaskKey taskKey = new JobVertexSubtaskKey(jobVertexID, subtaskIndex);
+			LocalRecoveryDirectoryProviderImpl directoryProvider = new LocalRecoveryDirectoryProviderImpl(
+				allocationBaseDirectories,
+				jobId,
+				jobVertexID,
+				subtaskIndex);
 
-		final Map<JobVertexSubtaskKey, TaskLocalStateStore> taskStateManagers =
-			this.taskStateManagers.computeIfAbsent(jobId, k -> new HashMap<>());
+			LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(
+				localRecoveryMode,
+				directoryProvider);
 
-		return taskStateManagers.computeIfAbsent(
-			taskKey, k -> new TaskLocalStateStore(jobId, jobVertexID, subtaskIndex));
+			return taskStateManagers.computeIfAbsent(
+				taskKey,
+				k -> new TaskLocalStateStoreImpl(
+					jobId,
+					allocationID,
+					jobVertexID,
+					subtaskIndex,
+					localRecoveryConfig,
+					discardExecutor));
+		}
 	}
 
-	public void releaseJob(JobID jobID) {
+	public void releaseLocalStateForAllocationId(@Nonnull AllocationID allocationID) {
+
+		Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> cleanupLocalStores;
 
-		Map<JobVertexSubtaskKey, TaskLocalStateStore> cleanupLocalStores = taskStateManagers.remove(jobID);
+		synchronized (lock) {
+			if (closed) {
+				return;
+			}
+			cleanupLocalStores = taskStateStoresByAllocationID.remove(allocationID);
+		}
 
 		if (cleanupLocalStores != null) {
 			doRelease(cleanupLocalStores.values());
 		}
+
+		cleanupAllocationBaseDirs(allocationID);
 	}
 
-	public void releaseAll() {
+	public void shutdown() {
+
+		HashMap<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> toRelease;
+
+		synchronized (lock) {
 
-		for (Map<JobVertexSubtaskKey, TaskLocalStateStore> stateStoreMap : taskStateManagers.values()) {
-			doRelease(stateStoreMap.values());
+			if (closed) {
+				return;
+			}
+
+			closed = true;
+			toRelease = new HashMap<>(taskStateStoresByAllocationID);
+			taskStateStoresByAllocationID.clear();
+		}
+
+		ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
+
+		LOG.debug("Shutting down TaskExecutorLocalStateStoresManager.");
+
+		for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> entry :
+			toRelease.entrySet()) {
+
+			doRelease(entry.getValue().values());
+			cleanupAllocationBaseDirs(entry.getKey());
 		}
+	}
+
+	@VisibleForTesting
+	public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() {
+		return localRecoveryMode;
+	}
+
+	@VisibleForTesting
+	File[] getLocalStateRootDirectories() {
+		return localStateRootDirectories;
+	}
+
+	@VisibleForTesting
+	String allocationSubDirString(AllocationID allocationID) {
+		return "aid_" + allocationID;
+	}
 
-		taskStateManagers.clear();
+	private File[] allocationBaseDirectories(AllocationID allocationID) {
+		final String allocationSubDirString = allocationSubDirString(allocationID);
+		final File[] allocationDirectories = new File[localStateRootDirectories.length];
+		for (int i = 0; i < localStateRootDirectories.length; ++i) {
+			allocationDirectories[i] = new File(localStateRootDirectories[i], allocationSubDirString);
+		}
+		return allocationDirectories;
 	}
 
-	private void doRelease(Iterable<TaskLocalStateStore> toRelease) {
+	private void doRelease(Iterable<TaskLocalStateStoreImpl> toRelease) {
+
 		if (toRelease != null) {
-			for (TaskLocalStateStore stateStore : toRelease) {
-				stateStore.dispose();
+
+			for (TaskLocalStateStoreImpl stateStore : toRelease) {
+				try {
+					stateStore.dispose();
+				} catch (Exception disposeEx) {
+					LOG.warn("Exception while disposing local state store " + stateStore, disposeEx);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Deletes the base dirs for this allocation id (recursively).
+	 */
+	private void cleanupAllocationBaseDirs(AllocationID allocationID) {
+		// clear the base dirs for this allocation id.
+		File[] allocationDirectories = allocationBaseDirectories(allocationID);
+		for (File directory : allocationDirectories) {
+			try {
+				FileUtils.deleteFileOrDirectory(directory);
+			} catch (IOException e) {
+				LOG.warn("Exception while deleting local state directory for allocation " + allocationID, e);
 			}
 		}
 	}
 
+	/**
+	 * Composite key of {@link JobVertexID} and subtask index that describes the subtask of a job vertex.
+	 */
 	private static final class JobVertexSubtaskKey {
 
+		/** The job vertex id. */
 		@Nonnull
 		final JobVertexID jobVertexID;
+
+		/** The subtask index. */
+		@Nonnegative
 		final int subtaskIndex;
 
-		public JobVertexSubtaskKey(@Nonnull JobVertexID jobVertexID, int subtaskIndex) {
-			this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
+		JobVertexSubtaskKey(@Nonnull JobVertexID jobVertexID, @Nonnegative int subtaskIndex) {
+			this.jobVertexID = jobVertexID;
 			this.subtaskIndex = subtaskIndex;
 		}
 
@@ -107,10 +267,7 @@ public class TaskExecutorLocalStateStoresManager {
 
 			JobVertexSubtaskKey that = (JobVertexSubtaskKey) o;
 
-			if (subtaskIndex != that.subtaskIndex) {
-				return false;
-			}
-			return jobVertexID.equals(that.jobVertexID);
+			return subtaskIndex == that.subtaskIndex && jobVertexID.equals(that.jobVertexID);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
index f743630..7089894 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
@@ -18,44 +18,48 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 /**
- * This class will service as a task-manager-level local storage for local checkpointed state. The purpose is to provide
- * access to a state that is stored locally for a faster recovery compared to the state that is stored remotely in a
- * stable store DFS. For now, this storage is only complementary to the stable storage and local state is typically
- * lost in case of machine failures. In such cases (and others), client code of this class must fall back to using the
- * slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be implemented!
+ * Classes that implement this interface serve as a task-manager-level local storage for local checkpointed state.
+ * The purpose is to provide access to a state that is stored locally for a faster recovery compared to the state that
+ * is stored remotely in a stable store DFS. For now, this storage is only complementary to the stable storage and local
+ * state is typically lost in case of machine failures. In such cases (and others), client code of this class must fall
+ * back to using the slower but highly available store.
  */
-public class TaskLocalStateStore {
-
-	/** */
-	private final JobID jobID;
-
-	/** */
-	private final JobVertexID jobVertexID;
-
-	/** */
-	private final int subtaskIndex;
-
-	public TaskLocalStateStore(
-		JobID jobID,
-		JobVertexID jobVertexID,
-		int subtaskIndex) {
+public interface TaskLocalStateStore {
+	/**
+	 * Stores the local state for the given checkpoint id.
+	 *
+	 * @param checkpointId id for the checkpoint that created the local state that will be stored.
+	 * @param localState the local state to store.
+	 */
+	void storeLocalState(
+		@Nonnegative long checkpointId,
+		@Nullable TaskStateSnapshot localState);
 
-		this.jobID = jobID;
-		this.jobVertexID = jobVertexID;
-		this.subtaskIndex = subtaskIndex;
-	}
+	/**
+	 * Returns the local state that is stored under the given checkpoint id or null if nothing was stored under the id.
+	 *
+	 * @param checkpointID the checkpoint id by which we search for local state.
+	 * @return the local state found for the given checkpoint id. Can be null.
+	 */
+	@Nullable
+	TaskStateSnapshot retrieveLocalState(long checkpointID);
 
-	public void storeSnapshot(/* TODO */) {
-		throw new UnsupportedOperationException("TODO!");
-	}
+	/**
+	 * Returns the {@link LocalRecoveryConfig} for this task local state store.
+	 */
+	@Nonnull
+	LocalRecoveryConfig getLocalRecoveryConfig();
 
-	public void dispose() {
-		throw new UnsupportedOperationException("TODO!");
-	}
+	/**
+	 * Notifies that the checkpoint with the given id was confirmed as complete. This prunes the checkpoint history
+	 * and removes all local states with a checkpoint id that is smaller than the newly confirmed checkpoint id.
+	 */
+	void confirmCheckpoint(long confirmedCheckpointId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
new file mode 100644
index 0000000..191c109
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/**
+ * Main implementation of a {@link TaskLocalStateStore}.
+ */
+public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
+
+	/** Logger for this class. */
+	private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStoreImpl.class);
+
+	/** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. */
+	private static final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(0);
+
+	/** JobID from the owning subtask. */
+	@Nonnull
+	private final JobID jobID;
+
+	/** AllocationID of the owning slot. */
+	@Nonnull
+	private final AllocationID allocationID;
+
+	/** JobVertexID of the owning subtask. */
+	@Nonnull
+	private final JobVertexID jobVertexID;
+
+	/** Subtask index of the owning subtask. */
+	@Nonnegative
+	private final int subtaskIndex;
+
+	/** The configured mode for local recovery. */
+	@Nonnull
+	private final LocalRecoveryConfig localRecoveryConfig;
+
+	/** Executor that runs the discarding of released state objects. */
+	@Nonnull
+	private final Executor discardExecutor;
+
+	/** Lock for synchronisation on the storage map and the discarded status. */
+	@Nonnull
+	private final Object lock;
+
+	/** Status flag if this store was already discarded. */
+	@GuardedBy("lock")
+	private boolean disposed;
+
+	/** Maps checkpoint ids to local TaskStateSnapshots. */
+	@Nonnull
+	@GuardedBy("lock")
+	private final SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID;
+
+	public TaskLocalStateStoreImpl(
+		@Nonnull JobID jobID,
+		@Nonnull AllocationID allocationID,
+		@Nonnull JobVertexID jobVertexID,
+		@Nonnegative int subtaskIndex,
+		@Nonnull LocalRecoveryConfig localRecoveryConfig,
+		@Nonnull Executor discardExecutor) {
+
+		this.jobID = jobID;
+		this.allocationID = allocationID;
+		this.jobVertexID = jobVertexID;
+		this.subtaskIndex = subtaskIndex;
+		this.discardExecutor = discardExecutor;
+		this.lock = new Object();
+		this.storedTaskStateByCheckpointID = new TreeMap<>();
+		this.disposed = false;
+		this.localRecoveryConfig = localRecoveryConfig;
+	}
+
+	@Override
+	public void storeLocalState(
+		@Nonnegative long checkpointId,
+		@Nullable TaskStateSnapshot localState) {
+
+		if (localState == null) {
+			localState = NULL_DUMMY;
+		}
+
+		LOG.info("Storing local state for checkpoint {}.", checkpointId);
+		LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
+
+		Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
+
+		synchronized (lock) {
+			if (disposed) {
+				// we ignore late stores and simply discard the state.
+				toDiscard.put(checkpointId, localState);
+			} else {
+				TaskStateSnapshot previous =
+					storedTaskStateByCheckpointID.put(checkpointId, localState);
+
+				if (previous != null) {
+					toDiscard.put(checkpointId, previous);
+				}
+			}
+		}
+
+		asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+	}
+
+	@Override
+	@Nullable
+	public TaskStateSnapshot retrieveLocalState(long checkpointID) {
+		synchronized (lock) {
+			TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
+			return snapshot != NULL_DUMMY ? snapshot : null;
+		}
+	}
+
+	@Override
+	@Nonnull
+	public LocalRecoveryConfig getLocalRecoveryConfig() {
+		return localRecoveryConfig;
+	}
+
+	@Override
+	public void confirmCheckpoint(long confirmedCheckpointId) {
+
+		LOG.debug("Received confirmation for checkpoint {}. Starting to prune history.", confirmedCheckpointId);
+
+		final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+		synchronized (lock) {
+
+			Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+				storedTaskStateByCheckpointID.entrySet().iterator();
+
+			// remove entries for outdated checkpoints and discard their state.
+			while (entryIterator.hasNext()) {
+
+				Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+				long entryCheckpointId = snapshotEntry.getKey();
+
+				if (entryCheckpointId < confirmedCheckpointId) {
+					toRemove.add(snapshotEntry);
+					entryIterator.remove();
+				} else {
+					// we can stop because the map is sorted.
+					break;
+				}
+			}
+		}
+
+		asyncDiscardLocalStateForCollection(toRemove);
+	}
+
+	/**
+	 * Disposes the state of all local snapshots managed by this object.
+	 */
+	public CompletableFuture<Void> dispose() {
+
+		Collection<Map.Entry<Long, TaskStateSnapshot>> statesCopy;
+
+		synchronized (lock) {
+			disposed = true;
+			statesCopy = new ArrayList<>(storedTaskStateByCheckpointID.entrySet());
+			storedTaskStateByCheckpointID.clear();
+		}
+
+		return CompletableFuture.runAsync(
+			() -> {
+				// discard all remaining state objects.
+				syncDiscardLocalStateForCollection(statesCopy);
+
+				// delete the local state subdirectories that belong to this subtask.
+				LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
+				for (int i = 0; i < directoryProvider.allocationBaseDirsCount(); ++i) {
+					File subtaskBaseDirectory = directoryProvider.selectSubtaskBaseDirectory(i);
+					try {
+						deleteDirectory(subtaskBaseDirectory);
+					} catch (IOException e) {
+						LOG.warn("Exception when deleting local recovery subtask base dir: " + subtaskBaseDirectory, e);
+					}
+				}
+			},
+			discardExecutor);
+	}
+
+	private void asyncDiscardLocalStateForCollection(Collection<Map.Entry<Long, TaskStateSnapshot>> toDiscard) {
+		if (!toDiscard.isEmpty()) {
+			discardExecutor.execute(() -> syncDiscardLocalStateForCollection(toDiscard));
+		}
+	}
+
+	private void syncDiscardLocalStateForCollection(Collection<Map.Entry<Long, TaskStateSnapshot>> toDiscard) {
+		for (Map.Entry<Long, TaskStateSnapshot> entry : toDiscard) {
+			discardLocalStateForCheckpoint(entry.getKey(), entry.getValue());
+		}
+	}
+
+	/**
+	 * Helper method that discards a state object and its checkpoint directory, reporting exceptions to the log.
+	 */
+	private void discardLocalStateForCheckpoint(long checkpointID, TaskStateSnapshot o) {
+
+		try {
+			if (LOG.isTraceEnabled()) {
+				LOG.trace("Discarding local task state snapshot of checkpoint {} for {}/{}/{}.",
+					checkpointID, jobID, jobVertexID, subtaskIndex);
+			} else {
+				LOG.debug("Discarding local task state snapshot {} of checkpoint {} for {}/{}/{}.",
+					o, checkpointID, jobID, jobVertexID, subtaskIndex);
+			}
+			o.discardState();
+		} catch (Exception discardEx) {
+			LOG.warn("Exception while discarding local task state snapshot of checkpoint " + checkpointID + ".", discardEx);
+		}
+
+		LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
+		File checkpointDir = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointID);
+		LOG.debug("Deleting local state directory {} of checkpoint {} for {}/{}/{}/{}.",
+			checkpointDir, checkpointID, jobID, jobVertexID, subtaskIndex);
+		try {
+			deleteDirectory(checkpointDir);
+		} catch (IOException ex) {
+			LOG.warn("Exception while deleting local state directory of checkpoint " + checkpointID + ".", ex);
+		}
+	}
+
+	/**
+	 * Helper method to delete a directory.
+	 */
+	private void deleteDirectory(File directory) throws IOException {
+		Path path = new Path(directory.toURI());
+		FileSystem fileSystem = path.getFileSystem();
+		if (fileSystem.exists(path)) {
+			fileSystem.delete(path, true);
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "TaskLocalStateStore{" +
+			"jobID=" + jobID +
+			", jobVertexID=" + jobVertexID +
+			", allocationID=" + allocationID +
+			", subtaskIndex=" + subtaskIndex +
+			", localRecoveryConfig=" + localRecoveryConfig +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
index 8b41e9e..82591ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 
@@ -44,19 +44,29 @@ public interface TaskStateManager extends CheckpointListener {
 	 * Report the state snapshots for the operator instances running in the owning task.
 	 *
 	 * @param checkpointMetaData meta data from the checkpoint request.
-	 * @param checkpointMetrics task level metrics for the checkpoint.
-	 * @param acknowledgedState the reported states from the owning task.
+	 * @param checkpointMetrics  task level metrics for the checkpoint.
+	 * @param acknowledgedState  the reported states to acknowledge to the job manager.
+	 * @param localState         the reported states for local recovery.
 	 */
-	void reportStateHandles(
+	void reportTaskStateSnapshots(
 		@Nonnull CheckpointMetaData checkpointMetaData,
 		@Nonnull CheckpointMetrics checkpointMetrics,
-		@Nullable TaskStateSnapshot acknowledgedState);
+		@Nullable TaskStateSnapshot acknowledgedState,
+		@Nullable TaskStateSnapshot localState);
 
 	/**
 	 * Returns means to restore previously reported state of an operator running in the owning task.
 	 *
 	 * @param operatorID the id of the operator for which we request state.
-	 * @return previous state for the operator. Null if no previous state exists.
+	 * @return Previous state for the operator. The previous state can be empty if the operator had no previous state.
 	 */
-	OperatorSubtaskState operatorStates(OperatorID operatorID);
+	@Nonnull
+	PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID);
+
+	/**
+	 * Returns the configuration for local recovery, i.e. the base directories for all file-based local state of the
+	 * owning subtask and the general mode for local recovery.
+	 */
+	@Nonnull
+	LocalRecoveryConfig createLocalRecoveryConfig();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index 3cd66fb..3acca7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -31,6 +32,8 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Collections;
+
 /**
  * This class is the default implementation of {@link TaskStateManager} and collaborates with the job manager
  * through {@link CheckpointResponder}) as well as a task-manager-local state store. Like this, client code does
@@ -75,35 +78,67 @@ public class TaskStateManagerImpl implements TaskStateManager {
 	}
 
 	@Override
-	public void reportStateHandles(
+	public void reportTaskStateSnapshots(
 		@Nonnull CheckpointMetaData checkpointMetaData,
 		@Nonnull CheckpointMetrics checkpointMetrics,
-		@Nullable TaskStateSnapshot acknowledgedState) {
+		@Nullable TaskStateSnapshot acknowledgedState,
+		@Nullable TaskStateSnapshot localState) {
+
+		long checkpointId = checkpointMetaData.getCheckpointId();
+
+		localStateStore.storeLocalState(checkpointId, localState);
 
 		checkpointResponder.acknowledgeCheckpoint(
 			jobId,
 			executionAttemptID,
-			checkpointMetaData.getCheckpointId(),
+			checkpointId,
 			checkpointMetrics,
 			acknowledgedState);
 	}
 
+	@Nonnull
 	@Override
-	public OperatorSubtaskState operatorStates(OperatorID operatorID) {
+	public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) {
 
 		if (jobManagerTaskRestore == null) {
-			return null;
+			return PrioritizedOperatorSubtaskState.emptyNotRestored();
+		}
+
+		TaskStateSnapshot jobManagerStateSnapshot =
+			jobManagerTaskRestore.getTaskStateSnapshot();
+
+		OperatorSubtaskState jobManagerSubtaskState =
+			jobManagerStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+
+		if (jobManagerSubtaskState == null) {
+			return PrioritizedOperatorSubtaskState.emptyNotRestored();
 		}
 
-		TaskStateSnapshot taskStateSnapshot = jobManagerTaskRestore.getTaskStateSnapshot();
-		return taskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+		TaskStateSnapshot localStateSnapshot =
+			localStateStore.retrieveLocalState(jobManagerTaskRestore.getRestoreCheckpointId());
 
-		/*
-		TODO!!!!!!!
-		1) lookup local states for a matching operatorID / checkpointID.
-		2) if nothing available: look into job manager provided state.
-		3) massage it into a snapshots and return stuff.
-		 */
+		if (localStateSnapshot != null) {
+			OperatorSubtaskState localSubtaskState = localStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+
+			if (localSubtaskState != null) {
+				PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder(
+					jobManagerSubtaskState,
+					Collections.singletonList(localSubtaskState));
+				return builder.build();
+			}
+		}
+
+		PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder(
+			jobManagerSubtaskState,
+			Collections.emptyList(),
+			true);
+		return builder.build();
+	}
+
+	@Nonnull
+	@Override
+	public LocalRecoveryConfig createLocalRecoveryConfig() {
+		return localStateStore.getLocalRecoveryConfig();
 	}
 
 	/**
@@ -111,6 +146,6 @@ public class TaskStateManagerImpl implements TaskStateManager {
 	 */
 	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		//TODO activate and prune local state later
+		localStateStore.confirmCheckpoint(checkpointId);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
new file mode 100644
index 0000000..054a98c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link CheckpointStateOutputStream} that writes into a specified file and
+ * returns a {@link FileStateHandle} upon closing.
+ *
+ * <p>Unlike the {@link org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream},
+ * this stream does not have a threshold below which it returns a memory byte stream handle,
+ * and does not create random files, but writes to a specified file.
+ *
+ * <p>The stream ends in one of two ways: {@link #closeAndGetHandle()} keeps the file and returns a
+ * handle to it, whereas {@link #close()} discards the stream and deletes the file.
+ */
+public final class FileBasedStateOutputStream extends CheckpointStateOutputStream {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileBasedStateOutputStream.class);
+
+	// ------------------------------------------------------------------------
+
+	/** The stream to the target file, all writes are forwarded to it. */
+	private final FSDataOutputStream out;
+
+	/** The path of the file this stream writes to. */
+	private final Path path;
+
+	/** The file system that holds the target file, used to delete the file when it is discarded. */
+	private final FileSystem fileSystem;
+
+	/** Flag that is set once the stream was closed, either through close() or closeAndGetHandle(). */
+	private volatile boolean closed;
+
+	/**
+	 * Creates the stream and opens the target file with {@link WriteMode#NO_OVERWRITE}.
+	 *
+	 * @param fileSystem the file system in which the target file is created.
+	 * @param path the path of the file to write to.
+	 * @throws IOException if the file system fails to create the file.
+	 */
+	public FileBasedStateOutputStream(FileSystem fileSystem, Path path) throws IOException {
+		this.fileSystem = checkNotNull(fileSystem);
+		this.path = checkNotNull(path);
+
+		this.out = fileSystem.create(path, WriteMode.NO_OVERWRITE);
+	}
+
+	// ------------------------------------------------------------------------
+	//  I/O
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void write(int b) throws IOException {
+		out.write(b);
+	}
+
+	@Override
+	public final void write(@Nonnull byte[] b, int off, int len) throws IOException {
+		out.write(b, off, len);
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return out.getPos();
+	}
+
+	@Override
+	public void flush() throws IOException {
+		out.flush();
+	}
+
+	@Override
+	public void sync() throws IOException {
+		out.sync();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Closing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks whether this stream was already closed.
+	 *
+	 * @return true if the stream was closed through {@link #close()} or {@link #closeAndGetHandle()}.
+	 */
+	public boolean isClosed() {
+		return closed;
+	}
+
+	/**
+	 * Closes the stream and discards it, which includes a best-effort attempt to delete the target file.
+	 * Failures are only logged. Calling this method on an already closed stream has no effect.
+	 */
+	@Override
+	public void close() {
+		if (!closed) {
+			closed = true;
+
+			try {
+				out.close();
+			}
+			catch (Throwable t) {
+				LOG.warn("Could not close the state stream for {}.", path, t);
+			}
+
+			// delete in a separate step, so that a failure to close the stream does not leave the file behind
+			try {
+				fileSystem.delete(path, false);
+			}
+			catch (Throwable t) {
+				LOG.warn("Could not delete the state stream file {}.", path, t);
+			}
+		}
+	}
+
+	/**
+	 * Closes the stream and returns a handle to the written file. If closing fails, the method tries to delete
+	 * the file before it reports the failure.
+	 *
+	 * @return a {@link FileStateHandle} that points to the written file.
+	 * @throws IOException if the stream could not be closed, or if the stream was already closed before.
+	 */
+	@Nullable
+	@Override
+	public FileStateHandle closeAndGetHandle() throws IOException {
+		synchronized (this) {
+			if (!closed) {
+				try {
+					// make a best effort attempt to figure out the size
+					long size = 0;
+					try {
+						size = out.getPos();
+					} catch (Exception ignored) {}
+
+					// close and return
+					out.close();
+
+					return new FileStateHandle(path, size);
+				}
+				catch (Exception e) {
+					try {
+						fileSystem.delete(path, false);
+					}
+					catch (Exception deleteException) {
+						LOG.warn("Could not delete the checkpoint stream file {}.", path, deleteException);
+					}
+
+					throw new IOException("Could not flush and close the file system " +
+						"output stream to " + path + " in order to obtain the " +
+						"stream state handle", e);
+				}
+				finally {
+					closed = true;
+				}
+			}
+			else {
+				throw new IOException("Stream has already been closed and discarded.");
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index f993786..609ef69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
@@ -115,8 +117,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception {
-		Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
+	public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
+
+
+		Path target = scope == CheckpointedStateScope.EXCLUSIVE ?checkpointDirectory: sharedStateDirectory;
 		int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
 
 		return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);
@@ -275,6 +279,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 			}
 		}
 
+		@Nullable
 		@Override
 		public StreamStateHandle closeAndGetHandle() throws IOException {
 			// check if there was nothing ever written

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 58791e2..637effd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -32,7 +32,9 @@ import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.util.TernaryBoolean;
 
@@ -102,7 +104,7 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
 	 * A value of 'undefined' means not yet configured, in which case the default will be used. */
 	private final TernaryBoolean asynchronousSnapshots;
 
-	// ------------------------------------------------------------------------
+	// -----------------------------------------------------------------------
 
 	/**
 	 * Creates a new state backend that stores its checkpoint data in the file system and location
@@ -451,7 +453,10 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws IOException {
+			TaskKvStateRegistry kvStateRegistry) {
+
+		TaskStateManager taskStateManager = env.getTaskStateManager();
+		LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
 
 		return new HeapKeyedStateBackend<>(
 				kvStateRegistry,
@@ -460,13 +465,14 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
 				numberOfKeyGroups,
 				keyGroupRange,
 				isUsingAsynchronousSnapshots(),
-				env.getExecutionConfig());
+				env.getExecutionConfig(),
+				localRecoveryConfig);
 	}
 
 	@Override
 	public OperatorStateBackend createOperatorStateBackend(
 		Environment env,
-		String operatorIdentifier) throws Exception {
+		String operatorIdentifier) {
 
 		return new DefaultOperatorStateBackend(
 			env.getUserClassLoader(),

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index aa7ea6e..5d5f716 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -49,8 +49,10 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
@@ -117,13 +119,14 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private final boolean asynchronousSnapshots;
 
 	public HeapKeyedStateBackend(
-			TaskKvStateRegistry kvStateRegistry,
-			TypeSerializer<K> keySerializer,
-			ClassLoader userCodeClassLoader,
-			int numberOfKeyGroups,
-			KeyGroupRange keyGroupRange,
-			boolean asynchronousSnapshots,
-			ExecutionConfig executionConfig) {
+		TaskKvStateRegistry kvStateRegistry,
+		TypeSerializer<K> keySerializer,
+		ClassLoader userCodeClassLoader,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		boolean asynchronousSnapshots,
+		ExecutionConfig executionConfig,
+		LocalRecoveryConfig localRecoveryConfig) {
 
 		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
 		this.asynchronousSnapshots = asynchronousSnapshots;
@@ -286,14 +289,14 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public  RunnableFuture<KeyedStateHandle> snapshot(
+	public  RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
 			final long checkpointId,
 			final long timestamp,
 			final CheckpointStreamFactory streamFactory,
 			CheckpointOptions checkpointOptions) throws Exception {
 
 		if (!hasRegisteredState()) {
-			return DoneFuture.nullValue();
+			return DoneFuture.of(SnapshotResult.empty());
 		}
 
 		long syncStartTime = System.currentTimeMillis();
@@ -326,8 +329,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		//--------------------------------------------------- this becomes the end of sync part
 
 		// implementation of the async IO operation, based on FutureTask
-		final AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable =
-			new AbstractAsyncCallableWithResources<KeyedStateHandle>() {
+		final AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable =
+			new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
 
 				CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
 
@@ -359,7 +362,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				}
 
 				@Override
-				public KeyGroupsStateHandle performOperation() throws Exception {
+				public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
 					long asyncStartTime = System.currentTimeMillis();
 
 					CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream;
@@ -401,15 +404,15 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 							final KeyGroupsStateHandle keyGroupsStateHandle =
 								new KeyGroupsStateHandle(offsets, streamStateHandle);
 
-							return keyGroupsStateHandle;
+							return SnapshotResult.of(keyGroupsStateHandle);
 						}
 					}
 
-					return null;
+					return SnapshotResult.empty();
 				}
 			};
 
-		AsyncStoppableTaskWithCallback<KeyedStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable);
+		AsyncStoppableTaskWithCallback<SnapshotResult<KeyedStateHandle>> task = AsyncStoppableTaskWithCallback.from(ioCallable);
 
 		if (!asynchronousSnapshots) {
 			task.run();

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 168e4ff..0801429 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,7 +49,8 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
 
 	@Override
 	public CheckpointStateOutputStream createCheckpointStateOutputStream(
-			CheckpointedStateScope scope) throws Exception {
+			CheckpointedStateScope scope) throws IOException
+	{
 		return new MemoryCheckpointOutputStream(maxStateSize);
 	}
 
@@ -114,6 +117,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
 			}
 		}
 
+		@Nullable
 		@Override
 		public StreamStateHandle closeAndGetHandle() throws IOException {
 			if (isEmpty) {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 88d7b01..3da60e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.util.TernaryBoolean;
@@ -299,13 +300,16 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
 
 	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-			Environment env, JobID jobID,
+			Environment env,
+			JobID jobID,
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			TaskKvStateRegistry kvStateRegistry) {
 
+		TaskStateManager taskStateManager = env.getTaskStateManager();
+
 		return new HeapKeyedStateBackend<>(
 				kvStateRegistry,
 				keySerializer,
@@ -313,7 +317,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
 				numberOfKeyGroups,
 				keyGroupRange,
 				isUsingAsynchronousSnapshots(),
-				env.getExecutionConfig());
+				env.getExecutionConfig(),
+				taskStateManager.createLocalRecoveryConfig());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f4c953d..927bd11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -205,7 +205,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		this.jobManagerTable = taskExecutorServices.getJobManagerTable();
 		this.jobLeaderService = taskExecutorServices.getJobLeaderService();
 		this.taskManagerLocation = taskExecutorServices.getTaskManagerLocation();
-		this.localStateStoresManager = taskExecutorServices.getTaskStateManager();
+		this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore();
 		this.networkEnvironment = taskExecutorServices.getNetworkEnvironment();
 
 		this.jobManagerConnections = new HashMap<>(4);
@@ -452,8 +452,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
 			PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
 
-			final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForTask(
+			final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
 				jobId,
+				tdd.getAllocationId(),
 				taskInformation.getJobVertexId(),
 				tdd.getSubtaskIndex());
 
@@ -744,6 +745,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 						onFatalError(slotNotFoundException);
 					}
 
+					// release local state under the allocation id.
+					localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
+
 					// sanity check
 					if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
 						onFatalError(new Exception("Could not free slot " + slotId));
@@ -1271,6 +1275,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		} catch (SlotNotFoundException e) {
 			log.debug("Could not free slot for allocation id {}.", allocationId, e);
 		}
+
+		localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
 	}
 
 	private void timeoutSlot(AllocationID allocationId, UUID ticket) {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 2de1be8..08335b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -303,6 +303,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
 			taskManagerServicesConfiguration,
 			resourceID,
+			rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io.
 			EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
 			EnvironmentInformation.getMaxJvmHeapMemory());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index b710b6a..32c7ff7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.QueryableStateUtils;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -58,6 +60,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
@@ -68,6 +71,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 public class TaskManagerServices {
 	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class);
 
+	@VisibleForTesting
+	public static final String LOCAL_STATE_SUB_DIRECTORY_ROOT = "localState";
+
 	/** TaskManager services. */
 	private final TaskManagerLocation taskManagerLocation;
 	private final MemoryManager memoryManager;
@@ -78,7 +84,7 @@ public class TaskManagerServices {
 	private final TaskSlotTable taskSlotTable;
 	private final JobManagerTable jobManagerTable;
 	private final JobLeaderService jobLeaderService;
-	private final TaskExecutorLocalStateStoresManager taskStateManager;
+	private final TaskExecutorLocalStateStoresManager taskManagerStateStore;
 
 	TaskManagerServices(
 		TaskManagerLocation taskManagerLocation,
@@ -90,7 +96,7 @@ public class TaskManagerServices {
 		TaskSlotTable taskSlotTable,
 		JobManagerTable jobManagerTable,
 		JobLeaderService jobLeaderService,
-		TaskExecutorLocalStateStoresManager taskStateManager) {
+		TaskExecutorLocalStateStoresManager taskManagerStateStore) {
 
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.memoryManager = Preconditions.checkNotNull(memoryManager);
@@ -101,7 +107,7 @@ public class TaskManagerServices {
 		this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
 		this.jobManagerTable = Preconditions.checkNotNull(jobManagerTable);
 		this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService);
-		this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
+		this.taskManagerStateStore = Preconditions.checkNotNull(taskManagerStateStore);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -144,8 +150,8 @@ public class TaskManagerServices {
 		return jobLeaderService;
 	}
 
-	public TaskExecutorLocalStateStoresManager getTaskStateManager() {
-		return taskStateManager;
+	public TaskExecutorLocalStateStoresManager getTaskManagerStateStore() {
+		return taskManagerStateStore;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -160,6 +166,12 @@ public class TaskManagerServices {
 		Exception exception = null;
 
 		try {
+			taskManagerStateStore.shutdown();
+		} catch (Exception e) {
+			exception = e;
+		}
+
+		try {
 			memoryManager.shutdown();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
@@ -209,6 +221,7 @@ public class TaskManagerServices {
 	 *
 	 * @param resourceID resource ID of the task manager
 	 * @param taskManagerServicesConfiguration task manager configuration
+	 * @param taskIOExecutor executor for async IO operations.
 	 * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
 	 * @param maxJvmHeapMemory the maximum JVM heap size
 	 * @return task manager components
@@ -217,6 +230,7 @@ public class TaskManagerServices {
 	public static TaskManagerServices fromConfiguration(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
 			ResourceID resourceID,
+			Executor taskIOExecutor,
 			long freeHeapMemoryWithDefrag,
 			long maxJvmHeapMemory) throws Exception {
 
@@ -256,7 +270,20 @@ public class TaskManagerServices {
 		final JobManagerTable jobManagerTable = new JobManagerTable();
 
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
-		final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager();
+
+		LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode = taskManagerServicesConfiguration.getLocalRecoveryMode();
+
+		final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
+
+		final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
+
+		for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
+			stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
+		}
+
+		final TaskExecutorLocalStateStoresManager taskStateManager =
+			new TaskExecutorLocalStateStoresManager(localRecoveryMode, stateRootDirectoryFiles, taskIOExecutor);
+
 		return new TaskManagerServices(
 			taskManagerLocation,
 			memoryManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 07cf660..d029bc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.NetUtils;
@@ -54,6 +55,8 @@ public class TaskManagerServicesConfiguration {
 
 	private final String[] tmpDirPaths;
 
+	private final String[] localRecoveryStateRootDirectories;
+
 	private final int numberOfSlots;
 
 	private final NetworkEnvironmentConfiguration networkConfig;
@@ -75,9 +78,13 @@ public class TaskManagerServicesConfiguration {
 
 	private final long timerServiceShutdownTimeout;
 
+	private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+
 	public TaskManagerServicesConfiguration(
 			InetAddress taskManagerAddress,
 			String[] tmpDirPaths,
+			String[] localRecoveryStateRootDirectories,
+			LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode,
 			NetworkEnvironmentConfiguration networkConfig,
 			QueryableStateConfiguration queryableStateConfig,
 			int numberOfSlots,
@@ -89,6 +96,8 @@ public class TaskManagerServicesConfiguration {
 
 		this.taskManagerAddress = checkNotNull(taskManagerAddress);
 		this.tmpDirPaths = checkNotNull(tmpDirPaths);
+		this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories);
+		this.localRecoveryMode = checkNotNull(localRecoveryMode);
 		this.networkConfig = checkNotNull(networkConfig);
 		this.queryableStateConfig = checkNotNull(queryableStateConfig);
 		this.numberOfSlots = checkNotNull(numberOfSlots);
@@ -115,6 +124,14 @@ public class TaskManagerServicesConfiguration {
 		return tmpDirPaths;
 	}
 
+	public String[] getLocalRecoveryStateRootDirectories() {
+		return localRecoveryStateRootDirectories;
+	}
+
+	public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() {
+		return localRecoveryMode;
+	}
+
 	public NetworkEnvironmentConfiguration getNetworkConfig() {
 		return networkConfig;
 	}
@@ -185,6 +202,15 @@ public class TaskManagerServicesConfiguration {
 		}
 
 		final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
+		String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);
+
+		if (localStateRootDir.length == 0) {
+			// default to temp dirs.
+			localStateRootDir = tmpDirs;
+		}
+
+		LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode =
+			LocalRecoveryConfig.LocalRecoveryMode.fromConfig(configuration);
 
 		final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
 			configuration,
@@ -225,6 +251,8 @@ public class TaskManagerServicesConfiguration {
 		return new TaskManagerServicesConfiguration(
 			remoteAddress,
 			tmpDirs,
+			localStateRootDir,
+			localRecoveryMode,
 			networkConfig,
 			queryableStateConfig,
 			slots,

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c22413e..1ecb47a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1202,25 +1202,24 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
 
-				Runnable runnable = new Runnable() {
-					@Override
-					public void run() {
-						try {
-							invokable.notifyCheckpointComplete(checkpointID);
-						taskStateManager.notifyCheckpointComplete(checkpointID);}
-						catch (Throwable t) {
-							if (getExecutionState() == ExecutionState.RUNNING) {
-								// fail task if checkpoint confirmation failed.
-								failExternally(new RuntimeException(
-									"Error while confirming checkpoint",
-									t));
-							}
+			Runnable runnable = new Runnable() {
+				@Override
+				public void run() {
+					try {
+						invokable.notifyCheckpointComplete(checkpointID);
+						taskStateManager.notifyCheckpointComplete(checkpointID);
+					} catch (Throwable t) {
+						if (getExecutionState() == ExecutionState.RUNNING) {
+							// fail task if checkpoint confirmation failed.
+							failExternally(new RuntimeException(
+								"Error while confirming checkpoint",
+								t));
 						}
 					}
-				};
-				executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " +
-						taskNameWithSubtask);
-
+				}
+			};
+			executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " +
+				taskNameWithSubtask);
 		}
 		else {
 			LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 3173948..f62ef1b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
 import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, TaskManagerMetricGroup}
 import org.apache.flink.runtime.metrics.util.MetricUtils
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
@@ -238,6 +239,7 @@ class LocalFlinkMiniCluster(
     val taskManagerServices = TaskManagerServices.fromConfiguration(
       taskManagerServicesConfiguration,
       resourceID,
+      ioExecutor,
       EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag,
       EnvironmentInformation.getMaxJvmHeapMemory)
 
@@ -254,6 +256,7 @@ class LocalFlinkMiniCluster(
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment,
+      taskManagerServices.getTaskManagerStateStore,
       taskManagerMetricGroup)
 
     system.actorOf(props, taskManagerActorName)
@@ -318,6 +321,7 @@ class LocalFlinkMiniCluster(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
+    taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager,
     taskManagerMetricGroup: TaskManagerMetricGroup): Props = {
 
     TaskManager.getTaskManagerProps(
@@ -328,6 +332,7 @@ class LocalFlinkMiniCluster(
       memoryManager,
       ioManager,
       networkEnvironment,
+      taskManagerLocalStateStoresManager,
       highAvailabilityServices,
       taskManagerMetricGroup)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 485add5..15581b2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -35,11 +35,11 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, QuarantineMonitor}
-import org.apache.flink.runtime.blob.{BlobCacheService, BlobClient, BlobService}
+import org.apache.flink.runtime.blob.BlobCacheService
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
-import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.clusterframework.types.{AllocationID, ResourceID}
 import org.apache.flink.runtime.concurrent.{Executors, FutureUtils}
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
 import org.apache.flink.runtime.execution.ExecutionState
@@ -126,6 +126,7 @@ class TaskManager(
     protected val memoryManager: MemoryManager,
     protected val ioManager: IOManager,
     protected val network: NetworkEnvironment,
+    protected val taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager,
     protected val numberOfSlots: Int,
     protected val highAvailabilityServices: HighAvailabilityServices,
     protected val taskManagerMetricGroup: TaskManagerMetricGroup)
@@ -253,6 +254,12 @@ class TaskManager(
     }
 
     try {
+      taskManagerLocalStateStoresManager.shutdown()
+    } catch {
+      case t: Exception => log.error("Task state manager did not shutdown properly.", t)
+    }
+
+    try {
       fileCache.shutdown()
     } catch {
       case t: Exception => log.error("FileCache did not shutdown properly.", t)
@@ -474,7 +481,7 @@ class TaskManager(
             log.debug(s"Cannot find task to stop for execution ${executionID})")
             sender ! decorateMessage(Acknowledge.get())
           }
- 
+
         // cancels a task
         case CancelTask(executionID) =>
           val task = runningTasks.get(executionID)
@@ -984,7 +991,7 @@ class TaskManager(
         log.error(message, e)
         throw new RuntimeException(message, e)
     }
-    
+
     // watch job manager to detect when it dies
     context.watch(jobManager)
 
@@ -1070,7 +1077,7 @@ class TaskManager(
       // clear the key-value location oracle
       proxy.updateKvStateLocationOracle(HighAvailabilityServices.DEFAULT_JOB_ID, null)
     }
-    
+
     // failsafe shutdown of the metrics registry
     try {
       taskManagerMetricGroup.close()
@@ -1195,18 +1202,21 @@ class TaskManager(
           config.getTimeout().getSize(),
           config.getTimeout().getUnit()))
 
-      // TODO: wire this so that the manager survives the end of the task
-      val taskExecutorLocalStateStoresManager = new TaskExecutorLocalStateStoresManager
+      val jobID = jobInformation.getJobId
 
-      val localStateStore = taskExecutorLocalStateStoresManager.localStateStoreForTask(
-        jobInformation.getJobId,
+      // Allocation IDs do not work properly without FLIP-6, so we fake one derived from the job ID.
+      val fakeAllocationID = new AllocationID(jobID.getLowerPart, jobID.getUpperPart)
+
+      val taskLocalStateStore = taskManagerLocalStateStoresManager.localStateStoreForSubtask(
+        jobID,
+        fakeAllocationID,
         taskInformation.getJobVertexId,
         tdd.getSubtaskIndex)
 
-      val slotStateManager = new TaskStateManagerImpl(
-        jobInformation.getJobId,
+      val taskStateManager = new TaskStateManagerImpl(
+        jobID,
         tdd.getExecutionAttemptId,
-        localStateStore,
+        taskLocalStateStore,
         tdd.getTaskRestore,
         checkpointResponder)
 
@@ -1224,7 +1234,7 @@ class TaskManager(
         ioManager,
         network,
         bcVarManager,
-        slotStateManager,
+        taskStateManager,
         taskManagerConnection,
         inputSplitProvider,
         checkpointResponder,
@@ -2013,6 +2023,7 @@ object TaskManager {
     val taskManagerServices = TaskManagerServices.fromConfiguration(
       taskManagerServicesConfiguration,
       resourceID,
+      actorSystem.dispatcher,
       EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag,
       EnvironmentInformation.getMaxJvmHeapMemory)
 
@@ -2030,6 +2041,7 @@ object TaskManager {
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment(),
+      taskManagerServices.getTaskManagerStateStore(),
       highAvailabilityServices,
       taskManagerMetricGroup)
 
@@ -2047,6 +2059,7 @@ object TaskManager {
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
+    taskStateManager: TaskExecutorLocalStateStoresManager,
     highAvailabilityServices: HighAvailabilityServices,
     taskManagerMetricGroup: TaskManagerMetricGroup
   ): Props = {
@@ -2058,6 +2071,7 @@ object TaskManager {
       memoryManager,
       ioManager,
       networkEnvironment,
+      taskStateManager,
       taskManagerConfig.getNumberSlots(),
       highAvailabilityServices,
       taskManagerMetricGroup)

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index cbfe0ed..32b32cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -25,8 +25,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.util.TestLogger;
@@ -93,8 +94,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 
 		KeyedStateHandle managedKeyedHandle = mock(KeyedStateHandle.class);
 		KeyedStateHandle rawKeyedHandle = mock(KeyedStateHandle.class);
-		OperatorStateHandle managedOpHandle = mock(OperatorStateHandle.class);
-		OperatorStateHandle rawOpHandle = mock(OperatorStateHandle.class);
+		OperatorStateHandle managedOpHandle = mock(OperatorStreamStateHandle.class);
+		OperatorStateHandle rawOpHandle = mock(OperatorStreamStateHandle.class);
 
 		final OperatorSubtaskState operatorSubtaskState = spy(new OperatorSubtaskState(
 			managedOpHandle,

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index c791fd8..1b2062a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -52,7 +53,6 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
-import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -62,7 +62,6 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.verification.VerificationMode;
@@ -2739,7 +2738,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		metaInfoMap.put("t-6", new OperatorStateHandle.StateMetaInfo(new long[]{121, 143, 147}, OperatorStateHandle.Mode.BROADCAST));
 
 		// this is what a single task will return
-		OperatorStateHandle osh = new OperatorStateHandle(metaInfoMap, new ByteStreamStateHandle("test", new byte[150]));
+		OperatorStateHandle osh = new OperatorStreamStateHandle(metaInfoMap, new ByteStreamStateHandle("test", new byte[150]));
 
 		OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
 		List<Collection<OperatorStateHandle>> repartitionedStates =
@@ -2817,7 +2816,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, serializedDataWithOffsets.f1.get(0));
 
-		ByteStreamStateHandle allSerializedStatesHandle = new TestByteStreamStateHandleDeepCompare(
+		ByteStreamStateHandle allSerializedStatesHandle = new ByteStreamStateHandle(
 				String.valueOf(UUID.randomUUID()),
 				serializedDataWithOffsets.f0);
 
@@ -2936,11 +2935,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			++idx;
 		}
 
-		ByteStreamStateHandle streamStateHandle = new TestByteStreamStateHandleDeepCompare(
+		ByteStreamStateHandle streamStateHandle = new ByteStreamStateHandle(
 			String.valueOf(UUID.randomUUID()),
 			serializationWithOffsets.f0);
 
-		return new OperatorStateHandle(offsetsMap, streamStateHandle);
+		return new OperatorStreamStateHandle(offsetsMap, streamStateHandle);
 	}
 
 	static ExecutionJobVertex mockExecutionJobVertex(
@@ -3265,7 +3264,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				}
 
 				OperatorStateHandle.Mode mode = r.nextInt(10) == 0 ?
-						OperatorStateHandle.Mode.UNION : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
+					OperatorStateHandle.Mode.UNION : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
 				namedStatesToOffsets.put(
 						"State-" + s,
 						new OperatorStateHandle.StateMetaInfo(offs, mode));
@@ -3282,7 +3281,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			}
 
 			previousParallelOpInstanceStates.add(
-					new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
+					new OperatorStreamStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
 		}
 
 		Map<StreamStateHandle, Map<String, List<Long>>> expected = new HashMap<>();
@@ -3769,10 +3768,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			OperatorSubtaskState operatorSubtaskState =
 				spy(new OperatorSubtaskState(
-					Collections.<OperatorStateHandle>emptyList(),
-					Collections.<OperatorStateHandle>emptyList(),
-					Collections.<KeyedStateHandle>singletonList(managedState),
-					Collections.<KeyedStateHandle>emptyList()));
+					StateObjectCollection.empty(),
+					StateObjectCollection.empty(),
+					StateObjectCollection.singleton(managedState),
+					StateObjectCollection.empty()));
 
 			Map<OperatorID, OperatorSubtaskState> opStates = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 70794c6..ff787ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
-import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
@@ -69,7 +69,7 @@ public class CheckpointMetadataLoadingTest {
 		OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
 
 		OperatorSubtaskState subtaskState = new OperatorSubtaskState(
-				new OperatorStateHandle(
+				new OperatorStreamStateHandle(
 				Collections.emptyMap(),
 				new ByteStreamStateHandle("testHandler", new byte[0])),
 				null,

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index ab353a9..af6ec71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
@@ -123,10 +122,10 @@ public class CheckpointStateRestoreTest {
 			subtaskStates.putSubtaskStateByOperatorID(
 				OperatorID.fromJobVertexID(statefulId),
 				new OperatorSubtaskState(
-					Collections.<OperatorStateHandle>emptyList(),
-					Collections.<OperatorStateHandle>emptyList(),
-					Collections.singletonList(serializedKeyGroupStates),
-					Collections.<KeyedStateHandle>emptyList()));
+					StateObjectCollection.empty(),
+					StateObjectCollection.empty(),
+					StateObjectCollection.singleton(serializedKeyGroupStates),
+					StateObjectCollection.empty()));
 
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStates));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStates));


Mime
View raw message