spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-7305] [STREAMING] [WEBUI] Make BatchPage show friendly information when jobs are dropped by SparkListener
Date Fri, 08 May 2015 00:34:48 GMT
Repository: spark
Updated Branches:
  refs/heads/master 88063c626 -> 22ab70e06


[SPARK-7305] [STREAMING] [WEBUI] Make BatchPage show friendly information when jobs are dropped
by SparkListener

If jobs are dropped by SparkListener, at least we can show the job ids in BatchPage. Screenshot:

![b1](https://cloud.githubusercontent.com/assets/1000778/7434968/f19aa784-eff3-11e4-8f86-36a073873574.png)

Author: zsxwing <zsxwing@gmail.com>

Closes #5840 from zsxwing/SPARK-7305 and squashes the following commits:

aca0ba6 [zsxwing] Fix the code style
718765e [zsxwing] Make generateNormalJobRow private
8073b03 [zsxwing] Merge branch 'master' into SPARK-7305
83dec11 [zsxwing] Make BatchPage show friendly information when jobs are dropped by SparkListener


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22ab70e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22ab70e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22ab70e0

Branch: refs/heads/master
Commit: 22ab70e06ede65ca865073fe36c859042a920aa3
Parents: 88063c6
Author: zsxwing <zsxwing@gmail.com>
Authored: Thu May 7 17:34:44 2015 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Thu May 7 17:34:44 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/ui/BatchPage.scala   | 136 +++++++++++++++----
 1 file changed, 106 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/22ab70e0/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 3f1cab6..831f60e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{NodeSeq, Node}
+import scala.xml.{NodeSeq, Node, Text}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
@@ -28,6 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
 import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
 import org.apache.spark.ui.jobs.UIData.JobUIData
 
+private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
 
 private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   private val streamingListener = parent.listener
@@ -44,25 +45,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       <th>Error</th>
   }
 
+  private def generateJobRow(
+      outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
+      formattedOutputOpDuration: String,
+      numSparkJobRowsInOutputOp: Int,
+      isFirstRow: Boolean,
+      sparkJob: SparkJobIdWithUIData): Seq[Node] = {
+    if (sparkJob.jobUIData.isDefined) {
+      generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
+        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+    } else {
+      generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
+        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+    }
+  }
+
   /**
   * Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
    * one cell, we use "rowspan" for the first row of a output op.
    */
-  def generateJobRow(
+  private def generateNormalJobRow(
       outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: JobUIData): Seq[Node] = {
-    val lastStageInfo = Option(sparkJob.stageIds)
-      .filter(_.nonEmpty)
-      .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
-    val lastStageData = lastStageInfo.flatMap { s =>
-      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
-    }
-
-    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
-    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
     val duration: Option[Long] = {
       sparkJob.submissionTime.map { start =>
         val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
@@ -83,9 +92,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       if (isFirstRow) {
         <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
         <td rowspan={numSparkJobRowsInOutputOp.toString}>
-          <span class="description-input" title={lastStageDescription}>
-            {lastStageDescription}
-          </span>{lastStageName}
+          {outputOpDescription}
         </td>
         <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
       } else {
@@ -122,27 +129,96 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     </tr>
   }
 
-  private def generateOutputOpIdRow(
-      outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
-    val sparkjobDurations = sparkJobs.map(sparkJob => {
-      sparkJob.submissionTime.map { start =>
-        val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
-        end - start
+  /**
+   * If a job is dropped by sparkListener due to exceeding the limitation, we only show the job id
+   * with "-" cells.
+   */
+  private def generateDroppedJobRow(
+      outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
+      formattedOutputOpDuration: String,
+      numSparkJobRowsInOutputOp: Int,
+      isFirstRow: Boolean,
+      jobId: Int): Seq[Node] = {
+    // In the first row, output op id and its information needs to be shown. In other rows, these
+    // cells will be taken up due to "rowspan".
+    // scalastyle:off
+    val prefixCells =
+      if (isFirstRow) {
+        <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+          <td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
+          <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+      } else {
+        Nil
       }
-    })
+    // scalastyle:on
+
+    <tr>
+      {prefixCells}
+      <td sorttable_customkey={jobId.toString}>
+        {jobId.toString}
+      </td>
+      <!-- Duration -->
+      <td>-</td>
+      <!-- Stages: Succeeded/Total -->
+      <td>-</td>
+      <!-- Tasks (for all stages): Succeeded/Total -->
+      <td>-</td>
+      <!-- Error -->
+      <td>-</td>
+    </tr>
+  }
+
+  private def generateOutputOpIdRow(
+      outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+    // We don't count the durations of dropped jobs
+    val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
+      map(sparkJob => {
+        sparkJob.submissionTime.map { start =>
+          val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+          end - start
+        }
+      })
     val formattedOutputOpDuration =
-      if (sparkjobDurations.exists(_ == None)) {
-        // If any job does not finish, set "formattedOutputOpDuration" to "-"
+      if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) {
+        // If no job or any job does not finish, set "formattedOutputOpDuration" to "-"
         "-"
       } else {
-        SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+        SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
       }
-    generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
+
+    val description = generateOutputOpDescription(sparkJobs)
+
+    generateJobRow(
+      outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
       sparkJobs.tail.map { sparkJob =>
-        generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
+        generateJobRow(
+          outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
       }.flatMap(x => x)
   }
 
+  private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+    val lastStageInfo =
+      sparkJobs.flatMap(_.jobUIData).headOption. // Get the first JobUIData
+        flatMap { sparkJob => // For the first job, get the latest Stage info
+          if (sparkJob.stageIds.isEmpty) {
+            None
+          } else {
+            sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
+          }
+        }
+    val lastStageData = lastStageInfo.flatMap { s =>
+      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+    }
+
+    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+
+    <span class="description-input" title={lastStageDescription}>
+      {lastStageDescription}
+    </span> ++ Text(lastStageName)
+  }
+
   private def failureReasonCell(failureReason: String): Seq[Node] = {
     val isMultiline = failureReason.indexOf('\n') >= 0
     // Display the first line by default
@@ -187,10 +263,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
       }
     sparkListener.synchronized {
-      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
+      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
         outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
-          // Filter out spark Job ids that don't exist in sparkListener
-          (outputOpId, sparkJobIds.flatMap(getJobData))
+          (outputOpId,
+            sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
         }
 
       <table id="batch-job-table" class="table table-bordered table-striped table-condensed">
@@ -200,7 +276,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         <tbody>
           {
             outputOpIdWithJobs.map {
-              case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
+              case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
             }
           }
         </tbody>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message