spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tgra...@apache.org
Subject git commit: [SPARK-3072] YARN - Exit when reach max number failed executors
Date Tue, 19 Aug 2014 14:41:02 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 f3b0f34b4 -> 1418893da


[SPARK-3072] YARN - Exit when reach max number failed executors

In some cases on hadoop 2.x the spark application master doesn't properly exit and hangs around
for 10 minutes after it's really done.  We should make sure it exits properly and stops the
driver.

Author: Thomas Graves <tgraves@apache.org>

Closes #2022 from tgravescs/SPARK-3072 and squashes the following commits:

665701d [Thomas Graves] Exit when reach max number failed executors

(cherry picked from commit 7eb9cbc273d758522e787fcb2ef68ef65911475f)
Signed-off-by: Thomas Graves <tgraves@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1418893d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1418893d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1418893d

Branch: refs/heads/branch-1.1
Commit: 1418893da557892b86fc47f1e41e91880d4f8eda
Parents: f3b0f34
Author: Thomas Graves <tgraves@apache.org>
Authored: Tue Aug 19 09:40:31 2014 -0500
Committer: Thomas Graves <tgraves@apache.org>
Committed: Tue Aug 19 09:40:55 2014 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 33 +++++++++++++-------
 .../spark/deploy/yarn/ExecutorLauncher.scala    |  5 +--
 .../spark/deploy/yarn/ApplicationMaster.scala   | 16 +++++++---
 .../spark/deploy/yarn/ExecutorLauncher.scala    |  5 +--
 4 files changed, 40 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1418893d/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 62b5c3b..46a01f5 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -267,12 +267,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       // TODO: This is a bit ugly. Can we make it nicer?
       // TODO: Handle container failure
 
-      // Exists the loop if the user thread exits.
-      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
-        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-          finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of executor failures reached")
-        }
+      // Exits the loop if the user thread exits.
+      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
+          && !isFinished) {
+        checkNumExecutorsFailed()
         yarnAllocator.allocateContainers(
           math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
@@ -303,11 +301,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
     val t = new Thread {
       override def run() {
-        while (userThread.isAlive) {
-          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-            finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of executor failures reached")
-          }
+        while (userThread.isAlive && !isFinished) {
+          checkNumExecutorsFailed()
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
             logInfo("Allocating %d containers to make up for (potentially) lost containers".
@@ -327,6 +322,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     t
   }
 
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      logInfo("max number of executor failures reached")
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+      // make sure to stop the user thread
+      val sparkContext = ApplicationMaster.sparkContextRef.get()
+      if (sparkContext != null) {
+        logInfo("Invoking sc stop from checkNumExecutorsFailed")
+        sparkContext.stop()
+      } else {
+        logError("sparkContext is null when should shutdown")
+      }
+    }
+  }
+
   private def sendProgress() {
     logDebug("Sending progress")
     // Simulated with an allocate request with no nodes requested ...

http://git-wip-us.apache.org/repos/asf/spark/blob/1418893d/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 184e2ad..72c7143 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -249,7 +249,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Wait until all containers have finished
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
-    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
+        !isFinished) {
       yarnAllocator.allocateContainers(
         math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
       checkNumExecutorsFailed()
@@ -271,7 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     val t = new Thread {
       override def run() {
-        while (!driverClosed) {
+        while (!driverClosed && !isFinished) {
           checkNumExecutorsFailed()
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1418893d/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 035356d..9c2bcf1 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -247,13 +247,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       yarnAllocator.allocateResources()
       // Exits the loop if the user thread exits.
 
-      var iters = 0
-      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
+          && !isFinished) {
         checkNumExecutorsFailed()
         allocateMissingExecutor()
         yarnAllocator.allocateResources()
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
-        iters += 1
       }
     }
     logInfo("All executors have launched.")
@@ -271,8 +270,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private def checkNumExecutorsFailed() {
     if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      logInfo("max number of executor failures reached")
       finishApplicationMaster(FinalApplicationStatus.FAILED,
         "max number of executor failures reached")
+      // make sure to stop the user thread
+      val sparkContext = ApplicationMaster.sparkContextRef.get()
+      if (sparkContext != null) {
+        logInfo("Invoking sc stop from checkNumExecutorsFailed")
+        sparkContext.stop()
+      } else {
+        logError("sparkContext is null when should shutdown")
+      }
     }
   }
 
@@ -289,7 +297,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
     val t = new Thread {
       override def run() {
-        while (userThread.isAlive) {
+        while (userThread.isAlive && !isFinished) {
           checkNumExecutorsFailed()
           allocateMissingExecutor()
           logDebug("Sending progress")

http://git-wip-us.apache.org/repos/asf/spark/blob/1418893d/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index fc7b832..a758574 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -217,7 +217,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Wait until all containers have launched
     yarnAllocator.addResourceRequests(args.numExecutors)
     yarnAllocator.allocateResources()
-    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
+        !isFinished) {
       checkNumExecutorsFailed()
       allocateMissingExecutor()
       yarnAllocator.allocateResources()
@@ -249,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     val t = new Thread {
       override def run() {
-        while (!driverClosed) {
+        while (!driverClosed && !isFinished) {
           checkNumExecutorsFailed()
           allocateMissingExecutor()
           logDebug("Sending progress")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message