spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gurwls...@apache.org
Subject spark git commit: [SPARK-24126][PYSPARK] Use build-specific temp directory for pyspark tests.
Date Mon, 07 May 2018 05:00:25 GMT
Repository: spark
Updated Branches:
  refs/heads/master f38ea00e8 -> a634d66ce


[SPARK-24126][PYSPARK] Use build-specific temp directory for pyspark tests.

This avoids polluting and leaving garbage behind in /tmp, and allows the
usual build tools to clean up any leftover files.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21198 from vanzin/SPARK-24126.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a634d66c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a634d66c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a634d66c

Branch: refs/heads/master
Commit: a634d66ce767bd5e1d8553d1a2c32e2b1a80f642
Parents: f38ea00
Author: Marcelo Vanzin <vanzin@cloudera.com>
Authored: Mon May 7 13:00:18 2018 +0800
Committer: hyukjinkwon <gurwls223@apache.org>
Committed: Mon May 7 13:00:18 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/tests.py       |  4 ++--
 python/pyspark/streaming/tests.py |  6 ++++--
 python/pyspark/tests.py           | 33 +++++++++++++++++++++------------
 python/run-tests.py               | 29 +++++++++++++++++++++++++++--
 4 files changed, 54 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a634d66c/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index cc6acfd..16aa937 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3092,8 +3092,8 @@ class HiveSparkSubmitTests(SparkSubmitTests):
             |print(hive_context.sql("show databases").collect())
             """)
         proc = subprocess.Popen(
-            [self.sparkSubmit, "--master", "local-cluster[1,1,1024]",
-             "--driver-class-path", hive_site_dir, script],
+            self.sparkSubmit + ["--master", "local-cluster[1,1,1024]",
+                                "--driver-class-path", hive_site_dir, script],
             stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)

http://git-wip-us.apache.org/repos/asf/spark/blob/a634d66c/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index d77f1ba..e4a428a 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -63,7 +63,7 @@ class PySparkStreamingTestCase(unittest.TestCase):
         class_name = cls.__name__
         conf = SparkConf().set("spark.default.parallelism", 1)
         cls.sc = SparkContext(appName=class_name, conf=conf)
-        cls.sc.setCheckpointDir("/tmp")
+        cls.sc.setCheckpointDir(tempfile.mkdtemp())
 
     @classmethod
     def tearDownClass(cls):
@@ -1549,7 +1549,9 @@ if __name__ == "__main__":
         kinesis_jar_present = True
         jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar)
 
-    os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
+    existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
+    jars_args = "--jars %s" % jars
+    os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
     testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
                  StreamingListenerTests]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a634d66c/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8392d7f..7b8ce2c 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1951,7 +1951,12 @@ class SparkSubmitTests(unittest.TestCase):
 
     def setUp(self):
         self.programDir = tempfile.mkdtemp()
-        self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit")
+        tmp_dir = tempfile.gettempdir()
+        self.sparkSubmit = [
+            os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit"),
+            "--conf", "spark.driver.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
+            "--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
+        ]
 
     def tearDown(self):
         shutil.rmtree(self.programDir)
@@ -2017,7 +2022,7 @@ class SparkSubmitTests(unittest.TestCase):
             |sc = SparkContext()
             |print(sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect())
             """)
-        proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+        proc = subprocess.Popen(self.sparkSubmit + [script], stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
         self.assertIn("[2, 4, 6]", out.decode('utf-8'))
@@ -2033,7 +2038,7 @@ class SparkSubmitTests(unittest.TestCase):
             |sc = SparkContext()
             |print(sc.parallelize([1, 2, 3]).map(foo).collect())
             """)
-        proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+        proc = subprocess.Popen(self.sparkSubmit + [script], stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
         self.assertIn("[3, 6, 9]", out.decode('utf-8'))
@@ -2051,7 +2056,7 @@ class SparkSubmitTests(unittest.TestCase):
             |def myfunc(x):
             |    return x + 1
             """)
-        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script],
+        proc = subprocess.Popen(self.sparkSubmit + ["--py-files", zip, script],
                                 stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
@@ -2070,7 +2075,7 @@ class SparkSubmitTests(unittest.TestCase):
             |def myfunc(x):
             |    return x + 1
             """)
-        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
+        proc = subprocess.Popen(self.sparkSubmit + ["--py-files", zip, "--master",
                                 "local-cluster[1,1,1024]", script],
                                 stdout=subprocess.PIPE)
         out, err = proc.communicate()
@@ -2087,8 +2092,10 @@ class SparkSubmitTests(unittest.TestCase):
             |print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
             """)
         self.create_spark_package("a:mylib:0.1")
