flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/3] flink git commit: [FLINK-4067] [runtime] Add savepoint headers
Date Tue, 26 Jul 2016 09:30:54 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
new file mode 100644
index 0000000..71fcb34
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
/**
 * Savepoint store used to persist {@link Savepoint} instances.
 *
 * <p>The main implementation is the {@link FsSavepointStore}. We also have the
 * {@link HeapSavepointStore} for historical reasons (introduced in Flink 1.0).
 */
public interface SavepointStore {

	/**
	 * Stores the savepoint.
	 *
	 * @param savepoint Savepoint to be stored
	 * @param <T>       Savepoint type
	 * @return Path of stored savepoint
	 * @throws Exception Failures during store are forwarded
	 */
	<T extends Savepoint> String storeSavepoint(T savepoint) throws Exception;

	/**
	 * Loads the savepoint at the specified path.
	 *
	 * @param path Path of savepoint to load
	 * @return The loaded savepoint
	 * @throws Exception Failures during load are forwarded
	 */
	Savepoint loadSavepoint(String path) throws Exception;

	/**
	 * Disposes the savepoint at the specified path.
	 *
	 * <p>The class loader is needed, because savepoints can currently point to
	 * arbitrary snapshot {@link org.apache.flink.runtime.state.StateHandle}
	 * instances, which need the user code class loader for deserialization.
	 *
	 * @param path        Path of savepoint to dispose
	 * @param classLoader Class loader for disposal
	 * @throws Exception Failures during disposal are forwarded
	 */
	void disposeSavepoint(String path, ClassLoader classLoader) throws Exception;

