flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/3] flink git commit: [FLINK-4322] [checkpointing] Add and fix tests for unified Checkpoint/Savepoint Coordinator
Date Wed, 17 Aug 2016 17:35:26 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
deleted file mode 100644
index 6b4f354..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
+++ /dev/null
@@ -1,1119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
-import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the savepoint coordinator.
- */
-public class SavepointCoordinatorTest extends TestLogger {
-
-	// ------------------------------------------------------------------------
-	// Trigger and acknowledge
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Simple trigger-acknowledge test for a single savepoint.
-	 */
-	@Test
-	public void testSimpleTriggerSavepoint() throws Exception {
-		JobID jobId = new JobID();
-		long checkpointTimeout = 60 * 1000;
-		long timestamp = 1272635;
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-		HeapSavepointStore savepointStore = new HeapSavepointStore();
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				vertices,
-				vertices,
-				vertices,
-				checkpointIdCounter,
-				savepointStore);
-
-		// Trigger the savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp);
-		assertFalse(savepointPathFuture.isCompleted());
-
-		long checkpointId = checkpointIdCounter.getLastReturnedCount();
-		assertEquals(0, checkpointId);
-
-		// Verify send trigger messages
-		for (ExecutionVertex vertex : vertices) {
-			verifyTriggerCheckpoint(vertex, checkpointId, timestamp);
-		}
-
-		PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
-				.get(checkpointId);
-
-		verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId,
-				timestamp, 0, 2, 0, false, false);
-
-		// Acknowledge tasks
-		for (ExecutionVertex vertex : vertices) {
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, vertex.getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId, createSerializedStateHandle(vertex), 0));
-		}
-
-		// The pending checkpoint is completed
-		assertTrue(pendingCheckpoint.isDiscarded());
-		assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
-
-		// Verify send notify complete messages
-		for (ExecutionVertex vertex : vertices) {
-			verifyNotifyCheckpointComplete(vertex, checkpointId, timestamp);
-		}
-
-		// Verify that the future has been completed
-		assertTrue(savepointPathFuture.isCompleted());
-		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
-
-		// Verify the savepoint
-		Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
-		verifySavepoint(savepoint, checkpointId, vertices);
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	/**
-	 * This test triggers a checkpoint and then sends a decline checkpoint message from
-	 * one of the tasks. The expected behaviour is that said checkpoint is discarded and a new
-	 * checkpoint is triggered.
-	 */
-	@Test
-	public void testTriggerAndDeclineCheckpointSimple() throws Exception {
-		JobID jobId = new JobID();
-		long checkpointTimeout = 60 * 1000;
-		long timestamp = 1272635;
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-		SavepointStore savepointStore = new HeapSavepointStore();
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				vertices,
-				vertices,
-				vertices,
-				checkpointIdCounter,
-				savepointStore);
-
-		// Trigger the savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp);
-		assertFalse(savepointPathFuture.isCompleted());
-
-		long checkpointId = checkpointIdCounter.getLastReturnedCount();
-		assertEquals(0, checkpointId);
-
-		// Verify send trigger messages
-		for (ExecutionVertex vertex : vertices) {
-			verifyTriggerCheckpoint(vertex, checkpointId, timestamp);
-		}
-
-		PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
-				.get(checkpointId);
-
-		verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId,
-				timestamp, 0, 2, 0, false, false);
-
-		// Acknowledge and decline tasks
-		coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-				jobId, vertices[0].getCurrentExecutionAttempt().getAttemptId(),
-				checkpointId, createSerializedStateHandle(vertices[0]), 0));
-
-		coordinator.receiveDeclineMessage(new DeclineCheckpoint(
-				jobId, vertices[1].getCurrentExecutionAttempt().getAttemptId(),
-				checkpointId, 0));
-
-
-		// The pending checkpoint is completed
-		assertTrue(pendingCheckpoint.isDiscarded());
-		assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
-
-		// Verify that the future has been completed
-		assertTrue(savepointPathFuture.isCompleted());
-
-		try {
-			Await.result(savepointPathFuture.failed(), FiniteDuration.Zero());
-			fail("Did not throw expected exception");
-		} catch (Throwable ignored) {}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	// ------------------------------------------------------------------------
-	// Rollback
-	// ------------------------------------------------------------------------
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testSimpleRollbackSavepoint() throws Exception {
-		JobID jobId = new JobID();
-
-		ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] {
-				mockExecutionJobVertex(jobId, new JobVertexID(), 4),
-				mockExecutionJobVertex(jobId, new JobVertexID(), 4) };
-
-		ExecutionVertex[] triggerVertices = jobVertices[0].getTaskVertices();
-		ExecutionVertex[] ackVertices = new ExecutionVertex[8];
-
-		int i = 0;
-		for (ExecutionJobVertex jobVertex : jobVertices) {
-			for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-				ackVertices[i++] = vertex;
-			}
-		}
-
-		MockCheckpointIdCounter idCounter = new MockCheckpointIdCounter();
-		HeapSavepointStore savepointStore = new HeapSavepointStore();
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				triggerVertices,
-				ackVertices,
-				new ExecutionVertex[] {},
-				idCounter,
-				savepointStore);
-
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
-
-		// Acknowledge all tasks
-		for (ExecutionVertex vertex : ackVertices) {
-			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
-		}
-
-		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
-		assertNotNull(savepointPath);
-
-		// Rollback
-		coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertices), savepointPath);
-
-		// Verify all executions have been reset
-		for (ExecutionVertex vertex : ackVertices) {
-			verify(vertex.getCurrentExecutionAttempt(), times(1)).setInitialState(
-					any(SerializedValue.class), any(Map.class));
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		// Verify checkpoint ID counter started
-		assertTrue(idCounter.isStarted());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testRollbackParallelismMismatch() throws Exception {
-		JobID jobId = new JobID();
-
-		ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] {
-				mockExecutionJobVertex(jobId, new JobVertexID(), 4),
-				mockExecutionJobVertex(jobId, new JobVertexID(), 4) };
-
-		ExecutionVertex[] triggerVertices = jobVertices[0].getTaskVertices();
-		ExecutionVertex[] ackVertices = new ExecutionVertex[8];
-
-		int index = 0;
-		for (ExecutionJobVertex jobVertex : jobVertices) {
-			for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-				ackVertices[index++] = vertex;
-			}
-		}
-
-		HeapSavepointStore savepointStore = new HeapSavepointStore();
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				triggerVertices,
-				ackVertices,
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				savepointStore);
-
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
-
-		// Acknowledge all tasks
-		for (ExecutionVertex vertex : ackVertices) {
-			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
-		}
-
-		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
-		assertNotNull(savepointPath);
-
-		// Change parallelism lower than original (state without matching subtask). The
-		// other way around (subtask without matching state) is OK.
-		for (int i = 0; i < jobVertices.length; i++) {
-			jobVertices[i] = mockExecutionJobVertex(jobId, jobVertices[i].getJobVertexId(), 2);
-		}
-
-		try {
-			// Rollback
-			coordinator.restoreSavepoint(
-					createExecutionJobVertexMap(jobVertices),
-					savepointPath);
-			fail("Did not throw expected Exception after rollback with parallelism mismatch.");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testRollbackStateStoreFailure() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, new JobVertexID(), 4);
-		HeapSavepointStore savepointStore = spy(new HeapSavepointStore());
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				jobVertex.getTaskVertices(),
-				jobVertex.getTaskVertices(),
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				savepointStore);
-
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
-
-		// Acknowledge all tasks
-		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
-		}
-
-		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
-		assertNotNull(savepointPath);
-
-		// Failure on getState
-		doThrow(new RuntimeException("TestException")).when(savepointStore).loadSavepoint(anyString());
-
-		try {
-			// Rollback
-			coordinator.restoreSavepoint(
-					createExecutionJobVertexMap(jobVertex),
-					savepointPath);
-
-			fail("Did not throw expected Exception after rollback with savepoint store failure.");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testRollbackSetsCheckpointID() throws Exception {
-		SavepointV0 savepoint = new SavepointV0(12312312L, Collections.<TaskState>emptyList());
-
-		CheckpointIDCounter checkpointIdCounter = mock(CheckpointIDCounter.class);
-
-		SavepointStore savepointStore = mock(SavepointStore.class);
-		when(savepointStore.loadSavepoint(anyString())).thenReturn(savepoint);
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				new JobID(),
-				60 * 1000,
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				checkpointIdCounter,
-				savepointStore);
-
-		coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any");
-
-		verify(checkpointIdCounter).setCount(eq(12312312L + 1));
-
-		coordinator.shutdown();
-	}
-
-	// ------------------------------------------------------------------------
-	// Savepoint aborts and future notifications
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testAbortSavepointIfTriggerTasksNotExecuted() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
-				mock(ExecutionVertex.class),
-				mock(ExecutionVertex.class) };
-		ExecutionVertex[] ackVertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				triggerVertices,
-				ackVertices,
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				new HeapSavepointStore());
-
-		// Trigger savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
-
-		// Abort the savepoint, because the vertices are not running
-		assertTrue(savepointPathFuture.isCompleted());
-
-		try {
-			Await.result(savepointPathFuture, FiniteDuration.Zero());
-			fail("Did not throw expected Exception after shutdown");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testAbortSavepointIfTriggerTasksAreFinished() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId, ExecutionState.FINISHED) };
-		ExecutionVertex[] ackVertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				triggerVertices,
-				ackVertices,
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				new HeapSavepointStore());
-
-		// Trigger savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
-
-		// Abort the savepoint, because the vertices are not running
-		assertTrue(savepointPathFuture.isCompleted());
-
-		try {
-			Await.result(savepointPathFuture, FiniteDuration.Zero());
-			fail("Did not throw expected Exception after shutdown");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testAbortSavepointIfAckTasksAreNotExecuted() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		ExecutionVertex[] ackVertices = new ExecutionVertex[] {
-				mock(ExecutionVertex.class),
-				mock(ExecutionVertex.class) };
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				triggerVertices,
-				ackVertices,
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				new HeapSavepointStore());
-
-		// Trigger savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
-
-		// Abort the savepoint, because the vertices are not running
-		assertTrue(savepointPathFuture.isCompleted());
-
-		try {
-			Await.result(savepointPathFuture, FiniteDuration.Zero());
-			fail("Did not throw expected Exception after shutdown");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testAbortOnCheckpointTimeout() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		ExecutionVertex commitVertex = mockExecutionVertex(jobId);
-		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-
-		long checkpointTimeout = 1000;
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				vertices,
-				vertices,
-				new ExecutionVertex[] { commitVertex },
-				checkpointIdCounter,
-				new HeapSavepointStore());
-
-		// Trigger the savepoint
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(12731273);
-		assertFalse(savepointPathFuture.isCompleted());
-
-		long checkpointId = checkpointIdCounter.getLastReturnedCount();
-		PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
-				.get(checkpointId);
-
-		assertNotNull("Checkpoint not pending (test race)", pendingCheckpoint);
-		assertFalse("Checkpoint already discarded (test race)", pendingCheckpoint.isDiscarded());
-
-		// Wait for savepoint to timeout
-		Deadline deadline = FiniteDuration.apply(60, "s").fromNow();
-		while (deadline.hasTimeLeft()
-				&& !pendingCheckpoint.isDiscarded()
-				&& coordinator.getNumberOfPendingCheckpoints() > 0) {
-
-			Thread.sleep(250);
-		}
-
-		// Verify discarded
-		assertTrue("Savepoint not discarded within timeout", pendingCheckpoint.isDiscarded());
-		assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
-		assertEquals(0, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
-
-		// No commit for timeout
-		verify(commitVertex, times(0)).sendMessageToCurrentExecution(
-				any(NotifyCheckpointComplete.class), any(ExecutionAttemptID.class));
-
-		assertTrue(savepointPathFuture.isCompleted());
-
-		try {
-			Await.result(savepointPathFuture, FiniteDuration.Zero());
-			fail("Did not throw expected Exception after timeout");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testAbortSavepointsOnShutdown() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				vertices,
-				vertices,
-				vertices,
-				new MockCheckpointIdCounter(),
-				new HeapSavepointStore());
-
-		// Trigger savepoints
-		List<Future<String>> savepointPathFutures = new ArrayList<>();
-		savepointPathFutures.add(coordinator.triggerSavepoint(12731273));
-		savepointPathFutures.add(coordinator.triggerSavepoint(12731273 + 123));
-
-		for (Future<String> future : savepointPathFutures) {
-			assertFalse(future.isCompleted());
-		}
-
-		coordinator.shutdown();
-
-		// Verify futures failed
-		for (Future<String> future : savepointPathFutures) {
-			assertTrue(future.isCompleted());
-
-			try {
-				Await.result(future, FiniteDuration.Zero());
-				fail("Did not throw expected Exception after shutdown");
-			}
-			catch (Exception ignored) {
-			}
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-	}
-
-	@Test
-	public void testAbortSavepointOnStateStoreFailure() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, new JobVertexID(), 4);
-		HeapSavepointStore savepointStore = spy(new HeapSavepointStore());
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				60 * 1000,
-				jobVertex.getTaskVertices(),
-				jobVertex.getTaskVertices(),
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				savepointStore);
-
-		// Failure on putState
-		doThrow(new RuntimeException("TestException"))
-				.when(savepointStore).storeSavepoint(any(Savepoint.class));
-
-		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
-
-		// Acknowledge all tasks
-		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-			ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, attemptId, 0, createSerializedStateHandle(vertex), 0));
-		}
-
-		try {
-			Await.result(savepointPathFuture, FiniteDuration.Zero());
-			fail("Did not throw expected Exception after rollback with savepoint store failure.");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testAbortSavepointIfSubsumed() throws Exception {
-		JobID jobId = new JobID();
-		long checkpointTimeout = 60 * 1000;
-		long[] timestamps = new long[] { 1272635, 1272635 + 10 };
-		long[] checkpointIds = new long[2];
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-		HeapSavepointStore savepointStore = new HeapSavepointStore();
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				vertices,
-				vertices,
-				vertices,
-				checkpointIdCounter,
-				savepointStore);
-
-		// Trigger the savepoints
-		List<Future<String>> savepointPathFutures = new ArrayList<>();
-
-		savepointPathFutures.add(coordinator.triggerSavepoint(timestamps[0]));
-		checkpointIds[0] = checkpointIdCounter.getLastReturnedCount();
-
-		savepointPathFutures.add(coordinator.triggerSavepoint(timestamps[1]));
-		checkpointIds[1] = checkpointIdCounter.getLastReturnedCount();
-
-		for (Future<String> future : savepointPathFutures) {
-			assertFalse(future.isCompleted());
-		}
-
-		// Verify send trigger messages
-		for (ExecutionVertex vertex : vertices) {
-			verifyTriggerCheckpoint(vertex, checkpointIds[0], timestamps[0]);
-			verifyTriggerCheckpoint(vertex, checkpointIds[1], timestamps[1]);
-		}
-
-		PendingCheckpoint[] pendingCheckpoints = new PendingCheckpoint[] {
-				coordinator.getPendingCheckpoints().get(checkpointIds[0]),
-				coordinator.getPendingCheckpoints().get(checkpointIds[1]) };
-
-		verifyPendingCheckpoint(pendingCheckpoints[0], jobId, checkpointIds[0],
-				timestamps[0], 0, 2, 0, false, false);
-
-		verifyPendingCheckpoint(pendingCheckpoints[1], jobId, checkpointIds[1],
-				timestamps[1], 0, 2, 0, false, false);
-
-		// Acknowledge second checkpoint...
-		for (ExecutionVertex vertex : vertices) {
-			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-					jobId, vertex.getCurrentExecutionAttempt().getAttemptId(),
-					checkpointIds[1], createSerializedStateHandle(vertex), 0));
-		}
-
-		// ...and one task of first checkpoint
-		coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-				jobId, vertices[0].getCurrentExecutionAttempt().getAttemptId(),
-				checkpointIds[0], createSerializedStateHandle(vertices[0]), 0));
-
-		// The second pending checkpoint is completed and subsumes the first one
-		assertTrue(pendingCheckpoints[0].isDiscarded());
-		assertTrue(pendingCheckpoints[1].isDiscarded());
-		assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
-
-		// Verify send notify complete messages for second checkpoint
-		for (ExecutionVertex vertex : vertices) {
-			verifyNotifyCheckpointComplete(vertex, checkpointIds[1], timestamps[1]);
-		}
-
-		Savepoint[] savepoints = new Savepoint[2];
-		String[] savepointPaths = new String[2];
-
-		// Verify that the futures have both been completed
-		assertTrue(savepointPathFutures.get(0).isCompleted());
-
-		try {
-			savepointPaths[0] = Await.result(savepointPathFutures.get(0), FiniteDuration.Zero());
-			fail("Did not throw expected exception");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify the second savepoint
-		assertTrue(savepointPathFutures.get(1).isCompleted());
-		savepointPaths[1] = Await.result(savepointPathFutures.get(1), FiniteDuration.Zero());
-		savepoints[1] = savepointStore.loadSavepoint(savepointPaths[1]);
-		verifySavepoint(savepoints[1], checkpointIds[1], vertices);
-
-		// Verify all promises removed
-		assertEquals(0, getSavepointPromises(coordinator).size());
-
-		coordinator.shutdown();
-	}
-
-	@Test
-	public void testShutdownDoesNotCleanUpCompletedCheckpointsWithFileSystemStore() throws Exception {
-		JobID jobId = new JobID();
-		long checkpointTimeout = 60 * 1000;
-		long timestamp = 1272635;
-		ExecutionVertex[] vertices = new ExecutionVertex[] {
-				mockExecutionVertex(jobId),
-				mockExecutionVertex(jobId) };
-		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-
-		// Temporary directory for file state backend
-		final File tmpDir = CommonTestUtils.createTempDirectory();
-
-		try {
-			FsSavepointStore savepointStore = new FsSavepointStore(tmpDir.toURI().toString(), "sp-");
-
-			SavepointCoordinator coordinator = createSavepointCoordinator(
-					jobId,
-					checkpointTimeout,
-					vertices,
-					vertices,
-					vertices,
-					checkpointIdCounter,
-					savepointStore);
-
-			// Trigger the savepoint
-			Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp);
-			assertFalse(savepointPathFuture.isCompleted());
-
-			long checkpointId = checkpointIdCounter.getLastReturnedCount();
-			assertEquals(0, checkpointId);
-
-			// Verify send trigger messages
-			for (ExecutionVertex vertex : vertices) {
-				verifyTriggerCheckpoint(vertex, checkpointId, timestamp);
-			}
-
-			PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
-					.get(checkpointId);
-
-			verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId,
-					timestamp, 0, 2, 0, false, false);
-
-			// Acknowledge tasks
-			for (ExecutionVertex vertex : vertices) {
-				coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-						jobId, vertex.getCurrentExecutionAttempt().getAttemptId(),
-						checkpointId, createSerializedStateHandle(vertex), 0));
-			}
-
-			// The pending checkpoint is completed
-			assertTrue(pendingCheckpoint.isDiscarded());
-			assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
-
-			// Verify send notify complete messages
-			for (ExecutionVertex vertex : vertices) {
-				verifyNotifyCheckpointComplete(vertex, checkpointId, timestamp);
-			}
-
-			// Verify that the future has been completed
-			assertTrue(savepointPathFuture.isCompleted());
-			String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
-
-			// Verify all promises removed
-			assertEquals(0, getSavepointPromises(coordinator).size());
-
-			coordinator.shutdown();
-
-			// Verify the savepoint is still available
-			Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
-			verifySavepoint(savepoint, checkpointId, vertices);
-		}
-		finally {
-			FileUtils.deleteDirectory(tmpDir);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// Test helpers
-	// ------------------------------------------------------------------------
-
-	private static SavepointCoordinator createSavepointCoordinator(
-			JobID jobId,
-			long checkpointTimeout,
-			ExecutionVertex[] triggerVertices,
-			ExecutionVertex[] ackVertices,
-			ExecutionVertex[] commitVertices,
-			CheckpointIDCounter checkpointIdCounter,
-			SavepointStore savepointStore) throws Exception {
-
-		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-
-		return new SavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				checkpointTimeout,
-				42,
-				triggerVertices,
-				ackVertices,
-				commitVertices,
-				classLoader,
-				checkpointIdCounter,
-				savepointStore,
-				new DisabledCheckpointStatsTracker());
-	}
-
-	private static Map<JobVertexID, ExecutionJobVertex> createExecutionJobVertexMap(
-			ExecutionJobVertex... jobVertices) {
-
-		Map<JobVertexID, ExecutionJobVertex> jobVertexMap = new HashMap<>();
-
-		for (ExecutionJobVertex jobVertex : jobVertices) {
-			jobVertexMap.put(jobVertex.getJobVertexId(), jobVertex);
-		}
-
-		return jobVertexMap;
-	}
-
-	private static SerializedValue<StateHandle<?>> createSerializedStateHandle(
-			ExecutionVertex vertex) throws IOException {
-
-		return new SerializedValue<StateHandle<?>>(new LocalStateHandle<Serializable>(
-				vertex.getCurrentExecutionAttempt().getAttemptId()));
-	}
-
-	@SuppressWarnings("unchecked")
-	private Map<Long, Promise<String>> getSavepointPromises(
-			SavepointCoordinator coordinator)
-			throws NoSuchFieldException, IllegalAccessException {
-
-		Field field = SavepointCoordinator.class.getDeclaredField("savepointPromises");
-		field.setAccessible(true);
-		return (Map<Long, Promise<String>>) field.get(coordinator);
-	}
-
-	// ---- Verification ------------------------------------------------------
-
-	private static void verifyTriggerCheckpoint(
-			ExecutionVertex mockExecutionVertex,
-			long expectedCheckpointId,
-			long expectedTimestamp) {
-
-		ExecutionAttemptID attemptId = mockExecutionVertex
-				.getCurrentExecutionAttempt().getAttemptId();
-
-		TriggerCheckpoint expectedMsg = new TriggerCheckpoint(
-				mockExecutionVertex.getJobId(),
-				attemptId,
-				expectedCheckpointId,
-				expectedTimestamp);
-
-		verify(mockExecutionVertex).sendMessageToCurrentExecution(
-				eq(expectedMsg), eq(attemptId));
-	}
-
-	private static void verifyNotifyCheckpointComplete(
-			ExecutionVertex mockExecutionVertex,
-			long expectedCheckpointId,
-			long expectedTimestamp) {
-
-		ExecutionAttemptID attemptId = mockExecutionVertex
-				.getCurrentExecutionAttempt().getAttemptId();
-
-		NotifyCheckpointComplete expectedMsg = new NotifyCheckpointComplete(
-				mockExecutionVertex.getJobId(),
-				attemptId,
-				expectedCheckpointId,
-				expectedTimestamp);
-
-		verify(mockExecutionVertex).sendMessageToCurrentExecution(
-				eq(expectedMsg), eq(attemptId));
-	}
-
-	private static void verifyPendingCheckpoint(
-			PendingCheckpoint checkpoint,
-			JobID expectedJobId,
-			long expectedCheckpointId,
-			long expectedTimestamp,
-			int expectedNumberOfAcknowledgedTasks,
-			int expectedNumberOfNonAcknowledgedTasks,
-			int expectedNumberOfCollectedStates,
-			boolean expectedIsDiscarded,
-			boolean expectedIsFullyAcknowledged) {
-
-		assertNotNull(checkpoint);
-		assertEquals(expectedJobId, checkpoint.getJobId());
-		assertEquals(expectedCheckpointId, checkpoint.getCheckpointId());
-		assertEquals(expectedTimestamp, checkpoint.getCheckpointTimestamp());
-		assertEquals(expectedNumberOfAcknowledgedTasks, checkpoint.getNumberOfAcknowledgedTasks());
-		assertEquals(expectedNumberOfNonAcknowledgedTasks, checkpoint.getNumberOfNonAcknowledgedTasks());
-
-		int actualNumberOfCollectedStates = 0;
-
-		for (TaskState taskState : checkpoint.getTaskStates().values()) {
-			actualNumberOfCollectedStates += taskState.getNumberCollectedStates();
-		}
-
-		assertEquals(expectedNumberOfCollectedStates, actualNumberOfCollectedStates);
-		assertEquals(expectedIsDiscarded, checkpoint.isDiscarded());
-		assertEquals(expectedIsFullyAcknowledged, checkpoint.isFullyAcknowledged());
-	}
-
-	private static void verifySavepoint(
-			Savepoint savepoint,
-			long expectedCheckpointId,
-			ExecutionVertex[] expectedVertices) throws Exception {
-
-		assertEquals(expectedCheckpointId, savepoint.getCheckpointId());
-
-		for (TaskState taskState : savepoint.getTaskStates()) {
-			JobVertexID jobVertexId = taskState.getJobVertexID();
-
-			// Find matching execution vertex
-			ExecutionVertex vertex = null;
-			for (ExecutionVertex executionVertex : expectedVertices) {
-				if (executionVertex.getJobvertexId().equals(jobVertexId)) {
-					vertex = executionVertex;
-					break;
-				}
-			}
-
-			if (vertex == null) {
-				fail("Did not find matching vertex");
-			} else {
-				SubtaskState subtaskState = taskState.getState(vertex.getParallelSubtaskIndex());
-				ExecutionAttemptID vertexAttemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-
-				ExecutionAttemptID stateAttemptId = (ExecutionAttemptID) subtaskState.getState()
-						.deserializeValue(Thread.currentThread().getContextClassLoader())
-						.getState(Thread.currentThread().getContextClassLoader());
-
-				assertEquals(vertexAttemptId, stateAttemptId);
-			}
-		}
-	}
-
-	// ---- Mocking -----------------------------------------------------------
-
-	private static ExecutionJobVertex mockExecutionJobVertex(
-			JobID jobId,
-			JobVertexID jobVertexId,
-			int parallelism) {
-
-		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-		when(jobVertex.getJobId()).thenReturn(jobId);
-		when(jobVertex.getJobVertexId()).thenReturn(jobVertexId);
-		when(jobVertex.getParallelism()).thenReturn(parallelism);
-
-		ExecutionVertex[] vertices = new ExecutionVertex[parallelism];
-
-		for (int i = 0; i < vertices.length; i++) {
-			vertices[i] = mockExecutionVertex(jobId, jobVertexId, i, parallelism, ExecutionState.RUNNING);
-		}
-
-		when(jobVertex.getTaskVertices()).thenReturn(vertices);
-
-		return jobVertex;
-	}
-
-	private static ExecutionVertex mockExecutionVertex(JobID jobId) {
-		return mockExecutionVertex(jobId, ExecutionState.RUNNING);
-	}
-
-	private static ExecutionVertex mockExecutionVertex(
-			JobID jobId,
-			ExecutionState state) {
-
-		return mockExecutionVertex(jobId, new JobVertexID(), 0, 1, state);
-	}
-
-	private static ExecutionVertex mockExecutionVertex(
-			JobID jobId,
-			JobVertexID jobVertexId,
-			int subtaskIndex,
-			int parallelism,
-			ExecutionState executionState) {
-
-		Execution exec = mock(Execution.class);
-		when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
-		when(exec.getState()).thenReturn(executionState);
-
-		ExecutionVertex vertex = mock(ExecutionVertex.class);
-		when(vertex.getJobId()).thenReturn(jobId);
-		when(vertex.getJobvertexId()).thenReturn(jobVertexId);
-		when(vertex.getParallelSubtaskIndex()).thenReturn(subtaskIndex);
-		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
-
-		return vertex;
-	}
-
-	private static class MockCheckpointIdCounter implements CheckpointIDCounter {
-
-		private boolean started;
-		private long count;
-		private long lastReturnedCount;
-
-		@Override
-		public void start() throws Exception {
-			started = true;
-		}
-
-		@Override
-		public void shutdown() throws Exception {
-			started = false;
-		}
-
-		@Override
-		public void suspend() throws Exception {
-			started = false;
-		}
-
-		@Override
-		public long getAndIncrement() throws Exception {
-			lastReturnedCount = count;
-			return count++;
-		}
-
-		@Override
-		public void setCount(long newCount) {
-			count = newCount;
-		}
-
-		long getLastReturnedCount() {
-			return lastReturnedCount;
-		}
-
-		public boolean isStarted() {
-			return started;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
new file mode 100644
index 0000000..6a85195
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SavepointLoaderTest {
+
+	/**
+	 * Tests loading and validation of savepoints with correct setup,
+	 * parallelism mismatch, and a missing task.
+	 */
+	@Test
+	public void testLoadAndValidateSavepoint() throws Exception {
+		int parallelism = 128128;
+		JobVertexID vertexId = new JobVertexID();
+
+		TaskState state = mock(TaskState.class);
+		when(state.getParallelism()).thenReturn(parallelism);
+		when(state.getJobVertexID()).thenReturn(vertexId);
+
+		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
+		taskStates.put(vertexId, state);
+
+		CompletedCheckpoint stored = new CompletedCheckpoint(
+				new JobID(),
+				Integer.MAX_VALUE + 123123L,
+				10200202,
+				1020292988,
+				taskStates,
+				true);
+
+		// Store savepoint
+		SavepointV0 savepoint = new SavepointV0(stored.getCheckpointID(), taskStates.values());
+		SavepointStore store = new HeapSavepointStore();
+		String path = store.storeSavepoint(savepoint);
+
+		JobID jobId = new JobID();
+
+		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
+		when(vertex.getParallelism()).thenReturn(parallelism);
+
+		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+		tasks.put(vertexId, vertex);
+
+		// 1) Load and validate: everything correct
+		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path);
+
+		assertEquals(jobId, loaded.getJobId());
+		assertEquals(stored.getCheckpointID(), loaded.getCheckpointID());
+
+		// The loaded checkpoint should not discard state when it is discarded
+		loaded.discard(ClassLoader.getSystemClassLoader());
+		verify(state, times(0)).discard(any(ClassLoader.class));
+
+		// 2) Load and validate: parallelism mismatch
+		when(vertex.getParallelism()).thenReturn(222);
+
+		try {
+			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path);
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException expected) {
+			assertTrue(expected.getMessage().contains("Parallelism mismatch"));
+		}
+
+		// 3) Load and validate: missing vertex (this should be relaxed)
+		assertNotNull(tasks.remove(vertexId));
+
+		try {
+			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path);
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException expected) {
+			assertTrue(expected.getMessage().contains("Cannot map old state"));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 9265ab1..12bbf82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
@@ -340,8 +340,7 @@ public class SimpleCheckpointStatsTrackerTest {
 			// Add some random delay
 			final long completionTimestamp = triggerTimestamp + completionDuration + RAND.nextInt(10);
 
-			checkpoints[i] = new CompletedCheckpoint(
-					jobId, i, triggerTimestamp, completionTimestamp, taskGroupStates);
+			checkpoints[i] = new CompletedCheckpoint(jobId, i, triggerTimestamp, completionTimestamp, taskGroupStates, true);
 		}
 
 		return checkpoints;

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index a576a58..548bef0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
-import org.apache.flink.api.common.{ExecutionConfig, JobID}
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator
 import org.apache.flink.runtime.client.JobExecutionException
@@ -31,10 +31,8 @@ import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVert
 import org.apache.flink.runtime.jobmanager.Tasks._
 import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils
 import org.junit.runner.RunWith
 import org.mockito.Mockito._
 import org.scalatest.junit.JUnitRunner
@@ -829,14 +827,14 @@ class JobManagerITCase(_system: ActorSystem)
             deadline.timeLeft).executionGraph
 
           // Mock the checkpoint coordinator
-          val savepointCoordinator = mock(classOf[SavepointCoordinator])
+          val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
           doThrow(new Exception("Expected Test Exception"))
-            .when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
 
           // Update the savepoint coordinator field
-          val field = executionGraph.getClass.getDeclaredField("savepointCoordinator")
+          val field = executionGraph.getClass.getDeclaredField("checkpointCoordinator")
           field.setAccessible(true)
-          field.set(executionGraph, savepointCoordinator)
+          field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
           jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
@@ -877,10 +875,12 @@ class JobManagerITCase(_system: ActorSystem)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
 
           // Mock the checkpoint coordinator
-          val savepointCoordinator = mock(classOf[SavepointCoordinator])
+          val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
+          doThrow(new Exception("Expected Test Exception"))
+            .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
           val savepointPathPromise = scala.concurrent.promise[String]
           doReturn(savepointPathPromise.future)
-            .when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
 
           // Request the execution graph and set a checkpoint coordinator mock
           jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
@@ -888,9 +888,9 @@ class JobManagerITCase(_system: ActorSystem)
             deadline.timeLeft).executionGraph
 
           // Update the savepoint coordinator field
