flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] tillrohrmann commented on a change in pull request #12670: [FLINK-18290][checkpointing] Fail job on checkpoint future failure instead of System.exit
Date Tue, 16 Jun 2020 12:29:06 GMT

tillrohrmann commented on a change in pull request #12670:
URL: https://github.com/apache/flink/pull/12670#discussion_r440796524



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,51 +542,61 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request)
{
 									coordinatorsToCheckpoint, pendingCheckpoint, timer),
 							timer);
 
-			FutureUtils.assertNoException(
-				CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
-					.handleAsync(
-						(ignored, throwable) -> {
-							final PendingCheckpoint checkpoint =
-								FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-							Preconditions.checkState(
-								checkpoint != null || throwable != null,
-								"Either the pending checkpoint needs to be created or an error must have been occurred.");
-
-							if (throwable != null) {
-								// the initialization might not be finished yet
-								if (checkpoint == null) {
-									onTriggerFailure(request, throwable);
-								} else {
-									onTriggerFailure(checkpoint, throwable);
-								}
+			FutureUtils.waitForAll(asList(masterStatesComplete, coordinatorCheckpointsComplete))
+				.handleAsync(
+					(ignored, throwable) -> {
+						final PendingCheckpoint checkpoint =
+							FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+						Preconditions.checkState(
+							checkpoint != null || throwable != null,
+							"Either the pending checkpoint needs to be created or an error must have been occurred.");
+
+						if (throwable != null) {
+							// the initialization might not be finished yet
+							if (checkpoint == null) {
+								onTriggerFailure(request, throwable);
 							} else {
-								if (checkpoint.isDiscarded()) {
-									onTriggerFailure(
-										checkpoint,
-										new CheckpointException(
-											CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-											checkpoint.getFailureCause()));
-								} else {
-									// no exception, no discarding, everything is OK
-									final long checkpointId = checkpoint.getCheckpointId();
-									snapshotTaskState(
-										timestamp,
-										checkpointId,
-										checkpoint.getCheckpointStorageLocation(),
-										request.props,
-										executions,
-										request.advanceToEndOfTime);
-
-									coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
-									onTriggerSuccess();
-								}
+								onTriggerFailure(checkpoint, throwable);
 							}
+						} else {
+							if (checkpoint.isDiscarded()) {
+								onTriggerFailure(
+									checkpoint,
+									new CheckpointException(
+										CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+										checkpoint.getFailureCause()));
+							} else {
+								// no exception, no discarding, everything is OK
+								final long checkpointId = checkpoint.getCheckpointId();
+								snapshotTaskState(
+									timestamp,
+									checkpointId,
+									checkpoint.getCheckpointStorageLocation(),
+									request.props,
+									executions,
+									request.advanceToEndOfTime);
+
+								coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
+								onTriggerSuccess();
+							}
+						}
 
-							return null;
-						},
-						timer));
+						return null;
+					},
+					timer)
+				.whenComplete((unused, error) -> {
+					if (error != null) {
+						if (!isShutdown()) {
+							failureManager.handleJobLevelCheckpointException(new CheckpointException(EXCEPTION,
error), Optional.empty());

Review comment:
       I think if a failure occurs here, then there must be a programming error or it is an artifact
of a task that was not properly shut down. In the first case, I think we should fail hard. In the latter
case (e.g. `if (isShutdown())`), we should ignore the exception or log it on debug.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -539,51 +541,62 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request)
{
 									coordinatorsToCheckpoint, pendingCheckpoint, timer),
 							timer);
 
-			FutureUtils.assertNoException(
-				CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
-					.handleAsync(
-						(ignored, throwable) -> {
-							final PendingCheckpoint checkpoint =
-								FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-							Preconditions.checkState(
-								checkpoint != null || throwable != null,
-								"Either the pending checkpoint needs to be created or an error must have been occurred.");
-
-							if (throwable != null) {
-								// the initialization might not be finished yet
-								if (checkpoint == null) {
-									onTriggerFailure(request, throwable);
-								} else {
-									onTriggerFailure(checkpoint, throwable);
-								}
+			CompletableFuture

Review comment:
       I think we could add `FutureUtils.assertNoException` here again if we used `handle`
or `exceptionally` instead of `whenComplete` for the final exception handler.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -81,22 +78,24 @@ public void handleTaskLevelCheckpointException(
 			CheckpointException exception,
 			long checkpointId,
 			ExecutionAttemptID executionAttemptID) {
-		checkFailureCounter(exception, checkpointId);
-		if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+		handleException(exception, checkpointId, (failureCallback, e) -> failureCallback.failJobDueToTaskFailure(e,
executionAttemptID));
+	}
+
+	private void handleException(CheckpointException exception, long checkpointId, BiConsumer<FailJobCallback,
Exception> onFailure) {
+		if (isFailure(exception) &&
+				countedCheckpointIds.add(checkpointId) &&
+				continuousFailureCounter.incrementAndGet() > tolerableCpFailureNumber) {
 			clearCount();
-			failureCallback.failJobDueToTaskFailure(new FlinkRuntimeException("Exceeded checkpoint
tolerable failure threshold."), executionAttemptID);
+			onFailure.accept(failureCallback, new FlinkRuntimeException("Exceeded checkpoint tolerable
failure threshold."));
 		}
 	}
 
-	public void checkFailureCounter(
-			CheckpointException exception,
-			long checkpointId) {
+	private boolean isFailure(CheckpointException exception) {

Review comment:
       Maybe call it `isCheckpointFailure`, as it seems that we are ignoring a whole bunch
of exception types.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
##########
@@ -76,7 +76,7 @@ public void testTotalCountValue() {
 			failureManager.handleJobLevelCheckpointException(new CheckpointException(reason), -1);
 		}
 
-		assertEquals(2, callback.getInvokeCounter());
+		assertEquals(1, callback.getInvokeCounter());

Review comment:
       This test fails when running `ce537ac0417ebb159a4589f028a024814e03d03f`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -83,9 +83,10 @@ public void handleTaskLevelCheckpointException(
 
 	private void handleException(CheckpointException exception, long checkpointId, BiConsumer<FailJobCallback,
Exception> onFailure) {
 		if (isFailure(exception) &&
+				!isAlreadyFailed() && // prevent unnecessary storing checkpointId
 				countedCheckpointIds.add(checkpointId) &&
 				continuousFailureCounter.incrementAndGet() == tolerableCpFailureNumber + 1) {
-			clearCount();

Review comment:
       I think we are changing an invariant of this class here. Before, it seems that `countedCheckpointIds.size()
== continuousFailureCounter.get()` held, which is now no longer true. I'm a bit sceptical that
these changes can be justified to be put in a hotfix commit.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -83,9 +83,10 @@ public void handleTaskLevelCheckpointException(
 
 	private void handleException(CheckpointException exception, long checkpointId, BiConsumer<FailJobCallback,
Exception> onFailure) {
 		if (isFailure(exception) &&
+				!isAlreadyFailed() && // prevent unnecessary storing checkpointId
 				countedCheckpointIds.add(checkpointId) &&
 				continuousFailureCounter.incrementAndGet() == tolerableCpFailureNumber + 1) {

Review comment:
       Can we give this condition a more expressive name?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -78,13 +74,13 @@ public void handleTaskLevelCheckpointException(
 			CheckpointException exception,
 			long checkpointId,
 			ExecutionAttemptID executionAttemptID) {
-		handleException(exception, checkpointId, (failureCallback, e) -> failureCallback.failJobDueToTaskFailure(e,
executionAttemptID));
+		handleException(exception, of(checkpointId), (failureCallback, e) -> failureCallback.failJobDueToTaskFailure(e,
executionAttemptID));

Review comment:
       I would refrain from using static imports. The problem I see is that another person
needs to know that this belongs to `Optional` when looking at this code.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,51 +542,61 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request)
{
 									coordinatorsToCheckpoint, pendingCheckpoint, timer),
 							timer);
 
-			FutureUtils.assertNoException(
-				CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
-					.handleAsync(
-						(ignored, throwable) -> {
-							final PendingCheckpoint checkpoint =
-								FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-							Preconditions.checkState(
-								checkpoint != null || throwable != null,
-								"Either the pending checkpoint needs to be created or an error must have been occurred.");
-
-							if (throwable != null) {
-								// the initialization might not be finished yet
-								if (checkpoint == null) {
-									onTriggerFailure(request, throwable);
-								} else {
-									onTriggerFailure(checkpoint, throwable);
-								}
+			FutureUtils.waitForAll(asList(masterStatesComplete, coordinatorCheckpointsComplete))
+				.handleAsync(
+					(ignored, throwable) -> {
+						final PendingCheckpoint checkpoint =
+							FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+						Preconditions.checkState(
+							checkpoint != null || throwable != null,
+							"Either the pending checkpoint needs to be created or an error must have been occurred.");
+
+						if (throwable != null) {
+							// the initialization might not be finished yet
+							if (checkpoint == null) {
+								onTriggerFailure(request, throwable);
 							} else {
-								if (checkpoint.isDiscarded()) {
-									onTriggerFailure(
-										checkpoint,
-										new CheckpointException(
-											CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-											checkpoint.getFailureCause()));
-								} else {
-									// no exception, no discarding, everything is OK
-									final long checkpointId = checkpoint.getCheckpointId();
-									snapshotTaskState(
-										timestamp,
-										checkpointId,
-										checkpoint.getCheckpointStorageLocation(),
-										request.props,
-										executions,
-										request.advanceToEndOfTime);
-
-									coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
-									onTriggerSuccess();
-								}
+								onTriggerFailure(checkpoint, throwable);
 							}
+						} else {
+							if (checkpoint.isDiscarded()) {
+								onTriggerFailure(
+									checkpoint,
+									new CheckpointException(
+										CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+										checkpoint.getFailureCause()));
+							} else {
+								// no exception, no discarding, everything is OK
+								final long checkpointId = checkpoint.getCheckpointId();
+								snapshotTaskState(
+									timestamp,
+									checkpointId,
+									checkpoint.getCheckpointStorageLocation(),
+									request.props,
+									executions,
+									request.advanceToEndOfTime);
+
+								coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
+								onTriggerSuccess();
+							}
+						}
 
-							return null;
-						},
-						timer));
+						return null;
+					},
+					timer)
+				.whenComplete((unused, error) -> {
+					if (error != null) {
+						if (!isShutdown()) {
+							failureManager.handleJobLevelCheckpointException(new CheckpointException(EXCEPTION,
error), Optional.empty());

Review comment:
       Consequently, I'm not convinced that we need to call the `failureManager` here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,51 +542,61 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request)
{
 									coordinatorsToCheckpoint, pendingCheckpoint, timer),
 							timer);
 
-			FutureUtils.assertNoException(
-				CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
-					.handleAsync(
-						(ignored, throwable) -> {
-							final PendingCheckpoint checkpoint =
-								FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-							Preconditions.checkState(
-								checkpoint != null || throwable != null,
-								"Either the pending checkpoint needs to be created or an error must have been occurred.");
-
-							if (throwable != null) {
-								// the initialization might not be finished yet
-								if (checkpoint == null) {
-									onTriggerFailure(request, throwable);
-								} else {
-									onTriggerFailure(checkpoint, throwable);
-								}
+			FutureUtils.waitForAll(asList(masterStatesComplete, coordinatorCheckpointsComplete))
+				.handleAsync(
+					(ignored, throwable) -> {
+						final PendingCheckpoint checkpoint =
+							FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+						Preconditions.checkState(
+							checkpoint != null || throwable != null,
+							"Either the pending checkpoint needs to be created or an error must have been occurred.");
+
+						if (throwable != null) {
+							// the initialization might not be finished yet
+							if (checkpoint == null) {
+								onTriggerFailure(request, throwable);
 							} else {
-								if (checkpoint.isDiscarded()) {
-									onTriggerFailure(
-										checkpoint,
-										new CheckpointException(
-											CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-											checkpoint.getFailureCause()));
-								} else {
-									// no exception, no discarding, everything is OK
-									final long checkpointId = checkpoint.getCheckpointId();
-									snapshotTaskState(
-										timestamp,
-										checkpointId,
-										checkpoint.getCheckpointStorageLocation(),
-										request.props,
-										executions,
-										request.advanceToEndOfTime);
-
-									coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
-									onTriggerSuccess();
-								}
+								onTriggerFailure(checkpoint, throwable);
 							}
+						} else {
+							if (checkpoint.isDiscarded()) {
+								onTriggerFailure(
+									checkpoint,
+									new CheckpointException(
+										CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+										checkpoint.getFailureCause()));
+							} else {
+								// no exception, no discarding, everything is OK
+								final long checkpointId = checkpoint.getCheckpointId();
+								snapshotTaskState(
+									timestamp,
+									checkpointId,
+									checkpoint.getCheckpointStorageLocation(),
+									request.props,
+									executions,
+									request.advanceToEndOfTime);
+
+								coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
+								onTriggerSuccess();
+							}
+						}
 
-							return null;
-						},
-						timer));
+						return null;
+					},
+					timer)
+				.whenComplete((unused, error) -> {
+					if (error != null) {
+						if (!isShutdown()) {
+							failureManager.handleJobLevelCheckpointException(new CheckpointException(EXCEPTION,
error), Optional.empty());
+						} else if (error instanceof RejectedExecutionException) {

Review comment:
       I think we can say that we tolerate exceptions after the shutdown because we have closed
the `timer` and can no longer guarantee that things keep working.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -539,51 +541,62 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request)
{
 									coordinatorsToCheckpoint, pendingCheckpoint, timer),
 							timer);
 
-			FutureUtils.assertNoException(
-				CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
-					.handleAsync(
-						(ignored, throwable) -> {
-							final PendingCheckpoint checkpoint =
-								FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-							Preconditions.checkState(
-								checkpoint != null || throwable != null,
-								"Either the pending checkpoint needs to be created or an error must have been occurred.");
-
-							if (throwable != null) {
-								// the initialization might not be finished yet
-								if (checkpoint == null) {
-									onTriggerFailure(request, throwable);
-								} else {
-									onTriggerFailure(checkpoint, throwable);
-								}
+			CompletableFuture
+				.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
+				.handleAsync(
+					(ignored, throwable) -> {
+						final PendingCheckpoint checkpoint =
+							FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+						Preconditions.checkState(
+							checkpoint != null || throwable != null,
+							"Either the pending checkpoint needs to be created or an error must have been occurred.");
+
+						if (throwable != null) {
+							// the initialization might not be finished yet
+							if (checkpoint == null) {
+								onTriggerFailure(request, throwable);
 							} else {
-								if (checkpoint.isDiscarded()) {
-									onTriggerFailure(
-										checkpoint,
-										new CheckpointException(
-											CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-											checkpoint.getFailureCause()));
-								} else {
-									// no exception, no discarding, everything is OK
-									final long checkpointId = checkpoint.getCheckpointId();
-									snapshotTaskState(
-										timestamp,
-										checkpointId,
-										checkpoint.getCheckpointStorageLocation(),
-										request.props,
-										executions,
-										request.advanceToEndOfTime);
-
-									coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
-									onTriggerSuccess();
-								}
+								onTriggerFailure(checkpoint, throwable);
+							}
+						} else {
+							if (checkpoint.isDiscarded()) {
+								onTriggerFailure(
+									checkpoint,
+									new CheckpointException(
+										CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+										checkpoint.getFailureCause()));
+							} else {
+								// no exception, no discarding, everything is OK
+								final long checkpointId = checkpoint.getCheckpointId();
+								snapshotTaskState(
+									timestamp,
+									checkpointId,
+									checkpoint.getCheckpointStorageLocation(),
+									request.props,
+									executions,
+									request.advanceToEndOfTime);
+
+								coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
+								onTriggerSuccess();
 							}
+						}
 
-							return null;
-						},
-						timer));
+						return null;
+					},
+					timer)
+				.whenComplete((unused, error) -> {
+					if (error != null) {
+						if (!isShutdown()) {
+							failureManager.handleJobLevelCheckpointException(new CheckpointException(EXCEPTION,
error), Optional.empty());
+						} else if (error instanceof RejectedExecutionException) {
+							LOG.debug("Execution rejected during shutdown");
+						} else {
+							LOG.warn("Error encountered during shutdown", error);
+						}
+					}
+				});

Review comment:
       A test is missing for the original problem.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -84,7 +84,7 @@ public void handleTaskLevelCheckpointException(
 	private void handleException(CheckpointException exception, long checkpointId, BiConsumer<FailJobCallback,
Exception> onFailure) {
 		if (isFailure(exception) &&
 				countedCheckpointIds.add(checkpointId) &&
-				continuousFailureCounter.incrementAndGet() > tolerableCpFailureNumber) {
+				continuousFailureCounter.incrementAndGet() == tolerableCpFailureNumber + 1) {

Review comment:
       This should not be necessary since we are clearing `continuousFailureCounter` in `clearCount`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,51 +542,61 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request)
{
 									coordinatorsToCheckpoint, pendingCheckpoint, timer),
 							timer);
 
-			FutureUtils.assertNoException(
-				CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
-					.handleAsync(
-						(ignored, throwable) -> {
-							final PendingCheckpoint checkpoint =
-								FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-							Preconditions.checkState(
-								checkpoint != null || throwable != null,
-								"Either the pending checkpoint needs to be created or an error must have been occurred.");
-
-							if (throwable != null) {
-								// the initialization might not be finished yet
-								if (checkpoint == null) {
-									onTriggerFailure(request, throwable);
-								} else {
-									onTriggerFailure(checkpoint, throwable);
-								}
+			FutureUtils.waitForAll(asList(masterStatesComplete, coordinatorCheckpointsComplete))
+				.handleAsync(
+					(ignored, throwable) -> {
+						final PendingCheckpoint checkpoint =
+							FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+						Preconditions.checkState(
+							checkpoint != null || throwable != null,
+							"Either the pending checkpoint needs to be created or an error must have been occurred.");
+
+						if (throwable != null) {
+							// the initialization might not be finished yet
+							if (checkpoint == null) {
+								onTriggerFailure(request, throwable);
 							} else {
-								if (checkpoint.isDiscarded()) {
-									onTriggerFailure(
-										checkpoint,
-										new CheckpointException(
-											CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-											checkpoint.getFailureCause()));
-								} else {
-									// no exception, no discarding, everything is OK
-									final long checkpointId = checkpoint.getCheckpointId();
-									snapshotTaskState(
-										timestamp,
-										checkpointId,
-										checkpoint.getCheckpointStorageLocation(),
-										request.props,
-										executions,
-										request.advanceToEndOfTime);
-
-									coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
-									onTriggerSuccess();
-								}
+								onTriggerFailure(checkpoint, throwable);
 							}
+						} else {
+							if (checkpoint.isDiscarded()) {
+								onTriggerFailure(
+									checkpoint,
+									new CheckpointException(
+										CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+										checkpoint.getFailureCause()));
+							} else {
+								// no exception, no discarding, everything is OK
+								final long checkpointId = checkpoint.getCheckpointId();
+								snapshotTaskState(
+									timestamp,
+									checkpointId,
+									checkpoint.getCheckpointStorageLocation(),
+									request.props,
+									executions,
+									request.advanceToEndOfTime);
+
+								coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
+								onTriggerSuccess();
+							}
+						}
 
-							return null;
-						},
-						timer));
+						return null;
+					},
+					timer)
+				.whenComplete((unused, error) -> {
+					if (error != null) {
+						if (!isShutdown()) {
+							failureManager.handleJobLevelCheckpointException(new CheckpointException(EXCEPTION,
error), Optional.empty());

Review comment:
       Which exceptions do you expect to appear here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message