spark-commits mailing list archives

From pwend...@apache.org
Subject [7/7] git commit: Merge pull request #41 from pwendell/shuffle-benchmark
Date Mon, 21 Oct 2013 05:21:00 GMT
Merge pull request #41 from pwendell/shuffle-benchmark

Provide Instrumentation for Shuffle Write Performance

Shuffle write performance can have a major impact on overall job performance. This patch adds
a few pieces of instrumentation related to shuffle writes:

1. A per-task record of the time spent performing blocking writes, implemented by
accumulating the delay seen across each task's many individual writes.
2. An undocumented option `spark.shuffle.sync` that forces shuffle data to be synced to disk,
which is necessary for measuring shuffle performance in the absence of the OS buffer cache.
3. An internal utility which micro-benchmarks write throughput for simulated shuffle outputs.

I'm going to do some performance testing on this to see whether these small timing calls add
overhead. From a feature perspective, however, I consider this complete. Any feedback is appreciated.
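
As a note on item 2, the sync option is read as a plain Java system property (see the
DiskStore diff below), so a minimal sketch for enabling it while benchmarking is:

    // Force shuffle writes to sync to disk before running the job under test.
    // The property name and "false" default match the DiskStore change below.
    System.setProperty("spark.shuffle.sync", "true")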


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/35886f34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/35886f34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/35886f34

Branch: refs/heads/master
Commit: 35886f347466b25625d5391c97c2deb8293ebc66
Parents: 5b9380e 9e9e9e1b
Author: Patrick Wendell <pwendell@gmail.com>
Authored: Sun Oct 20 22:20:32 2013 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Sun Oct 20 22:20:32 2013 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/TaskMetrics.scala |  5 ++
 .../apache/spark/scheduler/ShuffleMapTask.scala |  3 +
 .../spark/storage/BlockObjectWriter.scala       |  5 ++
 .../org/apache/spark/storage/DiskStore.scala    | 44 +++++++++-
 .../org/apache/spark/ui/jobs/StagePage.scala    |  4 +-
 .../scala/spark/storage/StoragePerfTester.scala | 84 ++++++++++++++++++++
 6 files changed, 141 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35886f34/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 8027917,e1d80f0..40baea6
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@@ -175,9 -167,10 +177,10 @@@ private[spark] class ShuffleMapTask
        // Update shuffle metrics.
        val shuffleMetrics = new ShuffleWriteMetrics
        shuffleMetrics.shuffleBytesWritten = totalBytes
+       shuffleMetrics.shuffleWriteTime = totalTime
        metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
  
 -      return new MapStatus(blockManager.blockManagerId, compressedSizes)
 +      new MapStatus(blockManager.blockManagerId, compressedSizes)
      } catch { case e: Exception =>
        // If there is an exception from running the task, revert the partial writes
        // and throw the exception upstream to Spark.

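For context, the `totalTime` recorded into `shuffleWriteTime` above is the aggregate
blocking-write time collected from the task's shuffle writers. The BlockObjectWriter diff
is omitted from this excerpt, so the exact accessor is not shown here; a hedged sketch of
the aggregation, with `buckets.writers` and `timeWriting()` as assumed names, is:

    // Sketch only: sum each writer's accumulated blocking-write nanos into the
    // task-level value stored in shuffleMetrics.shuffleWriteTime.
    val totalTime = buckets.writers.map(_.timeWriting()).sum
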
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35886f34/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35886f34/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index b7ca61e,8a8dc8c..2a9a3f6
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@@ -42,10 -42,27 +42,27 @@@ import org.apache.spark.util.Util
  private class DiskStore(blockManager: BlockManager, rootDirs: String)
    extends BlockStore(blockManager) with Logging {
  
 -  class DiskBlockObjectWriter(blockId: String, serializer: Serializer, bufferSize: Int)
 +  class DiskBlockObjectWriter(blockId: BlockId, serializer: Serializer, bufferSize: Int)
      extends BlockObjectWriter(blockId) {
  
+     /** Intercepts write calls and tracks total time spent writing. Not thread safe. */
+     private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
+       def timeWriting = _timeWriting
+       private var _timeWriting = 0L
+ 
+       private def callWithTiming(f: => Unit) = {
+         val start = System.nanoTime()
+         f
+         _timeWriting += (System.nanoTime() - start)
+       }
+ 
+       def write(i: Int): Unit = callWithTiming(out.write(i))
+       override def write(b: Array[Byte]) = callWithTiming(out.write(b))
+       override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
+     }
+ 
      private val f: File = createFile(blockId /*, allowAppendExisting */)
+     private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean
  
      // The file channel, used for repositioning / truncating the file.
      private var channel: FileChannel = null

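The time-tracking decorator in the DiskStore hunk is self-contained enough to reproduce
outside Spark. Below is a standalone sketch of the same technique (the class name, file
path, and example driver are illustrative, not part of the patch):

    import java.io.{FileOutputStream, OutputStream}

    // Decorator that times every write against the wrapped stream, mirroring
    // TimeTrackingOutputStream above. Not thread safe.
    class TimedOutputStream(out: OutputStream) extends OutputStream {
      private var timeWriting = 0L
      def totalWriteNanos: Long = timeWriting

      private def timed(f: => Unit): Unit = {
        val start = System.nanoTime()
        f
        timeWriting += (System.nanoTime() - start)
      }

      override def write(i: Int): Unit = timed(out.write(i))
      override def write(b: Array[Byte]): Unit = timed(out.write(b))
      override def write(b: Array[Byte], off: Int, len: Int): Unit =
        timed(out.write(b, off, len))

      // Flush and close simply delegate; only the write calls are timed,
      // matching the hunk above.
      override def flush(): Unit = out.flush()
      override def close(): Unit = out.close()
    }

    // Example driver: write 4 MB and report the accumulated blocking-write time.
    object TimedWriteExample extends App {
      val out = new TimedOutputStream(new FileOutputStream("/tmp/timed-write-test"))
      out.write(Array.fill[Byte](4 * 1024 * 1024)(0))
      out.close()
      println(s"Blocking write time: ${out.totalWriteNanos / 1e6} ms")
    }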