-        proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
-                                 "file:" + self.programDir, script], stdout=subprocess.PIPE)
+        proc = subprocess.Popen(
+            self.sparkSubmit + ["--packages", "a:mylib:0.1", "--repositories",
+                                "file:" + self.programDir, script],
+            stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
         self.assertIn("[2, 3, 4]", out.decode('utf-8'))
@@ -2103,9 +2110,11 @@ class SparkSubmitTests(unittest.TestCase):
             |print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
             """)
         self.create_spark_package("a:mylib:0.1")
-        proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
-                                 "file:" + self.programDir, "--master",
-                                 "local-cluster[1,1,1024]", script], stdout=subprocess.PIPE)
+        proc = subprocess.Popen(
+            self.sparkSubmit + ["--packages", "a:mylib:0.1", "--repositories",
+                                "file:" + self.programDir, "--master", "local-cluster[1,1,1024]",
+                                script],
+            stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
         self.assertIn("[2, 3, 4]", out.decode('utf-8'))
@@ -2124,7 +2133,7 @@ class SparkSubmitTests(unittest.TestCase):
         # this will fail if you have different spark.executor.memory
         # in conf/spark-defaults.conf
         proc = subprocess.Popen(
-            [self.sparkSubmit, "--master", "local-cluster[1,1,1024]", script],
+            self.sparkSubmit + ["--master", "local-cluster[1,1,1024]", script],
             stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
@@ -2144,7 +2153,7 @@ class SparkSubmitTests(unittest.TestCase):
             |    sc.stop()
             """)
         proc = subprocess.Popen(
-            [self.sparkSubmit, "--master", "local", script],
+            self.sparkSubmit + ["--master", "local", script],
             stdout=subprocess.PIPE,
             stderr=subprocess.STDOUT)
         out, err = proc.communicate()

http://git-wip-us.apache.org/repos/asf/spark/blob/a634d66c/python/run-tests.py
----------------------------------------------------------------------
diff --git a/python/run-tests.py b/python/run-tests.py
index f408fc5..4c90926 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -22,11 +22,13 @@ import logging
 from optparse import OptionParser
 import os
 import re
+import shutil
 import subprocess
 import sys
 import tempfile
 from threading import Thread, Lock
 import time
+import uuid
 if sys.version < '3':
     import Queue
 else:
@@ -68,7 +70,7 @@ else:
     raise Exception("Cannot find assembly build directory, please build Spark first.")
 
 
-def run_individual_python_test(test_name, pyspark_python):
+def run_individual_python_test(target_dir, test_name, pyspark_python):
     env = dict(os.environ)
     env.update({
         'SPARK_DIST_CLASSPATH': SPARK_DIST_CLASSPATH,
@@ -77,6 +79,23 @@ def run_individual_python_test(test_name, pyspark_python):
         'PYSPARK_PYTHON': which(pyspark_python),
         'PYSPARK_DRIVER_PYTHON': which(pyspark_python)
     })
+
+    # Create a unique temp directory under 'target/' for each run. The TMPDIR variable is
+    # recognized by the tempfile module to override the default system temp directory.
+    tmp_dir = os.path.join(target_dir, str(uuid.uuid4()))
+    while os.path.isdir(tmp_dir):
+        tmp_dir = os.path.join(target_dir, str(uuid.uuid4()))
+    os.mkdir(tmp_dir)
+    env["TMPDIR"] = tmp_dir
+
+    # Also override the JVM's temp directory by setting driver and executor options.
+    spark_args = [
+        "--conf", "spark.driver.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
+        "--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
+        "pyspark-shell"
+    ]
+    env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args)
+
     LOGGER.info("Starting test(%s): %s", pyspark_python, test_name)
     start_time = time.time()
     try:
@@ -84,6 +103,7 @@ def run_individual_python_test(test_name, pyspark_python):
         retcode = subprocess.Popen(
             [os.path.join(SPARK_HOME, "bin/pyspark"), test_name],
             stderr=per_test_output, stdout=per_test_output, env=env).wait()
+        shutil.rmtree(tmp_dir, ignore_errors=True)
     except:
         LOGGER.exception("Got exception while running %s with %s", test_name, pyspark_python)
         # Here, we use os._exit() instead of sys.exit() in order to force Python to exit
even if
@@ -238,6 +258,11 @@ def main():
                         priority = 100
                     task_queue.put((priority, (python_exec, test_goal)))
 
+    # Create the target directory before starting tasks to avoid races.
+    target_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'target'))
+    if not os.path.isdir(target_dir):
+        os.mkdir(target_dir)
+
     def process_queue(task_queue):
         while True:
             try:
@@ -245,7 +270,7 @@ def main():
             except Queue.Empty:
                 break
             try:
-                run_individual_python_test(test_goal, python_exec)
+                run_individual_python_test(target_dir, test_goal, python_exec)
             finally:
                 task_queue.task_done()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message