spark-reviews mailing list archives

From vanzin <...@git.apache.org>
Subject [GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...
Date Tue, 09 Jan 2018 17:49:40 GMT
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20013#discussion_r160478184
  
    --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
    @@ -119,118 +121,115 @@ private class LiveTask(
     
       import LiveEntityHelpers._
     
    -  private var recordedMetrics: v1.TaskMetrics = null
    +  private var metrics: MetricsTracker = new MetricsTracker()
     
       var errorMessage: Option[String] = None
     
       /**
     * Update the metrics for the task and return the difference between the previous and new
        * values.
        */
    -  def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = {
    +  def updateMetrics(metrics: TaskMetrics): MetricsTracker = {
         if (metrics != null) {
    -      val old = recordedMetrics
    -      recordedMetrics = new v1.TaskMetrics(
    -        metrics.executorDeserializeTime,
    -        metrics.executorDeserializeCpuTime,
    -        metrics.executorRunTime,
    -        metrics.executorCpuTime,
    -        metrics.resultSize,
    -        metrics.jvmGCTime,
    -        metrics.resultSerializationTime,
    -        metrics.memoryBytesSpilled,
    -        metrics.diskBytesSpilled,
    -        metrics.peakExecutionMemory,
    -        new v1.InputMetrics(
    -          metrics.inputMetrics.bytesRead,
    -          metrics.inputMetrics.recordsRead),
    -        new v1.OutputMetrics(
    -          metrics.outputMetrics.bytesWritten,
    -          metrics.outputMetrics.recordsWritten),
    -        new v1.ShuffleReadMetrics(
    -          metrics.shuffleReadMetrics.remoteBlocksFetched,
    -          metrics.shuffleReadMetrics.localBlocksFetched,
    -          metrics.shuffleReadMetrics.fetchWaitTime,
    -          metrics.shuffleReadMetrics.remoteBytesRead,
    -          metrics.shuffleReadMetrics.remoteBytesReadToDisk,
    -          metrics.shuffleReadMetrics.localBytesRead,
    -          metrics.shuffleReadMetrics.recordsRead),
    -        new v1.ShuffleWriteMetrics(
    -          metrics.shuffleWriteMetrics.bytesWritten,
    -          metrics.shuffleWriteMetrics.writeTime,
    -          metrics.shuffleWriteMetrics.recordsWritten))
    -      if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics
    +      val old = this.metrics
    +      val newMetrics = new MetricsTracker()
    +      newMetrics.executorDeserializeTime = metrics.executorDeserializeTime
    +      newMetrics.executorDeserializeCpuTime = metrics.executorDeserializeCpuTime
    +      newMetrics.executorRunTime = metrics.executorRunTime
    +      newMetrics.executorCpuTime = metrics.executorCpuTime
    +      newMetrics.resultSize = metrics.resultSize
    +      newMetrics.jvmGcTime = metrics.jvmGCTime
    +      newMetrics.resultSerializationTime = metrics.resultSerializationTime
    +      newMetrics.memoryBytesSpilled = metrics.memoryBytesSpilled
    +      newMetrics.diskBytesSpilled = metrics.diskBytesSpilled
    +      newMetrics.peakExecutionMemory = metrics.peakExecutionMemory
    +      newMetrics.inputBytesRead = metrics.inputMetrics.bytesRead
    +      newMetrics.inputRecordsRead = metrics.inputMetrics.recordsRead
    +      newMetrics.outputBytesWritten = metrics.outputMetrics.bytesWritten
    +      newMetrics.outputRecordsWritten = metrics.outputMetrics.recordsWritten
    +      newMetrics.shuffleRemoteBlocksFetched = metrics.shuffleReadMetrics.remoteBlocksFetched
    +      newMetrics.shuffleLocalBlocksFetched = metrics.shuffleReadMetrics.localBlocksFetched
    +      newMetrics.shuffleFetchWaitTime = metrics.shuffleReadMetrics.fetchWaitTime
    +      newMetrics.shuffleRemoteBytesRead = metrics.shuffleReadMetrics.remoteBytesRead
    +      newMetrics.shuffleRemoteBytesReadToDisk = metrics.shuffleReadMetrics.remoteBytesReadToDisk
    +      newMetrics.shuffleLocalBytesRead = metrics.shuffleReadMetrics.localBytesRead
    +      newMetrics.shuffleRecordsRead = metrics.shuffleReadMetrics.recordsRead
    +      newMetrics.shuffleBytesWritten = metrics.shuffleWriteMetrics.bytesWritten
    +      newMetrics.shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime
    +      newMetrics.shuffleRecordsWritten = metrics.shuffleWriteMetrics.recordsWritten
    +
    +      this.metrics = newMetrics
    +      if (old.executorDeserializeTime >= 0L) {
    +        old.subtract(newMetrics)
    --- End diff --
    
    No, this is right. The parameter is actually a delta, so it has to be applied to the old value, except when the old value is not initialized (which is the condition in L162).
    
    But let me double-check this again.
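    
    To make the pattern in the diff concrete, here is a minimal, hypothetical sketch of the sentinel-and-delta bookkeeping being discussed. The class and field names, the -1L defaults, and the direction of subtract() are assumptions for illustration only, not the actual Spark implementation (the real MetricsTracker has many more fields and may use a different subtract convention):
    
        // Fields start at -1L to mean "never updated"; this is the condition
        // the `old.executorDeserializeTime >= 0L` check in the diff tests for.
        class Tracker {
          var executorRunTime: Long = -1L
          var jvmGcTime: Long = -1L
        
          // Per-field difference `this - other`, returned as a new Tracker.
          def subtract(other: Tracker): Tracker = {
            val delta = new Tracker()
            delta.executorRunTime = executorRunTime - other.executorRunTime
            delta.jvmGcTime = jvmGcTime - other.jvmGcTime
            delta
          }
        }
        
        object DeltaDemo {
          private var current = new Tracker()
        
          // Store the latest snapshot; return the delta against the previous
          // snapshot, or the full snapshot if this is the first update.
          def update(snapshot: Tracker): Tracker = {
            val old = current
            current = snapshot
            if (old.executorRunTime >= 0L) snapshot.subtract(old) else snapshot
          }
        
          def main(args: Array[String]): Unit = {
            val first = new Tracker()
            first.executorRunTime = 100L
            first.jvmGcTime = 10L
            println(update(first).executorRunTime)   // 100: first update returns the snapshot
        
            val second = new Tracker()
            second.executorRunTime = 250L
            second.jvmGcTime = 15L
            println(update(second).executorRunTime)  // 150: later updates return the delta
          }
        }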

