spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject spark git commit: [SPARK-6294] [PySpark] fix take of PythonRDD in JVM (branch 1.2)
Date Thu, 12 Mar 2015 22:19:22 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 c684e5f9f -> 9ebd6f12e


[SPARK-6294] [PySpark] fix take of PythonRDD in JVM (branch 1.2)

Thread.interrupt() cannot terminate the thread in some cases, so we should not wait for
the writerThread of PythonRDD.

This PR also ignores some exceptions during cleanup.

cc mengxr

Author: Davies Liu <davies@databricks.com>

Closes #5003 from davies/fix_take2 and squashes the following commits:

2f2f893 [Davies Liu] fix take of PythonRDD in JVM


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ebd6f12
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ebd6f12
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ebd6f12

Branch: refs/heads/branch-1.2
Commit: 9ebd6f12e67cd5995896d5bedf4a205b602109a5
Parents: c684e5f
Author: Davies Liu <davies@databricks.com>
Authored: Thu Mar 12 15:19:17 2015 -0700
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Thu Mar 12 15:19:17 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala  | 9 ++++++---
 python/pyspark/daemon.py                                    | 5 ++++-
 python/pyspark/tests.py                                     | 5 +++++
 3 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9ebd6f12/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2715722..d307779 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -77,7 +77,6 @@ private[spark] class PythonRDD(
 
     context.addTaskCompletionListener { context =>
       writerThread.shutdownOnTaskCompletion()
-      writerThread.join()
       if (!reuse_worker || !released) {
         try {
           worker.close()
@@ -249,13 +248,17 @@ private[spark] class PythonRDD(
       } catch {
         case e: Exception if context.isCompleted || context.isInterrupted =>
           logDebug("Exception thrown after task completion (likely due to cleanup)", e)
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }
 
         case e: Exception =>
           // We must avoid throwing exceptions here, because the thread uncaught exception
handler
           // will kill the whole executor (see org.apache.spark.executor.Executor).
           _exception = e
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }
       } finally {
         // Release memory used by this thread for shuffles
         env.shuffleMemoryManager.releaseMemoryForThisThread()

http://git-wip-us.apache.org/repos/asf/spark/blob/9ebd6f12/python/pyspark/daemon.py
----------------------------------------------------------------------
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index f09587f..9388598 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -61,7 +61,10 @@ def worker(sock):
     except SystemExit as exc:
         exit_code = compute_real_exit_code(exc.code)
     finally:
-        outfile.flush()
+        try:
+            outfile.flush()
+        except Exception:
+            pass
     return exit_code
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9ebd6f12/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 5007b6e..2e490a0 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -739,6 +739,11 @@ class RDDTests(ReusedPySparkTestCase):
         converted_rdd = RDD(data_python_rdd, self.sc)
         self.assertEqual(2, converted_rdd.count())
 
+    # Regression test for SPARK-6294
+    def test_take_on_jrdd(self):
+        rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
+        rdd._jrdd.first()
+
 
 class ProfilerTests(PySparkTestCase):
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message