-          val field = executionGraph.getClass.getDeclaredField("savepointCoordinator")
+          val field = executionGraph.getClass.getDeclaredField("checkpointCoordinator")
           field.setAccessible(true)
-          field.set(executionGraph, savepointCoordinator)
+          field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
           jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
@@ -935,10 +935,12 @@ class JobManagerITCase(_system: ActorSystem)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
 
           // Mock the checkpoint coordinator
-          val savepointCoordinator = mock(classOf[SavepointCoordinator])
+          val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
+          doThrow(new Exception("Expected Test Exception"))
+            .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
           val savepointPathPromise = scala.concurrent.promise[String]
           doReturn(savepointPathPromise.future)
-            .when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
 
           // Request the execution graph and set a checkpoint coordinator mock
           jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
@@ -946,9 +948,9 @@ class JobManagerITCase(_system: ActorSystem)
             deadline.timeLeft).executionGraph
 
           // Update the savepoint coordinator field
-          val field = executionGraph.getClass.getDeclaredField("savepointCoordinator")
+          val field = executionGraph.getClass.getDeclaredField("checkpointCoordinator")
           field.setAccessible(true)
-          field.set(executionGraph, savepointCoordinator)
+          field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
           jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 0ed28ad..4fc310c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -403,148 +403,6 @@ public class SavepointITCase extends TestLogger {
 	}
 
 	/**
-	 * Tests that removed checkpoint files which are part of a savepoint throw
-	 * a proper Exception on submission.
-	 */
-	@Test
-	@RetryOnFailure(times = 2)
-	public void testCheckpointHasBeenRemoved() throws Exception {
-		// Config
-		int numTaskManagers = 2;
-		int numSlotsPerTaskManager = 2;
-		int parallelism = numTaskManagers * numSlotsPerTaskManager;
-
-		// Test deadline
-		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-
-		// The number of checkpoints to complete before triggering the savepoint
-		final int numberOfCompletedCheckpoints = 10;
-
-		// Temporary directory for file state backend
-		final File tmpDir = CommonTestUtils.createTempDirectory();
-
-		LOG.info("Created temporary directory: " + tmpDir + ".");
-
-		ForkableFlinkMiniCluster flink = null;
-
-		try {
-			// Flink configuration
-			final Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-
-			final File checkpointDir = new File(tmpDir, "checkpoints");
-			final File savepointDir = new File(tmpDir, "savepoints");
-
-			if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
-				fail("Test setup failed: failed to create temporary directories.");
-			}
-
-			LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
-			LOG.info("Created temporary savepoint directory: " + savepointDir + ".");
-
-			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-			config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-
-			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
-					checkpointDir.toURI().toString());
-			config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY,
-					savepointDir.toURI().toString());
-
-			LOG.info("Flink configuration: " + config + ".");
-
-			// Start Flink
-			flink = new ForkableFlinkMiniCluster(config);
-			LOG.info("Starting Flink cluster.");
-			flink.start();
-
-			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
-			ActorGateway jobManager = Await.result(
-					flink.leaderGateway().future(),
-					deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
-
-			// Submit the job
-			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, 1000);
-			final JobID jobId = jobGraph.getJobID();
-
-			// Wait for the source to be notified about the expected number
-			// of completed checkpoints
-			InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch(
-					numberOfCompletedCheckpoints);
-
-			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
-
-			flink.submitJobDetached(jobGraph);
-
-			LOG.info("Waiting for " + numberOfCompletedCheckpoints +
-					" checkpoint complete notifications.");
-
-			// Wait...
-			InfiniteTestSource.CheckpointCompleteLatch.await();
-
-			LOG.info("Received all " + numberOfCompletedCheckpoints +
-					" checkpoint complete notifications.");
-
-			// ...and then trigger the savepoint
-			LOG.info("Triggering a savepoint.");
-
-			Future<Object> savepointPathFuture = jobManager.ask(
-					new TriggerSavepoint(jobId), deadline.timeLeft());
-
-			final String savepointPath = ((TriggerSavepointSuccess) Await
-					.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
-			LOG.info("Retrieved savepoint path: " + savepointPath + ".");
-
-			// Retrieve the savepoint from the testing job manager
-			LOG.info("Requesting the savepoint.");
-			Future<Object> savepointFuture = jobManager.ask(
-					new RequestSavepoint(savepointPath),
-					deadline.timeLeft());
-
-			Await.ready(savepointFuture, deadline.timeLeft());
-			LOG.info("Retrieved savepoint: " + savepointPath + ".");
-
-			// Shut down the Flink cluster (thereby canceling the job)
-			LOG.info("Shutting down Flink cluster.");
-			flink.shutdown();
-
-			// Remove the checkpoint files
-			try {
-				FileUtils.deleteDirectory(checkpointDir);
-			} catch (FileNotFoundException ignored) {
-			}
-
-			// Restart the cluster
-			LOG.info("Restarting Flink cluster.");
-			flink.start();
-
-			// Set the savepoint path
-			jobGraph.setSavepointPath(savepointPath);
-
-			LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
-					"savepoint path " + savepointPath + " in detached mode.");
-
-			try {
-				flink.submitJobAndWait(jobGraph, false, deadline.timeLeft());
-				fail("Did not throw expected Exception because of missing checkpoint files");
-			}
-			catch (Exception ignored) {
-			}
-		}
-		finally {
-			if (flink != null) {
-				flink.shutdown();
-			}
-
-			if (tmpDir != null) {
-				FileUtils.deleteDirectory(tmpDir);
-			}
-		}
-	}
-
-	/**
 	 * Tests that a job manager backed savepoint is removed when the checkpoint
 	 * coordinator is shut down, because the associated checkpoints files will
 	 * linger around otherwise.
@@ -645,40 +503,42 @@ public class SavepointITCase extends TestLogger {
 					savepointFuture, deadline.timeLeft())).savepoint();
 			LOG.info("Retrieved savepoint: " + savepointPath + ".");
 
-			// Cancel the job
-			LOG.info("Cancelling job " + jobId + ".");
-			Future<Object> cancelRespFuture = jobManager.ask(
-					new CancelJob(jobId), deadline.timeLeft());
-			assertTrue(Await.result(cancelRespFuture, deadline.timeLeft())
-					instanceof CancellationSuccess);
-
-			LOG.info("Waiting for job " + jobId + " to be removed.");
-			Future<Object> removedRespFuture = jobManager.ask(
-					new NotifyWhenJobRemoved(jobId), deadline.timeLeft());
-			assertTrue((Boolean) Await.result(removedRespFuture, deadline.timeLeft()));
-
 			// Check that all checkpoint files have been removed
 			for (TaskState stateForTaskGroup : savepoint.getTaskStates()) {
 				for (SubtaskState subtaskState : stateForTaskGroup.getStates()) {
 					StreamTaskStateList taskStateList = (StreamTaskStateList) subtaskState.getState()
-						.deserializeValue(ClassLoader.getSystemClassLoader());
+							.deserializeValue(ClassLoader.getSystemClassLoader());
 
 					for (StreamTaskState taskState : taskStateList.getState(
-						ClassLoader.getSystemClassLoader())) {
+							ClassLoader.getSystemClassLoader())) {
 
 						AbstractFileStateHandle fsState = (AbstractFileStateHandle) taskState.getFunctionState();
 						checkpointFiles.add(new File(fsState.getFilePath().toUri()));
 					}
 				}
 			}
+
+			// Cancel the job
+			LOG.info("Cancelling job " + jobId + ".");
+			Future<Object> cancelRespFuture = jobManager.ask(
+					new CancelJob(jobId), deadline.timeLeft());
+			assertTrue(Await.result(cancelRespFuture, deadline.timeLeft())
+					instanceof CancellationSuccess);
+
+			LOG.info("Waiting for job " + jobId + " to be removed.");
+			Future<Object> removedRespFuture = jobManager.ask(
+					new NotifyWhenJobRemoved(jobId), deadline.timeLeft());
+			assertTrue((Boolean) Await.result(removedRespFuture, deadline.timeLeft()));
 		}
 		finally {
 			if (flink != null) {
 				flink.shutdown();
 			}
 
+			Thread.sleep(1000);
+
 			// At least one checkpoint file
-			assertTrue(checkpointFiles.size() > 0);
+			assertTrue(checkpointFiles.toString(), checkpointFiles.size() > 0);
 
 			// The checkpoint associated with the savepoint should have been
 			// discarded after shutdown
@@ -753,121 +613,6 @@ public class SavepointITCase extends TestLogger {
 		}
 	}
 
-	/**
-	 * Tests that a restore failure is retried with the savepoint state.
-	 */
-	@Test
-	public void testRestoreFailure() throws Exception {
-		// Config
-		int numTaskManagers = 1;
-		int numSlotsPerTaskManager = 1;
-		int numExecutionRetries = 2;
-		int retryDelay = 500;
-		int checkpointingInterval = 100000000;
-
-		// Test deadline
-		final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
-
-		ForkableFlinkMiniCluster flink = null;
-
-		try {
-			// The job
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(1);
-			env.enableCheckpointing(checkpointingInterval);
-			env.setNumberOfExecutionRetries(numExecutionRetries);
-			env.getConfig().setExecutionRetryDelay(retryDelay);
-
-			DataStream<Integer> stream = env
-					.addSource(new RestoreStateCountingAndFailingSource());
-
-			// Source configuration
-			RestoreStateCountingAndFailingSource.failOnRestoreStateCall = false;
-			RestoreStateCountingAndFailingSource.numRestoreStateCalls = 0;
-			RestoreStateCountingAndFailingSource.checkpointCompleteLatch = new CountDownLatch(1);
-			RestoreStateCountingAndFailingSource.emitted= 0;
-
-			stream.addSink(new DiscardingSink<Integer>());
-
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-
-			// Flink configuration
-			final Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-			LOG.info("Flink configuration: " + config + ".");
-
-			// Start Flink
-			flink = new ForkableFlinkMiniCluster(config);
-			LOG.info("Starting Flink cluster.");
-			flink.start();
-
-			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
-			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
-
-			// Submit the job and wait for some checkpoints to complete
-			flink.submitJobDetached(jobGraph);
-
-			while (deadline.hasTimeLeft() && RestoreStateCountingAndFailingSource.emitted < 100) {
-				Thread.sleep(100);
-			}
-
-			assertTrue("No progress", RestoreStateCountingAndFailingSource.emitted >= 100);
-
-			// Trigger the savepoint
-			Future<Object> savepointPathFuture = jobManager.ask(
-					new TriggerSavepoint(jobGraph.getJobID()), deadline.timeLeft());
-
-			Object resp = Await.result(savepointPathFuture, deadline.timeLeft());
-
-			String savepointPath = null;
-			if (resp instanceof TriggerSavepointSuccess) {
-				savepointPath = ((TriggerSavepointSuccess) resp).savepointPath();
-				LOG.info("Retrieved savepoint path: " + savepointPath + ".");
-			} else if (resp instanceof TriggerSavepointFailure) {
-				fail("Received TriggerSavepointFailure: " + ((TriggerSavepointFailure) resp).cause().getMessage());
-			} else {
-				fail("Unexpected response of type  " + resp.getClass() + " " + resp);
-			}
-
-			// Completed checkpoint
-			RestoreStateCountingAndFailingSource.checkpointCompleteLatch.await();
-
-			// Cancel the job
-			Future<?> cancelFuture = jobManager.ask(new CancelJob(
-					jobGraph.getJobID()), deadline.timeLeft());
-			Await.ready(cancelFuture, deadline.timeLeft());
-
-			// Wait for the job to be removed
-			Future<?> removedFuture = jobManager.ask(new NotifyWhenJobRemoved(
-					jobGraph.getJobID()), deadline.timeLeft());
-			Await.ready(removedFuture, deadline.timeLeft());
-
-			// Set source to fail on restore calls and try to recover from savepoint
-			RestoreStateCountingAndFailingSource.failOnRestoreStateCall = true;
-			jobGraph.setSavepointPath(savepointPath);
-
-			try {
-				flink.submitJobAndWait(jobGraph, false, deadline.timeLeft());
-				// If the savepoint state is not restored, we will wait here
-				// until the deadline times out.
-				fail("Did not throw expected Exception");
-			} catch (Exception ignored) {
-			} finally {
-				// Expecting one restore for the initial submission from
-				// savepoint and one for the execution retries
-				assertEquals(1 + numExecutionRetries, RestoreStateCountingAndFailingSource.numRestoreStateCalls);
-			}
-		}
-		finally {
-			if (flink != null) {
-				flink.shutdown();
-			}
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	// Test program
 	// ------------------------------------------------------------------------
@@ -982,6 +727,10 @@ public class SavepointITCase extends TestLogger {
 			while (running) {
 				ctx.collect(1);
 				emitted++;
+
+				if (failOnRestoreStateCall) {
+					throw new RuntimeException("Restore test failure");
+				}
 			}
 		}
 
@@ -998,10 +747,6 @@ public class SavepointITCase extends TestLogger {
 		@Override
 		public void restoreState(Serializable state) throws Exception {
 			numRestoreStateCalls++;
-
-			if (failOnRestoreStateCall) {
-				throw new RuntimeException("Restore test failure");
-			}
 		}
 
 		@Override


Mime
View raw message