From: pwendell@apache.org
To: commits@spark.incubator.apache.org
Reply-To: dev@spark.incubator.apache.org
Date: Fri, 10 Jan 2014 02:38:23 -0000
Subject: [18/37] git commit: Changes to allow fate sharing of drivers/executors and workers.

Changes to allow fate sharing of drivers/executors and workers.
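The heart of this change is a small WorkerWatcher actor that each child JVM (a driver launched through DriverWrapper, or an executor via CoarseGrainedExecutorBackend) starts next to its own actor system. The watcher subscribes to Akka remoting lifecycle events for the parent worker's address and terminates the JVM when that association is lost, so drivers and executors share fate with their worker. A minimal sketch of the pattern, simplified from the WorkerWatcher added below (the class name FateSharingWatcher and the initial "ping" message are illustrative; the real actor sends DeployMessages.SendHeartbeat and also handles association errors):

import akka.actor.{Actor, Address, AddressFromURIString}
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

// Sketch only: exit the JVM when the remote worker's actor system goes away.
// Started with e.g.: system.actorOf(Props(classOf[FateSharingWatcher], workerUrl), "watcher")
class FateSharingWatcher(workerUrl: String) extends Actor {
  // Lets us filter remoting events so we only react to the worker we were given
  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
  private def isWorker(address: Address) = address.hostPort == expectedHostPort

  override def preStart() {
    // Watch remoting lifecycle events and open a connection to the worker
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    context.actorSelection(workerUrl) ! "ping" // any message initiates the association
  }

  def receive = {
    case DisassociatedEvent(_, remoteAddress, _) if isWorker(remoteAddress) =>
      System.exit(-1) // the worker is gone; share its fate
    case _ => // ignore events from other remote actor systems
  }
}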
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/35f6dc25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/35f6dc25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/35f6dc25

Branch: refs/heads/master
Commit: 35f6dc252a8961189837e79914f305d0745a8792
Parents: c8c8b42
Author: Patrick Wendell
Authored: Sat Dec 28 11:08:37 2013 -0800
Committer: Patrick Wendell
Committed: Sun Dec 29 11:14:36 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  1 +
 .../org/apache/spark/deploy/DeployMessage.scala |  2 +-
 .../apache/spark/deploy/DriverDescription.scala |  7 +-
 .../spark/deploy/client/DriverClient.scala      | 18 ++++--
 .../deploy/client/DriverClientArguments.scala   | 23 +------
 .../org/apache/spark/deploy/master/Master.scala |  2 +-
 .../spark/deploy/master/ui/IndexPage.scala      |  2 +-
 .../spark/deploy/worker/CommandUtils.scala      | 63 ++++++++++++++++++
 .../spark/deploy/worker/DriverRunner.scala      | 50 ++++++++-------
 .../spark/deploy/worker/DriverWrapper.scala     | 30 +++++++++
 .../spark/deploy/worker/ExecutorRunner.scala    | 67 ++++----------------
 .../org/apache/spark/deploy/worker/Worker.scala | 13 +++-
 .../spark/deploy/worker/WorkerWatcher.scala     | 47 ++++++++++++++
 .../spark/deploy/worker/ui/IndexPage.scala      |  2 +-
 .../executor/CoarseGrainedExecutorBackend.scala | 27 +++++---
 .../cluster/SparkDeploySchedulerBackend.scala   |  2 +-
 .../apache/spark/deploy/JsonProtocolSuite.scala | 16 +++--
 17 files changed, 239 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ad3337d..41f810d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -622,6 +622,7 @@ class SparkContext(
     } else {
       val uri = new URI(path)
       key = uri.getScheme match {
+        // TODO: Have this load jars that are available on the driver
         // A JAR file which exists only on the driver node
         case null | "file" =>
           if (SparkHadoopUtil.get.isYarnMode()) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 7bfc377..34460d3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -158,7 +158,7 @@ private[deploy] object DeployMessages {
     assert (port > 0)
   }
 
-  // Actor System to Worker
+  // Liveness checks in various places
   case object SendHeartbeat
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 32ff6db..aba81ec 100644
---
a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -19,13 +19,10 @@ package org.apache.spark.deploy private[spark] class DriverDescription( val jarUrl: String, - val mainClass: String, val mem: Int, val cores: Int, - val options: Seq[String], - val javaOptions: Seq[String], - val envVars: Seq[(String, String)]) + val command: Command) extends Serializable { - override def toString: String = s"DriverDescription ($mainClass)" + override def toString: String = s"DriverDescription (${command.mainClass})" } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala index 8f19294..7e75563 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala @@ -17,12 +17,14 @@ package org.apache.spark.deploy.client +import scala.collection.JavaConversions._ +import scala.collection.mutable.Map import scala.concurrent._ import akka.actor._ import org.apache.spark.Logging -import org.apache.spark.deploy.DriverDescription +import org.apache.spark.deploy.{Command, DriverDescription} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master import org.apache.spark.util.{AkkaUtils, Utils} @@ -68,14 +70,20 @@ object DriverClient extends Logging { driverArgs.cmd match { case "launch" => + // TODO: Could modify env here to pass a flag indicating Spark is in deploy-driver mode + // then use that to load jars locally + val env = Map[String, String]() + System.getenv().foreach{case (k, v) => env(k) = v} + + val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" + val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++ + driverArgs.driverOptions, env) + val driverDescription = new DriverDescription( driverArgs.jarUrl, - driverArgs.mainClass, driverArgs.memory, driverArgs.cores, - driverArgs.driverOptions, - driverArgs.driverJavaOptions, - driverArgs.driverEnvVars) + command) driver ! 
RequestSubmitDriver(driverDescription) case "kill" => http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala index 0c84cc9..3875838 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala @@ -32,11 +32,7 @@ private[spark] class DriverClientArguments(args: Array[String]) { var memory: Int = 512 var cores: Int = 1 private var _driverOptions = ListBuffer[String]() - private var _driverJavaOptions = ListBuffer[String]() - private var _driverEnvVars = ListBuffer[(String, String)]() def driverOptions = _driverOptions.toSeq - def driverJavaOptions = _driverJavaOptions.toSeq - def driverEnvVars = _driverEnvVars.toSeq // kill parameters var driverId: String = "" @@ -52,19 +48,6 @@ private[spark] class DriverClientArguments(args: Array[String]) { memory = value.toInt parse(tail) - case ("--java-option" | "-j") :: value :: tail => - _driverJavaOptions += value - parse(tail) - - case ("--environment-variable" | "-e") :: value :: tail => - val parts = value.split("=") - if (parts.length != 2) { - println(s"Error - invalid environment variable (expecting K=V): $value") - printUsageAndExit(1) - } - _driverEnvVars += ((parts(0), parts(1))) - parse(tail) - case ("--help" | "-h") :: tail => printUsageAndExit(0) @@ -92,7 +75,7 @@ private[spark] class DriverClientArguments(args: Array[String]) { // 1) Create an uber jar with your application and dependencies (excluding Spark) // 2) You'll need to add this jar using addJar(X) inside of your spark context - // TODO: It wouldnt be too hard to allow users to submit their app and dependency jars + // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars // separately similar to in the YARN client. System.err.println( "usage: DriverClient [options] launch " + @@ -100,9 +83,7 @@ private[spark] class DriverClientArguments(args: Array[String]) { "usage: DriverClient kill \n\n" + "Options:\n" + " -c CORES, --cores CORES Number of cores to request \n" + - " -m MEMORY, --memory MEMORY Megabytes of memory to request\n" + - " -o JAVA_OPT, --java-option JAVA_OPT JVM option to pass to driver\n" + - " -e K=V, --environment-variable K=V Environment variable to pass to driver\n") + " -m MEMORY, --memory MEMORY Megabytes of memory to request\n") System.exit(exitCode) } } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index a0db2a2..efb9bf4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -179,7 +179,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state." sender ! 
SubmitDriverResponse(false, msg) } else { - logInfo("Driver submitted " + description.mainClass) + logInfo("Driver submitted " + description.command.mainClass) val driver = createDriver(description) persistenceEngine.addDriver(driver) waitingDrivers += driver http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index 951fc67..a72d76b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -170,7 +170,7 @@ private[spark] class IndexPage(parent: MasterWebUI) { {Utils.megabytesToString(driver.desc.mem.toLong)} - {driver.desc.mainClass} + {driver.desc.command.mainClass} } } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala new file mode 100644 index 0000000..785aecf --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -0,0 +1,63 @@ +package org.apache.spark.deploy.worker + +import java.io.{File, FileOutputStream, IOException, InputStream} +import java.lang.System._ + +import org.apache.spark.Logging +import org.apache.spark.deploy.Command +import org.apache.spark.util.Utils + +/** + ** Utilities for running commands with the spark classpath. + */ +object CommandUtils extends Logging { + private[spark] def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = { + val runner = getEnv("JAVA_HOME", command).map(_ + "/bin/java").getOrElse("java") + + // SPARK-698: do not call the run.cmd script, as process.destroy() + // fails to kill a process tree on Windows + Seq(runner) ++ buildJavaOpts(command, memory, sparkHome) ++ Seq(command.mainClass) ++ + command.arguments + } + + private def getEnv(key: String, command: Command): Option[String] = + command.environment.get(key).orElse(Option(getenv(key))) + + /** + * Attention: this must always be aligned with the environment variables in the run scripts and + * the way the JAVA_OPTS are assembled there. 
+ */ + def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = { + val libraryOpts = getEnv("SPARK_LIBRARY_PATH", command) + .map(p => List("-Djava.library.path=" + p)) + .getOrElse(Nil) + val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil) + val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil) + val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M") + + // Figure out our classpath with the external compute-classpath script + val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" + val classPath = Utils.executeAndGetOutput( + Seq(sparkHome + "/bin/compute-classpath" + ext), + extraEnvironment=command.environment) + + Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts + } + + /** Spawn a thread that will redirect a given stream to a file */ + def redirectStream(in: InputStream, file: File) { + val out = new FileOutputStream(file, true) + // TODO: It would be nice to add a shutdown hook here that explains why the output is + // terminating. Otherwise if the worker dies the executor logs will silently stop. + new Thread("redirect output to " + file) { + override def run() { + try { + Utils.copyStream(in, out, true) + } catch { + case e: IOException => + logInfo("Redirection to " + file + " closed: " + e.getMessage) + } + } + }.start() + } +} http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 402ad53..8950fb7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -19,6 +19,8 @@ package org.apache.spark.deploy.worker import java.io._ +import scala.collection.mutable.Map + import akka.actor.ActorRef import com.google.common.base.Charsets import com.google.common.io.Files @@ -26,10 +28,9 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.spark.Logging -import org.apache.spark.deploy.DriverDescription +import org.apache.spark.deploy.{Command, DriverDescription} import org.apache.spark.deploy.DeployMessages.DriverStateChanged import org.apache.spark.deploy.master.DriverState -import org.apache.spark.util.Utils /** * Manages the execution of one driver, including automatically restarting the driver on failure. 
@@ -37,8 +38,10 @@ import org.apache.spark.util.Utils private[spark] class DriverRunner( val driverId: String, val workDir: File, + val sparkHome: File, val driverDesc: DriverDescription, - val worker: ActorRef) + val worker: ActorRef, + val workerUrl: String) extends Logging { @volatile var process: Option[Process] = None @@ -53,9 +56,17 @@ private[spark] class DriverRunner( try { val driverDir = createWorkingDirectory() val localJarFilename = downloadUserJar(driverDir) - val command = Seq("java") ++ driverDesc.javaOptions ++ Seq(s"-Xmx${driverDesc.mem}m") ++ - Seq("-cp", localJarFilename) ++ Seq(driverDesc.mainClass) ++ driverDesc.options - runCommandWithRetry(command, driverDesc.envVars, driverDir) + + // Make sure user application jar is on the classpath + // TODO: This could eventually exploit ability for driver to add jars + val env = Map(driverDesc.command.environment.toSeq: _*) + env("SPARK_CLASSPATH") = env.getOrElse("SPARK_CLASSPATH", "") + s":$localJarFilename" + val newCommand = Command(driverDesc.command.mainClass, + driverDesc.command.arguments.map(substituteVariables), env) + + val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem, + sparkHome.getAbsolutePath) + runCommandWithRetry(command, env, driverDir) } catch { case e: Exception => exn = Some(e) @@ -79,26 +90,17 @@ private[spark] class DriverRunner( } } - /** Spawn a thread that will redirect a given stream to a file */ - def redirectStream(in: InputStream, file: File) { - val out = new FileOutputStream(file, true) - new Thread("redirect output to " + file) { - override def run() { - try { - Utils.copyStream(in, out, true) - } catch { - case e: IOException => - logInfo("Redirection to " + file + " closed: " + e.getMessage) - } - } - }.start() + /** Replace variables in a command argument passed to us */ + private def substituteVariables(argument: String): String = argument match { + case "{{WORKER_URL}}" => workerUrl + case other => other } /** * Creates the working directory for this driver. * Will throw an exception if there are errors preparing the directory. */ - def createWorkingDirectory(): File = { + private def createWorkingDirectory(): File = { val driverDir = new File(workDir, driverId) if (!driverDir.exists() && !driverDir.mkdirs()) { throw new IOException("Failed to create directory " + driverDir) @@ -110,7 +112,7 @@ private[spark] class DriverRunner( * Download the user jar into the supplied directory and return its local path. * Will throw an exception if there are errors downloading the jar. */ - def downloadUserJar(driverDir: File): String = { + private def downloadUserJar(driverDir: File): String = { val jarPath = new Path(driverDesc.jarUrl) @@ -136,7 +138,7 @@ private[spark] class DriverRunner( } /** Continue launching the supplied command until it exits zero or is killed. */ - def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) { + private def runCommandWithRetry(command: Seq[String], envVars: Map[String, String], baseDir: File) { // Time to wait between submission retries. 
var waitSeconds = 1 var cleanExit = false @@ -153,13 +155,13 @@ private[spark] class DriverRunner( // Redirect stdout and stderr to files val stdout = new File(baseDir, "stdout") - redirectStream(process.get.getInputStream, stdout) + CommandUtils.redirectStream(process.get.getInputStream, stdout) val stderr = new File(baseDir, "stderr") val header = "Launch Command: %s\n%s\n\n".format( command.mkString("\"", "\" \"", "\""), "=" * 40) Files.write(header, stderr, Charsets.UTF_8) - redirectStream(process.get.getErrorStream, stderr) + CommandUtils.redirectStream(process.get.getErrorStream, stderr) } val exitCode = process.get.waitFor() http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala new file mode 100644 index 0000000..8c13b10 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -0,0 +1,30 @@ +package org.apache.spark.deploy.worker + +import akka.actor._ + +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * Utility object for launching driver programs such that they share fate with the Worker process. + */ +object DriverWrapper { + def main(args: Array[String]) { + args.toList match { + case workerUrl :: mainClass :: extraArgs => + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("Driver", + Utils.localHostName(), 0) + actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher") + + // Delegate to supplied main class + val clazz = Class.forName(args(1)) + val mainMethod = clazz.getMethod("main", classOf[Array[String]]) + mainMethod.invoke(null, extraArgs.toArray[String]) + + actorSystem.awaitTermination() + + case _ => + System.err.println("Usage: DriverWrapper [options]") + System.exit(-1) + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index fff9cb6..2e61d39 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -18,17 +18,15 @@ package org.apache.spark.deploy.worker import java.io._ -import java.lang.System.getenv import akka.actor.ActorRef import com.google.common.base.Charsets import com.google.common.io.Files -import org.apache.spark.{Logging} -import org.apache.spark.deploy.{ExecutorState, ApplicationDescription} +import org.apache.spark.Logging +import org.apache.spark.deploy.{ExecutorState, ApplicationDescription, Command} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged -import org.apache.spark.util.Utils /** * Manages the execution of one executor process. 
@@ -44,16 +42,17 @@ private[spark] class ExecutorRunner( val host: String, val sparkHome: File, val workDir: File, + val workerUrl: String, var state: ExecutorState.Value) extends Logging { val fullId = appId + "/" + execId var workerThread: Thread = null var process: Process = null - var shutdownHook: Thread = null - private def getAppEnv(key: String): Option[String] = - appDesc.command.environment.get(key).orElse(Option(getenv(key))) + // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It mike + // make sense to remove this in the future. + var shutdownHook: Thread = null def start() { workerThread = new Thread("ExecutorRunner for " + fullId) { @@ -92,57 +91,13 @@ private[spark] class ExecutorRunner( /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */ def substituteVariables(argument: String): String = argument match { + case "{{WORKER_URL}}" => workerUrl case "{{EXECUTOR_ID}}" => execId.toString case "{{HOSTNAME}}" => host case "{{CORES}}" => cores.toString case other => other } - def buildCommandSeq(): Seq[String] = { - val command = appDesc.command - val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java") - // SPARK-698: do not call the run.cmd script, as process.destroy() - // fails to kill a process tree on Windows - Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++ - (command.arguments ++ Seq(appId)).map(substituteVariables) - } - - /** - * Attention: this must always be aligned with the environment variables in the run scripts and - * the way the JAVA_OPTS are assembled there. - */ - def buildJavaOpts(): Seq[String] = { - val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH") - .map(p => List("-Djava.library.path=" + p)) - .getOrElse(Nil) - val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil) - val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil) - val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M") - - // Figure out our classpath with the external compute-classpath script - val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" - val classPath = Utils.executeAndGetOutput( - Seq(sparkHome + "/bin/compute-classpath" + ext), - extraEnvironment=appDesc.command.environment) - - Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts - } - - /** Spawn a thread that will redirect a given stream to a file */ - def redirectStream(in: InputStream, file: File) { - val out = new FileOutputStream(file, true) - new Thread("redirect output to " + file) { - override def run() { - try { - Utils.copyStream(in, out, true) - } catch { - case e: IOException => - logInfo("Redirection to " + file + " closed: " + e.getMessage) - } - } - }.start() - } - /** * Download and run the executor described in our ApplicationDescription */ @@ -155,7 +110,9 @@ private[spark] class ExecutorRunner( } // Launch the process - val command = buildCommandSeq() + val fullCommand = new Command(appDesc.command.mainClass, + appDesc.command.arguments.map(substituteVariables), appDesc.command.environment) + val command = CommandUtils.buildCommandSeq(fullCommand, memory, sparkHome.getAbsolutePath) logInfo("Launch command: " + command.mkString("\"", "\" \"", "\"")) val builder = new ProcessBuilder(command: _*).directory(executorDir) val env = builder.environment() @@ -172,11 +129,11 @@ private[spark] class ExecutorRunner( // Redirect its stdout and stderr to files val 
stdout = new File(executorDir, "stdout") - redirectStream(process.getInputStream, stdout) + CommandUtils.redirectStream(process.getInputStream, stdout) val stderr = new File(executorDir, "stderr") Files.write(header, stderr, Charsets.UTF_8) - redirectStream(process.getErrorStream, stderr) + CommandUtils.redirectStream(process.getErrorStream, stderr) // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run // long-lived processes only. However, in the future, we might restart the executor a few http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 21ec881..4e23e0d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -45,6 +45,8 @@ private[spark] class Worker( cores: Int, memory: Int, masterUrls: Array[String], + actorSystemName: String, + actorName: String, workDirPath: String = null) extends Actor with Logging { import context.dispatcher @@ -68,6 +70,7 @@ private[spark] class Worker( var masterAddress: Address = null var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" + val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName) @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() @@ -190,6 +193,9 @@ private[spark] class Worker( map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state)) sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq) + case Heartbeat => + logInfo(s"Received heartbeat from driver ${sender.path}") + case RegisterWorkerFailed(message) => if (!registered) { logError("Worker registration failed: " + message) @@ -202,7 +208,7 @@ private[spark] class Worker( } else { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING) + self, workerId, host, new File(execSparkHome_), workDir, akkaUrl, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ @@ -244,7 +250,7 @@ private[spark] class Worker( case LaunchDriver(driverId, driverDesc) => { logInfo(s"Asked to launch driver $driverId") - val driver = new DriverRunner(driverId, workDir, driverDesc, self) + val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl) drivers(driverId) = driver driver.start() @@ -322,9 +328,10 @@ private[spark] object Worker { : (ActorSystem, Int) = { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") + val actorName = "Worker" val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, - masterUrls, workDir), name = "Worker") + masterUrls, systemName, actorName, workDir), name = actorName) (actorSystem, boundPort) } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala new file mode 100644 index 0000000..e4352f1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala @@ -0,0 +1,47 @@ +package org.apache.spark.deploy.worker + +import akka.actor.{Actor, Address, AddressFromURIString} +import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent} + +import org.apache.spark.Logging +import org.apache.spark.deploy.DeployMessages.SendHeartbeat + +/** + * Actor which connects to a worker process and terminates the JVM if the connection is severed. + * Provides fate sharing between a worker and its associated child processes. + */ +private[spark] class WorkerWatcher(workerUrl: String) extends Actor with Logging { + override def preStart() { + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + + logInfo(s"Connecting to worker $workerUrl") + val worker = context.actorSelection(workerUrl) + worker ! SendHeartbeat // need to send a message here to initiate connection + } + + // Lets us filter events only from the worker actor + private val expectedHostPort = AddressFromURIString(workerUrl).hostPort + private def isWorker(address: Address) = address.hostPort == expectedHostPort + + override def receive = { + case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) => + logInfo(s"Successfully connected to $workerUrl") + + case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound) + if isWorker(remoteAddress) => + // These logs may not be seen if the worker (and associated pipe) has died + logError(s"Could not initialize connection to worker $workerUrl. Exiting.") + logError(s"Error was: $cause") + System.exit(-1) + + case DisassociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) => + // This log message will never be seen + logError(s"Lost connection to worker actor $workerUrl. 
Exiting.") + System.exit(-1) + + case e: AssociationEvent => + // pass through association events relating to other remote actor systems + + case e => logWarning(s"Received unexpected actor system event: $e") + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index 35a1507..93c6ad4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -133,7 +133,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) { def driverRow(driver: DriverRunner): Seq[Node] = { {driver.driverId} - {driver.driverDesc.mainClass} + {driver.driverDesc.command.mainClass} {driver.driverDesc.cores.toString} http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index debbdd4..eb1199e 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -24,8 +24,9 @@ import akka.remote._ import org.apache.spark.Logging import org.apache.spark.TaskState.TaskState +import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.{Utils, AkkaUtils} +import org.apache.spark.util.{AkkaUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, @@ -91,7 +92,8 @@ private[spark] class CoarseGrainedExecutorBackend( } private[spark] object CoarseGrainedExecutorBackend { - def run(driverUrl: String, executorId: String, hostname: String, cores: Int) { + def run(driverUrl: String, executorId: String, hostname: String, cores: Int, + workerUrl: Option[String]) { // Debug code Utils.checkHost(hostname) @@ -105,17 +107,24 @@ private[spark] object CoarseGrainedExecutorBackend { actorSystem.actorOf( Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores), name = "Executor") + workerUrl.foreach{ url => + actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") + } actorSystem.awaitTermination() } def main(args: Array[String]) { - if (args.length < 4) { - //the reason we allow the last appid argument is to make it easy to kill rogue executors - System.err.println( - "Usage: CoarseGrainedExecutorBackend " + - "[]") - System.exit(1) + args.length match { + case x if x < 4 => + System.err.println( + // Worker url is used in spark standalone mode to enforce fate-sharing with worker + "Usage: CoarseGrainedExecutorBackend " + + " []") + System.exit(1) + case 4 => + run(args(0), args(1), args(2), args(3).toInt, None) + case x if x > 4 => + run(args(0), args(1), args(2), args(3).toInt, Some(args(4))) } - run(args(0), args(1), args(2), args(3).toInt) } } 
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 4da49c0..921b887 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend( val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) - val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}") + val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}") val command = Command( "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome().getOrElse(null) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f6dc25/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 485f688..372c9f4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -80,11 +80,14 @@ class JsonProtocolSuite extends FunSuite { new ApplicationInfo(3, "id", createAppDesc(), new Date(123456789), null, "appUriStr") } - def createDriverDesc() = new DriverDescription( - "hdfs://some-dir/some.jar", "org.apache.spark.FakeClass", 100, 3, - Seq("--some-config", "val", "--other-config", "val"), Seq("-Dsystem.property=someValue"), - Seq(("K1", "V1"), ("K2", "V2")) + def createDriverCommand() = new Command( + "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"), + Map(("K1", "V1"), ("K2", "V2")) ) + + def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3, + createDriverCommand()) + def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3", createDriverDesc(), new Date()) def createWorkerInfo(): WorkerInfo = { @@ -92,10 +95,11 @@ class JsonProtocolSuite extends FunSuite { } def createExecutorRunner(): ExecutorRunner = { new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", - new File("sparkHome"), new File("workDir"), ExecutorState.RUNNING) + new File("sparkHome"), new File("workDir"), "akka://worker", ExecutorState.RUNNING) } def createDriverRunner(): DriverRunner = { - new DriverRunner("driverId", new File("workDir"), createDriverDesc(), null) + new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(), + null, "akka://worker") } def assertValidJson(json: JValue) {