flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [3/7] flink git commit: [FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints
Date Fri, 14 Oct 2016 08:06:01 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java
deleted file mode 100644
index d2bf40e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Factory for {@link SavepointStore} instances.
- */
-public class SavepointStoreFactory {
-
-	public static final String SAVEPOINT_BACKEND_KEY = "savepoints.state.backend";
-	public static final String SAVEPOINT_DIRECTORY_KEY = "savepoints.state.backend.fs.dir";
-	public static final String DEFAULT_SAVEPOINT_BACKEND = "jobmanager";
-
-	public static final Logger LOG = LoggerFactory.getLogger(SavepointStoreFactory.class);
-
-	/**
-	 * Creates a {@link SavepointStore} from the specified Configuration.
-	 *
-	 * <p>You can configure a savepoint-specific backend for the savepoints. If
-	 * you don't configure anything, the regular checkpoint backend will be
-	 * used.
-	 *
-	 * <p>The default and fallback backend is the job manager, which loses the
-	 * savepoint after the job manager shuts down.
-	 *
-	 * @param config The configuration to parse the savepoint backend configuration.
-	 * @return A savepoint store.
-	 */
-	public static SavepointStore createFromConfig(Configuration config) throws Exception {
-
-		// Try a the savepoint-specific configuration first.
-		String savepointBackend = config.getString(SAVEPOINT_BACKEND_KEY, DEFAULT_SAVEPOINT_BACKEND);
-
-		if (savepointBackend == null) {
-			LOG.info("No savepoint state backend configured. " +
-					"Using job manager savepoint state backend.");
-			return createJobManagerSavepointStore();
-		} else if (savepointBackend.equals("jobmanager")) {
-			LOG.info("Using job manager savepoint state backend.");
-			return createJobManagerSavepointStore();
-		} else if (savepointBackend.equals("filesystem")) {
-			String rootPath = config.getString(SAVEPOINT_DIRECTORY_KEY, null);
-
-			if (rootPath == null) {
-				throw new IllegalConfigurationException("Using filesystem as savepoint state backend, " +
-						"but did not specify directory. Please set the " +
-						"following configuration key: '" + SAVEPOINT_DIRECTORY_KEY +
-						"' (e.g. " + SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " +
-						"Falling back to job manager savepoint backend.");
-			} else {
-				LOG.info("Using filesystem savepoint backend (root path: {}).", rootPath);
-
-				return createFileSystemSavepointStore(rootPath);
-			}
-		} else {
-			throw new IllegalConfigurationException("Unexpected savepoint backend " +
-					"configuration '" + savepointBackend + "'. " +
-					"Falling back to job manager savepoint state backend.");
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// Savepoint stores
-	// ------------------------------------------------------------------------
-
-	private static SavepointStore createJobManagerSavepointStore() {
-		return new HeapSavepointStore();
-	}
-
-	private static SavepointStore createFileSystemSavepointStore(String rootPath) throws IOException {
-		return new FsSavepointStore(rootPath, "savepoint-");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
index c474311..b1d7ff2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
@@ -47,6 +47,13 @@ public interface JobCheckpointStats {
 	 */
 	long getCount();
 
+	/**
+	 * Returns the most recent external path of a checkpoint.
+	 *
+	 * @return External checkpoint path or <code>null</code> if none available.
+	 */
+	String getExternalPath();
+
 	// ------------------------------------------------------------------------
 	// Duration
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index 2217fd4..db8a0e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -130,6 +130,7 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 
 		metrics.gauge("lastCheckpointSize", new CheckpointSizeGauge());
 		metrics.gauge("lastCheckpointDuration", new CheckpointDurationGauge());
+		metrics.gauge("lastCheckpointExternalPath", new CheckpointExternalPathGauge());
 	}
 
 	@Override
@@ -278,6 +279,7 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 						// Need to clone in order to have a consistent snapshot.
 						// We can safely update it afterwards.
 						(List<CheckpointStats>) history.clone(),
+						latestCompletedCheckpoint.getExternalPath(),
 						overallCount,
 						overallMinDuration,
 						overallMaxDuration,
@@ -349,6 +351,7 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 		// General
 		private final List<CheckpointStats> recentHistory;
 		private final long count;
+		private final String externalPath;
 
 		// Duration
 		private final long minDuration;
@@ -362,6 +365,7 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 
 		public JobCheckpointStatsSnapshot(
 				List<CheckpointStats> recentHistory,
+				String externalPath,
 				long count,
 				long minDuration,
 				long maxDuration,
@@ -372,6 +376,7 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 
 			this.recentHistory = recentHistory;
 			this.count = count;
+			this.externalPath = externalPath;
 
 			this.minDuration = minDuration;
 			this.maxDuration = maxDuration;
@@ -393,6 +398,11 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 		}
 
 		@Override
+		public String getExternalPath() {
+			return externalPath;
+		}
+
+		@Override
 		public long getMinDuration() {
 			return minDuration;
 		}
@@ -440,4 +450,17 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 			return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getDuration();
 		}
 	}
+
+	private class CheckpointExternalPathGauge implements Gauge<String> {
+
+		@Override
+		public String getValue() {
+			CompletedCheckpoint checkpoint = latestCompletedCheckpoint;
+			if (checkpoint != null && checkpoint.getExternalPath() != null) {
+				return checkpoint.getExternalPath();
+			} else {
+				return "n/a";
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cf98ca6..10f0e88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -49,6 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -353,12 +353,13 @@ public class ExecutionGraph {
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
+			ExternalizedCheckpointSettings externalizeSettings,
 			List<ExecutionJobVertex> verticesToTrigger,
 			List<ExecutionJobVertex> verticesToWaitFor,
 			List<ExecutionJobVertex> verticesToCommitTo,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore checkpointStore,
-			SavepointStore savepointStore,
+			String checkpointDir,
 			CheckpointStatsTracker statsTracker) {
 
 		// simple sanity checks
@@ -389,12 +390,13 @@ public class ExecutionGraph {
 				checkpointTimeout,
 				minPauseBetweenCheckpoints,
 				maxConcurrentCheckpoints,
+				externalizeSettings,
 				tasksToTrigger,
 				tasksToWaitFor,
 				tasksToCommitTo,
 				checkpointIDCounter,
 				checkpointStore,
-				savepointStore,
+				checkpointDir,
 				checkpointStatsTracker);
 
 		// the periodic checkpoint scheduler is activated and deactivated as a result of
@@ -414,7 +416,7 @@ public class ExecutionGraph {
 		}
 
 		if (checkpointCoordinator != null) {
-			checkpointCoordinator.suspend();
+			checkpointCoordinator.shutdown(state);
 			checkpointCoordinator = null;
 			checkpointStatsTracker = null;
 		}
@@ -1076,15 +1078,8 @@ public class ExecutionGraph {
 			CheckpointCoordinator coord = this.checkpointCoordinator;
 			this.checkpointCoordinator = null;
 			if (coord != null) {
-				if (state.isGloballyTerminalState()) {
-					coord.shutdown();
-				} else {
-					coord.suspend();
-				}
+				coord.shutdown(state);
 			}
-
-			// We don't clean the checkpoint stats tracker, because we want
-			// it to be available after the job has terminated.
 		} catch (Exception e) {
 			LOG.error("Error while cleaning up after execution", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 1c6eb8d..dcd6a5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
@@ -40,9 +39,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-
 import org.slf4j.Logger;
-
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
@@ -71,7 +68,6 @@ public class ExecutionGraphBuilder {
 			Executor executor,
 			ClassLoader classLoader,
 			CheckpointRecoveryFactory recoveryFactory,
-			SavepointStore savepointStore,
 			Time timeout,
 			RestartStrategy restartStrategy,
 			MetricGroup metrics,
@@ -82,7 +78,7 @@ public class ExecutionGraphBuilder {
 		final ExecutionContext executionContext = ExecutionContext$.MODULE$.fromExecutor(executor);
 		
 		return buildGraph(prior, jobGraph, jobManagerConfig, executionContext,
-				classLoader, recoveryFactory, savepointStore, timeout, restartStrategy,
+				classLoader, recoveryFactory, timeout, restartStrategy,
 				metrics, parallelismForAutoMax, log);
 	}
 
@@ -98,7 +94,6 @@ public class ExecutionGraphBuilder {
 			ExecutionContext executionContext,
 			ClassLoader classLoader,
 			CheckpointRecoveryFactory recoveryFactory,
-			SavepointStore savepointStore,
 			Time timeout,
 			RestartStrategy restartStrategy,
 			MetricGroup metrics,
@@ -183,7 +178,6 @@ public class ExecutionGraphBuilder {
 		// configure the state checkpointing
 		JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings();
 		if (snapshotSettings != null) {
-
 			List<ExecutionJobVertex> triggerVertices = 
 					idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
 
@@ -220,17 +214,22 @@ public class ExecutionGraphBuilder {
 				checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, metrics);
 			}
 
+			/** The default directory for externalized checkpoints. */
+			String externalizedCheckpointsDir = jobManagerConfig.getString(
+					ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null);
+
 			executionGraph.enableSnapshotCheckpointing(
 					snapshotSettings.getCheckpointInterval(),
 					snapshotSettings.getCheckpointTimeout(),
 					snapshotSettings.getMinPauseBetweenCheckpoints(),
 					snapshotSettings.getMaxConcurrentCheckpoints(),
+					snapshotSettings.getExternalizedCheckpointSettings(),
 					triggerVertices,
 					ackVertices,
 					confirmVertices,
 					checkpointIdCounter,
 					completedCheckpoints,
-					savepointStore,
+					externalizedCheckpointsDir,
 					checkpointStatsTracker);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
new file mode 100644
index 0000000..779fc76
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Grouped settings for externalized checkpoints.
+ */
+@Internal
+public class ExternalizedCheckpointSettings implements java.io.Serializable {
+
+	private static final ExternalizedCheckpointSettings NONE = new ExternalizedCheckpointSettings(false, false);
+
+	/** Flag indicating whether checkpoints should be externalized. */
+	private final boolean externalizeCheckpoints;
+
+	/** Flag indicating whether externalized checkpoints should delete on cancellation. */
+	private final boolean deleteOnCancellation;
+
+	private ExternalizedCheckpointSettings(boolean externalizeCheckpoints, boolean deleteOnCancellation) {
+		this.externalizeCheckpoints = externalizeCheckpoints;
+		this.deleteOnCancellation = deleteOnCancellation;
+	}
+
+	/**
+	 * Returns <code>true</code> if checkpoints should be externalized.
+	 *
+	 * @return <code>true</code> if checkpoints should be externalized.
+	 */
+	public boolean externalizeCheckpoints() {
+		return externalizeCheckpoints;
+	}
+
+	/**
+	 * Returns <code>true</code> if externalized checkpoints should be deleted on cancellation.
+	 *
+	 * @return <code>true</code> if externalized checkpoints should be deleted on cancellation.
+	 */
+	public boolean deleteOnCancellation() {
+		return deleteOnCancellation;
+	}
+
+	public static ExternalizedCheckpointSettings externalizeCheckpoints(boolean deleteOnCancellation) {
+		return new ExternalizedCheckpointSettings(true, deleteOnCancellation);
+	}
+
+	public static ExternalizedCheckpointSettings none() {
+		return NONE;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index ab701b5..36a5c5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -32,7 +32,6 @@ import static java.util.Objects.requireNonNull;
 public class JobSnapshottingSettings implements java.io.Serializable{
 	
 	private static final long serialVersionUID = -2593319571078198180L;
-
 	
 	private final List<JobVertexID> verticesToTrigger;
 
@@ -48,19 +47,25 @@ public class JobSnapshottingSettings implements java.io.Serializable{
 	
 	private final int maxConcurrentCheckpoints;
 
+	/** Settings for externalized checkpoints. */
+	private final ExternalizedCheckpointSettings externalizedCheckpointSettings;
+
 	/** Path to savepoint to reset state back to (optional, can be null) */
 	private String savepointPath;
 	
-	public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
-									List<JobVertexID> verticesToAcknowledge,
-									List<JobVertexID> verticesToConfirm,
-									long checkpointInterval, long checkpointTimeout,
-									long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints)
-	{
+	public JobSnapshottingSettings(
+			List<JobVertexID> verticesToTrigger,
+			List<JobVertexID> verticesToAcknowledge,
+			List<JobVertexID> verticesToConfirm,
+			long checkpointInterval,
+			long checkpointTimeout,
+			long minPauseBetweenCheckpoints,
+			int maxConcurrentCheckpoints,
+			ExternalizedCheckpointSettings externalizedCheckpointSettings) {
+
 		// sanity checks
 		if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-				minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1)
-		{
+				minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
 			throw new IllegalArgumentException();
 		}
 		
@@ -71,6 +76,7 @@ public class JobSnapshottingSettings implements java.io.Serializable{
 		this.checkpointTimeout = checkpointTimeout;
 		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
 		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+		this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings);
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -103,6 +109,10 @@ public class JobSnapshottingSettings implements java.io.Serializable{
 		return maxConcurrentCheckpoints;
 	}
 
+	public ExternalizedCheckpointSettings getExternalizedCheckpointSettings() {
+		return externalizedCheckpointSettings;
+	}
+
 	/**
 	 * Sets the savepoint path.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index e125e10..67fc397 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -218,15 +218,13 @@ public class ZooKeeperUtils {
 	 * @param configuration                  {@link Configuration} object
 	 * @param jobId                          ID of job to create the instance for
 	 * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain
-	 * @param userClassLoader                User code class loader
 	 * @return {@link ZooKeeperCompletedCheckpointStore} instance
 	 */
 	public static CompletedCheckpointStore createCompletedCheckpoints(
 			CuratorFramework client,
 			Configuration configuration,
 			JobID jobId,
-			int maxNumberOfCheckpointsToRetain,
-			ClassLoader userClassLoader) throws Exception {
+			int maxNumberOfCheckpointsToRetain) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -244,7 +242,6 @@ public class ZooKeeperUtils {
 
 		return new ZooKeeperCompletedCheckpointStore(
 				maxNumberOfCheckpointsToRetain,
-				userClassLoader,
 				client,
 				checkpointsPath,
 				stateStorage);

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index 485a21e..f426254 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -24,7 +24,6 @@ import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -69,7 +68,6 @@ abstract class ContaineredJobManager(
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
     jobRecoveryTimeout: FiniteDuration,
     metricsRegistry: Option[FlinkMetricRegistry])
   extends JobManager(
@@ -84,7 +82,6 @@ abstract class ContaineredJobManager(
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore,
     jobRecoveryTimeout,
     metricsRegistry) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index be820ae..450e810 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException}
 import java.lang.management.ManagementFactory
+import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException}
 import java.util.UUID
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
 import javax.management.ObjectName
@@ -34,23 +34,23 @@ import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration, HighAvailabilityOptions}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
-import org.apache.flink.metrics.{Gauge, MetricGroup}
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
+import org.apache.flink.metrics.{Gauge, MetricGroup}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
-import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore, SavepointStoreFactory}
-import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, DisabledCheckpointStatsTracker, SimpleCheckpointStatsTracker}
+import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore}
 import org.apache.flink.runtime.client._
-import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.concurrent.BiFunction
+import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{ExecutionGraphBuilder, ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger}
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionGraphBuilder, ExecutionJobVertex, StatusListenerMessenger}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.io.network.PartitionState
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
@@ -66,15 +66,14 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendSta
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified}
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
-import org.apache.flink.runtime.messages.webmonitor.InfoMessage
-import org.apache.flink.runtime.messages.webmonitor._
-import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry, MetricRegistryConfiguration}
 import org.apache.flink.runtime.process.ProcessReaper
-import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
-import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
+import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
 import org.apache.flink.runtime.security.SecurityContext
+import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -128,7 +127,6 @@ class JobManager(
     protected val leaderElectionService: LeaderElectionService,
     protected val submittedJobGraphs : SubmittedJobGraphStore,
     protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    protected val savepointStore: SavepointStore,
     protected val jobRecoveryTimeout: FiniteDuration,
     protected val metricsRegistry: Option[FlinkMetricRegistry])
   extends FlinkActor
@@ -178,6 +176,13 @@ class JobManager(
   val webMonitorPort : Int = flinkConfiguration.getInteger(
     ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
 
+  /** The default directory for savepoints. */
+  val defaultSavepointDir: String = ConfigurationUtil.getStringWithDeprecatedKeys(
+    flinkConfiguration,
+    ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
+    null,
+    ConfigConstants.SAVEPOINT_FS_DIRECTORY_KEY)
+
   /** The resource manager actor responsible for allocating and managing task manager resources. */
   var currentResourceManager: Option[ActorRef] = None
 
@@ -242,14 +247,6 @@ class JobManager(
     }
 
     try {
-      savepointStore.shutdown()
-    } catch {
-      case e: Exception =>
-        log.error("Could not shut down savepoint store.", e)
-        throw new RuntimeException("Could not stop the  savepoint store store.", e)
-    }
-
-    try {
       // revoke leadership and stop leader election service
       leaderElectionService.stop()
     } catch {
@@ -695,7 +692,7 @@ class JobManager(
     case kvStateMsg : KvStateMessage =>
       handleKvStateMessage(kvStateMsg)
 
-    case TriggerSavepoint(jobId) =>
+    case TriggerSavepoint(jobId, savepointDirectory) =>
       currentJobs.get(jobId) match {
         case Some((graph, _)) =>
           val checkpointCoordinator = graph.getCheckpointCoordinator()
@@ -703,31 +700,46 @@ class JobManager(
           if (checkpointCoordinator != null) {
             // Immutable copy for the future
             val senderRef = sender()
-
-            future {
-              try {
-                // Do this async, because checkpoint coordinator operations can
-                // contain blocking calls to the state backend or ZooKeeper.
-                val savepointFuture = checkpointCoordinator.triggerSavepoint(
-                  System.currentTimeMillis())
-
-                savepointFuture.onComplete {
-                  // Success, respond with the savepoint path
-                  case scala.util.Success(savepointPath) =>
-                    senderRef ! TriggerSavepointSuccess(jobId, savepointPath)
-
-                  // Failure, respond with the cause
-                  case scala.util.Failure(t) =>
-                    senderRef ! TriggerSavepointFailure(
-                      jobId,
-                      new Exception("Failed to complete savepoint", t))
-                }(context.dispatcher)
-              } catch {
-                case e: Exception =>
-                  senderRef ! TriggerSavepointFailure(jobId, new Exception(
-                    "Failed to trigger savepoint", e))
+            try {
+              val targetDirectory : String = savepointDirectory.getOrElse(
+                flinkConfiguration.getString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, null))
+
+              if (targetDirectory == null) {
+                throw new IllegalStateException("No savepoint directory configured. " +
+                  "You can either specify a directory when triggering this savepoint or " +
+                  "configure a cluster-wide default via key '" +
+                  ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.")
               }
-            }(context.dispatcher)
+
+              // Do this async, because checkpoint coordinator operations can
+              // contain blocking calls to the state backend or ZooKeeper.
+              val savepointFuture = checkpointCoordinator.triggerSavepoint(
+                System.currentTimeMillis(),
+                targetDirectory)
+
+              savepointFuture.handleAsync[Void](
+                new BiFunction[CompletedCheckpoint, Throwable, Void] {
+                  override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
+                    if (success != null) {
+                      if (success.getExternalPath != null) {
+                        senderRef ! TriggerSavepointSuccess(jobId, success.getExternalPath)
+                      } else {
+                        senderRef ! TriggerSavepointFailure(
+                          jobId, new Exception("Savepoint has not been persisted."))
+                      }
+                    } else {
+                      senderRef ! TriggerSavepointFailure(
+                        jobId, new Exception("Failed to complete savepoint", cause))
+                    }
+                    null
+                  }
+                },
+                context.dispatcher)
+            } catch {
+              case e: Exception =>
+                senderRef ! TriggerSavepointFailure(jobId, new Exception(
+                  "Failed to trigger savepoint", e))
+            }
           } else {
             sender() ! TriggerSavepointFailure(jobId, new IllegalStateException(
               "Checkpointing disabled. You can enable it via the execution environment of " +
@@ -744,12 +756,15 @@ class JobManager(
         try {
           log.info(s"Disposing savepoint at '$savepointPath'.")
 
-          val savepoint = savepointStore.loadSavepoint(savepointPath)
+          val savepoint = SavepointStore.loadSavepoint(savepointPath)
 
           log.debug(s"$savepoint")
 
-          // Dispose the savepoint
-          savepointStore.disposeSavepoint(savepointPath)
+          // Dispose checkpoint state
+          savepoint.dispose()
+
+          // Remove the header file
+          SavepointStore.removeSavepoint(savepointPath)
 
           senderRef ! DisposeSavepointSuccess
         } catch {
@@ -1150,7 +1165,6 @@ class JobManager(
           executionContext,
           userCodeLoader,
           checkpointRecoveryFactory,
-          savepointStore,
           Time.of(timeout.length, timeout.unit),
           restartStrategy,
           jobMetrics,
@@ -1218,7 +1232,7 @@ class JobManager(
 
                   // load the savepoint as a checkpoint into the system
                   val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint(
-                    jobId, executionGraph.getAllVertices, savepointStore, savepointPath)
+                    jobId, executionGraph.getAllVertices, savepointPath)
 
                   executionGraph.getCheckpointCoordinator.getCheckpointStore
                     .addCheckpoint(savepoint)
@@ -2420,7 +2434,6 @@ object JobManager {
     LeaderElectionService,
     SubmittedJobGraphStore,
     CheckpointRecoveryFactory,
-    SavepointStore,
     FiniteDuration, // timeout for job recovery
     Option[FlinkMetricRegistry]
    ) = {
@@ -2497,8 +2510,6 @@ object JobManager {
             new ZooKeeperCheckpointRecoveryFactory(client, configuration))
       }
 
-    val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
-
     val jobRecoveryTimeoutStr = configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY)
 
     val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty) {
@@ -2531,7 +2542,6 @@ object JobManager {
       leaderElectionService,
       submittedJobGraphs,
       checkpointRecoveryFactory,
-      savepointStore,
       jobRecoveryTimeout,
       metricRegistry)
   }
@@ -2595,8 +2605,7 @@ object JobManager {
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout, 
+    jobRecoveryTimeout,
     metricsRegistry) = createJobManagerComponents(
       configuration,
       None)
@@ -2622,7 +2631,6 @@ object JobManager {
       leaderElectionService,
       submittedJobGraphs,
       checkpointRecoveryFactory,
-      savepointStore,
       jobRecoveryTimeout,
       metricsRegistry)
 
@@ -2657,7 +2665,6 @@ object JobManager {
     leaderElectionService: LeaderElectionService,
     submittedJobGraphStore: SubmittedJobGraphStore,
     checkpointRecoveryFactory: CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
     jobRecoveryTimeout: FiniteDuration,
     metricsRegistry: Option[FlinkMetricRegistry]): Props = {
 
@@ -2674,7 +2681,6 @@ object JobManager {
       leaderElectionService,
       submittedJobGraphStore,
       checkpointRecoveryFactory,
-      savepointStore,
       jobRecoveryTimeout,
       metricsRegistry)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 5e2b547..fd45cda 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -467,8 +467,11 @@ object JobManagerMessages {
     * of triggering and acknowledging checkpoints.
     *
     * @param jobId The JobID of the job to trigger the savepoint for.
+    * @param savepointDirectory Optional target directory
     */
-  case class TriggerSavepoint(jobId: JobID) extends RequiresLeaderSessionID
+  case class TriggerSavepoint(
+      jobId: JobID,
+      savepointDirectory : Option[String] = Option.empty) extends RequiresLeaderSessionID
 
   /**
     * Response after a successful savepoint trigger containing the savepoint path.

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 27c9dd9..2f453a3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
@@ -119,7 +118,6 @@ class LocalFlinkMiniCluster(
     leaderElectionService,
     submittedJobGraphStore,
     checkpointRecoveryFactory,
-    savepointStore,
     jobRecoveryTimeout,
     metricsRegistry) = JobManager.createJobManagerComponents(config, createLeaderElectionService())
 
@@ -143,7 +141,6 @@ class LocalFlinkMiniCluster(
         leaderElectionService,
         submittedJobGraphStore,
         checkpointRecoveryFactory,
-        savepointStore,
         jobRecoveryTimeout,
         metricsRegistry),
       jobManagerName)
@@ -248,7 +245,6 @@ class LocalFlinkMiniCluster(
     leaderElectionService: LeaderElectionService,
     submittedJobGraphStore: SubmittedJobGraphStore,
     checkpointRecoveryFactory: CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
     jobRecoveryTimeout: FiniteDuration,
     metricsRegistry: Option[MetricRegistry]): Props = {
 
@@ -265,7 +261,6 @@ class LocalFlinkMiniCluster(
       leaderElectionService,
       submittedJobGraphStore,
       checkpointRecoveryFactory,
-      savepointStore,
       jobRecoveryTimeout,
       metricsRegistry)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 728c7d5..7b0e819 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -23,14 +23,16 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
@@ -50,11 +52,12 @@ import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -92,7 +95,8 @@ import static org.mockito.Mockito.when;
  */
 public class CheckpointCoordinatorTest {
 
-	private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();
+	@Rule
+	public TemporaryFolder tmpFolder = new TemporaryFolder();
 
 	@Test
 	public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
@@ -115,13 +119,15 @@ public class CheckpointCoordinatorTest {
 					jid,
 					600000,
 					600000,
-					0, Integer.MAX_VALUE,
+					0,
+					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {},
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(1),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			// nothing should be happening
@@ -135,7 +141,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -168,12 +174,13 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {},
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(1),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			// nothing should be happening
@@ -187,7 +194,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -218,12 +225,13 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {},
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(1),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			// nothing should be happening
@@ -237,7 +245,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -269,12 +277,13 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(1),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -363,7 +372,7 @@ public class CheckpointCoordinatorTest {
 			long checkpointIdNew2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			assertEquals(checkpointIdNew2, checkpointIdNew);
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -395,12 +404,13 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(1),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -488,7 +498,7 @@ public class CheckpointCoordinatorTest {
 			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
 			assertTrue(checkpoint1.isDiscarded());
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -515,12 +525,13 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(1),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -626,7 +637,7 @@ public class CheckpointCoordinatorTest {
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
 			}
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -668,12 +679,13 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
 					new ExecutionVertex[] { commitVertex },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(2),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -766,7 +778,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(jid, sc2.getJobId());
 			assertTrue(sc2.getTaskStates().isEmpty());
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -807,12 +819,13 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
 					new ExecutionVertex[] { commitVertex },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(10, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(10),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -893,7 +906,7 @@ public class CheckpointCoordinatorTest {
 			// send the last remaining ack for the first checkpoint. This should not do anything
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, new CheckpointMetaData(checkpointId1, 0L)));
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -932,12 +945,13 @@ public class CheckpointCoordinatorTest {
 					200,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] { commitVertex },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(2),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			// trigger a checkpoint, partially acknowledged
@@ -968,7 +982,7 @@ public class CheckpointCoordinatorTest {
 			verify(commitVertex, times(0))
 					.sendMessageToCurrentExecution(any(NotifyCheckpointComplete.class), any(ExecutionAttemptID.class));
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1000,12 +1014,13 @@ public class CheckpointCoordinatorTest {
 					200000,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] { commitVertex },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(2),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			assertTrue(coord.triggerCheckpoint(timestamp));
@@ -1027,7 +1042,7 @@ public class CheckpointCoordinatorTest {
 			// unknown ack vertex
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointMetaData));
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1081,12 +1096,13 @@ public class CheckpointCoordinatorTest {
 					200000,    // timeout is very long (200 s)
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
 					new ExecutionVertex[] { commitVertex },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(2),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			
@@ -1133,7 +1149,7 @@ public class CheckpointCoordinatorTest {
 			assertTrue(numCallsSoFar == numCalls.get() ||
 					numCallsSoFar + 1 == numCalls.get());
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1172,12 +1188,13 @@ public class CheckpointCoordinatorTest {
 					200000,    // timeout is very long (200 s)
 					500,    // 500ms delay between checkpoints
 					10,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { vertex1 },
 					new ExecutionVertex[] { vertex1 },
 					new ExecutionVertex[] { vertex1 },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(2),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			coord.startCheckpointScheduler();
@@ -1215,7 +1232,7 @@ public class CheckpointCoordinatorTest {
 
 			coord.stopCheckpointScheduler();
 
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1256,20 +1273,22 @@ public class CheckpointCoordinatorTest {
 				600000,
 				0,
 				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl),
-				new HeapSavepointStore(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
 				new DisabledCheckpointStatsTracker());
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
 		// trigger the first checkpoint. this should succeed
-		Future<String> savepointFuture = coord.triggerSavepoint(timestamp);
-		assertFalse(savepointFuture.isCompleted());
+		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+		Future<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
+		assertFalse(savepointFuture.isDone());
 
 		// validate that we have a pending savepoint
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -1287,7 +1306,7 @@ public class CheckpointCoordinatorTest {
 		assertFalse(pending.isDiscarded());
 		assertFalse(pending.isFullyAcknowledged());
 		assertFalse(pending.canBeSubsumed());
-		assertTrue(pending instanceof PendingSavepoint);
+		assertTrue(pending instanceof PendingCheckpoint);
 
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
@@ -1297,13 +1316,13 @@ public class CheckpointCoordinatorTest {
 		assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
 		assertFalse(pending.isDiscarded());
 		assertFalse(pending.isFullyAcknowledged());
-		assertFalse(savepointFuture.isCompleted());
+		assertFalse(savepointFuture.isDone());
 
 		// acknowledge the same task again (should not matter)
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 		assertFalse(pending.isDiscarded());
 		assertFalse(pending.isFullyAcknowledged());
-		assertFalse(savepointFuture.isCompleted());
+		assertFalse(savepointFuture.isDone());
 
 		// acknowledge the other task.
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData));
@@ -1311,7 +1330,7 @@ public class CheckpointCoordinatorTest {
 		// the checkpoint is internally converted to a successful checkpoint and the
 		// pending checkpoint object is disposed
 		assertTrue(pending.isDiscarded());
-		assertTrue(savepointFuture.isCompleted());
+		assertTrue(savepointFuture.isDone());
 
 		// the now we should have a completed checkpoint
 		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1335,9 +1354,8 @@ public class CheckpointCoordinatorTest {
 		// trigger another checkpoint and see that this one replaces the other checkpoint
 		// ---------------
 		final long timestampNew = timestamp + 7;
-		savepointFuture = coord.triggerSavepoint(timestampNew);
-		assertFalse(savepointFuture.isCompleted());
-
+		savepointFuture = coord.triggerSavepoint(timestampNew, savepointDir);
+		assertFalse(savepointFuture.isDone());
 
 		long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 		CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
@@ -1352,7 +1370,7 @@ public class CheckpointCoordinatorTest {
 		assertEquals(timestampNew, successNew.getTimestamp());
 		assertEquals(checkpointIdNew, successNew.getCheckpointID());
 		assertTrue(successNew.getTaskStates().isEmpty());
-		assertTrue(savepointFuture.isCompleted());
+		assertTrue(savepointFuture.isDone());
 
 		// validate that the relevant tasks got a confirmation message
 		{
@@ -1367,7 +1385,7 @@ public class CheckpointCoordinatorTest {
 			verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
 		}
 
-		coord.shutdown();
+		coord.shutdown(JobStatus.FINISHED);
 	}
 
 	/**
@@ -1396,16 +1414,19 @@ public class CheckpointCoordinatorTest {
 				600000,
 				0,
 				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				counter,
-				new StandaloneCompletedCheckpointStore(10, cl),
-				new HeapSavepointStore(),
+				new StandaloneCompletedCheckpointStore(10),
+				null,
 				new DisabledCheckpointStatsTracker());
 
+		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+
 		// Trigger savepoint and checkpoint
-		Future<String> savepointFuture1 = coord.triggerSavepoint(timestamp);
+		Future<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir);
 		long savepointId1 = counter.getLast();
 		CheckpointMetaData checkpointMetaDataS1 = new CheckpointMetaData(savepointId1, 0L);
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -1427,12 +1448,12 @@ public class CheckpointCoordinatorTest {
 		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
 		assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
-		assertFalse(savepointFuture1.isCompleted());
+		assertFalse(savepointFuture1.isDone());
 
 		assertTrue(coord.triggerCheckpoint(timestamp + 3));
 		assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
-		Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp + 4);
+		Future<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir);
 		long savepointId2 = counter.getLast();
 		CheckpointMetaData checkpointMetaDataS2 = new CheckpointMetaData(savepointId2, 0L);
 		assertEquals(3, coord.getNumberOfPendingCheckpoints());
@@ -1445,8 +1466,8 @@ public class CheckpointCoordinatorTest {
 		assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
 		assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
 
-		assertFalse(savepointFuture1.isCompleted());
-		assertTrue(savepointFuture2.isCompleted());
+		assertFalse(savepointFuture1.isDone());
+		assertTrue(savepointFuture2.isDone());
 
 		// Ack first savepoint
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS1));
@@ -1454,7 +1475,7 @@ public class CheckpointCoordinatorTest {
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
-		assertTrue(savepointFuture1.isCompleted());
+		assertTrue(savepointFuture1.isDone());
 	}
 
 	private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
@@ -1486,12 +1507,13 @@ public class CheckpointCoordinatorTest {
 					200000,    // timeout is very long (200 s)
 					0L,        // no extra delay
 					maxConcurrentAttempts,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
 					new ExecutionVertex[] { commitVertex },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(2),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			coord.startCheckpointScheduler();
@@ -1529,7 +1551,7 @@ public class CheckpointCoordinatorTest {
 			Thread.sleep(200);
 			assertEquals(maxConcurrentAttempts + 1, numCalls.get());
 			
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1558,12 +1580,13 @@ public class CheckpointCoordinatorTest {
 					200000,    // timeout is very long (200 s)
 					0L,        // no extra delay
 					maxConcurrentAttempts, // max two concurrent checkpoints
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
 					new ExecutionVertex[] { commitVertex },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(2),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			coord.startCheckpointScheduler();
@@ -1602,7 +1625,7 @@ public class CheckpointCoordinatorTest {
 			assertNotNull(coord.getPendingCheckpoints().get(3L));
 			assertNotNull(coord.getPendingCheckpoints().get(4L));
 			
-			coord.shutdown();
+			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1639,12 +1662,13 @@ public class CheckpointCoordinatorTest {
 					200000,    // timeout is very long (200 s)
 					0L,        // no extra delay
 					2, // max two concurrent checkpoints
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
 					new ExecutionVertex[] { commitVertex },
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(2),
+					null,
 					new DisabledCheckpointStatsTracker());
 			
 			coord.startCheckpointScheduler();
@@ -1690,26 +1714,29 @@ public class CheckpointCoordinatorTest {
 				200000,
 				0L,
 				1, // max one checkpoint at a time => should not affect savepoints
+				ExternalizedCheckpointSettings.none(),
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
 				checkpointIDCounter,
-				new StandaloneCompletedCheckpointStore(2, cl),
-				new HeapSavepointStore(),
+				new StandaloneCompletedCheckpointStore(2),
+				null,
 				new DisabledCheckpointStatsTracker());
 
-		List<Future<String>> savepointFutures = new ArrayList<>();
+		List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
 
 		int numSavepoints = 5;
 
+		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+
 		// Trigger savepoints
 		for (int i = 0; i < numSavepoints; i++) {
-			savepointFutures.add(coord.triggerSavepoint(i));
+			savepointFutures.add(coord.triggerSavepoint(i, savepointDir));
 		}
 
 		// After triggering multiple savepoints, all should in progress
-		for (Future<String> savepointFuture : savepointFutures) {
-			assertFalse(savepointFuture.isCompleted());
+		for (Future<CompletedCheckpoint> savepointFuture : savepointFutures) {
+			assertFalse(savepointFuture.isDone());
 		}
 
 		// ACK all savepoints
@@ -1719,8 +1746,8 @@ public class CheckpointCoordinatorTest {
 		}
 
 		// After ACKs, all should be completed
-		for (Future<String> savepointFuture : savepointFutures) {
-			assertTrue(savepointFuture.isCompleted());
+		for (Future<CompletedCheckpoint> savepointFuture : savepointFutures) {
+			assertTrue(savepointFuture.isDone());
 		}
 	}
 
@@ -1740,19 +1767,22 @@ public class CheckpointCoordinatorTest {
 				200000,
 				100000000L, // very long min delay => should not affect savepoints
 				1,
+				ExternalizedCheckpointSettings.none(),
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
 				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(2, cl),
-				new HeapSavepointStore(),
+				new StandaloneCompletedCheckpointStore(2),
+				null,
 				new DisabledCheckpointStatsTracker());
 
-		Future<String> savepoint0 = coord.triggerSavepoint(0);
-		assertFalse("Did not trigger savepoint", savepoint0.isCompleted());
+		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+
+		Future<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(0, savepointDir);
+		assertFalse("Did not trigger savepoint", savepoint0.isDone());
 
-		Future<String> savepoint1 = coord.triggerSavepoint(1);
-		assertFalse("Did not trigger savepoint", savepoint1.isCompleted());
+		Future<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(1, savepointDir);
+		assertFalse("Did not trigger savepoint", savepoint1.isDone());
 	}
 
 	// ------------------------------------------------------------------------
@@ -1796,18 +1826,19 @@ public class CheckpointCoordinatorTest {
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-			jid,
-			600000,
-			600000,
+				jid,
+				600000,
+				600000,
 				0,
 				Integer.MAX_VALUE,
-			arrayExecutionVertices,
-			arrayExecutionVertices,
-			arrayExecutionVertices,
-			new StandaloneCheckpointIDCounter(),
-			new StandaloneCompletedCheckpointStore(1, cl),
-			new HeapSavepointStore(),
-			new DisabledCheckpointStatsTracker());
+				ExternalizedCheckpointSettings.none(),
+				arrayExecutionVertices,
+				arrayExecutionVertices,
+				arrayExecutionVertices,
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker());
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp);
@@ -1900,18 +1931,19 @@ public class CheckpointCoordinatorTest {
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-			jid,
-			600000,
-			600000,
-			0,
-			Integer.MAX_VALUE,
-			arrayExecutionVertices,
-			arrayExecutionVertices,
-			arrayExecutionVertices,
-			new StandaloneCheckpointIDCounter(),
-			new StandaloneCompletedCheckpointStore(1, cl),
-			new HeapSavepointStore(),
-			new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				arrayExecutionVertices,
+				arrayExecutionVertices,
+				arrayExecutionVertices,
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker());
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp);
@@ -2014,18 +2046,19 @@ public class CheckpointCoordinatorTest {
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-			jid,
-			600000,
-			600000,
-			0,
-			Integer.MAX_VALUE,
-			arrayExecutionVertices,
-			arrayExecutionVertices,
-			arrayExecutionVertices,
-			new StandaloneCheckpointIDCounter(),
-			new StandaloneCompletedCheckpointStore(1, cl),
-			new HeapSavepointStore(),
-			new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				arrayExecutionVertices,
+				arrayExecutionVertices,
+				arrayExecutionVertices,
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker());
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp);
@@ -2141,12 +2174,13 @@ public class CheckpointCoordinatorTest {
 				600000,
 				0,
 				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
 				arrayExecutionVertices,
 				arrayExecutionVertices,
 				arrayExecutionVertices,
 				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl),
-				new HeapSavepointStore(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
 				new DisabledCheckpointStatsTracker());
 
 		// trigger the checkpoint
@@ -2235,34 +2269,57 @@ public class CheckpointCoordinatorTest {
 		comparePartitionableState(originalPartitionableStates, actualPartitionableStates);
 	}
 
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	static void sendAckMessageToCoordinator(
-			CheckpointCoordinator coord,
-			long checkpointId, JobID jid,
-			ExecutionJobVertex jobVertex,
-			JobVertexID jobVertexID,
-			List<KeyGroupRange> keyGroupPartitions) throws Exception {
+	/**
+	 * Tests that the externalized checkpoint configuration is respected.
+	 */
+	@Test
+	public void testExternalizedCheckpoints() throws Exception {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
 
-		for (int index = 0; index < jobVertex.getParallelism(); index++) {
-			ChainedStateHandle<StreamStateHandle> state = generateStateForVertex(jobVertexID, index);
-			List<KeyGroupsStateHandle> keyGroupState = generateKeyGroupState(
-					jobVertexID,
-					keyGroupPartitions.get(index));
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+			ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
 
-			CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles(state, null, keyGroupState);
-			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
+			// set up the coordinator and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
 					jid,
-					jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					new CheckpointMetaData(checkpointId, 0L),
-					checkpointStateHandles);
+					600000,
+					600000,
+					0,
+					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+					new ExecutionVertex[] { vertex1 },
+					new ExecutionVertex[] { vertex1 },
+					new ExecutionVertex[] { vertex1 },
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1),
+					"fake-directory",
+					new DisabledCheckpointStatsTracker());
 
-			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
+			assertTrue(coord.triggerCheckpoint(timestamp));
+
+			for (PendingCheckpoint checkpoint : coord.getPendingCheckpoints().values()) {
+				CheckpointProperties props = checkpoint.getProps();
+				CheckpointProperties expected = CheckpointProperties.forExternalizedCheckpoint(true);
+
+				assertEquals(expected, props);
+			}
+
+			// the now we should have a completed checkpoint
+			coord.shutdown(JobStatus.FINISHED);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
 	public static List<KeyGroupsStateHandle> generateKeyGroupState(
 			JobVertexID jobVertexID,
 			KeyGroupRange keyGroupPartition) throws IOException {
@@ -2720,4 +2777,96 @@ public class CheckpointCoordinatorTest {
 		Assert.assertEquals(expected, actual);
 	}
 
+	@Test
+	public void testDeclineCheckpointRespectsProperties() throws Exception {
+		final JobID jid = new JobID();
+		final long timestamp = System.currentTimeMillis();
+
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker());
+
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// trigger the first checkpoint. this should succeed
+		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+		String targetDirectory = "xjasdkjakshdmmmxna";
+
+		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(timestamp, props, targetDirectory);
+		assertEquals(true, triggerResult.isSuccess());
+
+		// validate that we have a pending checkpoint
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+		PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+
+		assertNotNull(checkpoint);
+		assertEquals(checkpointId, checkpoint.getCheckpointId());
+		assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
+		assertEquals(jid, checkpoint.getJobId());
+		assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
+		assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
+		assertEquals(0, checkpoint.getTaskStates().size());
+		assertFalse(checkpoint.isDiscarded());
+		assertFalse(checkpoint.isFullyAcknowledged());
+		assertEquals(props, checkpoint.getProps());
+		assertEquals(targetDirectory, checkpoint.getTargetDirectory());
+
+		{
+			// check that the vertices received the trigger checkpoint message
+			TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointId, timestamp);
+			verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+		}
+
+		// decline checkpoint, this should cancel the checkpoint and re-trigger with correct properties
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
+		assertTrue(checkpoint.isDiscarded());
+
+		// validate that we have a new pending checkpoint
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+		PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
+
+		assertNotNull(checkpointNew);
+		assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
+		assertEquals(jid, checkpointNew.getJobId());
+		assertEquals(1, checkpointNew.getNumberOfNonAcknowledgedTasks());
+		assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
+		assertEquals(0, checkpointNew.getTaskStates().size());
+		assertFalse(checkpointNew.isDiscarded());
+		assertFalse(checkpointNew.isFullyAcknowledged());
+		assertNotEquals(checkpoint.getCheckpointId(), checkpointNew.getCheckpointId());
+		// Respect the properties and target directory from the initial trigger
+		assertEquals(props, checkpointNew.getProps());
+		assertEquals(targetDirectory, checkpointNew.getTargetDirectory());
+
+		// check that the vertices received the new trigger checkpoint message
+		{
+			TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, checkpointNew.getCheckpointTimestamp());
+			verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+		}
+
+		coord.shutdown(JobStatus.FINISHED);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
index 49b5fe7..9ece607 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -76,7 +77,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger {
 			CuratorFramework client = ZooKeeper.getClient();
 			assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
 
-			counter.shutdown();
+			counter.shutdown(JobStatus.FINISHED);
 			assertNull(client.checkExists().forPath("/checkpoint-id-counter"));
 		}
 
@@ -91,7 +92,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger {
 			CuratorFramework client = ZooKeeper.getClient();
 			assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
 
-			counter.suspend();
+			counter.shutdown(JobStatus.SUSPENDED);
 			assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
 		}
 
@@ -120,7 +121,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger {
 			assertEquals(4, counter.getAndIncrement());
 		}
 		finally {
-			counter.shutdown();
+			counter.shutdown(JobStatus.FINISHED);
 		}
 	}
 
@@ -183,7 +184,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger {
 				executor.shutdown();
 			}
 
-			counter.shutdown();
+			counter.shutdown(JobStatus.FINISHED);
 		}
 	}
 
@@ -200,7 +201,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger {
 		assertEquals(1337, counter.getAndIncrement());
 		assertEquals(1338, counter.getAndIncrement());
 
-		counter.shutdown();
+		counter.shutdown(JobStatus.FINISHED);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index 5772fae..c996886 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -23,15 +23,66 @@ import org.junit.Test;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the default checkpoint properties.
+ */
 public class CheckpointPropertiesTest {
 
+	/**
+	 * Tests the default checkpoint properties.
+	 */
 	@Test
 	public void testCheckpointProperties() {
-		assertFalse(CheckpointProperties.forStandardCheckpoint().isSavepoint());
+		CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
+
+		assertFalse(props.forceCheckpoint());
+		assertFalse(props.externalizeCheckpoint());
+		assertTrue(props.discardOnSubsumed());
+		assertTrue(props.discardOnJobFinished());
+		assertTrue(props.discardOnJobCancelled());
+		assertTrue(props.discardOnJobFailed());
+		assertTrue(props.discardOnJobSuspended());
 	}
 
+	/**
+	 * Tests the external checkpoints properties.
+	 */
+	@Test
+	public void testPersistentCheckpointProperties() {
+		CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);
+
+		assertFalse(props.forceCheckpoint());
+		assertTrue(props.externalizeCheckpoint());
+		assertTrue(props.discardOnSubsumed());
+		assertTrue(props.discardOnJobFinished());
+		assertTrue(props.discardOnJobCancelled());
+		assertFalse(props.discardOnJobFailed());
+		assertTrue(props.discardOnJobSuspended());
+
+		props = CheckpointProperties.forExternalizedCheckpoint(false);
+
+		assertFalse(props.forceCheckpoint());
+		assertTrue(props.externalizeCheckpoint());
+		assertTrue(props.discardOnSubsumed());
+		assertTrue(props.discardOnJobFinished());
+		assertFalse(props.discardOnJobCancelled());
+		assertFalse(props.discardOnJobFailed());
+		assertTrue(props.discardOnJobSuspended());
+	}
+
+	/**
+	 * Tests the default (manually triggered) savepoint properties.
+	 */
 	@Test
 	public void testSavepointProperties() {
-		assertTrue(CheckpointProperties.forStandardSavepoint().isSavepoint());
+		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+
+		assertTrue(props.forceCheckpoint());
+		assertTrue(props.externalizeCheckpoint());
+		assertFalse(props.discardOnSubsumed());
+		assertFalse(props.discardOnJobFinished());
+		assertFalse(props.discardOnJobCancelled());
+		assertFalse(props.discardOnJobFailed());
+		assertFalse(props.discardOnJobSuspended());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index c20c604..b4dcab5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -27,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
@@ -58,8 +58,6 @@ import static org.mockito.Mockito.when;
  */
 public class CheckpointStateRestoreTest {
 
-	private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
 	@Test
 	public void testSetState() {
 		try {
@@ -101,12 +99,13 @@ public class CheckpointStateRestoreTest {
 					200000L,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[0],
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(1),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			// create ourselves a checkpoint with state
@@ -199,12 +198,13 @@ public class CheckpointStateRestoreTest {
 					200000L,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[0],
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(1),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			// create ourselves a checkpoint with state
@@ -252,12 +252,13 @@ public class CheckpointStateRestoreTest {
 					200000L,
 					0,
 					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
 					new ExecutionVertex[0],
 					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1, cl),
-					new HeapSavepointStore(),
+					new StandaloneCompletedCheckpointStore(1),
+					null,
 					new DisabledCheckpointStatsTracker());
 
 			try {


Mime
View raw message