spark-commits mailing list archives

From: van...@apache.org
Subject: spark git commit: [SPARK-23729][CORE] Respect URI fragment when resolving globs
Date: Thu, 22 Mar 2018 00:13:25 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 4b9f33ff7 -> c9acd46be


[SPARK-23729][CORE] Respect URI fragment when resolving globs

First, glob resolution no longer swallows the remote name part (the part
preceded by the `#` sign) of `--files` or `--archives` options.

Moreover, in the special case where the glob resolves to multiple files, a
single remote name does not make sense, and an error is returned.

Enhanced the current test and wrote an additional test for the error case.
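
For illustration, the fragment handling works roughly as in this minimal
standalone sketch (it uses java.net.URI directly; the actual patch goes
through Utils.resolveURI and private helpers in DependencyUtils):

    import java.net.URI

    object FragmentDemo {
      // Split "scheme:path#name" into the path URI and the optional name,
      // mirroring the splitOnFragment helper added in this commit.
      def split(path: String): (URI, Option[String]) = {
        val uri = new URI(path)
        (new URI(uri.getScheme, uri.getSchemeSpecificPart, null), Option(uri.getFragment))
      }

      def main(args: Array[String]): Unit = {
        val (base, fragment) = split("file:/data/archives/*.zip#logs")
        println(base)     // file:/data/archives/*.zip
        println(fragment) // Some(logs)
      }
    }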

Author: Mihaly Toth <misutoth@gmail.com>

Closes #20853 from misutoth/glob-with-remote-name.

(cherry picked from commit 0604beaff2baa2d0fed86c0c87fd2a16a1838b5f)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c9acd46b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c9acd46b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c9acd46b

Branch: refs/heads/branch-2.3
Commit: c9acd46bed8fa3e410e8a44aafe3237e59deaa73
Parents: 4b9f33f
Author: Mihaly Toth <misutoth@gmail.com>
Authored: Wed Mar 21 17:05:39 2018 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Wed Mar 21 17:12:57 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/DependencyUtils.scala   | 34 +++++++++++-----
 .../org/apache/spark/deploy/SparkSubmit.scala   | 13 +++++++
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 41 ++++++++++++++++----
 3 files changed, 72 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c9acd46b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
index ecc82d7..ab319c8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -18,12 +18,13 @@
 package org.apache.spark.deploy
 
 import java.io.File
+import java.net.URI
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 private[deploy] object DependencyUtils {
@@ -137,16 +138,31 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val (base, fragment) = splitOnFragment(path)
+      (resolveGlobPath(base, hadoopConf), fragment) match {
+        case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
+            s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
+        case (resolved, Some(namedAs)) => resolved.map(_ + "#" + namedAs)
+        case (resolved, _) => resolved
       }
     }.mkString(",")
   }
 
+  private def splitOnFragment(path: String): (URI, Option[String]) = {
+    val uri = Utils.resolveURI(path)
+    val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
+    (withoutFragment, Option(uri.getFragment))
+  }
+
+  private def resolveGlobPath(uri: URI, hadoopConf: Configuration): Array[String] = {
+    uri.getScheme match {
+      case "local" | "http" | "https" | "ftp" => Array(uri.toString)
+      case _ =>
+        val fs = FileSystem.get(uri, hadoopConf)
+        Option(fs.globStatus(new Path(uri))).map { status =>
+          status.filter(_.isFile).map(_.getPath.toUri.toString)
+        }.getOrElse(Array(uri.toString))
+    }
+  }
+
 }
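
Taken together, resolveGlobPaths now has three outcomes per input path. A
sketch of the recombination step with made-up values (IllegalArgumentException
standing in for SparkException so the snippet is self-contained):

    object ResolutionOutcomes {
      def recombine(resolved: Array[String], fragment: Option[String]): Array[String] =
        (resolved, fragment) match {
          // A single remote name cannot apply to several glob matches.
          case (r, Some(_)) if r.length > 1 =>
            throw new IllegalArgumentException(
              s"resolves ambiguously to multiple files: ${r.mkString(",")}")
          // Re-attach the fragment to the single resolved path.
          case (r, Some(name)) => r.map(_ + "#" + name)
          // No fragment: resolved paths pass through unchanged.
          case (r, _) => r
        }

      def main(args: Array[String]): Unit = {
        println(recombine(Array("file:/d/a.zip"), Some("logs")).mkString(","))
        // file:/d/a.zip#logs
      }
    }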

http://git-wip-us.apache.org/repos/asf/spark/blob/c9acd46b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 1e38196..b44c880 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
+    try {
+      doPrepareSubmitEnvironment(args, conf)
+    } catch {
+      case e: SparkException =>
+        printErrorAndExit(e.getMessage)
+        throw e
+    }
+  }
+
+  private def doPrepareSubmitEnvironment(
+      args: SparkSubmitArguments,
+      conf: Option[HadoopConfiguration] = None)
+      : (Seq[String], Seq[String], SparkConf, String) = {
     // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
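
The wrapper above turns a SparkException from dependency resolution into a
user-facing error. A simplified sketch of the pattern (in the real code
printErrorAndExit normally terminates the JVM; the re-throw matters when
tests substitute a non-exiting handler):

    object WrapAndReport {
      def printErrorAndExit(msg: String): Unit = Console.err.println(s"Error: $msg")

      def prepare(): Int =
        try doPrepare() catch {
          case e: RuntimeException =>
            printErrorAndExit(e.getMessage)
            throw e // unreachable when printErrorAndExit really exits
        }

      def doPrepare(): Int =
        throw new RuntimeException("ambiguous archive mapping")
    }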

http://git-wip-us.apache.org/repos/asf/spark/blob/c9acd46b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 27dd435..feab888 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 import java.io._
 import java.net.URI
 import java.nio.charset.StandardCharsets
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -606,10 +606,13 @@ class SparkSubmitSuite
   }
 
   test("resolves command line argument paths correctly") {
-    val jars = "/jar1,/jar2"                 // --jars
-    val files = "local:/file1,file2"          // --files
-    val archives = "file:/archive1,archive2" // --archives
-    val pyFiles = "py-file1,py-file2"        // --py-files
+    val dir = Utils.createTempDir()
+    val archive = Paths.get(dir.toPath.toString, "single.zip")
+    Files.createFile(archive)
+    val jars = "/jar1,/jar2"
+    val files = "local:/file1,file2"
+    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+    val pyFiles = "py-file1,py-file2"
 
     // Test jars and files
     val clArgs = Seq(
@@ -636,9 +639,10 @@ class SparkSubmitSuite
     val appArgs2 = new SparkSubmitArguments(clArgs2)
     val (_, _, conf2, _) = SparkSubmit.prepareSubmitEnvironment(appArgs2)
     appArgs2.files should be (Utils.resolveURIs(files))
-    appArgs2.archives should be (Utils.resolveURIs(archives))
+    appArgs2.archives should fullyMatch regex ("file:/archive1,file:.*#archive3")
     conf2.get("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
-    conf2.get("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
+    conf2.get("spark.yarn.dist.archives") should fullyMatch regex
+      ("file:/archive1,file:.*#archive3")
 
     // Test python files
     val clArgs3 = Seq(
@@ -657,6 +661,29 @@ class SparkSubmitSuite
     conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
   }
 
+  test("ambiguous archive mapping results in error message") {
+    val dir = Utils.createTempDir()
+    val archive1 = Paths.get(dir.toPath.toString, "first.zip")
+    val archive2 = Paths.get(dir.toPath.toString, "second.zip")
+    Files.createFile(archive1)
+    Files.createFile(archive2)
+    val jars = "/jar1,/jar2"
+    val files = "local:/file1,file2"
+    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+    val pyFiles = "py-file1,py-file2"
+
+    // Test files and archives (Yarn)
+    val clArgs2 = Seq(
+      "--master", "yarn",
+      "--class", "org.SomeClass",
+      "--files", files,
+      "--archives", archives,
+      "thejar.jar"
+    )
+
+    testPrematureExit(clArgs2.toArray, "resolves ambiguously to multiple files")
+  }
+
   test("resolves config paths correctly") {
     val jars = "/jar1,/jar2" // spark.jars
     val files = "local:/file1,file2" // spark.files / spark.yarn.dist.files

