spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [18/37] git commit: Changes to allow fate sharing of drivers/executors and workers.
Date Fri, 10 Jan 2014 02:38:23 GMT
Changes to allow fate sharing of drivers/executors and workers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/35f6dc25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/35f6dc25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/35f6dc25

Branch: refs/heads/master
Commit: 35f6dc252a8961189837e79914f305d0745a8792
Parents: c8c8b42
Author: Patrick Wendell <pwendell@gmail.com>
Authored: Sat Dec 28 11:08:37 2013 -0800
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Sun Dec 29 11:14:36 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  1 +
 .../org/apache/spark/deploy/DeployMessage.scala |  2 +-
 .../apache/spark/deploy/DriverDescription.scala |  7 +-
 .../spark/deploy/client/DriverClient.scala      | 18 ++++--
 .../deploy/client/DriverClientArguments.scala   | 23 +------
 .../org/apache/spark/deploy/master/Master.scala |  2 +-
 .../spark/deploy/master/ui/IndexPage.scala      |  2 +-
 .../spark/deploy/worker/CommandUtils.scala      | 63 ++++++++++++++++++
 .../spark/deploy/worker/DriverRunner.scala      | 50 ++++++++-------
 .../spark/deploy/worker/DriverWrapper.scala     | 30 +++++++++
 .../spark/deploy/worker/ExecutorRunner.scala    | 67 ++++----------------
 .../org/apache/spark/deploy/worker/Worker.scala | 13 +++-
 .../spark/deploy/worker/WorkerWatcher.scala     | 47 ++++++++++++++
 .../spark/deploy/worker/ui/IndexPage.scala      |  2 +-
 .../executor/CoarseGrainedExecutorBackend.scala | 27 +++++---
 .../cluster/SparkDeploySchedulerBackend.scala   |  2 +-
 .../apache/spark/deploy/JsonProtocolSuite.scala | 16 +++--
 17 files changed, 239 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ad3337d..41f810d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -622,6 +622,7 @@ class SparkContext(
       } else {
         val uri = new URI(path)
         key = uri.getScheme match {
+          // TODO: Have this load jars that are available on the driver
           // A JAR file which exists only on the driver node
           case null | "file" =>
             if (SparkHadoopUtil.get.isYarnMode()) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 7bfc377..34460d3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -158,7 +158,7 @@ private[deploy] object DeployMessages {
     assert (port > 0)
   }
 
-  // Actor System to Worker
+  // Liveness checks in various places
 
   case object SendHeartbeat
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 32ff6db..aba81ec 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -19,13 +19,10 @@ package org.apache.spark.deploy
 
 private[spark] class DriverDescription(
     val jarUrl: String,
-    val mainClass: String,
     val mem: Int,
     val cores: Int,
-    val options: Seq[String],
-    val javaOptions: Seq[String],
-    val envVars: Seq[(String, String)])
+    val command: Command)
   extends Serializable {
 
-  override def toString: String = s"DriverDescription ($mainClass)"
+  override def toString: String = s"DriverDescription (${command.mainClass})"
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 8f19294..7e75563 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.deploy.client
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.Map
 import scala.concurrent._
 
 import akka.actor._
 
 import org.apache.spark.Logging
-import org.apache.spark.deploy.DriverDescription
+import org.apache.spark.deploy.{Command, DriverDescription}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.util.{AkkaUtils, Utils}
@@ -68,14 +70,20 @@ object DriverClient extends Logging {
 
     driverArgs.cmd match {
       case "launch" =>
+        // TODO: Could modify env here to pass a flag indicating Spark is in deploy-driver
mode
+        //       then use that to load jars locally
+        val env = Map[String, String]()
+        System.getenv().foreach{case (k, v) => env(k) = v}
+
+        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
+        val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass)
++
+          driverArgs.driverOptions, env)
+
         val driverDescription = new DriverDescription(
           driverArgs.jarUrl,
-          driverArgs.mainClass,
           driverArgs.memory,
           driverArgs.cores,
-          driverArgs.driverOptions,
-          driverArgs.driverJavaOptions,
-          driverArgs.driverEnvVars)
+          command)
         driver ! RequestSubmitDriver(driverDescription)
 
       case "kill" =>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
index 0c84cc9..3875838 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -32,11 +32,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
   var memory: Int = 512
   var cores: Int = 1
   private var _driverOptions = ListBuffer[String]()
-  private var _driverJavaOptions = ListBuffer[String]()
-  private var _driverEnvVars = ListBuffer[(String, String)]()
   def driverOptions = _driverOptions.toSeq
-  def driverJavaOptions = _driverJavaOptions.toSeq
-  def driverEnvVars = _driverEnvVars.toSeq
 
   // kill parameters
   var driverId: String = ""
@@ -52,19 +48,6 @@ private[spark] class DriverClientArguments(args: Array[String]) {
       memory = value.toInt
       parse(tail)
 
-    case ("--java-option" | "-j") :: value :: tail =>
-      _driverJavaOptions += value
-      parse(tail)
-
-    case ("--environment-variable" | "-e") :: value :: tail =>
-      val parts = value.split("=")
-      if (parts.length != 2) {
-        println(s"Error - invalid environment variable (expecting K=V): $value")
-        printUsageAndExit(1)
-      }
-      _driverEnvVars += ((parts(0), parts(1)))
-      parse(tail)
-
     case ("--help" | "-h") :: tail =>
       printUsageAndExit(0)
 
@@ -92,7 +75,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
     //      1) Create an uber jar with your application and dependencies (excluding Spark)
     //      2) You'll need to add this jar using addJar(X) inside of your spark context
 
-    // TODO: It wouldnt be too hard to allow users to submit their app and dependency jars
+    // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
     //       separately similar to in the YARN client.
     System.err.println(
       "usage: DriverClient [options] launch <active-master> <jar-url> <main-class>
" +
@@ -100,9 +83,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
       "usage: DriverClient kill <active-master> <driver-id>\n\n" +
       "Options:\n" +
       "  -c CORES, --cores CORES                Number of cores to request \n" +
-      "  -m MEMORY, --memory MEMORY             Megabytes of memory to request\n" +
-      "  -o JAVA_OPT, --java-option JAVA_OPT    JVM option to pass to driver\n" +
-      "  -e K=V, --environment-variable K=V     Environment variable to pass to driver\n")
+      "  -m MEMORY, --memory MEMORY             Megabytes of memory to request\n")
     System.exit(exitCode)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a0db2a2..efb9bf4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -179,7 +179,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends
Act
         val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
         sender ! SubmitDriverResponse(false, msg)
       } else {
-        logInfo("Driver submitted " + description.mainClass)
+        logInfo("Driver submitted " + description.command.mainClass)
         val driver = createDriver(description)
         persistenceEngine.addDriver(driver)
         waitingDrivers += driver

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 951fc67..a72d76b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -170,7 +170,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
       <td sorttable_customkey={driver.desc.mem.toString}>
         {Utils.megabytesToString(driver.desc.mem.toLong)}
       </td>
-      <td>{driver.desc.mainClass}</td>
+      <td>{driver.desc.command.mainClass}</td>
     </tr>
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
new file mode 100644
index 0000000..785aecf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -0,0 +1,63 @@
+package org.apache.spark.deploy.worker
+
+import java.io.{File, FileOutputStream, IOException, InputStream}
+import java.lang.System._
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.Command
+import org.apache.spark.util.Utils
+
+/**
+ ** Utilities for running commands with the spark classpath.
+ */
+object CommandUtils extends Logging {
+  private[spark] def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String]
= {
+    val runner = getEnv("JAVA_HOME", command).map(_ + "/bin/java").getOrElse("java")
+
+    // SPARK-698: do not call the run.cmd script, as process.destroy()
+    // fails to kill a process tree on Windows
+    Seq(runner) ++ buildJavaOpts(command, memory, sparkHome) ++ Seq(command.mainClass) ++
+      command.arguments
+  }
+
+  private def getEnv(key: String, command: Command): Option[String] =
+    command.environment.get(key).orElse(Option(getenv(key)))
+
+  /**
+   * Attention: this must always be aligned with the environment variables in the run scripts
and
+   * the way the JAVA_OPTS are assembled there.
+   */
+  def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
+    val libraryOpts = getEnv("SPARK_LIBRARY_PATH", command)
+      .map(p => List("-Djava.library.path=" + p))
+      .getOrElse(Nil)
+    val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
+    val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil)
+    val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
+
+    // Figure out our classpath with the external compute-classpath script
+    val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
+    val classPath = Utils.executeAndGetOutput(
+      Seq(sparkHome + "/bin/compute-classpath" + ext),
+      extraEnvironment=command.environment)
+
+    Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
+  }
+
+  /** Spawn a thread that will redirect a given stream to a file */
+  def redirectStream(in: InputStream, file: File) {
+    val out = new FileOutputStream(file, true)
+    // TODO: It would be nice to add a shutdown hook here that explains why the output is
+    //       terminating. Otherwise if the worker dies the executor logs will silently stop.
+    new Thread("redirect output to " + file) {
+      override def run() {
+        try {
+          Utils.copyStream(in, out, true)
+        } catch {
+          case e: IOException =>
+            logInfo("Redirection to " + file + " closed: " + e.getMessage)
+        }
+      }
+    }.start()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 402ad53..8950fb7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.worker
 
 import java.io._
 
+import scala.collection.mutable.Map
+
 import akka.actor.ActorRef
 import com.google.common.base.Charsets
 import com.google.common.io.Files
@@ -26,10 +28,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileUtil, Path}
 
 import org.apache.spark.Logging
