flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [07/11] flink git commit: [FLINK-2976] [runtime, tests] Add SavepointCoordinator
Date Mon, 11 Jan 2016 15:31:28 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d739ee25/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
new file mode 100644
index 0000000..e921e92
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that the {@link CheckpointCoordinator} and the {@link SavepointCoordinator}
+ * created by {@link ExecutionGraph#enableSnapshotCheckpointing} are wired to the
+ * same checkpoint ID counter instance.
+ */
+public class ExecutionGraphCheckpointCoordinatorTest {
+
+	@Test
+	public void testCheckpointAndSavepointCoordinatorShareCheckpointIDCounter() throws Exception {
+		ExecutionGraph executionGraph = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"test",
+				new Configuration(),
+				new FiniteDuration(1, TimeUnit.DAYS),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				ClassLoader.getSystemClassLoader());
+
+		ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+		try {
+			executionGraph.enableSnapshotCheckpointing(
+					100,
+					100,
+					100,
+					1,
+					Collections.<ExecutionJobVertex>emptyList(),
+					Collections.<ExecutionJobVertex>emptyList(),
+					Collections.<ExecutionJobVertex>emptyList(),
+					actorSystem,
+					UUID.randomUUID(),
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, ClassLoader.getSystemClassLoader()),
+					RecoveryMode.STANDALONE,
+					new HeapStateStore<Savepoint>());
+
+			CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+			SavepointCoordinator savepointCoordinator = executionGraph.getSavepointCoordinator();
+
+			// Both the checkpoint and savepoint coordinator need to operate
+			// with the same checkpoint ID counter.
+			Field counterField = CheckpointCoordinator.class.getDeclaredField("checkpointIdCounter");
+
+			// The field is declared private; without this, Field#get below
+			// throws IllegalAccessException.
+			counterField.setAccessible(true);
+
+			CheckpointIDCounter counterCheckpointCoordinator = (CheckpointIDCounter) counterField
+					.get(checkpointCoordinator);
+
+			CheckpointIDCounter counterSavepointCoordinator = (CheckpointIDCounter) counterField
+					.get(savepointCoordinator);
+
+			assertEquals(counterCheckpointCoordinator, counterSavepointCoordinator);
+		}
+		finally {
+			if (actorSystem != null) {
+				actorSystem.shutdown();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d739ee25/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
new file mode 100644
index 0000000..4dc9c5c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
@@ -0,0 +1,1126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
+import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the savepoint coordinator.
+ */
+public class SavepointCoordinatorTest {
+
+	// ------------------------------------------------------------------------
+	// Trigger and acknowledge
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Simple trigger-acknowledge test for a single savepoint.
+	 *
+	 * <p>Triggers a savepoint, verifies the trigger messages sent to the
+	 * vertices, acknowledges all tasks, and then checks the completion
+	 * notifications, the stored savepoint, and the completed future.
+	 */
+	@Test
+	public void testSimpleTriggerSavepoint() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+		long checkpointTimeout = 60 * 1000;
+		long timestamp = 1272635;
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(jobId),
+				mockExecutionVertex(jobId) };
+		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
+		HeapStateStore<Savepoint> savepointStore = new HeapStateStore<>();
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				appId,
+				jobId,
+				checkpointTimeout,
+				vertices,
+				vertices,
+				vertices,
+				checkpointIdCounter,
+				savepointStore);
+
+		// Trigger the savepoint; the future completes only after all acks
+		Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp);
+		assertFalse(savepointPathFuture.isCompleted());
+
+		// The mock counter hands out 0 as the first checkpoint ID
+		long checkpointId = checkpointIdCounter.getLastReturnedCount();
+		assertEquals(0, checkpointId);
+
+		// Verify send trigger messages
+		for (ExecutionVertex vertex : vertices) {
+			verifyTriggerCheckpoint(vertex, checkpointId, timestamp);
+		}
+
+		PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
+				.get(checkpointId);
+
+		verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId,
+				timestamp, 0, 2, 0, false, false);
+
+		// Acknowledge tasks
+		for (ExecutionVertex vertex : vertices) {
+			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
+					jobId, vertex.getCurrentExecutionAttempt().getAttemptId(),
+					checkpointId, createSerializedStateHandle(vertex), 0));
+		}
+
+		// The pending checkpoint is completed; the completed savepoint is not
+		// retained in the successful checkpoints (hence size 0)
+		assertTrue(pendingCheckpoint.isDiscarded());
+		assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
+
+		// Verify send notify complete messages
+		for (ExecutionVertex vertex : vertices) {
+			verifyNotifyCheckpointComplete(vertex, checkpointId, timestamp);
+		}
+
+		// Verify that the future has been completed
+		assertTrue(savepointPathFuture.isCompleted());
+		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
+
+		// Verify the savepoint
+		Savepoint savepoint = savepointStore.getState(savepointPath);
+		verifySavepoint(savepoint, appId, jobId, checkpointId, timestamp,
+				vertices);
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+
+		coordinator.shutdown();
+	}
+
+	// ------------------------------------------------------------------------
+	// Rollback
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Triggers and acknowledges a savepoint, restores it, and verifies that
+	 * every acknowledged execution gets its initial state set exactly once.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testSimpleRollbackSavepoint() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+
+		ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] {
+				mockExecutionJobVertex(jobId, new JobVertexID(), 4),
+				mockExecutionJobVertex(jobId, new JobVertexID(), 4) };
+
+		// Trigger on the first job vertex only, but acknowledge all 8 subtasks
+		ExecutionVertex[] triggerVertices = jobVertices[0].getTaskVertices();
+		ExecutionVertex[] ackVertices = new ExecutionVertex[8];
+
+		int i = 0;
+		for (ExecutionJobVertex jobVertex : jobVertices) {
+			for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+				ackVertices[i++] = vertex;
+			}
+		}
+
+		StateStore<Savepoint> savepointStore = new HeapStateStore<>();
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				appId,
+				jobId,
+				60 * 1000,
+				triggerVertices,
+				ackVertices,
+				new ExecutionVertex[] {},
+				new MockCheckpointIdCounter(),
+				savepointStore);
+
+		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
+
+		// Acknowledge all tasks
+		for (ExecutionVertex vertex : ackVertices) {
+			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
+					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
+		}
+
+		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
+		assertNotNull(savepointPath);
+
+		// Rollback; restore returns the application ID stored in the savepoint
+		assertEquals(appId, coordinator.restoreSavepoint(
+				createExecutionJobVertexMap(jobVertices),
+				savepointPath));
+
+		// Verify all executions have been reset
+		for (ExecutionVertex vertex : ackVertices) {
+			verify(vertex.getCurrentExecutionAttempt(), times(1)).setInitialState(
+					any(SerializedValue.class), anyLong());
+		}
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+
+		coordinator.shutdown();
+	}
+
+	/**
+	 * Verifies that restoring a savepoint fails when the current parallelism
+	 * of the job vertices does not match the parallelism at savepoint time,
+	 * in both directions (higher and lower parallelism).
+	 */
+	@Test
+	public void testRollbackParallelismMismatch() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+
+		ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] {
+				mockExecutionJobVertex(jobId, new JobVertexID(), 4),
+				mockExecutionJobVertex(jobId, new JobVertexID(), 4) };
+
+		ExecutionVertex[] triggerVertices = jobVertices[0].getTaskVertices();
+		ExecutionVertex[] ackVertices = new ExecutionVertex[8];
+
+		int index = 0;
+		for (ExecutionJobVertex jobVertex : jobVertices) {
+			for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+				ackVertices[index++] = vertex;
+			}
+		}
+
+		StateStore<Savepoint> savepointStore = new HeapStateStore<>();
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				appId,
+				jobId,
+				60 * 1000,
+				triggerVertices,
+				ackVertices,
+				new ExecutionVertex[] {},
+				new MockCheckpointIdCounter(),
+				savepointStore);
+
+		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
+
+		// Acknowledge all tasks
+		for (ExecutionVertex vertex : ackVertices) {
+			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
+					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
+		}
+
+		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
+		assertNotNull(savepointPath);
+
+		// Change parallelism higher than original (subtask without matching state)
+		for (int i = 0; i < jobVertices.length; i++) {
+			jobVertices[i] = mockExecutionJobVertex(jobId, jobVertices[i].getJobVertexId(), 8);
+		}
+
+		try {
+			// Rollback
+			coordinator.restoreSavepoint(
+					createExecutionJobVertexMap(jobVertices),
+					savepointPath);
+			fail("Did not throw expected Exception after rollback with parallelism mismatch.");
+		}
+		catch (Exception ignored) {
+			// expected: parallelism mismatch must make restore fail
+		}
+
+		// Change parallelism lower than original (state without matching subtask)
+		for (int i = 0; i < jobVertices.length; i++) {
+			jobVertices[i] = mockExecutionJobVertex(jobId, jobVertices[i].getJobVertexId(), 2);
+		}
+
+		try {
+			// Rollback
+			coordinator.restoreSavepoint(
+					createExecutionJobVertexMap(jobVertices),
+					savepointPath);
+			fail("Did not throw expected Exception after rollback with parallelism mismatch.");
+		}
+		catch (Exception ignored) {
+			// expected: parallelism mismatch must make restore fail
+		}
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+
+		coordinator.shutdown();
+	}
+
+	/**
+	 * Verifies that restoring a savepoint fails when reading the savepoint
+	 * from the state store throws, and that no savepoint promises leak.
+	 */
+	@Test
+	public void testRollbackStateStoreFailure() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+		ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, new JobVertexID(), 4);
+		// spy so that getState can be made to throw below
+		HeapStateStore<Savepoint> savepointStore = spy(
+				new HeapStateStore<Savepoint>());
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				appId,
+				jobId,
+				60 * 1000,
+				jobVertex.getTaskVertices(),
+				jobVertex.getTaskVertices(),
+				new ExecutionVertex[] {},
+				new MockCheckpointIdCounter(),
+				savepointStore);
+
+		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
+
+		// Acknowledge all tasks
+		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
+					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
+		}
+
+		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
+		assertNotNull(savepointPath);
+
+		// Failure on getState
+		doThrow(new Exception("TestException")).when(savepointStore).getState(anyString());
+
+		try {
+			// Rollback
+			coordinator.restoreSavepoint(
+					createExecutionJobVertexMap(jobVertex),
+					savepointPath);
+
+			fail("Did not throw expected Exception after rollback with savepoint store failure.");
+		}
+		catch (Exception ignored) {
+			// expected: store failure must propagate out of restoreSavepoint
+		}
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+
+		coordinator.shutdown();
+	}
+
+	/**
+	 * Verifies that restoring a savepoint returns the application ID stored in
+	 * the savepoint rather than the one the coordinator was created with.
+	 */
+	@Test
+	public void testRollbackUpdatesApplicationID() throws Exception {
+		ApplicationID appId = new ApplicationID();
+
+		CompletedCheckpoint checkpoint = mock(CompletedCheckpoint.class);
+		when(checkpoint.getStates()).thenReturn(Collections.<StateForTask>emptyList());
+		when(checkpoint.getCheckpointID()).thenReturn(12312312L);
+
+		Savepoint savepoint = new Savepoint(appId, checkpoint);
+
+		// Store mock returns the savepoint for any path
+		StateStore<Savepoint> savepointStore = mock(StateStore.class);
+		when(savepointStore.getState(anyString())).thenReturn(savepoint);
+
+		// Note: the coordinator is created with a *different* ApplicationID
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				new ApplicationID(),
+				new JobID(),
+				60 * 1000,
+				new ExecutionVertex[] {},
+				new ExecutionVertex[] {},
+				new ExecutionVertex[] {},
+				new MockCheckpointIdCounter(),
+				savepointStore);
+
+		assertEquals(appId, coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any"));
+
+		coordinator.shutdown();
+	}
+
+	/**
+	 * Verifies that restoring a savepoint advances the checkpoint ID counter
+	 * past the restored checkpoint ID (restored ID + 1).
+	 */
+	@Test
+	public void testRollbackSetsCheckpointID() throws Exception {
+		ApplicationID appId = new ApplicationID();
+
+		CompletedCheckpoint checkpoint = mock(CompletedCheckpoint.class);
+		when(checkpoint.getStates()).thenReturn(Collections.<StateForTask>emptyList());
+		when(checkpoint.getCheckpointID()).thenReturn(12312312L);
+
+		Savepoint savepoint = new Savepoint(appId, checkpoint);
+
+		CheckpointIDCounter checkpointIdCounter = mock(CheckpointIDCounter.class);
+
+		// Store mock returns the savepoint for any path
+		StateStore<Savepoint> savepointStore = mock(StateStore.class);
+		when(savepointStore.getState(anyString())).thenReturn(savepoint);
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				new ApplicationID(),
+				new JobID(),
+				60 * 1000,
+				new ExecutionVertex[] {},
+				new ExecutionVertex[] {},
+				new ExecutionVertex[] {},
+				checkpointIdCounter,
+				savepointStore);
+
+		assertEquals(appId, coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any"));
+
+		// Counter must be advanced past the restored checkpoint ID
+		verify(checkpointIdCounter).setCount(eq(12312312L + 1));
+
+		coordinator.shutdown();
+	}
+
+	// ------------------------------------------------------------------------
+	// Savepoint aborts and future notifications
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Verifies that a savepoint is aborted immediately when the trigger tasks
+	 * have no current execution attempt (plain mocks without execution state).
+	 */
+	@Test
+	public void testAbortSavepointIfTriggerTasksNotExecuted() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+		// Plain mocks: no current execution attempt => trigger tasks not executed
+		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
+				mock(ExecutionVertex.class),
+				mock(ExecutionVertex.class) };
+		ExecutionVertex[] ackVertices = new ExecutionVertex[] {
+				mockExecutionVertex(jobId),
+				mockExecutionVertex(jobId) };
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				appId,
+				jobId,
+				60 * 1000,
+				triggerVertices,
+				ackVertices,
+				new ExecutionVertex[] {},
+				new MockCheckpointIdCounter(),
+				new HeapStateStore<Savepoint>());
+
+		// Trigger savepoint
+		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
+
+		// Abort the savepoint, because the trigger vertices are not executed
+		assertTrue(savepointPathFuture.isCompleted());
+
+		try {
+			Await.result(savepointPathFuture, FiniteDuration.Zero());
+			fail("Did not throw expected Exception for savepoint with non-executed trigger tasks.");
+		}
+		catch (Exception ignored) {
+			// expected: aborted savepoint future fails
+		}
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+
+		coordinator.shutdown();
+	}
+
+	/**
+	 * Verifies that a savepoint is aborted immediately when one of the trigger
+	 * tasks is already in state FINISHED.
+	 */
+	@Test
+	public void testAbortSavepointIfTriggerTasksAreFinished() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+		// One trigger vertex is FINISHED instead of running
+		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
+				mockExecutionVertex(jobId),
+				mockExecutionVertex(jobId, ExecutionState.FINISHED) };
+		ExecutionVertex[] ackVertices = new ExecutionVertex[] {
+				mockExecutionVertex(jobId),
+				mockExecutionVertex(jobId) };
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				appId,
+				jobId,
+				60 * 1000,
+				triggerVertices,
+				ackVertices,
+				new ExecutionVertex[] {},
+				new MockCheckpointIdCounter(),
+				new HeapStateStore<Savepoint>());
+
+		// Trigger savepoint
+		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
+
+		// Abort the savepoint, because one trigger vertex is FINISHED
+		assertTrue(savepointPathFuture.isCompleted());
+
+		try {
+			Await.result(savepointPathFuture, FiniteDuration.Zero());
+			fail("Did not throw expected Exception for savepoint with finished trigger task.");
+		}
+		catch (Exception ignored) {
+			// expected: aborted savepoint future fails
+		}
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+
+		coordinator.shutdown();
+	}
+
+	/**
+	 * Verifies that a savepoint is aborted immediately when the tasks that
+	 * need to acknowledge have no current execution attempt.
+	 */
+	@Test
+	public void testAbortSavepointIfAckTasksAreNotExecuted() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
+				mockExecutionVertex(jobId),
+				mockExecutionVertex(jobId) };
+		// Plain mocks: no current execution attempt => ack tasks not executed
+		ExecutionVertex[] ackVertices = new ExecutionVertex[] {
+				mock(ExecutionVertex.class),
+				mock(ExecutionVertex.class) };
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				appId,
+				jobId,
+				60 * 1000,
+				triggerVertices,
+				ackVertices,
+				new ExecutionVertex[] {},
+				new MockCheckpointIdCounter(),
+				new HeapStateStore<Savepoint>());
+
+		// Trigger savepoint
+		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
+
+		// Abort the savepoint, because the ack vertices are not executed
+		assertTrue(savepointPathFuture.isCompleted());
+
+		try {
+			Await.result(savepointPathFuture, FiniteDuration.Zero());
+			fail("Did not throw expected Exception for savepoint with non-executed ack tasks.");
+		}
+		catch (Exception ignored) {
+			// expected: aborted savepoint future fails
+		}
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+
+		coordinator.shutdown();
+	}
+
+	/**
+	 * Verifies that a partially acknowledged savepoint is discarded when the
+	 * (deliberately short) checkpoint timeout expires, that no completion
+	 * notification is sent to the commit vertex, and that the future fails.
+	 */
+	@Test
+	public void testAbortOnCheckpointTimeout() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(jobId),
+				mockExecutionVertex(jobId) };
+		ExecutionVertex commitVertex = mockExecutionVertex(jobId);
+		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
+
+		// Very short checkpoint timeout (20) so the savepoint expires quickly
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				appId,
+				jobId,
+				20,
+				vertices,
+				vertices,
+				new ExecutionVertex[] { commitVertex },
+				checkpointIdCounter,
+				new HeapStateStore<Savepoint>());
+
+		// Trigger the savepoint
+		Future<String> savepointPathFuture = coordinator.triggerSavepoint(12731273);
+		assertFalse(savepointPathFuture.isCompleted());
+
+		long checkpointId = checkpointIdCounter.getLastReturnedCount();
+
+		// Acknowledge single task (the savepoint stays pending)
+		coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
+				jobId, vertices[0].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId, createSerializedStateHandle(vertices[0]), 0));
+
+		PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
+				.get(checkpointId);
+
+		assertFalse(pendingCheckpoint.isDiscarded());
+
+		// Wait for savepoint to timeout (poll with a 5 s deadline)
+		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
+		while (deadline.hasTimeLeft()
+				&& !pendingCheckpoint.isDiscarded()
+				&& coordinator.getNumberOfPendingCheckpoints() > 0) {
+
+			Thread.sleep(250);
+		}
+
+		// Verify discarded
+		assertTrue(pendingCheckpoint.isDiscarded());
+		assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
+		assertEquals(0, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// No commit for timeout
+		verify(commitVertex, times(0)).sendMessageToCurrentExecution(
+				any(NotifyCheckpointComplete.class), any(ExecutionAttemptID.class));
+
+		assertTrue(savepointPathFuture.isCompleted());
+
+		try {
+			Await.result(savepointPathFuture, FiniteDuration.Zero());
+			fail("Did not throw expected Exception after timeout");
+		}
+		catch (Exception ignored) {
+			// expected: timed-out savepoint future fails
+		}
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+
+		coordinator.shutdown();
+	}
+
+	/**
+	 * Verifies that shutting down the coordinator fails all pending savepoint
+	 * futures and removes the pending promises.
+	 */
+	@Test
+	public void testAbortSavepointsOnShutdown() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(jobId),
+				mockExecutionVertex(jobId) };
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				appId,
+				jobId,
+				60 * 1000,
+				vertices,
+				vertices,
+				vertices,
+				new MockCheckpointIdCounter(),
+				new HeapStateStore<Savepoint>());
+
+		// Trigger savepoints (no acknowledgements, so they stay pending)
+		List<Future<String>> savepointPathFutures = new ArrayList<>();
+		savepointPathFutures.add(coordinator.triggerSavepoint(12731273));
+		savepointPathFutures.add(coordinator.triggerSavepoint(12731273 + 123));
+
+		for (Future<String> future : savepointPathFutures) {
+			assertFalse(future.isCompleted());
+		}
+
+		coordinator.shutdown();
+
+		// Verify futures failed
+		for (Future<String> future : savepointPathFutures) {
+			assertTrue(future.isCompleted());
+
+			try {
+				Await.result(future, FiniteDuration.Zero());
+				fail("Did not throw expected Exception after shutdown");
+			}
+			catch (Exception ignored) {
+				// expected: shutdown fails pending savepoint futures
+			}
+		}
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+	}
+
+	/**
+	 * Verifies that a savepoint is aborted (the future fails) when writing the
+	 * savepoint to the state store throws, and that no promises leak.
+	 */
+	@Test
+	public void testAbortSavepointOnStateStoreFailure() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+		ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, new JobVertexID(), 4);
+		// spy so that putState can be made to throw below
+		HeapStateStore<Savepoint> savepointStore = spy(
+				new HeapStateStore<Savepoint>());
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				appId,
+				jobId,
+				60 * 1000,
+				jobVertex.getTaskVertices(),
+				jobVertex.getTaskVertices(),
+				new ExecutionVertex[] {},
+				new MockCheckpointIdCounter(),
+				savepointStore);
+
+		// Failure on putState
+		doThrow(new Exception("TestException"))
+				.when(savepointStore).putState(any(Savepoint.class));
+
+		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
+
+		// Acknowledge all tasks
+		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
+					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
+		}
+
+		try {
+			Await.result(savepointPathFuture, FiniteDuration.Zero());
+			fail("Did not throw expected Exception after savepoint store failure.");
+		}
+		catch (Exception ignored) {
+			// expected: store failure must fail the savepoint future
+		}
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+
+		coordinator.shutdown();
+	}
+
+	/**
+	 * Verifies that completing a newer savepoint subsumes (discards) an older
+	 * pending one: the older future fails while the newer one completes with a
+	 * valid savepoint.
+	 */
+	@Test
+	public void testAbortSavepointIfSubsumed() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+		long checkpointTimeout = 60 * 1000;
+		long[] timestamps = new long[] { 1272635, 1272635 + 10 };
+		long[] checkpointIds = new long[2];
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(jobId),
+				mockExecutionVertex(jobId) };
+		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
+		HeapStateStore<Savepoint> savepointStore = new HeapStateStore<>();
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				appId,
+				jobId,
+				checkpointTimeout,
+				vertices,
+				vertices,
+				vertices,
+				checkpointIdCounter,
+				savepointStore);
+
+		// Trigger the savepoints
+		List<Future<String>> savepointPathFutures = new ArrayList<>();
+
+		savepointPathFutures.add(coordinator.triggerSavepoint(timestamps[0]));
+		checkpointIds[0] = checkpointIdCounter.getLastReturnedCount();
+
+		savepointPathFutures.add(coordinator.triggerSavepoint(timestamps[1]));
+		checkpointIds[1] = checkpointIdCounter.getLastReturnedCount();
+
+		for (Future<String> future : savepointPathFutures) {
+			assertFalse(future.isCompleted());
+		}
+
+		// Verify send trigger messages
+		for (ExecutionVertex vertex : vertices) {
+			verifyTriggerCheckpoint(vertex, checkpointIds[0], timestamps[0]);
+			verifyTriggerCheckpoint(vertex, checkpointIds[1], timestamps[1]);
+		}
+
+		PendingCheckpoint[] pendingCheckpoints = new PendingCheckpoint[] {
+				coordinator.getPendingCheckpoints().get(checkpointIds[0]),
+				coordinator.getPendingCheckpoints().get(checkpointIds[1]) };
+
+		verifyPendingCheckpoint(pendingCheckpoints[0], jobId, checkpointIds[0],
+				timestamps[0], 0, 2, 0, false, false);
+
+		verifyPendingCheckpoint(pendingCheckpoints[1], jobId, checkpointIds[1],
+				timestamps[1], 0, 2, 0, false, false);
+
+		// Acknowledge second checkpoint...
+		for (ExecutionVertex vertex : vertices) {
+			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
+					jobId, vertex.getCurrentExecutionAttempt().getAttemptId(),
+					checkpointIds[1], createSerializedStateHandle(vertex), 0));
+		}
+
+		// ...and one task of first checkpoint
+		coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
+				jobId, vertices[0].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointIds[0], createSerializedStateHandle(vertices[0]), 0));
+
+		// The second pending checkpoint is completed and subsumes the first one
+		assertTrue(pendingCheckpoints[0].isDiscarded());
+		assertTrue(pendingCheckpoints[1].isDiscarded());
+		assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
+
+		// Verify send notify complete messages for second checkpoint
+		for (ExecutionVertex vertex : vertices) {
+			verifyNotifyCheckpointComplete(vertex, checkpointIds[1], timestamps[1]);
+		}
+
+		Savepoint[] savepoints = new Savepoint[2];
+		String[] savepointPaths = new String[2];
+
+		// Verify that the futures have both been completed
+		assertTrue(savepointPathFutures.get(0).isCompleted());
+
+		try {
+			// The subsumed (first) savepoint must have failed
+			savepointPaths[0] = Await.result(savepointPathFutures.get(0), FiniteDuration.Zero());
+			fail("Did not throw expected exception");
+		}
+		catch (Exception ignored) {
+			// expected: subsumed savepoint future fails
+		}
+
+		// Verify the second savepoint
+		assertTrue(savepointPathFutures.get(1).isCompleted());
+		savepointPaths[1] = Await.result(savepointPathFutures.get(1), FiniteDuration.Zero());
+		savepoints[1] = savepointStore.getState(savepointPaths[1]);
+		verifySavepoint(savepoints[1], appId, jobId, checkpointIds[1], timestamps[1],
+				vertices);
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+
+		coordinator.shutdown();
+	}
+
+	@Test
+	public void testShutdownDoesNotCleanUpCompletedCheckpointsWithFileSystemStore() throws Exception {
+		ApplicationID appId = new ApplicationID();
+		JobID jobId = new JobID();
+		long checkpointTimeout = 60 * 1000;
+		long timestamp = 1272635;
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(jobId),
+				mockExecutionVertex(jobId) };
+		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
+
+		// Temporary directory for file state backend
+		final File tmpDir = CommonTestUtils.createTempDirectory();
+
+		try {
+			FileSystemStateStore<Savepoint> savepointStore = new FileSystemStateStore<>(
+					tmpDir.toURI().toString(), "sp-");
+
+			SavepointCoordinator coordinator = createSavepointCoordinator(
+					appId,
+					jobId,
+					checkpointTimeout,
+					vertices,
+					vertices,
+					vertices,
+					checkpointIdCounter,
+					savepointStore);
+
+			// Trigger the savepoint
+			Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp);
+			assertFalse(savepointPathFuture.isCompleted());
+
+			long checkpointId = checkpointIdCounter.getLastReturnedCount();
+			assertEquals(0, checkpointId);
+
+			// Verify send trigger messages
+			for (ExecutionVertex vertex : vertices) {
+				verifyTriggerCheckpoint(vertex, checkpointId, timestamp);
+			}
+
+			PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
+					.get(checkpointId);
+
+			verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId,
+					timestamp, 0, 2, 0, false, false);
+
+			// Acknowledge tasks
+			for (ExecutionVertex vertex : vertices) {
+				coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
+						jobId, vertex.getCurrentExecutionAttempt().getAttemptId(),
+						checkpointId, createSerializedStateHandle(vertex), 0));
+			}
+
+			// The pending checkpoint is completed
+			assertTrue(pendingCheckpoint.isDiscarded());
+			assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
+
+			// Verify send notify complete messages
+			for (ExecutionVertex vertex : vertices) {
+				verifyNotifyCheckpointComplete(vertex, checkpointId, timestamp);
+			}
+
+			// Verify that the future has been completed
+			assertTrue(savepointPathFuture.isCompleted());
+			String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
+
+			// Verify all promises removed
+			assertEquals(0, getSavepointPromises(coordinator).size());
+
+			coordinator.shutdown();
+
+			// Verify the savepoint is still available
+			Savepoint savepoint = savepointStore.getState(savepointPath);
+			verifySavepoint(savepoint, appId, jobId, checkpointId, timestamp,
+					vertices);
+		}
+		finally {
+			FileUtils.deleteDirectory(tmpDir);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Test helpers
+	// ------------------------------------------------------------------------
+
+	private static SavepointCoordinator createSavepointCoordinator(
+			ApplicationID appId,
+			JobID jobId,
+			long checkpointTimeout,
+			ExecutionVertex[] triggerVertices,
+			ExecutionVertex[] ackVertices,
+			ExecutionVertex[] commitVertices,
+			CheckpointIDCounter checkpointIdCounter,
+			StateStore<Savepoint> savepointStore) throws Exception {
+
+		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+
+		return new SavepointCoordinator(
+				appId,
+				jobId,
+				checkpointTimeout,
+				checkpointTimeout,
+				triggerVertices,
+				ackVertices,
+				commitVertices,
+				classLoader,
+				checkpointIdCounter,
+				savepointStore,
+				new DisabledCheckpointStatsTracker());
+	}
+
+	private static Map<JobVertexID, ExecutionJobVertex> createExecutionJobVertexMap(
+			ExecutionJobVertex... jobVertices) {
+
+		Map<JobVertexID, ExecutionJobVertex> jobVertexMap = new HashMap<>();
+
+		for (ExecutionJobVertex jobVertex : jobVertices) {
+			jobVertexMap.put(jobVertex.getJobVertexId(), jobVertex);
+		}
+
+		return jobVertexMap;
+	}
+
+	private static SerializedValue<StateHandle<?>> createSerializedStateHandle(
+			ExecutionVertex vertex) throws IOException {
+
+		return new SerializedValue<StateHandle<?>>(new LocalStateHandle<Serializable>(
+				vertex.getCurrentExecutionAttempt().getAttemptId()));
+	}
+
+	@SuppressWarnings("unchecked")
+	private Map<Long, Promise<String>> getSavepointPromises(
+			SavepointCoordinator coordinator)
+			throws NoSuchFieldException, IllegalAccessException {
+
+		Field field = SavepointCoordinator.class.getDeclaredField("savepointPromises");
+		field.setAccessible(true);
+		return (Map<Long, Promise<String>>) field.get(coordinator);
+	}
+
+	// ---- Verification ------------------------------------------------------
+
+	private static void verifyTriggerCheckpoint(
+			ExecutionVertex mockExecutionVertex,
+			long expectedCheckpointId,
+			long expectedTimestamp) {
+
+		ExecutionAttemptID attemptId = mockExecutionVertex
+				.getCurrentExecutionAttempt().getAttemptId();
+
+		TriggerCheckpoint expectedMsg = new TriggerCheckpoint(
+				mockExecutionVertex.getJobId(),
+				attemptId,
+				expectedCheckpointId,
+				expectedTimestamp);
+
+		verify(mockExecutionVertex).sendMessageToCurrentExecution(
+				eq(expectedMsg), eq(attemptId));
+	}
+
+	private static void verifyNotifyCheckpointComplete(
+			ExecutionVertex mockExecutionVertex,
+			long expectedCheckpointId,
+			long expectedTimestamp) {
+
+		ExecutionAttemptID attemptId = mockExecutionVertex
+				.getCurrentExecutionAttempt().getAttemptId();
+
+		NotifyCheckpointComplete expectedMsg = new NotifyCheckpointComplete(
+				mockExecutionVertex.getJobId(),
+				attemptId,
+				expectedCheckpointId,
+				expectedTimestamp);
+
+		verify(mockExecutionVertex).sendMessageToCurrentExecution(
+				eq(expectedMsg), eq(attemptId));
+	}
+
+	private static void verifyPendingCheckpoint(
+			PendingCheckpoint checkpoint,
+			JobID expectedJobId,
+			long expectedCheckpointId,
+			long expectedTimestamp,
+			int expectedNumberOfAcknowledgedTasks,
+			int expectedNumberOfNonAcknowledgedTasks,
+			int expectedNumberOfCollectedStates,
+			boolean expectedIsDiscarded,
+			boolean expectedIsFullyAcknowledged) {
+
+		assertNotNull(checkpoint);
+		assertEquals(expectedJobId, checkpoint.getJobId());
+		assertEquals(expectedCheckpointId, checkpoint.getCheckpointId());
+		assertEquals(expectedTimestamp, checkpoint.getCheckpointTimestamp());
+		assertEquals(expectedNumberOfAcknowledgedTasks, checkpoint.getNumberOfAcknowledgedTasks());
+		assertEquals(expectedNumberOfNonAcknowledgedTasks, checkpoint.getNumberOfNonAcknowledgedTasks());
+		assertEquals(expectedNumberOfCollectedStates, checkpoint.getCollectedStates().size());
+		assertEquals(expectedIsDiscarded, checkpoint.isDiscarded());
+		assertEquals(expectedIsFullyAcknowledged, checkpoint.isFullyAcknowledged());
+	}
+
+	private static void verifySavepoint(
+			Savepoint savepoint,
+			ApplicationID expectedAppId,
+			JobID expectedJobId,
+			long expectedCheckpointId,
+			long expectedTimestamp,
+			ExecutionVertex[] expectedVertices) throws Exception {
+
+		assertEquals(expectedAppId, savepoint.getApplicationId());
+
+		verifyCompletedCheckpoint(
+				savepoint.getCompletedCheckpoint(),
+				expectedJobId,
+				expectedCheckpointId,
+				expectedTimestamp,
+				expectedVertices
+		);
+	}
+
+	private static void verifyCompletedCheckpoint(
+			CompletedCheckpoint checkpoint,
+			JobID expectedJobId,
+			long expectedCheckpointId,
+			long expectedTimestamp,
+			ExecutionVertex[] expectedVertices) throws Exception {
+
+		assertNotNull(checkpoint);
+		assertEquals(expectedJobId, checkpoint.getJobId());
+		assertEquals(expectedCheckpointId, checkpoint.getCheckpointID());
+		assertEquals(expectedTimestamp, checkpoint.getTimestamp());
+
+		List<StateForTask> states = checkpoint.getStates();
+		assertEquals(expectedVertices.length, states.size());
+
+		for (ExecutionVertex vertex : expectedVertices) {
+			JobVertexID expectedOperatorId = vertex.getJobvertexId();
+
+			boolean success = false;
+			for (StateForTask state : states) {
+				if (state.getOperatorId().equals(expectedOperatorId)) {
+					ExecutionAttemptID vertexAttemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
+					ExecutionAttemptID stateAttemptId = (ExecutionAttemptID) state.getState()
+							.deserializeValue(Thread.currentThread().getContextClassLoader())
+							.getState(Thread.currentThread().getContextClassLoader());
+
+					assertEquals(vertexAttemptId, stateAttemptId);
+					success = true;
+					break;
+				}
+			}
+
+			assertTrue(success);
+		}
+	}
+
+	// ---- Mocking -----------------------------------------------------------
+
+	private static ExecutionJobVertex mockExecutionJobVertex(
+			JobID jobId,
+			JobVertexID jobVertexId,
+			int parallelism) {
+
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		when(jobVertex.getJobId()).thenReturn(jobId);
+		when(jobVertex.getJobVertexId()).thenReturn(jobVertexId);
+		when(jobVertex.getParallelism()).thenReturn(parallelism);
+
+		ExecutionVertex[] vertices = new ExecutionVertex[parallelism];
+
+		for (int i = 0; i < vertices.length; i++) {
+			vertices[i] = mockExecutionVertex(jobId, jobVertexId, i, ExecutionState.RUNNING);
+		}
+
+		when(jobVertex.getTaskVertices()).thenReturn(vertices);
+
+		return jobVertex;
+	}
+
+	private static ExecutionVertex mockExecutionVertex(JobID jobId) {
+		return mockExecutionVertex(jobId, ExecutionState.RUNNING);
+	}
+
+	private static ExecutionVertex mockExecutionVertex(
+			JobID jobId,
+			ExecutionState state) {
+
+		return mockExecutionVertex(jobId, new JobVertexID(), 0, state);
+	}
+
+	private static ExecutionVertex mockExecutionVertex(
+			JobID jobId,
+			JobVertexID jobVertexId,
+			int subtaskIndex,
+			ExecutionState executionState) {
+
+		Execution exec = mock(Execution.class);
+		when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
+		when(exec.getState()).thenReturn(executionState);
+
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
+		when(vertex.getJobId()).thenReturn(jobId);
+		when(vertex.getJobvertexId()).thenReturn(jobVertexId);
+		when(vertex.getParallelSubtaskIndex()).thenReturn(subtaskIndex);
+		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+
+		return vertex;
+	}
+
+	private static class MockCheckpointIdCounter implements CheckpointIDCounter {
+
+		private long count;
+		private long lastReturnedCount;
+
+		@Override
+		public void start() throws Exception {
+		}
+
+		@Override
+		public void stop() throws Exception {
+		}
+
+		@Override
+		public long getAndIncrement() throws Exception {
+			lastReturnedCount = count;
+			return count++;
+		}
+
+		@Override
+		public void setCount(long newCount) {
+			count = newCount;
+		}
+
+		long getLastReturnedCount() {
+			return lastReturnedCount;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d739ee25/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
new file mode 100644
index 0000000..3d3238d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SavepointStoreFactoryTest {
+
+	@Test
+	public void testStateStoreWithDefaultConfig() throws Exception {
+		SavepointStore store = SavepointStoreFactory.createFromConfig(new Configuration());
+		assertTrue(store.getStateStore() instanceof HeapStateStore);
+	}
+
+	@Test
+	public void testSavepointBackendJobManager() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "jobmanager");
+		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
+		assertTrue(store.getStateStore() instanceof HeapStateStore);
+	}
+
+	@Test
+	public void testSavepointBackendFileSystem() throws Exception {
+		Configuration config = new Configuration();
+		String rootPath = System.getProperty("java.io.tmpdir");
+		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
+		config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, rootPath);
+
+		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
+		assertTrue(store.getStateStore() instanceof FileSystemStateStore);
+
+		FileSystemStateStore<Savepoint> stateStore = (FileSystemStateStore<Savepoint>)
+				store.getStateStore();
+		assertEquals(new Path(rootPath), stateStore.getRootPath());
+	}
+
+	@Test
+	public void testSavepointBackendFileSystemButCheckpointBackendJobManager() throws Exception {
+		Configuration config = new Configuration();
+
+		// This combination does not make sense, because the checkpoints will be
+		// lost after the job manager shuts down.
+		config.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
+		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
+		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
+		assertTrue(store.getStateStore() instanceof HeapStateStore);
+	}
+
+	@Test
+	public void testSavepointBackendFileSystemButNoDirectory() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
+		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
+		assertTrue(store.getStateStore() instanceof HeapStateStore);
+	}
+
+	@Test
+	public void testUnexpectedSavepointBackend() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "unexpected");
+		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
+		assertTrue(store.getStateStore() instanceof HeapStateStore);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d739ee25/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 0f800c9..ec54b7e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -19,22 +19,25 @@
 package org.apache.flink.runtime.jobmanager
 
 
