spark-reviews mailing list archives

From tdas <...@git.apache.org>
Subject [GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...
Date Tue, 24 Apr 2018 09:20:10 GMT
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183660795
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
    @@ -207,62 +209,92 @@ trait ProgressReporter extends Logging {
           return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
         }
     
    -    // We want to associate execution plan leaves to sources that generate them, so that we match
    -    // their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
    -    // Consider the translation from the streaming logical plan to the final executed plan.
    -    //
    -    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
    -    //
    -    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
    -    //    - Each logical plan leaf will be associated with a single streaming source.
    -    //    - There can be multiple logical plan leaves associated with a streaming source.
    -    //    - There can be leaves not associated with any streaming source, because they were
    -    //      generated from a batch source (e.g. stream-batch joins)
    -    //
    -    // 2. Assuming that the executed plan has the same number of leaves in the same order as that of
    -    //    the trigger logical plan, we associate executed plan leaves with corresponding
    -    //    streaming sources.
    -    //
    -    // 3. For each source, we sum the metrics of the associated execution plan leaves.
    -    //
    -    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
    -      logicalPlan.collectLeaves().map { leaf => leaf -> source }
    +    val numInputRows = extractSourceToNumInputRows()
    +
    +    val eventTimeStats = lastExecution.executedPlan.collect {
    +      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
    +        val stats = e.eventTimeStats.value
    +        Map(
    +          "max" -> stats.max,
    +          "min" -> stats.min,
    +          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
    +    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
    +
    +    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
    +  }
    +
    +  /** Extract the number of input rows for each streaming source in the plan */
    +  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
    +
    +    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
    +      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
         }
    -    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
    -    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
    -    val numInputRows: Map[BaseStreamingSource, Long] =
    +
    +    val onlyDataSourceV2Sources = {
    +      // Check whether the streaming query's logical plan has only V2 data sources
    +      val allStreamingLeaves =
    +        logicalPlan.collect { case s: StreamingExecutionRelation => s }
    +      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
    --- End diff ---
    
    Yeah. This code path is not used by continuous processing. 
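    
    For anyone skimming the archive without the full file: the row accounting that the new
    sumRows helper performs is just a group-by-source followed by a per-source sum. Below is
    a minimal, self-contained sketch of that pattern in plain Scala; FakeSource is a
    hypothetical stand-in for BaseStreamingSource so the snippet runs without a Spark
    dependency, and it illustrates the shape of the computation rather than the actual class.
    
        // Minimal sketch of the per-source row summation done by the new sumRows helper.
        // FakeSource is a hypothetical stand-in for BaseStreamingSource.
        object SumRowsSketch {
          case class FakeSource(name: String)
    
          // Group the (source, rowCount) pairs by source and sum the counts per source.
          // The groupBy is needed because several executed-plan leaves can belong to the
          // same streaming source.
          def sumRows(tuples: Seq[(FakeSource, Long)]): Map[FakeSource, Long] =
            tuples.groupBy(_._1).mapValues(_.map(_._2).sum).toMap
    
          def main(args: Array[String]): Unit = {
            val kafka = FakeSource("kafka")
            val files = FakeSource("files")
            // Two executed-plan leaves came from the same source, one from another.
            val perLeafCounts = Seq(kafka -> 100L, kafka -> 50L, files -> 25L)
            println(sumRows(perLeafCounts)) // e.g. Map(FakeSource(kafka) -> 150, FakeSource(files) -> 25)
          }
        }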


---
