flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-4237] [runtime] Cancel savepoints on declined snapshots
Date Thu, 21 Jul 2016 11:50:24 GMT
Repository: flink
Updated Branches:
  refs/heads/master ccd4fd9d5 -> f81cda38b


[FLINK-4237] [runtime] Cancel savepoints on declined snapshots


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f81cda38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f81cda38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f81cda38

Branch: refs/heads/master
Commit: f81cda38b216a3eeec7f124de5e44ddbb361f4ad
Parents: ccd4fd9
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Jul 20 16:15:22 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Jul 21 13:50:12 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  2 +
 .../checkpoint/SavepointCoordinator.java        | 11 ++-
 .../checkpoint/SavepointCoordinatorTest.java    | 74 ++++++++++++++++++++
 .../test/classloading/ClassLoaderITCase.java    |  7 +-
 4 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f81cda38/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c21ebc0..ec2d60e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -575,6 +575,8 @@ public class CheckpointCoordinator {
 				checkpoint.discard(userClassLoader);
 				rememberRecentCheckpointId(checkpointId);
 
+				onCancelCheckpoint(checkpointId);
+
 				boolean haveMoreRecentPending = false;
 				Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
 				while (entries.hasNext()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f81cda38/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
index b96a02a..d77d2fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
@@ -135,6 +135,8 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 				throw new IllegalStateException("Failed to get checkpoint Id");
 			}
 
+			LOG.info("Triggering savepoint with ID " + checkpointId);
+
 			// Important: make sure to add the promise to the map before calling
 			// any methods that might trigger callbacks, which require the promise.
 			// Otherwise, the might be race conditions.
@@ -274,6 +276,7 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 
 	@Override
 	protected void onCancelCheckpoint(long canceledCheckpointId) {
+		LOG.info("Cancelling savepoint with checkpoint ID " + canceledCheckpointId);
 		Promise<String> promise = savepointPromises.remove(canceledCheckpointId);
 
 		if (promise != null) {
@@ -284,8 +287,12 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 	@Override
 	protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint checkpoint) {
 		// Sanity check
-		Promise<String> promise = checkNotNull(savepointPromises
-				.remove(checkpoint.getCheckpointID()));
+		Promise<String> promise = savepointPromises.remove(checkpoint.getCheckpointID());
+
+		if (promise == null) {
+			LOG.info("Pending savepoint with ID " + checkpoint.getCheckpointID() + "  has been " +
+					"removed before receiving acknowledgment.");
+		}
 
 		// Sanity check
 		if (promise.isCompleted()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f81cda38/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
index 384ed42..6fb3b56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
@@ -27,7 +27,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.LocalStateHandle;
@@ -54,6 +56,7 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -148,6 +151,77 @@ public class SavepointCoordinatorTest extends TestLogger {
 		coordinator.shutdown();
 	}
 
+	/**
+	 * This test triggers a savepoint and then sends a decline checkpoint message from
+	 * one of the tasks. The expected behaviour is that the pending checkpoint is discarded
+	 * and the savepoint future is failed instead of being completed.
+	 */
+	@Test
+	public void testTriggerAndDeclineCheckpointSimple() throws Exception {
+		JobID jobId = new JobID();
+		long checkpointTimeout = 60 * 1000;
+		long timestamp = 1272635;
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(jobId),
+				mockExecutionVertex(jobId) };
+		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
+		HeapStateStore<CompletedCheckpoint> savepointStore = new HeapStateStore<>();
+
+		SavepointCoordinator coordinator = createSavepointCoordinator(
+				jobId,
+				checkpointTimeout,
+				vertices,
+				vertices,
+				vertices,
+				checkpointIdCounter,
+				savepointStore);
+
+		// Trigger the savepoint
+		Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp);
+		assertFalse(savepointPathFuture.isCompleted());
+
+		long checkpointId = checkpointIdCounter.getLastReturnedCount();
+		assertEquals(0, checkpointId);
+
+		// Verify send trigger messages
+		for (ExecutionVertex vertex : vertices) {
+			verifyTriggerCheckpoint(vertex, checkpointId, timestamp);
+		}
+
+		PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints()
+				.get(checkpointId);
+
+		verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId,
+				timestamp, 0, 2, 0, false, false);
+
+		// Acknowledge and decline tasks
+		coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
+				jobId, vertices[0].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId, createSerializedStateHandle(vertices[0]), 0));
+
+		coordinator.receiveDeclineMessage(new DeclineCheckpoint(
+				jobId, vertices[1].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId, 0));
+
+		// Declining discards the pending checkpoint, which cancels the
+		// savepoint instead of completing it
+		assertTrue(pendingCheckpoint.isDiscarded());
+		assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
+
+		// Verify that the future has been completed exceptionally. Note that
+		// failed() only succeeds (yielding the cause) if the original future
+		// failed; otherwise Await.result throws and the test fails.
+		assertTrue(savepointPathFuture.isCompleted());
+		Throwable cause = Await.result(
+				savepointPathFuture.failed(), FiniteDuration.Zero());
+		assertNotNull("Savepoint future should have failed", cause);
+
+		// Verify all promises removed
+		assertEquals(0, getSavepointPromises(coordinator).size());
+
+		coordinator.shutdown();
+	}
+
 	// ------------------------------------------------------------------------
 	// Rollback
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f81cda38/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 9e24944..eff8a0e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.classloading;
 
+import akka.pattern.AskTimeoutException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -39,6 +40,7 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -60,7 +62,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
-public class ClassLoaderITCase {
+public class ClassLoaderITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderITCase.class);
 
@@ -275,8 +277,9 @@ public class ClassLoaderITCase {
 		// Trigger savepoint
 		String savepointPath = null;
 		for (int i = 0; i < 20; i++) {
-			LOG.info("Triggering savepoint (" + (i+1) + "/20.");
+			LOG.info("Triggering savepoint (" + (i+1) + "/20).");
 			Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId), deadline.timeLeft());
+
 			Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());
 
 			if (savepointResponse.getClass() == TriggerSavepointSuccess.class) {


Mime
View raw message