spark-reviews mailing list archives

From aarondav <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...
Date Wed, 07 May 2014 17:19:15 GMT
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12389493
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -161,46 +131,38 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.put("PYTHONPATH", pythonPath)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectWorkerStreams(in, daemon.getErrorStream)
    +
           } catch {
    -        case e: Throwable => {
    +        case e: Throwable =>
    +
    +          // If the daemon exists, wait for it to finish and get its stderr
    +          val stderr = Option(daemon)
    +            .flatMap { d => Utils.getStderr(d, PROCESS_WAIT_TIMEOUT_MS) }
    +            .getOrElse("")
    +
               stopDaemon()
    -          throw e
    -        }
    +
    +          if (stderr != "") {
    +            val formattedStderr = stderr.replace("\n", "\n  ")
    +            val errorMessage = s"""
    +              |Error from python worker:
    +              |  $formattedStderr
    +              |PYTHONPATH was:
    +              |  $pythonPath
    +              |$e"""
    +
    +            // Append error message from python daemon, but keep original stack trace
    --- End diff --
    
    Maybe add a comment somewhere saying that we're assuming any stderr from the daemon contains the real error, which is why we're hiding the Exception.
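The review asks for the assumption behind the catch block to be written down. Below is a minimal sketch of how that comment and the message construction might read. It uses a hypothetical helper, `withDaemonStderr`, and `RuntimeException` stands in for whatever exception type the PR actually throws. It mirrors the diff's intent of adding the daemon's stderr to the message while keeping the original stack trace.

```scala
// Hypothetical sketch; names and exception type are assumptions, not the PR's code.
object DaemonErrors {
  // NOTE: We assume that any stderr output from the Python daemon describes the real
  // failure. When stderr is non-empty, we surface it in place of the original
  // exception's message, but keep the original stack trace.
  def withDaemonStderr(e: Throwable, stderr: String, pythonPath: String): Throwable = {
    if (stderr.isEmpty) {
      e
    } else {
      // Indent continuation lines of the daemon's stderr under the heading.
      val formattedStderr = stderr.replace("\n", "\n  ")
      val errorMessage = Seq(
        "",
        "Error from python worker:",
        "  " + formattedStderr,
        "PYTHONPATH was:",
        "  " + pythonPath,
        e.toString
      ).mkString("\n")
      val wrapped = new RuntimeException(errorMessage)
      wrapped.setStackTrace(e.getStackTrace)
      wrapped
    }
  }
}
```

The sketch joins the lines with `mkString` rather than `s"""...""".stripMargin`. `stripMargin` runs after interpolation, so it would also strip a leading `|` from any interpolated stderr line that happens to start with one.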


