spark-commits mailing list archives

From: andrewo...@apache.org
Subject: spark git commit: [SPARK-5645] Added local read bytes/time to task metrics
Date: Thu, 12 Feb 2015 22:36:30 GMT
Repository: spark
Updated Branches:
  refs/heads/master aa4ca8b87 -> 893d6fd70


[SPARK-5645] Added local read bytes/time to task metrics

ksakellis I stumbled on your JIRA for this yesterday; I know it's assigned to you but I'd
already done this for my own uses a while ago so thought I could help save you the work of
doing it!  Hopefully this doesn't duplicate any work you've already done.

Here's a screenshot of what the UI looks like:
![image](https://cloud.githubusercontent.com/assets/1108612/6135352/c03e7276-b11c-11e4-8f11-c6aefe1f35b9.png)
Based on a discussion with pwendell, I added the data read remotely as an additional metric
rather than showing it in brackets as you'd suggested, Kostas.  The assumption here is that
the average user doesn't care about the distinction between local and remote data, so it's
better not to pollute the UI.

I also added data about the local read time, which I've found very helpful for debugging,
but I didn't put it in the UI because it's probably not something most people
will need to use.
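
For anyone who does want the local read time without a UI change, here's a minimal sketch
(not part of this patch; `LocalReadLogger` is a hypothetical name) of pulling the new
counters from a listener, assuming the current listener API where `shuffleReadMetrics` is
an `Option`:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Minimal sketch: log the new local-read counters as each task finishes.
class LocalReadLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    for {
      metrics <- Option(taskEnd.taskMetrics)
      sr <- metrics.shuffleReadMetrics
    } {
      println(s"task ${taskEnd.taskInfo.taskId}: " +
        s"localBytesRead=${sr.localBytesRead}, localReadTime=${sr.localReadTime} ms")
    }
  }
}

// Hypothetical usage: sc.addSparkListener(new LocalReadLogger)
```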

With this change, the total shuffle read bytes and total shuffle write bytes shown in the UI
will be equal, fixing a long-standing source of user confusion:
![image](https://cloud.githubusercontent.com/assets/1108612/6135399/25f14490-b11d-11e4-8086-20be5f4002e6.png)

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #4510 from kayousterhout/SPARK-5645 and squashes the following commits:

4a0182c [Kay Ousterhout] oops
5f5da1b [Kay Ousterhout] Small style fix
5da04cf [Kay Ousterhout] Addressed more comments from Kostas
ba05149 [Kay Ousterhout] Remove parens
a9dc685 [Kay Ousterhout] Kostas comment, test fix
33d2e2d [Kay Ousterhout] Merge remote-tracking branch 'upstream/master' into SPARK-5645
347e2cd [Kay Ousterhout] [SPARK-5645] Added local read bytes/time to task metrics


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/893d6fd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/893d6fd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/893d6fd7

Branch: refs/heads/master
Commit: 893d6fd7049daf3c4d01eb6a960801cd064d5f73
Parents: aa4ca8b
Author: Kay Ousterhout <kayousterhout@gmail.com>
Authored: Thu Feb 12 14:35:44 2015 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Thu Feb 12 14:36:27 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/ui/static/webui.css        |  4 +-
 .../org/apache/spark/executor/TaskMetrics.scala | 21 ++++++
 .../org/apache/spark/scheduler/JobLogger.scala  |  4 +-
 .../storage/ShuffleBlockFetcherIterator.scala   |  3 +
 .../scala/org/apache/spark/ui/ToolTips.scala    |  8 ++-
 .../spark/ui/jobs/JobProgressListener.scala     |  6 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    | 67 ++++++++++++++++----
 .../org/apache/spark/ui/jobs/StageTable.scala   |  2 +-
 .../spark/ui/jobs/TaskDetailsClassNames.scala   |  1 +
 .../scala/org/apache/spark/ui/jobs/UIData.scala |  4 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  4 ++
 .../ui/jobs/JobProgressListenerSuite.scala      | 12 ++--
 .../apache/spark/util/JsonProtocolSuite.scala   | 17 +++++
 13 files changed, 125 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/main/resources/org/apache/spark/ui/static/webui.css
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 68b33b5..6c37cc8 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -196,7 +196,7 @@ span.additional-metric-title {
 
 /* Hide all additional metrics by default. This is done here rather than using JavaScript to
  * avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
-.scheduler_delay, .deserialization_time, .fetch_wait_time, .serialization_time,
-.getting_result_time {
+.scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
+.serialization_time, .getting_result_time {
   display: none;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index bf3f1e4..df36566 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -202,6 +202,8 @@ class TaskMetrics extends Serializable {
         merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
         merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
         merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
+        merged.incLocalBytesRead(depMetrics.localBytesRead)
+        merged.incLocalReadTime(depMetrics.localReadTime)
         merged.incRecordsRead(depMetrics.recordsRead)
       }
       _shuffleReadMetrics = Some(merged)
@@ -344,6 +346,25 @@ class ShuffleReadMetrics extends Serializable {
   private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
 
   /**
+   * Time the task spent (in milliseconds) reading local shuffle blocks (from the local disk).
+   */
+  private var _localReadTime: Long = _
+  def localReadTime = _localReadTime
+  private[spark] def incLocalReadTime(value: Long) = _localReadTime += value
+
+  /**
+   * Shuffle data that was read from the local disk (as opposed to from a remote executor).
+   */
+  private var _localBytesRead: Long = _
+  def localBytesRead = _localBytesRead
+  private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
+
+  /**
+   * Total bytes fetched in the shuffle by this task (both remote and local).
+   */
+  def totalBytesRead = _remoteBytesRead + _localBytesRead
+
+  /**
    * Number of blocks fetched in this shuffle by this task (remote or local)
    */
   def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
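
To make the new accessor concrete: `totalBytesRead` is just the sum of the remote and local
byte counters, and the merge above folds each dependency's counters into one
`ShuffleReadMetrics`. Here is a small self-contained model of that arithmetic (hypothetical
names, not Spark code; the real incrementers are `private[spark]`):

```scala
// Hypothetical model of the per-dependency merge in updateShuffleReadMetrics:
// remote and local byte counts from each shuffle dependency fold into totals.
case class DepRead(remoteBytes: Long, localBytes: Long)

def totalBytesRead(deps: Seq[DepRead]): Long =
  deps.map(d => d.remoteBytes + d.localBytes).sum

// e.g. a task reading two shuffle dependencies:
assert(totalBytesRead(Seq(DepRead(1000L, 1100L), DepRead(0L, 512L))) == 2612L)
```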

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 3bb5485..f9fc8aa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -169,7 +169,9 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
         " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
         " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
         " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
-        " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
+        " REMOTE_BYTES_READ=" + metrics.remoteBytesRead +
+        " LOCAL_READ_TIME=" + metrics.localReadTime +
+        " LOCAL_BYTES_READ=" + metrics.localBytesRead
       case None => ""
     }
     val writeMetrics = taskMetrics.shuffleWriteMetrics match {

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index ab9ee4f..2ebb799 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -228,12 +228,14 @@ final class ShuffleBlockFetcherIterator(
    * track in-memory are the ManagedBuffer references themselves.
    */
   private[this] def fetchLocalBlocks() {
+    val startTime = System.currentTimeMillis
     val iter = localBlocks.iterator
     while (iter.hasNext) {
       val blockId = iter.next()
       try {
         val buf = blockManager.getBlockData(blockId)
         shuffleMetrics.incLocalBlocksFetched(1)
+        shuffleMetrics.incLocalBytesRead(buf.size)
         buf.retain()
         results.put(new SuccessFetchResult(blockId, 0, buf))
       } catch {
@@ -244,6 +246,7 @@ final class ShuffleBlockFetcherIterator(
           return
       }
     }
+    shuffleMetrics.incLocalReadTime(System.currentTimeMillis - startTime)
   }
 
   private[this] def initialize(): Unit = {
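
Note that the added timing is coarse wall-clock time around the entire local-fetch loop, so
it covers everything in the loop body (`buf.retain()`, queueing the results), not just disk
reads. A minimal standalone sketch of the same pattern, using a hypothetical `timed` helper:

```scala
// Hypothetical helper illustrating the measurement pattern used above:
// record the wall-clock milliseconds spent in `body`, whatever it does.
def timed[T](record: Long => Unit)(body: => T): T = {
  val start = System.currentTimeMillis
  try body finally record(System.currentTimeMillis - start)
}

// e.g. timed(shuffleMetrics.incLocalReadTime) { fetchLocalBlocks() }
```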

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index 3a15e60..cae6870 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -37,8 +37,12 @@ private[spark] object ToolTips {
     "Bytes and records written to disk in order to be read by a shuffle in a future stage."
 
  val SHUFFLE_READ =
-    """Bytes and records read from remote executors. Typically less than shuffle write bytes
-       because this does not include shuffle data read locally."""
+    """Total shuffle bytes and records read (includes both data read locally and data read from
+       remote executors). """
+
+  val SHUFFLE_READ_REMOTE_SIZE =
+    """Total shuffle bytes read from remote executors. This is a subset of the shuffle
+       read bytes; the remaining shuffle data is read locally. """
 
  val GETTING_RESULT_TIME =
    """Time that the driver spends fetching task results from workers. If this is large, consider

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index f463f8d..0b6fe70 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -401,9 +401,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta
 
     val shuffleReadDelta =
-      (taskMetrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L)
-      - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L))
-    stageData.shuffleReadBytes += shuffleReadDelta
+      (taskMetrics.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L)
+        - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.totalBytesRead).getOrElse(0L))
+    stageData.shuffleReadTotalBytes += shuffleReadDelta
     execSummary.shuffleRead += shuffleReadDelta
 
     val shuffleReadRecordsDelta =

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 05ffd5b..d752434 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -85,7 +85,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
             {if (stageData.hasShuffleRead) {
               <li>
                 <strong>Shuffle read: </strong>
-                {s"${Utils.bytesToString(stageData.shuffleReadBytes)} / " +
+                {s"${Utils.bytesToString(stageData.shuffleReadTotalBytes)} / " +
                  s"${stageData.shuffleReadRecords}"}
               </li>
             }}
@@ -143,6 +143,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
                     <span class="additional-metric-title">Shuffle Read Blocked Time</span>
                   </span>
                 </li>
+                <li>
+                  <span data-toggle="tooltip"
+                        title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
+                    <input type="checkbox" name={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}/>
+                    <span class="additional-metric-title">Shuffle Remote Reads</span>
+                  </span>
+                </li>
               }}
               <li>
                 <span data-toggle="tooltip"
@@ -181,7 +188,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
         {if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
         {if (stageData.hasShuffleRead) {
           Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
-            ("Shuffle Read Size / Records", ""))
+            ("Shuffle Read Size / Records", ""),
+            ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
         } else {
           Nil
         }} ++
@@ -320,19 +328,41 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
           val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
             metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
           }
-          val shuffleReadBlockedQuantiles = <td>Shuffle Read Blocked Time</td> +:
+          val shuffleReadBlockedQuantiles =
+            <td>
+              <span data-toggle="tooltip"
+                    title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right">
+                Shuffle Read Blocked Time
+              </span>
+            </td> +:
             getFormattedTimeQuantiles(shuffleReadBlockedTimes)
 
-          val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
-            metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+          val shuffleReadTotalSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+            metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
           }
-
-          val shuffleReadRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+          val shuffleReadTotalRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
             metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
           }
+          val shuffleReadTotalQuantiles =
+            <td>
+              <span data-toggle="tooltip"
+                    title={ToolTips.SHUFFLE_READ} data-placement="right">
+                Shuffle Read Size / Records
+              </span>
+            </td> +:
+            getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)
 
-          val shuffleReadQuantiles = <td>Shuffle Read Size / Records (Remote)</td> +:
-            getFormattedSizeQuantilesWithRecords(shuffleReadSizes, shuffleReadRecords)
+          val shuffleReadRemoteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+            metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+          }
+          val shuffleReadRemoteQuantiles =
+            <td>
+              <span data-toggle="tooltip"
+                    title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
+                Shuffle Remote Reads
+              </span>
+            </td> +:
+            getFormattedSizeQuantiles(shuffleReadRemoteSizes)
 
           val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
             metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
@@ -374,7 +404,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
               <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
                 {shuffleReadBlockedQuantiles}
               </tr>
-              <tr>{shuffleReadQuantiles}</tr>
+              <tr>{shuffleReadTotalQuantiles}</tr>
+              <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
+                {shuffleReadRemoteQuantiles}
+              </tr>
             } else {
               Nil
             },
@@ -454,11 +487,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
       val shuffleReadBlockedTimeReadable =
         maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
 
-      val shuffleReadSortable = maybeShuffleRead.map(_.remoteBytesRead.toString).getOrElse("")
-      val shuffleReadReadable = maybeShuffleRead
-        .map(m => s"${Utils.bytesToString(m.remoteBytesRead)}").getOrElse("")
+      val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
+      val shuffleReadSortable = totalShuffleBytes.map(_.toString).getOrElse("")
+      val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
       val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
 
+      val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
+      val shuffleReadRemoteSortable = remoteShuffleBytes.map(_.toString).getOrElse("")
+      val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
+
       val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
       val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
       val shuffleWriteReadable = maybeShuffleWrite
@@ -536,6 +573,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
            <td sorttable_customkey={shuffleReadSortable}>
              {s"$shuffleReadReadable / $shuffleReadRecords"}
            </td>
+           <td sorttable_customkey={shuffleReadRemoteSortable}
+               class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
+             {shuffleReadRemoteReadable}
+           </td>
         }}
         {if (hasShuffleWrite) {
            <td sorttable_customkey={writeTimeSortable}>

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 703d43f..5865850 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -138,7 +138,7 @@ private[ui] class StageTableBase(
     val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
     val outputWrite = stageData.outputBytes
    val outputWriteWithUnit = if (outputWrite > 0) Utils.bytesToString(outputWrite) else ""
-    val shuffleRead = stageData.shuffleReadBytes
+    val shuffleRead = stageData.shuffleReadTotalBytes
    val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
     val shuffleWrite = stageData.shuffleWriteBytes
    val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
index 37cf2c2..9bf67db 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
@@ -28,6 +28,7 @@ private[spark] object TaskDetailsClassNames {
   val SCHEDULER_DELAY = "scheduler_delay"
   val TASK_DESERIALIZATION_TIME = "deserialization_time"
   val SHUFFLE_READ_BLOCKED_TIME = "fetch_wait_time"
+  val SHUFFLE_READ_REMOTE_SIZE = "shuffle_read_remote"
   val RESULT_SERIALIZATION_TIME = "serialization_time"
   val GETTING_RESULT_TIME = "getting_result_time"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 69aac6c..dbf1cee 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -80,7 +80,7 @@ private[jobs] object UIData {
     var inputRecords: Long = _
     var outputBytes: Long = _
     var outputRecords: Long = _
-    var shuffleReadBytes: Long = _
+    var shuffleReadTotalBytes: Long = _
     var shuffleReadRecords : Long = _
     var shuffleWriteBytes: Long = _
     var shuffleWriteRecords: Long = _
@@ -96,7 +96,7 @@ private[jobs] object UIData {
 
     def hasInput = inputBytes > 0
     def hasOutput = outputBytes > 0
-    def hasShuffleRead = shuffleReadBytes > 0
+    def hasShuffleRead = shuffleReadTotalBytes > 0
     def hasShuffleWrite = shuffleWriteBytes > 0
     def hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index b0b5456..58d37e2 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -294,6 +294,8 @@ private[spark] object JsonProtocol {
     ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
     ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
     ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~
+    ("Local Read Time" -> shuffleReadMetrics.localReadTime) ~
+    ("Local Bytes Read" -> shuffleReadMetrics.localBytesRead) ~
     ("Total Records Read" -> shuffleReadMetrics.recordsRead)
   }
 
@@ -674,6 +676,8 @@ private[spark] object JsonProtocol {
     metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
     metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
     metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
+    metrics.incLocalReadTime((json \ "Local Read Time").extractOpt[Long].getOrElse(0))
+    metrics.incLocalBytesRead((json \ "Local Bytes Read").extractOpt[Long].getOrElse(0))
     metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0))
     metrics
   }
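
The `extractOpt[Long].getOrElse(0)` pattern above is what keeps old event logs readable: a
field that predates this change simply decodes to zero instead of failing. A minimal
standalone sketch of that behavior, assuming json4s (the JSON library JsonProtocol is built
on), with a hypothetical pre-change event-log fragment:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats = DefaultFormats

// Hypothetical fragment of an old event log: the two new fields are absent.
val oldJson = parse("""{"Remote Bytes Read": 1000, "Fetch Wait Time": 900}""")

// extractOpt returns None for a missing field, so old logs default to 0
// rather than throwing; this is the backwards-compatibility trick.
val localReadTime = (oldJson \ "Local Read Time").extractOpt[Long].getOrElse(0L)
val localBytesRead = (oldJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L)
assert(localReadTime == 0L && localBytesRead == 0L)
```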

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index e8405ba..6019282 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -227,6 +227,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
       taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
       shuffleReadMetrics.incRemoteBytesRead(base + 1)
+      shuffleReadMetrics.incLocalBytesRead(base + 9)
       shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
       shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
       taskMetrics.setExecutorRunTime(base + 4)
@@ -260,8 +261,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
 
     var stage0Data = listener.stageIdToData.get((0, 0)).get
     var stage1Data = listener.stageIdToData.get((1, 0)).get
-    assert(stage0Data.shuffleReadBytes == 102)
-    assert(stage1Data.shuffleReadBytes == 201)
+    assert(stage0Data.shuffleReadTotalBytes == 220)
+    assert(stage1Data.shuffleReadTotalBytes == 410)
     assert(stage0Data.shuffleWriteBytes == 106)
     assert(stage1Data.shuffleWriteBytes == 203)
     assert(stage0Data.executorRunTime == 108)
@@ -290,8 +291,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
 
     stage0Data = listener.stageIdToData.get((0, 0)).get
     stage1Data = listener.stageIdToData.get((1, 0)).get
-    assert(stage0Data.shuffleReadBytes == 402)
-    assert(stage1Data.shuffleReadBytes == 602)
+    // Task 1235 contributed (100+1)+(100+9) = 210 shuffle bytes, and task 1234 contributed
+    // (300+1)+(300+9) = 610 total shuffle bytes, so the total for the stage is 820.
+    assert(stage0Data.shuffleReadTotalBytes == 820)
+    // Task 1236 contributed 410 shuffle bytes, and task 1237 contributed 810 shuffle bytes.
+    assert(stage1Data.shuffleReadTotalBytes == 1220)
     assert(stage0Data.shuffleWriteBytes == 406)
     assert(stage1Data.shuffleWriteBytes == 606)
     assert(stage0Data.executorRunTime == 408)

http://git-wip-us.apache.org/repos/asf/spark/blob/893d6fd7/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f3017dc..c181baf 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -260,6 +260,19 @@ class JsonProtocolSuite extends FunSuite {
     assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
 
+  test("ShuffleReadMetrics: Local bytes read and time taken backwards compatibility") {
+    // Metrics about local shuffle bytes read and local read time were added in 1.3.1.
+    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
+      hasHadoopInput = false, hasOutput = false, hasRecords = false)
+    assert(metrics.shuffleReadMetrics.nonEmpty)
+    val newJson = JsonProtocol.taskMetricsToJson(metrics)
+    val oldJson = newJson.removeField { case (field, _) => field == "Local Bytes Read" }
+      .removeField { case (field, _) => field == "Local Read Time" }
+    val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+    assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
+    assert(newMetrics.shuffleReadMetrics.get.localReadTime == 0)
+  }
+
   test("SparkListenerApplicationStart backwards compatibility") {
     // SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property.
     val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
@@ -695,6 +708,8 @@ class JsonProtocolSuite extends FunSuite {
       sr.incFetchWaitTime(a + d)
       sr.incRemoteBlocksFetched(f)
       sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
+      sr.incLocalReadTime(a + e)
+      sr.incLocalBytesRead(a + f)
       t.setShuffleReadMetrics(Some(sr))
     }
     if (hasOutput) {
@@ -941,6 +956,8 @@ class JsonProtocolSuite extends FunSuite {
       |      "Local Blocks Fetched": 700,
       |      "Fetch Wait Time": 900,
       |      "Remote Bytes Read": 1000,
+      |      "Local Read Time": 1000,
+      |      "Local Bytes Read": 1100,
       |      "Total Records Read" : 10
       |    },
       |    "Shuffle Write Metrics": {