	/**
	 * Shuts down the savepoint store.
	 *
	 * <p>Only necessary for implementations where the savepoint life-cycle is
	 * bound to the cluster life-cycle.
	 *
	 * @throws Exception Failures during shut down are forwarded
	 */
	void shutdown() throws Exception;

}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java
new file mode 100644
index 0000000..d2bf40e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Factory for {@link SavepointStore} instances.
+ */
+public class SavepointStoreFactory {
+
+	public static final String SAVEPOINT_BACKEND_KEY = "savepoints.state.backend";
+	public static final String SAVEPOINT_DIRECTORY_KEY = "savepoints.state.backend.fs.dir";
+	public static final String DEFAULT_SAVEPOINT_BACKEND = "jobmanager";
+
+	public static final Logger LOG = LoggerFactory.getLogger(SavepointStoreFactory.class);
+
+	/**
+	 * Creates a {@link SavepointStore} from the specified Configuration.
+	 *
+	 * <p>You can configure a savepoint-specific backend for the savepoints. If
+	 * you don't configure anything, the regular checkpoint backend will be
+	 * used.
+	 *
+	 * <p>The default and fallback backend is the job manager, which loses the
+	 * savepoint after the job manager shuts down.
+	 *
+	 * @param config The configuration to parse the savepoint backend configuration.
+	 * @return A savepoint store.
+	 */
+	public static SavepointStore createFromConfig(Configuration config) throws Exception {
+
+		// Try a the savepoint-specific configuration first.
+		String savepointBackend = config.getString(SAVEPOINT_BACKEND_KEY, DEFAULT_SAVEPOINT_BACKEND);
+
+		if (savepointBackend == null) {
+			LOG.info("No savepoint state backend configured. " +
+					"Using job manager savepoint state backend.");
+			return createJobManagerSavepointStore();
+		} else if (savepointBackend.equals("jobmanager")) {
+			LOG.info("Using job manager savepoint state backend.");
+			return createJobManagerSavepointStore();
+		} else if (savepointBackend.equals("filesystem")) {
+			String rootPath = config.getString(SAVEPOINT_DIRECTORY_KEY, null);
+
+			if (rootPath == null) {
+				throw new IllegalConfigurationException("Using filesystem as savepoint state backend, " +
+						"but did not specify directory. Please set the " +
+						"following configuration key: '" + SAVEPOINT_DIRECTORY_KEY +
+						"' (e.g. " + SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " +
+						"Falling back to job manager savepoint backend.");
+			} else {
+				LOG.info("Using filesystem savepoint backend (root path: {}).", rootPath);
+
+				return createFileSystemSavepointStore(rootPath);
+			}
+		} else {
+			throw new IllegalConfigurationException("Unexpected savepoint backend " +
+					"configuration '" + savepointBackend + "'. " +
+					"Falling back to job manager savepoint state backend.");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Savepoint stores
+	// ------------------------------------------------------------------------
+
+	private static SavepointStore createJobManagerSavepointStore() {
+		return new HeapSavepointStore();
+	}
+
+	private static SavepointStore createFileSystemSavepointStore(String rootPath) throws IOException {
+		return new FsSavepointStore(rootPath, "savepoint-");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
new file mode 100644
index 0000000..9fd950d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+/**
+ * Savepoint version 0.
+ *
+ * <p>This format was introduced with Flink 1.1.0.
+ */
+public class SavepointV0 implements Savepoint {
+
+	/** The savepoint version. */
+	public static final int VERSION = 0;
+
+	/** The checkpoint ID */
+	private final long checkpointId;
+
+	/** The task states */
+	private final Collection<TaskState> taskStates;
+
+	SavepointV0(long checkpointId, Collection<TaskState> taskStates) {
+		this.checkpointId = checkpointId;
+		this.taskStates = Preconditions.checkNotNull(taskStates, "Task States");
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	@Override
+	public Collection<TaskState> getTaskStates() {
+		return taskStates;
+	}
+
+	@Override
+	public void dispose(ClassLoader classLoader) throws Exception {
+		Preconditions.checkNotNull(classLoader, "Class loader");
+		for (TaskState taskState : taskStates) {
+			taskState.discard(classLoader);
+		}
+		taskStates.clear();
+	}
+
+	@Override
+	public String toString() {
+		return "Savepoint(version=" + VERSION + ")";
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		SavepointV0 that = (SavepointV0) o;
+		return checkpointId == that.checkpointId && getTaskStates().equals(that.getTaskStates());
+	}
+
+	@Override
+	public int hashCode() {
+		int result = (int) (checkpointId ^ (checkpointId >>> 32));
+		result = 31 * result + taskStates.hashCode();
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
new file mode 100644
index 0000000..8de29a6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.KeyGroupState;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Serializer for {@link SavepointV0} instances.
+ *
+ * <p>In contrast to previous savepoint versions, this serializer makes sure
+ * that no default Java serialization is used for serialization. Therefore, we
+ * don't rely on any involved Java classes to stay the same.
+ */
+class SavepointV1Serializer implements SavepointSerializer<SavepointV0> {
+
+	public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();
+
+	private SavepointV1Serializer() {
+	}
+
+	@Override
+	public void serialize(SavepointV0 savepoint, DataOutputStream dos) throws IOException {
+		dos.writeLong(savepoint.getCheckpointId());
+
+		Collection<TaskState> taskStates = savepoint.getTaskStates();
+		dos.writeInt(taskStates.size());
+
+		for (TaskState taskState : savepoint.getTaskStates()) {
+			// Vertex ID
+			dos.writeLong(taskState.getJobVertexID().getLowerPart());
+			dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+			// Parallelism
+			int parallelism = taskState.getParallelism();
+			dos.writeInt(parallelism);
+
+			// Sub task states
+			dos.writeInt(taskState.getNumberCollectedStates());
+
+			for (int i = 0; i < parallelism; i++) {
+				SubtaskState subtaskState = taskState.getState(i);
+
+				if (subtaskState != null) {
+					dos.writeInt(i);
+
+					SerializedValue<?> serializedValue = subtaskState.getState();
+					if (serializedValue == null) {
+						dos.writeInt(-1); // null
+					} else {
+						byte[] serialized = serializedValue.getByteArray();
+						dos.writeInt(serialized.length);
+						dos.write(serialized, 0, serialized.length);
+					}
+
+					dos.writeLong(subtaskState.getStateSize());
+					dos.writeLong(subtaskState.getDuration());
+				}
+			}
+
+			// Key group states
+			dos.writeInt(taskState.getNumberCollectedKvStates());
+
+			for (int i = 0; i < parallelism; i++) {
+				KeyGroupState keyGroupState = taskState.getKvState(i);
+
+				if (keyGroupState != null) {
+					dos.write(i);
+
+					SerializedValue<?> serializedValue = keyGroupState.getKeyGroupState();
+					if (serializedValue == null) {
+						dos.writeInt(-1); // null
+					} else {
+						byte[] serialized = serializedValue.getByteArray();
+						dos.writeInt(serialized.length);
+						dos.write(serialized, 0, serialized.length);
+					}
+
+					dos.writeLong(keyGroupState.getStateSize());
+					dos.writeLong(keyGroupState.getDuration());
+				}
+			}
+		}
+	}
+
+	@Override
+	public SavepointV0 deserialize(DataInputStream dis) throws IOException {
+		long checkpointId = dis.readLong();
+
+		// Task states
+		int numTaskStates = dis.readInt();
+		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+			int parallelism = dis.readInt();
+
+			// Add task state
+			TaskState taskState = new TaskState(jobVertexId, parallelism);
+			taskStates.add(taskState);
+
+			// Sub task states
+			int numSubTaskStates = dis.readInt();
+			for (int j = 0; j < numSubTaskStates; j++) {
+				int subtaskIndex = dis.readInt();
+
+				int length = dis.readInt();
+
+				SerializedValue<StateHandle<?>> serializedValue;
+				if (length == -1) {
+					serializedValue = new SerializedValue<>(null);
+				} else {
+					byte[] serializedData = new byte[length];
+					dis.read(serializedData, 0, length);
+					serializedValue = SerializedValue.fromBytes(serializedData);
+				}
+
+				long stateSize = dis.readLong();
+				long duration = dis.readLong();
+
+				SubtaskState subtaskState = new SubtaskState(
+						serializedValue,
+						stateSize,
+						duration);
+
+				taskState.putState(subtaskIndex, subtaskState);
+			}
+
+			// Key group states
+			int numKvStates = dis.readInt();
+			for (int j = 0; j < numKvStates; j++) {
+				int keyGroupIndex = dis.readInt();
+
+				int length = dis.readInt();
+
+				SerializedValue<StateHandle<?>> serializedValue;
+				if (length == -1) {
+					serializedValue = new SerializedValue<>(null);
+				} else {
+					byte[] serializedData = new byte[length];
+					dis.read(serializedData, 0, length);
+					serializedValue = SerializedValue.fromBytes(serializedData);
+				}
+
+				long stateSize = dis.readLong();
+				long duration = dis.readLong();
+
+				KeyGroupState keyGroupState = new KeyGroupState(
+						serializedValue,
+						stateSize,
+						duration);
+
+				taskState.putKvState(keyGroupIndex, keyGroupState);
+			}
+		}
+
+		return new SavepointV0(checkpointId, taskStates);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d405b88..4229105 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -35,10 +35,9 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.SavepointCoordinator;
-import org.apache.flink.runtime.checkpoint.StateStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -361,9 +360,9 @@ public class ExecutionGraph implements Serializable {
 			ActorSystem actorSystem,
 			UUID leaderSessionID,
 			CheckpointIDCounter checkpointIDCounter,
-			CompletedCheckpointStore completedCheckpointStore,
+			CompletedCheckpointStore checkpointStore,
 			RecoveryMode recoveryMode,
-			StateStore<CompletedCheckpoint> savepointStore,
+			SavepointStore savepointStore,
 			CheckpointStatsTracker statsTracker) throws Exception {
 
 		// simple sanity checks
@@ -396,7 +395,7 @@ public class ExecutionGraph implements Serializable {
 				tasksToCommitTo,
 				userClassLoader,
 				checkpointIDCounter,
-				completedCheckpointStore,
+				checkpointStore,
 				recoveryMode,
 				checkpointStatsTracker);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
index 868c124..514aabc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
@@ -32,7 +32,11 @@ public class JobVertexID extends AbstractID {
 	public JobVertexID() {
 		super();
 	}
-	
+
+	public JobVertexID(long lowerPart, long upperPart) {
+		super(lowerPart, upperPart);
+	}
+
 	public JobVertexID(byte[] bytes) {
 		super(bytes);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 5d3a5c4..3986fed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ccbd263..ef52381 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
+import org.apache.flink.runtime.checkpoint.savepoint.{SavepointStoreFactory, SavepointStore}
 import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker}
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.execution.SuppressRestartsException
@@ -212,14 +213,6 @@ class JobManager(
         throw new RuntimeException("Could not start the checkpoint recovery service.", e)
     }
 
-    try {
-      savepointStore.start()
-    } catch {
-      case e: Exception =>
-        log.error("Could not start the savepoint store.", e)
-        throw new RuntimeException("Could not start the  savepoint store store.", e)
-    }
-
     jobManagerMetricGroup match {
       case Some(group) =>
         instantiateMetrics(group)
@@ -249,10 +242,10 @@ class JobManager(
     }
 
     try {
-      savepointStore.stop()
+      savepointStore.shutdown()
     } catch {
       case e: Exception =>
-        log.error("Could not stop the savepoint store.", e)
+        log.error("Could not shut down savepoint store.", e)
         throw new RuntimeException("Could not stop the  savepoint store store.", e)
     }
 
@@ -751,10 +744,6 @@ class JobManager(
         try {
           log.info(s"Disposing savepoint at '$savepointPath'.")
 
-          val savepoint = savepointStore.getState(savepointPath)
-
-          log.debug(s"$savepoint")
-
           if (blobKeys.isDefined) {
             // We don't need a real ID here for the library cache manager
             val jid = new JobID()
@@ -764,18 +753,15 @@ class JobManager(
               val classLoader = libraryCacheManager.getClassLoader(jid)
 
               // Discard with user code loader
-              savepoint.discard(classLoader)
+              savepointStore.disposeSavepoint(savepointPath, classLoader)
             } finally {
               libraryCacheManager.unregisterJob(jid)
             }
           } else {
             // Discard with system class loader
-            savepoint.discard(getClass.getClassLoader)
+            savepointStore.disposeSavepoint(savepointPath, getClass.getClassLoader)
           }
 
-          // Dispose the savepoint
-          savepointStore.disposeState(savepointPath)
-
           senderRef ! DisposeSavepointSuccess
         } catch {
           case t: Throwable =>
@@ -1237,7 +1223,7 @@ class JobManager(
             snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava
 
           val completedCheckpoints = checkpointRecoveryFactory
-            .createCompletedCheckpoints(jobId, userCodeLoader)
+            .createCheckpointStore(jobId, userCodeLoader)
 
           val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 04689c6..bebcf7b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -22,7 +22,8 @@ import akka.actor.ActorRef
 
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.metrics.MetricRegistry
-import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.instance.InstanceManager

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 5ccebe2..2fe830f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -293,7 +293,7 @@ trait TestingJobManagerLike extends FlinkActor {
 
     case RequestSavepoint(savepointPath) =>
       try {
-        val savepoint = savepointStore.getState(savepointPath)
+        val savepoint = savepointStore.loadSavepoint(savepointPath)
         sender ! ResponseSavepoint(savepoint)
       }
       catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index ec513e3..a411c8b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.ActorGateway
 import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
 
 object TestingJobManagerMessages {
 
@@ -107,7 +108,7 @@ object TestingJobManagerMessages {
     *
     * @param savepoint The requested savepoint or null if none available.
     */
-  case class ResponseSavepoint(savepoint: CompletedCheckpoint)
+  case class ResponseSavepoint(savepoint: Savepoint)
 
   def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
   def getDisablePostStop(): AnyRef = DisablePostStop

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/AbstractStateStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/AbstractStateStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/AbstractStateStoreTest.java
deleted file mode 100644
index f9f844c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/AbstractStateStoreTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Basic {@link StateStore} behaviour test.
- */
-public abstract class AbstractStateStoreTest {
-
-	abstract StateStore<Integer> createStateStore() throws Exception;
-
-	abstract boolean verifyDiscarded(StateStore<Integer> stateStore, String path);
-
-	@Test
-	public void testSimplePutGetDiscardSequence() throws Exception {
-		StateStore<Integer> stateStore = createStateStore();
-
-		// Put
-		Integer expectedState = 211155;
-		String path = stateStore.putState(expectedState);
-
-		// Get
-		Integer actualState = stateStore.getState(path);
-		assertEquals(expectedState, actualState);
-
-		// Dispose
-		stateStore.disposeState(path);
-		assertTrue(verifyDiscarded(stateStore, path));
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testGetStateInvalidPathThrowsException() throws Exception {
-		StateStore<Integer> stateStore = createStateStore();
-		stateStore.getState("testGetStateInvalidPathThrowsException");
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDisposeStateInvalidPathThrowsException() throws Exception {
-		StateStore<Integer> stateStore = createStateStore();
-		stateStore.getState("testDisposeStateInvalidPathThrowsException");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 079c341..84d809a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -71,7 +71,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints());
 		assertEquals(0, checkpoints.getAllCheckpoints().size());
 
-		TestCheckpoint[] expected = new TestCheckpoint[] {
+		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
 				createCheckpoint(0), createCheckpoint(1) };
 
 		// Add and get latest
@@ -92,7 +92,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	public void testAddCheckpointMoreThanMaxRetained() throws Exception {
 		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1, userClassLoader);
 
-		TestCheckpoint[] expected = new TestCheckpoint[] {
+		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
 				createCheckpoint(0), createCheckpoint(1),
 				createCheckpoint(2), createCheckpoint(3)
 		};
@@ -137,12 +137,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	public void testGetAllCheckpoints() throws Exception {
 		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4, userClassLoader);
 
-		TestCheckpoint[] expected = new TestCheckpoint[] {
+		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
 				createCheckpoint(0), createCheckpoint(1),
 				createCheckpoint(2), createCheckpoint(3)
 		};
 
-		for (TestCheckpoint checkpoint : expected) {
+		for (TestCompletedCheckpoint checkpoint : expected) {
 			checkpoints.addCheckpoint(checkpoint);
 		}
 
@@ -162,12 +162,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	public void testDiscardAllCheckpoints() throws Exception {
 		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4, userClassLoader);
 
-		TestCheckpoint[] expected = new TestCheckpoint[] {
+		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
 				createCheckpoint(0), createCheckpoint(1),
 				createCheckpoint(2), createCheckpoint(3)
 		};
 
-		for (TestCheckpoint checkpoint : expected) {
+		for (TestCompletedCheckpoint checkpoint : expected) {
 			checkpoints.addCheckpoint(checkpoint);
 		}
 
@@ -179,7 +179,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints());
 
 		// All have been discarded
-		for (TestCheckpoint checkpoint : expected) {
+		for (TestCompletedCheckpoint checkpoint : expected) {
 			// The ZooKeeper implementation discards asynchronously
 			checkpoint.awaitDiscard();
 			assertTrue(checkpoint.isDiscarded());
@@ -189,11 +189,11 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 
 	// ---------------------------------------------------------------------------------------------
 
-	protected TestCheckpoint createCheckpoint(int id) throws IOException {
+	protected TestCompletedCheckpoint createCheckpoint(int id) throws IOException {
 		return createCheckpoint(id, 4);
 	}
 
-	protected TestCheckpoint createCheckpoint(int id, int numberOfStates)
+	protected TestCompletedCheckpoint createCheckpoint(int id, int numberOfStates)
 			throws IOException {
 
 		JobVertexID jvid = new JobVertexID();
@@ -209,7 +209,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			taskState.putState(i, new SubtaskState(stateHandle, 0, 0));
 		}
 
-		return new TestCheckpoint(new JobID(), id, 0, taskGroupStates);
+		return new TestCompletedCheckpoint(new JobID(), id, 0, taskGroupStates);
 	}
 
 	private void verifyCheckpoint(CompletedCheckpoint expected, CompletedCheckpoint actual) {
@@ -221,7 +221,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	 * used when discarding. Spying on a regular {@link CompletedCheckpoint} instance with
 	 * Mockito doesn't work, because it breaks serializability.
 	 */
-	protected static class TestCheckpoint extends CompletedCheckpoint {
+	protected static class TestCompletedCheckpoint extends CompletedCheckpoint {
 
 		private static final long serialVersionUID = 4211419809665983026L;
 
@@ -232,7 +232,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 
 		private transient ClassLoader discardClassLoader;
 
-		public TestCheckpoint(
+		public TestCompletedCheckpoint(
 			JobID jobId,
 			long checkpointId,
 			long timestamp,
@@ -274,7 +274,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			if (this == o) return true;
 			if (o == null || getClass() != o.getClass()) return false;
 
-			TestCheckpoint that = (TestCheckpoint) o;
+			TestCompletedCheckpoint that = (TestCompletedCheckpoint) o;
 
 			return getJobId().equals(that.getJobId())
 					&& getCheckpointID() == that.getCheckpointID();

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index b9ad1bd..0f2c2b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -148,7 +150,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				counter,
 				store,
 				RecoveryMode.STANDALONE,
-				new HeapStateStore<CompletedCheckpoint>(),
+				new HeapSavepointStore(),
 				new DisabledCheckpointStatsTracker());
 
 		JobVertex jobVertex = new JobVertex("MockVertex");

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FileSystemStateStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FileSystemStateStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FileSystemStateStoreTest.java
deleted file mode 100644
index 7779f61..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FileSystemStateStoreTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.io.File;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertFalse;
-
-/**
- * Basic {@link FileSystemStateStore} behaviour test.
- */
-public class FileSystemStateStoreTest extends AbstractStateStoreTest {
-
-	private static File rootPath;
-
-	@BeforeClass
-	public static void setUp() throws Exception {
-		rootPath = CommonTestUtils.createTempDirectory();
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (rootPath != null) {
-			try {
-				String errMsg = "Root path " + rootPath.getPath() + " not cleaned up. "
-						+ Arrays.toString(rootPath.listFiles());
-				assertFalse(errMsg, rootPath.exists());
-			}
-			finally {
-				org.apache.commons.io.FileUtils.deleteDirectory(rootPath);
-			}
-		}
-	}
-
-	@Override
-	StateStore<Integer> createStateStore() throws Exception {
-		return new FileSystemStateStore<>(new Path(rootPath.toURI()), "test_savepoint-");
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	boolean verifyDiscarded(StateStore<Integer> stateStore, String path) {
-		return true;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/HeapStateStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/HeapStateStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/HeapStateStoreTest.java
deleted file mode 100644
index 86b8fde..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/HeapStateStoreTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.mockito.internal.util.reflection.Whitebox;
-
-import java.util.Map;
-
-/**
- * Basic {@link HeapStateStore} behaviour test.
- */
-public class HeapStateStoreTest extends AbstractStateStoreTest {
-
-	@Override
-	StateStore<Integer> createStateStore() {
-		return new HeapStateStore<>();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	boolean verifyDiscarded(StateStore<Integer> stateStore, String path) {
-		Map<Integer, Integer> stateMap = (Map<Integer, Integer>) Whitebox
-				.getInternalState(stateStore, "stateMap");
-
-		return !stateMap.containsKey(path);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
deleted file mode 100644
index b103793..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
+++ /dev/null
@@ -1,1140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
-import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the savepoint coordinator.
- */
-public class SavepointCoordinatorTest extends TestLogger {
-
-	// ------------------------------------------------------------------------
-	// Trigger and acknowledge
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Simple trigger-acknowledge test for a single savepoint.
-	 */
-	@Test
-	public void testSimpleTriggerSavepoint() throws Exception {
-		JobID jobId = new JobID();
-		long checkpointTimeout = 60 * 1000;
-		long timestamp = 1272635;
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-		HeapStateStore<CompletedCheckpoint> savepointStore = new HeapStateStore<>();
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				vertices,
-				vertices,
-				vertices,
-				checkpointIdCounter,
-				savepointStore);
-
-		// Trigger the savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp);
-		assertFalse(savepointPathFuture.isCompleted());
-
-		long checkpointId = checkpointIdCounter.getLastReturnedCount();
-		assertEquals(0, checkpointId);
-
-		// Verify send trigger messages
-		for (ExecutionVertex vertex : vertices) {
-			verifyTriggerCheckpoint(vertex, checkpointId, timestamp);
-		}
-
-		PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
-				.get(checkpointId);
-
-		verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId,
-				timestamp, 0, 2, 0, false, false);
-
-		// Acknowledge tasks
-		for (ExecutionVertex vertex : vertices) {
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, vertex.getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId, createSerializedStateHandle(vertex), 0));
-		}
-
-		// The pending checkpoint is completed
-		assertTrue(pendingCheckpoint.isDiscarded());
-		assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
-
-		// Verify send notify complete messages
-		for (ExecutionVertex vertex : vertices) {
-			verifyNotifyCheckpointComplete(vertex, checkpointId, timestamp);
-		}
-
-		// Verify that the future has been completed
-		assertTrue(savepointPathFuture.isCompleted());
-		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
-
-		// Verify the savepoint
-		CompletedCheckpoint savepoint = savepointStore.getState(savepointPath);
-		verifySavepoint(savepoint, jobId, checkpointId, timestamp,
-				vertices);
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	/**
-	 * This test triggers a checkpoint and then sends a decline checkpoint message from
-	 * one of the tasks. The expected behaviour is that said checkpoint is discarded and a new
-	 * checkpoint is triggered.
-	 */
-	@Test
-	public void testTriggerAndDeclineCheckpointSimple() throws Exception {
-		JobID jobId = new JobID();
-		long checkpointTimeout = 60 * 1000;
-		long timestamp = 1272635;
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-		HeapStateStore<CompletedCheckpoint> savepointStore = new HeapStateStore<>();
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				vertices,
-				vertices,
-				vertices,
-				checkpointIdCounter,
-				savepointStore);
-
-		// Trigger the savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp);
-		assertFalse(savepointPathFuture.isCompleted());
-
-		long checkpointId = checkpointIdCounter.getLastReturnedCount();
-		assertEquals(0, checkpointId);
-
-		// Verify send trigger messages
-		for (ExecutionVertex vertex : vertices) {
-			verifyTriggerCheckpoint(vertex, checkpointId, timestamp);
-		}
-
-		PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
-				.get(checkpointId);
-
-		verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId,
-				timestamp, 0, 2, 0, false, false);
-
-		// Acknowledge and decline tasks
-		coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-				jobId, vertices[0].getCurrentExecutionAttempt().getAttemptId(),
-				checkpointId, createSerializedStateHandle(vertices[0]), 0));
-
-		coordinator.receiveDeclineMessage(new DeclineCheckpoint(
-				jobId, vertices[1].getCurrentExecutionAttempt().getAttemptId(),
-				checkpointId, 0));
-
-
-		// The pending checkpoint is completed
-		assertTrue(pendingCheckpoint.isDiscarded());
-		assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
-
-		// Verify that the future has been completed
-		assertTrue(savepointPathFuture.isCompleted());
-
-		try {
-			Await.result(savepointPathFuture.failed(), FiniteDuration.Zero());
-			fail("Did not throw expected exception");
-		} catch (Throwable ignored) {}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	// ------------------------------------------------------------------------
-	// Rollback
-	// ------------------------------------------------------------------------
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testSimpleRollbackSavepoint() throws Exception {
-		JobID jobId = new JobID();
-
-		ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] {
-				mockExecutionJobVertex(jobId, new JobVertexID(), 4),
-				mockExecutionJobVertex(jobId, new JobVertexID(), 4) };
-
-		ExecutionVertex[] triggerVertices = jobVertices[0].getTaskVertices();
-		ExecutionVertex[] ackVertices = new ExecutionVertex[8];
-
-		int i = 0;
-		for (ExecutionJobVertex jobVertex : jobVertices) {
-			for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-				ackVertices[i++] = vertex;
-			}
-		}
-
-		MockCheckpointIdCounter idCounter = new MockCheckpointIdCounter();
-		StateStore<CompletedCheckpoint> savepointStore = new HeapStateStore<>();
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				triggerVertices,
-				ackVertices,
-				new ExecutionVertex[] {},
-				idCounter,
-				savepointStore);
-
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
-
-		// Acknowledge all tasks
-		for (ExecutionVertex vertex : ackVertices) {
-			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
-		}
-
-		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
-		assertNotNull(savepointPath);
-
-		// Rollback
-		coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertices), savepointPath);
-
-		// Verify all executions have been reset
-		for (ExecutionVertex vertex : ackVertices) {
-			verify(vertex.getCurrentExecutionAttempt(), times(1)).setInitialState(
-					any(SerializedValue.class), any(Map.class));
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		// Verify checkpoint ID counter started
-		assertTrue(idCounter.isStarted());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testRollbackParallelismMismatch() throws Exception {
-		JobID jobId = new JobID();
-
-		ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] {
-				mockExecutionJobVertex(jobId, new JobVertexID(), 4),
-				mockExecutionJobVertex(jobId, new JobVertexID(), 4) };
-
-		ExecutionVertex[] triggerVertices = jobVertices[0].getTaskVertices();
-		ExecutionVertex[] ackVertices = new ExecutionVertex[8];
-
-		int index = 0;
-		for (ExecutionJobVertex jobVertex : jobVertices) {
-			for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-				ackVertices[index++] = vertex;
-			}
-		}
-
-		StateStore<CompletedCheckpoint> savepointStore = new HeapStateStore<>();
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				triggerVertices,
-				ackVertices,
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				savepointStore);
-
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
-
-		// Acknowledge all tasks
-		for (ExecutionVertex vertex : ackVertices) {
-			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
-		}
-
-		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
-		assertNotNull(savepointPath);
-
-		// Change parallelism lower than original (state without matching subtask). The
-		// other way around (subtask without matching state) is OK.
-		for (int i = 0; i < jobVertices.length; i++) {
-			jobVertices[i] = mockExecutionJobVertex(jobId, jobVertices[i].getJobVertexId(), 2);
-		}
-
-		try {
-			// Rollback
-			coordinator.restoreSavepoint(
-					createExecutionJobVertexMap(jobVertices),
-					savepointPath);
-			fail("Did not throw expected Exception after rollback with parallelism mismatch.");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testRollbackStateStoreFailure() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, new JobVertexID(), 4);
-		HeapStateStore<CompletedCheckpoint> savepointStore = spy(
-				new HeapStateStore<CompletedCheckpoint>());
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				jobVertex.getTaskVertices(),
-				jobVertex.getTaskVertices(),
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				savepointStore);
-
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
-
-		// Acknowledge all tasks
-		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
-		}
-
-		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
-		assertNotNull(savepointPath);
-
-		// Failure on getState
-		doThrow(new Exception("TestException")).when(savepointStore).getState(anyString());
-
-		try {
-			// Rollback
-			coordinator.restoreSavepoint(
-					createExecutionJobVertexMap(jobVertex),
-					savepointPath);
-
-			fail("Did not throw expected Exception after rollback with savepoint store failure.");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testRollbackSetsCheckpointID() throws Exception {
-		CompletedCheckpoint savepoint = mock(CompletedCheckpoint.class);
-		when(savepoint.getTaskStates()).thenReturn(Collections.<JobVertexID, TaskState>emptyMap());
-		when(savepoint.getCheckpointID()).thenReturn(12312312L);
-
-		CheckpointIDCounter checkpointIdCounter = mock(CheckpointIDCounter.class);
-
-		StateStore<CompletedCheckpoint> savepointStore = mock(StateStore.class);
-		when(savepointStore.getState(anyString())).thenReturn(savepoint);
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				new JobID(),
-				60 * 1000,
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				checkpointIdCounter,
-				savepointStore);
-
-		coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any");
-
-		verify(checkpointIdCounter).setCount(eq(12312312L + 1));
-
-		coordinator.shutdown();
-	}
-
-	// ------------------------------------------------------------------------
-	// Savepoint aborts and future notifications
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testAbortSavepointIfTriggerTasksNotExecuted() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
-				mock(ExecutionVertex.class),
-				mock(ExecutionVertex.class) };
-		ExecutionVertex[] ackVertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				triggerVertices,
-				ackVertices,
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				new HeapStateStore<CompletedCheckpoint>());
-
-		// Trigger savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
-
-		// Abort the savepoint, because the vertices are not running
-		assertTrue(savepointPathFuture.isCompleted());
-
-		try {
-			Await.result(savepointPathFuture, FiniteDuration.Zero());
-			fail("Did not throw expected Exception after shutdown");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testAbortSavepointIfTriggerTasksAreFinished() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId, ExecutionState.FINISHED) };
-		ExecutionVertex[] ackVertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				triggerVertices,
-				ackVertices,
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				new HeapStateStore<CompletedCheckpoint>());
-
-		// Trigger savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
-
-		// Abort the savepoint, because the vertices are not running
-		assertTrue(savepointPathFuture.isCompleted());
-
-		try {
-			Await.result(savepointPathFuture, FiniteDuration.Zero());
-			fail("Did not throw expected Exception after shutdown");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testAbortSavepointIfAckTasksAreNotExecuted() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		ExecutionVertex[] ackVertices = new ExecutionVertex[] {
-				mock(ExecutionVertex.class),
-				mock(ExecutionVertex.class) };
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				triggerVertices,
-				ackVertices,
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				new HeapStateStore<CompletedCheckpoint>());
-
-		// Trigger savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
-
-		// Abort the savepoint, because the vertices are not running
-		assertTrue(savepointPathFuture.isCompleted());
-
-		try {
-			Await.result(savepointPathFuture, FiniteDuration.Zero());
-			fail("Did not throw expected Exception after shutdown");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testAbortOnCheckpointTimeout() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		ExecutionVertex commitVertex = mockExecutionVertex(jobId);
-		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-
-		long checkpointTimeout = 1000;
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				vertices,
-				vertices,
-				new ExecutionVertex[] { commitVertex },
-				checkpointIdCounter,
-				new HeapStateStore<CompletedCheckpoint>());
-
-		// Trigger the savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(12731273);
-		assertFalse(savepointPathFuture.isCompleted());
-
-		long checkpointId = checkpointIdCounter.getLastReturnedCount();
-		PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
-				.get(checkpointId);
-
-		assertNotNull("Checkpoint not pending (test race)", pendingCheckpoint);
-		assertFalse("Checkpoint already discarded (test race)", pendingCheckpoint.isDiscarded());
-
-		// Wait for savepoint to timeout
-		Deadline deadline = FiniteDuration.apply(60, "s").fromNow();
-		while (deadline.hasTimeLeft()
-				&& !pendingCheckpoint.isDiscarded()
-				&& coordinator.getNumberOfPendingCheckpoints() > 0) {
-
-			Thread.sleep(250);
-		}
-
-		// Verify discarded
-		assertTrue("Savepoint not discarded within timeout", pendingCheckpoint.isDiscarded());
-		assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
-		assertEquals(0, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
-
-		// No commit for timeout
-		verify(commitVertex, times(0)).sendMessageToCurrentExecution(
-				any(NotifyCheckpointComplete.class), any(ExecutionAttemptID.class));
-
-		assertTrue(savepointPathFuture.isCompleted());
-
-		try {
-			Await.result(savepointPathFuture, FiniteDuration.Zero());
-			fail("Did not throw expected Exception after timeout");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testAbortSavepointsOnShutdown() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				vertices,
-				vertices,
-				vertices,
-				new MockCheckpointIdCounter(),
-				new HeapStateStore<CompletedCheckpoint>());
-
-		// Trigger savepoints
-		List<Future<String>> savepointPathFutures = new ArrayList<>();
-		savepointPathFutures.add(coordinator.triggerSavepoint(12731273));
-		savepointPathFutures.add(coordinator.triggerSavepoint(12731273 + 123));
-
-		for (Future<String> future : savepointPathFutures) {
-			assertFalse(future.isCompleted());
-		}
-
-		coordinator.shutdown();
-
-		// Verify futures failed
-		for (Future<String> future : savepointPathFutures) {
-			assertTrue(future.isCompleted());
-
-			try {
-				Await.result(future, FiniteDuration.Zero());
-				fail("Did not throw expected Exception after shutdown");
-			}
-			catch (Exception ignored) {
-			}
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-	}
-
-	@Test
-	public void testAbortSavepointOnStateStoreFailure() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, new JobVertexID(), 4);
-		HeapStateStore<CompletedCheckpoint> savepointStore = spy(
-				new HeapStateStore<CompletedCheckpoint>());
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				jobVertex.getTaskVertices(),
-				jobVertex.getTaskVertices(),
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				savepointStore);
-
-		// Failure on putState
-		doThrow(new Exception("TestException"))
-				.when(savepointStore).putState(any(CompletedCheckpoint.class));
-
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
-
-		// Acknowledge all tasks
-		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
-		}
-
-		try {
-			Await.result(savepointPathFuture, FiniteDuration.Zero());
-			fail("Did not throw expected Exception after rollback with savepoint store failure.");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testAbortSavepointIfSubsumed() throws Exception {
-		JobID jobId = new JobID();
-		long checkpointTimeout = 60 * 1000;
-		long[] timestamps = new long[] { 1272635, 1272635 + 10 };
-		long[] checkpointIds = new long[2];
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-		HeapStateStore<CompletedCheckpoint> savepointStore = new HeapStateStore<>();
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				vertices,
-				vertices,
-				vertices,
-				checkpointIdCounter,
-				savepointStore);
-
-		// Trigger the savepoints
-		List<Future<String>> savepointPathFutures = new ArrayList<>();
-
-		savepointPathFutures.add(coordinator.triggerSavepoint(timestamps[0]));
-		checkpointIds[0] = checkpointIdCounter.getLastReturnedCount();
-
-		savepointPathFutures.add(coordinator.triggerSavepoint(timestamps[1]));
-		checkpointIds[1] = checkpointIdCounter.getLastReturnedCount();
-
-		for (Future<String> future : savepointPathFutures) {
-			assertFalse(future.isCompleted());
-		}
-
-		// Verify send trigger messages
-		for (ExecutionVertex vertex : vertices) {
-			verifyTriggerCheckpoint(vertex, checkpointIds[0], timestamps[0]);
-			verifyTriggerCheckpoint(vertex, checkpointIds[1], timestamps[1]);
-		}
-
-		PendingCheckpoint[] pendingCheckpoints = new PendingCheckpoint[] {
-				coordinator.getPendingCheckpoints().get(checkpointIds[0]),
-				coordinator.getPendingCheckpoints().get(checkpointIds[1]) };
-
-		verifyPendingCheckpoint(pendingCheckpoints[0], jobId, checkpointIds[0],
-				timestamps[0], 0, 2, 0, false, false);
-
-		verifyPendingCheckpoint(pendingCheckpoints[1], jobId, checkpointIds[1],
-				timestamps[1], 0, 2, 0, false, false);
-
-		// Acknowledge second checkpoint...
-		for (ExecutionVertex vertex : vertices) {
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, vertex.getCurrentExecutionAttempt().getAttemptId(),
-					checkpointIds[1], createSerializedStateHandle(vertex), 0));
-		}
-
-		// ...and one task of first checkpoint
-		coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-				jobId, vertices[0].getCurrentExecutionAttempt().getAttemptId(),
-				checkpointIds[0], createSerializedStateHandle(vertices[0]), 0));
-
-		// The second pending checkpoint is completed and subsumes the first one
-		assertTrue(pendingCheckpoints[0].isDiscarded());
-		assertTrue(pendingCheckpoints[1].isDiscarded());
-		assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
-
-		// Verify send notify complete messages for second checkpoint
-		for (ExecutionVertex vertex : vertices) {
-			verifyNotifyCheckpointComplete(vertex, checkpointIds[1], timestamps[1]);
-		}
-
-		CompletedCheckpoint[] savepoints = new CompletedCheckpoint[2];
-		String[] savepointPaths = new String[2];
-
-		// Verify that the futures have both been completed
-		assertTrue(savepointPathFutures.get(0).isCompleted());
-
-		try {
-			savepointPaths[0] = Await.result(savepointPathFutures.get(0), FiniteDuration.Zero());
-			fail("Did not throw expected exception");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify the second savepoint
-		assertTrue(savepointPathFutures.get(1).isCompleted());
-		savepointPaths[1] = Await.result(savepointPathFutures.get(1), FiniteDuration.Zero());
-		savepoints[1] = savepointStore.getState(savepointPaths[1]);
-		verifySavepoint(savepoints[1], jobId, checkpointIds[1], timestamps[1],
-				vertices);
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testShutdownDoesNotCleanUpCompletedCheckpointsWithFileSystemStore() throws Exception {
-		JobID jobId = new JobID();
-		long checkpointTimeout = 60 * 1000;
-		long timestamp = 1272635;
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-
-		// Temporary directory for file state backend
-		final File tmpDir = CommonTestUtils.createTempDirectory();
-
-		try {
-			FileSystemStateStore<CompletedCheckpoint> savepointStore = new FileSystemStateStore<>(
-					tmpDir.toURI().toString(), "sp-");
-
-			SavepointCoordinator coordinator = createSavepointCoordinator(
-					jobId,
-					checkpointTimeout,
-					vertices,
-					vertices,
-					vertices,
-					checkpointIdCounter,
-					savepointStore);
-
-			// Trigger the savepoint
-			Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp);
-			assertFalse(savepointPathFuture.isCompleted());
-
-			long checkpointId = checkpointIdCounter.getLastReturnedCount();
-			assertEquals(0, checkpointId);
-
-			// Verify send trigger messages
-			for (ExecutionVertex vertex : vertices) {
-				verifyTriggerCheckpoint(vertex, checkpointId, timestamp);
-			}
-
-			PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
-					.get(checkpointId);
-
-			verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId,
-					timestamp, 0, 2, 0, false, false);
-
-			// Acknowledge tasks
-			for (ExecutionVertex vertex : vertices) {
-				coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-						jobId, vertex.getCurrentExecutionAttempt().getAttemptId(),
-						checkpointId, createSerializedStateHandle(vertex), 0));
-			}
-
-			// The pending checkpoint is completed
-			assertTrue(pendingCheckpoint.isDiscarded());
-			assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
-
-			// Verify send notify complete messages
-			for (ExecutionVertex vertex : vertices) {
-				verifyNotifyCheckpointComplete(vertex, checkpointId, timestamp);
-			}
-
-			// Verify that the future has been completed
-			assertTrue(savepointPathFuture.isCompleted());
-			String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
-
-			// Verify all promises removed
-			assertEquals(0, getSavepointPromises(coordinator).size());
-
-			coordinator.shutdown();
-
-			// Verify the savepoint is still available
-			CompletedCheckpoint savepoint = savepointStore.getState(savepointPath);
-			verifySavepoint(savepoint, jobId, checkpointId, timestamp,
-					vertices);
-		}
-		finally {
-			FileUtils.deleteDirectory(tmpDir);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// Test helpers
-	// ------------------------------------------------------------------------
-
-	private static SavepointCoordinator createSavepointCoordinator(
-			JobID jobId,
-			long checkpointTimeout,
-			ExecutionVertex[] triggerVertices,
-			ExecutionVertex[] ackVertices,
-			ExecutionVertex[] commitVertices,
-			CheckpointIDCounter checkpointIdCounter,
-			StateStore<CompletedCheckpoint> savepointStore) throws Exception {
-
-		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-
-		return new SavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				checkpointTimeout,
-				42,
-				triggerVertices,
-				ackVertices,
-				commitVertices,
-				classLoader,
-				checkpointIdCounter,
-				savepointStore,
-				new DisabledCheckpointStatsTracker());
-	}
-
-	private static Map<JobVertexID, ExecutionJobVertex> createExecutionJobVertexMap(
-			ExecutionJobVertex... jobVertices) {
-
-		Map<JobVertexID, ExecutionJobVertex> jobVertexMap = new HashMap<>();
-
-		for (ExecutionJobVertex jobVertex : jobVertices) {
-			jobVertexMap.put(jobVertex.getJobVertexId(), jobVertex);
-		}
-
-		return jobVertexMap;
-	}
-
-	private static SerializedValue<StateHandle<?>> createSerializedStateHandle(
-			ExecutionVertex vertex) throws IOException {
-
-		return new SerializedValue<StateHandle<?>>(new LocalStateHandle<Serializable>(
-				vertex.getCurrentExecutionAttempt().getAttemptId()));
-	}
-
-	@SuppressWarnings("unchecked")
-	private Map<Long, Promise<String>> getSavepointPromises(
-			SavepointCoordinator coordinator)
-			throws NoSuchFieldException, IllegalAccessException {
-
-		Field field = SavepointCoordinator.class.getDeclaredField("savepointPromises");
-		field.setAccessible(true);
-		return (Map<Long, Promise<String>>) field.get(coordinator);
-	}
-
-	// ---- Verification ------------------------------------------------------
-
-	private static void verifyTriggerCheckpoint(
-			ExecutionVertex mockExecutionVertex,
-			long expectedCheckpointId,
-			long expectedTimestamp) {
-
-		ExecutionAttemptID attemptId = mockExecutionVertex
-				.getCurrentExecutionAttempt().getAttemptId();
-
-		TriggerCheckpoint expectedMsg = new TriggerCheckpoint(
-				mockExecutionVertex.getJobId(),
-				attemptId,
-				expectedCheckpointId,
-				expectedTimestamp);
-
-		verify(mockExecutionVertex).sendMessageToCurrentExecution(
-				eq(expectedMsg), eq(attemptId));
-	}
-
-	private static void verifyNotifyCheckpointComplete(
-			ExecutionVertex mockExecutionVertex,
-			long expectedCheckpointId,
-			long expectedTimestamp) {
-
-		ExecutionAttemptID attemptId = mockExecutionVertex
-				.getCurrentExecutionAttempt().getAttemptId();
-
-		NotifyCheckpointComplete expectedMsg = new NotifyCheckpointComplete(
-				mockExecutionVertex.getJobId(),
-				attemptId,
-				expectedCheckpointId,
-				expectedTimestamp);
-
-		verify(mockExecutionVertex).sendMessageToCurrentExecution(
-				eq(expectedMsg), eq(attemptId));
-	}
-
-	private static void verifyPendingCheckpoint(
-			PendingCheckpoint checkpoint,
-			JobID expectedJobId,
-			long expectedCheckpointId,
-			long expectedTimestamp,
-			int expectedNumberOfAcknowledgedTasks,
-			int expectedNumberOfNonAcknowledgedTasks,
-			int expectedNumberOfCollectedStates,
-			boolean expectedIsDiscarded,
-			boolean expectedIsFullyAcknowledged) {
-
-		assertNotNull(checkpoint);
-		assertEquals(expectedJobId, checkpoint.getJobId());
-		assertEquals(expectedCheckpointId, checkpoint.getCheckpointId());
-		assertEquals(expectedTimestamp, checkpoint.getCheckpointTimestamp());
-		assertEquals(expectedNumberOfAcknowledgedTasks, checkpoint.getNumberOfAcknowledgedTasks());
-		assertEquals(expectedNumberOfNonAcknowledgedTasks, checkpoint.getNumberOfNonAcknowledgedTasks());
-
-		int actualNumberOfCollectedStates = 0;
-
-		for (TaskState taskState : checkpoint.getTaskStates().values()) {
-			actualNumberOfCollectedStates += taskState.getNumberCollectedStates();
-		}
-
-		assertEquals(expectedNumberOfCollectedStates, actualNumberOfCollectedStates);
-		assertEquals(expectedIsDiscarded, checkpoint.isDiscarded());
-		assertEquals(expectedIsFullyAcknowledged, checkpoint.isFullyAcknowledged());
-	}
-
-	private static void verifySavepoint(
-			CompletedCheckpoint savepoint,
-			JobID expectedJobId,
-			long expectedCheckpointId,
-			long expectedTimestamp,
-			ExecutionVertex[] expectedVertices) throws Exception {
-
-		verifyCompletedCheckpoint(
-				savepoint,
-				expectedJobId,
-				expectedCheckpointId,
-				expectedTimestamp,
-				expectedVertices
-		);
-	}
-
-	private static void verifyCompletedCheckpoint(
-			CompletedCheckpoint checkpoint,
-			JobID expectedJobId,
-			long expectedCheckpointId,
-			long expectedTimestamp,
-			ExecutionVertex[] expectedVertices) throws Exception {
-
-		assertNotNull(checkpoint);
-		assertEquals(expectedJobId, checkpoint.getJobId());
-		assertEquals(expectedCheckpointId, checkpoint.getCheckpointID());
-		assertEquals(expectedTimestamp, checkpoint.getTimestamp());
-
-		for (ExecutionVertex vertex : expectedVertices) {
-			JobVertexID jobVertexID = vertex.getJobvertexId();
-
-			TaskState taskState = checkpoint.getTaskState(jobVertexID);
-
-			assertNotNull(taskState);
-
-			SubtaskState subtaskState = taskState.getState(vertex.getParallelSubtaskIndex());
-
-			assertNotNull(subtaskState);
-
-			ExecutionAttemptID vertexAttemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-
-			ExecutionAttemptID stateAttemptId = (ExecutionAttemptID) subtaskState.getState()
-				.deserializeValue(Thread.currentThread().getContextClassLoader())
-				.getState(Thread.currentThread().getContextClassLoader());
-
-			assertEquals(vertexAttemptId, stateAttemptId);
-		}
-	}
-
-	// ---- Mocking -----------------------------------------------------------
-
-	private static ExecutionJobVertex mockExecutionJobVertex(
-			JobID jobId,
-			JobVertexID jobVertexId,
-			int parallelism) {
-
-		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-		when(jobVertex.getJobId()).thenReturn(jobId);
-		when(jobVertex.getJobVertexId()).thenReturn(jobVertexId);
-		when(jobVertex.getParallelism()).thenReturn(parallelism);
-
-		ExecutionVertex[] vertices = new ExecutionVertex[parallelism];
-
-		for (int i = 0; i < vertices.length; i++) {
-			vertices[i] = mockExecutionVertex(jobId, jobVertexId, i, parallelism, ExecutionState.RUNNING);
-		}
-
-		when(jobVertex.getTaskVertices()).thenReturn(vertices);
-
-		return jobVertex;
-	}
-
-	private static ExecutionVertex mockExecutionVertex(JobID jobId) {
-		return mockExecutionVertex(jobId, ExecutionState.RUNNING);
-	}
-
-	private static ExecutionVertex mockExecutionVertex(
-			JobID jobId,
-			ExecutionState state) {
-
-		return mockExecutionVertex(jobId, new JobVertexID(), 0, 1, state);
-	}
-
-	private static ExecutionVertex mockExecutionVertex(
-			JobID jobId,
-			JobVertexID jobVertexId,
-			int subtaskIndex,
-			int parallelism,
-			ExecutionState executionState) {
-
-		Execution exec = mock(Execution.class);
-		when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
-		when(exec.getState()).thenReturn(executionState);
-
-		ExecutionVertex vertex = mock(ExecutionVertex.class);
-		when(vertex.getJobId()).thenReturn(jobId);
-		when(vertex.getJobvertexId()).thenReturn(jobVertexId);
-		when(vertex.getParallelSubtaskIndex()).thenReturn(subtaskIndex);
-		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
-
-		return vertex;
-	}
-
-	private static class MockCheckpointIdCounter implements CheckpointIDCounter {
-
-		private boolean started;
-		private long count;
-		private long lastReturnedCount;
-
-		@Override
-		public void start() throws Exception {
-			started = true;
-		}
-
-		@Override
-		public void shutdown() throws Exception {
-			started = false;
-		}
-
-		@Override
-		public void suspend() throws Exception {
-			started = false;
-		}
-
-		@Override
-		public long getAndIncrement() throws Exception {
-			lastReturnedCount = count;
-			return count++;
-		}
-
-		@Override
-		public void setCount(long newCount) {
-			count = newCount;
-		}
-
-		long getLastReturnedCount() {
-			return lastReturnedCount;
-		}
-
-		public boolean isStarted() {
-			return started;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
deleted file mode 100644
index c0605f7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.Path;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class SavepointStoreFactoryTest {
-
-	@Test
-	public void testStateStoreWithDefaultConfig() throws Exception {
-		SavepointStore store = SavepointStoreFactory.createFromConfig(new Configuration());
-		assertTrue(store.getStateStore() instanceof HeapStateStore);
-	}
-
-	@Test
-	public void testSavepointBackendJobManager() throws Exception {
-		Configuration config = new Configuration();
-		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "jobmanager");
-		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
-		assertTrue(store.getStateStore() instanceof HeapStateStore);
-	}
-
-	@Test
-	public void testSavepointBackendFileSystem() throws Exception {
-		Configuration config = new Configuration();
-		String rootPath = System.getProperty("java.io.tmpdir");
-		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-		config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, rootPath);
-
-		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
-		assertTrue(store.getStateStore() instanceof FileSystemStateStore);
-
-		FileSystemStateStore<CompletedCheckpoint> stateStore = (FileSystemStateStore<CompletedCheckpoint>)
-				store.getStateStore();
-		assertEquals(new Path(rootPath), stateStore.getRootPath());
-	}
-
-	@Test
-	public void testSavepointBackendFileSystemButCheckpointBackendJobManager() throws Exception {
-		Configuration config = new Configuration();
-		String rootPath = System.getProperty("java.io.tmpdir");
-		// This combination does not make sense, because the checkpoints will be
-		// lost after the job manager shuts down.
-		config.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
-		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-		config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, rootPath);
-
-		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
-		assertTrue(store.getStateStore() instanceof FileSystemStateStore);
-
-		FileSystemStateStore<CompletedCheckpoint> stateStore = (FileSystemStateStore<CompletedCheckpoint>)
-				store.getStateStore();
-		assertEquals(new Path(rootPath), stateStore.getRootPath());
-	}
-
-	@Test(expected = IllegalConfigurationException.class)
-	public void testSavepointBackendFileSystemButNoDirectory() throws Exception {
-		Configuration config = new Configuration();
-		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-		SavepointStoreFactory.createFromConfig(config);
-		fail("Did not throw expected Exception");
-	}
-
-	@Test(expected = IllegalConfigurationException.class)
-	public void testUnexpectedSavepointBackend() throws Exception {
-		Configuration config = new Configuration();
-		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "unexpected");
-		SavepointStoreFactory.createFromConfig(config);
-		fail("Did not throw expected Exception");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ac261a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 6e4ffd6..59fa0e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -42,7 +42,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 	@Test
 	public void testShutdownDiscardsCheckpoints() throws Exception {
 		CompletedCheckpointStore store = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
-		TestCheckpoint checkpoint = createCheckpoint(0);
+		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
@@ -60,7 +60,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 	@Test
 	public void testSuspendDiscardsCheckpoints() throws Exception {
 		CompletedCheckpointStore store = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
-		TestCheckpoint checkpoint = createCheckpoint(0);
+		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());


Mime
View raw message