spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From squito <...@git.apache.org>
Subject [GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Date Tue, 07 Nov 2017 23:00:50 GMT
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149529181
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate stage of a live execution to the store. When replaying
logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else
-1L
    +
    +  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit
= {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the task metrics
info to
    +    // work around a race in the DAGScheduler. The metrics info does not contain accumulator
info
    +    // when reading event logs in the SHS, so we have to rely on the accumulator in that
case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
    +    liveExecutions.get(executionId).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized
{
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
    +    val metrics = exec.stages
    +      .flatMap(stageMetrics.get)
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    +      }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    stageMetrics.get(stageId).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id)
}
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when reading from
event
    +        // logs, they have String values. For now, assume all accumulators are Long and
convert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
    +    }
    +  }
    +
    +  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    +    // Install the SQL tab in a live app if it hasn't been initialized yet.
    +    if (!uiInitialized) {
    +      ui.foreach { _ui =>
    +        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
    +      }
    +      uiInitialized = true
    +    }
    +
    +    val SparkListenerSQLExecutionStart(executionId, description, details,
    +      physicalPlanDescription, sparkPlanInfo, time) = event
    +
    +    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper]
= {
    +      nodes.map {
    +        case cluster: SparkPlanGraphCluster =>
    +          val storedCluster = new SparkPlanGraphClusterWrapper(
    +            cluster.id,
    +            cluster.name,
    +            cluster.desc,
    +            toStoredNodes(cluster.nodes),
    +            cluster.metrics)
    +          new SparkPlanGraphNodeWrapper(null, storedCluster)
    +
    +        case node =>
    +          new SparkPlanGraphNodeWrapper(node, null)
    +      }
    +    }
    +
    +    val planGraph = SparkPlanGraph(sparkPlanInfo)
    +    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
    +      node.metrics.map { metric => (metric.accumulatorId, metric) }
    +    }.toMap.values.toList
    +
    +    val graphToStore = new SparkPlanGraphWrapper(
    +      executionId,
    +      toStoredNodes(planGraph.nodes),
    +      planGraph.edges)
    +    kvstore.write(graphToStore)
    +
    +    val exec = getOrCreateExecution(executionId)
    +    exec.description = description
    +    exec.details = details
    +    exec.physicalPlanDescription = physicalPlanDescription
    +    exec.metrics = sqlPlanMetrics
    +    exec.submissionTime = time
    +    update(exec)
    +  }
    +
    +  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    +    val SparkListenerSQLExecutionEnd(executionId, time) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      synchronized {
    +        exec.metricsValues = aggregateMetrics(exec)
    +
    +        // Remove stale LiveStageMetrics objects for stages that are not active anymore.
    +        val activeStages = liveExecutions.values.flatMap { other =>
    +          if (other != exec) other.stages else Nil
    +        }.toSet
    +        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
    +
    +        exec.completionTime = Some(new Date(time))
    +        exec.endEvents += 1
    +
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
    +    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      exec.driverAccumUpdates = accumUpdates.toMap
    +      update(exec)
    +    }
    +  }
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    +    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    +    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    +    case _ => // Ignore
    +  }
    +
    +  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
    +    liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId))
    +  }
    +
    +  private def update(exec: LiveExecutionData): Unit = {
    +    val now = System.nanoTime()
    +    if (exec.endEvents >= exec.jobs.size + 1) {
    +      liveExecutions.remove(exec.executionId)
    +      exec.write(kvstore, now)
    --- End diff --
    
    If a job was killed mid-execution, so that the event log didn't contain the end events, you
wouldn't get to see any info for that execution, would you? Don't you need some final flush?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message