spark-commits mailing list archives

From r...@apache.org
Subject [02/10] git commit: Basic shuffle file consolidation
Date Tue, 22 Oct 2013 05:45:13 GMT
Basic shuffle file consolidation

The Spark shuffle phase can produce a large number of files, as one file is created
per mapper per reducer. For large or repeated jobs, this often produces millions of
shuffle files, which causes extremely degraded performance in the OS file system.
This patch seeks to reduce that burden by combining multiple shuffle files into one.
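To make the scale concrete, a back-of-the-envelope count (illustrative numbers, not
taken from this patch or any benchmark):

    val mappers  = 10000
    val reducers = 1000
    mappers * reducers   // 10,000,000 shuffle files without consolidation

With consolidation, the file count scales with the number of concurrently running
tasks per executor rather than with the total number of mappers.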

This PR draws upon the work of Jason Dai in https://github.com/mesos/spark/pull/669.
However, it simplifies the design in order to get the majority of the gain with less
overall intellectual and code burden. The vast majority of code in this pull request
is a refactor to allow the insertion of a clean layer of indirection between logical
block ids and physical files. This, I feel, provides some design clarity in addition
to enabling shuffle file consolidation.

The main goal is to produce one shuffle file per reducer per active mapper thread.
This allows us to isolate the mappers (simplifying the failure modes), while still
reducing the number of files tremendously for large jobs. To accomplish this, we
simply create a new set of shuffle files for every parallel task, and return the
files to a pool from which they are handed out to the next task that runs.
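For readers who want the mechanism without the full diff, the pool reduces to roughly
the following pattern (a minimal sketch of the getUnusedFileId/recycleFileId logic in
the ShuffleBlockManager changes below; not the verbatim code):

    import java.util.concurrent.ConcurrentLinkedQueue
    import java.util.concurrent.atomic.AtomicInteger

    val nextFileId = new AtomicInteger(0)
    val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()

    // A task acquiring writers takes a recycled file id if one is free,
    // otherwise mints a fresh one.
    def getUnusedFileId(): Int = {
      val fileId = unusedFileIds.poll()
      if (fileId == null) nextFileId.getAndIncrement() else fileId
    }

    // When the task releases its writers, the id returns to the pool.
    def recycleFileId(fileId: Int): Unit = {
      unusedFileIds.add(fileId)
    }

Since no two concurrently running tasks can hold the same fileId, they never write to
the same physical file, while tasks that run back-to-back reuse the same files.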


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/136b9b3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/136b9b3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/136b9b3a

Branch: refs/heads/master
Commit: 136b9b3a3ed358bc04b28e8d62657d56d55c2c3e
Parents: 861dc40
Author: Aaron Davidson <aaron@databricks.com>
Authored: Sun Oct 20 02:30:23 2013 -0700
Committer: Aaron Davidson <aaron@databricks.com>
Committed: Sun Oct 20 02:58:26 2013 -0700

----------------------------------------------------------------------
 .../spark/network/netty/FileServerHandler.java  |  2 +-
 .../scala/org/apache/spark/TaskContext.scala    |  1 +
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../apache/spark/scheduler/ShuffleMapTask.scala |  3 +-
 .../scala/org/apache/spark/scheduler/Task.scala |  4 +-
 .../org/apache/spark/storage/BlockManager.scala |  1 +
 .../spark/storage/ShuffleBlockManager.scala     | 56 +++++++++++++++++---
 .../spark/scheduler/TaskContextSuite.scala      |  2 +-
 8 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/136b9b3a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index ab790b7..172c6e4 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -51,7 +51,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
         ctx.flush();
         return;
       }
-      long length = file.length();
+      long length = fileSegment.length();
       if (length > Integer.MAX_VALUE || length <= 0) {
         ctx.write(new FileHeader(0, blockId).buffer());
         ctx.flush();

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/136b9b3a/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index cae983e..7601ffe 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -25,6 +25,7 @@ class TaskContext(
   val stageId: Int,
   val partitionId: Int,
   val attemptId: Long,
+  val executorId: String,
   val runningLocally: Boolean = false,
   @volatile var interrupted: Boolean = false,
   private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/136b9b3a/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 032eb04..eb12c26 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -206,7 +206,7 @@ private[spark] class Executor(
 
         // Run the actual task and measure its runtime.
         taskStart = System.currentTimeMillis()
-        val value = task.run(taskId.toInt)
+        val value = task.run(taskId.toInt, executorId)
         val taskFinish = System.currentTimeMillis()
 
         // If the task has been killed, let's fail it.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/136b9b3a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index ed1b36d..29c6108 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -152,7 +152,8 @@ private[spark] class ShuffleMapTask(
     try {
       // Obtain all the block writers for shuffle blocks.
       val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
-      shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
+      shuffle = blockManager.shuffleBlockManager.forShuffle(
+        dep.shuffleId, context.executorId, numOutputSplits, ser)
       buckets = shuffle.acquireWriters(partitionId)
 
       // Write the map output to its associated buckets.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/136b9b3a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 1fe0d0e..64fe5b1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -45,8 +45,8 @@ import org.apache.spark.util.ByteBufferInputStream
  */
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
 
-  def run(attemptId: Long): T = {
-    context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+  final def run(attemptId: Long, executorId: String): T = {
+    context = new TaskContext(stageId, partitionId, attemptId, executorId, runningLocally = false)
     if (_killed) {
       kill()
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/136b9b3a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2f96590..1f173c7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -578,6 +578,7 @@ private[spark] class BlockManager(
     val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true)
     val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
     writer.registerCloseEventHandler(() => {
+      diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
       val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
       blockInfo.put(blockId, myInfo)
       myInfo.markReady(writer.fileSegment().length)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/136b9b3a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 05a14c9..6208856 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.storage
 
-import org.apache.spark.serializer.Serializer
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.spark.serializer.Serializer
 
 private[spark]
-class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter])
-
+class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter])
 
 private[spark]
 trait ShuffleBlocks {
@@ -30,24 +31,63 @@ trait ShuffleBlocks {
   def releaseWriters(group: ShuffleWriterGroup)
 }
 
+/**
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer
+ * per reducer.
+ *
+ * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
+ * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
+ * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files,
+ * it releases them for another task.
+ * Regarding the implementation of this feature, shuffle files are identified by a 4-tuple:
+ *   - shuffleId: The unique id given to the entire shuffle stage.
+ *   - executorId: The id of the executor running the task. Required in order to ensure that
+ *       multiple executors running on the same node do not collide.
+ *   - bucketId: The id of the output partition (i.e., reducer id).
+ *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
+ *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ */
 private[spark]
 class ShuffleBlockManager(blockManager: BlockManager) {
+  /** Turning off shuffle file consolidation causes all shuffle blocks to get their own file. */
+  val consolidateShuffleFiles =
+    System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean
+
+  val nextFileId = new AtomicInteger(0)
+  val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()
 
-  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
+  def forShuffle(shuffleId: Int, executorId: String, numBuckets: Int, serializer: Serializer) = {
     new ShuffleBlocks {
       // Get a group of writers for a map task.
       override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
        val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+        val fileId = getUnusedFileId()
         val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize)
+          val filename = physicalFileName(shuffleId, executorId, bucketId, fileId)
+          blockManager.getDiskWriter(blockId, filename, serializer, bufferSize)
         }
-        new ShuffleWriterGroup(mapId, writers)
+        new ShuffleWriterGroup(mapId, fileId, writers)
       }
 
-      override def releaseWriters(group: ShuffleWriterGroup) = {
-        // Nothing really to release here.
+      override def releaseWriters(group: ShuffleWriterGroup) {
+        recycleFileId(group.fileId)
       }
     }
   }
+
+  private def getUnusedFileId(): Int = {
+    val fileId = unusedFileIds.poll()
+    if (fileId == null) nextFileId.getAndIncrement()
+    else fileId
+  }
+
+  private def recycleFileId(fileId: Int) {
+    if (!consolidateShuffleFiles) { return } // ensures we always generate a new file id
+    unusedFileIds.add(fileId)
+  }
+
+  private def physicalFileName(shuffleId: Int, executorId: String, bucketId: Int, fileId: Int) = {
+    "merged_shuffle_%d_%s_%d_%d".format(shuffleId, executorId, bucketId, fileId)
+  }
 }
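To make the naming scheme concrete, a hypothetical invocation of the physicalFileName
helper added above (argument values invented for illustration):

    physicalFileName(shuffleId = 2, executorId = "exec-1", bucketId = 0, fileId = 3)
    // => "merged_shuffle_2_exec-1_0_3"

so each (shuffle, executor, reducer, concurrent-slot) combination maps to exactly one
physical file on disk.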

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/136b9b3a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index e31a116..668cd5d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -40,7 +40,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext
     val func = (c: TaskContext, i: Iterator[String]) => i.next
     val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
     intercept[RuntimeException] {
-      task.run(0)
+      task.run(0, "test")
     }
     assert(completed === true)
   }

