flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/7] flink git commit: [FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints
Date Fri, 14 Oct 2016 08:06:00 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 289f5c3..baa0e08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.CheckpointMessagesTest;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -46,19 +46,16 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	 * Creates the {@link CompletedCheckpointStore} implementation to be tested.
 	 */
 	protected abstract CompletedCheckpointStore createCompletedCheckpoints(
-			int maxNumberOfCheckpointsToRetain, ClassLoader userLoader) throws Exception;
+			int maxNumberOfCheckpointsToRetain) throws Exception;
 
 	// ---------------------------------------------------------------------------------------------
 
-	// Verify that discarded checkpoints are called with the correct class loader
-	private final ClassLoader userClassLoader = ClassLoader.getSystemClassLoader();
-
 	/**
 	 * Tests that at least one checkpoint needs to be retained.
 	 */
 	@Test(expected = Exception.class)
 	public void testExceptionOnNoRetainedCheckpoints() throws Exception {
-		createCompletedCheckpoints(0, userClassLoader);
+		createCompletedCheckpoints(0);
 	}
 
 	/**
@@ -66,7 +63,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	 */
 	@Test
 	public void testAddAndGetLatestCheckpoint() throws Exception {
-		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4, userClassLoader);
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
 
 		// Empty state
 		assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints());
@@ -91,7 +88,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	 */
 	@Test
 	public void testAddCheckpointMoreThanMaxRetained() throws Exception {
-		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1, userClassLoader);
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1);
 
 		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
 				createCheckpoint(0), createCheckpoint(1),
@@ -122,7 +119,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	 */
 	@Test
 	public void testEmptyState() throws Exception {
-		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1, userClassLoader);
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1);
 
 		assertNull(checkpoints.getLatestCheckpoint());
 		assertEquals(0, checkpoints.getAllCheckpoints().size());
@@ -134,7 +131,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	 */
 	@Test
 	public void testGetAllCheckpoints() throws Exception {
-		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4, userClassLoader);
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
 
 		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
 				createCheckpoint(0), createCheckpoint(1),
@@ -159,7 +156,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	 */
 	@Test
 	public void testDiscardAllCheckpoints() throws Exception {
-		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4, userClassLoader);
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
 
 		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
 				createCheckpoint(0), createCheckpoint(1),
@@ -170,7 +167,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			checkpoints.addCheckpoint(checkpoint);
 		}
 
-		checkpoints.shutdown();
+		checkpoints.shutdown(JobStatus.FINISHED);
 
 		// Empty state
 		assertNull(checkpoints.getLatestCheckpoint());
@@ -193,6 +190,11 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 
 	protected TestCompletedCheckpoint createCheckpoint(int id, int numberOfStates)
 			throws IOException {
+		return createCheckpoint(id, numberOfStates, CheckpointProperties.forStandardCheckpoint());
+	}
+
+	protected TestCompletedCheckpoint createCheckpoint(int id, int numberOfStates, CheckpointProperties props)
+			throws IOException {
 
 		JobVertexID jvid = new JobVertexID();
 
@@ -207,7 +209,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			taskState.putState(i, new SubtaskState(stateHandle, 0));
 		}
 
-		return new TestCompletedCheckpoint(new JobID(), id, 0, taskGroupStates);
+		return new TestCompletedCheckpoint(new JobID(), id, 0, taskGroupStates, props);
 	}
 
 	private void verifyCheckpoint(CompletedCheckpoint expected, CompletedCheckpoint actual) {
@@ -232,15 +234,33 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			JobID jobId,
 			long checkpointId,
 			long timestamp,
-			Map<JobVertexID, TaskState> taskGroupStates) {
+			Map<JobVertexID, TaskState> taskGroupStates,
+			CheckpointProperties props) {
 
-			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, true);
+			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, props, null);
 		}
 
 		@Override
-		public void discardState() throws Exception {
-			super.discardState();
+		public boolean subsume() throws Exception {
+			if (super.subsume()) {
+				discard();
+				return true;
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public boolean discard(JobStatus jobStatus) throws Exception {
+			if (super.discard(jobStatus)) {
+				discard();
+				return true;
+			} else {
+				return false;
+			}
+		}
 
+		private void discard() {
 			if (!isDiscarded) {
 				this.isDiscarded = true;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 9b04244..25a4703 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -19,40 +19,103 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Rule;
 import org.junit.Test;
-import org.mockito.Matchers;
+import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 public class CompletedCheckpointTest {
 
+	@Rule
+	public TemporaryFolder tmpFolder = new TemporaryFolder();
+
 	/**
-	 * Tests that the `deleteStateWhenDisposed` flag is correctly forwarded.
+	 * Tests that persistent checkpoints discard their header file.
 	 */
 	@Test
 	public void testDiscard() throws Exception {
+		File file = tmpFolder.newFile();
+		assertEquals(true, file.exists());
+
 		TaskState state = mock(TaskState.class);
 		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
 		taskStates.put(new JobVertexID(), state);
 
 		// Verify discard call is forwarded to state
-		CompletedCheckpoint checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, true);
-		checkpoint.discardState();
+		CompletedCheckpoint checkpoint = new CompletedCheckpoint(
+				new JobID(), 0, 0, 1, taskStates, CheckpointProperties.forStandardCheckpoint(), file.getAbsolutePath());
+
+		checkpoint.discard(JobStatus.FAILED);
+
+		assertEquals(false, file.exists());
+	}
+
+	/**
+	 * Tests that the garbage collection properties are respected when subsuming checkpoints.
+	 */
+	@Test
+	public void testCleanUpOnSubsume() throws Exception {
+		TaskState state = mock(TaskState.class);
+		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
+		taskStates.put(new JobVertexID(), state);
+
+		boolean discardSubsumed = true;
+		CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true);
+		CompletedCheckpoint checkpoint = new CompletedCheckpoint(
+				new JobID(), 0, 0, 1, taskStates, props, null);
+
+		// Subsume
+		checkpoint.subsume();
+
 		verify(state, times(1)).discardState();
+	}
+
+	/**
+	 * Tests that the garbage collection properties are respected when shutting down.
+	 */
+	@Test
+	public void testCleanUpOnShutdown() throws Exception {
+		File file = tmpFolder.newFile();
+		String externalPath = file.getAbsolutePath();
+
+		JobStatus[] terminalStates = new JobStatus[] {
+				JobStatus.FINISHED, JobStatus.CANCELED, JobStatus.FAILED, JobStatus.SUSPENDED
+		};
+
+		TaskState state = mock(TaskState.class);
+		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
+		taskStates.put(new JobVertexID(), state);
+
+		for (JobStatus status : terminalStates) {
+			Mockito.reset(state);
+
+			// Keep
+			CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
+			CompletedCheckpoint checkpoint = new CompletedCheckpoint(
+					new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, externalPath);
+
+			checkpoint.discard(status);
+			verify(state, times(0)).discardState();
+			assertEquals(true, file.exists());
 
-		Mockito.reset(state);
+			// Discard
+			props = new CheckpointProperties(false, false, true, true, true, true, true);
+			checkpoint = new CompletedCheckpoint(
+					new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, null);
 
-		// Verify discard call is not forwarded to state
-		checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, false);
-		checkpoint.discardState();
-		verify(state, times(0)).discardState();
+			checkpoint.discard(status);
+			verify(state, times(1)).discardState();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index c43cf2e..ea4d322 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -23,17 +23,16 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-
 import org.junit.Test;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -42,7 +41,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class CoordinatorShutdownTest {
 	
@@ -62,7 +63,7 @@ public class CoordinatorShutdownTest {
 			
 			JobGraph testGraph = new JobGraph("test job", vertex);
 			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 
-					5000, 60000, 0L, Integer.MAX_VALUE));
+					5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none()));
 			
 			ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
@@ -114,7 +115,7 @@ public class CoordinatorShutdownTest {
 
 			JobGraph testGraph = new JobGraph("test job", vertex);
 			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
-					5000, 60000, 0L, Integer.MAX_VALUE));
+					5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none()));
 			
 			ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 1de7098..a8bed46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -26,16 +26,18 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.AfterClass;
 import org.junit.Test;