-import Tasks._
 import akka.actor.ActorSystem
-import akka.actor.Status.{Success, Failure}
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
+import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, SavepointCoordinator}
 import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph, ScheduleMode}
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode}
+import org.apache.flink.runtime.jobmanager.Tasks._
+import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.testingUtils.{TestingUtils, ScalaTestingUtils}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
-
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils
 import org.junit.runner.RunWith
-import org.scalatest.{Matchers, BeforeAndAfterAll, WordSpecLike}
+import org.mockito.Mockito._
 import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
@@ -733,6 +736,236 @@ class JobManagerITCase(_system: ActorSystem)
       }
     }
 
+    // ------------------------------------------------------------------------
+    // Savepoint messages
+    // ------------------------------------------------------------------------
+
+    "handle trigger savepoint response for non-existing job" in {
+      val deadline = TestingUtils.TESTING_DURATION.fromNow
+
+      val flinkCluster = TestingUtils.startTestingCluster(0, 0)
+
+      try {
+        within(deadline.timeLeft) {
+          val jobManager = flinkCluster
+            .getLeaderGateway(deadline.timeLeft)
+
+          val jobId = new JobID()
+
+          // Trigger savepoint for non-existing job
+          jobManager.tell(TriggerSavepoint(jobId), testActor)
+          val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
+
+          // Verify the response
+          response.jobId should equal(jobId)
+          response.cause.getClass should equal(classOf[IllegalArgumentException])
+        }
+      }
+      finally {
+        flinkCluster.stop()
+      }
+    }
+
+    "handle trigger savepoint response for job with disabled checkpointing" in {
+      val deadline = TestingUtils.TESTING_DURATION.fromNow
+
+      val flinkCluster = TestingUtils.startTestingCluster(1, 1)
+
+      try {
+        within(deadline.timeLeft) {
+          val jobManager = flinkCluster
+            .getLeaderGateway(deadline.timeLeft)
+
+          val jobVertex = new JobVertex("Blocking vertex")
+          jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
+          val jobGraph = new JobGraph(jobVertex)
+
+          // Submit job w/o checkpointing configured
+          jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
+
+          // Trigger savepoint for job with disabled checkpointing
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+          val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
+
+          // Verify the response
+          response.jobId should equal(jobGraph.getJobID())
+          response.cause.getClass should equal(classOf[IllegalStateException])
+          response.cause.getMessage should (include("disabled") or include("configured"))
+        }
+      }
+      finally {
+        flinkCluster.stop()
+      }
+    }
+
+    "handle trigger savepoint response after trigger savepoint failure" in {
+      val deadline = TestingUtils.TESTING_DURATION.fromNow
+
+      val flinkCluster = TestingUtils.startTestingCluster(1, 1)
+
+      try {
+        within(deadline.timeLeft) {
+          val jobManager = flinkCluster
+            .getLeaderGateway(deadline.timeLeft)
+
+          val jobVertex = new JobVertex("Blocking vertex")
+          jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
+          val jobGraph = new JobGraph(jobVertex)
+          jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+            java.util.Collections.emptyList(),
+            java.util.Collections.emptyList(),
+            java.util.Collections.emptyList(),
+            60000, 60000, 60000, 1))
+
+          // Submit job...
+          jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
+
+          // Request the execution graph and set a savepoint coordinator mock
+          jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
+          val executionGraph = expectMsgType[ExecutionGraphFound](
+            deadline.timeLeft).executionGraph
+
+          // Mock the savepoint coordinator
+          val savepointCoordinator = mock(classOf[SavepointCoordinator])
+          doThrow(new Exception("Expected Test Exception"))
+            .when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+
+          // Update the savepoint coordinator field
+          val field = executionGraph.getClass.getDeclaredField("savepointCoordinator")
+          field.setAccessible(true)
+          field.set(executionGraph, savepointCoordinator)
+
+          // Trigger savepoint for job
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+          val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
+
+          // Verify the response
+          response.jobId should equal(jobGraph.getJobID())
+          response.cause.getCause.getClass should equal(classOf[Exception])
+          response.cause.getCause.getMessage should equal("Expected Test Exception")
+        }
+      }
+      finally {
+        flinkCluster.stop()
+      }
+    }
+
+    "handle trigger savepoint response after failed savepoint future" in {
+      val deadline = TestingUtils.TESTING_DURATION.fromNow
+
+      val flinkCluster = TestingUtils.startTestingCluster(1, 1)
+
+      try {
+        within(deadline.timeLeft) {
+          val jobManager = flinkCluster
+            .getLeaderGateway(deadline.timeLeft)
+
+          val jobVertex = new JobVertex("Blocking vertex")
+          jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
+          val jobGraph = new JobGraph(jobVertex)
+          jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+            java.util.Collections.emptyList(),
+            java.util.Collections.emptyList(),
+            java.util.Collections.emptyList(),
+            60000, 60000, 60000, 1))
+
+          // Submit job...
+          jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
+
+          // Mock the savepoint coordinator
+          val savepointCoordinator = mock(classOf[SavepointCoordinator])
+          val savepointPathPromise = scala.concurrent.promise[String]
+          doReturn(savepointPathPromise.future)
+            .when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+
+          // Request the execution graph and set a savepoint coordinator mock
+          jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
+          val executionGraph = expectMsgType[ExecutionGraphFound](
+            deadline.timeLeft).executionGraph
+
+          // Update the savepoint coordinator field
+          val field = executionGraph.getClass.getDeclaredField("savepointCoordinator")
+          field.setAccessible(true)
+          field.set(executionGraph, savepointCoordinator)
+
+          // Trigger savepoint for job
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+
+          // Fail the promise
+          savepointPathPromise.failure(new Exception("Expected Test Exception"))
+
+          val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
+
+          // Verify the response
+          response.jobId should equal(jobGraph.getJobID())
+          response.cause.getCause.getClass should equal(classOf[Exception])
+          response.cause.getCause.getMessage should equal("Expected Test Exception")
+        }
+      }
+      finally {
+        flinkCluster.stop()
+      }
+    }
+
+    "handle trigger savepoint response after succeeded savepoint future" in {
+      val deadline = TestingUtils.TESTING_DURATION.fromNow
+
+      val flinkCluster = TestingUtils.startTestingCluster(1, 1)
+
+      try {
+        within(deadline.timeLeft) {
+          val jobManager = flinkCluster
+            .getLeaderGateway(deadline.timeLeft)
+
+          val jobVertex = new JobVertex("Blocking vertex")
+          jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
+          val jobGraph = new JobGraph(jobVertex)
+          jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+            java.util.Collections.emptyList(),
+            java.util.Collections.emptyList(),
+            java.util.Collections.emptyList(),
+            60000, 60000, 60000, 1))
+
+          // Submit job...
+          jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
+
+          // Mock the savepoint coordinator
+          val savepointCoordinator = mock(classOf[SavepointCoordinator])
+          val savepointPathPromise = scala.concurrent.promise[String]
+          doReturn(savepointPathPromise.future)
+            .when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+
+          // Request the execution graph and set a savepoint coordinator mock
+          jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
+          val executionGraph = expectMsgType[ExecutionGraphFound](
+            deadline.timeLeft).executionGraph
+
+          // Update the savepoint coordinator field
+          val field = executionGraph.getClass.getDeclaredField("savepointCoordinator")
+          field.setAccessible(true)
+          field.set(executionGraph, savepointCoordinator)
+
+          // Trigger savepoint for job
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+
+          // Succeed the promise
+          savepointPathPromise.success("Expected test savepoint path")
+
+          val response = expectMsgType[TriggerSavepointSuccess](deadline.timeLeft)
+
+          // Verify the response
+          response.jobId should equal(jobGraph.getJobID())
+          response.savepointPath should equal("Expected test savepoint path")
+        }
+      }
+      finally {
+        flinkCluster.stop()
+      }
+    }
   }
 
   class WaitingOnFinalizeJobVertex(name: String, val waitingTime: Long) extends JobVertex(name){

http://git-wip-us.apache.org/repos/asf/flink/blob/d739ee25/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index b8f4ede..748d517 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -285,6 +285,16 @@ trait TestingJobManagerLike extends FlinkActor {
     case DisablePostStop =>
       postStopEnabled = false
 
+    case RequestSavepoint(savepointPath) =>
+      try {
+        val savepoint = savepointStore.getState(savepointPath)
+        sender ! ResponseSavepoint(savepoint)
+      }
+      catch {
+        case e: Exception =>
+          sender ! ResponseSavepoint(null)
+      }
+
     case msg: Disconnect =>
       if (!disconnectDisabled) {
         super.handleMessage(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/d739ee25/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index e4d0a6f..1b1caec 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -22,6 +22,7 @@ import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.accumulators.Accumulator
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry
+import org.apache.flink.runtime.checkpoint.Savepoint
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.ActorGateway
 import org.apache.flink.runtime.jobgraph.JobStatus
@@ -93,6 +94,21 @@ object TestingJobManagerMessages {
     */
   case object DisablePostStop
 
+  /**
+    * Requests a savepoint from the job manager.
+    *
+    * @param savepointPath The path of the savepoint to request.
+    */
+  case class RequestSavepoint(savepointPath: String)
+
+  /**
+    * Response to a savepoint request.
+    *
+    * @param savepoint The requested savepoint or null if none available.
+    */
+  case class ResponseSavepoint(savepoint: Savepoint)
+
   def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
   def getDisablePostStop(): AnyRef = DisablePostStop
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d739ee25/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 0b38c9c..dbe871d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -18,27 +18,13 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import akka.actor.{Terminated, ActorRef}
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
-RequestLeaderSessionID}
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
-AcknowledgeRegistration}
-import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, TaskInFinalState}
-import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManager}
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.testingUtils.TestingMessages.{CheckIfJobRemoved, Alive,
-DisableDisconnect}
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
 
