From: GitBox
To: issues@flink.apache.org
Subject: [GitHub] [flink] tillrohrmann commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
Message-ID: <161340685809.15632.2444771301749064133.asfpy@gitbox.apache.org>
Date: Mon, 15 Feb 2021 16:34:18 -0000

tillrohrmann commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576311046

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }

+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception {
+        // initially, we don't allow any restarts since the first phase (savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING));
+
+        final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded a failure happens in the second phase when finishing
+        // the tasks. That's why, the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2));

Review comment:
Maybe there is a better way to assert that the whole job has been restarted. I agree with Robert's observation that the test includes a lot of implementation-specific knowledge.
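
One possible reading of the comment above: instead of counting the tasks queued on the restart executor, the test could assert on an observable outcome, for example that every vertex got a new attempt after the failure. The following is only a minimal, self-contained sketch of that idea in plain Java. Vertex, Job and wholeJobRestarted are hypothetical stand-ins and not Flink classes, and whether DefaultSchedulerTest can observe restarts this way is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Flink code: every class and method name here is hypothetical. */
public class RestartAssertionSketch {

    /** Stand-in for an execution vertex that counts how often it was redeployed. */
    static final class Vertex {
        private int attemptNumber = 0;

        int getAttemptNumber() {
            return attemptNumber;
        }

        void restart() {
            attemptNumber++;
        }
    }

    /** Stand-in for a job consisting of several vertices. */
    static final class Job {
        private final List<Vertex> vertices = new ArrayList<>();

        Job(int numberOfVertices) {
            for (int i = 0; i < numberOfVertices; i++) {
                vertices.add(new Vertex());
            }
        }

        /** Local failover: only the given vertex is restarted. */
        void restartVertex(int index) {
            vertices.get(index).restart();
        }

        /** Global failover: every vertex is restarted. */
        void restartAll() {
            vertices.forEach(Vertex::restart);
        }

        /** Snapshot of the current attempt numbers, in vertex order. */
        List<Integer> attemptNumbers() {
            List<Integer> numbers = new ArrayList<>();
            for (Vertex vertex : vertices) {
                numbers.add(vertex.getAttemptNumber());
            }
            return numbers;
        }
    }

    /** Returns true only if every vertex has a higher attempt number than in the snapshot. */
    static boolean wholeJobRestarted(List<Integer> before, Job job) {
        List<Integer> after = job.attemptNumbers();
        if (before.size() != after.size()) {
            return false;
        }
        for (int i = 0; i < before.size(); i++) {
            if (after.get(i) <= before.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Job job = new Job(2);
        List<Integer> before = job.attemptNumbers();

        job.restartVertex(1); // local failover of the failing vertex only
        System.out.println(wholeJobRestarted(before, job)); // prints false

        job.restartAll(); // global failover
        System.out.println(wholeJobRestarted(before, job)); // prints true
    }
}
```

The check depends only on state that is visible from outside (attempt numbers before and after the failure), so it would keep passing if the number or kind of internally scheduled restart tasks changed.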