spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject git commit: SPARK-1572 Don't kill Executor if PythonRDD fails while computing parent
Date Wed, 23 Apr 2014 21:46:52 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 19ef78fdf -> be8f26fbb


SPARK-1572 Don't kill Executor if PythonRDD fails while computing parent

Previously, if the parent RDD threw any exception other than IOException or
FileNotFoundException (which is quite possible for Hadoop input sources), the entire Executor
would crash, because the Executor's default uncaught exception handler for threads calls System.exit().

This patch fixes two related issues:

  1. Always catch exceptions in the thread that reads the parent RDD's data and writes it
     to the Python worker's stdin.
  2. Don't mask readerException when Python throws an EOFError
     after worker.shutdownOutput() is called.

Author: Aaron Davidson <aaron@databricks.com>

Closes #486 from aarondav/pyspark and squashes the following commits:

fbb11e9 [Aaron Davidson] Make sure FileNotFoundExceptions are handled same as before
b9acb3e [Aaron Davidson] SPARK-1572 Don't kill Executor if PythonRDD fails while computing parent
(cherry picked from commit a967b005c8937a3053e215c952d2172ee3dc300d)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be8f26fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be8f26fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be8f26fb

Branch: refs/heads/branch-1.0
Commit: be8f26fbbcb55a1ecafe4341fa217f5948c6af04
Parents: 19ef78f
Author: Aaron Davidson <aaron@databricks.com>
Authored: Wed Apr 23 14:46:30 2014 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Wed Apr 23 14:46:48 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala | 32 +++++++++++++-------
 1 file changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/be8f26fb/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f9d86fe..8a843fb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -24,6 +24,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map =>
JMap, Collectio
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
+import scala.util.Try
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
@@ -89,16 +90,22 @@ private[spark] class PythonRDD[T: ClassTag](
           dataOut.flush()
           worker.shutdownOutput()
         } catch {
+
           case e: java.io.FileNotFoundException =>
             readerException = e
-            // Kill the Python worker process:
-            worker.shutdownOutput()
+            Try(worker.shutdownOutput()) // kill Python worker process
+
           case e: IOException =>
             // This can happen for legitimate reasons if the Python code stops returning
data
-            // before we are done passing elements through, e.g., for take(). Just log a
message
-            // to say it happened.
-            logInfo("stdin writer to Python finished early")
-            logDebug("stdin writer to Python finished early", e)
+            // before we are done passing elements through, e.g., for take(). Just log a
message to
+            // say it happened (as it could also be hiding a real IOException from a data
source).
+            logInfo("stdin writer to Python finished early (may not be an error)", e)
+
+          case e: Exception =>
+            // We must avoid throwing exceptions here, because the thread uncaught exception
handler
+            // will kill the whole executor (see Executor).
+            readerException = e
+            Try(worker.shutdownOutput()) // kill Python worker process
         }
       }
     }.start()
@@ -152,7 +159,7 @@ private[spark] class PythonRDD[T: ClassTag](
               val exLength = stream.readInt()
               val obj = new Array[Byte](exLength)
               stream.readFully(obj)
-              throw new PythonException(new String(obj))
+              throw new PythonException(new String(obj), readerException)
             case SpecialLengths.END_OF_DATA_SECTION =>
               // We've finished the data section of the output, but we can still
               // read some accumulator updates:
@@ -167,10 +174,13 @@ private[spark] class PythonRDD[T: ClassTag](
               Array.empty[Byte]
           }
         } catch {
-          case eof: EOFException => {
+          case e: Exception if readerException != null =>
+            logError("Python worker exited unexpectedly (crashed)", e)
+            logError("Python crash may have been caused by prior exception:", readerException)
+            throw readerException
+
+          case eof: EOFException =>
             throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
-          }
-          case e: Throwable => throw e
         }
       }
 
@@ -185,7 +195,7 @@ private[spark] class PythonRDD[T: ClassTag](
 }
 
 /** Thrown for exceptions in user Python code. */
-private class PythonException(msg: String) extends Exception(msg)
+private class PythonException(msg: String, cause: Exception) extends RuntimeException(msg,
cause)
 
 /**
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.


Mime
View raw message