spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tgra...@apache.org
Subject git commit: [SPARK-1395] Allow "local:" URIs to work on Yarn.
Date Thu, 17 Apr 2014 15:31:17 GMT
Repository: spark
Updated Branches:
  refs/heads/master bb76eae1b -> 69047506b


[SPARK-1395] Allow "local:" URIs to work on Yarn.

This only works for the three paths defined in the environment
(SPARK_JAR, SPARK_YARN_APP_JAR and SPARK_LOG4J_CONF).

Tested by running SparkPi with local: and file: URIs against Yarn cluster (no "upload" shows
up in logs in the local case).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #303 from vanzin/yarn-local and squashes the following commits:

82219c1 [Marcelo Vanzin] [SPARK-1395] Allow "local:" URIs to work on Yarn.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69047506
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69047506
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69047506

Branch: refs/heads/master
Commit: 69047506bf97e6e37e4079c87cb0327d3760ac41
Parents: bb76eae
Author: Marcelo Vanzin <vanzin@cloudera.com>
Authored: Thu Apr 17 10:29:38 2014 -0500
Committer: Thomas Graves <tgraves@apache.org>
Committed: Thu Apr 17 10:29:38 2014 -0500

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   |   4 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala    |   2 +-
 .../apache/spark/deploy/yarn/ClientBase.scala   | 190 +++++++++++++------
 .../deploy/yarn/ExecutorRunnableUtil.scala      |  17 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |   6 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala    |   2 +-
 6 files changed, 142 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e05fbfe..e5d593c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.{PrintStream, File}
-import java.net.URL
+import java.net.{URI, URL}
 
 import org.apache.spark.executor.ExecutorURLClassLoader
 
@@ -216,7 +216,7 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(localJar)
+    val localJarFile = new File(new URI(localJar).getPath())
     if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 3469b7d..7dae248 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -82,7 +82,7 @@ class ExecutorRunnable(
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 628dd98..566de71 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
@@ -209,53 +209,35 @@ trait ClientBase extends Logging {
 
     Map(
       ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
-      ClientBase.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")
+      ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
     ).foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
         val localURI = new URI(localPath)
-        val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
-        val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          destName, statCache)
+        if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+          val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
+          val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
+          distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+            destName, statCache)
+        }
       }
     }
 
-    // Handle jars local to the ApplicationMaster.
-    if ((args.addJars != null) && (!args.addJars.isEmpty())){
-      args.addJars.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        // Only add the resource to the Spark ApplicationMaster.
-        val appMasterOnly = true
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache, appMasterOnly)
-      }
-    }
-
-    // Handle any distributed cache files
-    if ((args.files != null) && (!args.files.isEmpty())){
-      args.files.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache)
-      }
-    }
-
-    // Handle any distributed cache archives
-    if ((args.archives != null) && (!args.archives.isEmpty())) {
-      args.archives.split(',').foreach { case file:String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
-          linkname, statCache)
+    val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+      (args.files, LocalResourceType.FILE, false),
+      (args.archives, LocalResourceType.ARCHIVE, false) )
+    fileLists.foreach { case (flist, resType, appMasterOnly) =>
+      if (flist != null && !flist.isEmpty()) {
+        flist.split(',').foreach { case file: String =>
+          val localURI = new URI(file.trim())
+          if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+            val localPath = new Path(localURI)
+            val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+            val destPath = copyRemoteFile(dst, localPath, replication)
+            distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
+              linkname, statCache, appMasterOnly)
+          }
+        }
       }
     }
 
@@ -269,12 +251,14 @@ trait ClientBase extends Logging {
     logInfo("Setting up the launch environment")
 
     val env = new HashMap[String, String]()
-
-    ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP),
-      env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(args, yarnConf, sparkConf, log4jConf, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
@@ -345,10 +329,7 @@ trait ClientBase extends Logging {
     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
       JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
     }
-
-    if (!localResources.contains(ClientBase.LOG4J_PROP)) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
     val commands = List[String](
@@ -377,6 +358,8 @@ object ClientBase {
   val SPARK_JAR: String = "spark.jar"
   val APP_JAR: String = "app.jar"
   val LOG4J_PROP: String = "log4j.properties"
+  val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
+  val LOCAL_SCHEME = "local"
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
@@ -428,30 +411,113 @@ object ClientBase {
     }
   }
 
-  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
+  /**
+   * Returns the java command line argument for setting up log4j. If there is a log4j.properties
+   * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
+   * is checked.
+   */
+  def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = {
+    var log4jConf = LOG4J_PROP
+    if (!localResources.contains(log4jConf)) {
+      log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match {
+        case conf: String =>
+          val confUri = new URI(conf)
+          if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) {
+            "file://" + confUri.getPath()
+          } else {
+            ClientBase.LOG4J_PROP
+          }
+        case null => "log4j-spark-container.properties"
+      }
+    }
+    " -Dlog4j.configuration=" + log4jConf
+  }
+
+  def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
+      log4jConf: String, env: HashMap[String, String]) {
     YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
       File.pathSeparator)