+import org.mockito.Matchers;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.URL;
@@ -67,8 +69,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 		ExecutionGraph graph = createExecutionGraphAndEnableCheckpointing(counter, store);
 		graph.fail(new Exception("Test Exception"));
 
-		verify(counter, times(1)).shutdown();
-		verify(store, times(1)).shutdown();
+		verify(counter, times(1)).shutdown(JobStatus.FAILED);
+		verify(store, times(1)).shutdown(JobStatus.FAILED);
 	}
 
 	/**
@@ -84,11 +86,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 		graph.suspend(new Exception("Test Exception"));
 
 		// No shutdown
-		verify(counter, times(0)).shutdown();
-		verify(store, times(0)).shutdown();
-
-		verify(counter, times(1)).suspend();
-		verify(store, times(1)).suspend();
+		verify(counter, times(1)).shutdown(Matchers.eq(JobStatus.SUSPENDED));
+		verify(store, times(1)).shutdown(Matchers.eq(JobStatus.SUSPENDED));
 	}
 
 	private ExecutionGraph createExecutionGraphAndEnableCheckpointing(
@@ -112,12 +111,13 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				100,
 				100,
 				1,
+				ExternalizedCheckpointSettings.none(),
 				Collections.<ExecutionJobVertex>emptyList(),
 				Collections.<ExecutionJobVertex>emptyList(),
 				Collections.<ExecutionJobVertex>emptyList(),
 				counter,
 				store,
-				new HeapSavepointStore(),
+				null,
 				new DisabledCheckpointStatsTracker());
 
 		JobVertex jobVertex = new JobVertex("MockVertex");

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index b8126e9..2667743 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -19,24 +19,33 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Rule;
 import org.junit.Test;
-import org.mockito.Matchers;
+import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
+import java.io.File;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 public class PendingCheckpointTest {
 
+	@Rule
+	public TemporaryFolder tmpFolder = new TemporaryFolder();
+
 	private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>();
 	private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
 
@@ -45,24 +54,125 @@ public class PendingCheckpointTest {
 	}
 
 	/**
-	 * Tests that pending checkpoints can be subsumed.
+	 * Tests that pending checkpoints can be subsumed iff they are not forced.
 	 */
 	@Test
 	public void testCanBeSubsumed() throws Exception {
-		PendingCheckpoint pending = createPendingCheckpoint();
+		// Forced checkpoints cannot be subsumed
+		CheckpointProperties forced = new CheckpointProperties(true, true, false, false, false, false, false);
+		PendingCheckpoint pending = createPendingCheckpoint(forced, "ignored");
+		assertFalse(pending.canBeSubsumed());
+
+		try {
+			pending.abortSubsumed();
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException ignored) {
+			// Expected
+		}
+
+		// Non-forced checkpoints can be subsumed
+		CheckpointProperties subsumed = new CheckpointProperties(false, true, false, false, false, false, false);
+		pending = createPendingCheckpoint(subsumed, "ignored");
 		assertTrue(pending.canBeSubsumed());
 	}
 
 	/**
+	 * Tests that the persist checkpoint property is respected by the pending
+	 * checkpoint when finalizing.
+	 */
+	@Test
+	public void testPersistExternally() throws Exception {
+		File tmp = tmpFolder.newFolder();
+
+		// Persisted checkpoint
+		CheckpointProperties persisted = new CheckpointProperties(false, true, false, false, false, false, false);
+
+		PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath());
+		pending.acknowledgeTask(ATTEMPT_ID, null);
+
+		assertEquals(0, tmp.listFiles().length);
+		pending.finalizeCheckpoint();
+		assertEquals(1, tmp.listFiles().length);
+
+		// Ephemeral checkpoint
+		CheckpointProperties ephemeral = new CheckpointProperties(false, false, true, true, true, true, true);
+		pending = createPendingCheckpoint(ephemeral, null);
+		pending.acknowledgeTask(ATTEMPT_ID, null);
+
+		assertEquals(1, tmp.listFiles().length);
+		pending.finalizeCheckpoint();
+		assertEquals(1, tmp.listFiles().length);
+	}
+
+	/**
+	 * Tests that the completion future is succeeded on finalize and failed on
+	 * abort and failures during finalize.
+	 */
+	@Test
+	public void testCompletionFuture() throws Exception {
+		CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
+
+		// Abort declined
+		PendingCheckpoint pending = createPendingCheckpoint(props, "ignored");
+		Future<CompletedCheckpoint> future = pending.getCompletionFuture();
+
+		assertFalse(future.isDone());
+		pending.abortDeclined();
+		assertTrue(future.isDone());
+
+		// Abort expired
+		pending = createPendingCheckpoint(props, "ignored");
+		future = pending.getCompletionFuture();
+
+		assertFalse(future.isDone());
+		pending.abortExpired();
+		assertTrue(future.isDone());
+
+		// Abort subsumed
+		pending = createPendingCheckpoint(props, "ignored");
+		future = pending.getCompletionFuture();
+
+		assertFalse(future.isDone());
+		pending.abortSubsumed();
+		assertTrue(future.isDone());
+
+		// Finalize (all ACK'd)
+		String target = tmpFolder.newFolder().getAbsolutePath();
+		pending = createPendingCheckpoint(props, target);
+		future = pending.getCompletionFuture();
+
+		assertFalse(future.isDone());
+		pending.acknowledgeTask(ATTEMPT_ID, null);
+		pending.finalizeCheckpoint();
+		assertTrue(future.isDone());
+
+		// Finalize (missing ACKs)
+		pending = createPendingCheckpoint(props, "ignored");
+		future = pending.getCompletionFuture();
+
+		assertFalse(future.isDone());
+		try {
+			pending.finalizeCheckpoint();
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException ignored) {
+			// Expected
+		}
+		assertTrue(future.isDone());
+	}
+
+	/**
 	 * Tests that abort discards state.
 	 */
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testAbort() throws Exception {
+	public void testAbortDiscardsState() throws Exception {
+		CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
 		TaskState state = mock(TaskState.class);
 
+		String targetDir = tmpFolder.newFolder().getAbsolutePath();
+
 		// Abort declined
-		PendingCheckpoint pending = createPendingCheckpoint();
+		PendingCheckpoint pending = createPendingCheckpoint(props, targetDir);
 		setTaskState(pending, state);
 
 		pending.abortDeclined();
@@ -71,7 +181,7 @@ public class PendingCheckpointTest {
 		// Abort error
 		Mockito.reset(state);
 
-		pending = createPendingCheckpoint();
+		pending = createPendingCheckpoint(props, targetDir);
 		setTaskState(pending, state);
 
 		pending.abortError(new Exception("Expected Test Exception"));
@@ -80,7 +190,7 @@ public class PendingCheckpointTest {
 		// Abort expired
 		Mockito.reset(state);
 
-		pending = createPendingCheckpoint();
+		pending = createPendingCheckpoint(props, targetDir);
 		setTaskState(pending, state);
 
 		pending.abortExpired();
@@ -89,37 +199,18 @@ public class PendingCheckpointTest {
 		// Abort subsumed
 		Mockito.reset(state);
 
-		pending = createPendingCheckpoint();
+		pending = createPendingCheckpoint(props, targetDir);
 		setTaskState(pending, state);
 
 		pending.abortSubsumed();
 		verify(state, times(1)).discardState();
 	}
 
-	/**
-	 * Tests that the CompletedCheckpoint `deleteStateWhenDisposed` flag is
-	 * correctly set to true.
-	 */
-	@Test
-	public void testFinalizeCheckpoint() throws Exception {
-		TaskState state = mock(TaskState.class);
-		PendingCheckpoint pending = createPendingCheckpoint();
-		PendingCheckpointTest.setTaskState(pending, state);
-
-		pending.acknowledgeTask(ATTEMPT_ID, null);
-
-		CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
-
-		// Does discard state
-		checkpoint.discardState();
-		verify(state, times(1)).discardState();
-	}
-
 	// ------------------------------------------------------------------------
 
-	private static PendingCheckpoint createPendingCheckpoint() {
+	private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
 		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
-		return new PendingCheckpoint(new JobID(), 0, 1, ackTasks);
+		return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, props, targetDirectory);
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
deleted file mode 100644
index 3701359..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-public class PendingSavepointTest {
-
-	private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>();
-	private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
-
-	static {
-		ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
-	}
-
-	/**
-	 * Tests that pending savepoints cannot be subsumed.
-	 */
-	@Test
-	public void testCanBeSubsumed() throws Exception {
-		PendingSavepoint pending = createPendingSavepoint();
-		assertFalse(pending.canBeSubsumed());
-	}
-
-	/**
-	 * Tests that abort discards state fails the completeion future.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testAbort() throws Exception {
-		TaskState state = mock(TaskState.class);
-
-		// Abort declined
-		PendingSavepoint pending = createPendingSavepoint();
-		PendingCheckpointTest.setTaskState(pending, state);
-
-		pending.abortDeclined();
-		verify(state, times(1)).discardState();
-
-		// Abort error
-		Mockito.reset(state);
-
-		pending = createPendingSavepoint();
-		PendingCheckpointTest.setTaskState(pending, state);
-		Future<String> future = pending.getCompletionFuture();
-
-		pending.abortError(new Exception("Expected Test Exception"));
-		verify(state, times(1)).discardState();
-		assertTrue(future.failed().isCompleted());
-
-		// Abort expired
-		Mockito.reset(state);
-
-		pending = createPendingSavepoint();
-		PendingCheckpointTest.setTaskState(pending, state);
-		future = pending.getCompletionFuture();
-
-		pending.abortExpired();
-		verify(state, times(1)).discardState();
-		assertTrue(future.failed().isCompleted());
-
-		// Abort subsumed
-		pending = createPendingSavepoint();
-
-		try {
-			pending.abortSubsumed();
-			fail("Did not throw expected Exception");
-		} catch (Throwable ignored) { // expected
-		}
-	}
-
-	/**
-	 * Tests that the CompletedCheckpoint `deleteStateWhenDisposed` flag is
-	 * correctly set to false.
-	 */
-	@Test
-	public void testFinalizeCheckpoint() throws Exception {
-		TaskState state = mock(TaskState.class);
-		PendingSavepoint pending = createPendingSavepoint();
-		PendingCheckpointTest.setTaskState(pending, state);
-
-		Future<String> future = pending.getCompletionFuture();
-
-		pending.acknowledgeTask(ATTEMPT_ID, null);
-
-		CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
-
-		// Does _NOT_ discard state
-		checkpoint.discardState();
-		verify(state, times(0)).discardState();
-
-		// Future is completed
-		String path = Await.result(future, Duration.Zero());
-		assertNotNull(path);
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static PendingSavepoint createPendingSavepoint() {
-		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
-		return new PendingSavepoint(new JobID(), 0, 1, ackTasks, new HeapSavepointStore());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 59fa0e2..4e9366e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -30,10 +31,9 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 
 	@Override
 	protected CompletedCheckpointStore createCompletedCheckpoints(
-			int maxNumberOfCheckpointsToRetain,
-			ClassLoader userClassLoader) throws Exception {
+			int maxNumberOfCheckpointsToRetain) throws Exception {
 
-		return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userClassLoader);
+		return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
 	}
 
 	/**
@@ -41,13 +41,13 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 	 */
 	@Test
 	public void testShutdownDiscardsCheckpoints() throws Exception {
-		CompletedCheckpointStore store = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
+		CompletedCheckpointStore store = createCompletedCheckpoints(1);
 		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
 
-		store.shutdown();
+		store.shutdown(JobStatus.FINISHED);
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 		assertTrue(checkpoint.isDiscarded());
@@ -59,13 +59,13 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 	 */
 	@Test
 	public void testSuspendDiscardsCheckpoints() throws Exception {
-		CompletedCheckpointStore store = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
+		CompletedCheckpointStore store = createCompletedCheckpoints(1);
 		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
 
-		store.suspend();
+		store.shutdown(JobStatus.SUSPENDED);
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 		assertTrue(checkpoint.isDiscarded());

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 9fbe574..2e44ecd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
@@ -59,9 +60,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 
 	@Override
 	protected CompletedCheckpointStore createCompletedCheckpoints(
-			int maxNumberOfCheckpointsToRetain, ClassLoader userLoader) throws Exception {
+			int maxNumberOfCheckpointsToRetain) throws Exception {
 
-		return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader,
+		return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain,
 			ZooKeeper.createClient(), CheckpointsPath, new RetrievableStateStorageHelper<CompletedCheckpoint>() {
 			@Override
 			public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
@@ -77,8 +78,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 	 */
 	@Test
 	public void testRecover() throws Exception {
-		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(3, ClassLoader
-				.getSystemClassLoader());
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(3);
 
 		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
 				createCheckpoint(0), createCheckpoint(1), createCheckpoint(2)
@@ -118,14 +118,14 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 	public void testShutdownDiscardsCheckpoints() throws Exception {
 		CuratorFramework client = ZooKeeper.getClient();
 
-		CompletedCheckpointStore store = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
+		CompletedCheckpointStore store = createCompletedCheckpoints(1);
 		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
 		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
 
-		store.shutdown();
+		store.shutdown(JobStatus.FINISHED);
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 		assertNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
@@ -143,14 +143,14 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 	public void testSuspendKeepsCheckpoints() throws Exception {
 		CuratorFramework client = ZooKeeper.getClient();
 
-		CompletedCheckpointStore store = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
+		CompletedCheckpointStore store = createCompletedCheckpoints(1);
 		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
 		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
 
-		store.suspend();
+		store.shutdown(JobStatus.SUSPENDED);
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
deleted file mode 100644
index 3e2de80..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.Matchers;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-
-public class FsSavepointStoreTest {
-
-	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
-
-	/**
-	 * Tests a store-load-dispose sequence.
-	 */
-	@Test
-	public void testStoreLoadDispose() throws Exception {
-		FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
-		assertEquals(0, tmp.getRoot().listFiles().length);
-
-		// Store
-		SavepointV1 stored = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
-		String path = store.storeSavepoint(stored);
-		assertEquals(1, tmp.getRoot().listFiles().length);
-
-		// Load
-		Savepoint loaded = store.loadSavepoint(path);
-		assertEquals(stored, loaded);
-
-		// Dispose
-		store.disposeSavepoint(path);
-
-		assertEquals(0, tmp.getRoot().listFiles().length);
-	}
-
-	/**
-	 * Tests loading with unexpected magic number.
-	 */
-	@Test
-	public void testUnexpectedSavepoint() throws Exception {
-		FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
-
-		// Random file
-		Path filePath = new Path(tmp.getRoot().getPath(), UUID.randomUUID().toString());
-		FSDataOutputStream fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
-		DataOutputStream dos = new DataOutputStream(fdos);
-		for (int i = 0; i < 10; i++) {
-			dos.writeLong(ThreadLocalRandom.current().nextLong());
-		}
-
-		try {
-			store.loadSavepoint(filePath.toString());
-			fail("Did not throw expected Exception");
-		} catch (RuntimeException e) {
-			assertTrue(e.getMessage().contains("Flink 1.0") && e.getMessage().contains("Unexpected magic number"));
-		}
-	}
-
-	/**
-	 * Tests addition of a new savepoint version.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testMultipleSavepointVersions() throws Exception {
-		Field field = SavepointSerializers.class.getDeclaredField("SERIALIZERS");
-		field.setAccessible(true);
-		Map<Integer, SavepointSerializer<?>> serializers = (Map<Integer, SavepointSerializer<?>>) field.get(null);
-
-		assertTrue(serializers.size() >= 1);
-
-		FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
-		assertEquals(0, tmp.getRoot().listFiles().length);
-
-		// New savepoint type for test
-		int version = ThreadLocalRandom.current().nextInt();
-		long checkpointId = ThreadLocalRandom.current().nextLong();
-
-		// Add serializer
-		serializers.put(version, NewSavepointSerializer.INSTANCE);
-
-		TestSavepoint newSavepoint = new TestSavepoint(version, checkpointId);
-		String pathNewSavepoint = store.storeSavepoint(newSavepoint);
-		assertEquals(1, tmp.getRoot().listFiles().length);
-
-		// Savepoint v0
-		Savepoint savepoint = new SavepointV1(checkpointId, SavepointV1Test.createTaskStates(4, 32));
-		String pathSavepoint = store.storeSavepoint(savepoint);
-		assertEquals(2, tmp.getRoot().listFiles().length);
-
-		// Load
-		Savepoint loaded = store.loadSavepoint(pathNewSavepoint);
-		assertEquals(newSavepoint, loaded);
-
-		loaded = store.loadSavepoint(pathSavepoint);
-		assertEquals(savepoint, loaded);
-	}
-
-	/**
-	 * Tests that an exception during store cleans up the created savepoint file.
-	 */
-	@Test
-	public void testCleanupOnStoreFailure() throws Exception {
-		Field field = SavepointSerializers.class.getDeclaredField("SERIALIZERS");
-		field.setAccessible(true);
-		Map<Integer, SavepointSerializer<?>> serializers = (Map<Integer, SavepointSerializer<?>>) field.get(null);
-
-		final int version = 123123;
-		SavepointSerializer<TestSavepoint> serializer = mock(SavepointSerializer.class);
-		doThrow(new RuntimeException("Test Exception")).when(serializer)
-				.serialize(Matchers.any(TestSavepoint.class), any(DataOutputStream.class));
-
-		FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
-		serializers.put(version, serializer);
-
-		Savepoint savepoint = new TestSavepoint(version, 12123123);
-
-		assertEquals(0, tmp.getRoot().listFiles().length);
-
-		try {
-			store.storeSavepoint(savepoint);
-		} catch (Throwable ignored) {
-		}
-
-		assertEquals("Savepoint file not cleaned up on failure", 0, tmp.getRoot().listFiles().length);
-	}
-
-	private static class NewSavepointSerializer implements SavepointSerializer<TestSavepoint> {
-
-		private static final NewSavepointSerializer INSTANCE = new NewSavepointSerializer();
-
-		@Override
-		public void serialize(TestSavepoint savepoint, DataOutputStream dos) throws IOException {
-			dos.writeInt(savepoint.version);
-			dos.writeLong(savepoint.checkpointId);
-		}
-
-		@Override
-		public TestSavepoint deserialize(DataInputStream dis) throws IOException {
-			int version = dis.readInt();
-			long checkpointId = dis.readLong();
-			return new TestSavepoint(version, checkpointId);
-		}
-
-	}
-
-	private static class TestSavepoint implements Savepoint {
-
-		private final int version;
-		private final long checkpointId;
-
-		public TestSavepoint(int version, long checkpointId) {
-			this.version = version;
-			this.checkpointId = checkpointId;
-		}
-
-		@Override
-		public int getVersion() {
-			return version;
-		}
-
-		@Override
-		public long getCheckpointId() {
-			return checkpointId;
-		}
-
-		@Override
-		public Collection<TaskState> getTaskStates() {
-			return Collections.EMPTY_LIST;
-		}
-
-		@Override
-		public void dispose() {
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			TestSavepoint that = (TestSavepoint) o;
-			return version == that.version && checkpointId == that.checkpointId;
-
-		}
-
-		@Override
-		public int hashCode() {
-			int result = version;
-			result = 31 * result + (int) (checkpointId ^ (checkpointId >>> 32));
-			return result;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 766531a..b594f4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -23,8 +23,11 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -32,66 +35,59 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class SavepointLoaderTest {
 
+	@Rule
+	public TemporaryFolder tmpFolder = new TemporaryFolder();
+
 	/**
 	 * Tests loading and validation of savepoints with correct setup,
 	 * parallelism mismatch, and a missing task.
 	 */
 	@Test
 	public void testLoadAndValidateSavepoint() throws Exception {
+		File tmp = tmpFolder.newFolder();
+
 		int parallelism = 128128;
+		long checkpointId = Integer.MAX_VALUE + 123123L;
 		JobVertexID vertexId = new JobVertexID();
 
 		TaskState state = mock(TaskState.class);
 		when(state.getParallelism()).thenReturn(parallelism);
 		when(state.getJobVertexID()).thenReturn(vertexId);
+		when(state.getMaxParallelism()).thenReturn(parallelism);
+		when(state.getChainLength()).thenReturn(1);
 
 		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
 		taskStates.put(vertexId, state);
 
-		CompletedCheckpoint stored = new CompletedCheckpoint(
-				new JobID(),
-				Integer.MAX_VALUE + 123123L,
-				10200202,
-				1020292988,
-				taskStates,
-				true);
-
 		// Store savepoint
-		SavepointV1 savepoint = new SavepointV1(stored.getCheckpointID(), taskStates.values());
-		SavepointStore store = new HeapSavepointStore();
-		String path = store.storeSavepoint(savepoint);
+		SavepointV1 savepoint = new SavepointV1(checkpointId, taskStates.values());
+		String path = SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint);
 
 		JobID jobId = new JobID();
 
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
 		when(vertex.getParallelism()).thenReturn(parallelism);
+		when(vertex.getMaxParallelism()).thenReturn(parallelism);
 
 		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
 		tasks.put(vertexId, vertex);
 
 		// 1) Load and validate: everything correct
-		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path);
+		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path);
 
 		assertEquals(jobId, loaded.getJobId());
-		assertEquals(stored.getCheckpointID(), loaded.getCheckpointID());
-
-		// The loaded checkpoint should not discard state when its discarded
-		loaded.discardState();
-		verify(state, times(0)).discardState();
+		assertEquals(checkpointId, loaded.getCheckpointID());
 
 		// 2) Load and validate: max parallelism mismatch
 		when(vertex.getMaxParallelism()).thenReturn(222);
 
 		try {
-			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path);
+			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path);
 			fail("Did not throw expected Exception");
 		} catch (IllegalStateException expected) {
 			assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
@@ -101,7 +97,7 @@ public class SavepointLoaderTest {
 		assertNotNull(tasks.remove(vertexId));
 
 		try {
-			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path);
+			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path);
 			fail("Did not throw expected Exception");
 		} catch (IllegalStateException expected) {
 			assertTrue(expected.getMessage().contains("Cannot map old state"));

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactoryTest.java
deleted file mode 100644
index 3dfe85e..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactoryTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.Path;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class SavepointStoreFactoryTest {
-
-	@Test
-	public void testStateStoreWithDefaultConfig() throws Exception {
-		SavepointStore store = SavepointStoreFactory.createFromConfig(new Configuration());
-		Assert.assertTrue(store instanceof HeapSavepointStore);
-	}
-
-	@Test
-	public void testSavepointBackendJobManager() throws Exception {
-		Configuration config = new Configuration();
-		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "jobmanager");
-		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
-		Assert.assertTrue(store instanceof HeapSavepointStore);
-	}
-
-	@Test
-	public void testSavepointBackendFileSystem() throws Exception {
-		Configuration config = new Configuration();
-		String rootPath = System.getProperty("java.io.tmpdir");
-		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-		config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, rootPath);
-
-		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
-		Assert.assertTrue(store instanceof FsSavepointStore);
-
-		FsSavepointStore stateStore = (FsSavepointStore) store;
-		Assert.assertEquals(new Path(rootPath), stateStore.getRootPath());
-	}
-
-	@Test
-	public void testSavepointBackendFileSystemButCheckpointBackendJobManager() throws Exception {
-		Configuration config = new Configuration();
-		String rootPath = System.getProperty("java.io.tmpdir");
-		// This combination does not make sense, because the checkpoints will be
-		// lost after the job manager shuts down.
-		config.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
-		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-		config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, rootPath);
-
-		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
-		Assert.assertTrue(store instanceof FsSavepointStore);
-
-		FsSavepointStore stateStore = (FsSavepointStore) store;
-		Assert.assertEquals(new Path(rootPath), stateStore.getRootPath());
-	}
-
-	@Test(expected = IllegalConfigurationException.class)
-	public void testSavepointBackendFileSystemButNoDirectory() throws Exception {
-		Configuration config = new Configuration();
-		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-		SavepointStoreFactory.createFromConfig(config);
-		Assert.fail("Did not throw expected Exception");
-	}
-
-	@Test(expected = IllegalConfigurationException.class)
-	public void testUnexpectedSavepointBackend() throws Exception {
-		Configuration config = new Configuration();
-		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "unexpected");
-		SavepointStoreFactory.createFromConfig(config);
-		Assert.fail("Did not throw expected Exception");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
new file mode 100644
index 0000000..8eed6ea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Matchers;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class SavepointStoreTest {
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	/**
+	 * Tests a store-load-dispose sequence.
+	 */
+	@Test
+	public void testStoreLoadDispose() throws Exception {
+		String target = tmp.getRoot().getAbsolutePath();
+
+		assertEquals(0, tmp.getRoot().listFiles().length);
+
+		// Store
+		SavepointV1 stored = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
+		String path = SavepointStore.storeSavepoint(target, stored);
+		assertEquals(1, tmp.getRoot().listFiles().length);
+
+		// Load
+		Savepoint loaded = SavepointStore.loadSavepoint(path);
+		assertEquals(stored, loaded);
+
+		loaded.dispose();
+
+		// Dispose
+		SavepointStore.removeSavepoint(path);
+
+		assertEquals(0, tmp.getRoot().listFiles().length);
+	}
+
+	/**
+	 * Tests loading with unexpected magic number.
+	 */
+	@Test
+	public void testUnexpectedSavepoint() throws Exception {
+		// Random file
+		Path filePath = new Path(tmp.getRoot().getPath(), UUID.randomUUID().toString());
+		FSDataOutputStream fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
+		DataOutputStream dos = new DataOutputStream(fdos);
+		for (int i = 0; i < 10; i++) {
+			dos.writeLong(ThreadLocalRandom.current().nextLong());
+		}
+
+		try {
+			SavepointStore.loadSavepoint(filePath.toString());
+			fail("Did not throw expected Exception");
+		} catch (RuntimeException e) {
+			assertTrue(e.getMessage().contains("Flink 1.0") && e.getMessage().contains("Unexpected magic number"));
+		}
+	}
+
+	/**
+	 * Tests addition of a new savepoint version.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testMultipleSavepointVersions() throws Exception {
+		Field field = SavepointSerializers.class.getDeclaredField("SERIALIZERS");
+		field.setAccessible(true);
+		Map<Integer, SavepointSerializer<?>> serializers = (Map<Integer, SavepointSerializer<?>>) field.get(null);
+
+		assertTrue(serializers.size() >= 1);
+
+		String target = tmp.getRoot().getAbsolutePath();
+		assertEquals(0, tmp.getRoot().listFiles().length);
+
+		// New savepoint type for test
+		int version = ThreadLocalRandom.current().nextInt();
+		long checkpointId = ThreadLocalRandom.current().nextLong();
+
+		// Add serializer
+		serializers.put(version, NewSavepointSerializer.INSTANCE);
+
+		TestSavepoint newSavepoint = new TestSavepoint(version, checkpointId);
+		String pathNewSavepoint = SavepointStore.storeSavepoint(target, newSavepoint);
+		assertEquals(1, tmp.getRoot().listFiles().length);
+
+		// Savepoint v1
+		Savepoint savepoint = new SavepointV1(checkpointId, SavepointV1Test.createTaskStates(4, 32));
+		String pathSavepoint = SavepointStore.storeSavepoint(target, savepoint);
+		assertEquals(2, tmp.getRoot().listFiles().length);
+
+		// Load
+		Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint);
+		assertEquals(newSavepoint, loaded);
+
+		loaded = SavepointStore.loadSavepoint(pathSavepoint);
+		assertEquals(savepoint, loaded);
+	}
+
+	/**
+	 * Tests that an exception during store cleans up the created savepoint file.
+	 */
+	@Test
+	public void testCleanupOnStoreFailure() throws Exception {
+		Field field = SavepointSerializers.class.getDeclaredField("SERIALIZERS");
+		field.setAccessible(true);
+		Map<Integer, SavepointSerializer<?>> serializers = (Map<Integer, SavepointSerializer<?>>) field.get(null);
+
+		String target = tmp.getRoot().getAbsolutePath();
+
+		final int version = 123123;
+		SavepointSerializer<TestSavepoint> serializer = mock(SavepointSerializer.class);
+		doThrow(new RuntimeException("Test Exception")).when(serializer)
+				.serialize(Matchers.any(TestSavepoint.class), any(DataOutputStream.class));
+
+		serializers.put(version, serializer);
+
+		Savepoint savepoint = new TestSavepoint(version, 12123123);
+
+		assertEquals(0, tmp.getRoot().listFiles().length);
+
+		try {
+			SavepointStore.storeSavepoint(target, savepoint);
+		} catch (Throwable ignored) {
+		}
+
+		assertEquals("Savepoint file not cleaned up on failure", 0, tmp.getRoot().listFiles().length);
+	}
+
+	private static class NewSavepointSerializer implements SavepointSerializer<TestSavepoint> {
+
+		private static final NewSavepointSerializer INSTANCE = new NewSavepointSerializer();
+
+		@Override
+		public void serialize(TestSavepoint savepoint, DataOutputStream dos) throws IOException {
+			dos.writeInt(savepoint.version);
+			dos.writeLong(savepoint.checkpointId);
+		}
+
+		@Override
+		public TestSavepoint deserialize(DataInputStream dis) throws IOException {
+			int version = dis.readInt();
+			long checkpointId = dis.readLong();
+			return new TestSavepoint(version, checkpointId);
+		}
+
+	}
+
+	private static class TestSavepoint implements Savepoint {
+
+		private final int version;
+		private final long checkpointId;
+
+		public TestSavepoint(int version, long checkpointId) {
+			this.version = version;
+			this.checkpointId = checkpointId;
+		}
+
+		@Override
+		public int getVersion() {
+			return version;
+		}
+
+		@Override
+		public long getCheckpointId() {
+			return checkpointId;
+		}
+
+		@Override
+		public Collection<TaskState> getTaskStates() {
+			return Collections.EMPTY_LIST;
+		}
+
+		@Override
+		public void dispose() {
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			TestSavepoint that = (TestSavepoint) o;
+			return version == that.version && checkpointId == that.checkpointId;
+
+		}
+
+		@Override
+		public int hashCode() {
+			int result = version;
+			result = 31 * result + (int) (checkpointId ^ (checkpointId >>> 32));
+			return result;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 1e95732..2dac87f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint.stats;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
@@ -344,7 +345,7 @@ public class SimpleCheckpointStatsTrackerTest {
 			// Add some random delay
 			final long completionTimestamp = triggerTimestamp + completionDuration + RAND.nextInt(10);
 
-			checkpoints[i] = new CompletedCheckpoint(jobId, i, triggerTimestamp, completionTimestamp, taskGroupStates, true);
+			checkpoints[i] = new CompletedCheckpoint(jobId, i, triggerTimestamp, completionTimestamp, taskGroupStates, CheckpointProperties.forStandardCheckpoint(), null);
 		}
 
 		return checkpoints;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 612fe35..9277029 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -49,6 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -168,7 +168,6 @@ public class JobManagerHARecoveryTest {
 					myLeaderElectionService,
 					mySubmittedJobGraphStore,
 					checkpointStateFactory,
-					new HeapSavepointStore(),
 					jobRecoveryTimeout,
 					Option.apply(null));
 
@@ -205,7 +204,8 @@ public class JobManagerHARecoveryTest {
 					100,
 					10 * 60 * 1000,
 					0,
-					1));
+					1,
+					ExternalizedCheckpointSettings.none()));
 
 			BlockingStatefulInvokable.initializeStaticHelpers(slots);
 
@@ -294,7 +294,6 @@ public class JobManagerHARecoveryTest {
 	 */
 	static class MyCheckpointStore implements CompletedCheckpointStore {
 
-
 		private final ArrayDeque<CompletedCheckpoint> checkpoints = new ArrayDeque<>(2);
 
 		private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2);
@@ -309,7 +308,7 @@ public class JobManagerHARecoveryTest {
 		public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
 			checkpoints.addLast(checkpoint);
 			if (checkpoints.size() > 1) {
-				checkpoints.removeFirst().discardState();
+				checkpoints.removeFirst().subsume();
 			}
 		}
 
@@ -319,15 +318,14 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public void shutdown() throws Exception {
-			checkpoints.clear();
-			suspended.clear();
-		}
-
-		@Override
-		public void suspend() throws Exception {
-			suspended.addAll(checkpoints);
-			checkpoints.clear();
+		public void shutdown(JobStatus jobStatus) throws Exception {
+			if (jobStatus.isGloballyTerminalState()) {
+				checkpoints.clear();
+				suspended.clear();
+			} else {
+				suspended.addAll(checkpoints);
+				checkpoints.clear();
+			}
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index ed4d530..0c45fac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -39,8 +39,6 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -183,7 +181,6 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 		// We don't need recovery in this test
 		SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
 		CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
-		SavepointStore savepointStore = SavepointStoreFactory.createFromConfig(configuration);
 
 		return Props.create(
 				TestingJobManager.class,
@@ -198,7 +195,6 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 				leaderElectionService,
 				submittedJobGraphStore,
 				checkpointRecoveryFactory,
-				savepointStore,
 				AkkaUtils.getDefaultTimeout(),
 				Option.apply(null)
 		);

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 548bef0..0569297 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.jobmanager
 
-
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator
+import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedCheckpoint}
 import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture
+import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobSnapshottingSettings}
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode}
 import org.apache.flink.runtime.jobmanager.Tasks._
 import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
 import org.junit.runner.RunWith
+import org.mockito.Mockito
 import org.mockito.Mockito._
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -752,7 +753,7 @@ class JobManagerITCase(_system: ActorSystem)
           val jobId = new JobID()
 
           // Trigger savepoint for non-existing job
-          jobManager.tell(TriggerSavepoint(jobId), testActor)
+          jobManager.tell(TriggerSavepoint(jobId, Option.apply("any")), testActor)
           val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
 
           // Verify the response
@@ -784,7 +785,7 @@ class JobManagerITCase(_system: ActorSystem)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
 
           // Trigger savepoint for job with disabled checkpointing
-          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), Option.apply("any")), testActor)
           val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
 
           // Verify the response
@@ -815,7 +816,7 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000, 60000, 60000, 1))
+            60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
@@ -829,7 +830,8 @@ class JobManagerITCase(_system: ActorSystem)
           // Mock the checkpoint coordinator
           val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
           doThrow(new Exception("Expected Test Exception"))
-            .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            .when(checkpointCoordinator)
+            .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
 
           // Update the savepoint coordinator field
           val field = executionGraph.getClass.getDeclaredField("checkpointCoordinator")
@@ -837,7 +839,7 @@ class JobManagerITCase(_system: ActorSystem)
           field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
-          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), Option.apply("any")), testActor)
           val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
 
           // Verify the response
@@ -851,7 +853,7 @@ class JobManagerITCase(_system: ActorSystem)
       }
     }
 
-    "handle trigger savepoint response after failed savepoint future" in {
+    "handle failed savepoint triggering" in {
       val deadline = TestingUtils.TESTING_DURATION.fromNow
 
       val flinkCluster = TestingUtils.startTestingCluster(1, 1)
@@ -868,7 +870,7 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000, 60000, 60000, 1))
+            60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
@@ -877,10 +879,12 @@ class JobManagerITCase(_system: ActorSystem)
           // Mock the checkpoint coordinator
           val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
           doThrow(new Exception("Expected Test Exception"))
-            .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
-          val savepointPathPromise = scala.concurrent.promise[String]
-          doReturn(savepointPathPromise.future)
-            .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            .when(checkpointCoordinator)
+            .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
+          val savepointPathPromise = new FlinkCompletableFuture[CompletedCheckpoint]()
+          doReturn(savepointPathPromise)
+            .when(checkpointCoordinator)
+            .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
 
           // Request the execution graph and set a checkpoint coordinator mock
           jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
@@ -893,10 +897,10 @@ class JobManagerITCase(_system: ActorSystem)
           field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
-          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), Option.apply("any")), testActor)
 
           // Fail the promise
