spark-commits mailing list archives

From pwend...@apache.org
Subject [24/37] git commit: Changes based on review feedback.
Date Fri, 10 Jan 2014 02:38:29 GMT
Changes based on review feedback.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f236ddd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f236ddd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f236ddd1

Branch: refs/heads/master
Commit: f236ddd1a245a587d5ee331fb67cf41456ed383c
Parents: 7a99702
Author: Patrick Wendell <pwendell@gmail.com>
Authored: Thu Jan 2 18:10:37 2014 -0800
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Mon Jan 6 17:15:52 2014 -0800

----------------------------------------------------------------------
 .../spark/deploy/client/DriverClient.scala      |  2 +-
 .../deploy/client/DriverClientArguments.scala   | 26 ++++++++++++--------
 .../spark/deploy/worker/CommandUtils.scala      |  2 +-
 .../spark/deploy/worker/DriverRunner.scala      | 15 ++++++++---
 .../spark/deploy/worker/DriverWrapper.scala     |  2 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  7 +++---
 .../cluster/SparkDeploySchedulerBackend.scala   |  2 +-
 7 files changed, 34 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f236ddd1/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 8a4cdf0..e319e75 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -62,7 +62,7 @@ object DriverClient extends Logging {
 
     // TODO: See if we can initialize akka so return messages are sent back using the same TCP
     //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+    val (actorSystem, _) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0)
     val master = driverArgs.master
     val response = promise[(Boolean, String)]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f236ddd1/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
index 6a15422..d9e1c8a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -23,6 +23,9 @@ import scala.collection.mutable.ListBuffer
  * Command-line parser for the driver client.
  */
 private[spark] class DriverClientArguments(args: Array[String]) {
+  val defaultCores = 1
+  val defaultMemory = 512
+
   var cmd: String = "" // 'launch' or 'kill'
 
   // launch parameters
@@ -30,8 +33,8 @@ private[spark] class DriverClientArguments(args: Array[String]) {
   var jarUrl: String = ""
   var mainClass: String = ""
   var supervise: Boolean = false
-  var memory: Int = 512
-  var cores: Int = 1
+  var memory: Int = defaultMemory
+  var cores: Int = defaultCores
   private var _driverOptions = ListBuffer[String]()
   def driverOptions = _driverOptions.toSeq
 
@@ -78,14 +81,17 @@ private[spark] class DriverClientArguments(args: Array[String]) {
   def printUsageAndExit(exitCode: Int) {
     // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
     //       separately similar to in the YARN client.
-    System.err.println(
-      "usage: DriverClient [options] launch <active-master> <jar-url> <main-class>
" +
-        "[driver options]\n" +
-      "usage: DriverClient kill <active-master> <driver-id>\n\n" +
-      "Options:\n" +
-      "  -c CORES, --cores CORES                Number of cores to request \n" +
-      "  -m MEMORY, --memory MEMORY             Megabytes of memory to request\n" +
-      "  -s, --supervise                        Whether to restart the driver on failure\n")
+    val usage =
+      s"""
+        |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
+        |Usage: DriverClient kill <active-master> <driver-id>
+        |
+        |Options:
+        |   -c CORES, --cores CORES        Number of cores to request (default: $defaultCores)
+        |   -m MEMORY, --memory MEMORY     Megabytes of memory to request (default: $defaultMemory)
+        |   -s, --supervise                Whether to restart the driver on failure
+      """.stripMargin
+    System.err.println(usage)
     System.exit(exitCode)
   }
 }
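
The reworked usage message above is an interpolated multi-line string: stripMargin drops the source indentation up to the leading '|' on each line, and the s-prefix splices in the new defaultCores/defaultMemory constants. A minimal standalone sketch of the same pattern (illustrative only, not the DriverClient code itself):

    object UsageSketch {
      def main(args: Array[String]): Unit = {
        val defaultCores = 1
        val defaultMemory = 512
        // '|' marks where each printed line begins; stripMargin removes everything
        // before it, so the source indentation never reaches stderr.
        val usage =
          s"""
            |Options:
            |   -c CORES, --cores CORES     Number of cores to request (default: $defaultCores)
            |   -m MEMORY, --memory MEMORY  Megabytes of memory to request (default: $defaultMemory)
          """.stripMargin
        System.err.println(usage)
      }
    }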

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f236ddd1/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 785aecf..7507bf8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -21,7 +21,7 @@ object CommandUtils extends Logging {
   }
 
   private def getEnv(key: String, command: Command): Option[String] =
-    command.environment.get(key).orElse(Option(getenv(key)))
+    command.environment.get(key).orElse(Option(System.getenv(key)))
 
   /**
    * Attention: this must always be aligned with the environment variables in the run scripts and

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f236ddd1/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index e8ae2d3..f726089 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -119,15 +119,14 @@ private[spark] class DriverRunner(
     val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
     val jarFileSystem = jarPath.getFileSystem(emptyConf)
 
-    val destPath = new Path(driverDir.getAbsolutePath())
-    val destFileSystem = destPath.getFileSystem(emptyConf)
+    val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
     val jarFileName = jarPath.getName
     val localJarFile = new File(driverDir, jarFileName)
     val localJarFilename = localJarFile.getAbsolutePath
 
     if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
       logInfo(s"Copying user jar $jarPath to $destPath")
-      FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
+      FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
     }
 
     if (!localJarFile.exists()) { // Verify copy succeeded
@@ -140,8 +139,12 @@ private[spark] class DriverRunner(
   /** Launch the supplied command. */
   private def runCommand(command: Seq[String], envVars: Map[String, String], baseDir: File,
       supervise: Boolean) {
+
     // Time to wait between submission retries.
     var waitSeconds = 1
+    // A run of this many seconds resets the exponential back-off.
+    val successfulRunDuration = 1
+
     var keepTrying = !killed
 
     while (keepTrying) {
@@ -161,11 +164,15 @@ private[spark] class DriverRunner(
         val stderr = new File(baseDir, "stderr")
         val header = "Launch Command: %s\n%s\n\n".format(
           command.mkString("\"", "\" \"", "\""), "=" * 40)
-        Files.write(header, stderr, Charsets.UTF_8)
+        Files.append(header, stderr, Charsets.UTF_8)
         CommandUtils.redirectStream(process.get.getErrorStream, stderr)
       }
 
+      val processStart = System.currentTimeMillis()
       val exitCode = process.get.waitFor()
+      if (System.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
+        waitSeconds = 1
+      }
 
       if (supervise && exitCode != 0 && !killed) {
         waitSeconds = waitSeconds * 2 // exponential back-off
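
Taken together, the additions above make the supervise loop forget its back-off once a driver run lasts long enough: a run longer than successfulRunDuration seconds resets waitSeconds to 1 before the supervise check, and only a failed, supervised, non-killed run doubles it again. A simplified, self-contained sketch of that control flow (runDriver(), supervise and killed are hypothetical stand-ins, not the actual DriverRunner members; the retry cap is only there so the sketch terminates):

    object BackoffSketch {
      // Stand-ins for the real DriverRunner state; values are hypothetical.
      val supervise = true
      @volatile var killed = false

      // Stand-in for launching the driver process and waiting for it to exit.
      def runDriver(): Int = { Thread.sleep(1500); 1 }

      def main(args: Array[String]): Unit = {
        var waitSeconds = 1               // time to wait between submission retries
        val successfulRunDuration = 1     // a run of this many seconds resets the back-off
        var keepTrying = !killed
        var attempts = 0

        while (keepTrying && attempts < 3) {
          attempts += 1
          val processStart = System.currentTimeMillis()
          val exitCode = runDriver()

          // A sufficiently long run is treated as healthy, so the next retry
          // starts again from a one-second wait instead of the accumulated value.
          if (System.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
            waitSeconds = 1
          }

          if (supervise && exitCode != 0 && !killed) {
            waitSeconds = waitSeconds * 2 // exponential back-off before the next attempt
            println(s"driver exited with $exitCode, retrying in $waitSeconds s")
          } else {
            keepTrying = false
          }
        }
      }
    }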

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f236ddd1/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 8c13b10..2deb21a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -11,7 +11,7 @@ object DriverWrapper {
   def main(args: Array[String]) {
     args.toList match {
       case workerUrl :: mainClass :: extraArgs =>
-        val (actorSystem, boundPort) = AkkaUtils.createActorSystem("Driver",
+        val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
           Utils.localHostName(), 0)
         actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f236ddd1/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 4e23e0d..2947ed1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -260,8 +260,8 @@ private[spark] class Worker(
 
     case KillDriver(driverId) => {
       logInfo(s"Asked to kill driver $driverId")
-      drivers.find(_._1 == driverId) match {
-        case Some((id, runner)) =>
+      drivers.get(driverId) match {
+        case Some(runner) =>
           runner.kill()
         case None =>
           logError(s"Asked to kill unknown driver $driverId")
@@ -280,8 +280,7 @@ private[spark] class Worker(
       masterLock.synchronized {
         master ! DriverStateChanged(driverId, state, exception)
       }
-      val driver = drivers(driverId)
-      drivers -= driverId
+      val driver = drivers.remove(driverId).get
       finishedDrivers(driverId) = driver
       memoryUsed -= driver.driverDesc.mem
       coresUsed -= driver.driverDesc.cores
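
Both Worker hunks swap two-step map handling for direct HashMap calls: a keyed get instead of scanning entries with find, and remove, which returns the deleted value, instead of an apply followed by -=. In isolation the pattern is just the following (illustrative only, with a String standing in for DriverRunner and a made-up driver id):

    import scala.collection.mutable

    object DriverMapSketch {
      def main(args: Array[String]): Unit = {
        // driverId -> runner; a String stands in for the real DriverRunner.
        val drivers = mutable.HashMap("driver-20140106-0001" -> "runner")

        // Keyed lookup rather than drivers.find(_._1 == driverId):
        drivers.get("driver-20140106-0001") match {
          case Some(runner) => println(s"kill $runner")
          case None         => println("unknown driver")
        }

        // remove() returns the removed value, collapsing lookup and deletion:
        val driver = drivers.remove("driver-20140106-0001").get
        println(driver)
      }
    }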

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f236ddd1/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 921b887..0615f7b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(null)

