From: trohrmann@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Thu, 23 Jun 2016 16:04:14 -0000
Message-Id: <492f57d48db14853b360cd6bc27a25e4@git.apache.org>
Subject: [2/2] flink git commit: [FLINK-3800] [runtime] Introduce SUSPENDED job status

[FLINK-3800] [runtime] Introduce SUSPENDED job status

The SUSPENDED job status is a new ExecutionGraph state which can be reached from all non-terminal states when calling suspend on the ExecutionGraph.
Unlike the FAILED, FINISHED and CANCELED state, the SUSPENDED state does not trigger the deletion of the job from the HA storage. Therefore, this state can be used to handle the loss of leadership or the shutdown of a JobManager so that the ExecutionGraph is stopped but can still be recovered. SUSPENDED is also a terminal state but it can be differentiated as a locally terminal state from FAILED, CANCELED and FINISHED which are globally terminal states.

Add test case for suspend signal

Add test case for suspending restarting job

Add test case for HA job recovery when losing leadership

Add online documentation for the job status

Add ASF license header to job_status.svg

Not throw exception when calling ExecutionGraph.restart and job is in state SUSPENDED

This closes #2096.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6420c1c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6420c1c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6420c1c2

Branch: refs/heads/master
Commit: 6420c1c264ed3ce0c32ba164c2cdb85ccdccf265
Parents: cfe6293
Author: Till Rohrmann
Authored: Thu Jun 9 11:37:14 2016 +0200
Committer: Till Rohrmann
Committed: Thu Jun 23 15:12:10 2016 +0200

----------------------------------------------------------------------
 docs/internals/fig/job_status.svg               | 973 +++++++++++++++++++
 docs/internals/job_scheduling.md                |  23 +-
 .../webmonitor/BackPressureStatsTracker.java    |   4 +-
 .../webmonitor/handlers/JobDetailsHandler.java  |   2 +-
 .../runtime/checkpoint/HeapStateStore.java      |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  80 +-
 .../restart/FixedDelayRestartStrategy.java      |   8 +-
 .../restart/NoRestartStrategy.java              |   3 -
 .../executiongraph/restart/RestartStrategy.java |   5 -
 .../flink/runtime/jobgraph/JobStatus.java       |  38 +-
 .../runtime/webmonitor/WebMonitorUtils.java     |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  27 +-
 .../ExecutionGraphRestartTest.java              | 121 +++
 .../ExecutionGraphSignalsTest.java              |  66 ++
 .../jobmanager/JobManagerHARecoveryTest.java    | 310 ++++++
 .../LeaderChangeJobRecoveryTest.java            |  10 +-
 .../LeaderChangeStateCleanupTest.java           |  15 +-
 .../LeaderElectionRetrievalTestingCluster.java  |  15 +-
 .../runtime/taskmanager/TaskCancelTest.java     |   2 +-
 .../testutils/JobManagerActorTestUtils.java     |   2 +-
 .../runtime/testingUtils/TestingCluster.scala   |   4 -
 .../testutils/JobManagerCommunicationUtils.java |   2 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |  51 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   2 +-
 24 files changed, 1660 insertions(+), 107 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/docs/internals/fig/job_status.svg
----------------------------------------------------------------------
diff --git a/docs/internals/fig/job_status.svg b/docs/internals/fig/job_status.svg
new file mode 100644
index 0000000..a1093bc
--- /dev/null
+++ b/docs/internals/fig/job_status.svg
@@ -0,0 +1,973 @@
[SVG markup not preserved in this archive; only the text labels of the job status diagram survive]
[MIME type: image/svg+xml]
[State labels: Created, Running, Finished, Failing, Failed, Cancelling, Canceled, Restarting, Suspended]
[Transition labels: Schedule job; All job vertices in final state (3x); All job vertices in final state & not restartable; All job vertices in final state & restartable; Fail job (3x); Cancel job (3x); Restarted job; Suspend job (4x)]

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/docs/internals/job_scheduling.md
----------------------------------------------------------------------
diff --git a/docs/internals/job_scheduling.md b/docs/internals/job_scheduling.md
index 1e1da97..9163678 100644
--- a/docs/internals/job_scheduling.md
+++ b/docs/internals/job_scheduling.md
@@ -74,7 +74,28 @@ Besides the vertices, the ExecutionGraph also contains the {% gh_link /flink-run
 JobGraph and ExecutionGraph
-During its execution, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the
+Each ExecutionGraph has a job status associated with it.
+This job status indicates the current state of the job execution.
+
+A Flink job is first in the *created* state, then switches to *running* and upon completion of all work it switches to *finished*.
+In case of failures, a job switches first to *failing* where it cancels all running tasks.
+If all job vertices have reached a final state and the job is not restartable, then the job transitions to *failed*.
+If the job can be restarted, then it will enter the *restarting* state.
+Once the job has been completely restarted, it will reach the *created* state.
+
+In case that the user cancels the job, it will go into the *cancelling* state.
+This also entails the cancellation of all currently running tasks.
+Once all running tasks have reached a final state, the job transitions to the state *cancelled*.
+
+Unlike the states *finished*, *canceled* and *failed* which denote a globally terminal state and, thus, trigger the clean up of the job, the *suspended* state is only locally terminal.
+Locally terminal means that the execution of the job has been terminated on the respective JobManager but another JobManager of the Flink cluster can retrieve the job from the persistent HA store and restart it.
+Consequently, a job which reaches the *suspended* state won't be completely cleaned up.
+
+
+States and Transitions of Flink job
+
+
+During the execution of the ExecutionGraph, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the
 states and possible transitions between them. A task may be executed multiple times (for example in the course of failure recovery).
 For that reason, the execution of an ExecutionVertex is tracked in an {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java "Execution" %}. Each ExecutionVertex has a current Execution, and prior Executions.

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
index 34d8069..f890106 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -163,7 +163,7 @@ public class BackPressureStatsTracker {
             }

             if (!pendingStats.contains(vertex) &&
-                !vertex.getGraph().getState().isTerminalState()) {
+                !vertex.getGraph().getState().isGloballyTerminalState()) {

                 ExecutionContext executionContext = vertex.getGraph().getExecutionContext();

@@ -245,7 +245,7 @@ public class BackPressureStatsTracker {

                 // Job finished, ignore.
                 JobStatus jobState = vertex.getGraph().getState();
-                if (jobState.isTerminalState()) {
+                if (jobState.isGloballyTerminalState()) {
                     LOG.debug("Ignoring sample, because job is in state " + jobState + ".");
                 } else if (success != null) {
                     OperatorBackPressureStats stats = createStatsFromSample(success);

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 4f31128..884b859 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -66,7 +66,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {

         // times and duration
         final long jobStartTime = graph.getStatusTimestamp(JobStatus.CREATED);
-        final long jobEndTime = graph.getState().isTerminalState() ?
+        final long jobEndTime = graph.getState().isGloballyTerminalState() ?
                 graph.getStatusTimestamp(graph.getState()) : -1L;
         gen.writeNumberField("start-time", jobStartTime);
         gen.writeNumberField("end-time", jobEndTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java
index a0b3804..c679d50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java
@@ -31,7 +31,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @param Type of state
  */
-class HeapStateStore implements StateStore {
+public class HeapStateStore implements StateStore {

     private final ConcurrentMap stateMap = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b11f51d..e521888 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -797,18 +797,50 @@ public class ExecutionGraph implements Serializable {
         }
     }

-    public void fail(Throwable t) {
-        if (t instanceof SuppressRestartsException) {
-            if (restartStrategy != null) {
-                // disable the restart strategy in case that we have seen a SuppressRestartsException
-                // it basically overrides the restart behaviour of a the root cause
-                restartStrategy.disable();
+    /**
+     * Suspends the current ExecutionGraph.
+     *
+     * The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
+     * state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
+     *
+     * The SUSPENDED state is a local terminal state which stops the execution of the job but does
+     * not remove the job from the HA job store so that it can be recovered by another JobManager.
+     *
+     * @param suspensionCause Cause of the suspension
+     */
+    public void suspend(Throwable suspensionCause) {
+        while (true) {
+            JobStatus currentState = state;
+
+            if (currentState.isGloballyTerminalState()) {
+                // stay in a terminal state
+                return;
+            } else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
+                this.failureCause = suspensionCause;
+
+                for (ExecutionJobVertex ejv: verticesInCreationOrder) {
+                    ejv.cancel();
+                }
+
+                synchronized (progressLock) {
+                    postRunCleanup();
+                    progressLock.notifyAll();
+
+                    LOG.info("Job {} has been suspended.", getJobID());
+                }
+
+                return;
             }
         }
+    }

+    public void fail(Throwable t) {
         while (true) {
             JobStatus current = state;
-            if (current == JobStatus.FAILING || current.isTerminalState()) {
+            // stay in these states
+            if (current == JobStatus.FAILING ||
+                current == JobStatus.SUSPENDED ||
+                current.isGloballyTerminalState()) {
                 return;
             } else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
                 synchronized (progressLock) {
@@ -849,6 +881,9 @@ public class ExecutionGraph implements Serializable {
             } else if (current == JobStatus.FAILED) {
                 LOG.info("Failed job during restart. Aborting restart.");
                 return;
+            } else if (current == JobStatus.SUSPENDED) {
+                LOG.info("Suspended job during restart. Aborting restart.");
+                return;
             } else if (current != JobStatus.RESTARTING) {
                 throw new IllegalStateException("Can only restart job from state restarting.");
             }
@@ -947,7 +982,7 @@ public class ExecutionGraph implements Serializable {
      * This method cleans fields that are irrelevant for the archived execution attempt.
      */
     public void prepareForArchiving() {
-        if (!state.isTerminalState()) {
+        if (!state.isGloballyTerminalState()) {
             throw new IllegalStateException("Can only archive the job from a terminal state");
         }

@@ -984,7 +1019,7 @@ public class ExecutionGraph implements Serializable {
      */
     public void waitUntilFinished() throws InterruptedException {
         synchronized (progressLock) {
-            while (!state.isTerminalState()) {
+            while (!state.isGloballyTerminalState()) {
                 progressLock.wait();
             }
         }
@@ -1037,23 +1072,28 @@ public class ExecutionGraph implements Serializable {
                         }
                     }
                     else if (current == JobStatus.FAILING) {
-                        if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
-                            // double check in case that in the meantime a SuppressRestartsException was thrown
-                            if (restartStrategy.canRestart()) {
-                                restartStrategy.restart(this);
-                                break;
-                            } else {
-                                fail(new Exception("ExecutionGraph went into RESTARTING state but " +
-                                    "then the restart strategy was disabled."));
-                            }
-
-                        } else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) {
+                        boolean allowRestart = !(failureCause instanceof SuppressRestartsException);
+
+                        if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
+                            restartStrategy.restart(this);
+                            break;
+                        } else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
                             postRunCleanup();
                             break;
                         }
                     }
+                    else if (current == JobStatus.SUSPENDED) {
+                        // we've already cleaned up when entering the SUSPENDED state
+                        break;
+                    }
+                    else if (current.isGloballyTerminalState()) {
+                        LOG.warn("Job has entered globally terminal state without waiting for all " +
+                            "job vertices to reach final state.");
+                        break;
+                    }
                     else {
                         fail(new Exception("ExecutionGraph went into final state from state " + current));
+                        break;
                     }
                 }
                 // done transitioning the state

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 3406f4e..ac06379 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -41,7 +41,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
     private final int maxNumberRestartAttempts;
     private final long delayBetweenRestartAttempts;
     private int currentRestartAttempt;
-    private boolean disabled = false;

     public FixedDelayRestartStrategy(
         int maxNumberRestartAttempts,
@@ -61,7 +60,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {

     @Override
     public boolean canRestart() {
-        return !disabled && currentRestartAttempt < maxNumberRestartAttempts;
+        return currentRestartAttempt < maxNumberRestartAttempts;
     }

     @Override
@@ -84,11 +83,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
         }, executionGraph.getExecutionContext());
     }

-    @Override
-    public void disable() {
-        disabled = true;
-    }
-
     /**
      * Creates a FixedDelayRestartStrategy from the given Configuration.
      *

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 6cc5ee4..958d9ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -36,9 +36,6 @@ public class NoRestartStrategy implements RestartStrategy {
         throw new RuntimeException("NoRestartStrategy does not support restart.");
     }

-    @Override
-    public void disable() {}
-
     /**
      * Creates a NoRestartStrategy instance.
      *

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index c9e6277..2880c01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -38,9 +38,4 @@ public interface RestartStrategy {
      * @param executionGraph The ExecutionGraph to be restarted
      */
     void restart(ExecutionGraph executionGraph);
-
-    /**
-     * Disables the restart strategy.
-     */
-    void disable();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index eb7d017..52a2abe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -24,38 +24,52 @@ package org.apache.flink.runtime.jobgraph;
 public enum JobStatus {

     /** Job is newly created, no task has started to run. */
-    CREATED(false),
+    CREATED(TerminalState.NON_TERMINAL),

     /** Some tasks are scheduled or running, some may be pending, some may be finished. */
-    RUNNING(false),
+    RUNNING(TerminalState.NON_TERMINAL),

     /** The job has failed and is currently waiting for the cleanup to complete */
-    FAILING(false),
+    FAILING(TerminalState.NON_TERMINAL),

     /** The job has failed with a non-recoverable task failure */
-    FAILED(true),
+    FAILED(TerminalState.GLOBALLY),

     /** Job is being cancelled */
-    CANCELLING(false),
+    CANCELLING(TerminalState.NON_TERMINAL),

     /** Job has been cancelled */
-    CANCELED(true),
+    CANCELED(TerminalState.GLOBALLY),

     /** All of the job's tasks have successfully finished. */
-    FINISHED(true),
+    FINISHED(TerminalState.GLOBALLY),

     /** The job is currently undergoing a reset and total restart */
-    RESTARTING(false);
+    RESTARTING(TerminalState.NON_TERMINAL),
+
+    /**
+     * The job has been suspended which means that it has been stopped but not been removed from a
+     * potential HA job store.
+     */
+    SUSPENDED(TerminalState.LOCALLY);

     // --------------------------------------------------------------------------------------------
+
+    enum TerminalState {
+        NON_TERMINAL,
+        LOCALLY,
+        GLOBALLY
+    }

-    private final boolean terminalState;
+    private final TerminalState terminalState;

-    JobStatus(boolean terminalState) {
+    JobStatus(TerminalState terminalState) {
         this.terminalState = terminalState;
     }

-    public boolean isTerminalState() {
-        return terminalState;
+    public boolean isGloballyTerminalState() {
+        return terminalState == TerminalState.GLOBALLY;
     }
 }
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 6d89de0..37a91b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -168,7 +168,7 @@ public final class WebMonitorUtils {
         JobStatus status = job.getState();

         long started = job.getStatusTimestamp(JobStatus.CREATED);
-        long finished = status.isTerminalState() ? job.getStatusTimestamp(status) : -1L;
+        long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L;

         int[] countsPerStatus = new int[ExecutionState.values().length];
         long lastChanged = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8ab887a..46f7ed2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -210,8 +210,7 @@ class JobManager(
     log.info(s"Stopping JobManager $getAddress.")

     val newFuturesToComplete = cancelAndClearEverything(
-      new Exception("The JobManager is shutting down."),
-      removeJobFromStateBackend = true)
+      new Exception("The JobManager is shutting down."))

     implicit val executionContext = context.dispatcher

@@ -307,8 +306,7 @@ class JobManager(
       log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")

       val newFuturesToComplete = cancelAndClearEverything(
-        new Exception("JobManager is no longer the leader."),
-        removeJobFromStateBackend = false)
+        new Exception("JobManager is no longer the leader."))

       futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)

@@ -746,7 +744,7 @@ class JobManager(
             s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.",
             error)

-          if (newJobStatus.isTerminalState()) {
+          if (newJobStatus.isGloballyTerminalState()) {
             jobInfo.end = timeStamp

             future{
@@ -951,7 +949,7 @@ class JobManager(
     case RemoveCachedJob(jobID) =>
       currentJobs.get(jobID) match {
         case Some((graph, info)) =>
-          if (graph.getState.isTerminalState) {
+          if (graph.getState.isGloballyTerminalState) {
             removeJob(graph.getJobID, removeJobFromStateBackend = true) match {
               case Some(futureToComplete) =>
                 futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete)
@@ -1632,23 +1630,11 @@ class JobManager(
    *
    * @param cause Cause for the cancelling.
    */
-  private def cancelAndClearEverything(
-      cause: Throwable,
-      removeJobFromStateBackend: Boolean)
+  private def cancelAndClearEverything(cause: Throwable)
     : Seq[Future[Unit]] = {
     val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
       future {
-        if (removeJobFromStateBackend) {
-          try {
-            submittedJobGraphs.removeJobGraph(jobID)
-          }
-          catch {
-            case t: Throwable =>
-              log.error("Error during submitted job graph clean up.", t)
-          }
-        }
-
-        eg.fail(cause)
+        eg.suspend(cause)

         if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
           jobInfo.client ! decorateMessage(
@@ -1667,7 +1653,6 @@ class JobManager(
   }

   override def revokeLeadership(): Unit = {
-    leaderSessionID = None
     self ! decorateMessage(RevokeLeadership)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 687a46a..26ba04f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -18,6 +18,7 @@

 package org.apache.flink.runtime.executiongraph;

+import akka.dispatch.Futures;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
@@ -40,10 +41,14 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;

 import java.util.Iterator;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;

 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
@@ -667,6 +672,122 @@ public class ExecutionGraphRestartTest extends TestLogger {
         assertEquals(JobStatus.RESTARTING, eg.getState());
     }

+    /**
+     * Tests that a suspend call while restarting a job, will abort the restarting.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testSuspendWhileRestarting() throws Exception {
+        FiniteDuration timeout = new FiniteDuration(1, TimeUnit.MINUTES);
+        Deadline deadline = timeout.fromNow();
+
+        Instance instance = ExecutionGraphTestUtils.getInstance(
+            new SimpleActorGateway(TestingUtils.directExecutionContext()),
+            NUM_TASKS);
+
+        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+        scheduler.newInstanceAvailable(instance);
+
+        JobVertex sender = new JobVertex("Task");
+        sender.setInvokableClass(Tasks.NoOpInvokable.class);
+        sender.setParallelism(NUM_TASKS);
+
+        JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+
+        ControllableRestartStrategy controllableRestartStrategy = new ControllableRestartStrategy(timeout);
+
+        ExecutionGraph eg = new ExecutionGraph(
+            TestingUtils.defaultExecutionContext(),
+            new JobID(),
+            "Test job",
+            new Configuration(),
+            ExecutionConfigTest.getSerializedConfig(),
+            AkkaUtils.getDefaultTimeout(),
+            controllableRestartStrategy);
+
+        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+        assertEquals(JobStatus.CREATED, eg.getState());
+
+        eg.scheduleForExecution(scheduler);
+
+        assertEquals(JobStatus.RUNNING, eg.getState());
+
+        instance.markDead();
+
+        Await.ready(controllableRestartStrategy.getReachedCanRestart(), deadline.timeLeft());
+
+        assertEquals(JobStatus.RESTARTING, eg.getState());
+
+        eg.suspend(new Exception("Test exception"));
+
+        assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+        controllableRestartStrategy.unlockRestart();
+
+        Await.ready(controllableRestartStrategy.getRestartDone(), deadline.timeLeft());
+
+        assertEquals(JobStatus.SUSPENDED, eg.getState());
+    }
+
+    private static class ControllableRestartStrategy implements RestartStrategy {
+
+        private Promise reachedCanRestart = new Promise.DefaultPromise<>();
+        private Promise doRestart = new Promise.DefaultPromise<>();
+        private Promise restartDone = new Promise.DefaultPromise<>();
+
+        private volatile Exception exception = null;
+
+        private FiniteDuration timeout;
+
+        public ControllableRestartStrategy(FiniteDuration timeout) {
+            this.timeout = timeout;
+        }
+
+        public void unlockRestart() {
+            doRestart.success(true);
+        }
+
+        public Exception getException() {
+            return exception;
+        }
+
+        public Future getReachedCanRestart() {
+            return reachedCanRestart.future();
+        }
+
+        public Future getRestartDone() {
+            return restartDone.future();
+        }
+
+        @Override
+        public boolean canRestart() {
+            reachedCanRestart.success(true);
+            return true;
+        }
+
+        @Override
+        public void restart(final ExecutionGraph executionGraph) {
+            Futures.future(new Callable() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+
+                        Await.ready(doRestart.future(), timeout);
+                        executionGraph.restart();
+                    } catch (Exception e) {
+                        exception = e;
+                    }
+
+                    restartDone.success(true);
+
+                    return null;
+                }
+            }, TestingUtils.defaultExecutionContext());
+        }
+    }
+
     private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {

         eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index 8b04fa3..bcee5a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -196,7 +196,73 @@ public class ExecutionGraphSignalsTest {
         for (int i = 0; i < mockEJV.length; ++i) {
             verify(mockEJV[i], times(times)).cancel();
         }
+    }
+
+    /**
+     * Tests that suspend cancels the ExecutionJobVertices and transitions to SUSPENDED state.
+     * Tests also that one cannot leave the SUSPENDED state to enter a terminal state.
+     */
+    @Test
+    public void testSuspend() throws Exception {
+        Assert.assertEquals(JobStatus.CREATED, eg.getState());
+        Exception testException = new Exception("Test exception");
+
+        eg.suspend(testException);
+
+        verifyCancel(1);
+        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+        f.set(eg, JobStatus.RUNNING);
+
+        eg.suspend(testException);
+
+        verifyCancel(2);
+        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+        f.set(eg, JobStatus.FAILING);
+
+        eg.suspend(testException);
+
+        verifyCancel(3);
+        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+        f.set(eg, JobStatus.CANCELLING);
+
+        eg.suspend(testException);
+
+        verifyCancel(4);
+        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+        f.set(eg, JobStatus.FAILED);
+
+        eg.suspend(testException);
+
+        verifyCancel(4);
+        Assert.assertEquals(JobStatus.FAILED, eg.getState());
+
+        f.set(eg, JobStatus.FINISHED);
+
+        eg.suspend(testException);
+
+        verifyCancel(4);
+        Assert.assertEquals(JobStatus.FINISHED, eg.getState());
+
+        f.set(eg, JobStatus.CANCELED);
+
+        eg.suspend(testException);
+
+        verifyCancel(4);
+        Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+
+        f.set(eg, JobStatus.SUSPENDED);
+
+        eg.fail(testException);
+
+        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+        eg.cancel();
+
         Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
     }

     // test that all source tasks receive STOP signal
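
Editorial sketch (not part of the commit): the locally versus globally terminal distinction and the guards in suspend() and fail() can be illustrated in isolation roughly as follows. This is a minimal model under stated assumptions, not Flink code. The class SuspendSketch, the AtomicReference-based state field, and the forceState helper are hypothetical stand-ins for ExecutionGraph, its transitionState method, and the reflection-based f.set used in the test above. Vertex cancellation and postRunCleanup are omitted.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for ExecutionGraph; only the status logic is modelled.
class SuspendSketch {

    enum TerminalState { NON_TERMINAL, LOCALLY, GLOBALLY }

    // Same classification as the JobStatus enum in the diff above.
    enum JobStatus {
        CREATED(TerminalState.NON_TERMINAL),
        RUNNING(TerminalState.NON_TERMINAL),
        FAILING(TerminalState.NON_TERMINAL),
        FAILED(TerminalState.GLOBALLY),
        CANCELLING(TerminalState.NON_TERMINAL),
        CANCELED(TerminalState.GLOBALLY),
        FINISHED(TerminalState.GLOBALLY),
        RESTARTING(TerminalState.NON_TERMINAL),
        SUSPENDED(TerminalState.LOCALLY);

        private final TerminalState terminalState;

        JobStatus(TerminalState terminalState) {
            this.terminalState = terminalState;
        }

        boolean isGloballyTerminalState() {
            return terminalState == TerminalState.GLOBALLY;
        }
    }

    // Assumption: an AtomicReference replaces the graph's state field and transitionState().
    private final AtomicReference<JobStatus> state = new AtomicReference<>(JobStatus.CREATED);

    JobStatus getState() {
        return state.get();
    }

    // Hypothetical test helper, playing the role of the reflective f.set(eg, ...) in the test.
    void forceState(JobStatus newState) {
        state.set(newState);
    }

    // Moves to SUSPENDED unless the job is already in a globally terminal state.
    boolean suspend() {
        while (true) {
            JobStatus current = state.get();
            if (current.isGloballyTerminalState()) {
                return false; // stay in FAILED, CANCELED or FINISHED
            }
            if (state.compareAndSet(current, JobStatus.SUSPENDED)) {
                return true;
            }
            // another thread changed the state concurrently; re-read and retry
        }
    }

    // Mirrors the guard in fail(): FAILING, SUSPENDED and globally terminal states are kept.
    boolean fail() {
        while (true) {
            JobStatus current = state.get();
            if (current == JobStatus.FAILING
                    || current == JobStatus.SUSPENDED
                    || current.isGloballyTerminalState()) {
                return false;
            }
            // As in the diff, a failure while RESTARTING goes straight to FAILED.
            JobStatus target =
                    current == JobStatus.RESTARTING ? JobStatus.FAILED : JobStatus.FAILING;
            if (state.compareAndSet(current, target)) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        SuspendSketch graph = new SuspendSketch();
        graph.forceState(JobStatus.RUNNING);
        System.out.println("suspend from RUNNING: " + graph.suspend() + " -> " + graph.getState());
        System.out.println("fail from SUSPENDED: " + graph.fail() + " -> " + graph.getState());
        graph.forceState(JobStatus.FINISHED);
        System.out.println("suspend from FINISHED: " + graph.suspend() + " -> " + graph.getState());
    }
}
```

The retry loop is the part worth noting: the state is re-read on every iteration, so a concurrent transition into a globally terminal state is respected instead of being overwritten by SUSPENDED.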