
From yli...@apache.org
Subject spark git commit: [SPARK-17585][PYSPARK][CORE] PySpark SparkContext.addFile supports adding files recursively
Date Wed, 21 Sep 2016 08:37:12 GMT
Repository: spark
Updated Branches:
  refs/heads/master 61876a427 -> d3b886976


[SPARK-17585][PYSPARK][CORE] PySpark SparkContext.addFile supports adding files recursively

## What changes were proposed in this pull request?
In some cases users want to add a directory as a dependency. Scala users can call ```SparkContext.addFile```
with the argument ```recursive=true``` to recursively add all files under a directory,
but Python users can only add a single file, not a directory. This patch adds the same support to PySpark.
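
A minimal usage sketch of the new option (the HDFS path and file names below are hypothetical, and an initialized SparkContext is assumed):

```python
from pyspark import SparkContext, SparkFiles

sc = SparkContext(appName="addFileRecursiveDemo")

# Ship a whole directory to every node; recursive=True is the option
# this patch adds to PySpark (the directory path is made up).
sc.addFile("hdfs:///user/demo/resources", recursive=True)


def read_config(_):
    # On each executor, locate the downloaded copy of the directory by its
    # last path component and read a (hypothetical) file inside it.
    root = SparkFiles.get("resources")
    with open(root + "/config.txt") as f:
        return [f.readline().strip()]


print(sc.parallelize([1]).flatMap(read_config).collect())
```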

## How was this patch tested?
Unit tests.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #15140 from yanboliang/spark-17585.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3b88697
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3b88697
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3b88697

Branch: refs/heads/master
Commit: d3b88697638dcf32854fe21a6c53dfb3782773b9
Parents: 61876a4
Author: Yanbo Liang <ybliang8@gmail.com>
Authored: Wed Sep 21 01:37:03 2016 -0700
Committer: Yanbo Liang <ybliang8@gmail.com>
Committed: Wed Sep 21 01:37:03 2016 -0700

----------------------------------------------------------------------
 .../spark/api/java/JavaSparkContext.scala       | 13 +++++++++++++
 python/pyspark/context.py                       |  7 +++++--
 python/pyspark/tests.py                         | 20 +++++++++++++++-----
 python/test_support/hello.txt                   |  1 -
 python/test_support/hello/hello.txt             |  1 +
 .../test_support/hello/sub_hello/sub_hello.txt  |  1 +
 6 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3b88697/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 131f36f..4e50c26 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -670,6 +670,19 @@ class JavaSparkContext(val sc: SparkContext)
   }
 
   /**
+   * Add a file to be downloaded with this Spark job on every node.
+   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+   * filesystems), or an HTTP, HTTPS or FTP URI.  To access the file in Spark jobs,
+   * use `SparkFiles.get(fileName)` to find its download location.
+   *
+   * A directory can be given if the recursive option is set to true. Currently directories are only
+   * supported for Hadoop-supported filesystems.
+   */
+  def addFile(path: String, recursive: Boolean): Unit = {
+    sc.addFile(path, recursive)
+  }
+
+  /**
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
    * filesystems), or an HTTP, HTTPS or FTP URI.

http://git-wip-us.apache.org/repos/asf/spark/blob/d3b88697/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5c32f8e..7a7f59c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -767,7 +767,7 @@ class SparkContext(object):
         SparkContext._next_accum_id += 1
         return Accumulator(SparkContext._next_accum_id - 1, value, accum_param)
 
-    def addFile(self, path):
+    def addFile(self, path, recursive=False):
         """
         Add a file to be downloaded with this Spark job on every node.
         The C{path} passed can be either a local file, a file in HDFS
@@ -778,6 +778,9 @@ class SparkContext(object):
         L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>} with the
         filename to find its download location.
 
+        A directory can be given if the recursive option is set to True.
+        Currently directories are only supported for Hadoop-supported filesystems.
+
         >>> from pyspark import SparkFiles
         >>> path = os.path.join(tempdir, "test.txt")
         >>> with open(path, "w") as testFile:
@@ -790,7 +793,7 @@ class SparkContext(object):
         >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
         [100, 200, 300, 400]
         """
-        self._jsc.sc().addFile(path)
+        self._jsc.sc().addFile(path, recursive)
 
     def addPyFile(self, path):
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/d3b88697/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 0a029b6..b075691 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -409,13 +409,23 @@ class AddFileTests(PySparkTestCase):
         self.assertEqual("Hello World!", res)
 
     def test_add_file_locally(self):
-        path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+        path = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
         self.sc.addFile(path)
         download_path = SparkFiles.get("hello.txt")
         self.assertNotEqual(path, download_path)
         with open(download_path) as test_file:
             self.assertEqual("Hello World!\n", test_file.readline())
 
+    def test_add_file_recursively_locally(self):
+        path = os.path.join(SPARK_HOME, "python/test_support/hello")
+        self.sc.addFile(path, True)
+        download_path = SparkFiles.get("hello")
+        self.assertNotEqual(path, download_path)
+        with open(download_path + "/hello.txt") as test_file:
+            self.assertEqual("Hello World!\n", test_file.readline())
+        with open(download_path + "/sub_hello/sub_hello.txt") as test_file:
+            self.assertEqual("Sub Hello World!\n", test_file.readline())
+
     def test_add_py_file_locally(self):
         # To ensure that we're actually testing addPyFile's effects, check that
         # this fails due to `userlibrary` not being on the Python path:
@@ -514,7 +524,7 @@ class RDDTests(ReusedPySparkTestCase):
 
     def test_cartesian_on_textfile(self):
         # Regression test for
-        path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+        path = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
         a = self.sc.textFile(path)
         result = a.cartesian(a).collect()
         (x, y) = result[0]
@@ -751,7 +761,7 @@ class RDDTests(ReusedPySparkTestCase):
         b = b._reserialize(MarshalSerializer())
         self.assertEqual(a.zip(b).collect(), [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)])
         # regression test for SPARK-4841
-        path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+        path = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
         t = self.sc.textFile(path)
         cnt = t.count()
         self.assertEqual(cnt, t.zip(t).count())
@@ -1214,7 +1224,7 @@ class InputFormatTests(ReusedPySparkTestCase):
         ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
         self.assertEqual(ints, ei)
 
-        hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+        hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
         oldconf = {"mapred.input.dir": hellopath}
         hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
                                   "org.apache.hadoop.io.LongWritable",
@@ -1233,7 +1243,7 @@ class InputFormatTests(ReusedPySparkTestCase):
         ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
         self.assertEqual(ints, ei)
 
-        hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+        hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
         newconf = {"mapred.input.dir": hellopath}
         hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                                         "org.apache.hadoop.io.LongWritable",

http://git-wip-us.apache.org/repos/asf/spark/blob/d3b88697/python/test_support/hello.txt
----------------------------------------------------------------------
diff --git a/python/test_support/hello.txt b/python/test_support/hello.txt
deleted file mode 100755
index 980a0d5..0000000
--- a/python/test_support/hello.txt
+++ /dev/null
@@ -1 +0,0 @@
-Hello World!

http://git-wip-us.apache.org/repos/asf/spark/blob/d3b88697/python/test_support/hello/hello.txt
----------------------------------------------------------------------
diff --git a/python/test_support/hello/hello.txt b/python/test_support/hello/hello.txt
new file mode 100755
index 0000000..980a0d5
--- /dev/null
+++ b/python/test_support/hello/hello.txt
@@ -0,0 +1 @@
+Hello World!

http://git-wip-us.apache.org/repos/asf/spark/blob/d3b88697/python/test_support/hello/sub_hello/sub_hello.txt
----------------------------------------------------------------------
diff --git a/python/test_support/hello/sub_hello/sub_hello.txt b/python/test_support/hello/sub_hello/sub_hello.txt
new file mode 100644
index 0000000..ce2d435
--- /dev/null
+++ b/python/test_support/hello/sub_hello/sub_hello.txt
@@ -0,0 +1 @@
+Sub Hello World!