-          savepointPathPromise.failure(new Exception("Expected Test Exception"))
+          savepointPathPromise.completeExceptionally(new Exception("Expected Test Exception"))
 
           val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
 
@@ -928,7 +932,7 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000, 60000, 60000, 1))
+            60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
@@ -937,10 +941,13 @@ class JobManagerITCase(_system: ActorSystem)
           // Mock the checkpoint coordinator
           val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
           doThrow(new Exception("Expected Test Exception"))
-            .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
-          val savepointPathPromise = scala.concurrent.promise[String]
-          doReturn(savepointPathPromise.future)
-            .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            .when(checkpointCoordinator)
+            .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
+
+          val savepointPromise = new FlinkCompletableFuture[CompletedCheckpoint]()
+          doReturn(savepointPromise)
+            .when(checkpointCoordinator)
+            .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
 
           // Request the execution graph and set a checkpoint coordinator mock
           jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
@@ -953,10 +960,13 @@ class JobManagerITCase(_system: ActorSystem)
           field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
-          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), Option.apply("any")), testActor)
+
+          val checkpoint = Mockito.mock(classOf[CompletedCheckpoint])
+          when(checkpoint.getExternalPath).thenReturn("Expected test savepoint path")
 
           // Succeed the promise
-          savepointPathPromise.success("Expected test savepoint path")
+          savepointPromise.complete(checkpoint)
 
           val response = expectMsgType[TriggerSavepointSuccess](deadline.timeLeft)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c01a321..50a5559 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -20,14 +20,13 @@ package org.apache.flink.runtime.testingUtils
 
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
 
