spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdas <...@git.apache.org>
Subject [GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Date Thu, 08 Feb 2018 23:08:43 GMT
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167076621
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
---
    @@ -17,52 +17,119 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +
     import org.apache.spark.TaskContext
    -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
    -import org.apache.spark.sql.catalyst.encoders.encoderFor
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
    +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
     
    -/**
    - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract
defined by
    - * [[ForeachWriter]].
    - *
    - * @param writer The [[ForeachWriter]] to process all data.
    - * @tparam T The expected type of the sink.
    - */
    -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable
{
    -
    -  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    -    // This logic should've been as simple as:
    -    // ```
    -    //   data.as[T].foreachPartition { iter => ... }
    -    // ```
    -    //
    -    // Unfortunately, doing that would just break the incremental planing. The reason
is,
    -    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()`
will
    -    // create a new plan. Because StreamExecution uses the existing plan to collect metrics
and
    -    // update watermark, we should never create a new plan. Otherwise, metrics and watermark
are
    -    // updated in the new plan, and StreamExecution cannot retrieval them.
    -    //
    -    // Hence, we need to manually convert internal rows to objects using encoder.
    +
    +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport
{
    +  override def createStreamWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceOptions): StreamWriter = {
         val encoder = encoderFor[T].resolveAndBind(
    -      data.logicalPlan.output,
    -      data.sparkSession.sessionState.analyzer)
    -    data.queryExecution.toRdd.foreachPartition { iter =>
    -      if (writer.open(TaskContext.getPartitionId(), batchId)) {
    -        try {
    -          while (iter.hasNext) {
    -            writer.process(encoder.fromRow(iter.next()))
    -          }
    -        } catch {
    -          case e: Throwable =>
    -            writer.close(e)
    -            throw e
    -        }
    -        writer.close(null)
    -      } else {
    -        writer.close(null)
    +      schema.toAttributes,
    +      SparkSession.getActiveSession.get.sessionState.analyzer)
    +    ForeachInternalWriter(writer, encoder)
    +  }
    +}
    +
    +case class ForeachInternalWriter[T: Encoder](
    +    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    --- End diff --
    
    nit: params on different lines


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message