-    // If log4j present, ensure ours overrides all others
-    if (addLog4j) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + LOG4J_PROP, File.pathSeparator)
+    if (log4jConf != null) {
+      // If a custom log4j config file is provided as a local: URI, add its parent directory to the
+      // classpath. Note that this only works if the custom config's file name is
+      // "log4j.properties".
+      val localPath = getLocalPath(log4jConf)
+      if (localPath != null) {
+        val parentPath = new File(localPath).getParent()
+        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath,
+          File.pathSeparator)
+      }
     }
     // Normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + SPARK_JAR, File.pathSeparator)
+    addClasspathEntry(System.getenv("SPARK_JAR"), SPARK_JAR, env);
     ClientBase.populateHadoopClasspath(conf, env)
-
     if (!userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
+    }
+    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*", File.pathSeparator)
+  }
+
+  /**
+   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
+   * to the classpath.
+   */
+  private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
+    if (args != null) {
+      addClasspathEntry(args.userJar, APP_JAR, env)
+    }
+
+    if (args != null && args.addJars != null) {
+      args.addJars.split(",").foreach { case file: String =>
+        addClasspathEntry(file, null, env)
+      }
+    }
+  }
+
+  /**
+   * Adds the given path to the classpath, handling "local:" URIs correctly.
+   *
+   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
+   * name will be added to the classpath (relative to the job's work directory).
+   *
+   * If not a "local:" file and no alternate name, the environment is not modified.
+   *
+   * @param path      Path to add to classpath (optional).
+   * @param fileName  Alternate name for the file (optional).
+   * @param env       Map holding the environment variables.
+   */
+  private def addClasspathEntry(path: String, fileName: String,
+      env: HashMap[String, String]) : Unit = {
+    if (path != null) {
+      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
+        val localPath = getLocalPath(path)
+        if (localPath != null) {
+          YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
+            File.pathSeparator)
+          return
+        }
+      }
+    }
+    if (fileName != null) {
+      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+        Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
+    }
+  }
+
+  /**
+   * Returns the local path if the URI is a "local:" URI, or null otherwise.
+   */
+  private def getLocalPath(resource: String): String = {
+    val uri = new URI(resource)
+    if (LOCAL_SCHEME.equals(uri.getScheme())) {
+      return uri.getPath()
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + "*", File.pathSeparator)
+    null
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 9159cc4..40b3866 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -52,7 +52,7 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
-      userSpecifiedLogFile: Boolean) = {
+      localResources: HashMap[String, LocalResource]) = {
     // Extra options for the JVM
     var JAVA_OPTS = ""
     // Set the JVM memory
@@ -64,10 +64,7 @@ trait ExecutorRunnableUtil extends Logging {
 
     JAVA_OPTS += " -Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    if (!userSpecifiedLogFile) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
@@ -120,7 +117,7 @@ trait ExecutorRunnableUtil extends Logging {
       rtype: LocalResourceType,
       localResources: HashMap[String, LocalResource],
       timestamp: String,
-      size: String, 
+      size: String,
       vis: String) = {
     val uri = new URI(file)
     val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
@@ -153,7 +150,7 @@ trait ExecutorRunnableUtil extends Logging {
       val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
       val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
       for( i <- 0 to distArchives.length - 1) {
-        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
+        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
           timeStamps(i), fileSizes(i), visibilities(i))
       }
     }
@@ -165,7 +162,11 @@ trait ExecutorRunnableUtil extends Logging {
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(null, yarnConf, sparkConf, log4jConf, env)
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Allow users to specify some environment variables
     YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),

http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4ceed95..832d45b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -54,7 +54,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
 
-  override def getCurrentUserCredentials(): Credentials = { 
+  override def getCurrentUserCredentials(): Credentials = {
     UserGroupInformation.getCurrentUser().getCredentials()
   }
 
@@ -76,10 +76,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 }
 
 object YarnSparkHadoopUtil {
-  def getLoggingArgsForContainerCommandLine(): String = {
-    "-Dlog4j.configuration=log4j-spark-container.properties"
-  }
-
   def addToEnvironment(
       env: HashMap[String, String],
       variable: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 81d9d1b..117b33f 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -79,7 +79,7 @@ class ExecutorRunnable(
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
 
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)


Mime
View raw message