From commits-return-25257-archive-asf-public=cust-asf.ponee.io@flink.apache.org Fri Aug 9 12:49:57 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1F4BD180642 for ; Fri, 9 Aug 2019 14:49:57 +0200 (CEST) Received: (qmail 39655 invoked by uid 500); 9 Aug 2019 12:49:56 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 39646 invoked by uid 99); 9 Aug 2019 12:49:56 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Aug 2019 12:49:56 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 500E985F0A; Fri, 9 Aug 2019 12:49:56 +0000 (UTC) Date: Fri, 09 Aug 2019 12:49:48 +0000 To: "commits@flink.apache.org" Subject: [flink] 01/02: [FLINK-13593][checkpointing] Prevent failing the wrong job in CheckpointFailureManager MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: trohrmann@apache.org In-Reply-To: <156535498488.22462.518534613724470619@gitbox.apache.org> References: <156535498488.22462.518534613724470619@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: flink X-Git-Refname: refs/heads/release-1.9 X-Git-Reftype: branch X-Git-Rev: 1e22e641e4185e1964459d2486e676526990d719 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20190809124956.500E985F0A@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. 
trohrmann pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git commit 1e22e641e4185e1964459d2486e676526990d719 Author: Yu Li AuthorDate: Tue Aug 6 07:57:12 2019 +0200 [FLINK-13593][checkpointing] Prevent failing the wrong job in CheckpointFailureManager This closes #9364. --- .../runtime/checkpoint/CheckpointCoordinator.java | 52 ++++++++++++++++----- .../checkpoint/CheckpointFailureManager.java | 54 +++++++++++++++++++--- .../runtime/executiongraph/ExecutionGraph.java | 24 +++++++++- .../CheckpointCoordinatorFailureTest.java | 10 +++- .../CheckpointCoordinatorMasterHooksTest.java | 11 ++++- .../checkpoint/CheckpointCoordinatorTest.java | 44 ++++++++++++++---- .../checkpoint/CheckpointFailureManagerTest.java | 34 ++++++++------ .../checkpoint/CheckpointStateRestoreTest.java | 10 +++- .../executiongraph/ExecutionGraphRestartTest.java | 41 ++++++++++++++++ 9 files changed, 235 insertions(+), 45 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 7f258c9..9f4e703 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -396,6 +396,7 @@ public class CheckpointCoordinator { @Nullable final String targetLocation) { final CheckpointProperties properties = CheckpointProperties.forSyncSavepoint(); + return triggerSavepointInternal(timestamp, properties, advanceToEndOfEventTime, targetLocation).handle( (completedCheckpoint, throwable) -> { if (throwable != null) { @@ -447,7 +448,7 @@ public class CheckpointCoordinator { long latestGeneratedCheckpointId = getCheckpointIdCounter().get(); // here we can not get the failed pending checkpoint's id, // so we pass the negative latest generated checkpoint id as a special flag - 
failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId); + failureManager.handleJobLevelCheckpointException(e, -1 * latestGeneratedCheckpointId); return false; } } @@ -718,7 +719,7 @@ public class CheckpointCoordinator { message.getTaskExecutionId(), job, taskManagerLocationInfo); - discardCheckpoint(checkpoint, message.getReason()); + discardCheckpoint(checkpoint, message.getReason(), message.getTaskExecutionId()); } else if (checkpoint != null) { // this should not happen @@ -945,7 +946,7 @@ public class CheckpointCoordinator { if (!pendingCheckpoint.isAcknowledgedBy(executionAttemptId)) { pendingCheckpointIterator.remove(); - discardCheckpoint(pendingCheckpoint, cause); + discardCheckpoint(pendingCheckpoint, cause, executionAttemptId); } } } @@ -1332,8 +1333,12 @@ public class CheckpointCoordinator { * * @param pendingCheckpoint to discard * @param cause for discarding the checkpoint + * @param executionAttemptID the execution attempt id of the failing task. */ - private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Throwable cause) { + private void discardCheckpoint( + PendingCheckpoint pendingCheckpoint, + @Nullable Throwable cause, + ExecutionAttemptID executionAttemptID) { assert(Thread.holdsLock(lock)); Preconditions.checkNotNull(pendingCheckpoint); @@ -1342,12 +1347,12 @@ public class CheckpointCoordinator { LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause); if (cause == null) { - failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED); + failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED, executionAttemptID); } else if (cause instanceof CheckpointException) { CheckpointException exception = (CheckpointException) cause; - failPendingCheckpoint(pendingCheckpoint, exception.getCheckpointFailureReason(), cause); + failPendingCheckpointDueToTaskFailure(pendingCheckpoint, exception.getCheckpointFailureReason(), 
cause, executionAttemptID); } else { - failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause); + failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause, executionAttemptID); } rememberRecentCheckpointId(checkpointId); @@ -1401,21 +1406,46 @@ public class CheckpointCoordinator { } private void failPendingCheckpoint( - final PendingCheckpoint pendingCheckpoint, - final CheckpointFailureReason reason) { + final PendingCheckpoint pendingCheckpoint, + final CheckpointFailureReason reason) { failPendingCheckpoint(pendingCheckpoint, reason, null); } private void failPendingCheckpoint( + final PendingCheckpoint pendingCheckpoint, + final CheckpointFailureReason reason, + @Nullable final Throwable cause) { + + CheckpointException exception = new CheckpointException(reason, cause); + pendingCheckpoint.abort(reason, cause); + failureManager.handleJobLevelCheckpointException(exception, pendingCheckpoint.getCheckpointId()); + + checkAndResetCheckpointScheduler(); + } + + private void failPendingCheckpointDueToTaskFailure( + final PendingCheckpoint pendingCheckpoint, + final CheckpointFailureReason reason, + final ExecutionAttemptID executionAttemptID) { + + failPendingCheckpointDueToTaskFailure(pendingCheckpoint, reason, null, executionAttemptID); + } + + private void failPendingCheckpointDueToTaskFailure( final PendingCheckpoint pendingCheckpoint, final CheckpointFailureReason reason, - final Throwable cause) { + @Nullable final Throwable cause, + final ExecutionAttemptID executionAttemptID) { CheckpointException exception = new CheckpointException(reason, cause); pendingCheckpoint.abort(reason, cause); - failureManager.handleCheckpointException(exception, pendingCheckpoint.getCheckpointId()); + failureManager.handleTaskLevelCheckpointException(exception, pendingCheckpoint.getCheckpointId(), executionAttemptID); + + checkAndResetCheckpointScheduler(); + } + private void 
checkAndResetCheckpointScheduler() { if (!shutdown && periodicScheduling && currentPeriodicTrigger == null) { synchronized (lock) { if (pendingCheckpoints.isEmpty() || allPendingCheckpointsDiscarded()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java index 8d12bef..841ac2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; @@ -50,7 +51,7 @@ public class CheckpointFailureManager { } /** - * Handle checkpoint exception with a handler callback. + * Handle job level checkpoint exception with a handler callback. * * @param exception the checkpoint exception. * @param checkpointId the failed checkpoint id used to count the continuous failure number based on @@ -58,7 +59,38 @@ public class CheckpointFailureManager { * happens before the checkpoint id generation. In this case, it will be specified a negative * latest generated checkpoint id as a special flag. */ - public void handleCheckpointException(CheckpointException exception, long checkpointId) { + public void handleJobLevelCheckpointException(CheckpointException exception, long checkpointId) { + checkFailureCounter(exception, checkpointId); + if (continuousFailureCounter.get() > tolerableCpFailureNumber) { + clearCount(); + failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold.")); + } + } + + /** + * Handle task level checkpoint exception with a handler callback. + * + * @param exception the checkpoint exception. 
+ * @param checkpointId the failed checkpoint id used to count the continuous failure number based on + * checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure + * happens before the checkpoint id generation. In this case, it will be specified a negative + * latest generated checkpoint id as a special flag. + * @param executionAttemptID the execution attempt id of the failing task, used as a safeguard. + */ + public void handleTaskLevelCheckpointException( + CheckpointException exception, + long checkpointId, + ExecutionAttemptID executionAttemptID) { + checkFailureCounter(exception, checkpointId); + if (continuousFailureCounter.get() > tolerableCpFailureNumber) { + clearCount(); + failureCallback.failJobDueToTaskFailure(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."), executionAttemptID); + } + } + + public void checkFailureCounter( + CheckpointException exception, + long checkpointId) { if (tolerableCpFailureNumber == UNLIMITED_TOLERABLE_FAILURE_NUMBER) { return; } @@ -102,11 +134,6 @@ public class CheckpointFailureManager { default: throw new FlinkRuntimeException("Unknown checkpoint failure reason : " + reason.name()); } - - if (continuousFailureCounter.get() > tolerableCpFailureNumber) { - clearCount(); - failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold.")); - } } /** @@ -151,8 +178,21 @@ public class CheckpointFailureManager { */ public interface FailJobCallback { + /** + * Fails the whole job graph. + * + * @param cause The reason why the job is failed. + */ void failJob(final Throwable cause); + /** + * Fails the whole job graph due to task failure. + * + * @param cause The reason why the job is failed. + * @param failingTask The id of the failing task attempt to prevent failing the job multiple times.
+ */ + void failJobDueToTaskFailure(final Throwable cause, final ExecutionAttemptID failingTask); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index e8134e5..4eb6a73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -575,8 +575,18 @@ public class ExecutionGraph implements AccessExecutionGraph { checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker"); CheckpointFailureManager failureManager = new CheckpointFailureManager( - chkConfig.getTolerableCheckpointFailureNumber(), - cause -> getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause)) + chkConfig.getTolerableCheckpointFailureNumber(), + new CheckpointFailureManager.FailJobCallback() { + @Override + public void failJob(Throwable cause) { + getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause)); + } + + @Override + public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) { + getJobMasterMainThreadExecutor().execute(() -> failGlobalIfExecutionIsStillRunning(cause, failingTask)); + } + } ); // create the coordinator that triggers and commits checkpoints and holds the state @@ -1097,6 +1107,16 @@ public class ExecutionGraph implements AccessExecutionGraph { } } + void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID failingAttempt) { + final Execution failedExecution = currentExecutions.get(failingAttempt); + if (failedExecution != null && failedExecution.getState() == ExecutionState.RUNNING) { + failGlobal(cause); + } else { + LOG.debug("The failing attempt {} belongs to an already not" + + " running task thus won't fail the job", failingAttempt); + } + } + /** * Fails the execution graph globally. 
This failure will not be recovered by a specific * failover strategy, but results in a full restart of all tasks. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index beda456..c3059a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -66,7 +66,15 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { final long triggerTimestamp = 1L; - CheckpointFailureManager failureManager = new CheckpointFailureManager(0, throwable -> {}); + CheckpointFailureManager failureManager = new CheckpointFailureManager( + 0, + new CheckpointFailureManager.FailJobCallback() { + @Override + public void failJob(Throwable cause) {} + + @Override + public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {} + }); // set up the coordinator and validate the initial state CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index 7bd28e2..8067fb4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -442,7 +442,16 @@ public class CheckpointCoordinatorMasterHooksTest { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - new CheckpointFailureManager(0, throwable -> {})); + new CheckpointFailureManager( + 0, + new 
CheckpointFailureManager.FailJobCallback() { + @Override + public void failJob(Throwable cause) {} + + @Override + public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {} + }) + ); } private static T mockGeneric(Class clazz) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 18052bb..ffb148b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -128,7 +128,15 @@ public class CheckpointCoordinatorTest extends TestLogger { @Before public void setUp() throws Exception { - failureManager = new CheckpointFailureManager(0, throwable -> {}); + failureManager = new CheckpointFailureManager( + 0, + new CheckpointFailureManager.FailJobCallback() { + @Override + public void failJob(Throwable cause) {} + + @Override + public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {} + }); } @Test @@ -327,9 +335,19 @@ public class CheckpointCoordinatorTest extends TestLogger { final String errorMsg = "Exceeded checkpoint failure tolerance number!"; - CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(0, throwable -> { - throw new RuntimeException(errorMsg); - }); + CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager( + 0, + new CheckpointFailureManager.FailJobCallback() { + @Override + public void failJob(Throwable cause) { + throw new RuntimeException(errorMsg); + } + + @Override + public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) { + throw new RuntimeException(errorMsg); + } + }); // set up the coordinator CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, checkpointFailureManager); @@ 
-3910,10 +3928,20 @@ public class CheckpointCoordinatorTest extends TestLogger { // set up the coordinator and validate the initial state final CheckpointCoordinator coordinator = getCheckpointCoordinator(jobId, vertex1, vertex2, - new CheckpointFailureManager(0, throwable -> { - invocationCounterAndException.f0 += 1; - invocationCounterAndException.f1 = throwable; - })); + new CheckpointFailureManager( + 0, + new CheckpointFailureManager.FailJobCallback() { + @Override + public void failJob(Throwable cause) { + invocationCounterAndException.f0 += 1; + invocationCounterAndException.f1 = cause; + } + + @Override + public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) { + throw new AssertionError("This method should not be called for the test."); + } + })); final CompletableFuture savepointFuture = coordinator .triggerSynchronousSavepoint(10L, false, "test-dir"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java index 193cb2d..6b050e5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -33,15 +34,15 @@ public class CheckpointFailureManagerTest extends TestLogger { TestFailJobCallback callback = new TestFailJobCallback(); CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback); - failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1); - failureManager.handleCheckpointException( + failureManager.handleJobLevelCheckpointException(new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1); + failureManager.handleJobLevelCheckpointException( new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2); //ignore this - failureManager.handleCheckpointException( + failureManager.handleJobLevelCheckpointException( new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3); - failureManager.handleCheckpointException( + failureManager.handleJobLevelCheckpointException( new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 4); assertEquals(1, callback.getInvokeCounter()); } @@ -51,18 +52,18 @@ public class CheckpointFailureManagerTest extends TestLogger { TestFailJobCallback callback = new TestFailJobCallback(); CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback); - failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.EXCEPTION), 1); - failureManager.handleCheckpointException( + failureManager.handleJobLevelCheckpointException(new CheckpointException(CheckpointFailureReason.EXCEPTION), 1); + failureManager.handleJobLevelCheckpointException( new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2); //ignore this - failureManager.handleCheckpointException( + failureManager.handleJobLevelCheckpointException( new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3); //reset failureManager.handleCheckpointSuccess(4); - failureManager.handleCheckpointException( + failureManager.handleJobLevelCheckpointException( new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED), 5); assertEquals(0, callback.getInvokeCounter()); } @@ -72,7 +73,7 @@ public class CheckpointFailureManagerTest extends TestLogger { TestFailJobCallback callback = new TestFailJobCallback(); CheckpointFailureManager failureManager = new CheckpointFailureManager(0, callback); for (CheckpointFailureReason reason : CheckpointFailureReason.values()) { - 
failureManager.handleCheckpointException(new CheckpointException(reason), -1); + failureManager.handleJobLevelCheckpointException(new CheckpointException(reason), -1); } assertEquals(1, callback.getInvokeCounter()); @@ -83,16 +84,16 @@ public class CheckpointFailureManagerTest extends TestLogger { TestFailJobCallback callback = new TestFailJobCallback(); CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback); - failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1); - failureManager.handleCheckpointException( + failureManager.handleJobLevelCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1); + failureManager.handleJobLevelCheckpointException( new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2); //ignore this - failureManager.handleCheckpointException( + failureManager.handleJobLevelCheckpointException( new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3); //ignore repeatedly report from one checkpoint - failureManager.handleCheckpointException( + failureManager.handleJobLevelCheckpointException( new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2); assertEquals(0, callback.getInvokeCounter()); } @@ -105,7 +106,12 @@ public class CheckpointFailureManagerTest extends TestLogger { private int invokeCounter = 0; @Override - public void failJob(final Throwable cause) { + public void failJob(Throwable cause) { + invokeCounter++; + } + + @Override + public void failJobDueToTaskFailure(final Throwable cause, final ExecutionAttemptID executionAttemptID) { invokeCounter++; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 1fa2a83..a11dfc8 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -67,7 +67,15 @@ public class CheckpointStateRestoreTest { @Before public void setUp() throws Exception { - failureManager = new CheckpointFailureManager(0, throwable -> {}); + failureManager = new CheckpointFailureManager( + 0, + new CheckpointFailureManager.FailJobCallback() { + @Override + public void failJob(Throwable cause) {} + + @Override + public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {} + }); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index a885398..785ae8b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -733,6 +733,47 @@ public class ExecutionGraphRestartTest extends TestLogger { assertEquals(JobStatus.RUNNING, executionGraph.getState()); } + @Test + public void failGlobalIfExecutionIsStillRunning_failingAnExecutionTwice_ShouldTriggerOnlyOneFailover() throws Exception { + JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class); + JobVertex receiver = ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class); + JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver); + + try (SlotPool slotPool = createSlotPoolImpl()) { + ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder() + .setRestartStrategy(new TestRestartStrategy(1, false)) + .setJobGraph(jobGraph) + .setNumberOfTasks(2) + .buildAndScheduleForExecution(slotPool); + + Iterator executionVertices = eg.getAllExecutionVertices().iterator(); + + Execution 
finishedExecution = executionVertices.next().getCurrentExecutionAttempt(); + Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt(); + + finishedExecution.markFinished(); + + failedExecution.fail(new Exception("Test Exception")); + failedExecution.completeCancelling(); + + assertEquals(JobStatus.RUNNING, eg.getState()); + + // At this point all resources have been assigned + for (ExecutionVertex vertex : eg.getAllExecutionVertices()) { + assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource()); + vertex.getCurrentExecutionAttempt().switchToRunning(); + } + + // fail global with old finished execution, this should not affect the execution + eg.failGlobalIfExecutionIsStillRunning(new Exception("This should have no effect"), finishedExecution.getAttemptId()); + + assertThat(eg.getState(), is(JobStatus.RUNNING)); + + // the state of the finished execution should have not changed since it is terminal + assertThat(finishedExecution.getState(), is(ExecutionState.FINISHED)); + } + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------