spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From carsonwang <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-4598][WebUI]Task table pagination for t...
Date Fri, 17 Jul 2015 07:03:45 GMT
Github user carsonwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7399#discussion_r34866298
  
    --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
    @@ -672,164 +658,605 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
         </script>
       }
     
    -  def taskRow(
    -      hasAccumulators: Boolean,
    -      hasInput: Boolean,
    -      hasOutput: Boolean,
    -      hasShuffleRead: Boolean,
    -      hasShuffleWrite: Boolean,
    -      hasBytesSpilled: Boolean,
    -      currentTime: Long)(taskData: TaskUIData): Seq[Node] = {
    -    taskData match { case TaskUIData(info, metrics, errorMessage) =>
    -      val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
    -        else metrics.map(_.executorRunTime).getOrElse(1L)
    -      val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
    -        else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
    -      val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
    -      val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
    -      val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
    -      val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
    -      val gettingResultTime = getGettingResultTime(info, currentTime)
    -
    -      val maybeAccumulators = info.accumulables
    -      val accumulatorsReadable = maybeAccumulators.map { acc =>
    -        StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
    +}
    +
    +private[ui] object StagePage {
    +  private[ui] def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = {
    +    if (info.gettingResult) {
    +      if (info.finished) {
    +        info.finishTime - info.gettingResultTime
    +      } else {
    +        // The task is still fetching the result.
    +        currentTime - info.gettingResultTime
           }
    +    } else {
    +      0L
    +    }
    +  }
     
    -      val maybeInput = metrics.flatMap(_.inputMetrics)
    -      val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
    -      val inputReadable = maybeInput
    -        .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
    -        .getOrElse("")
    -      val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
    -
    -      val maybeOutput = metrics.flatMap(_.outputMetrics)
    -      val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("")
    -      val outputReadable = maybeOutput
    -        .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
    -        .getOrElse("")
    -      val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")
    -
    -      val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
    -      val shuffleReadBlockedTimeSortable = maybeShuffleRead
    -        .map(_.fetchWaitTime.toString).getOrElse("")
    -      val shuffleReadBlockedTimeReadable =
    -        maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
    -
    -      val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
    -      val shuffleReadSortable = totalShuffleBytes.map(_.toString).getOrElse("")
    -      val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
    -      val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
    -
    -      val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
    -      val shuffleReadRemoteSortable = remoteShuffleBytes.map(_.toString).getOrElse("")
    -      val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
    -
    -      val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
    -      val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
    -      val shuffleWriteReadable = maybeShuffleWrite
    -        .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
    -      val shuffleWriteRecords = maybeShuffleWrite
    -        .map(_.shuffleRecordsWritten.toString).getOrElse("")
    -
    -      val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
    -      val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
    -      val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
    -        if (ms == 0) "" else UIUtils.formatDuration(ms)
    -      }.getOrElse("")
    -
    -      val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
    -      val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
    -      val memoryBytesSpilledReadable =
    -        maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
    -
    -      val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
    -      val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
    -      val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
    -
    -      <tr>
    -        <td>{info.index}</td>
    -        <td>{info.taskId}</td>
    -        <td sorttable_customkey={info.attempt.toString}>{
    -          if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
    -        }</td>
    -        <td>{info.status}</td>
    -        <td>{info.taskLocality}</td>
    -        <td>{info.executorId} / {info.host}</td>
    -        <td>{UIUtils.formatDate(new Date(info.launchTime))}</td>
    -        <td sorttable_customkey={duration.toString}>
    -          {formatDuration}
    -        </td>
    -        <td sorttable_customkey={schedulerDelay.toString}
    -            class={TaskDetailsClassNames.SCHEDULER_DELAY}>
    -          {UIUtils.formatDuration(schedulerDelay.toLong)}
    -        </td>
    -        <td sorttable_customkey={taskDeserializationTime.toString}
    -            class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
    -          {UIUtils.formatDuration(taskDeserializationTime.toLong)}
    -        </td>
    -        <td sorttable_customkey={gcTime.toString}>
    -          {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
    -        </td>
    -        <td sorttable_customkey={serializationTime.toString}
    -            class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
    -          {UIUtils.formatDuration(serializationTime)}
    -        </td>
    -        <td sorttable_customkey={gettingResultTime.toString}
    -            class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
    -          {UIUtils.formatDuration(gettingResultTime)}
    -        </td>
    -        {if (hasAccumulators) {
    -          <td>
    -            {Unparsed(accumulatorsReadable.mkString("<br/>"))}
    -          </td>
    -        }}
    -        {if (hasInput) {
    -          <td sorttable_customkey={inputSortable}>
    -            {s"$inputReadable / $inputRecords"}
    -          </td>
    -        }}
    -        {if (hasOutput) {
    -          <td sorttable_customkey={outputSortable}>
    -            {s"$outputReadable / $outputRecords"}
    -          </td>
    -        }}
    +  private[ui] def getSchedulerDelay(
    +      info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
    +    if (info.finished) {
    +      val totalExecutionTime = info.finishTime - info.launchTime
    +      val executorOverhead = (metrics.executorDeserializeTime +
    +        metrics.resultSerializationTime)
    +      math.max(
    +        0,
    +        totalExecutionTime - metrics.executorRunTime - executorOverhead -
    +          getGettingResultTime(info, currentTime))
    +    } else {
    +      // The task is still running and the metrics like executorRunTime are not available.
    +      0L
    +    }
    +  }
    +}
    +
    +private[ui] case class TaskTableRowInputData(inputSortable: String, inputReadable: String)
    +
    +private[ui] case class TaskTableRowOutputData(outputSortable: String, outputReadable: String)
    +
    +private[ui] case class TaskTableRowShuffleReadData(
    +    shuffleReadBlockedTimeSortable: String,
    +    shuffleReadBlockedTimeReadable: String,
    +    shuffleReadSortable: String,
    +    shuffleReadReadable: String,
    +    shuffleReadRemoteSortable: String,
    +    shuffleReadRemoteReadable: String)
    +
    +private[ui] case class TaskTableRowShuffleWriteData(
    +    writeTimeSortable: String,
    +    writeTimeReadable: String,
    +    shuffleWriteSortable: String,
    +    shuffleWriteReadable: String)
    +
    +private[ui] case class TaskTableRowBytesSpilledData(
    +    memoryBytesSpilledSortable: String,
    +    memoryBytesSpilledReadable: String,
    +    diskBytesSpilledSortable: String,
    +    diskBytesSpilledReadable: String)
    +
    +/**
    + * Contains all data that needs for sorting and generating HTML. Using this one rather than
    + * TaskUIData to avoid creating duplicate contents during sorting the data.
    + */
    +private[ui] case class TaskTableRowData(
    +    index: Int,
    +    taskId: Long,
    +    attempt: Int,
    +    speculative: Boolean,
    +    status: String,
    +    taskLocality: String,
    +    executorIdAndHost: String,
    +    launchTime: Long,
    +    duration: Long,
    +    formatDuration: String,
    +    schedulerDelay: Long,
    +    taskDeserializationTime: Long,
    +    gcTime: Long,
    +    serializationTime: Long,
    +    gettingResultTime: Long,
    +    accumulators: Option[String], // HTML
    +    input: Option[TaskTableRowInputData],
    +    output: Option[TaskTableRowOutputData],
    +    shuffleRead: Option[TaskTableRowShuffleReadData],
    +    shuffleWrite: Option[TaskTableRowShuffleWriteData],
    +    bytesSpilled: Option[TaskTableRowBytesSpilledData],
    +    error: String)
    +
    +private[ui] class TaskDataSource(
    +    tasks: Seq[TaskUIData],
    +    hasAccumulators: Boolean,
    +    hasInput: Boolean,
    +    hasOutput: Boolean,
    +    hasShuffleRead: Boolean,
    +    hasShuffleWrite: Boolean,
    +    hasBytesSpilled: Boolean,
    +    currentTime: Long,
    +    page: Int,
    +    pageSize: Int,
    +    sortColumn: String,
    +    desc: Boolean) extends PagedDataSource[TaskTableRowData](page: Int, pageSize: Int) {
    +  import StagePage._
    +
    +  // Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table
    +  // so that we can avoid creating duplicate contents during sorting the data
    +  override val data = tasks.map(taskRow).sorted(ordering(sortColumn, desc))
    +
    +  private def taskRow(taskData: TaskUIData): TaskTableRowData = {
    +    val TaskUIData(info, metrics, errorMessage) = taskData
    +    val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
    +    else metrics.map(_.executorRunTime).getOrElse(1L)
    +    val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
    +    else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
    +    val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
    +    val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
    +    val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
    +    val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
    +    val gettingResultTime = getGettingResultTime(info, currentTime)
    +
    +    val maybeAccumulators = info.accumulables
    +    val accumulatorsReadable = maybeAccumulators.map { acc =>
    +      StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
    +    }
    +
    +    val maybeInput = metrics.flatMap(_.inputMetrics)
    +    val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
    +    val inputReadable = maybeInput
    +      .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
    +      .getOrElse("")
    +    val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
    +
    +    val maybeOutput = metrics.flatMap(_.outputMetrics)
    +    val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("")
    +    val outputReadable = maybeOutput
    +      .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
    +      .getOrElse("")
    +    val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")
    +
    +    val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
    +    val shuffleReadBlockedTimeSortable = maybeShuffleRead
    +      .map(_.fetchWaitTime.toString).getOrElse("")
    +    val shuffleReadBlockedTimeReadable =
    +      maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
    +
    +    val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
    +    val shuffleReadSortable = totalShuffleBytes.map(_.toString).getOrElse("")
    +    val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
    +    val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
    +
    +    val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
    +    val shuffleReadRemoteSortable = remoteShuffleBytes.map(_.toString).getOrElse("")
    +    val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
    +
    +    val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
    +    val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
    +    val shuffleWriteReadable = maybeShuffleWrite
    +      .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
    +    val shuffleWriteRecords = maybeShuffleWrite
    +      .map(_.shuffleRecordsWritten.toString).getOrElse("")
    +
    +    val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
    +    val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
    +    val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
    +      if (ms == 0) "" else UIUtils.formatDuration(ms)
    +    }.getOrElse("")
    +
    +    val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
    +    val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
    +    val memoryBytesSpilledReadable =
    +      maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
    +
    +    val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
    +    val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
    +    val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
    +
    +    val input =
    +      if (hasInput) {
    +        Some(TaskTableRowInputData(inputSortable, s"$inputReadable / $inputRecords"))
    +      } else {
    +        None
    +      }
    +
    +    val output =
    +      if (hasOutput) {
    +        Some(TaskTableRowOutputData(outputSortable, s"$outputReadable / $outputRecords"))
    +      } else {
    +        None
    +      }
    +
    +    val shuffleRead =
    +      if (hasShuffleRead) {
    +        Some(TaskTableRowShuffleReadData(
    +          shuffleReadBlockedTimeSortable,
    +          shuffleReadBlockedTimeReadable,
    +          shuffleReadSortable,
    +          s"$shuffleReadReadable / $shuffleReadRecords",
    +          shuffleReadRemoteSortable,
    +          shuffleReadRemoteReadable
    +        ))
    +      } else {
    +        None
    +      }
    +
    +    val shuffleWrite =
    +      if (hasShuffleWrite) {
    +        Some(TaskTableRowShuffleWriteData(
    +          writeTimeSortable,
    +          writeTimeReadable,
    +          shuffleWriteSortable,
    +          s"$shuffleWriteReadable / $shuffleWriteRecords"
    +        ))
    +      } else {
    +        None
    +      }
    +
    +    val bytesSpilled =
    +      if (hasBytesSpilled) {
    +        Some(TaskTableRowBytesSpilledData(
    +          memoryBytesSpilledSortable,
    +          memoryBytesSpilledReadable,
    +          diskBytesSpilledSortable,
    +          diskBytesSpilledReadable
    +        ))
    +      } else {
    +        None
    +      }
    +
    +    TaskTableRowData(
    +      info.index,
    +      info.taskId,
    +      info.attempt,
    +      info.speculative,
    +      info.status,
    +      info.taskLocality.toString,
    +      s"${info.executorId} / ${info.host}",
    +      info.launchTime,
    +      duration,
    +      formatDuration,
    +      schedulerDelay,
    +      taskDeserializationTime,
    +      gcTime,
    +      serializationTime,
    +      gettingResultTime,
    +      if (hasAccumulators) Some(accumulatorsReadable.mkString("<br/>")) else None,
    +      input,
    +      output,
    +      shuffleRead,
    +      shuffleWrite,
    +      bytesSpilled,
    +      errorMessage.getOrElse("")
    +    )
    +  }
    +
    +  /**
    +   * Return Ordering according to sortColumn and desc
    +   */
    +  private def ordering(sortColumn: String, desc: Boolean): Ordering[TaskTableRowData] = {
    +    val ordering = sortColumn match {
    +      case "Index" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.Int.compare(x.index, y.index)
    +      }
    +      case "ID" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.Long.compare(x.taskId, y.taskId)
    +      }
    +      case "Attempt" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.Int.compare(x.attempt, y.attempt)
    +      }
    +      case "Status" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.String.compare(x.status, y.status)
    +      }
    +      case "Locality Level" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.String.compare(x.taskLocality, y.taskLocality)
    +      }
    +      case "Executor ID / Host" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.String.compare(x.executorIdAndHost, y.executorIdAndHost)
    +      }
    +      case "Launch Time" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.Long.compare(x.launchTime, y.launchTime)
    +      }
    +      case "Duration" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.Long.compare(x.duration, y.duration)
    +      }
    +      case "Scheduler Delay" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.Long.compare(x.schedulerDelay, y.schedulerDelay)
    +      }
    +      case "Task Deserialization Time" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.Long.compare(x.taskDeserializationTime, y.taskDeserializationTime)
    +      }
    +      case "GC Time" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.Long.compare(x.gcTime, y.gcTime)
    +      }
    +      case "Result Serialization Time" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.Long.compare(x.serializationTime, y.serializationTime)
    +      }
    +      case "Getting Result Time" => new Ordering[TaskTableRowData] {
    +        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +          Ordering.Long.compare(x.gettingResultTime, y.gettingResultTime)
    +      }
    +      case "Accumulators" =>
    +        if (hasAccumulators) {
    +          new Ordering[TaskTableRowData] {
    +            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +              Ordering.String.compare(x.accumulators.get, y.accumulators.get)
    +          }
    +        } else {
    +          throw new IllegalArgumentException(
    +            "Cannot sort by Accumulators because of no accumulators")
    +        }
    +      case "Input Size / Records" =>
    +        if (hasInput) {
    +          new Ordering[TaskTableRowData] {
    +            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +              Ordering.String.compare(x.input.get.inputSortable, y.input.get.inputSortable)
    +          }
    +        } else {
    +          throw new IllegalArgumentException(
    +            "Cannot sort by Input Size / Records because of no inputs")
    +        }
    +      case "Output Size / Records" =>
    +        if (hasOutput) {
    +          new Ordering[TaskTableRowData] {
    +            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
    +              Ordering.String.compare(x.output.get.outputSortable, y.output.get.outputSortable)
    +          }
    +        } else {
    +          throw new IllegalArgumentException(
    +            "Cannot sort by Input Size / Records because of no outputs")
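    --- End diff --
    
    This error message should say "Cannot sort by Output Size / Records because of no outputs" here; the text was copied from the Input case above.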


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message