flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/3] flink git commit: [FLINK-4322] [checkpointing] Add and fix tests for unified Checkpoint/Savepoint Coordinator
Date Wed, 17 Aug 2016 17:35:27 GMT
[FLINK-4322] [checkpointing] Add and fix tests for unified Checkpoint/Savepoint Coordinator

This closes #2366


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47acdead
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47acdead
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47acdead

Branch: refs/heads/master
Commit: 47acdeadf3ccd326578306453dd10ed3c147a4e6
Parents: 76ca1a7
Author: Ufuk Celebi <uce@apache.org>
Authored: Thu Aug 11 19:40:07 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Aug 17 19:18:07 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  138 +--
 .../checkpoint/CheckpointTriggerResult.java     |   17 +-
 .../runtime/checkpoint/PendingSavepoint.java    |   11 +-
 .../StandaloneCheckpointIDCounter.java          |    9 +
 .../StandaloneCompletedCheckpointStore.java     |    2 +-
 .../checkpoint/savepoint/SavepointLoader.java   |    4 +-
 .../checkpoint/savepoint/SavepointV0.java       |    7 +-
 .../runtime/executiongraph/ExecutionGraph.java  |    6 -
 .../flink/runtime/jobmanager/JobManager.scala   |   20 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  598 +++++++---
 .../checkpoint/CheckpointStateRestoreTest.java  |   80 +-
 .../CompletedCheckpointStoreTest.java           |    2 +-
 .../checkpoint/CompletedCheckpointTest.java     |   58 +
 ...ExecutionGraphCheckpointCoordinatorTest.java |   33 +-
 .../checkpoint/PendingCheckpointTest.java       |  134 +++
 .../checkpoint/PendingSavepointTest.java        |  141 +++
 .../savepoint/HeapSavepointStoreTest.java       |   25 -
 .../savepoint/SavepointCoordinatorTest.java     | 1119 ------------------
 .../savepoint/SavepointLoaderTest.java          |  110 ++
 .../stats/SimpleCheckpointStatsTrackerTest.java |    5 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |   32 +-
 .../test/checkpointing/SavepointITCase.java     |  297 +----
 22 files changed, 1033 insertions(+), 1815 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6d44e61..2a1ece0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.dispatch.Futures;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
@@ -45,9 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.concurrent.Future;
-import scala.concurrent.Promise;
 
-import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -69,10 +67,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
  * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
  * reported by the tasks that acknowledge the checkpoint.
