spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-4346][SPARK-3596][YARN] Commonize the monitor logic
Date Wed, 08 Apr 2015 20:56:45 GMT
Repository: spark
Updated Branches:
  refs/heads/master 86403f552 -> 55a92ef34


[SPARK-4346][SPARK-3596][YARN] Commonize the monitor logic

1. YarnClientSchedulerBackend.asyncMonitorApplication now uses Client.monitorApplication, so that
the monitor logic is shared
2. Support changing the yarn client monitor interval, see #5292
3. For more details, see the discussion at https://github.com/apache/spark/pull/3143

Author: unknown <l00251599@HGHY1L002515991.china.huawei.com>
Author: Sephiroth-Lin <linwzhong@gmail.com>

Closes #5305 from Sephiroth-Lin/SPARK-4346_3596 and squashes the following commits:

47c0014 [unknown] Edit conflicts
52b29fe [unknown] Interrupt thread when we call stop()
d4298a1 [unknown] Unused, don't push
aaacb42 [Sephiroth-Lin] don't wrap the entire block in the try
ee2b2fd [Sephiroth-Lin] update
6483a2a [unknown] Catch exception
6b47ff7 [unknown] Update code
568f46f [unknown] YarnClientSchedulerBack.asyncMonitorApplication should be common with Client.monitorApplication


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55a92ef3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55a92ef3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55a92ef3

Branch: refs/heads/master
Commit: 55a92ef34c0b57b6e379523d5d79baa05392de37
Parents: 86403f5
Author: unknown <l00251599@HGHY1L002515991.china.huawei.com>
Authored: Wed Apr 8 13:56:42 2015 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Wed Apr 8 13:56:42 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 10 +++++-
 .../cluster/YarnClientSchedulerBackend.scala    | 32 ++++++--------------
 2 files changed, 18 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/55a92ef3/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 79d55a0..7219852 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
@@ -561,7 +562,14 @@ private[spark] class Client(
     var lastState: YarnApplicationState = null
     while (true) {
       Thread.sleep(interval)
-      val report = getApplicationReport(appId)
+      val report: ApplicationReport =
+        try {
+          getApplicationReport(appId)
+        } catch {
+          case e: ApplicationNotFoundException =>
+            logError(s"Application $appId not found.")
+            return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
+        }
       val state = report.getYarnApplicationState
 
       if (logApplicationReport) {

http://git-wip-us.apache.org/repos/asf/spark/blob/55a92ef3/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 8abdc26..407dc1a 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -34,7 +34,7 @@ private[spark] class YarnClientSchedulerBackend(
 
   private var client: Client = null
   private var appId: ApplicationId = null
-  @volatile private var stopping: Boolean = false
+  private var monitorThread: Thread = null
 
   /**
    * Create a Yarn client to submit an application to the ResourceManager.
@@ -57,7 +57,8 @@ private[spark] class YarnClientSchedulerBackend(
     client = new Client(args, conf)
     appId = client.submitApplication()
     waitForApplication()
-    asyncMonitorApplication()
+    monitorThread = asyncMonitorApplication()
+    monitorThread.start()
   }
 
   /**
@@ -123,34 +124,19 @@ private[spark] class YarnClientSchedulerBackend(
    * If the application has exited for any reason, stop the SparkContext.
    * This assumes both `client` and `appId` have already been set.
    */
-  private def asyncMonitorApplication(): Unit = {
+  private def asyncMonitorApplication(): Thread = {
     assert(client != null && appId != null, "Application has not been submitted yet!")
     val t = new Thread {
       override def run() {
-        while (!stopping) {
-          var state: YarnApplicationState = null
-          try {
-            val report = client.getApplicationReport(appId)
-            state = report.getYarnApplicationState()
-          } catch {
-            case e: ApplicationNotFoundException =>
-              state = YarnApplicationState.KILLED
-          }
-          if (state == YarnApplicationState.FINISHED ||
-            state == YarnApplicationState.KILLED ||
-            state == YarnApplicationState.FAILED) {
-            logError(s"Yarn application has already exited with state $state!")
-            sc.stop()
-            stopping = true
-          }
-          Thread.sleep(1000L)
-        }
+        val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
+        logError(s"Yarn application has already exited with state $state!")
+        sc.stop()
         Thread.currentThread().interrupt()
       }
     }
     t.setName("Yarn application state monitor")
     t.setDaemon(true)
-    t.start()
+    t
   }
 
   /**
@@ -158,7 +144,7 @@ private[spark] class YarnClientSchedulerBackend(
    */
   override def stop() {
     assert(client != null, "Attempted to stop this scheduler before starting it!")
-    stopping = true
+    monitorThread.interrupt()
     super.stop()
     client.stop()
     logInfo("Stopped")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message