-import scala.concurrent.duration._
 import scala.language.postfixOps
 
 /** Subclass of the [[TaskManager]] to support testing messages

http://git-wip-us.apache.org/repos/asf/flink/blob/d739ee25/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
index 0350675..c10e83e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{Terminated, ActorRef}
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
@@ -27,7 +29,7 @@ RequestLeaderSessionID}
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
 AcknowledgeRegistration}
-import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, TaskInFinalState}
+import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, UpdateTaskExecutionState, TaskInFinalState}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
@@ -50,6 +52,9 @@ trait TestingTaskManagerLike extends FlinkActor {
   val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
   val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
 
+  /** Map of registered task submit listeners */
+  val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]()
+
   var disconnectDisabled = false
 
   /**
@@ -142,6 +147,19 @@ trait TestingTaskManagerLike extends FlinkActor {
       val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
       waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
 
+    case RegisterSubmitTaskListener(jobId) =>
+      registeredSubmitTaskListeners.put(jobId, sender())
+
+    case msg@SubmitTask(tdd) =>
+      registeredSubmitTaskListeners.get(tdd.getJobID) match {
+        case Some(listenerRef) =>
+          listenerRef ! ResponseSubmitTaskListener(tdd)
+        case None =>
+        // Nothing to do
+      }
+
+      super.handleMessage(msg)
+
     /**
      * Message from task manager that accumulator values changed and need to be reported immediately
      * instead of lazily through the

http://git-wip-us.apache.org/repos/asf/flink/blob/d739ee25/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index ca57245..69a65e2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.taskmanager.Task
 
@@ -59,6 +60,24 @@ object TestingTaskManagerMessages {
    */
   case class AccumulatorsChanged(jobID: JobID)
 
+  /**
+    * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]]
+    * messages of the given job.
+    *
+    * If a task is submitted with the given job ID, the task deployment
+    * descriptor is forwarded to the listener.
+    *
+    * @param jobId The job ID to listen for.
+    */
+  case class RegisterSubmitTaskListener(jobId: JobID)
+
+  /**
+    * A response sent to a registered submit task listener, containing the submitted task deployment descriptor.
+    *
+    * @param tdd The submitted task deployment descriptor.
+    */
+  case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor)
+
   // --------------------------------------------------------------------------
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
@@ -70,5 +89,6 @@ object TestingTaskManagerMessages {
   def getRequestBroadcastVariablesWithReferencesMessage: AnyRef = {
     RequestBroadcastVariablesWithReferences
   }
+
 }
 


Mime
View raw message