spark-commits mailing list archives

From m...@apache.org
Subject spark git commit: [SPARK-6294] fix hang when call take() in JVM on PythonRDD
Date Thu, 12 Mar 2015 08:34:44 GMT
Repository: spark
Updated Branches:
  refs/heads/master 25b71d8c1 -> 712679a7b


[SPARK-6294] fix hang when call take() in JVM on PythonRDD

Thread.interrupt() cannot terminate a thread that is blocked on I/O in some cases, so we should not
wait for the writerThread of PythonRDD to finish.

This PR also ignores some exceptions raised during cleanup.
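
The hang can be sketched outside Spark. This is not code from the patch, just a minimal Python illustration of the failure mode: if the writer thread is blocked and ignores interruption, an unconditional `join()` never returns, so the fix is to stop waiting on it (a bounded wait is used below only so the sketch terminates).

```python
import threading

def stuck_writer(stop_event):
    # Stands in for PythonRDD's writer thread, blocked on I/O and
    # unaffected by interruption.
    stop_event.wait()  # the event is never set, so this blocks forever

stop = threading.Event()
t = threading.Thread(target=stuck_writer, args=(stop,), daemon=True)
t.start()

# An unconditional t.join() here would hang the caller, which is the bug.
# Not waiting (shown here as a bounded wait) lets task completion proceed
# even though the writer is still stuck.
t.join(timeout=0.1)
print(t.is_alive())
```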

cc JoshRosen mengxr

Author: Davies Liu <davies@databricks.com>

Closes #4987 from davies/fix_take and squashes the following commits:

4488f1a [Davies Liu] fix hang when call take() in JVM on PythonRDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/712679a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/712679a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/712679a7

Branch: refs/heads/master
Commit: 712679a7b447346a365b38574d7a86d56a93f767
Parents: 25b71d8
Author: Davies Liu <davies@databricks.com>
Authored: Thu Mar 12 01:34:38 2015 -0700
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Thu Mar 12 01:34:38 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala  | 9 ++++++---
 python/pyspark/daemon.py                                    | 5 ++++-
 python/pyspark/tests.py                                     | 5 +++++
 3 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/712679a7/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8d4a53b..4c71b69 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -76,7 +76,6 @@ private[spark] class PythonRDD(
 
     context.addTaskCompletionListener { context =>
       writerThread.shutdownOnTaskCompletion()
-      writerThread.join()
       if (!reuse_worker || !released) {
         try {
           worker.close()
@@ -248,13 +247,17 @@ private[spark] class PythonRDD(
       } catch {
         case e: Exception if context.isCompleted || context.isInterrupted =>
           logDebug("Exception thrown after task completion (likely due to cleanup)", e)
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }
 
         case e: Exception =>
          // We must avoid throwing exceptions here, because the thread uncaught exception handler
          // will kill the whole executor (see org.apache.spark.executor.Executor).
           _exception = e
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }
       } finally {
         // Release memory used by this thread for shuffles
         env.shuffleMemoryManager.releaseMemoryForThisThread()
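
The `!worker.isClosed` guard above avoids calling `shutdownOutput()` on an already-closed socket, which throws. A hypothetical Python equivalent (not from the patch; in CPython a closed socket reports `fileno() == -1`, which plays the role of `isClosed`):

```python
import socket

a, b = socket.socketpair()
a.close()
# Mirrors the !worker.isClosed check: shutdown() on a closed socket
# raises OSError, so skip it once the socket is gone.
if a.fileno() != -1:
    a.shutdown(socket.SHUT_WR)
else:
    print("skipped shutdown on closed socket")
b.close()
```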

http://git-wip-us.apache.org/repos/asf/spark/blob/712679a7/python/pyspark/daemon.py
----------------------------------------------------------------------
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index f09587f..9388598 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -61,7 +61,10 @@ def worker(sock):
     except SystemExit as exc:
         exit_code = compute_real_exit_code(exc.code)
     finally:
-        outfile.flush()
+        try:
+            outfile.flush()
+        except Exception:
+            pass
     return exit_code
 
 

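The daemon.py change guards the final `flush()` for the same reason: if the worker's output stream is already closed (e.g. the JVM side gave up on the take()), a bare flush raises and would clobber the worker's real exit code. A small sketch of that behavior, using an in-memory stream rather than the worker's actual socket file:

```python
import io

outfile = io.BytesIO()  # stands in for the worker's output stream
outfile.close()
# flush() on a closed stream raises ValueError("I/O operation on closed
# file"); swallowing it, as the patch does, preserves the exit code path.
try:
    outfile.flush()
except Exception:
    pass
print("flush failure ignored")
```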
http://git-wip-us.apache.org/repos/asf/spark/blob/712679a7/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 06ba2b4..dd8d3b1 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -782,6 +782,11 @@ class RDDTests(ReusedPySparkTestCase):
         jobId = tracker.getJobIdsForGroup("test4")[0]
         self.assertEqual(3, len(tracker.getJobInfo(jobId).stageIds))
 
+    # Regression test for SPARK-6294
+    def test_take_on_jrdd(self):
+        rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
+        rdd._jrdd.first()
+
 
 class ProfilerTests(PySparkTestCase):
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

