spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From js...@apache.org
Subject spark git commit: [SPARK-24646][CORE] Minor change to spark.yarn.dist.forceDownloadSchemes to support wildcard '*'
Date Mon, 09 Jul 2018 02:21:48 GMT
Repository: spark
Updated Branches:
  refs/heads/master 79c668942 -> e2c7e09f7


[SPARK-24646][CORE] Minor change to spark.yarn.dist.forceDownloadSchemes to support wildcard
'*'

## What changes were proposed in this pull request?

When tokens are obtained via a customized `ServiceCredentialProvider`, that
`ServiceCredentialProvider` must be available on the classpath of the local spark-submit process.
In this case, all the configured remote resources should be forcibly downloaded to the local disk.

To make this configuration easier to use, this change adds wildcard '*' support to `spark.yarn.dist.forceDownloadSchemes`
and also clarifies the usage of this configuration.

## How was this patch tested?

A new unit test was added.

Author: jerryshao <sshao@hortonworks.com>

Closes #21633 from jerryshao/SPARK-21917-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2c7e09f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2c7e09f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2c7e09f

Branch: refs/heads/master
Commit: e2c7e09f742a7e522efd74fe8e14c2620afdb522
Parents: 79c6689
Author: jerryshao <sshao@hortonworks.com>
Authored: Mon Jul 9 10:21:40 2018 +0800
Committer: jerryshao <sshao@hortonworks.com>
Committed: Mon Jul 9 10:21:40 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   |  5 ++--
 .../apache/spark/internal/config/package.scala  |  5 ++--
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 29 +++++++++++++-------
 docs/running-on-yarn.md                         |  5 ++--
 4 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2c7e09f/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 2da778a..e7310ee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -385,7 +385,7 @@ private[spark] class SparkSubmit extends Logging {
       val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
 
       def shouldDownload(scheme: String): Boolean = {
-        forceDownloadSchemes.contains(scheme) ||
+        forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
           Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
       }
 
@@ -578,7 +578,8 @@ private[spark] class SparkSubmit extends Logging {
     }
     // Add the main application jar and any added jars to classpath in case YARN client
     // requires these jars.
-    // This assumes both primaryResource and user jars are local jars, otherwise it will
not be
+    // This assumes both primaryResource and user jars are local jars, or already downloaded
+    // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will
not be
     // added to the classpath of YARN client.
     if (isYarnCluster) {
       if (isUserJar(args.primaryResource)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/e2c7e09f/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index bda9795..ba892bf 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -486,10 +486,11 @@ package object config {
 
   private[spark] val FORCE_DOWNLOAD_SCHEMES =
     ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
-      .doc("Comma-separated list of schemes for which files will be downloaded to the " +
+      .doc("Comma-separated list of schemes for which resources will be downloaded to the
" +
         "local disk prior to being added to YARN's distributed cache. For use in cases "
+
         "where the YARN service does not support schemes that are supported by Spark, like
http, " +
-        "https and ftp.")
+        "https and ftp, or jars required to be in the local YARN client's classpath. Wildcard
" +
+        "'*' is denoted to download resources for all the schemes.")
       .stringConf
       .toSequence
       .createWithDefault(Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/e2c7e09f/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 545c8d0..f829fec 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -995,20 +995,24 @@ class SparkSubmitSuite
   }
 
   test("download remote resource if it is not supported by yarn service") {
-    testRemoteResources(enableHttpFs = false, blacklistHttpFs = false)
+    testRemoteResources(enableHttpFs = false)
   }
 
   test("avoid downloading remote resource if it is supported by yarn service") {
-    testRemoteResources(enableHttpFs = true, blacklistHttpFs = false)
+    testRemoteResources(enableHttpFs = true)
   }
 
   test("force download from blacklisted schemes") {
-    testRemoteResources(enableHttpFs = true, blacklistHttpFs = true)
+    testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("http"))
+  }
+
+  test("force download for all the schemes") {
+    testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("*"))
   }
 
   private def testRemoteResources(
       enableHttpFs: Boolean,
-      blacklistHttpFs: Boolean): Unit = {
+      blacklistSchemes: Seq[String] = Nil): Unit = {
     val hadoopConf = new Configuration()
     updateConfWithFakeS3Fs(hadoopConf)
     if (enableHttpFs) {
@@ -1025,8 +1029,8 @@ class SparkSubmitSuite
     val tmpHttpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
     val tmpHttpJarPath = s"http://${new File(tmpHttpJar.toURI).getAbsolutePath}"
 
-    val forceDownloadArgs = if (blacklistHttpFs) {
-      Seq("--conf", "spark.yarn.dist.forceDownloadSchemes=http")
+    val forceDownloadArgs = if (blacklistSchemes.nonEmpty) {
+      Seq("--conf", s"spark.yarn.dist.forceDownloadSchemes=${blacklistSchemes.mkString(",")}")
     } else {
       Nil
     }
@@ -1044,14 +1048,19 @@ class SparkSubmitSuite
 
     val jars = conf.get("spark.yarn.dist.jars").split(",").toSet
 
-    // The URI of remote S3 resource should still be remote.
-    assert(jars.contains(tmpS3JarPath))
+    def isSchemeBlacklisted(scheme: String) = {
+      blacklistSchemes.contains("*") || blacklistSchemes.contains(scheme)
+    }
+
+    if (!isSchemeBlacklisted("s3")) {
+      assert(jars.contains(tmpS3JarPath))
+    }
 
-    if (enableHttpFs && !blacklistHttpFs) {
+    if (enableHttpFs && blacklistSchemes.isEmpty) {
       // If Http FS is supported by yarn service, the URI of remote http resource should
       // still be remote.
       assert(jars.contains(tmpHttpJarPath))
-    } else {
+    } else if (!enableHttpFs || isSchemeBlacklisted("http")) {
       // If Http FS is not supported by yarn service, or http scheme is configured to be
force
       // downloading, the URI of remote http resource should be changed to a local one.
       val jarName = new File(tmpHttpJar.toURI).getName

http://git-wip-us.apache.org/repos/asf/spark/blob/e2c7e09f/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 575da72..0b265b0 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -218,9 +218,10 @@ To use a custom metrics.properties for the application master and executors,
upd
   <td><code>spark.yarn.dist.forceDownloadSchemes</code></td>
   <td><code>(none)</code></td>
   <td>
-    Comma-separated list of schemes for which files will be downloaded to the local disk
prior to
+    Comma-separated list of schemes for which resources will be downloaded to the local disk
prior to
     being added to YARN's distributed cache. For use in cases where the YARN service does
not
-    support schemes that are supported by Spark, like http, https and ftp.
+    support schemes that are supported by Spark, like http, https and ftp, or jars required
to be in the
+    local YARN client's classpath. Wildcard '*' is denoted to download resources for all
the schemes.
   </td>
 </tr>
 <tr>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message