flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] XComp commented on a change in pull request #15898: [FLINK-21439][core] WIP: Adds Exception History for AdaptiveScheduler
Date Mon, 14 Jun 2021 07:41:06 GMT

XComp commented on a change in pull request #15898:
URL: https://github.com/apache/flink/pull/15898#discussion_r649872391



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -145,14 +151,14 @@
  */
 public class AdaptiveScheduler
         implements SchedulerNG,
-                Created.Context,
-                WaitingForResources.Context,
-                CreatingExecutionGraph.Context,
-                Executing.Context,
-                Restarting.Context,
-                Failing.Context,
-                Finished.Context,
-                StopWithSavepoint.Context {
+        Created.Context,
+        WaitingForResources.Context,
+        CreatingExecutionGraph.Context,
+        Executing.Context,
+        Restarting.Context,
+        Failing.Context,
+        Finished.Context,
+        StopWithSavepoint.Context {

Review comment:
       Sorry for the late response. I am still busy with other stuff. I hope to get back to
you today or on Monday.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -59,27 +59,49 @@
     public static FailureHandlingResultSnapshot create(
             FailureHandlingResult failureHandlingResult,
             Function<ExecutionVertexID, Execution> latestExecutionLookup) {
+        return create(
+                failureHandlingResult.getExecutionVertexIdOfFailedTask(),
+                failureHandlingResult.getError(),
+                failureHandlingResult.getVerticesToRestart(),
+                failureHandlingResult.getTimestamp(),
+                latestExecutionLookup);
+    }
+
+    /**
+     * Creates a {@code FailureHandlingResultSnapshot} based on the passed {@link
+     * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.

Review comment:
       Looks like a copy&paste error for the JavaDoc. :-)

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +801,140 @@ public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
 
         assertThat(
                 scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
+                        .howToHandleFailure(
+                                null, new SuppressRestartsException(new Exception("test")))
                         .canRestart(),
                 is(false));
     }
 
+    @Test
+    public void testExceptionHistoryWithGlobalFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 1;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("Expected Global Exception");
+        final long start = System.currentTimeMillis();
+        final CountDownLatch latch = new CountDownLatch(1);

Review comment:
       FYI: Alternatively, you could use the `OneShotLatch` here.
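
A rough sketch of the idea, for illustration only: `OneShotLatchSketch` below is a hypothetical, minimal stand-in so that the example compiles without Flink on the classpath; in the test itself you would use Flink's `OneShotLatch` test utility. The executor and the lambda body are placeholders for `singleThreadMainThreadExecutor` and the call to `scheduler.handleGlobalFailure(expectedException)`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in: a latch that is triggered once and then stays open. */
final class OneShotLatchSketch {
    private final Object lock = new Object();
    private boolean triggered;

    /** Opens the latch and wakes up all waiting threads. */
    void trigger() {
        synchronized (lock) {
            triggered = true;
            lock.notifyAll();
        }
    }

    /** Blocks until trigger() has been called; returns immediately afterwards. */
    void await() throws InterruptedException {
        synchronized (lock) {
            while (!triggered) {
                lock.wait();
            }
        }
    }

    boolean isTriggered() {
        synchronized (lock) {
            return triggered;
        }
    }
}

final class LatchUsageSketch {
    /** Runs a task on a single-threaded executor and waits for it via the latch. */
    static boolean runAndWait() {
        // Placeholder for the test's singleThreadMainThreadExecutor.
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        OneShotLatchSketch latch = new OneShotLatchSketch();
        try {
            mainThreadExecutor.execute(
                    () -> {
                        // Placeholder for scheduler.handleGlobalFailure(expectedException).
                        latch.trigger();
                    });
            latch.await();
            return latch.isTriggered();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the latch", e);
        } finally {
            mainThreadExecutor.shutdown();
        }
    }
}
```

Compared to `new CountDownLatch(1)`, the intent ("this happens exactly once") is visible in the type name; the behavior in the test should be the same.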

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
##########
@@ -74,8 +74,8 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledFuture;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;

Review comment:
       `Function` is still used in this test class (in `MockExecutionJobVertex`) and shouldn't
be removed.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -880,6 +891,19 @@ public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
         transitionToState(new Finished.Factory(this, archivedExecutionGraph, LOG));
     }
 
+    @Override
+    public void archiveFailure(FailureHandlingResultSnapshot failureHandlingResultSnapshot) {
+        exceptionHistory.add(
+                RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot(
+                        failureHandlingResultSnapshot));
+    }
+
+    private Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
+        final Collection<RootExceptionHistoryEntry> copy = new ArrayList<>(exceptionHistory.size());
+        exceptionHistory.forEach(copy::add);
+        return copy;

Review comment:
       We could think of moving this logic into `BoundedFIFOQueue` considering that `SchedulerBase`
uses the exact same code. WDYT?
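
To make the proposal concrete, here is a minimal sketch under stated assumptions: `BoundedQueueSketch` is a hypothetical stand-in, not the actual `BoundedFIFOQueue`, and the method name `toArrayList` is only a suggestion. The point is that the copy logic lives next to the queue, so both schedulers can call one method instead of each building the copy by hand.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;

/** Hypothetical stand-in for a bounded FIFO queue that evicts its oldest element when full. */
final class BoundedQueueSketch<T> implements Iterable<T> {
    private final int maxSize;
    private final Queue<T> elements = new ArrayDeque<>();

    BoundedQueueSketch(int maxSize) {
        if (maxSize < 0) {
            throw new IllegalArgumentException("maxSize must not be negative");
        }
        this.maxSize = maxSize;
    }

    /** Adds an element, dropping the oldest one if the queue is already full. */
    void add(T element) {
        if (maxSize == 0) {
            return;
        }
        if (elements.size() >= maxSize) {
            elements.poll();
        }
        elements.add(element);
    }

    int size() {
        return elements.size();
    }

    /** Proposed helper (name is an assumption): copies the content in insertion order. */
    ArrayList<T> toArrayList() {
        return new ArrayList<>(elements);
    }

    @Override
    public Iterator<T> iterator() {
        return elements.iterator();
    }
}
```

With something like this in place, `getExceptionHistory()` could shrink to a single `return exceptionHistory.toArrayList();` in both schedulers.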

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -293,9 +316,19 @@ void goToFailing(
     static final class FailureResult {

Review comment:
       I'm wondering whether we should introduce a unit test for `FailureResult` considering
that it becomes more "powerful". And, maybe, moving it into `AdaptiveScheduler` might make
sense? WDYT?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -763,7 +771,8 @@ public void testHowToHandleFailureRejectedByStrategy() throws Exception {
                         .setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
                         .build();
 
-        assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart(), is(false));
+        assertThat(
+                scheduler.howToHandleFailure(null, new Exception("test")).canRestart(), is(false));

Review comment:
       Theoretically, we would have to test passing a non-null value here as well for the
`failingExecutionVertexId` parameter. Introducing a `FailureResultTest` as mentioned above
would free us from doing that.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -122,6 +132,15 @@ boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState
         return successfulUpdate;
     }
 
+    @Nullable
+    private ExecutionVertexID getExcutionVertexId(ExecutionAttemptID id) {
+        Execution execution = getExecutionGraph().getRegisteredExecutions().get(id);
+        if (execution == null) {
+            return null;
+        }
+        return execution.getVertex().getID();
+    }
+

Review comment:
       I think we could move this method into `StateWithExecutionGraph`. It looks like a
utility method for accessing the `ExecutionGraph`. That would also remove the code redundancy.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -122,6 +132,15 @@ boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState
         return successfulUpdate;
     }
 
+    @Nullable
+    private ExecutionVertexID getExcutionVertexId(ExecutionAttemptID id) {

Review comment:
       ```suggestion
       private ExecutionVertexID getExecutionVertexId(ExecutionAttemptID id) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -178,9 +192,13 @@ private void completeOperationAndGoToFinished(String savepoint) {
         context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
     }
 
-    private void handleAnyFailure(Throwable cause) {
+    private void handleAnyFailure(

Review comment:
       I'm still puzzled by the `handleAnyFailure` method (I know that this is not directly
related to your change). But it feels wrong to have this code redundancy between `Executing`
and `StopWithSavepoint`. We hesitated in the first place to let `StopWithSavepoint` inherit
from `Executing` since it would make the code harder to read.
   One other option is to move it out of the state implementations into `AdaptiveScheduler`.
Semantically, though, I feel like handling a failure is the responsibility of the state.
Just to trigger a discussion here: What are your thoughts on that one?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +801,140 @@ public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
 
         assertThat(
                 scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
+                        .howToHandleFailure(
+                                null, new SuppressRestartsException(new Exception("test")))
                         .canRestart(),
                 is(false));
     }
 
+    @Test
+    public void testExceptionHistoryWithGlobalFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 1;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("Expected Global Exception");
+        final long start = System.currentTimeMillis();
+        final CountDownLatch latch = new CountDownLatch(1);
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.handleGlobalFailure(expectedException);
+                    latch.countDown();
+                });
+
+        latch.await();
+
+        Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.requestJob().getExceptionHistory();
+        final long end = System.currentTimeMillis();
+
+        assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1));
+
+        RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
+
+        assertThat(
+                failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                Matchers.is(expectedException));
+        assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+        assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+        assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+        assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
+    }
+
+    @Test
+    public void testExceptionHistoryWithTaskFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 4;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("local failure");
+        Iterable<ArchivedExecutionVertex> executionVertices =
+                scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+
+        ExecutionAttemptID attemptId =
+                executionVertices.iterator().next().getCurrentExecutionAttempt().getAttemptId();
+        final long start = System.currentTimeMillis();
+        final CountDownLatch latch = new CountDownLatch(1);
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.updateTaskExecutionState(
+                            new TaskExecutionStateTransition(
+                                    new TaskExecutionState(
+                                            attemptId, ExecutionState.FAILED, expectedException)));
+                    latch.countDown();
+                });
+
+        latch.await();
+
+        Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.requestJob().getExceptionHistory();
+        final long end = System.currentTimeMillis();
+
+        assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1));
+
+        RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
+
+        assertThat(
+                failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                Matchers.is(expectedException));
+        assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+        assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+        assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+        assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));

Review comment:
       There's an `ExceptionHistoryEntryMatcher` which you could use (and extend) instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


