spark-commits mailing list archives

From r...@apache.org
Subject [5/8] git commit: Merge remote-tracking branch 'tgravescs/sparkYarnDistCache'
Date Tue, 15 Oct 2013 02:34:52 GMT
Merge remote-tracking branch 'tgravescs/sparkYarnDistCache'

Closes #11

Signed-off-by: Reynold Xin <rxin@apache.org>

Conflicts:
	docs/running-on-yarn.md
	yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala

(cherry picked from commit 8f11c36fe17c2718c895b771b59a9138521e0079)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4668fcb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4668fcb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4668fcb9

Branch: refs/heads/branch-0.8
Commit: 4668fcb9ff8f9c176c4866480d52dde5d67c8522
Parents: 6961744
Author: Matei Zaharia <matei@eecs.berkeley.edu>
Authored: Thu Oct 10 19:34:33 2013 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Mon Oct 14 19:25:49 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  17 +-
 docs/running-on-yarn.md                         |   9 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  55 ++++++-
 .../org/apache/spark/deploy/yarn/Client.scala   | 160 ++++++++++++++++---
 .../spark/deploy/yarn/ClientArguments.scala     |  25 ++-
 .../spark/deploy/yarn/WorkerRunnable.scala      |  58 +++++--
 6 files changed, 275 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4668fcb9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d710894..d7f3746 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -662,10 +662,21 @@ class SparkContext(
         key = uri.getScheme match {
           case null | "file" =>
             if (env.hadoop.isYarnMode()) {
-              logWarning("local jar specified as parameter to addJar under Yarn mode")
-              return
+              // In order for this to work on yarn the user must specify the --addJars option to
+              // the client to upload the file into the distributed cache to make it show up in the
+              // current working directory.
+              val fileName = new Path(uri.getPath).getName()
+              try {
+                env.httpFileServer.addJar(new File(fileName))
+              } catch {
+                case e: Exception => {
+                  logError("Error adding jar (" + e + "), was the --addJars option used?")
+                  throw e
+                }
+              }
+            } else {
+              env.httpFileServer.addJar(new File(uri.getPath))
             }
-            env.httpFileServer.addJar(new File(uri.getPath))
           case _ =>
             path
         }
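
The hunk above makes SparkContext.addJar usable in YARN mode: a local path is registered with the driver's HTTP file server under its bare file name, which only resolves if the client shipped that jar into the container's working directory via --addJars. A minimal, hypothetical driver sketch (the jar and application names are assumptions, not part of this commit):

    import org.apache.spark.SparkContext

    object AddJarExample {
      def main(args: Array[String]) {
        // "yarn-standalone" is the special master URL used when running under the YARN AM.
        val sc = new SparkContext("yarn-standalone", "AddJarExample")
        // Resolves "my-udfs.jar" against the container's working directory; it is only
        // there because the client was launched with --addJars my-udfs.jar, which uploaded
        // it to the distributed cache and linked it into the container.
        sc.addJar("my-udfs.jar")
        // ... run jobs that need classes from my-udfs.jar ...
        sc.stop()
      }
    }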

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4668fcb9/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 30128ec..2898af0 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -34,6 +34,8 @@ Environment variables:
 
 System Properties:
 * 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the Spark master and then also the number of tries it waits for the Spark Context to be initialized. Default is 10.
+* 'spark.yarn.submit.file.replication', the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
+* 'spark.yarn.preserve.staging.files', set to true to preserve the staged files (spark jar, app jar, distributed cache files) at the end of the job rather than delete them.
 
 # Launching Spark on YARN
 
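
The two new spark.yarn.* settings added above are plain JVM system properties read later in this patch; a short sketch of how they are set and then consumed (the values here are only illustrative):

    // Set on the JVM that runs the YARN Client, e.g. via -D flags.
    System.setProperty("spark.yarn.submit.file.replication", "5")   // default is "3"
    System.setProperty("spark.yarn.preserve.staging.files", "true") // default is "false"

    // Mirrors how Client.prepareLocalResources and ApplicationMaster.cleanupStagingDir
    // read the properties further down in this diff.
    val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
    val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean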
@@ -51,7 +53,10 @@ The command to launch the YARN Client is as follows:
       --worker-memory <MEMORY_PER_WORKER> \
       --worker-cores <CORES_PER_WORKER> \
       --name <application_name> \
-      --queue <queue_name>
+      --queue <queue_name> \
+      --addJars <any_local_files_used_in_SparkContext.addJar> \
+      --files <files_for_distributed_cache> \
+      --archives <archives_for_distributed_cache>
 
 For example:
 
@@ -84,3 +89,5 @@ The above starts a YARN Client programs which periodically polls the Application
 - When your application instantiates a Spark context it must use a special "yarn-standalone" master url. This starts the scheduler without forcing it to connect to a cluster. A good way to handle this is to pass "yarn-standalone" as an argument to your program, as shown in the example above.
 - We do not request container resources based on the number of cores. Thus the number of cores given via command line arguments cannot be guaranteed.
 - The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
+- The --files and --archives options support specifying file names with the # syntax, similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt. This will upload the file you have locally named localtest.txt into HDFS, but it will be linked to by the name appSees.txt, and your application should use the name appSees.txt to reference it when running on YARN.
+- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
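
To make the link-name behaviour concrete, a hypothetical application-side sketch (file names are assumptions): launching with --files localtest.txt#appSees.txt uploads the local file to the .sparkStaging directory on HDFS, and YARN links it into each container's working directory as appSees.txt, so code opens the link name rather than the local path.

    import scala.io.Source

    object DistCacheExample {
      // "appSees.txt" resolves against the container's current working directory,
      // where YARN placed the link for the uploaded localtest.txt.
      def readDistributedFile(): List[String] =
        Source.fromFile("appSees.txt").getLines().toList
    }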

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4668fcb9/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 858b58d..c1a87d3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,22 +17,25 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.io.IOException;
 import java.net.Socket
+import java.security.PrivilegedExceptionAction
 import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import scala.collection.JavaConversions._
 import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.util.Utils
 import org.apache.hadoop.security.UserGroupInformation
-import java.security.PrivilegedExceptionAction
+import scala.collection.JavaConversions._
 
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
 
@@ -43,18 +46,26 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private var appAttemptId: ApplicationAttemptId = null
   private var userThread: Thread = null
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+  private val fs = FileSystem.get(yarnConf)
 
   private var yarnAllocator: YarnAllocationHandler = null
   private var isFinished:Boolean = false
   private var uiAddress: String = ""
+  private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
+    YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
+  private var isLastAMRetry: Boolean = true
 
 
   def run() {
     // setup the directories so things go to yarn approved directories rather
     // than user specified and /tmp
     System.setProperty("spark.local.dir", getLocalDirs())
+
+    // use priority 30 as it's higher than HDFS. It's the same priority as MapReduce is using
+    ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
     
     appAttemptId = getApplicationAttemptId()
+    isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts;
     resourceManager = registerWithResourceManager()
 
     // Workaround until hadoop moves to something which has
@@ -183,6 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
           // It need shutdown hook to set SUCCEEDED
           successed = true
         } finally {
+          logDebug("finishing main")
+          isLastAMRetry = true;
           if (successed) {
             ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
           } else {
@@ -229,8 +242,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     }
   }
 
-
-
   private def allocateWorkers() {
     try {
       logInfo("Allocating " + args.numWorkers + " workers.")
@@ -329,6 +340,40 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     resourceManager.finishApplicationMaster(finishReq)
 
   }
+
+  /**
+   * clean up the staging directory. 
+   */
+  private def cleanupStagingDir() { 
+    var stagingDirPath: Path = null
+    try {
+      val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
+      if (!preserveFiles) {
+        stagingDirPath = new Path(System.getenv("SPARK_YARN_JAR_PATH")).getParent()
+        if (stagingDirPath == null) {
+          logError("Staging directory is null")
+          return
+        }
+        logInfo("Deleting staging directory " + stagingDirPath)
+        fs.delete(stagingDirPath, true)
+      }
+    } catch {
+      case e: IOException =>
+        logError("Failed to cleanup staging dir " + stagingDirPath, e)
+    }
+  }
+
+  // The shutdown hook that runs when a signal is received AND during normal
+  // close of the JVM. 
+  class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
+
+    def run() {
+      logInfo("AppMaster received a signal.")
+      // we need to clean up staging dir before HDFS is shut down
+      // make sure we don't delete it until this is the last AM
+      if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
+    }
+  }
  
 }
 
@@ -368,6 +413,8 @@ object ApplicationMaster {
     // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit
     // Should not really have to do this, but it helps yarn to evict resources earlier.
     // not to mention, prevent Client declaring failure even though we exit'ed properly.
+    // Note that this will unfortunately not properly clean up the staging files because it gets called too
+    // late and the filesystem is already shut down.
     if (modified) {
       Runtime.getRuntime().addShutdownHook(new Thread with Logging { 
         // This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run'
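
The priority-30 comment earlier in this file relies on Hadoop's ShutdownHookManager ordering: hooks run from highest to lowest priority, and the FileSystem shutdown hook registers at a lower priority (10 in the Hadoop versions targeted here), so a higher-priority hook can still delete the staging directory before HDFS clients are closed. A minimal sketch of that pattern, not code from this commit:

    import org.apache.hadoop.util.ShutdownHookManager

    object CleanupHookSketch {
      val CleanupPriority = 30 // same value the ApplicationMaster registers with above

      def register(cleanup: () => Unit) {
        ShutdownHookManager.get().addShutdownHook(new Runnable {
          // Runs before the FileSystem shutdown hook, so HDFS is still usable here.
          def run() { cleanup() }
        }, CleanupPriority)
      }
    }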

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4668fcb9/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 076dd3c..8afb3e3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,7 +45,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  val credentials = UserGroupInformation.getCurrentUser().getCredentials();
+  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+  private var distFiles = None: Option[String]
+  private var distFilesTimeStamps = None: Option[String]
+  private var distFilesFileSizes = None: Option[String]
+  private var distArchives = None: Option[String]
+  private var distArchivesTimeStamps = None: Option[String]
+  private var distArchivesFileSizes = None: Option[String]
   
   def run() {
     init(yarnConf)
@@ -57,7 +63,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     verifyClusterResources(newApp)
     val appContext = createApplicationSubmissionContext(appId)
-    val localResources = prepareLocalResources(appId, "spark")
+    val localResources = prepareLocalResources(appId, ".sparkStaging")
     val env = setupLaunchEnv(localResources)
     val amContainer = createContainerLaunchContext(newApp, localResources, env)
 
@@ -109,10 +115,73 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     appContext.setApplicationName(args.appName)
     return appContext
   }
-  
-  def prepareLocalResources(appId: ApplicationId, appName: String): HashMap[String, LocalResource] = {
+
+  /**
+   * Copy the local file into HDFS and configure to be distributed with the
+   * job via the distributed cache.
+   * If a fragment is specified the file will be referenced as that fragment.
+   */
+  private def copyLocalFile(
+      dstDir: Path,
+      resourceType: LocalResourceType,
+      originalPath: Path,
+      replication: Short,
+      localResources: HashMap[String,LocalResource],
+      fragment: String,
+      appMasterOnly: Boolean = false): Unit = {
+    val fs = FileSystem.get(conf)
+    val newPath = new Path(dstDir, originalPath.getName())
+    logInfo("Uploading " + originalPath + " to " + newPath)
+    fs.copyFromLocalFile(false, true, originalPath, newPath)
+    fs.setReplication(newPath, replication);
+    val destStatus = fs.getFileStatus(newPath)
+
+    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    amJarRsrc.setType(resourceType)
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
+    amJarRsrc.setTimestamp(destStatus.getModificationTime())
+    amJarRsrc.setSize(destStatus.getLen())
+    var pathURI: URI = new URI(newPath.toString() + "#" + originalPath.getName());
+    if ((fragment == null) || (fragment.isEmpty())){
+      localResources(originalPath.getName()) = amJarRsrc
+    } else {
+      localResources(fragment) = amJarRsrc
+      pathURI = new URI(newPath.toString() + "#" + fragment);
+    }
+    val distPath = pathURI.toString()
+    if (appMasterOnly == true) return
+    if (resourceType == LocalResourceType.FILE) {
+      distFiles match {
+        case Some(path) =>
+          distFilesFileSizes = Some(distFilesFileSizes.get + "," + 
+            destStatus.getLen().toString())
+          distFilesTimeStamps = Some(distFilesTimeStamps.get + "," + 
+            destStatus.getModificationTime().toString())
+          distFiles = Some(path + "," + distPath)
+        case _ => 
+          distFilesFileSizes = Some(destStatus.getLen().toString())
+          distFilesTimeStamps = Some(destStatus.getModificationTime().toString())
+          distFiles = Some(distPath)
+      }
+    } else {
+      distArchives match {
+        case Some(path) =>
+          distArchivesTimeStamps = Some(distArchivesTimeStamps.get + "," +
+            destStatus.getModificationTime().toString())
+          distArchivesFileSizes = Some(distArchivesFileSizes.get + "," + 
+            destStatus.getLen().toString())
+          distArchives = Some(path + "," + distPath)
+        case _ => 
+          distArchivesTimeStamps = Some(destStatus.getModificationTime().toString())
+          distArchivesFileSizes = Some(destStatus.getLen().toString())
+          distArchives = Some(distPath)
+      }
+    }
+  }
+
+  def prepareLocalResources(appId: ApplicationId, sparkStagingDir: String): HashMap[String, LocalResource] = {
     logInfo("Preparing Local resources")
-    val locaResources = HashMap[String, LocalResource]()
     // Upload Spark and the application JAR to the remote file system
     // Add them as local resources to the AM
     val fs = FileSystem.get(conf)
@@ -125,33 +194,69 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       }
     }
 
+    val pathSuffix = sparkStagingDir + "/" + appId.toString() + "/"
+    val dst = new Path(fs.getHomeDirectory(), pathSuffix)
+    val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      val dstFs = dst.getFileSystem(conf)
+      dstFs.addDelegationTokens(delegTokenRenewer, credentials);
+    }
+    val localResources = HashMap[String, LocalResource]()
+
     Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties"
-> System.getenv("SPARK_LOG4J_CONF"))
     .foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
         val src = new Path(localPath)
-        val pathSuffix = appName + "/" + appId.getId() + destName
-        val dst = new Path(fs.getHomeDirectory(), pathSuffix)
-        logInfo("Uploading " + src + " to " + dst)
-        fs.copyFromLocalFile(false, true, src, dst)
-        val destStatus = fs.getFileStatus(dst)
-
-        // get tokens for anything we upload to hdfs
-        if (UserGroupInformation.isSecurityEnabled()) {
-          fs.addDelegationTokens(delegTokenRenewer, credentials);
-        }
+        val newPath = new Path(dst, destName)
+        logInfo("Uploading " + src + " to " + newPath)
+        fs.copyFromLocalFile(false, true, src, newPath)
+        fs.setReplication(newPath, replication);
+        val destStatus = fs.getFileStatus(newPath)
 
         val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
         amJarRsrc.setType(LocalResourceType.FILE)
         amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
-        amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst))
+        amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
         amJarRsrc.setTimestamp(destStatus.getModificationTime())
         amJarRsrc.setSize(destStatus.getLen())
-        locaResources(destName) = amJarRsrc
+        localResources(destName) = amJarRsrc
       }
     }
+
+    // handle any add jars
+    if ((args.addJars != null) && (!args.addJars.isEmpty())){
+      args.addJars.split(',').foreach { case file: String =>
+        val tmpURI = new URI(file)
+        val tmp = new Path(tmpURI)
+        copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
+          tmpURI.getFragment(), true)
+      }
+    }
+
+    // handle any distributed cache files
+    if ((args.files != null) && (!args.files.isEmpty())){
+      args.files.split(',').foreach { case file: String =>
+        val tmpURI = new URI(file)
+        val tmp = new Path(tmpURI)
+        copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
+          tmpURI.getFragment())
+      }
+    }
+
+    // handle any distributed cache archives
+    if ((args.archives != null) && (!args.archives.isEmpty())) {
+      args.archives.split(',').foreach { case file:String =>
+        val tmpURI = new URI(file)
+        val tmp = new Path(tmpURI)
+        copyLocalFile(dst, LocalResourceType.ARCHIVE, tmp, replication, 
+          localResources, tmpURI.getFragment())
+      }
+    }
+
     UserGroupInformation.getCurrentUser().addCredentials(credentials);
-    return locaResources
+    return localResources
   }
   
   def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = {
@@ -160,11 +265,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    // If log4j present, ensure ours overrides all others
-    if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*")
 
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
     Client.populateHadoopClasspath(yarnConf, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_JAR_PATH") = 
@@ -186,6 +290,18 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       env("SPARK_YARN_LOG4J_SIZE") =  log4jConfLocalRes.getSize().toString()
     }
 
+    // set the environment variables to be passed on to the Workers
+    if (distFiles != None) {
+      env("SPARK_YARN_CACHE_FILES") = distFiles.get
+      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get
+      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = distFilesFileSizes.get
+    }
+    if (distArchives != None) {
+      env("SPARK_YARN_CACHE_ARCHIVES") = distArchives.get
+      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = distArchivesTimeStamps.get
+      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") = distArchivesFileSizes.get
+    }
+
     // allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
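
The environment block above is how the Client hands distributed-cache metadata to the workers: for each resource type it builds three parallel comma-separated lists, which WorkerRunnable later splits by index to rebuild each LocalResource. A hedged illustration of the format, with made-up paths and values:

    import scala.collection.mutable.HashMap

    val env = HashMap[String, String]()
    // One entry per uploaded file; the "#fragment" is the link name inside the container.
    env("SPARK_YARN_CACHE_FILES") =
      "hdfs://nn:8020/user/me/.sparkStaging/app_1/localtest.txt#appSees.txt," +
      "hdfs://nn:8020/user/me/.sparkStaging/app_1/extra.txt#extra.txt"
    // Index i in each list describes the same file as index i in the others.
    env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = "1381868073000,1381868074000"
    env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = "1024,2048"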
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4668fcb9/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index c56dbd9..852dbd7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,6 +24,9 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
 
 // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
 class ClientArguments(val args: Array[String]) {
+  var addJars: String = null
+  var files: String = null
+  var archives: String = null
   var userJar: String = null
   var userClass: String = null
   var userArgs: Seq[String] = Seq[String]()
@@ -81,6 +84,17 @@ class ClientArguments(val args: Array[String]) {
 
         case ("--name") :: value :: tail =>
           appName = value
+
+        case ("--addJars") :: value :: tail =>
+          addJars = value
+          args = tail
+
+        case ("--files") :: value :: tail =>
+          files = value
+          args = tail
+
+        case ("--archives") :: value :: tail =>
+          archives = value
           args = tail
 
         case Nil =>
@@ -97,7 +111,7 @@ class ClientArguments(val args: Array[String]) {
     inputFormatInfo = inputFormatMap.values.toList
   }
 
-  
+
   def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
     if (unknownParam != null) {
       System.err.println("Unknown/unsupported param " + unknownParam)
@@ -113,10 +127,13 @@ class ClientArguments(val args: Array[String]) {
       "  --worker-cores NUM   Number of cores for the workers (Default: 1). This is unsused
right now.\n" +
       "  --master-memory MEM  Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
       "  --worker-memory MEM  Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
-      "  --name NAME          The name of your application (Default: Spark)\n" + 
-      "  --queue QUEUE        The hadoop queue to use for allocation requests (Default: 'default')"
+      "  --name NAME          The name of your application (Default: Spark)\n" +
+      "  --queue QUEUE        The hadoop queue to use for allocation requests (Default: 'default')\n"
+
+      "  --addJars jars       Comma separated list of local jars that want SparkContext.addJar
to work with.\n" +
+      "  --files files        Comma separated list of files to be distributed with the job.\n"
+
+      "  --archives archives  Comma separated list of archives to be distributed with the
job."
       )
     System.exit(exitCode)
   }
-  
+
 }
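
The new options follow the existing List-pattern-matching style of parseArgs; a self-contained, simplified sketch of that idiom covering only the three new flags (the real parser handles many more options and mutates fields instead of returning a map):

    object ArgParseSketch {
      def parse(argList: List[String], acc: Map[String, String] = Map.empty): Map[String, String] =
        argList match {
          case "--addJars" :: value :: tail  => parse(tail, acc + ("addJars" -> value))
          case "--files" :: value :: tail    => parse(tail, acc + ("files" -> value))
          case "--archives" :: value :: tail => parse(tail, acc + ("archives" -> value))
          case Nil                           => acc
          case unknown :: _                  => sys.error("Unknown/unsupported param " + unknown)
        }
    }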

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4668fcb9/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index a60e8a3..8dac9e0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -137,11 +137,26 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     startReq.setContainerLaunchContext(ctx)
     cm.startContainer(startReq)
   }
+
+  private def setupDistributedCache(file: String,
+      rtype: LocalResourceType,
+      localResources: HashMap[String, LocalResource],
+      timestamp: String,
+      size: String) = {
+    val uri = new URI(file)
+    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    amJarRsrc.setType(rtype)
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+    amJarRsrc.setTimestamp(timestamp.toLong)
+    amJarRsrc.setSize(size.toLong)
+    localResources(uri.getFragment()) = amJarRsrc
+  }
   
   
   def prepareLocalResources: HashMap[String, LocalResource] = {
     logInfo("Preparing Local resources")
-    val locaResources = HashMap[String, LocalResource]()
+    val localResources = HashMap[String, LocalResource]()
     
     // Spark JAR
     val sparkJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
@@ -151,7 +166,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       new URI(System.getenv("SPARK_YARN_JAR_PATH"))))
     sparkJarResource.setTimestamp(System.getenv("SPARK_YARN_JAR_TIMESTAMP").toLong)
     sparkJarResource.setSize(System.getenv("SPARK_YARN_JAR_SIZE").toLong)
-    locaResources("spark.jar") = sparkJarResource
+    localResources("spark.jar") = sparkJarResource
     // User JAR
     val userJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
     userJarResource.setType(LocalResourceType.FILE)
@@ -160,7 +175,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       new URI(System.getenv("SPARK_YARN_USERJAR_PATH"))))
     userJarResource.setTimestamp(System.getenv("SPARK_YARN_USERJAR_TIMESTAMP").toLong)
     userJarResource.setSize(System.getenv("SPARK_YARN_USERJAR_SIZE").toLong)
-    locaResources("app.jar") = userJarResource
+    localResources("app.jar") = userJarResource
 
     // Log4j conf - if available
     if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
@@ -171,26 +186,39 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
         new URI(System.getenv("SPARK_YARN_LOG4J_PATH"))))
       log4jConfResource.setTimestamp(System.getenv("SPARK_YARN_LOG4J_TIMESTAMP").toLong)
       log4jConfResource.setSize(System.getenv("SPARK_YARN_LOG4J_SIZE").toLong)
-      locaResources("log4j.properties") = log4jConfResource
+      localResources("log4j.properties") = log4jConfResource
+    }
+
+    if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
+      val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+      val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+      val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+      for( i <- 0 to distFiles.length - 1) {
+        setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
+          fileSizes(i))
+      }
     }
 
+    if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
+      val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
+      val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
+      val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+      for( i <- 0 to distArchives.length - 1) {
+        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
+          timeStamps(i), fileSizes(i))
+      }
+    }
     
-    logInfo("Prepared Local resources " + locaResources)
-    return locaResources
+    logInfo("Prepared Local resources " + localResources)
+    return localResources
   }
   
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    // If log4j present, ensure ours overrides all others
-    if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
-      // Which is correct ?
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties")
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
-    }
-
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, 
+      Environment.PWD.$() + Path.SEPARATOR + "*")
     Client.populateHadoopClasspath(yarnConf, env)
 
     // allow users to specify some environment variables

