spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tgra...@apache.org
Subject git commit: SPARK-1032. If Yarn app fails before registering, app master stays aroun...
Date Thu, 20 Mar 2014 21:51:18 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-0.9 748f002b3 -> c6630d363


SPARK-1032. If Yarn app fails before registering, app master stays aroun...

...d long after

This reopens https://github.com/apache/incubator-spark/pull/648 against the new repo.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #28 from sryza/sandy-spark-1032 and squashes the following commits:

5953f50 [Sandy Ryza] SPARK-1032. If Yarn app fails before registering, app master stays around
long after

Conflicts:
	yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
	yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6630d36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6630d36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6630d36

Branch: refs/heads/branch-0.9
Commit: c6630d363a73184c8dcca9f2c0c6fc3f5c8e47bf
Parents: 748f002
Author: Sandy Ryza <sandy@cloudera.com>
Authored: Fri Feb 28 09:40:47 2014 -0600
Committer: Thomas Graves <tgraves@apache.org>
Committed: Thu Mar 20 16:50:44 2014 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 34 +++++++++++++-------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 22 +++++++++----
 2 files changed, 38 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c6630d36/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7aa1894..e045b9f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -66,6 +66,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
 
+  private var registered = false
+
   private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
     SparkContext.SPARK_UNKNOWN_USER)
 
@@ -114,7 +116,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     waitForSparkContextInitialized()
 
     // Do this after spark master is up and SparkContext is created so that we can register
UI Url
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+    synchronized {
+      if (!isFinished) {
+        registerApplicationMaster()
+        registered = true
+      }
+    }
 
     // Allocate all containers
     allocateWorkers()
@@ -212,7 +219,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         var count = 0
         val waitTime = 10000L
         val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
-        while (ApplicationMaster.sparkContextRef.get() == null && count < numTries)
{
+        while (ApplicationMaster.sparkContextRef.get() == null && count < numTries
+            && !isFinished) {
           logInfo("Waiting for spark context initialization ... " + count)
           count = count + 1
           ApplicationMaster.sparkContextRef.wait(waitTime)
@@ -345,17 +353,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         return
       }
       isFinished = true
+      
+      logInfo("finishApplicationMaster with " + status)
+      if (registered) {
+        val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+          .asInstanceOf[FinishApplicationMasterRequest]
+        finishReq.setAppAttemptId(appAttemptId)
+        finishReq.setFinishApplicationStatus(status)
+        finishReq.setDiagnostics(diagnostics)
+        // Set tracking url to empty since we don't have a history server.
+        finishReq.setTrackingUrl("")
+        resourceManager.finishApplicationMaster(finishReq)
+      }
     }
-
-    logInfo("finishApplicationMaster with " + status)
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    finishReq.setFinishApplicationStatus(status)
-    finishReq.setDiagnostics(diagnostics)
-    // Set tracking url to empty since we don't have a history server.
-    finishReq.setTrackingUrl("")
-    resourceManager.finishApplicationMaster(finishReq)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c6630d36/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 45a7f38..b312a42 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -68,6 +68,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
 
+  private var registered = false
+
   private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
     SparkContext.SPARK_UNKNOWN_USER)
 
@@ -103,7 +105,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     waitForSparkContextInitialized()
 
     // Do this after Spark master is up and SparkContext is created so that we can register
UI Url.
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+    synchronized {
+      if (!isFinished) {
+        registerApplicationMaster()
+        registered = true
+      }
+    }
 
     // Allocate all containers
     allocateWorkers()
@@ -184,7 +191,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         var numTries = 0
         val waitTime = 10000L
         val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
-        while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries)
{
+        while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries
+            && !isFinished) {
           logInfo("Waiting for Spark context initialization ... " + numTries)
           numTries = numTries + 1
           ApplicationMaster.sparkContextRef.wait(waitTime)
@@ -317,11 +325,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         return
       }
       isFinished = true
-    }
 
-    logInfo("finishApplicationMaster with " + status)
-    // Set tracking URL to empty since we don't have a history server.
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl
*/)
+      logInfo("finishApplicationMaster with " + status)
+      if (registered) {
+        // Set tracking URL to empty since we don't have a history server.
+        amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl
*/)
+      }
+    }
   }
 
   /**


Mime
View raw message