-import org.apache.spark.deploy.DriverDescription
+import org.apache.spark.deploy.{Command, DriverDescription}
 import org.apache.spark.deploy.DeployMessages.DriverStateChanged
 import org.apache.spark.deploy.master.DriverState
-import org.apache.spark.util.Utils
 
 /**
  * Manages the execution of one driver, including automatically restarting the driver on
failure.
@@ -37,8 +38,10 @@ import org.apache.spark.util.Utils
 private[spark] class DriverRunner(
     val driverId: String,
     val workDir: File,
+    val sparkHome: File,
     val driverDesc: DriverDescription,
-    val worker: ActorRef)
+    val worker: ActorRef,
+    val workerUrl: String)
   extends Logging {
 
   @volatile var process: Option[Process] = None
@@ -53,9 +56,17 @@ private[spark] class DriverRunner(
         try {
           val driverDir = createWorkingDirectory()
           val localJarFilename = downloadUserJar(driverDir)
-          val command = Seq("java") ++ driverDesc.javaOptions ++ Seq(s"-Xmx${driverDesc.mem}m")
++
-            Seq("-cp", localJarFilename) ++ Seq(driverDesc.mainClass) ++ driverDesc.options
-          runCommandWithRetry(command, driverDesc.envVars, driverDir)
+
+          // Make sure user application jar is on the classpath
+          // TODO: This could eventually exploit ability for driver to add jars
+          val env = Map(driverDesc.command.environment.toSeq: _*)
+          env("SPARK_CLASSPATH") = env.getOrElse("SPARK_CLASSPATH", "") + s":$localJarFilename"
+          val newCommand = Command(driverDesc.command.mainClass,
+            driverDesc.command.arguments.map(substituteVariables), env)
+
+          val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
+            sparkHome.getAbsolutePath)
+          runCommandWithRetry(command, env, driverDir)
         }
         catch {
           case e: Exception => exn = Some(e)
@@ -79,26 +90,17 @@ private[spark] class DriverRunner(
     }
   }
 
-  /** Spawn a thread that will redirect a given stream to a file */
-  def redirectStream(in: InputStream, file: File) {
-    val out = new FileOutputStream(file, true)
-    new Thread("redirect output to " + file) {
-      override def run() {
-        try {
-          Utils.copyStream(in, out, true)
-        } catch {
-          case e: IOException =>
-            logInfo("Redirection to " + file + " closed: " + e.getMessage)
-        }
-      }
-    }.start()
+  /** Replace variables in a command argument passed to us */
+  private def substituteVariables(argument: String): String = argument match {
+    case "{{WORKER_URL}}" => workerUrl
+    case other => other
   }
 
   /**
    * Creates the working directory for this driver.
    * Will throw an exception if there are errors preparing the directory.
    */
-  def createWorkingDirectory(): File = {
+  private def createWorkingDirectory(): File = {
     val driverDir = new File(workDir, driverId)
     if (!driverDir.exists() && !driverDir.mkdirs()) {
       throw new IOException("Failed to create directory " + driverDir)
@@ -110,7 +112,7 @@ private[spark] class DriverRunner(
    * Download the user jar into the supplied directory and return its local path.
    * Will throw an exception if there are errors downloading the jar.
    */
-  def downloadUserJar(driverDir: File): String = {
+  private def downloadUserJar(driverDir: File): String = {
 
     val jarPath = new Path(driverDesc.jarUrl)
 
@@ -136,7 +138,7 @@ private[spark] class DriverRunner(
   }
 
   /** Continue launching the supplied command until it exits zero or is killed. */
-  def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir:
File) {
+  private def runCommandWithRetry(command: Seq[String], envVars: Map[String, String], baseDir:
File) {
     // Time to wait between submission retries.
     var waitSeconds = 1
     var cleanExit = false
@@ -153,13 +155,13 @@ private[spark] class DriverRunner(
 
         // Redirect stdout and stderr to files
         val stdout = new File(baseDir, "stdout")
-        redirectStream(process.get.getInputStream, stdout)
+        CommandUtils.redirectStream(process.get.getInputStream, stdout)
 
         val stderr = new File(baseDir, "stderr")
         val header = "Launch Command: %s\n%s\n\n".format(
           command.mkString("\"", "\" \"", "\""), "=" * 40)
         Files.write(header, stderr, Charsets.UTF_8)
-        redirectStream(process.get.getErrorStream, stderr)
+        CommandUtils.redirectStream(process.get.getErrorStream, stderr)
       }
 
       val exitCode = process.get.waitFor()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
new file mode 100644
index 0000000..8c13b10
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -0,0 +1,30 @@
+package org.apache.spark.deploy.worker
+
+import akka.actor._
+
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * Utility object for launching driver programs such that they share fate with the Worker
process.
+ */
+object DriverWrapper {
+  def main(args: Array[String]) {
+    args.toList match {
+      case workerUrl :: mainClass :: extraArgs =>
+        val (actorSystem, boundPort) = AkkaUtils.createActorSystem("Driver",
+          Utils.localHostName(), 0)
+        actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
+
+        // Delegate to supplied main class
+        val clazz = Class.forName(args(1))
+        val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+        mainMethod.invoke(null, extraArgs.toArray[String])
+
+        actorSystem.awaitTermination()
+
+      case _ =>
+        System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass>
[options]")
+        System.exit(-1)
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index fff9cb6..2e61d39 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -18,17 +18,15 @@
 package org.apache.spark.deploy.worker
 
 import java.io._
-import java.lang.System.getenv
 
 import akka.actor.ActorRef
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
 
-import org.apache.spark.{Logging}
-import org.apache.spark.deploy.{ExecutorState, ApplicationDescription}
+import org.apache.spark.Logging
+import org.apache.spark.deploy.{ExecutorState, ApplicationDescription, Command}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.util.Utils
 
 /**
  * Manages the execution of one executor process.
@@ -44,16 +42,17 @@ private[spark] class ExecutorRunner(
     val host: String,
     val sparkHome: File,
     val workDir: File,
+    val workerUrl: String,
     var state: ExecutorState.Value)
   extends Logging {
 
   val fullId = appId + "/" + execId
   var workerThread: Thread = null
   var process: Process = null
-  var shutdownHook: Thread = null
 
-  private def getAppEnv(key: String): Option[String] =
-    appDesc.command.environment.get(key).orElse(Option(getenv(key)))
+  // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It
mike
+  // make sense to remove this in the future.
+  var shutdownHook: Thread = null
 
   def start() {
     workerThread = new Thread("ExecutorRunner for " + fullId) {
@@ -92,57 +91,13 @@ private[spark] class ExecutorRunner(
 
   /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed
to us */
   def substituteVariables(argument: String): String = argument match {
+    case "{{WORKER_URL}}" => workerUrl
     case "{{EXECUTOR_ID}}" => execId.toString
     case "{{HOSTNAME}}" => host
     case "{{CORES}}" => cores.toString
     case other => other
   }
 
-  def buildCommandSeq(): Seq[String] = {
-    val command = appDesc.command
-    val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
-    // SPARK-698: do not call the run.cmd script, as process.destroy()
-    // fails to kill a process tree on Windows
-    Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
-      (command.arguments ++ Seq(appId)).map(substituteVariables)
-  }
-
-  /**
-   * Attention: this must always be aligned with the environment variables in the run scripts
and
-   * the way the JAVA_OPTS are assembled there.
-   */
-  def buildJavaOpts(): Seq[String] = {
-    val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH")
-      .map(p => List("-Djava.library.path=" + p))
-      .getOrElse(Nil)
-    val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
-    val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil)
-    val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
-
-    // Figure out our classpath with the external compute-classpath script
-    val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
-    val classPath = Utils.executeAndGetOutput(
-        Seq(sparkHome + "/bin/compute-classpath" + ext),
-        extraEnvironment=appDesc.command.environment)
-
-    Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
-  }
-
-  /** Spawn a thread that will redirect a given stream to a file */
-  def redirectStream(in: InputStream, file: File) {
-    val out = new FileOutputStream(file, true)
-    new Thread("redirect output to " + file) {
-      override def run() {
-        try {
-          Utils.copyStream(in, out, true)
-        } catch {
-          case e: IOException =>
-            logInfo("Redirection to " + file + " closed: " + e.getMessage)
-        }
-      }
-    }.start()
-  }
-
   /**
    * Download and run the executor described in our ApplicationDescription
    */
@@ -155,7 +110,9 @@ private[spark] class ExecutorRunner(
       }
 
       // Launch the process
-      val command = buildCommandSeq()
+      val fullCommand = new Command(appDesc.command.mainClass,
+        appDesc.command.arguments.map(substituteVariables), appDesc.command.environment)
+      val command = CommandUtils.buildCommandSeq(fullCommand, memory, sparkHome.getAbsolutePath)
       logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
       val builder = new ProcessBuilder(command: _*).directory(executorDir)
       val env = builder.environment()
@@ -172,11 +129,11 @@ private[spark] class ExecutorRunner(
 
       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")
-      redirectStream(process.getInputStream, stdout)
+      CommandUtils.redirectStream(process.getInputStream, stdout)
 
       val stderr = new File(executorDir, "stderr")
       Files.write(header, stderr, Charsets.UTF_8)
-      redirectStream(process.getErrorStream, stderr)
+      CommandUtils.redirectStream(process.getErrorStream, stderr)
 
       // Wait for it to exit; this is actually a bad thing if it happens, because we expect
to run
       // long-lived processes only. However, in the future, we might restart the executor
a few

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 21ec881..4e23e0d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -45,6 +45,8 @@ private[spark] class Worker(
     cores: Int,
     memory: Int,
     masterUrls: Array[String],
+    actorSystemName: String,
+    actorName: String,
     workDirPath: String = null)
   extends Actor with Logging {
   import context.dispatcher
@@ -68,6 +70,7 @@ private[spark] class Worker(
   var masterAddress: Address = null
   var activeMasterUrl: String = ""
   var activeMasterWebUiUrl : String = ""
+  val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName)
   @volatile var registered = false
   @volatile var connected = false
   val workerId = generateWorkerId()
@@ -190,6 +193,9 @@ private[spark] class Worker(
         map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
       sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)
 
+    case Heartbeat =>
+      logInfo(s"Received heartbeat from driver ${sender.path}")
+
     case RegisterWorkerFailed(message) =>
       if (!registered) {
         logError("Worker registration failed: " + message)
@@ -202,7 +208,7 @@ private[spark] class Worker(
       } else {
         logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
+          self, workerId, host, new File(execSparkHome_), workDir, akkaUrl, ExecutorState.RUNNING)
         executors(appId + "/" + execId) = manager
         manager.start()
         coresUsed += cores_
@@ -244,7 +250,7 @@ private[spark] class Worker(
 
     case LaunchDriver(driverId, driverDesc) => {
       logInfo(s"Asked to launch driver $driverId")
-      val driver = new DriverRunner(driverId, workDir, driverDesc, self)
+      val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
       drivers(driverId) = driver
       driver.start()
 
@@ -322,9 +328,10 @@ private[spark] object Worker {
     : (ActorSystem, Int) = {
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
+    val actorName = "Worker"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
     actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
-      masterUrls, workDir), name = "Worker")
+      masterUrls, systemName, actorName, workDir), name = actorName)
     (actorSystem, boundPort)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
new file mode 100644
index 0000000..e4352f1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -0,0 +1,47 @@
+package org.apache.spark.deploy.worker
+
+import akka.actor.{Actor, Address, AddressFromURIString}
+import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, DisassociatedEvent,
RemotingLifecycleEvent}
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.DeployMessages.SendHeartbeat
+
+/**
+ * Actor which connects to a worker process and terminates the JVM if the connection is severed.
+ * Provides fate sharing between a worker and its associated child processes.
+ */
+private[spark] class WorkerWatcher(workerUrl: String) extends Actor with Logging {
+  override def preStart() {
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+
+    logInfo(s"Connecting to worker $workerUrl")
+    val worker = context.actorSelection(workerUrl)
+    worker ! SendHeartbeat // need to send a message here to initiate connection
+  }
+
+  // Lets us filter events only from the worker actor
+  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
+  private def isWorker(address: Address) = address.hostPort == expectedHostPort
+
+  override def receive = {
+    case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress)
=>
+      logInfo(s"Successfully connected to $workerUrl")
+
+    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
+        if isWorker(remoteAddress) =>
+      // These logs may not be seen if the worker (and associated pipe) has died
+      logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
+      logError(s"Error was: $cause")
+      System.exit(-1)
+
+    case DisassociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress)
=>
+      // This log message will never be seen
+      logError(s"Lost connection to worker actor $workerUrl. Exiting.")
+      System.exit(-1)
+
+    case e: AssociationEvent =>
+      // pass through association events relating to other remote actor systems
+
+    case e => logWarning(s"Received unexpected actor system event: $e")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 35a1507..93c6ad4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -133,7 +133,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
   def driverRow(driver: DriverRunner): Seq[Node] = {
     <tr>
       <td>{driver.driverId}</td>
-      <td>{driver.driverDesc.mainClass}</td>
+      <td>{driver.driverDesc.command.mainClass}</td>
       <td sorttable_customkey={driver.driverDesc.cores.toString}>
         {driver.driverDesc.cores.toString}
       </td>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index debbdd4..eb1199e 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -24,8 +24,9 @@ import akka.remote._
 
 import org.apache.spark.Logging
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
@@ -91,7 +92,8 @@ private[spark] class CoarseGrainedExecutorBackend(
 }
 
 private[spark] object CoarseGrainedExecutorBackend {
-  def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
+  def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
+          workerUrl: Option[String]) {
     // Debug code
     Utils.checkHost(hostname)
 
@@ -105,17 +107,24 @@ private[spark] object CoarseGrainedExecutorBackend {
     actorSystem.actorOf(
       Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort,
cores),
       name = "Executor")
+    workerUrl.foreach{ url =>
+      actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+    }
     actorSystem.awaitTermination()
   }
 
   def main(args: Array[String]) {
-    if (args.length < 4) {
-      //the reason we allow the last appid argument is to make it easy to kill rogue executors
-      System.err.println(
-        "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname>
<cores> " +
-        "[<appid>]")
-      System.exit(1)
+    args.length match {
+      case x if x < 4 =>
+        System.err.println(
+          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
+          "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname>
" +
+          "<cores> [<workerUrl>]")
+        System.exit(1)
+      case 4 =>
+        run(args(0), args(1), args(2), args(3).toInt, None)
+      case x if x > 4 =>
+        run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
     }
-    run(args(0), args(1), args(2), args(3).toInt)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 4da49c0..921b887 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+    val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(null)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 485f688..372c9f4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -80,11 +80,14 @@ class JsonProtocolSuite extends FunSuite {
     new ApplicationInfo(3, "id", createAppDesc(), new Date(123456789), null, "appUriStr")
   }
 
-  def createDriverDesc() = new DriverDescription(
-    "hdfs://some-dir/some.jar", "org.apache.spark.FakeClass", 100, 3,
-    Seq("--some-config", "val", "--other-config", "val"), Seq("-Dsystem.property=someValue"),
-    Seq(("K1", "V1"), ("K2", "V2"))
+  def createDriverCommand() = new Command(
+    "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
+    Map(("K1", "V1"), ("K2", "V2"))
   )
+
+  def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
+    createDriverCommand())
+
   def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3", createDriverDesc(),
new Date())
 
   def createWorkerInfo(): WorkerInfo = {
@@ -92,10 +95,11 @@ class JsonProtocolSuite extends FunSuite {
   }
   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
-      new File("sparkHome"), new File("workDir"), ExecutorState.RUNNING)
+      new File("sparkHome"), new File("workDir"), "akka://worker", ExecutorState.RUNNING)
   }
   def createDriverRunner(): DriverRunner = {
-    new DriverRunner("driverId", new File("workDir"), createDriverDesc(), null)
+    new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(),
+      null, "akka://worker")
   }
 
   def assertValidJson(json: JValue) {


Mime
View raw message