spark-commits mailing list archives

From pwend...@apache.org
Subject [1/4] git commit: fix for SPARK-1027
Date Thu, 23 Jan 2014 02:58:13 GMT
Updated Branches:
  refs/heads/master 3184facdc -> 034dce2a7


fix for SPARK-1027

change TestClient & Worker to pass sparkHome as Some("xxx")

kill the manager if it has already been started

remove unnecessary .get when fetching "SPARK_HOME" values
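
The pattern, in a minimal standalone sketch (the object name and the "." fallback are illustrative, not from this commit): keep the Option[String] and resolve a fallback at the point of use instead of calling .get.

    // Sketch: the Option-based lookup this commit moves toward. Calling .get
    // on a missing "SPARK_HOME" throws NoSuchElementException; keeping the
    // Option defers the fallback decision to the consumer.
    object SparkHomeLookup extends App {
      val sparkHome: Option[String] =
        sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
      val effectiveHome: String = sparkHome.getOrElse(".")  // explicit fallback, no .get
      println(s"Using Spark home: $effectiveHome")
    }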


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/29f4b6a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/29f4b6a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/29f4b6a2

Branch: refs/heads/master
Commit: 29f4b6a2d9f42a727691444312964e59ef9b95ee
Parents: f9a95d6
Author: CodingCat <zhunansjtu@gmail.com>
Authored: Wed Jan 15 20:46:14 2014 -0500
Committer: CodingCat <zhunansjtu@gmail.com>
Committed: Mon Jan 20 02:50:30 2014 -0500

----------------------------------------------------------------------
 .../apache/spark/deploy/ApplicationDescription.scala    |  2 +-
 .../scala/org/apache/spark/deploy/DeployMessage.scala   |  3 +--
 .../org/apache/spark/deploy/client/TestClient.scala     |  2 +-
 .../scala/org/apache/spark/deploy/master/Master.scala   |  8 ++++----
 .../scala/org/apache/spark/deploy/worker/Worker.scala   | 12 +++++++-----
 .../scheduler/cluster/SparkDeploySchedulerBackend.scala |  2 +-
 .../org/apache/spark/deploy/JsonProtocolSuite.scala     |  2 +-
 .../apache/spark/deploy/worker/ExecutorRunnerTest.scala |  4 ++--
 8 files changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index e38459b..449b953 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,7 +22,7 @@ private[spark] class ApplicationDescription(
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
-    val sparkHome: String,
+    val sparkHome: Option[String],
     val appUiUrl: String)
   extends Serializable {
 
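With sparkHome now an Option[String], construction sites take the shape of this sketch (placeholder values; argument order as in the JsonProtocolSuite change below):

    // Sketch with placeholder values; pass None to defer to the worker's
    // own Spark home.
    val desc = new ApplicationDescription(
      "my-app",                             // name
      Some(4),                              // maxCores
      1024,                                 // memoryPerSlave (MB)
      Command("my.app.Main", Seq(), Map()), // command
      Some("/opt/spark"),                   // sparkHome: Option[String]
      "http://driver-host:4040")            // appUiUrl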

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 5e824e1..83ce14a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -79,8 +79,7 @@ private[deploy] object DeployMessages {
       execId: Int,
       appDesc: ApplicationDescription,
       cores: Int,
-      memory: Int,
-      sparkHome: String)
+      memory: Int)
     extends DeployMessage
 
   case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage

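With sparkHome dropped from the message, senders build it as in this sketch (hypothetical values); the Worker now reads appDesc.sparkHome instead:

    // Sketch, hypothetical values: the Spark home no longer travels in the
    // message itself.
    val msg = DeployMessages.LaunchExecutor(
      "spark://master:7077", // masterUrl
      "app-20140115200000",  // appId
      0,                     // execId
      desc,                  // appDesc, which carries sparkHome: Option[String]
      4,                     // cores
      1024)                  // memory (MB)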
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index ffa909c..8017932 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -49,7 +49,7 @@ private[spark] object TestClient {
       conf = new SparkConf)
     val desc = new ApplicationDescription(
       "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
-      "dummy-spark-home", "ignored")
+      Some("dummy-spark-home"), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d9ea96a..fe9770c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -480,7 +480,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         for (pos <- 0 until numUsable) {
           if (assigned(pos) > 0) {
             val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
-            launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)
+            launchExecutor(usableWorkers(pos), exec)
             app.state = ApplicationState.RUNNING
           }
         }
@@ -493,7 +493,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
             val coresToUse = math.min(worker.coresFree, app.coresLeft)
             if (coresToUse > 0) {
               val exec = app.addExecutor(worker, coresToUse)
-              launchExecutor(worker, exec, app.desc.sparkHome)
+              launchExecutor(worker, exec)
               app.state = ApplicationState.RUNNING
             }
           }
@@ -502,11 +502,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
   }
 
-  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
+  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
     worker.actor ! LaunchExecutor(masterUrl,
-      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
+      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
     exec.application.driver ! ExecutorAdded(
       exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c9e4fc2..de45da2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -204,17 +204,15 @@ private[spark] class Worker(
         System.exit(1)
       }
 
-    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
       } else {
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
-          // TODO (pwendell): We shuld make sparkHome an Option[String] in
-          // ApplicationDescription to be more explicit about this.
-          val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
+            self, workerId, host, new File(appDesc.sparkHome.getOrElse(sparkHome.getAbsolutePath)),
+            workDir, akkaUrl, ExecutorState.RUNNING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
@@ -224,6 +222,10 @@ private[spark] class Worker(
           }
         } catch {
           case e: Exception => {
+            logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
+            if (executors.contains(appId + "/" + execId)) {
+              executors(appId + "/" + execId).kill()
+            }
             masterLock.synchronized {
               master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
             }

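The new catch block logs the failure and kills the half-started ExecutorRunner so no orphaned executor process is left behind. A self-contained sketch of that cleanup pattern, with simplified stand-in types rather than the actual Worker code:

    import scala.collection.mutable

    object LaunchWithCleanup {
      // Simplified stand-in for ExecutorRunner.
      class Runner(val fullId: String) {
        def start(): Unit = { /* spawn the executor process; may throw */ }
        def kill(): Unit = println(s"Killed $fullId")
      }

      val executors = mutable.HashMap.empty[String, Runner]

      def launch(appId: String, execId: Int): Unit = {
        val fullId = s"$appId/$execId"
        try {
          val manager = new Runner(fullId)
          executors(fullId) = manager // registered before start(), as in the Worker
          manager.start()
        } catch {
          case e: Exception =>
            // Kill the manager if it was already registered, then report FAILED
            // to the master (reporting omitted in this sketch).
            executors.get(fullId).foreach(_.kill())
            executors -= fullId
        }
      }
    }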
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index faa6e1e..33aac52 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -50,7 +50,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
-    val sparkHome = sc.getSparkHome().getOrElse(null)
+    val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome,
         "http://" + sc.ui.appUIAddress)
 

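Passing the Option through, rather than flattening it to a possibly-null String, leaves the Worker's getOrElse above as the one place the fallback is chosen. The contrast, as a sketch:

    // Before: a nullable String that the Worker had to re-wrap with Option(...).
    // val sparkHome: String = sc.getSparkHome().getOrElse(null)
    // After: the Option[String] flows through to ApplicationDescription untouched.
    val sparkHome: Option[String] = sc.getSparkHome()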
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index d05bbd6..693b1ab 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -74,7 +74,7 @@ class JsonProtocolSuite extends FunSuite {
 
   def createAppDesc(): ApplicationDescription = {
     val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
-    new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl")
+    new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
   }
 
   def createAppInfo() : ApplicationInfo = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index a79ee69..4baa656 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -26,11 +26,11 @@ import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription}
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()),
       sparkHome, "appUiUrl")
     val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
       f("ooga"), "blah", ExecutorState.RUNNING)
 
     assert(er.getCommandSeq.last === appId)

