spark-commits mailing list archives

From pwend...@apache.org
Subject git commit: Merge pull request #504 from JoshRosen/SPARK-1025
Date Tue, 04 Feb 2014 06:40:03 GMT
Updated Branches:
  refs/heads/branch-0.9 b10f60740 -> 2c6c9b9d3


Merge pull request #504 from JoshRosen/SPARK-1025

Fix PySpark hang when input files are deleted (SPARK-1025)

This pull request addresses [SPARK-1025](https://spark-project.atlassian.net/browse/SPARK-1025),
an issue where PySpark could hang if its input files were deleted.
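
As a minimal sketch of the failure mode (it mirrors the regression test added below and assumes a running SparkContext bound to `sc`): read a file, delete it, then trigger another action. Before this fix the second count() could hang; with it, the action raises instead.

    import os
    from tempfile import NamedTemporaryFile

    # Write a one-line input file for Spark to read.
    temp_file = NamedTemporaryFile(delete=False)
    temp_file.write("Hello World!")  # Python 2-era API, as in pyspark/tests.py
    temp_file.close()

    # Build an uncached RDD over the file; each action re-reads it from disk.
    data = sc.textFile(temp_file.name)
    assert data.count() == 1

    # Delete the input out from under the RDD. The next action now raises
    # an exception instead of hanging.
    os.unlink(temp_file.name)
    data.count()  # expected to raise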
(cherry picked from commit c40619d4873f36ffb96a2e6292b32d5b64eab153)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2c6c9b9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2c6c9b9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2c6c9b9d

Branch: refs/heads/branch-0.9
Commit: 2c6c9b9d33570ea563900902f9b3d37beef51c06
Parents: b10f607
Author: Reynold Xin <rxin@apache.org>
Authored: Sat Jan 25 22:41:30 2014 -0800
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Mon Feb 3 22:39:59 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/python/PythonRDD.scala    |  9 +++++++++
 python/pyspark/tests.py                                  | 11 +++++++++++
 2 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2c6c9b9d/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 46d53e3..9cbd26b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -52,6 +52,8 @@ private[spark] class PythonRDD[T: ClassTag](
     val env = SparkEnv.get
     val worker = env.createPythonWorker(pythonExec, envVars.toMap)
 
+    @volatile var readerException: Exception = null
+
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for " + pythonExec) {
       override def run() {
@@ -84,6 +86,10 @@ private[spark] class PythonRDD[T: ClassTag](
           dataOut.flush()
           worker.shutdownOutput()
         } catch {
+          case e: java.io.FileNotFoundException =>
+            readerException = e
+            // Kill the Python worker process:
+            worker.shutdownOutput()
           case e: IOException =>
            // This can happen for legitimate reasons if the Python code stops returning data before we are done
             // passing elements through, e.g., for take(). Just log a message to say it happened.
@@ -108,6 +114,9 @@ private[spark] class PythonRDD[T: ClassTag](
       }
 
       private def read(): Array[Byte] = {
+        if (readerException != null) {
+          throw readerException
+        }
         try {
           stream.readInt() match {
             case length if length > 0 =>
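
In effect, the hunks above route a failure from the stdin-writer thread to the task's reading side through the @volatile readerException field: the writer records the FileNotFoundException and shuts down the worker's output, and read() re-throws it instead of blocking on data that will never arrive. A rough Python analogue of that handoff pattern (all names here are illustrative, not from the patch):

    import threading

    # Shared slot standing in for the patch's "@volatile var readerException":
    # written by the writer thread, checked by the reading side.
    reader_exception = [None]
    buffered = []
    writer_done = threading.Event()

    def deleted_input():
        # Stand-in for iterating over an input file that was deleted mid-job.
        raise IOError("input file deleted")
        yield  # unreachable; makes this function a generator

    def stdin_writer():
        try:
            for line in deleted_input():
                buffered.append(line)
        except IOError as e:
            reader_exception[0] = e  # record the failure for the reader
        finally:
            writer_done.set()  # stand-in for worker.shutdownOutput()

    def read():
        # Mirror of PythonRDD.read(): surface the writer thread's failure
        # here instead of blocking on output that will never arrive.
        if reader_exception[0] is not None:
            raise reader_exception[0]
        return buffered

    threading.Thread(target=stdin_writer).start()
    writer_done.wait()
    try:
        read()
    except IOError as e:
        print("reader saw the writer's failure: %s" % e)

The @volatile annotation matters in the Scala original: it guarantees the reading thread sees the writer's store without additional locking.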

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2c6c9b9d/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index acd1ca5..5271045 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -168,6 +168,17 @@ class TestRDDFunctions(PySparkTestCase):
         self.assertEqual("Hello World!", x.strip())
         self.assertEqual("Hello World!", y.strip())
 
+    def test_deleting_input_files(self):
+        # Regression test for SPARK-1025
+        tempFile = NamedTemporaryFile(delete=False)
+        tempFile.write("Hello World!")
+        tempFile.close()
+        data = self.sc.textFile(tempFile.name)
+        filtered_data = data.filter(lambda x: True)
+        self.assertEqual(1, filtered_data.count())
+        os.unlink(tempFile.name)
+        self.assertRaises(Exception, lambda: filtered_data.count())
+
 
 class TestIO(PySparkTestCase):
 