- *
- * <p>Depending on the configured {@link RecoveryMode}, the behaviour of the {@link
- * CompletedCheckpointStore} and {@link CheckpointIDCounter} change. The default standalone
- * implementations don't support any recovery.
  */
 public class CheckpointCoordinator {
 
@@ -156,9 +150,6 @@ public class CheckpointCoordinator {
 	/** Flag marking the coordinator as shut down (not accepting any messages any more) */
 	private volatile boolean shutdown;
 
-	/** Shutdown hook thread to clean up state handles. */
-	private final Thread shutdownHook;
-
 	/** Helper for tracking checkpoint statistics  */
 	private final CheckpointStatsTracker statsTracker;
 
@@ -180,7 +171,6 @@ public class CheckpointCoordinator {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			SavepointStore savepointStore,
-			RecoveryMode recoveryMode,
 			CheckpointStatsTracker statsTracker) throws Exception {
 
 		// sanity checks
@@ -188,7 +178,6 @@ public class CheckpointCoordinator {
 		checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
 		checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
 		checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
-		checkArgument(numberKeyGroups >= 1, "numberKeyGroups must be >= 1");
 
 		this.job = checkNotNull(job);
 		this.baseInterval = baseInterval;
@@ -198,52 +187,21 @@ public class CheckpointCoordinator {
 		this.tasksToTrigger = checkNotNull(tasksToTrigger);
 		this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
 		this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
-		this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>();
+		this.pendingCheckpoints = new LinkedHashMap<>();
 		this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
 		this.savepointStore = checkNotNull(savepointStore);
-		this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
+		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 		this.userClassLoader = checkNotNull(userClassLoader);
 		this.statsTracker = checkNotNull(statsTracker);
 		this.numberKeyGroups = numberKeyGroups;
 
 		this.timer = new Timer("Checkpoint Timer", true);
 
-		if (recoveryMode == RecoveryMode.STANDALONE) {
-			// Add shutdown hook to clean up state handles when no checkpoint recovery is
-			// possible. In case of another configured recovery mode, the checkpoints need to be
-			// available for the standby job managers.
-			this.shutdownHook = new Thread(new Runnable() {
-				@Override
-				public void run() {
-					try {
-						CheckpointCoordinator.this.shutdown();
-					}
-					catch (Throwable t) {
-						LOG.error("Error during shutdown of checkpoint coordinator via " +
-								"JVM shutdown hook: " + t.getMessage(), t);
-					}
-				}
-			});
-
-			try {
-				// Add JVM shutdown hook to call shutdown of service
-				Runtime.getRuntime().addShutdownHook(shutdownHook);
-			}
-			catch (IllegalStateException ignored) {
-				// JVM is already shutting down. No need to do anything.
-			}
-			catch (Throwable t) {
-				LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
-			}
-		}
-		else {
-			this.shutdownHook = null;
-		}
-
-		// make sure the checkpoint ID enumerator is running
 		try {
-			checkpointIdCounter.start();
+			// Make sure the checkpoint ID enumerator is running. Possibly
+			// issues a blocking call to ZooKeeper.
+			checkpointIDCounter.start();
 		} catch (Exception e) {
 			throw new Exception("Failed to start checkpoint ID counter: " + e.getMessage(), e);
 		}
@@ -285,50 +243,34 @@ public class CheckpointCoordinator {
 	 */
 	private void shutdown(boolean shutdownStoreAndCounter) throws Exception {
 		synchronized (lock) {
-			try {
-				if (!shutdown) {
-					shutdown = true;
-					LOG.info("Stopping checkpoint coordinator for job " + job);
+			if (!shutdown) {
+				shutdown = true;
+				LOG.info("Stopping checkpoint coordinator for job " + job);
 
-					periodicScheduling = false;
-					triggerRequestQueued = false;
+				periodicScheduling = false;
+				triggerRequestQueued = false;
 
-					// shut down the thread that handles the timeouts and pending triggers
-					timer.cancel();
+				// shut down the thread that handles the timeouts and pending triggers
+				timer.cancel();
 
-					// make sure that the actor does not linger
-					if (jobStatusListener != null) {
-						jobStatusListener.tell(PoisonPill.getInstance());
-						jobStatusListener = null;
-					}
+				// make sure that the actor does not linger
+				if (jobStatusListener != null) {
+					jobStatusListener.tell(PoisonPill.getInstance());
+					jobStatusListener = null;
+				}
 
-					// clear and discard all pending checkpoints
-					for (PendingCheckpoint pending : pendingCheckpoints.values()) {
-						pending.abortError(new Exception("Checkpoint Coordinator is shutting down"));
-					}
-					pendingCheckpoints.clear();
-
-					if (shutdownStoreAndCounter) {
-						completedCheckpointStore.shutdown();
-						checkpointIdCounter.shutdown();
-					} else {
-						completedCheckpointStore.suspend();
-						checkpointIdCounter.suspend();
-					}
+				// clear and discard all pending checkpoints
+				for (PendingCheckpoint pending : pendingCheckpoints.values()) {
+					pending.abortError(new Exception("Checkpoint Coordinator is shutting down"));
 				}
-			} finally {
-				// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
-				// shutdown hook itself.
-				if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-					try {
-						Runtime.getRuntime().removeShutdownHook(shutdownHook);
-					}
-					catch (IllegalStateException ignored) {
-						// race, JVM is in shutdown already, we can safely ignore this
-					}
-					catch (Throwable t) {
-						LOG.warn("Error unregistering checkpoint coordinator shutdown hook.", t);
-					}
+				pendingCheckpoints.clear();
+
+				if (shutdownStoreAndCounter) {
+					completedCheckpointStore.shutdown();
+					checkpointIdCounter.shutdown();
+				} else {
+					completedCheckpointStore.suspend();
+					checkpointIdCounter.suspend();
 				}
 			}
 		}
@@ -350,10 +292,7 @@ public class CheckpointCoordinator {
 			return savepoint.getCompletionFuture();
 		}
 		else {
-			final Promise<String> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
-			promise.failure(
-					new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
-			return promise.future(); 
+			return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
 		}
 	}
 
@@ -586,9 +525,8 @@ public class CheckpointCoordinator {
 				rememberRecentCheckpointId(checkpointId);
 
 				boolean haveMoreRecentPending = false;
-				Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
-				while (entries.hasNext()) {
-					PendingCheckpoint p = entries.next().getValue();
+
+				for (PendingCheckpoint p : pendingCheckpoints.values()) {
 					if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
 						haveMoreRecentPending = true;
 						break;
@@ -746,7 +684,7 @@ public class CheckpointCoordinator {
 
 		while (entries.hasNext()) {
 			PendingCheckpoint p = entries.next().getValue();
-			if (p.getCheckpointTimestamp() < timestamp && p.canBeSubsumed()) {
+			if (p.getCheckpointTimestamp() <= timestamp && p.canBeSubsumed()) {
 				rememberRecentCheckpointId(p.getCheckpointId());
 				p.abortSubsumed();
 				entries.remove();
@@ -895,7 +833,7 @@ public class CheckpointCoordinator {
 
 	public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
 		synchronized (lock) {
-			return new HashMap<Long, PendingCheckpoint>(this.pendingCheckpoints);
+			return new HashMap<>(this.pendingCheckpoints);
 		}
 	}
 
@@ -913,14 +851,6 @@ public class CheckpointCoordinator {
 		return checkpointIdCounter;
 	}
 
-	protected ActorGateway getJobStatusListener() {
-		return jobStatusListener;
-	}
-
-	protected void setJobStatusListener(ActorGateway jobStatusListener) {
-		this.jobStatusListener = jobStatusListener;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Periodic scheduling of checkpoints
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
index 3f91407..65dc73f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.checkpoint;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The result of triggering a checkpoint. May be a declined checkpoint trigger attempt,
- * or a pending checkpoint.
+ * The result of triggering a checkpoint. May either be a declined checkpoint
+ * trigger attempt, or a pending checkpoint.
  */
 class CheckpointTriggerResult {
 
@@ -68,7 +68,7 @@ class CheckpointTriggerResult {
 		if (success != null) {
 			return success;
 		} else {
-			throw new IllegalStateException();
+			throw new IllegalStateException("Checkpoint triggering failed");
 		}
 	}
 
@@ -76,7 +76,7 @@ class CheckpointTriggerResult {
 		if (failure != null) {
 			return failure;
 		} else {
-			throw new IllegalStateException();
+			throw new IllegalStateException("Checkpoint triggering was successful");
 		}
 	}
 
@@ -84,8 +84,9 @@ class CheckpointTriggerResult {
 
 	@Override
 	public String toString() {
-		return isSuccess() ? 
-				("success: " + success) :
-				("failure: " + failure.message()); 
+		return "CheckpointTriggerResult(" +
+				(isSuccess() ?
+						("success: " + success) :
+						("failure: " + failure.message())) + ")";
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
index 92cdd04..460ff8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
@@ -25,9 +25,7 @@ import org.apache.flink.runtime.checkpoint.savepoint.SavepointV0;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.util.ExceptionUtils;
-
 import org.slf4j.Logger;
-
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 
@@ -107,7 +105,14 @@ public class PendingSavepoint extends PendingCheckpoint {
 
 	@Override
 	public void abortSubsumed() throws Exception {
-		throw new Exception("Bug: Savepoints must never be subsumed");
+		try {
+			Exception e = new Exception("Bug: Savepoints must never be subsumed");
+			onCompletionPromise.failure(e);
+			throw e;
+		}
+		finally {
+			dispose(true);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index c0ea93d..0a235bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -50,4 +50,13 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
 	public void setCount(long newCount) {
 		checkpointIdCounter.set(newCount);
 	}
+
+	/**
+	 * Returns the last checkpoint ID (current - 1).
+	 *
+	 * @return Last checkpoint ID.
+	 */
+	public long getLast() {
+		return checkpointIdCounter.get() - 1;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 0f6cf33..bc111cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -30,7 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#STANDALONE}.
  */
-class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
+public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
 
 	/** The maximum number of checkpoints to retain (at least 1). */
 	private final int maxNumberOfCheckpointsToRetain;

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 0b7b0c2..1be7a58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -27,8 +27,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it. 
  */
@@ -54,7 +52,7 @@ public class SavepointLoader {
 			String savepointPath) throws Exception {
 
 		// (1) load the savepoint
-		Savepoint savepoint = savepointStore.loadSavepoint(checkNotNull(savepointPath));
+		Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
 		final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
 		
 		// (2) validate it (parallelism, etc)

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
index 9fd950d..d60d80e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint.savepoint;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
 /**
@@ -37,11 +38,11 @@ public class SavepointV0 implements Savepoint {
 	private final long checkpointId;
 
 	/** The task states */
-	private final Collection<TaskState> taskStates;
+	private final Collection<TaskState> taskStates = new ArrayList();
 
-	SavepointV0(long checkpointId, Collection<TaskState> taskStates) {
+	public SavepointV0(long checkpointId, Collection<TaskState> taskStates) {
 		this.checkpointId = checkpointId;
-		this.taskStates = Preconditions.checkNotNull(taskStates, "Task States");
+		this.taskStates.addAll(taskStates);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 82826dd..12d8e66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorSystem;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -49,7 +48,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
@@ -59,10 +57,8 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -353,7 +349,6 @@ public class ExecutionGraph {
 			UUID leaderSessionID,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore checkpointStore,
-			RecoveryMode recoveryMode,
 			SavepointStore savepointStore,
 			CheckpointStatsTracker statsTracker) throws Exception {
 
@@ -389,7 +384,6 @@ public class ExecutionGraph {
 				checkpointIDCounter,
 				checkpointStore,
 				savepointStore,
-				recoveryMode,
 				checkpointStatsTracker);
 
 		// the periodic checkpoint scheduler is activated and deactivated as a result of

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9fb01bf..a82e89a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1093,10 +1093,10 @@ class JobManager(
           Option(jobGraph.getSerializedExecutionConfig()
             .deserializeValue(userCodeLoader)
             .getRestartStrategy())
-              .map(RestartStrategyFactory.createRestartStrategy(_)) match {
-                case Some(strategy) => strategy
-                case None => restartStrategyFactory.createRestartStrategy()
-              }
+            .map(RestartStrategyFactory.createRestartStrategy(_)) match {
+            case Some(strategy) => strategy
+            case None => restartStrategyFactory.createRestartStrategy()
+          }
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
 
@@ -1253,7 +1253,6 @@ class JobManager(
             leaderSessionID.orNull,
             checkpointIdCounter,
             completedCheckpoints,
-            recoveryMode,
             savepointStore,
             checkpointStatsTracker)
         }
@@ -1294,7 +1293,6 @@ class JobManager(
       // because it is a blocking operation
       future {
         try {
-
           if (isRecovery) {
             // this is a recovery of a master failure (this master takes over)
             executionGraph.restoreLatestCheckpointedState()
@@ -1305,7 +1303,7 @@ class JobManager(
             val snapshotSettings = jobGraph.getSnapshotSettings
             if (snapshotSettings != null) {
               val savepointPath = snapshotSettings.getSavepointPath()
-              
+
               if (savepointPath != null) {
                 // got a savepoint
                 try {
@@ -1316,14 +1314,14 @@ class JobManager(
                     jobId, executionGraph.getAllVertices, savepointStore, savepointPath)
 
                   executionGraph.getCheckpointCoordinator.getCheckpointStore
-                      .addCheckpoint(savepoint)
-                  
+                    .addCheckpoint(savepoint)
+
                   // Reset the checkpoint ID counter
                   val nextCheckpointId: Long = savepoint.getCheckpointID + 1
                   log.info(s"Reset the checkpoint ID to $nextCheckpointId")
                   executionGraph.getCheckpointCoordinator.getCheckpointIdCounter
-                      .setCount(nextCheckpointId)
-                  
+                    .setCount(nextCheckpointId)
+
                   executionGraph.restoreLatestCheckpointedState()
                 } catch {
                   case e: Exception =>

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 62af42b..3341095 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import scala.concurrent.Future;
 
 import java.io.Serializable;
 import java.util.Iterator;
@@ -80,17 +81,19 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				42,
-				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
-				new ExecutionVertex[] { ackVertex1, ackVertex2 },
-				new ExecutionVertex[] {},
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					600000,
+					600000,
+					0, Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2 },
+					new ExecutionVertex[] {},
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -131,17 +134,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				42,
-				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
-				new ExecutionVertex[] { ackVertex1, ackVertex2 },
-				new ExecutionVertex[] {},
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					600000,
+					600000,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2 },
+					new ExecutionVertex[] {},
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -180,17 +186,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				42,
-				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
-				new ExecutionVertex[] { ackVertex1, ackVertex2 },
-				new ExecutionVertex[] {},
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					600000,
+					600000,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2 },
+					new ExecutionVertex[] {},
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -230,17 +239,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				42,
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					600000,
+					600000,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { vertex1, vertex2 },
+					new ExecutionVertex[] { vertex1, vertex2 },
+					new ExecutionVertex[] { vertex1, vertex2 },
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -353,17 +365,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				42,
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					600000,
+					600000,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { vertex1, vertex2 },
+					new ExecutionVertex[] { vertex1, vertex2 },
+					new ExecutionVertex[] { vertex1, vertex2 },
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -472,17 +487,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				42,
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					600000,
+					600000,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { vertex1, vertex2 },
+					new ExecutionVertex[] { vertex1, vertex2 },
+					new ExecutionVertex[] { vertex1, vertex2 },
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -621,17 +639,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				42,
-				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
-				new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
-				new ExecutionVertex[] { commitVertex },
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					600000,
+					600000,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
+					new ExecutionVertex[] { commitVertex },
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(2, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -755,17 +776,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				42,
-				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
-				new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
-				new ExecutionVertex[] { commitVertex },
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(10, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					600000,
+					600000,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
+					new ExecutionVertex[] { commitVertex },
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(10, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -875,17 +899,20 @@ public class CheckpointCoordinatorTest {
 			// the timeout for the checkpoint is a 200 milliseconds
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				200,
-				42,
-				new ExecutionVertex[] { triggerVertex },
-				new ExecutionVertex[] { ackVertex1, ackVertex2 },
-				new ExecutionVertex[] { commitVertex },
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					600000,
+					200,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { triggerVertex },
+					new ExecutionVertex[] { ackVertex1, ackVertex2 },
+					new ExecutionVertex[] { commitVertex },
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(2, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			// trigger a checkpoint, partially acknowledged
 			assertTrue(coord.triggerCheckpoint(timestamp));
@@ -942,17 +969,20 @@ public class CheckpointCoordinatorTest {
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				200000,
-				200000,
-				42,
-				new ExecutionVertex[] { triggerVertex },
-				new ExecutionVertex[] { ackVertex1, ackVertex2 },
-				new ExecutionVertex[] { commitVertex },
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					200000,
+					200000,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { triggerVertex },
+					new ExecutionVertex[] { ackVertex1, ackVertex2 },
+					new ExecutionVertex[] { commitVertex },
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(2, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			assertTrue(coord.triggerCheckpoint(timestamp));
 
@@ -1020,17 +1050,20 @@ public class CheckpointCoordinatorTest {
 			}).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
 			
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				10,		// periodic interval is 10 ms
-				200000,	// timeout is very long (200 s)
-				42,
-				new ExecutionVertex[] { triggerVertex },
-				new ExecutionVertex[] { ackVertex },
-				new ExecutionVertex[] { commitVertex },
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					10,        // periodic interval is 10 ms
+					200000,    // timeout is very long (200 s)
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { triggerVertex },
+					new ExecutionVertex[] { ackVertex },
+					new ExecutionVertex[] { commitVertex },
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(2, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			
 			coord.startCheckpointScheduler();
@@ -1110,20 +1143,20 @@ public class CheckpointCoordinatorTest {
 			}).when(vertex1).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				10,		// periodic interval is 10 ms
-				200000,	// timeout is very long (200 s)
-				500,	// 500ms delay between checkpoints
-				10,
-				42,
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+					jid,
+					10,        // periodic interval is 10 ms
+					200000,    // timeout is very long (200 s)
+					500,    // 500ms delay between checkpoints
+					10,
+					42,
+					new ExecutionVertex[] { vertex1 },
+					new ExecutionVertex[] { vertex1 },
+					new ExecutionVertex[] { vertex1 },
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(2, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			coord.startCheckpointScheduler();
 
@@ -1170,20 +1203,236 @@ public class CheckpointCoordinatorTest {
 
 	@Test
 	public void testMaxConcurrentAttempts1() {
-		testMaxConcurrentAttemps(1);
+		testMaxConcurrentAttempts(1);
 	}
 
 	@Test
 	public void testMaxConcurrentAttempts2() {
-		testMaxConcurrentAttemps(2);
+		testMaxConcurrentAttempts(2);
 	}
 
 	@Test
 	public void testMaxConcurrentAttempts5() {
-		testMaxConcurrentAttemps(5);
+		testMaxConcurrentAttempts(5);
 	}
 	
-	private void testMaxConcurrentAttemps(int maxConcurrentAttempts) {
+	@Test
+	public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
+		final JobID jid = new JobID();
+		final long timestamp = System.currentTimeMillis();
+
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				42,
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				cl,
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1, cl),
+				new HeapSavepointStore(),
+				new DisabledCheckpointStatsTracker());
+
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// trigger the first savepoint. this should succeed
+		Future<String> savepointFuture = coord.triggerSavepoint(timestamp);
+		assertFalse(savepointFuture.isCompleted());
+
+		// validate that we have a pending savepoint
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+		long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+		PendingCheckpoint pending = coord.getPendingCheckpoints().get(checkpointId);
+
+		assertNotNull(pending);
+		assertEquals(checkpointId, pending.getCheckpointId());
+		assertEquals(timestamp, pending.getCheckpointTimestamp());
+		assertEquals(jid, pending.getJobId());
+		assertEquals(2, pending.getNumberOfNonAcknowledgedTasks());
+		assertEquals(0, pending.getNumberOfAcknowledgedTasks());
+		assertEquals(0, pending.getTaskStates().size());
+		assertFalse(pending.isDiscarded());
+		assertFalse(pending.isFullyAcknowledged());
+		assertFalse(pending.canBeSubsumed());
+		assertTrue(pending instanceof PendingSavepoint);
+
+
+		// acknowledge from one of the tasks
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+		assertEquals(1, pending.getNumberOfAcknowledgedTasks());
+		assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
+		assertFalse(pending.isDiscarded());
+		assertFalse(pending.isFullyAcknowledged());
+		assertFalse(savepointFuture.isCompleted());
+
+		// acknowledge the same task again (should not matter)
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+		assertFalse(pending.isDiscarded());
+		assertFalse(pending.isFullyAcknowledged());
+		assertFalse(savepointFuture.isCompleted());
+
+		// acknowledge the other task.
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
+
+		// the checkpoint is internally converted to a successful checkpoint and the
+		// pending checkpoint object is disposed
+		assertTrue(pending.isDiscarded());
+		assertTrue(savepointFuture.isCompleted());
+
+		// now we should have a completed checkpoint
+		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+
+		// validate that the relevant tasks got a confirmation message
+		{
+			NotifyCheckpointComplete confirmMessage1 = new NotifyCheckpointComplete(jid, attemptID1, checkpointId, timestamp);
+			NotifyCheckpointComplete confirmMessage2 = new NotifyCheckpointComplete(jid, attemptID2, checkpointId, timestamp);
+			verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
+			verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
+		}
+
+		CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
+		assertEquals(jid, success.getJobId());
+		assertEquals(timestamp, success.getTimestamp());
+		assertEquals(pending.getCheckpointId(), success.getCheckpointID());
+		assertTrue(success.getTaskStates().isEmpty());
+
+		// ---------------
+		// trigger another savepoint and see that this one replaces the other checkpoint
+		// ---------------
+		final long timestampNew = timestamp + 7;
+		savepointFuture = coord.triggerSavepoint(timestampNew);
+		assertFalse(savepointFuture.isCompleted());
+
+		long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
+
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
+		assertEquals(jid, successNew.getJobId());
+		assertEquals(timestampNew, successNew.getTimestamp());
+		assertEquals(checkpointIdNew, successNew.getCheckpointID());
+		assertTrue(successNew.getTaskStates().isEmpty());
+		assertTrue(savepointFuture.isCompleted());
+
+		// validate that the relevant tasks got a trigger message and a confirmation message
+		{
+			TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew);
+			TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew);
+			verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+			verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
+
+			NotifyCheckpointComplete confirmMessage1 = new NotifyCheckpointComplete(jid, attemptID1, checkpointIdNew, timestampNew);
+			NotifyCheckpointComplete confirmMessage2 = new NotifyCheckpointComplete(jid, attemptID2, checkpointIdNew, timestampNew);
+			verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
+			verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
+		}
+
+		coord.shutdown();
+	}
+
+	/**
+	 * Triggers a savepoint and two checkpoints. The second checkpoint completes
+	 * and subsumes the first checkpoint, but not the first savepoint. Then we
+	 * trigger another checkpoint and savepoint. The 2nd savepoint completes and
+	 * subsumes the last checkpoint, but not the first savepoint.
+	 */
+	@Test
+	public void testSavepointsAreNotSubsumed() throws Exception {
+		final JobID jid = new JobID();
+		final long timestamp = System.currentTimeMillis();
+
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+		StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter();
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				42,
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				cl,
+				counter,
+				new StandaloneCompletedCheckpointStore(10, cl),
+				new HeapSavepointStore(),
+				new DisabledCheckpointStatsTracker());
+
+		// Trigger a savepoint and two checkpoints
+		Future<String> savepointFuture1 = coord.triggerSavepoint(timestamp);
+		long savepointId1 = counter.getLast();
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+		assertTrue(coord.triggerCheckpoint(timestamp + 1));
+		assertEquals(2, coord.getNumberOfPendingCheckpoints());
+
+		assertTrue(coord.triggerCheckpoint(timestamp + 2));
+		long checkpointId2 = counter.getLast();
+		assertEquals(3, coord.getNumberOfPendingCheckpoints());
+
+		// 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
+
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
+		assertFalse(savepointFuture1.isCompleted());
+
+		assertTrue(coord.triggerCheckpoint(timestamp + 3));
+		assertEquals(2, coord.getNumberOfPendingCheckpoints());
+
+		Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp);
+		long savepointId2 = counter.getLast();
+		assertEquals(3, coord.getNumberOfPendingCheckpoints());
+
+		// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2));
+
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
+
+		assertFalse(savepointFuture1.isCompleted());
+		assertTrue(savepointFuture2.isCompleted());
+
+		// Ack first savepoint
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1));
+
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertTrue(savepointFuture1.isCompleted());
+	}
+
+	private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
 		try {
 			final JobID jid = new JobID();
 
@@ -1207,17 +1456,18 @@ public class CheckpointCoordinatorTest {
 			}).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				10,		// periodic interval is 10 ms
-				200000,	// timeout is very long (200 s)
-				0L,		// no extra delay
-				maxConcurrentAttempts,
-				42,
-				new ExecutionVertex[] { triggerVertex },
-				new ExecutionVertex[] { ackVertex },
-				new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
-				(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+					jid,
+					10,        // periodic interval is 10 ms
+					200000,    // timeout is very long (200 s)
+					0L,        // no extra delay
+					maxConcurrentAttempts,
+					42,
+					new ExecutionVertex[] { triggerVertex },
+					new ExecutionVertex[] { ackVertex },
+					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
+					(), new StandaloneCompletedCheckpointStore(2, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			coord.startCheckpointScheduler();
 
@@ -1278,17 +1528,18 @@ public class CheckpointCoordinatorTest {
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				10,		// periodic interval is 10 ms
-				200000,	// timeout is very long (200 s)
-				0L,		// no extra delay
-				maxConcurrentAttempts, // max two concurrent checkpoints
-				42,
-				new ExecutionVertex[] { triggerVertex },
-				new ExecutionVertex[] { ackVertex },
-				new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
-				(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+					jid,
+					10,        // periodic interval is 10 ms
+					200000,    // timeout is very long (200 s)
+					0L,        // no extra delay
+					maxConcurrentAttempts, // max two concurrent checkpoints
+					42,
+					new ExecutionVertex[] { triggerVertex },
+					new ExecutionVertex[] { ackVertex },
+					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
+					(), new StandaloneCompletedCheckpointStore(2, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			coord.startCheckpointScheduler();
 
@@ -1358,17 +1609,18 @@ public class CheckpointCoordinatorTest {
 					});
 			
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				10,		// periodic interval is 10 ms
-				200000,	// timeout is very long (200 s)
-				0L,		// no extra delay
-				2, // max two concurrent checkpoints
-				42,
-				new ExecutionVertex[] { triggerVertex },
-				new ExecutionVertex[] { ackVertex },
-				new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+					jid,
+					10,        // periodic interval is 10 ms
+					200000,    // timeout is very long (200 s)
+					0L,        // no extra delay
+					2, // max two concurrent checkpoints
+					42,
+					new ExecutionVertex[] { triggerVertex },
+					new ExecutionVertex[] { ackVertex },
+					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(2, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 			
 			coord.startCheckpointScheduler();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 2b1b7e1..061059a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -19,13 +19,14 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
@@ -39,7 +40,10 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests concerning the restoring of state from a checkpoint to the task executions.
@@ -81,17 +85,20 @@ public class CheckpointStateRestoreTest {
 
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				200000L,
-				200000L,
-				42,
-				new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-				new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-				new ExecutionVertex[0],
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					200000L,
+					200000L,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+					new ExecutionVertex[0],
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -158,17 +165,20 @@ public class CheckpointStateRestoreTest {
 
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				200000L,
-				200000L,
-				42,
-				new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-				new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-				new ExecutionVertex[0],
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+					jid,
+					200000L,
+					200000L,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+					new ExecutionVertex[0],
+					cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -206,15 +216,19 @@ public class CheckpointStateRestoreTest {
 	public void testNoCheckpointAvailable() {
 		try {
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				new JobID(),
-				200000L,
-				200000L,
-				42,
-				new ExecutionVertex[] { mock(ExecutionVertex.class) },
-				new ExecutionVertex[] { mock(ExecutionVertex.class) },
-				new ExecutionVertex[0], cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
+					new JobID(),
+					200000L,
+					200000L,
+					0,
+					Integer.MAX_VALUE,
+					42,
+					new ExecutionVertex[] { mock(ExecutionVertex.class) },
+					new ExecutionVertex[] { mock(ExecutionVertex.class) },
+					new ExecutionVertex[0], cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl),
+					new HeapSavepointStore(),
+					new DisabledCheckpointStatsTracker());
 
 			try {
 				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 84d809a..634e177 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -238,7 +238,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			long timestamp,
 			Map<JobVertexID, TaskState> taskGroupStates) {
 
-			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates);
+			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, true);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
new file mode 100644
index 0000000..90a6836
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class CompletedCheckpointTest {
+
+	/**
+	 * Tests that the {@code deleteStateWhenDisposed} flag is correctly forwarded.
+	 */
+	@Test
+	public void testDiscard() throws Exception {
+		TaskState state = mock(TaskState.class);
+		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
+		taskStates.put(new JobVertexID(), state);
+
+		// Verify discard call is forwarded to state
+		CompletedCheckpoint checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, true);
+		checkpoint.discard(ClassLoader.getSystemClassLoader());
+		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+
+		Mockito.reset(state);
+
+		// Verify discard call is not forwarded to state
+		checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, false);
+		checkpoint.discard(ClassLoader.getSystemClassLoader());
+		verify(state, times(0)).discard(Matchers.any(ClassLoader.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 0f2c2b2..7b05fd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -27,26 +27,22 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.AfterClass;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -59,28 +55,6 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 	public static void teardown() {
 		JavaTestKit.shutdownActorSystem(system);
 	}
-	
-	@Test
-	public void testCheckpointAndSavepointCoordinatorShareCheckpointIDCounter() throws Exception {
-		ExecutionGraph executionGraph = createExecutionGraphAndEnableCheckpointing(
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, ClassLoader.getSystemClassLoader()));
-
-		CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-		SavepointCoordinator savepointCoordinator = executionGraph.getSavepointCoordinator();
-
-		// Both the checkpoint and savepoint coordinator need to operate
-		// with the same checkpoint ID counter.
-		Field counterField = CheckpointCoordinator.class.getDeclaredField("checkpointIdCounter");
-
-		CheckpointIDCounter counterCheckpointCoordinator = (CheckpointIDCounter) counterField
-				.get(checkpointCoordinator);
-
-		CheckpointIDCounter counterSavepointCoordinator = (CheckpointIDCounter) counterField
-				.get(savepointCoordinator);
-
-		assertEquals(counterCheckpointCoordinator, counterSavepointCoordinator);
-	}
 
 	/**
 	 * Tests that a shut down checkpoint coordinator calls shutdown on
@@ -94,8 +68,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 		ExecutionGraph graph = createExecutionGraphAndEnableCheckpointing(counter, store);
 		graph.fail(new Exception("Test Exception"));
 
-		// Two times, because shared with savepoint coordinator
-		verify(counter, times(2)).shutdown();
+		verify(counter, times(1)).shutdown();
 		verify(store, times(1)).shutdown();
 	}
 
@@ -115,8 +88,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 		verify(counter, times(0)).shutdown();
 		verify(store, times(0)).shutdown();
 
-		// Two times, because shared with savepoint coordinator
-		verify(counter, times(2)).suspend();
+		verify(counter, times(1)).suspend();
 		verify(store, times(1)).suspend();
 	}
 
@@ -149,7 +121,6 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				UUID.randomUUID(),
 				counter,
 				store,
-				RecoveryMode.STANDALONE,
 				new HeapSavepointStore(),
 				new DisabledCheckpointStatsTracker());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
new file mode 100644
index 0000000..d235e61
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class PendingCheckpointTest {
+
+	private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>();
+	private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
+
+	static { // exactly one (mocked) task has to acknowledge the checkpoints created below
+		ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
+	}
+
+	/**
+	 * Tests that pending checkpoints can be subsumed.
+	 */
+	@Test
+	public void testCanBeSubsumed() throws Exception {
+		PendingCheckpoint pending = createPendingCheckpoint();
+		assertTrue(pending.canBeSubsumed());
+	}
+
+	/**
+	 * Tests that every abort variant (declined, error, expired, subsumed) discards the task state.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testAbort() throws Exception {
+		TaskState state = mock(TaskState.class);
+
+		// Abort declined
+		PendingCheckpoint pending = createPendingCheckpoint();
+		setTaskState(pending, state);
+
+		pending.abortDeclined();
+		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+
+		// Abort error
+		Mockito.reset(state); // forget the previous discard so times(1) counts only this scenario
+
+		pending = createPendingCheckpoint();
+		setTaskState(pending, state);
+
+		pending.abortError(new Exception("Expected Test Exception"));
+		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+
+		// Abort expired
+		Mockito.reset(state);
+
+		pending = createPendingCheckpoint();
+		setTaskState(pending, state);
+
+		pending.abortExpired();
+		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+
+		// Abort subsumed
+		Mockito.reset(state);
+
+		pending = createPendingCheckpoint();
+		setTaskState(pending, state);
+
+		pending.abortSubsumed();
+		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+	}
+
+	/**
+	 * Tests that the CompletedCheckpoint `deleteStateWhenDisposed` flag is
+	 * correctly set to true.
+	 */
+	@Test
+	public void testFinalizeCheckpoint() throws Exception {
+		TaskState state = mock(TaskState.class);
+		PendingCheckpoint pending = createPendingCheckpoint();
+		PendingCheckpointTest.setTaskState(pending, state);
+
+		pending.acknowledgeTask(ATTEMPT_ID, null, 0, null); // acknowledge the only registered task
+
+		CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
+
+		// Discarding the finalized checkpoint must discard the task state exactly once
+		checkpoint.discard(ClassLoader.getSystemClassLoader());
+		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static PendingCheckpoint createPendingCheckpoint() {
+		ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS); // fresh copy per checkpoint
+		return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, classLoader);
+	}
+
+	@SuppressWarnings("unchecked")
+	static void setTaskState(PendingCheckpoint pending, TaskState state) throws NoSuchFieldException, IllegalAccessException {
+		Field field = PendingCheckpoint.class.getDeclaredField("taskStates");
+		field.setAccessible(true); // reach the field reflectively to inject the given task state
+		Map<JobVertexID, TaskState> taskStates = (Map<JobVertexID, TaskState>) field.get(pending);
+
+		taskStates.put(new JobVertexID(), state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
new file mode 100644
index 0000000..6ae6e1c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class PendingSavepointTest {
+
+	private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>();
+	private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
+
+	static { // exactly one (mocked) task has to acknowledge the savepoints created below
+		ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
+	}
+
+	/**
+	 * Tests that pending savepoints cannot be subsumed.
+	 */
+	@Test
+	public void testCanBeSubsumed() throws Exception {
+		PendingSavepoint pending = createPendingSavepoint();
+		assertFalse(pending.canBeSubsumed());
+	}
+
+	/**
+	 * Tests that abort discards state and fails the completion future.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testAbort() throws Exception {
+		TaskState state = mock(TaskState.class);
+
+		// Abort declined
+		PendingSavepoint pending = createPendingSavepoint();
+		PendingCheckpointTest.setTaskState(pending, state);
+
+		pending.abortDeclined();
+		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+
+		// Abort error
+		Mockito.reset(state); // forget the previous discard so times(1) counts only this scenario
+
+		pending = createPendingSavepoint();
+		PendingCheckpointTest.setTaskState(pending, state);
+		Future<String> future = pending.getCompletionFuture();
+
+		pending.abortError(new Exception("Expected Test Exception"));
+		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+		assertTrue(future.isCompleted() && future.value().get().isFailure()); // failed().isCompleted() is also true on success
+
+		// Abort expired
+		Mockito.reset(state);
+
+		pending = createPendingSavepoint();
+		PendingCheckpointTest.setTaskState(pending, state);
+		future = pending.getCompletionFuture();
+
+		pending.abortExpired();
+		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+		assertTrue(future.isCompleted() && future.value().get().isFailure()); // failed().isCompleted() is also true on success
+
+		// Abort subsumed: savepoints must never be subsumed, so the call is expected to throw
+		pending = createPendingSavepoint();
+
+		try {
+			pending.abortSubsumed();
+			fail("Did not throw expected Exception");
+		} catch (Exception ignored) { // expected; Exception (not Throwable) so the AssertionError from fail() propagates
+		}
+	}
+
+	/**
+	 * Tests that the CompletedCheckpoint `deleteStateWhenDisposed` flag is
+	 * correctly set to false.
+	 */
+	@Test
+	public void testFinalizeCheckpoint() throws Exception {
+		TaskState state = mock(TaskState.class);
+		PendingSavepoint pending = createPendingSavepoint();
+		PendingCheckpointTest.setTaskState(pending, state);
+
+		Future<String> future = pending.getCompletionFuture();
+
+		pending.acknowledgeTask(ATTEMPT_ID, null, 0, null); // acknowledge the only registered task
+
+		CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
+
+		// Discarding the finalized savepoint must _NOT_ discard the task state
+		checkpoint.discard(ClassLoader.getSystemClassLoader());
+		verify(state, times(0)).discard(Matchers.any(ClassLoader.class));
+
+		// Future is already completed (zero wait) with a non-null path
+		String path = Await.result(future, Duration.Zero());
+		assertNotNull(path);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static PendingSavepoint createPendingSavepoint() {
+		ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS); // fresh copy per savepoint
+		return new PendingSavepoint(new JobID(), 0, 1, ackTasks, classLoader, new HeapSavepointStore());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java
deleted file mode 100644
index ec3dd0a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-public class HeapSavepointStoreTest {
-
-
-
-}


Mime
View raw message