spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From van...@apache.org
Subject spark git commit: [SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading remote resources in yarn client mode
Date Wed, 30 Aug 2017 19:30:30 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 a6a994414 -> d10c9dc3f


[SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading remote resources in yarn client mode

## What changes were proposed in this pull request?

This is a backport PR to fix issue of re-uploading remote resource in yarn client mode. The
original PR is #18962.

## How was this patch tested?

Tested in local UT.

Author: jerryshao <sshao@hortonworks.com>

Closes #19074 from jerryshao/SPARK-21714-2.2-backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d10c9dc3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d10c9dc3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d10c9dc3

Branch: refs/heads/branch-2.2
Commit: d10c9dc3f631a26dbbbd8f5c601ca2001a5d7c80
Parents: a6a9944
Author: jerryshao <sshao@hortonworks.com>
Authored: Wed Aug 30 12:30:24 2017 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Wed Aug 30 12:30:24 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   | 66 ++++++++++++-------
 .../apache/spark/internal/config/package.scala  |  2 +-
 .../scala/org/apache/spark/util/Utils.scala     | 25 ++++---
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 68 ++++++++++++++++----
 .../org/apache/spark/repl/SparkILoop.scala      |  2 +-
 .../main/scala/org/apache/spark/repl/Main.scala |  2 +-
 6 files changed, 116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d10c9dc3/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c60a2a1..86d578e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -208,14 +208,20 @@ object SparkSubmit extends CommandLineUtils {
 
   /**
    * Prepare the environment for submitting an application.
-   * This returns a 4-tuple:
-   *   (1) the arguments for the child process,
-   *   (2) a list of classpath entries for the child,
-   *   (3) a map of system properties, and
-   *   (4) the main class for the child
+   *
+   * @param args the parsed SparkSubmitArguments used for environment preparation.
+   * @param conf the Hadoop Configuration, this argument will only be set in unit test.
+   * @return a 4-tuple:
+   *        (1) the arguments for the child process,
+   *        (2) a list of classpath entries for the child,
+   *        (3) a map of system properties, and
+   *        (4) the main class for the child
+   *
    * Exposed for testing.
    */
-  private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
+  private[deploy] def prepareSubmitEnvironment(
+      args: SparkSubmitArguments,
+      conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], Map[String, String], String) = {
     // Return values
     val childArgs = new ArrayBuffer[String]()
@@ -311,12 +317,16 @@ object SparkSubmit extends CommandLineUtils {
     }
 
     // In client mode, download remote files.
+    var localPrimaryResource: String = null
+    var localJars: String = null
+    var localPyFiles: String = null
+    var localFiles: String = null
     if (deployMode == CLIENT) {
-      val hadoopConf = new HadoopConfiguration()
-      args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
-      args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
-      args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
-      args.files = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
+      val hadoopConf = conf.getOrElse(new HadoopConfiguration())
+      localPrimaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
+      localJars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
+      localPyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
+      localFiles = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
     }
 
     // Require all python files to be local, so we can add them to the PYTHONPATH
@@ -366,7 +376,7 @@ object SparkSubmit extends CommandLineUtils {
         // If a python file is provided, add it to the child arguments and list of files
to deploy.
         // Usage: PythonAppRunner <main python file> <extra python files> [app
arguments]
         args.mainClass = "org.apache.spark.deploy.PythonRunner"
-        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+        args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
         if (clusterManager != YARN) {
           // The YARN backend distributes the primary file differently, so don't merge it.
           args.files = mergeFileLists(args.files, args.primaryResource)
@@ -376,8 +386,8 @@ object SparkSubmit extends CommandLineUtils {
         // The YARN backend handles python files differently, so don't merge the lists.
         args.files = mergeFileLists(args.files, args.pyFiles)
       }
-      if (args.pyFiles != null) {
-        sysProps("spark.submit.pyFiles") = args.pyFiles
+      if (localPyFiles != null) {
+        sysProps("spark.submit.pyFiles") = localPyFiles
       }
     }
 
@@ -431,7 +441,7 @@ object SparkSubmit extends CommandLineUtils {
         // If an R file is provided, add it to the child arguments and list of files to deploy.
         // Usage: RRunner <main R file> [app arguments]
         args.mainClass = "org.apache.spark.deploy.RRunner"
-        args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
+        args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
         args.files = mergeFileLists(args.files, args.primaryResource)
       }
     }
@@ -468,6 +478,7 @@ object SparkSubmit extends CommandLineUtils {
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.instances"),
+      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"),
       OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
       OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
       OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
@@ -491,15 +502,28 @@ object SparkSubmit extends CommandLineUtils {
         sysProp = "spark.driver.cores"),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
         sysProp = "spark.driver.supervise"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
+      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
+
+      // An internal option used only for spark-shell to add user jars to repl's classloader,
+      // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed
to
+      // remote jars, so adding a new option to only specify local jars for spark-shell internally.
+      OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars")
     )
 
     // In client mode, launch the application main class directly
     // In addition, add the main application jar and any added jars (if any) to the classpath
-    // Also add the main application jar and any added jars to classpath in case YARN client
-    // requires these jars.
-    if (deployMode == CLIENT || isYarnCluster) {
+    if (deployMode == CLIENT) {
       childMainClass = args.mainClass
+      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
+        childClasspath += localPrimaryResource
+      }
+      if (localJars != null) { childClasspath ++= localJars.split(",") }
+    }
+    // Add the main application jar and any added jars to classpath in case YARN client
+    // requires these jars.
+    // This assumes both primaryResource and user jars are local jars, otherwise it will
not be
+    // added to the classpath of YARN client.
+    if (isYarnCluster) {
       if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
@@ -556,10 +580,6 @@ object SparkSubmit extends CommandLineUtils {
       if (args.isPython) {
         sysProps.put("spark.yarn.isPython", "true")
       }
-
-      if (args.pyFiles != null) {
-        sysProps("spark.submit.pyFiles") = args.pyFiles
-      }
     }
 
     // assure a keytab is available from any place in a JVM

http://git-wip-us.apache.org/repos/asf/spark/blob/d10c9dc3/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c0fcf99..1588dfe 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -87,7 +87,7 @@ package object config {
     .intConf
     .createOptional
 
-  private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
+  private[spark] val PY_FILES = ConfigBuilder("spark.yarn.dist.pyFiles")
     .internal()
     .stringConf
     .toSequence

http://git-wip-us.apache.org/repos/asf/spark/blob/d10c9dc3/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 999486c..69c6c33 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2580,18 +2580,23 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * In YARN mode this method returns a union of the jar files pointed by "spark.jars" and
the
-   * "spark.yarn.dist.jars" properties, while in other modes it returns the jar files pointed
by
-   * only the "spark.jars" property.
+   * Return the jar files pointed by the "spark.jars" property. Spark internally will distribute
+   * these jars through file server. In the YARN mode, it will return an empty list, since
YARN
+   * has its own mechanism to distribute jars.
    */
-  def getUserJars(conf: SparkConf, isShell: Boolean = false): Seq[String] = {
+  def getUserJars(conf: SparkConf): Seq[String] = {
     val sparkJars = conf.getOption("spark.jars")
-    if (conf.get("spark.master") == "yarn" && isShell) {
-      val yarnJars = conf.getOption("spark.yarn.dist.jars")
-      unionFileLists(sparkJars, yarnJars).toSeq
-    } else {
-      sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
-    }
+    sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+  }
+
+  /**
+   * Return the local jar files which will be added to REPL's classpath. These jar files
are
+   * specified by --jars (spark.jars) or --packages, remote jars will be downloaded to local
by
+   * SparkSubmit at first.
+   */
+  def getLocalUserJarsForShell(conf: SparkConf): Seq[String] = {
+    val localJars = conf.getOption("spark.repl.local.jars")
+    localJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
   }
 
   private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"

http://git-wip-us.apache.org/repos/asf/spark/blob/d10c9dc3/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 6fa3a09..3c49b1f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -27,7 +27,7 @@ import scala.io.Source
 import com.google.common.io.ByteStreams
 import org.apache.commons.io.{FilenameUtils, FileUtils}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
@@ -738,10 +738,7 @@ class SparkSubmitSuite
 
   test("downloadFile - file doesn't exist") {
     val hadoopConf = new Configuration()
-    // Set s3a implementation to local file system for testing.
-    hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
-    // Disable file system impl cache to make sure the test file system is picked up.
-    hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+    updateConfWithFakeS3Fs(hadoopConf)
     intercept[FileNotFoundException] {
       SparkSubmit.downloadFile("s3a:/no/such/file", hadoopConf)
     }
@@ -759,10 +756,7 @@ class SparkSubmitSuite
     val content = "hello, world"
     FileUtils.write(jarFile, content)
     val hadoopConf = new Configuration()
-    // Set s3a implementation to local file system for testing.
-    hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
-    // Disable file system impl cache to make sure the test file system is picked up.
-    hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+    updateConfWithFakeS3Fs(hadoopConf)
     val sourcePath = s"s3a://${jarFile.getAbsolutePath}"
     val outputPath = SparkSubmit.downloadFile(sourcePath, hadoopConf)
     checkDownloadedFile(sourcePath, outputPath)
@@ -775,10 +769,7 @@ class SparkSubmitSuite
     val content = "hello, world"
     FileUtils.write(jarFile, content)
     val hadoopConf = new Configuration()
-    // Set s3a implementation to local file system for testing.
-    hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
-    // Disable file system impl cache to make sure the test file system is picked up.
-    hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+    updateConfWithFakeS3Fs(hadoopConf)
     val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}")
     val outputPaths = SparkSubmit.downloadFileList(sourcePaths.mkString(","), hadoopConf).split(",")
 
@@ -789,6 +780,43 @@ class SparkSubmitSuite
     }
   }
 
+  test("Avoid re-upload remote resources in yarn client mode") {
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
+
+    val tmpDir = Utils.createTempDir()
+    val file = File.createTempFile("tmpFile", "", tmpDir)
+    val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
+    val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+    val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
+
+    val args = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+      "--name", "testApp",
+      "--master", "yarn",
+      "--deploy-mode", "client",
+      "--jars", tmpJarPath,
+      "--files", s"s3a://${file.getAbsolutePath}",
+      "--py-files", s"s3a://${pyFile.getAbsolutePath}",
+      s"s3a://$mainResource"
+      )
+
+    val appArgs = new SparkSubmitArguments(args)
+    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3
+
+    // All the resources should still be remote paths, so that YARN client will not upload
again.
+    sysProps("spark.yarn.dist.jars") should be (tmpJarPath)
+    sysProps("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}")
+    sysProps("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}")
+
+    // Local repl jars should be a local path.
+    sysProps("spark.repl.local.jars") should (startWith("file:"))
+
+    // local py files should not be a URI format.
+    sysProps("spark.submit.pyFiles") should (startWith("/"))
+  }
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   private def runSparkSubmit(args: Seq[String]): Unit = {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -828,6 +856,11 @@ class SparkSubmitSuite
       Utils.deleteRecursively(tmpDir)
     }
   }
+
+  private def updateConfWithFakeS3Fs(conf: Configuration): Unit = {
+    conf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName)
+    conf.set("fs.s3a.impl.disable.cache", "true")
+  }
 }
 
 object JarCreationTest extends Logging {
@@ -897,4 +930,13 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
     // Ignore the scheme for testing.
     super.copyToLocalFile(new Path(src.toUri.getPath), dst)
   }
+
+  override def globStatus(pathPattern: Path): Array[FileStatus] = {
+    val newPath = new Path(pathPattern.toUri.getPath)
+    super.globStatus(newPath).map { status =>
+      val path = s"s3a://${status.getPath.toUri.getPath}"
+      status.setPath(new Path(path))
+      status
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d10c9dc3/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index b7237a6..004348c 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1066,7 +1066,7 @@ class SparkILoop(
       logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument
instead")
     }
     val jars = {
-      val userJars = Utils.getUserJars(conf, isShell = true)
+      val userJars = Utils.getLocalUserJarsForShell(conf)
       if (userJars.isEmpty) {
         envJars.getOrElse("")
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/d10c9dc3/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 9702a1e..0b16e1b 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -57,7 +57,7 @@ object Main extends Logging {
   // Visible for testing
   private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
     interp = _interp
-    val jars = Utils.getUserJars(conf, isShell = true)
+    val jars = Utils.getLocalUserJarsForShell(conf)
       // Remove file:///, file:// or file:/ scheme if exists for each jar
       .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
       .mkString(File.pathSeparator)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message