From: tgraves@apache.org
To: commits@spark.apache.org
Subject: git commit: [SPARK-1395] Allow "local:" URIs to work on Yarn.
Date: Thu, 17 Apr 2014 15:31:17 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master bb76eae1b -> 69047506b


[SPARK-1395] Allow "local:" URIs to work on Yarn.

This only works for the three paths defined in the environment
(SPARK_JAR, SPARK_YARN_APP_JAR and SPARK_LOG4J_CONF).

Tested by running SparkPi with local: and file: URIs against Yarn cluster
(no "upload" shows up in logs in the local case).

Author: Marcelo Vanzin

Closes #303 from vanzin/yarn-local and squashes the following commits:

82219c1 [Marcelo Vanzin] [SPARK-1395] Allow "local:" URIs to work on Yarn.
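For context (an illustration added here, not part of the commit): a "local:" URI
names a file that is already present at the same path on every node, so the
client can skip copying it to the distributed cache. A minimal standalone Scala
sketch of that scheme check, with hypothetical names:

    import java.net.URI

    // Sketch only: decide whether a resource has to be shipped to the cluster.
    // "local:" means the file already exists on each node and is used in place.
    def needsUpload(path: String): Boolean = {
      val scheme = new URI(path).getScheme()
      scheme != "local"
    }

    // file:, hdfs: and bare paths are uploaded; local: paths are not.
    assert(needsUpload("file:/tmp/spark-examples.jar"))
    assert(needsUpload("hdfs:///jars/app.jar"))
    assert(!needsUpload("local:/opt/spark/spark-assembly.jar"))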
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69047506
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69047506
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69047506

Branch: refs/heads/master
Commit: 69047506bf97e6e37e4079c87cb0327d3760ac41
Parents: bb76eae
Author: Marcelo Vanzin
Authored: Thu Apr 17 10:29:38 2014 -0500
Committer: Thomas Graves
Committed: Thu Apr 17 10:29:38 2014 -0500

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   |   4 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala    |   2 +-
 .../apache/spark/deploy/yarn/ClientBase.scala   | 190 +++++++++++++------
 .../deploy/yarn/ExecutorRunnableUtil.scala      |  17 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |   6 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala    |   2 +-
 6 files changed, 142 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e05fbfe..e5d593c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.{PrintStream, File}
-import java.net.URL
+import java.net.{URI, URL}
 
 import org.apache.spark.executor.ExecutorURLClassLoader
@@ -216,7 +216,7 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(localJar)
+    val localJarFile = new File(new URI(localJar).getPath())
     if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }
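The SparkSubmit change above is needed because the jar argument may now be a
URI rather than a bare filesystem path. A small sketch (hypothetical values,
not from the patch) of why the File constructor alone is not enough:

    import java.io.File
    import java.net.URI

    // new File("file:/tmp/app.jar") keeps "file:" as part of the file name,
    // so exists() returns false even when /tmp/app.jar is on disk.
    val jarArg = "file:/tmp/app.jar"                  // hypothetical argument
    val naive  = new File(jarArg)                     // path: "file:/tmp/app.jar"
    val viaUri = new File(new URI(jarArg).getPath())  // path: "/tmp/app.jar"
    println(naive.getPath)
    println(viaUri.getPath)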
http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 3469b7d..7dae248 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -82,7 +82,7 @@ class ExecutorRunnable(
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)

http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 628dd98..566de71 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
@@ -209,53 +209,35 @@ trait ClientBase extends Logging {
     Map(
       ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
-      ClientBase.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")
+      ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
     ).foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
         val localURI = new URI(localPath)
-        val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
-        val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          destName, statCache)
+        if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+          val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
+          val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
+          distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+            destName, statCache)
+        }
       }
     }
 
-    // Handle jars local to the ApplicationMaster.
-    if ((args.addJars != null) && (!args.addJars.isEmpty())){
-      args.addJars.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        // Only add the resource to the Spark ApplicationMaster.
-        val appMasterOnly = true
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache, appMasterOnly)
-      }
-    }
-
-    // Handle any distributed cache files
-    if ((args.files != null) && (!args.files.isEmpty())){
-      args.files.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache)
-      }
-    }
-
-    // Handle any distributed cache archives
-    if ((args.archives != null) && (!args.archives.isEmpty())) {
-      args.archives.split(',').foreach { case file:String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
-          linkname, statCache)
+    val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+      (args.files, LocalResourceType.FILE, false),
+      (args.archives, LocalResourceType.ARCHIVE, false) )
+    fileLists.foreach { case (flist, resType, appMasterOnly) =>
+      if (flist != null && !flist.isEmpty()) {
+        flist.split(',').foreach { case file: String =>
+          val localURI = new URI(file.trim())
+          if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+            val localPath = new Path(localURI)
+            val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+            val destPath = copyRemoteFile(dst, localPath, replication)
+            distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
+              linkname, statCache, appMasterOnly)
+          }
+        }
       }
     }
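The hunk above folds three nearly identical upload loops into a single list of
(paths, resource type, AM-only) tuples and adds the scheme check, so "local:"
resources are never copied to the staging directory. A self-contained sketch of
the same pattern, with illustrative names and values:

    import java.net.URI

    // (comma-separated paths, resource type, visible to the AM only)
    val fileLists = List(
      ("local:/opt/libs/extra.jar,hdfs:///jars/a.jar", "FILE", true),
      ("file:/tmp/data.txt", "FILE", false),
      ("hdfs:///archives/dict.zip", "ARCHIVE", false))
    fileLists.foreach { case (flist, resType, appMasterOnly) =>
      if (flist != null && !flist.isEmpty) {
        flist.split(',').map(_.trim).foreach { file =>
          val uri = new URI(file)
          if (uri.getScheme != "local") { // "local:" files stay where they are
            println(s"upload $file type=$resType amOnly=$appMasterOnly")
          }
        }
      }
    }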
@@ -269,12 +251,14 @@ trait ClientBase extends Logging {
     logInfo("Setting up the launch environment")
 
     val env = new HashMap[String, String]()
-
-    ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP),
-      env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(args, yarnConf, sparkConf, log4jConf, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
@@ -345,10 +329,7 @@ trait ClientBase extends Logging {
     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
       JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
     }
-
-    if (!localResources.contains(ClientBase.LOG4J_PROP)) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
     val commands = List[String](
@@ -377,6 +358,8 @@ object ClientBase {
   val SPARK_JAR: String = "spark.jar"
   val APP_JAR: String = "app.jar"
   val LOG4J_PROP: String = "log4j.properties"
+  val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
+  val LOCAL_SCHEME = "local"
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
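The launch-environment hunk above also records SPARK_LOG4J_CONF in the
container environment so executors later resolve the same configuration. A
minimal sketch of that propagation (illustrative, not the patch's code):

    import scala.collection.mutable.HashMap

    // Sketch only: carry the client-side log4j setting into the container
    // environment; an unset variable simply leaves the map untouched.
    def launchEnv(log4jConf: String): HashMap[String, String] = {
      val env = new HashMap[String, String]()
      env("SPARK_YARN_MODE") = "true"
      if (log4jConf != null) {
        env("SPARK_LOG4J_CONF") = log4jConf
      }
      env
    }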
@@ -428,30 +411,113 @@ object ClientBase {
     }
   }
 
-  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
+  /**
+   * Returns the java command line argument for setting up log4j. If there is a log4j.properties
+   * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
+   * is checked.
+   */
+  def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = {
+    var log4jConf = LOG4J_PROP
+    if (!localResources.contains(log4jConf)) {
+      log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match {
+        case conf: String =>
+          val confUri = new URI(conf)
+          if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) {
+            "file://" + confUri.getPath()
+          } else {
+            ClientBase.LOG4J_PROP
+          }
+        case null => "log4j-spark-container.properties"
+      }
+    }
+    " -Dlog4j.configuration=" + log4jConf
+  }
+
+  def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
+      log4jConf: String, env: HashMap[String, String]) {
     YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
       File.pathSeparator)
-    // If log4j present, ensure ours overrides all others
-    if (addLog4j) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + LOG4J_PROP, File.pathSeparator)
+    if (log4jConf != null) {
+      // If a custom log4j config file is provided as a local: URI, add its parent directory to the
+      // classpath. Note that this only works if the custom config's file name is
+      // "log4j.properties".
+      val localPath = getLocalPath(log4jConf)
+      if (localPath != null) {
+        val parentPath = new File(localPath).getParent()
+        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath,
+          File.pathSeparator)
+      }
     }
 
     // Normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + SPARK_JAR, File.pathSeparator)
+    addClasspathEntry(System.getenv("SPARK_JAR"), SPARK_JAR, env);
     ClientBase.populateHadoopClasspath(conf, env)
     if (!userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
     }
+    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*", File.pathSeparator)
+  }
+
+  /**
+   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
+   * to the classpath.
+   */
+  private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
+    if (args != null) {
+      addClasspathEntry(args.userJar, APP_JAR, env)
+    }
+
+    if (args != null && args.addJars != null) {
+      args.addJars.split(",").foreach { case file: String =>
+        addClasspathEntry(file, null, env)
+      }
+    }
+  }
+
+  /**
+   * Adds the given path to the classpath, handling "local:" URIs correctly.
+   *
+   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
+   * name will be added to the classpath (relative to the job's work directory).
+   *
+   * If not a "local:" file and no alternate name, the environment is not modified.
+   *
+   * @param path Path to add to classpath (optional).
+   * @param fileName Alternate name for the file (optional).
+   * @param env Map holding the environment variables.
+   */
+  private def addClasspathEntry(path: String, fileName: String,
+      env: HashMap[String, String]) : Unit = {
+    if (path != null) {
+      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
+        val localPath = getLocalPath(path)
+        if (localPath != null) {
+          YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
+            File.pathSeparator)
+          return
+        }
+      }
+    }
+    if (fileName != null) {
+      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+        Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
+    }
+  }
+
+  /**
+   * Returns the local path if the URI is a "local:" URI, or null otherwise.
+   */
+  private def getLocalPath(resource: String): String = {
+    val uri = new URI(resource)
+    if (LOCAL_SCHEME.equals(uri.getScheme())) {
+      return uri.getPath()
+    }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + "*", File.pathSeparator)
+    null
   }
 
 }
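To make the new helpers concrete, here is getLocalPath's contract restated as
a runnable sketch: "local:" URIs resolve to a path on the node itself, anything
else yields null, and addClasspathEntry then falls back to the alternate name
under the container's work directory:

    import java.net.URI

    def getLocalPath(resource: String): String = {
      val uri = new URI(resource)
      if ("local".equals(uri.getScheme())) uri.getPath() else null
    }

    assert(getLocalPath("local:/opt/libs/app.jar") == "/opt/libs/app.jar")
    assert(getLocalPath("hdfs:///jars/app.jar") == null) // falls back to link name
    assert(getLocalPath("app.jar") == null)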
http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 9159cc4..40b3866 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -52,7 +52,7 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
-      userSpecifiedLogFile: Boolean) = {
+      localResources: HashMap[String, LocalResource]) = {
     // Extra options for the JVM
     var JAVA_OPTS = ""
     // Set the JVM memory
@@ -64,10 +64,7 @@ trait ExecutorRunnableUtil extends Logging {
     JAVA_OPTS += " -Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    if (!userSpecifiedLogFile) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
@@ -120,7 +117,7 @@ trait ExecutorRunnableUtil extends Logging {
       rtype: LocalResourceType,
       localResources: HashMap[String, LocalResource],
       timestamp: String,
-      size: String, 
+      size: String,
       vis: String) = {
     val uri = new URI(file)
     val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
@@ -153,7 +150,7 @@ trait ExecutorRunnableUtil extends Logging {
       val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
       val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
       for( i <- 0 to distArchives.length - 1) {
-        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
+        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
          timeStamps(i), fileSizes(i), visibilities(i))
       }
     }
@@ -165,7 +162,11 @@ trait ExecutorRunnableUtil extends Logging {
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
-    ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(null, yarnConf, sparkConf, log4jConf, env)
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Allow users to specify some environment variables
     YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
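Both the ApplicationMaster and executor command lines now obtain their
-Dlog4j.configuration argument from ClientBase.getLog4jConfiguration. Its
lookup order, restated as a standalone sketch (names and values illustrative):

    import java.net.URI

    // A log4j.properties shipped with the job wins; otherwise a "local:"
    // SPARK_LOG4J_CONF is rewritten to a file:// URL that log4j can load;
    // with nothing set, the bundled container default is used.
    def log4jArg(shippedLog4j: Boolean, envConf: String): String = {
      val conf =
        if (shippedLog4j) "log4j.properties"
        else envConf match {
          case c: String =>
            val uri = new URI(c)
            if ("local".equals(uri.getScheme())) "file://" + uri.getPath()
            else "log4j.properties"
          case null => "log4j-spark-container.properties"
        }
      " -Dlog4j.configuration=" + conf
    }

    assert(log4jArg(false, "local:/etc/spark/log4j.properties") ==
      " -Dlog4j.configuration=file:///etc/spark/log4j.properties")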
http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4ceed95..832d45b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -54,7 +54,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
 
-  override def getCurrentUserCredentials(): Credentials = { 
+  override def getCurrentUserCredentials(): Credentials = {
     UserGroupInformation.getCurrentUser().getCredentials()
   }
 
@@ -76,10 +76,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 }
 
 object YarnSparkHadoopUtil {
-  def getLoggingArgsForContainerCommandLine(): String = {
-    "-Dlog4j.configuration=log4j-spark-container.properties"
-  }
-
   def addToEnvironment(
       env: HashMap[String, String],
       variable: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/69047506/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 81d9d1b..117b33f 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -79,7 +79,7 @@ class ExecutorRunnable(
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)