spark-commits mailing list archives

From pwend...@apache.org
Subject [1/7] git commit: Track and report write throughput for shuffle tasks.
Date Mon, 21 Oct 2013 05:20:54 GMT
Updated Branches:
  refs/heads/master 5b9380e01 -> 35886f347


Track and report write throughput for shuffle tasks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/3478ca67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/3478ca67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/3478ca67

Branch: refs/heads/master
Commit: 3478ca676289f5eabf5dcaa6f80c6bc203cd3f41
Parents: 3745a18
Author: Patrick Wendell <pwendell@gmail.com>
Authored: Wed Aug 14 18:17:07 2013 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Mon Oct 7 15:15:41 2013 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/TaskMetrics.scala |  5 ++++
 .../apache/spark/scheduler/ShuffleMapTask.scala |  3 +++
 .../spark/storage/BlockObjectWriter.scala       |  5 ++++
 .../org/apache/spark/storage/DiskStore.scala    | 26 +++++++++++++++++++-
 .../org/apache/spark/ui/jobs/StagePage.scala    | 20 ++++++++++++++-
 5 files changed, 57 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3478ca67/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index f311141..0b4892f 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -102,4 +102,9 @@ class ShuffleWriteMetrics extends Serializable {
    * Number of bytes written for a shuffle
    */
   var shuffleBytesWritten: Long = _
+
+  /**
+   * Time spent blocking on writes to disk or buffer cache, in nanoseconds.
+   */
+  var shuffleWriteTime: Long = _
 }
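
A quick sketch of how the new field is meant to be filled in (illustrative only, mirroring the ShuffleMapTask change below; taskMetrics stands for a hypothetical TaskMetrics instance):

    val writeMetrics = new ShuffleWriteMetrics
    writeMetrics.shuffleBytesWritten = 1024L * 1024      // bytes written to shuffle files
    writeMetrics.shuffleWriteTime = 15L * 1000 * 1000    // 15 ms of blocking writes, in nanoseconds
    taskMetrics.shuffleWriteMetrics = Some(writeMetrics)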

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3478ca67/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index d23df0d..eb27437 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -154,8 +154,10 @@ private[spark] class ShuffleMapTask(
 
       // Commit the writes. Get the size of each bucket block (total block size).
       var totalBytes = 0L
+      var totalTime = 0L
       val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
         writer.commit()
+        totalTime += writer.timeWriting()
         writer.close()
         val size = writer.size()
         totalBytes += size
@@ -165,6 +167,7 @@ private[spark] class ShuffleMapTask(
       // Update shuffle metrics.
       val shuffleMetrics = new ShuffleWriteMetrics
       shuffleMetrics.shuffleBytesWritten = totalBytes
+      shuffleMetrics.shuffleWriteTime = totalTime
       metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
 
       return new MapStatus(blockManager.blockManagerId, compressedSizes)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3478ca67/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 39f1032..de3e3b0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -62,4 +62,9 @@ abstract class BlockObjectWriter(val blockId: String) {
    * Size of the valid writes, in bytes.
    */
   def size(): Long
+
+  /**
+   * Cumulative time spent performing blocking writes, in ns.
+   */
+  def timeWriting(): Long
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3478ca67/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 63447ba..d053958 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -45,19 +45,38 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
   class DiskBlockObjectWriter(blockId: String, serializer: Serializer, bufferSize: Int)
     extends BlockObjectWriter(blockId) {
 
+    /** Intercepts write calls and tracks total time spent writing. Not thread safe. */
+    private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
+      def timeWriting = _timeWriting
+
+      private var _timeWriting = 0L
+
+      private def callWithTiming(f: => Unit) = {
+        val start = System.nanoTime()
+        f
+        _timeWriting += (System.nanoTime() - start)
+      }
+
+      def write(i: Int): Unit = callWithTiming(out.write(i))
+      override def write(b: Array[Byte]) = callWithTiming(out.write(b))
+      override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
+    }
+
     private val f: File = createFile(blockId /*, allowAppendExisting */)
 
     // The file channel, used for repositioning / truncating the file.
     private var channel: FileChannel = null
     private var bs: OutputStream = null
+    private var ts: TimeTrackingOutputStream = null
     private var objOut: SerializationStream = null
     private var lastValidPosition = 0L
     private var initialized = false
 
     override def open(): DiskBlockObjectWriter = {
       val fos = new FileOutputStream(f, true)
+      ts = new TimeTrackingOutputStream(fos)
       channel = fos.getChannel()
-      bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(fos, bufferSize))
+      bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(ts, bufferSize))
       objOut = serializer.newInstance().serializeStream(bs)
       initialized = true
       this
@@ -68,6 +87,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
         objOut.close()
         channel = null
         bs = null
+        ts = null
         objOut = null
       }
       // Invoke the close callback handler.
@@ -110,6 +130,10 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     }
 
     override def size(): Long = lastValidPosition
+
+    override def timeWriting: Long = {
+      Option(ts).map(t => t.timeWriting).getOrElse(0L) // ts could be null if never written to
+    }
   }
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
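
The timing decorator above can also be exercised on its own. A self-contained sketch of the same idea (simplified from the class in the diff; ByteArrayOutputStream stands in for the real file stream):

    import java.io.{ByteArrayOutputStream, OutputStream}

    // Simplified version of the TimeTrackingOutputStream shown in the diff above.
    class TimingOutputStream(out: OutputStream) extends OutputStream {
      private var _timeWriting = 0L                      // cumulative nanoseconds spent in write calls
      def timeWriting: Long = _timeWriting

      private def timed(f: => Unit): Unit = {
        val start = System.nanoTime()
        f
        _timeWriting += System.nanoTime() - start
      }

      def write(i: Int): Unit = timed(out.write(i))
      override def write(b: Array[Byte], off: Int, len: Int): Unit = timed(out.write(b, off, len))
    }

    // Wrap any OutputStream and read back how long its write calls blocked.
    val ts = new TimingOutputStream(new ByteArrayOutputStream())
    ts.write(new Array[Byte](1024 * 1024))
    println("blocked for %.2f ms".format(ts.timeWriting / 1e6))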

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3478ca67/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 163a374..701bc64 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -152,6 +152,22 @@ private[spark] class StagePage(parent: JobProgressUI) {
       else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
     val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
 
+
+    val remoteBytesRead: Option[Long] = metrics.flatMap{m => m.shuffleReadMetrics}.map(r => r.remoteBytesRead)
+    val shuffleBytesWritten: Option[Long] = metrics.flatMap{m => m.shuffleWriteMetrics}.map(r => r.shuffleBytesWritten)
+
+    val writeThroughput: Option[Long] = metrics.flatMap{m => m.shuffleWriteMetrics}.flatMap{ s =>
+      val bytesWritten = s.shuffleBytesWritten
+      val timeTaken = s.shuffleWriteTime
+      val timeSeconds = timeTaken / (1000 * 1000 * 1000.0)
+      if (bytesWritten < 10000 || timeSeconds < .01) { // Too little data to form a useful average
+        None
+      } else {
+        Some((bytesWritten / timeSeconds).toLong)
+      }
+    }
+    val writeThroughputStr = writeThroughput.map(t => " (%s/s)".format(Utils.bytesToString(t)))
+
     <tr>
       <td>{info.taskId}</td>
       <td>{info.status}</td>
@@ -170,7 +186,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
       }}
       {if (shuffleWrite) {
         <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
-          Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
+          Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}
+          {writeThroughputStr.getOrElse("")}
+        </td>
       }}
       <td>{exception.map(e =>
         <span>
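
The throughput shown in the task table is simply bytes divided by seconds, derived from the nanosecond counter above. A worked example with made-up numbers (illustrative only; the real values come from ShuffleWriteMetrics):

    val bytesWritten = 64L * 1024 * 1024                  // 64 MB written by the task
    val writeTimeNanos = 500L * 1000 * 1000               // 0.5 s spent blocked in writes
    val timeSeconds = writeTimeNanos / (1000 * 1000 * 1000.0)
    val throughput = (bytesWritten / timeSeconds).toLong  // 134217728 bytes/s, i.e. 128 MB/s
    // StagePage then renders " (%s/s)".format(Utils.bytesToString(throughput)), e.g. " (128.0 MB/s)".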

