spark-reviews mailing list archives

From davies <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-9393] [SQL] Fix several error-handling ...
Date Tue, 28 Jul 2015 18:47:41 GMT
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7710#discussion_r35684872
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala ---
    @@ -146,49 +198,83 @@ case class ScriptTransformation(
             }
           }
     
    -      val (inputSerde, inputSoi) = ioschema.initInputSerDe(input)
    -      val dataOutputStream = new DataOutputStream(outputStream)
    -      val outputProjection = new InterpretedProjection(input, child.output)
    +      writerThread.start()
     
    -      // TODO make the 2048 configurable?
    -      val stderrBuffer = new CircularBuffer(2048)
    -      // Consume the error stream from the pipeline, otherwise it will be blocked if
    -      // the pipeline is full.
    -      new RedirectThread(errorStream, // input stream from the pipeline
    -        stderrBuffer,                 // output to a circular buffer
    -        "Thread-ScriptTransformation-STDERR-Consumer").start()
    +      outputIterator
    +    }
     
    -      // Put the write (output to the pipeline) into a single thread
    -      // and keep the collector in the main thread;
    -      // otherwise it will cause a deadlock if the data size is greater than
    -      // the pipeline / buffer capacity.
    -      new Thread(new Runnable() {
    -        override def run(): Unit = {
    -          Utils.tryWithSafeFinally {
    -            iter
    -              .map(outputProjection)
    -              .foreach { row =>
    -              if (inputSerde == null) {
    -                val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
    -                  ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
    -
    -                outputStream.write(data)
    -              } else {
    -                val writable = inputSerde.serialize(
    -                  row.asInstanceOf[GenericInternalRow].values, inputSoi)
    -                prepareWritable(writable).write(dataOutputStream)
    -              }
    -            }
    -            outputStream.close()
    -          } {
    -            if (proc.waitFor() != 0) {
    -              logError(stderrBuffer.toString) // log the stderr circular buffer
    -            }
    -          }
    -        }
    -      }, "Thread-ScriptTransformation-Feed").start()
    +    child.execute().mapPartitions { iter =>
    +      if (iter.hasNext) {
    +        processIterator(iter)
    +      } else {
    +        // If the input iterator has no rows then do not launch the external script.
    +        Iterator.empty
    +      }
    +    }
    +  }
    +}
     
    -      iterator
    +private class ScriptTransformationWriterThread(
    +    iter: Iterator[InternalRow],
    +    outputProjection: Projection,
    +    @Nullable inputSerde: AbstractSerDe,
    +    @Nullable inputSoi: ObjectInspector,
    +    ioschema: HiveScriptIOSchema,
    +    outputStream: OutputStream,
    +    proc: Process,
    +    stderrBuffer: CircularBuffer,
    +    taskContext: TaskContext
    +  ) extends Thread("Thread-ScriptTransformation-Feed") with Logging {
    +
    +  setDaemon(true)
    +
    +  @volatile private var _exception: Throwable = null
    +
    +  /** Contains the exception thrown while writing the parent iterator to the external process. */
    +  def exception: Option[Throwable] = Option(_exception)
    +
    +  override def run(): Unit = Utils.logUncaughtExceptions {
    +    TaskContext.setTaskContext(taskContext)
    +
    +    val dataOutputStream = new DataOutputStream(outputStream)
    +
    +    // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so
    +    // let's use a variable to record whether the `finally` block was hit due to an exception
    +    var threwException: Boolean = true
    +    try {
    +      iter.map(outputProjection).foreach { row =>
    +        if (inputSerde == null) {
    +          val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
    +            ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
    +          outputStream.write(data)
    +        } else {
    +          val writable = inputSerde.serialize(
    +            row.asInstanceOf[GenericInternalRow].values, inputSoi)
    +          prepareWritable(writable).write(dataOutputStream)
    +        }
    +      }
    +      outputStream.close()
    +      threwException = false
    +    } catch {
    +      case NonFatal(e) =>
    +        // An error occurred while writing input, so kill the child process. According to the
    +        // Javadoc this call will not throw an exception:
    +        _exception = e
    +        proc.destroy()
    --- End diff ---
    
    It might be good to close `outputStream` here as well; otherwise the error path leaves the pipe to the child process open.
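
    One way to do that, sketched roughly below. Note that `closeQuietly` is a hypothetical helper added for illustration (not an existing Spark or Hive utility); the idea is to release the child's stdin on the error path without letting a secondary failure from close() mask the write exception we just recorded:

        import java.io.OutputStream
        import scala.util.control.NonFatal

        // Hypothetical helper on ScriptTransformationWriterThread: close the
        // stream, swallowing any non-fatal secondary failure so the original
        // exception stored in _exception remains the one that propagates.
        private def closeQuietly(stream: OutputStream): Unit = {
          try {
            stream.close()
          } catch {
            case NonFatal(_) => // ignore; the original write failure takes precedence
          }
        }

        // In the catch block above, after proc.destroy():
        //   closeQuietly(outputStream)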



