spark-commits mailing list archives

From joshro...@apache.org
Subject git commit: [SPARK-4088] [PySpark] Python worker should exit after socket is closed by JVM
Date Sat, 25 Oct 2014 08:20:52 GMT
Repository: spark
Updated Branches:
  refs/heads/master 953031688 -> e41786c77


[SPARK-4088] [PySpark] Python worker should exit after socket is closed by JVM

In the case of take() or an exception in Python, the Python worker may exit before the JVM
has read the entire response, and the write thread may then raise a "Connection reset" exception.

The Python worker should always wait for the JVM to close the socket first.
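
The wait-for-close idea can be sketched in isolation as follows. This is a minimal illustration, not the code from daemon.py: `wait_for_peer_close` and `demo` are hypothetical names, a local `socket.socketpair()` stands in for the JVM connection, and the sketch catches `OSError` where the patch catches `Exception`.

```python
import socket
import threading


def wait_for_peer_close(sock, bufsize=1024):
    """Block until the peer closes the connection.

    Returns the number of bytes discarded while waiting. A reset or other
    socket error is treated the same as an orderly close.
    """
    discarded = 0
    try:
        while True:
            chunk = sock.recv(bufsize)
            if not chunk:  # empty bytes: the peer closed its end
                break
            discarded += len(chunk)
    except OSError:
        pass
    return discarded


def demo():
    """Hypothetical stand-in: one thread plays the reader (the JVM role)."""
    worker_end, reader_end = socket.socketpair()
    received = []

    def reader():
        # Read the whole 5-byte response first, and only then close.
        data = b""
        while len(data) < 5:
            chunk = reader_end.recv(1024)
            if not chunk:
                break
            data += chunk
        received.append(data)
        reader_end.close()

    thread = threading.Thread(target=reader)
    thread.start()
    worker_end.sendall(b"hello")
    # The writer does not close or exit until the reader has closed.
    discarded = wait_for_peer_close(worker_end)
    worker_end.close()
    thread.join()
    return received[0], discarded
```

Because the writing side blocks in `recv()` until the reader closes, the reader always gets the chance to consume the full response before the connection goes away.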

cc JoshRosen. This is a warm fix; without it, the tests will be flaky. Sorry about that.

Author: Davies Liu <davies@databricks.com>

Closes #2941 from davies/fix_exit and squashes the following commits:

9d4d21e [Davies Liu] fix race


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e41786c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e41786c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e41786c7

Branch: refs/heads/master
Commit: e41786c77482d3f9e3c01cfd583c8899815c3106
Parents: 9530316
Author: Davies Liu <davies@databricks.com>
Authored: Sat Oct 25 01:20:39 2014 -0700
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Sat Oct 25 01:20:39 2014 -0700

----------------------------------------------------------------------
 python/pyspark/daemon.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e41786c7/python/pyspark/daemon.py
----------------------------------------------------------------------
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index dbb3477..f09587f 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -62,8 +62,7 @@ def worker(sock):
         exit_code = compute_real_exit_code(exc.code)
     finally:
         outfile.flush()
-        if exit_code:
-            os._exit(exit_code)
+    return exit_code
 
 
 # Cleanup zombie children
@@ -160,10 +159,13 @@ def manager():
                         outfile.flush()
                         outfile.close()
                         while True:
-                            worker(sock)
-                            if not reuse:
+                            code = worker(sock)
+                            if not reuse or code:
                                 # wait for closing
-                                while sock.recv(1024):
+                                try:
+                                    while sock.recv(1024):
+                                        pass
+                                except Exception:
                                     pass
                                 break
                             gc.collect()
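
The control flow introduced by this diff (the worker returns its exit code, and the loop stops reusing the connection when reuse is disabled or the code is non-zero) can be modeled roughly as below. This is a simplified sketch under stated assumptions: `serve_connection`, `run_worker`, and `drain` are hypothetical names, the `gc.collect()` call is omitted, and whatever the daemon does after leaving the loop is outside this diff and not modeled.

```python
def serve_connection(run_worker, drain, reuse):
    """Run tasks on one connection until the worker should stop.

    run_worker() is a hypothetical callable that returns an exit code
    (0 means success). drain() is a hypothetical callable that waits for
    the peer to close the socket. Returns the exit code of the last task.
    """
    while True:
        code = run_worker()
        if not reuse or code:
            # Wait for the peer to close before leaving the loop,
            # whether the task succeeded or failed.
            drain()
            return code


# Usage: with reuse enabled, the loop continues until a task fails.
codes = iter([0, 0, 3])
drained = []
last_code = serve_connection(lambda: next(codes), lambda: drained.append(True), reuse=True)
```

Returning the code instead of exiting inside the worker lets the caller drain the socket on the failure path as well, which is the path the commit message identifies as racy.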


