spark-commits mailing list archives

From t...@apache.org
Subject git commit: [SPARK-1900 / 1918] PySpark on YARN is broken
Date Sun, 25 May 2014 01:02:26 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 9be103a76 -> 12f5ecc8b


[SPARK-1900 / 1918] PySpark on YARN is broken

If I run the following on a YARN cluster
```
bin/spark-submit sheep.py --master yarn-client
```
it fails because of a mismatch in paths: `spark-submit` thinks that `sheep.py` resides on
HDFS, and balks when it can't find the file there. A natural workaround is to add the `file:`
prefix to the path:
```
bin/spark-submit file:/path/to/sheep.py --master yarn-client
```
However, this also fails, this time because Python does not understand URI schemes in paths.
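To see why, here is a minimal sketch of the failure mode (the path is hypothetical): Python
treats the URI-prefixed name as a literal file path, so the file is never found even when it
exists.
```
# Minimal sketch of the failure mode; /path/to/sheep.py is a hypothetical file.
try:
    open("file:/path/to/sheep.py")
except IOError as e:
    print(e)  # e.g. No such file or directory: 'file:/path/to/sheep.py'
```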

This PR fixes both issues by properly resolving all paths passed as command line arguments to
`spark-submit`. This has the added benefit of keeping file and jar paths consistent across
different cluster modes. For Python, we strip the URI scheme before we actually try to run
the file.
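For illustration only, a rough Python equivalent of the scheme-stripping step (the real logic
lives in `PythonRunner.formatPath` in the diff below; `strip_scheme` is a hypothetical helper):
```
# Rough sketch of the scheme-stripping idea, not Spark's actual code.
from urllib.parse import urlparse

def strip_scheme(path):
    parsed = urlparse(path)
    # Strip local schemes so Python sees a plain filesystem path; non-local
    # schemes are rejected earlier, since only local python files are supported.
    return parsed.path if parsed.scheme in ("file", "local") else path

print(strip_scheme("file:/path/to/sheep.py"))  # /path/to/sheep.py
print(strip_scheme("sheep.py"))                # sheep.py (unchanged)
```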

Much of this code was originally written by @mengxr. Tested on a YARN cluster. More tests pending.

Author: Andrew Or <andrewor14@gmail.com>

Closes #853 from andrewor14/submit-paths and squashes the following commits:

0bb097a [Andrew Or] Format path correctly before adding it to PYTHONPATH
323b45c [Andrew Or] Include --py-files on PYTHONPATH for pyspark shell
3c36587 [Andrew Or] Improve error messages (minor)
854aa6a [Andrew Or] Guard against NPE if user gives pathological paths
6638a6b [Andrew Or] Fix spark-shell jar paths after #849 went in
3bb0359 [Andrew Or] Update more comments (minor)
2a1f8a0 [Andrew Or] Update comments (minor)
6af2c77 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-paths
a68c4d1 [Andrew Or] Handle Windows python file path correctly
427a250 [Andrew Or] Resolve paths properly for Windows
a591a4a [Andrew Or] Update tests for resolving URIs
6c8621c [Andrew Or] Move resolveURIs to Utils
db8255e [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-paths
f542dce [Andrew Or] Fix outdated tests
691c4ce [Andrew Or] Ignore special primary resource names
5342ac7 [Andrew Or] Add missing space in error message
02f77f3 [Andrew Or] Resolve command line arguments to spark-submit properly

(cherry picked from commit 5081a0a9d47ca31900ea4de570de2cbb0e063105)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12f5ecc8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12f5ecc8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12f5ecc8

Branch: refs/heads/branch-1.0
Commit: 12f5ecc8b116d8d3c0289e13712fd992c604afb5
Parents: 9be103a
Author: Andrew Or <andrewor14@gmail.com>
Authored: Sat May 24 18:01:49 2014 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Sat May 24 18:02:22 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/PythonRunner.scala  | 60 +++++++++++++++--
 .../org/apache/spark/deploy/SparkSubmit.scala   | 25 ++++---
 .../spark/deploy/SparkSubmitArguments.scala     | 33 ++++++---
 .../scala/org/apache/spark/util/Utils.scala     | 71 +++++++++++++++++++-
 .../apache/spark/deploy/PythonRunnerSuite.scala | 61 +++++++++++++++++
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 41 ++++++-----
 .../org/apache/spark/util/UtilsSuite.scala      | 66 +++++++++++++++++-
 python/pyspark/context.py                       |  8 ++-
 .../org/apache/spark/repl/SparkILoop.scala      |  5 +-
 9 files changed, 323 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12f5ecc8/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 2dfa02b..0d6751f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.deploy
 
+import java.net.URI
+
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 
 import org.apache.spark.api.python.{PythonUtils, RedirectThread}
+import org.apache.spark.util.Utils
 
 /**
  * A main class used by spark-submit to launch Python applications. It executes python as a
@@ -28,12 +31,15 @@ import org.apache.spark.api.python.{PythonUtils, RedirectThread}
  */
 object PythonRunner {
   def main(args: Array[String]) {
-    val primaryResource = args(0)
+    val pythonFile = args(0)
     val pyFiles = args(1)
     val otherArgs = args.slice(2, args.length)
-
     val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
 
+    // Format python file paths before adding them to the PYTHONPATH
+    val formattedPythonFile = formatPath(pythonFile)
+    val formattedPyFiles = formatPaths(pyFiles)
+
     // Launch a Py4J gateway server for the process to connect to; this will let it see our
     // Java system properties and such
     val gatewayServer = new py4j.GatewayServer(null, 0)
@@ -42,13 +48,13 @@ object PythonRunner {
     // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
     // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
     val pathElements = new ArrayBuffer[String]
-    pathElements ++= Option(pyFiles).getOrElse("").split(",")
+    pathElements ++= formattedPyFiles
     pathElements += PythonUtils.sparkPythonPath
     pathElements += sys.env.getOrElse("PYTHONPATH", "")
     val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
 
     // Launch Python process
-    val builder = new ProcessBuilder(Seq(pythonExec, "-u", primaryResource) ++ otherArgs)
+    val builder = new ProcessBuilder(Seq(pythonExec, "-u", formattedPythonFile) ++ otherArgs)
     val env = builder.environment()
     env.put("PYTHONPATH", pythonPath)
     env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
@@ -59,4 +65,50 @@ object PythonRunner {
 
     System.exit(process.waitFor())
   }
+
+  /**
+   * Format the python file path so that it can be added to the PYTHONPATH correctly.
+   *
+   * Python does not understand URI schemes in paths. Before adding python files to the
+   * PYTHONPATH, we need to extract the path from the URI. This is safe to do because we
+   * currently only support local python files.
+   */
+  def formatPath(path: String, testWindows: Boolean = false): String = {
+    if (Utils.nonLocalPaths(path, testWindows).nonEmpty) {
+      throw new IllegalArgumentException("Launching Python applications through " +
+        s"spark-submit is currently only supported for local files: $path")
+    }
+    val windows = Utils.isWindows || testWindows
+    var formattedPath = if (windows) Utils.formatWindowsPath(path) else path
+
+    // Strip the URI scheme from the path
+    formattedPath =
+      new URI(formattedPath).getScheme match {
+        case Utils.windowsDrive(d) if windows => formattedPath
+        case null => formattedPath
+        case _ => new URI(formattedPath).getPath
+      }
+
+    // Guard against malformed paths potentially throwing NPE
+    if (formattedPath == null) {
+      throw new IllegalArgumentException(s"Python file path is malformed: $path")
+    }
+
+    // In Windows, the drive should not be prefixed with "/"
+    // For instance, python does not understand "/C:/path/to/sheep.py"
+    formattedPath = if (windows) formattedPath.stripPrefix("/") else formattedPath
+    formattedPath
+  }
+
+  /**
+   * Format each python file path in the comma-delimited list of paths, so it can be
+   * added to the PYTHONPATH correctly.
+   */
+  def formatPaths(paths: String, testWindows: Boolean = false): Array[String] = {
+    Option(paths).getOrElse("")
+      .split(",")
+      .filter(_.nonEmpty)
+      .map { p => formatPath(p, testWindows) }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/12f5ecc8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c54331c..7e9a934 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -136,9 +136,9 @@ object SparkSubmit {
         args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
         args.files = mergeFileLists(args.files, args.primaryResource)
       }
-      val pyFiles = Option(args.pyFiles).getOrElse("")
-      args.files = mergeFileLists(args.files, pyFiles)
-      sysProps("spark.submit.pyFiles") = pyFiles
+      args.files = mergeFileLists(args.files, args.pyFiles)
+      // Format python file paths properly before adding them to the PYTHONPATH
+      sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
     }
 
     // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
@@ -299,13 +299,18 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(localJar)
-    if (!localJarFile.exists()) {
-      printWarning(s"Jar $localJar does not exist, skipping.")
+    val uri = Utils.resolveURI(localJar)
+    uri.getScheme match {
+      case "file" | "local" =>
+        val file = new File(uri.getPath)
+        if (file.exists()) {
+          loader.addURL(file.toURI.toURL)
+        } else {
+          printWarning(s"Local jar $file does not exist, skipping.")
+        }
+      case _ =>
+        printWarning(s"Skip remote jar $uri.")
     }
-
-    val url = localJarFile.getAbsoluteFile.toURI.toURL
-    loader.addURL(url)
   }
 
   /**
@@ -318,7 +323,7 @@ object SparkSubmit {
   /**
    * Return whether the given primary resource represents a shell.
    */
-  private def isShell(primaryResource: String): Boolean = {
+  private[spark] def isShell(primaryResource: String): Boolean = {
     primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/12f5ecc8/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 0cc05fb..bf449af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -118,7 +118,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
       } catch {
         case e: Exception =>
-          SparkSubmit.printErrorAndExit("Failed to read JAR: " + primaryResource)
+          SparkSubmit.printErrorAndExit("Cannot load main class from JAR: " + primaryResource)
           return
       }
     }
@@ -148,6 +148,18 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python
script")
     }
 
+    // Require all python files to be local, so we can add them to the PYTHONPATH
+    if (isPython) {
+      if (Utils.nonLocalPaths(primaryResource).nonEmpty) {
+        SparkSubmit.printErrorAndExit(s"Only local python files are supported: $primaryResource")
+      }
+      val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",")
+      if (nonLocalPyFiles.nonEmpty) {
+        SparkSubmit.printErrorAndExit(
+          s"Only local additional python files are supported: $nonLocalPyFiles")
+      }
+    }
+
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
       if (!hasHadoopEnv && !Utils.isTesting) {
@@ -263,19 +275,19 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         parse(tail)
 
       case ("--files") :: value :: tail =>
-        files = value
+        files = Utils.resolveURIs(value)
         parse(tail)
 
       case ("--py-files") :: value :: tail =>
-        pyFiles = value
+        pyFiles = Utils.resolveURIs(value)
         parse(tail)
 
       case ("--archives") :: value :: tail =>
-        archives = value
+        archives = Utils.resolveURIs(value)
         parse(tail)
 
       case ("--jars") :: value :: tail =>
-        jars = value
+        jars = Utils.resolveURIs(value)
         parse(tail)
 
       case ("--help" | "-h") :: tail =>
@@ -296,7 +308,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
               val errMessage = s"Unrecognized option '$value'."
               SparkSubmit.printErrorAndExit(errMessage)
             case v =>
-              primaryResource = v
+              primaryResource =
+                if (!SparkSubmit.isShell(v)) {
+                  Utils.resolveURI(v).toString
+                } else {
+                  v
+                }
               inSparkOpts = false
               isPython = SparkSubmit.isPython(v)
               parse(tail)
@@ -327,8 +344,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         |  --name NAME                 A name of your application.
         |  --jars JARS                 Comma-separated list of local jars to include on the driver
         |                              and executor classpaths.
-        |  --py-files PY_FILES         Comma-separated list of .zip or .egg files to place on the
-        |                              PYTHONPATH for Python apps.
+        |  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
+        |                              on the PYTHONPATH for Python apps.
         |  --files FILES               Comma-separated list of files to be placed in the working
         |                              directory of each executor.
         |  --properties-file FILE      Path to a file from which to load extra properties. If not

http://git-wip-us.apache.org/repos/asf/spark/blob/12f5ecc8/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0c7cff0..3b1b6df 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1086,9 +1086,19 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Return true if this is Windows.
+   * Whether the underlying operating system is Windows.
    */
-  def isWindows = SystemUtils.IS_OS_WINDOWS
+  val isWindows = SystemUtils.IS_OS_WINDOWS
+
+  /**
+   * Pattern for matching a Windows drive, which contains only a single alphabet character.
+   */
+  val windowsDrive = "([a-zA-Z])".r
+
+  /**
+   * Format a Windows path such that it can be safely passed to a URI.
+   */
+  def formatWindowsPath(path: String): String = path.replace("\\", "/")
 
   /**
    * Indicates whether Spark is currently running unit tests.
@@ -1166,4 +1176,61 @@ private[spark] object Utils extends Logging {
         true
     }
   }
+
+  /**
+   * Return a well-formed URI for the file described by a user input string.
+   *
+   * If the supplied path does not contain a scheme, or is a relative path, it will be
+   * converted into an absolute path with a file:// scheme.
+   */
+  def resolveURI(path: String, testWindows: Boolean = false): URI = {
+
+    // In Windows, the file separator is a backslash, but this is inconsistent with the URI format
+    val windows = isWindows || testWindows
+    val formattedPath = if (windows) formatWindowsPath(path) else path
+
+    val uri = new URI(formattedPath)
+    if (uri.getPath == null) {
+      throw new IllegalArgumentException(s"Given path is malformed: $uri")
+    }
+    uri.getScheme match {
+      case windowsDrive(d) if windows =>
+        new URI("file:/" + uri.toString.stripPrefix("/"))
+      case null =>
+        // Preserve fragments for HDFS file name substitution (denoted by "#")
+        // For instance, in "abc.py#xyz.py", "xyz.py" is the name observed by the application
+        val fragment = uri.getFragment
+        val part = new File(uri.getPath).toURI
+        new URI(part.getScheme, part.getPath, fragment)
+      case _ =>
+        uri
+    }
+  }
+
+  /** Resolve a comma-separated list of paths. */
+  def resolveURIs(paths: String, testWindows: Boolean = false): String = {
+    if (paths == null || paths.trim.isEmpty) {
+      ""
+    } else {
+      paths.split(",").map { p => Utils.resolveURI(p, testWindows) }.mkString(",")
+    }
+  }
+
+  /** Return all non-local paths from a comma-separated list of paths. */
+  def nonLocalPaths(paths: String, testWindows: Boolean = false): Array[String] = {
+    val windows = isWindows || testWindows
+    if (paths == null || paths.trim.isEmpty) {
+      Array.empty
+    } else {
+      paths.split(",").filter { p =>
+        val formattedPath = if (windows) formatWindowsPath(p) else p
+        new URI(formattedPath).getScheme match {
+          case windowsDrive(d) if windows => false
+          case "local" | "file" | null => false
+          case _ => true
+        }
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/12f5ecc8/core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala
new file mode 100644
index 0000000..bb6251f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.scalatest.FunSuite
+
+class PythonRunnerSuite extends FunSuite {
+
+  // Test formatting a single path to be added to the PYTHONPATH
+  test("format path") {
+    assert(PythonRunner.formatPath("spark.py") === "spark.py")
+    assert(PythonRunner.formatPath("file:/spark.py") === "/spark.py")
+    assert(PythonRunner.formatPath("file:///spark.py") === "/spark.py")
+    assert(PythonRunner.formatPath("local:/spark.py") === "/spark.py")
+    assert(PythonRunner.formatPath("local:///spark.py") === "/spark.py")
+    assert(PythonRunner.formatPath("C:/a/b/spark.py", testWindows = true) === "C:/a/b/spark.py")
+    assert(PythonRunner.formatPath("/C:/a/b/spark.py", testWindows = true) === "C:/a/b/spark.py")
+    assert(PythonRunner.formatPath("file:/C:/a/b/spark.py", testWindows = true) ===
+      "C:/a/b/spark.py")
+    intercept[IllegalArgumentException] { PythonRunner.formatPath("one:two") }
+    intercept[IllegalArgumentException] { PythonRunner.formatPath("hdfs:s3:xtremeFS") }
+    intercept[IllegalArgumentException] { PythonRunner.formatPath("hdfs:/path/to/some.py")
}
+  }
+
+  // Test formatting multiple comma-separated paths to be added to the PYTHONPATH
+  test("format paths") {
+    assert(PythonRunner.formatPaths("spark.py") === Array("spark.py"))
+    assert(PythonRunner.formatPaths("file:/spark.py") === Array("/spark.py"))
+    assert(PythonRunner.formatPaths("file:/app.py,local:/spark.py") ===
+      Array("/app.py", "/spark.py"))
+    assert(PythonRunner.formatPaths("me.py,file:/you.py,local:/we.py") ===
+      Array("me.py", "/you.py", "/we.py"))
+    assert(PythonRunner.formatPaths("C:/a/b/spark.py", testWindows = true) ===
+      Array("C:/a/b/spark.py"))
+    assert(PythonRunner.formatPaths("/C:/a/b/spark.py", testWindows = true) ===
+      Array("C:/a/b/spark.py"))
+    assert(PythonRunner.formatPaths("C:/free.py,pie.py", testWindows = true) ===
+      Array("C:/free.py", "pie.py"))
+    assert(PythonRunner.formatPaths("lovely.py,C:/free.py,file:/d:/fry.py", testWindows =
true) ===
+      Array("lovely.py", "C:/free.py", "d:/fry.py"))
+    intercept[IllegalArgumentException] { PythonRunner.formatPaths("one:two,three") }
+    intercept[IllegalArgumentException] { PythonRunner.formatPaths("two,three,four:five:six")
}
+    intercept[IllegalArgumentException] { PythonRunner.formatPaths("hdfs:/some.py,foo.py")
}
+    intercept[IllegalArgumentException] { PythonRunner.formatPaths("foo.py,hdfs:/some.py")
}
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12f5ecc8/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 6c0deed..02427a4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -91,7 +91,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
       "--jars=one.jar,two.jar,three.jar",
       "--name=myApp")
     val appArgs = new SparkSubmitArguments(clArgs)
-    appArgs.jars should be ("one.jar,two.jar,three.jar")
+    appArgs.jars should include regex (".*one.jar,.*two.jar,.*three.jar")
     appArgs.name should be ("myApp")
   }
 
@@ -125,17 +125,17 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
-    childArgsStr should include ("--jar thejar.jar")
     childArgsStr should include ("--class org.SomeClass")
-    childArgsStr should include ("--addJars one.jar,two.jar,three.jar")
     childArgsStr should include ("--executor-memory 5g")
     childArgsStr should include ("--driver-memory 4g")
     childArgsStr should include ("--executor-cores 5")
     childArgsStr should include ("--arg arg1 --arg arg2")
     childArgsStr should include ("--queue thequeue")
-    childArgsStr should include ("--files file1.txt,file2.txt")
-    childArgsStr should include ("--archives archive1.txt,archive2.txt")
     childArgsStr should include ("--num-executors 6")
+    childArgsStr should include regex ("--jar .*thejar.jar")
+    childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
+    childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")
+    childArgsStr should include regex ("--archives .*archive1.txt,.*archive2.txt")
     mainClass should be ("org.apache.spark.deploy.yarn.Client")
     classpath should have length (0)
     sysProps("spark.app.name") should be ("beauty")
@@ -162,18 +162,19 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
-    classpath should contain ("thejar.jar")
-    classpath should contain ("one.jar")
-    classpath should contain ("two.jar")
-    classpath should contain ("three.jar")
+    classpath should have length (4)
+    classpath(0) should endWith ("thejar.jar")
+    classpath(1) should endWith ("one.jar")
+    classpath(2) should endWith ("two.jar")
+    classpath(3) should endWith ("three.jar")
     sysProps("spark.app.name") should be ("trill")
-    sysProps("spark.jars") should be ("one.jar,two.jar,three.jar,thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.executor.cores") should be ("5")
     sysProps("spark.yarn.queue") should be ("thequeue")
-    sysProps("spark.yarn.dist.files") should be ("file1.txt,file2.txt")
-    sysProps("spark.yarn.dist.archives") should be ("archive1.txt,archive2.txt")
     sysProps("spark.executor.instances") should be ("6")
+    sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
+    sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
+    sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
     sysProps("SPARK_SUBMIT") should be ("true")
   }
 
@@ -190,11 +191,13 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
-    childArgsStr.startsWith("--memory 4g --cores 5 --supervise") should be (true)
-    childArgsStr should include ("launch spark://h:p thejar.jar org.SomeClass arg1 arg2")
+    childArgsStr should startWith ("--memory 4g --cores 5 --supervise")
+    childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1
arg2")
     mainClass should be ("org.apache.spark.deploy.Client")
-    classpath should have length (0)
-    sysProps should have size (2) // contains --jar entry and SPARK_SUBMIT
+    classpath should have size (0)
+    sysProps should have size (2)
+    sysProps.keys should contain ("spark.jars")
+    sysProps.keys should contain ("SPARK_SUBMIT")
   }
 
   test("handles standalone client mode") {
@@ -211,7 +214,8 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
-    classpath should contain ("thejar.jar")
+    classpath should have length (1)
+    classpath(0) should endWith ("thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.cores.max") should be ("5")
   }
@@ -230,7 +234,8 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
-    classpath should contain ("thejar.jar")
+    classpath should have length (1)
+    classpath(0) should endWith ("thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.cores.max") should be ("5")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/12f5ecc8/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index cf9e20d..0aad882 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import scala.util.Random
 
 import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
+import java.net.URI
 import java.nio.{ByteBuffer, ByteOrder}
 
 import com.google.common.base.Charsets
@@ -168,5 +169,68 @@ class UtilsSuite extends FunSuite {
     assert(result.size.equals(1))
     assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
   }
-}
 
+  test("resolveURI") {
+    def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
+      assume(before.split(",").length == 1)
+      assert(Utils.resolveURI(before, testWindows) === new URI(after))
+      assert(Utils.resolveURI(after, testWindows) === new URI(after))
+      assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
+      assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
+    }
+    val cwd = System.getProperty("user.dir")
+    assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar")
+    assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:/root/spark.jar#app.jar")
+    assertResolves("spark.jar", s"file:$cwd/spark.jar")
+    assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar#app.jar")
+    assertResolves("C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows = true)
+    assertResolves("C:\\path\\to\\file.txt", "file:/C:/path/to/file.txt", testWindows = true)
+    assertResolves("file:/C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows
= true)
+    assertResolves("file:///C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows
= true)
+    assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows
= true)
+    intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
+    intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
+
+    // Test resolving comma-delimited paths
+    assert(Utils.resolveURIs("jar1,jar2") === s"file:$cwd/jar1,file:$cwd/jar2")
+    assert(Utils.resolveURIs("file:/jar1,file:/jar2") === "file:/jar1,file:/jar2")
+    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3") ===
+      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
+    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,jar4#jar5") ===
+      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
+    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", testWindows =
true) ===
+      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi")
+  }
+
+  test("nonLocalPaths") {
+    assert(Utils.nonLocalPaths("spark.jar") === Array.empty)
+    assert(Utils.nonLocalPaths("file:/spark.jar") === Array.empty)
+    assert(Utils.nonLocalPaths("file:///spark.jar") === Array.empty)
+    assert(Utils.nonLocalPaths("local:/spark.jar") === Array.empty)
+    assert(Utils.nonLocalPaths("local:///spark.jar") === Array.empty)
+    assert(Utils.nonLocalPaths("hdfs:/spark.jar") === Array("hdfs:/spark.jar"))
+    assert(Utils.nonLocalPaths("hdfs:///spark.jar") === Array("hdfs:///spark.jar"))
+    assert(Utils.nonLocalPaths("file:/spark.jar,local:/smart.jar,family.py") === Array.empty)
+    assert(Utils.nonLocalPaths("local:/spark.jar,file:/smart.jar,family.py") === Array.empty)
+    assert(Utils.nonLocalPaths("hdfs:/spark.jar,s3:/smart.jar") ===
+      Array("hdfs:/spark.jar", "s3:/smart.jar"))
+    assert(Utils.nonLocalPaths("hdfs:/spark.jar,s3:/smart.jar,local.py,file:/hello/pi.py")
===
+      Array("hdfs:/spark.jar", "s3:/smart.jar"))
+    assert(Utils.nonLocalPaths("local.py,hdfs:/spark.jar,file:/hello/pi.py,s3:/smart.jar")
===
+      Array("hdfs:/spark.jar", "s3:/smart.jar"))
+
+    // Test Windows paths
+    assert(Utils.nonLocalPaths("C:/some/path.jar", testWindows = true) === Array.empty)
+    assert(Utils.nonLocalPaths("file:/C:/some/path.jar", testWindows = true) === Array.empty)
+    assert(Utils.nonLocalPaths("file:///C:/some/path.jar", testWindows = true) === Array.empty)
+    assert(Utils.nonLocalPaths("local:/C:/some/path.jar", testWindows = true) === Array.empty)
+    assert(Utils.nonLocalPaths("local:///C:/some/path.jar", testWindows = true) === Array.empty)
+    assert(Utils.nonLocalPaths("hdfs:/a.jar,C:/my.jar,s3:/another.jar", testWindows = true)
===
+      Array("hdfs:/a.jar", "s3:/another.jar"))
+    assert(Utils.nonLocalPaths("D:/your.jar,hdfs:/a.jar,s3:/another.jar", testWindows = true)
===
+      Array("hdfs:/a.jar", "s3:/another.jar"))
+    assert(Utils.nonLocalPaths("hdfs:/a.jar,s3:/another.jar,e:/our.jar", testWindows = true)
===
+      Array("hdfs:/a.jar", "s3:/another.jar"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12f5ecc8/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c9ff82d..27b440d 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -159,10 +159,14 @@ class SparkContext(object):
             self.addPyFile(path)
 
         # Deploy code dependencies set by spark-submit; these will already have been added
-        # with SparkContext.addFile, so we just need to add them
+        # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
         for path in self._conf.get("spark.submit.pyFiles", "").split(","):
             if path != "":
-                self._python_includes.append(os.path.basename(path))
+                (dirname, filename) = os.path.split(path)
+                self._python_includes.append(filename)
+                sys.path.append(path)
+                if not dirname in sys.path:
+                    sys.path.append(dirname)
 
         # Create a temporary directory inside spark.local.dir:
         local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())

http://git-wip-us.apache.org/repos/asf/spark/blob/12f5ecc8/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 5f34362..e1db4d5 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -942,7 +942,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
 
   def createSparkContext(): SparkContext = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
+    val jars = SparkILoop.getAddedJars
     val conf = new SparkConf()
       .setMaster(getMaster())
       .setAppName("Spark shell")
@@ -997,7 +997,8 @@ object SparkILoop {
     val propJars = sys.props.get("spark.jars").flatMap { p =>
       if (p == "") None else Some(p)
     }
-    propJars.orElse(envJars).map(_.split(",")).getOrElse(Array.empty)
+    val jars = propJars.orElse(envJars).getOrElse("")
+    Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
   }
 
   // Designed primarily for use by test code: take a String with a