-import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.pattern.Patterns._
+import akka.pattern.ask
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -38,10 +37,10 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, Submitt
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.testutils.TestingResourceManager
 
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Await, Future}
@@ -91,7 +90,6 @@ class TestingCluster(
     leaderElectionService: LeaderElectionService,
     submittedJobGraphStore: SubmittedJobGraphStore,
     checkpointRecoveryFactory: CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
     jobRecoveryTimeout: FiniteDuration,
     metricsRegistry: Option[MetricRegistry]): Props = {
 
@@ -108,7 +106,6 @@ class TestingCluster(
       leaderElectionService,
       submittedJobGraphStore,
       checkpointRecoveryFactory,
-      savepointStore,
       jobRecoveryTimeout,
       metricsRegistry)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 62349db..e9bdb99 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutorService
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.instance.InstanceManager
@@ -50,12 +49,11 @@ class TestingJobManager(
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore : SavepointStore,
     jobRecoveryTimeout : FiniteDuration,
     metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-      executorService,
+    executorService,
     instanceManager,
     scheduler,
     libraryCacheManager,
@@ -65,7 +63,6 @@ class TestingJobManager(
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore,
     jobRecoveryTimeout,
     metricRegistry)
   with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 5ba2790..d775869 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -22,6 +22,7 @@ import akka.actor.{ActorRef, Cancellable, Terminated}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
@@ -294,7 +295,7 @@ trait TestingJobManagerLike extends FlinkActor {
 
     case RequestSavepoint(savepointPath) =>
       try {
-        val savepoint = savepointStore.loadSavepoint(savepointPath)
+        val savepoint = SavepointStore.loadSavepoint(savepointPath)
         sender ! ResponseSavepoint(savepoint)
       }
       catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index a268c83..0abdd46 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -20,28 +20,26 @@ package org.apache.flink.runtime.testingUtils
 
 import java.util.UUID
 
-import akka.actor.{Props, Kill, ActorSystem, ActorRef}
+import akka.actor.{ActorRef, ActorSystem, Kill, Props}
 import akka.pattern.ask
 import com.google.common.util.concurrent.MoreExecutors
-
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobExecutionResult
-
-import org.apache.flink.configuration.{HighAvailabilityOptions, ConfigConstants, Configuration}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions}
+import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
-import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{HighAvailabilityMode, MemoryArchivist, JobManager}
-import org.apache.flink.runtime.testutils.TestingResourceManager
-import org.apache.flink.runtime.util.LeaderRetrievalUtils
-import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor}
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
+import org.apache.flink.runtime.jobgraph.JobGraph
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testutils.TestingResourceManager
+import org.apache.flink.runtime.util.LeaderRetrievalUtils
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext}
@@ -356,7 +354,6 @@ object TestingUtils {
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore,
     jobRecoveryTimeout,
     metricsRegistry) = JobManager.createJobManagerComponents(
       configuration,


Mime
View raw message