flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [flink] 01/02: [FLINK-13593][checkpointing] Prevent failing the wrong job in CheckpointFailureManager
Date Fri, 09 Aug 2019 12:49:48 GMT
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e22e641e4185e1964459d2486e676526990d719
Author: Yu Li <liyu@apache.org>
AuthorDate: Tue Aug 6 07:57:12 2019 +0200

    [FLINK-13593][checkpointing] Prevent failing the wrong job in CheckpointFailureManager
    
    This closes #9364.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 52 ++++++++++++++++-----
 .../checkpoint/CheckpointFailureManager.java       | 54 +++++++++++++++++++---
 .../runtime/executiongraph/ExecutionGraph.java     | 24 +++++++++-
 .../CheckpointCoordinatorFailureTest.java          | 10 +++-
 .../CheckpointCoordinatorMasterHooksTest.java      | 11 ++++-
 .../checkpoint/CheckpointCoordinatorTest.java      | 44 ++++++++++++++----
 .../checkpoint/CheckpointFailureManagerTest.java   | 34 ++++++++------
 .../checkpoint/CheckpointStateRestoreTest.java     | 10 +++-
 .../executiongraph/ExecutionGraphRestartTest.java  | 41 ++++++++++++++++
 9 files changed, 235 insertions(+), 45 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 7f258c9..9f4e703 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -396,6 +396,7 @@ public class CheckpointCoordinator {
 			@Nullable final String targetLocation) {
 
 		final CheckpointProperties properties = CheckpointProperties.forSyncSavepoint();
+
 		return triggerSavepointInternal(timestamp, properties, advanceToEndOfEventTime, targetLocation).handle(
 				(completedCheckpoint, throwable) -> {
 					if (throwable != null) {
@@ -447,7 +448,7 @@ public class CheckpointCoordinator {
 			long latestGeneratedCheckpointId = getCheckpointIdCounter().get();
 			// here we can not get the failed pending checkpoint's id,
 			// so we pass the negative latest generated checkpoint id as a special flag
-			failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId);
+			failureManager.handleJobLevelCheckpointException(e, -1 * latestGeneratedCheckpointId);
 			return false;
 		}
 	}
@@ -718,7 +719,7 @@ public class CheckpointCoordinator {
 					message.getTaskExecutionId(),
 					job,
 					taskManagerLocationInfo);
-				discardCheckpoint(checkpoint, message.getReason());
+				discardCheckpoint(checkpoint, message.getReason(), message.getTaskExecutionId());
 			}
 			else if (checkpoint != null) {
 				// this should not happen
@@ -945,7 +946,7 @@ public class CheckpointCoordinator {
 
 				if (!pendingCheckpoint.isAcknowledgedBy(executionAttemptId)) {
 					pendingCheckpointIterator.remove();
-					discardCheckpoint(pendingCheckpoint, cause);
+					discardCheckpoint(pendingCheckpoint, cause, executionAttemptId);
 				}
 			}
 		}
@@ -1332,8 +1333,12 @@ public class CheckpointCoordinator {
 	 *
 	 * @param pendingCheckpoint to discard
 	 * @param cause for discarding the checkpoint
+	 * @param executionAttemptID the execution attempt id of the failing task.
 	 */
-	private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Throwable
cause) {
+	private void discardCheckpoint(
+		PendingCheckpoint pendingCheckpoint,
+		@Nullable Throwable cause,
+		ExecutionAttemptID executionAttemptID) {
 		assert(Thread.holdsLock(lock));
 		Preconditions.checkNotNull(pendingCheckpoint);
 
@@ -1342,12 +1347,12 @@ public class CheckpointCoordinator {
 		LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
 
 		if (cause == null) {
-			failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED);
+			failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED,
executionAttemptID);
 		} else if (cause instanceof CheckpointException) {
 			CheckpointException exception = (CheckpointException) cause;
-			failPendingCheckpoint(pendingCheckpoint, exception.getCheckpointFailureReason(), cause);
+			failPendingCheckpointDueToTaskFailure(pendingCheckpoint, exception.getCheckpointFailureReason(),
cause, executionAttemptID);
 		} else {
-			failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause);
+			failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE,
cause, executionAttemptID);
 		}
 
 		rememberRecentCheckpointId(checkpointId);
@@ -1401,21 +1406,46 @@ public class CheckpointCoordinator {
 	}
 
 	private void failPendingCheckpoint(
-		final PendingCheckpoint pendingCheckpoint,
-		final CheckpointFailureReason reason) {
+			final PendingCheckpoint pendingCheckpoint,
+			final CheckpointFailureReason reason) {
 
 		failPendingCheckpoint(pendingCheckpoint, reason, null);
 	}
 
 	private void failPendingCheckpoint(
+		final PendingCheckpoint pendingCheckpoint,
+		final CheckpointFailureReason reason,
+		@Nullable final Throwable cause) {
+
+		CheckpointException exception = new CheckpointException(reason, cause);
+		pendingCheckpoint.abort(reason, cause);
+		failureManager.handleJobLevelCheckpointException(exception, pendingCheckpoint.getCheckpointId());
+
+		checkAndResetCheckpointScheduler();
+	}
+
+	private void failPendingCheckpointDueToTaskFailure(
+		final PendingCheckpoint pendingCheckpoint,
+		final CheckpointFailureReason reason,
+		final ExecutionAttemptID executionAttemptID) {
+
+		failPendingCheckpointDueToTaskFailure(pendingCheckpoint, reason, null, executionAttemptID);
+	}
+
+	private void failPendingCheckpointDueToTaskFailure(
 			final PendingCheckpoint pendingCheckpoint,
 			final CheckpointFailureReason reason,
-			final Throwable cause) {
+			@Nullable final Throwable cause,
+			final ExecutionAttemptID executionAttemptID) {
 
 		CheckpointException exception = new CheckpointException(reason, cause);
 		pendingCheckpoint.abort(reason, cause);
-		failureManager.handleCheckpointException(exception, pendingCheckpoint.getCheckpointId());
+		failureManager.handleTaskLevelCheckpointException(exception, pendingCheckpoint.getCheckpointId(),
executionAttemptID);
+
+		checkAndResetCheckpointScheduler();
+	}
 
+	private void checkAndResetCheckpointScheduler() {
 		if (!shutdown && periodicScheduling && currentPeriodicTrigger == null)
{
 			synchronized (lock) {
 				if (pendingCheckpoints.isEmpty() || allPendingCheckpointsDiscarded()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 8d12bef..841ac2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -50,7 +51,7 @@ public class CheckpointFailureManager {
 	}
 
 	/**
-	 * Handle checkpoint exception with a handler callback.
+	 * Handle job level checkpoint exception with a handler callback.
 	 *
 	 * @param exception the checkpoint exception.
 	 * @param checkpointId the failed checkpoint id used to count the continuous failure number
based on
@@ -58,7 +59,38 @@ public class CheckpointFailureManager {
 	 *                     happens before the checkpoint id generation. In this case, it will
be specified a negative
 	 *                      latest generated checkpoint id as a special flag.
 	 */
-	public void handleCheckpointException(CheckpointException exception, long checkpointId)
{
+	public void handleJobLevelCheckpointException(CheckpointException exception, long checkpointId)
{
+		checkFailureCounter(exception, checkpointId);
+		if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+			clearCount();
+			failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure
threshold."));
+		}
+	}
+
+	/**
+	 * Handle task level checkpoint exception with a handler callback.
+	 *
+	 * @param exception the checkpoint exception.
+	 * @param checkpointId the failed checkpoint id used to count the continuous failure number
based on
+	 *                     checkpoint id sequence. In trigger phase, we may not get the checkpoint
id when the failure
+	 *                     happens before the checkpoint id generation. In this case, it will
be specified a negative
+	 *                      latest generated checkpoint id as a special flag.
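+	 * @param executionAttemptID the execution attempt id of the failing task, forwarded to the callback as a safeguard against failing the job for an attempt that is no longer running.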
+	 */
+	public void handleTaskLevelCheckpointException(
+			CheckpointException exception,
+			long checkpointId,
+			ExecutionAttemptID executionAttemptID) {
+		checkFailureCounter(exception, checkpointId);
+		if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+			clearCount();
+			failureCallback.failJobDueToTaskFailure(new FlinkRuntimeException("Exceeded checkpoint
tolerable failure threshold."), executionAttemptID);
+		}
+	}
+
+	public void checkFailureCounter(
+			CheckpointException exception,
+			long checkpointId) {
 		if (tolerableCpFailureNumber == UNLIMITED_TOLERABLE_FAILURE_NUMBER) {
 			return;
 		}
@@ -102,11 +134,6 @@ public class CheckpointFailureManager {
 			default:
 				throw new FlinkRuntimeException("Unknown checkpoint failure reason : " + reason.name());
 		}
-
-		if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
-			clearCount();
-			failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure
threshold."));
-		}
 	}
 
 	/**
@@ -151,8 +178,21 @@ public class CheckpointFailureManager {
 	 */
 	public interface FailJobCallback {
 
+		/**
+		 * Fails the whole job graph.
+		 *
+		 * @param cause The reason for failing the job.
+		 */
 		void failJob(final Throwable cause);
 
+		/**
+		 * Fails the whole job graph due to task failure.
+		 *
+		 * @param cause The reason for failing the job.
+		 * @param failingTask The id of the failing task attempt to prevent failing the job multiple
times.
+		 */
+		void failJobDueToTaskFailure(final Throwable cause, final ExecutionAttemptID failingTask);
+
 	}
 
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e8134e5..4eb6a73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -575,8 +575,18 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
 
 		CheckpointFailureManager failureManager = new CheckpointFailureManager(
-				chkConfig.getTolerableCheckpointFailureNumber(),
-				cause -> getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause))
+			chkConfig.getTolerableCheckpointFailureNumber(),
+			new CheckpointFailureManager.FailJobCallback() {
+				@Override
+				public void failJob(Throwable cause) {
+					getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
+				}
+
+				@Override
+				public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask)
{
+					getJobMasterMainThreadExecutor().execute(() -> failGlobalIfExecutionIsStillRunning(cause,
failingTask));
+				}
+			}
 		);
 
 		// create the coordinator that triggers and commits checkpoints and holds the state
@@ -1097,6 +1107,16 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		}
 	}
 
+	void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID failingAttempt)
{
+		final Execution failedExecution = currentExecutions.get(failingAttempt);
+		if (failedExecution != null && failedExecution.getState() == ExecutionState.RUNNING)
{
+			failGlobal(cause);
+		} else {
+			LOG.debug("The failing attempt {} belongs to an already not" +
+				" running task thus won't fail the job", failingAttempt);
+		}
+	}
+
 	/**
 	 * Fails the execution graph globally. This failure will not be recovered by a specific
 	 * failover strategy, but results in a full restart of all tasks.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index beda456..c3059a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -66,7 +66,15 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 
 		final long triggerTimestamp = 1L;
 
-		CheckpointFailureManager failureManager = new CheckpointFailureManager(0, throwable ->
{});
+		CheckpointFailureManager failureManager = new CheckpointFailureManager(
+			0,
+			new CheckpointFailureManager.FailJobCallback() {
+				@Override
+				public void failJob(Throwable cause) {}
+
+				@Override
+				public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask)
{}
+			});
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 7bd28e2..8067fb4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -442,7 +442,16 @@ public class CheckpointCoordinatorMasterHooksTest {
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY,
-				new CheckpointFailureManager(0, throwable -> {}));
+				new CheckpointFailureManager(
+					0,
+					new CheckpointFailureManager.FailJobCallback() {
+						@Override
+						public void failJob(Throwable cause) {}
+
+						@Override
+						public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask)
{}
+				})
+		);
 	}
 
 	private static <T> T mockGeneric(Class<?> clazz) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 18052bb..ffb148b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -128,7 +128,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 	@Before
 	public void setUp() throws Exception {
-		failureManager = new CheckpointFailureManager(0, throwable -> {});
+		failureManager = new CheckpointFailureManager(
+			0,
+			new CheckpointFailureManager.FailJobCallback() {
+				@Override
+				public void failJob(Throwable cause) {}
+
+				@Override
+				public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask)
{}
+		});
 	}
 
 	@Test
@@ -327,9 +335,19 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		final String errorMsg = "Exceeded checkpoint failure tolerance number!";
 
-		CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(0, throwable
-> {
-			throw new RuntimeException(errorMsg);
-		});
+		CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(
+			0,
+			new CheckpointFailureManager.FailJobCallback() {
+				@Override
+				public void failJob(Throwable cause) {
+					throw new RuntimeException(errorMsg);
+				}
+
+				@Override
+				public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask)
{
+					throw new RuntimeException(errorMsg);
+				}
+			});
 
 		// set up the coordinator
 		CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, checkpointFailureManager);
@@ -3910,10 +3928,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		// set up the coordinator and validate the initial state
 		final CheckpointCoordinator coordinator = getCheckpointCoordinator(jobId, vertex1, vertex2,
-				new CheckpointFailureManager(0, throwable -> {
-					invocationCounterAndException.f0 += 1;
-					invocationCounterAndException.f1 = throwable;
-				}));
+				new CheckpointFailureManager(
+					0,
+					new CheckpointFailureManager.FailJobCallback() {
+						@Override
+						public void failJob(Throwable cause) {
+							invocationCounterAndException.f0 += 1;
+							invocationCounterAndException.f1 = cause;
+						}
+
+						@Override
+						public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask)
{
+							throw new AssertionError("This method should not be called for the test.");
+						}
+					}));
 
 		final CompletableFuture<CompletedCheckpoint> savepointFuture = coordinator
 				.triggerSynchronousSavepoint(10L, false, "test-dir");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index 193cb2d..6b050e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -33,15 +34,15 @@ public class CheckpointFailureManagerTest extends TestLogger {
 		TestFailJobCallback callback = new TestFailJobCallback();
 		CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
 
-		failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED),
1);
-		failureManager.handleCheckpointException(
+		failureManager.handleJobLevelCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED),
1);
+		failureManager.handleJobLevelCheckpointException(
 			new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
 
 		//ignore this
-		failureManager.handleCheckpointException(
+		failureManager.handleJobLevelCheckpointException(
 			new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3);
 
-		failureManager.handleCheckpointException(
+		failureManager.handleJobLevelCheckpointException(
 			new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 4);
 		assertEquals(1, callback.getInvokeCounter());
 	}
@@ -51,18 +52,18 @@ public class CheckpointFailureManagerTest extends TestLogger {
 		TestFailJobCallback callback = new TestFailJobCallback();
 		CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
 
-		failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.EXCEPTION),
1);
-		failureManager.handleCheckpointException(
+		failureManager.handleJobLevelCheckpointException(new CheckpointException(CheckpointFailureReason.EXCEPTION),
1);
+		failureManager.handleJobLevelCheckpointException(
 			new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
 
 		//ignore this
-		failureManager.handleCheckpointException(
+		failureManager.handleJobLevelCheckpointException(
 			new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3);
 
 		//reset
 		failureManager.handleCheckpointSuccess(4);
 
-		failureManager.handleCheckpointException(
+		failureManager.handleJobLevelCheckpointException(
 			new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED), 5);
 		assertEquals(0, callback.getInvokeCounter());
 	}
@@ -72,7 +73,7 @@ public class CheckpointFailureManagerTest extends TestLogger {
 		TestFailJobCallback callback = new TestFailJobCallback();
 		CheckpointFailureManager failureManager = new CheckpointFailureManager(0, callback);
 		for (CheckpointFailureReason reason : CheckpointFailureReason.values()) {
-			failureManager.handleCheckpointException(new CheckpointException(reason), -1);
+			failureManager.handleJobLevelCheckpointException(new CheckpointException(reason), -1);
 		}
 
 		assertEquals(1, callback.getInvokeCounter());
@@ -83,16 +84,16 @@ public class CheckpointFailureManagerTest extends TestLogger {
 		TestFailJobCallback callback = new TestFailJobCallback();
 		CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
 
-		failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED),
1);
-		failureManager.handleCheckpointException(
+		failureManager.handleJobLevelCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED),
1);
+		failureManager.handleJobLevelCheckpointException(
 			new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
 
 		//ignore this
-		failureManager.handleCheckpointException(
+		failureManager.handleJobLevelCheckpointException(
 			new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3);
 
 		//ignore repeatedly report from one checkpoint
-		failureManager.handleCheckpointException(
+		failureManager.handleJobLevelCheckpointException(
 			new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
 		assertEquals(0, callback.getInvokeCounter());
 	}
@@ -105,7 +106,12 @@ public class CheckpointFailureManagerTest extends TestLogger {
 		private int invokeCounter = 0;
 
 		@Override
-		public void failJob(final Throwable cause) {
+		public void failJob(Throwable cause) {
+			invokeCounter++;
+		}
+
+		@Override
+		public void failJobDueToTaskFailure(final Throwable cause, final ExecutionAttemptID executionAttemptID)
{
 			invokeCounter++;
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 1fa2a83..a11dfc8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -67,7 +67,15 @@ public class CheckpointStateRestoreTest {
 
 	@Before
 	public void setUp() throws Exception {
-		failureManager = new CheckpointFailureManager(0, throwable -> {});
+		failureManager = new CheckpointFailureManager(
+			0,
+			new CheckpointFailureManager.FailJobCallback() {
+				@Override
+				public void failJob(Throwable cause) {}
+
+				@Override
+				public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask)
{}
+			});
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index a885398..785ae8b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -733,6 +733,47 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.RUNNING, executionGraph.getState());
 	}
 
+	@Test
+	public void failGlobalIfExecutionIsStillRunning_failingAnExecutionTwice_ShouldTriggerOnlyOneFailover()
throws Exception {
+		JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
+		JobVertex receiver = ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
+
+		try (SlotPool slotPool = createSlotPoolImpl()) {
+			ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(new TestRestartStrategy(1, false))
+				.setJobGraph(jobGraph)
+				.setNumberOfTasks(2)
+				.buildAndScheduleForExecution(slotPool);
+
+			Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
+
+			Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
+			Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();
+
+			finishedExecution.markFinished();
+
+			failedExecution.fail(new Exception("Test Exception"));
+			failedExecution.completeCancelling();
+
+			assertEquals(JobStatus.RUNNING, eg.getState());
+
+			// At this point all resources have been assigned
+			for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+				assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource());
+				vertex.getCurrentExecutionAttempt().switchToRunning();
+			}
+
+			// failing globally on behalf of the already finished execution should have no effect on the job
+			eg.failGlobalIfExecutionIsStillRunning(new Exception("This should have no effect"), finishedExecution.getAttemptId());
+
+			assertThat(eg.getState(), is(JobStatus.RUNNING));
+
+			// the state of the finished execution should not have changed since it is terminal
+			assertThat(finishedExecution.getState(), is(ExecutionState.FINISHED));
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------


Mime
View raw message