spark-commits mailing list archives

From a...@apache.org
Subject git commit: [SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors
Date Sun, 15 Jun 2014 21:56:05 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 868cf421e -> 609e5ff20


[SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors

There seem to be two issues.

1. When the job is done, the driver asks the executor to shut down. However, this clean exit was
assigned the FAILED executor state by the Worker. I introduced an EXITED executor state for
executors that exit voluntarily (covering both normal and abnormal exits, distinguished by the
exit code); the resulting enumeration is shown after this list.

2. When the Master is notified that an executor has exited, it launches another one to replace it,
regardless of why the executor exited. When the reason was that the job had finished, the
unnecessary replacement was subsequently killed when the App disassociated. This launching and
killing of unnecessary executors shows up in the logs and confuses users. I added a check for the
executor exit status and avoid launching (and subsequently killing) unnecessary replacements
when executors exit cleanly.
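
For reference, the state enumeration after the change in point 1 becomes the following (condensed
from the ExecutorState.scala hunk in the diff below):

    private[spark] object ExecutorState extends Enumeration {

      // EXITED marks an executor that terminated on its own, whether cleanly
      // (exit code 0) or not; KILLED, FAILED and LOST keep their old meanings.
      val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

      type ExecutorState = Value

      // EXITED also counts as a terminal state.
      def isFinished(state: ExecutorState): Boolean =
        Seq(KILLED, FAILED, LOST, EXITED).contains(state)
    }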

One could ask the scheduler to tell the Master the job is done so that the Master wouldn't launch
a replacement executor. However, there is a race condition between the App telling the Master the
job is done and the Worker telling the Master an executor has exited; there is no guarantee the
former happens before the latter. Instead, I chose to check the exit code when an executor exits.
If the exit code is 0, I assume the executor was asked to shut down by the driver, and the Master
will not launch a replacement.
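
A standalone sketch of that decision logic (the RelaunchPolicy object and maxNumRetry below are
illustrative stand-ins, not Spark APIs; the actual change is the normalExit check in the
Master.scala hunk in the diff below):

    // Models the Master's new behavior when told an executor has exited:
    // relaunch only on abnormal exit, and count only abnormal exits toward
    // the application's retry limit.
    object RelaunchPolicy {
      sealed trait Action
      case object Relaunch  extends Action  // schedule() a replacement executor
      case object RemoveApp extends Action  // retry limit hit, remove the application
      case object NoAction  extends Action  // clean exit, no replacement needed

      // Stand-in for ApplicationState.MAX_NUM_RETRY.
      val maxNumRetry = 10

      def onExecutorExit(exitStatus: Option[Int], retryCount: Int): Action = {
        val normalExit = exitStatus.exists(_ == 0)
        if (!normalExit && retryCount < maxNumRetry) Relaunch
        else if (!normalExit) RemoveApp
        else NoAction
      }
    }

    // e.g. onExecutorExit(Some(0), 3)  == NoAction   (driver-requested shutdown)
    //      onExecutorExit(Some(1), 3)  == Relaunch   (abnormal exit, under the limit)
    //      onExecutorExit(Some(1), 10) == RemoveApp  (abnormal exit, limit reached)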

Due to the race condition, it could also happen (although it didn't on my local cluster) that the
Master detects the App disassociation event before the executor exits on its own. In such cases,
the executor will rightfully be killed and labeled KILLED, while the App state will show
FINISHED.

Author: Kan Zhang <kzhang@apache.org>

Closes #306 from kanzhang/SPARK-1118 and squashes the following commits:

cb0cc86 [Kan Zhang] [SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors

(cherry picked from commit ca5d9d43b93abd279079b3be8a06fdd78c595510)
Signed-off-by: Aaron Davidson <aaron@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/609e5ff2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/609e5ff2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/609e5ff2

Branch: refs/heads/branch-1.0
Commit: 609e5ff20dc5f9eefbe1e6de8d21096de78ff8bd
Parents: 868cf42
Author: Kan Zhang <kzhang@apache.org>
Authored: Sun Jun 15 14:55:34 2014 -0700
Committer: Aaron Davidson <aaron@databricks.com>
Committed: Sun Jun 15 14:55:53 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/deploy/ExecutorState.scala    | 4 ++--
 .../main/scala/org/apache/spark/deploy/master/Master.scala    | 5 +++--
 .../scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 7 +++----
 3 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/609e5ff2/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index 37dfa7f..9f34d01 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy
 
 private[spark] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 
-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/609e5ff2/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c6dec30..33ffcbd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -303,10 +303,11 @@ private[spark] class Master(
             appInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
 
+            val normalExit = exitStatus.exists(_ == 0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+            if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
               schedule()
-            } else {
+            } else if (!normalExit) {
               logError("Application %s with ID %s failed %d times, removing it".format(
                 appInfo.desc.name, appInfo.id, appInfo.retryCount))
               removeApplication(appInfo, ApplicationState.FAILED)

http://git-wip-us.apache.org/repos/asf/spark/blob/609e5ff2/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index d27e0e1..5f421fd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -143,11 +143,10 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)
 
-      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
-      // long-lived processes only. However, in the future, we might restart the executor a few
-      // times on the same machine.
+      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
+      // or with nonzero exit code
       val exitCode = process.waitFor()
-      state = ExecutorState.FAILED
+      state = ExecutorState.EXITED
       val message = "Command exited with code " + exitCode
       worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {

