spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject git commit: SPARK-2791: Fix committing, reverting and state tracking in shuffle file consolidation
Date Fri, 01 Aug 2014 20:57:27 GMT
Repository: spark
Updated Branches:
  refs/heads/master b270309d7 -> 78f2af582


SPARK-2791: Fix committing, reverting and state tracking in shuffle file consolidation

All changes from this PR are by mridulm and are drawn from his work in #1609. This patch is
intended to fix all major issues related to shuffle file consolidation that mridulm found,
while minimizing changes to the code, with the hope that it may be more easily merged into
1.1.

This patch is **not** intended as a replacement for #1609, which provides many additional
benefits, including fixes to ExternalAppendOnlyMap, improvements to DiskBlockObjectWriter's
API, and several new unit tests.

If it is feasible to merge #1609 for the 1.1 deadline, that is a preferable option.

Author: Aaron Davidson <aaron@databricks.com>

Closes #1678 from aarondav/consol and squashes the following commits:

53b3f6d [Aaron Davidson] Correct behavior when writing unopened file
701d045 [Aaron Davidson] Rebase with sort-based shuffle
9160149 [Aaron Davidson] SPARK-2532: Minimal shuffle consolidation fixes


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78f2af58
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78f2af58
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78f2af58

Branch: refs/heads/master
Commit: 78f2af582286b81e6dc9fa9d455ed2b369d933bd
Parents: b270309
Author: Aaron Davidson <aaron@databricks.com>
Authored: Fri Aug 1 13:57:19 2014 -0700
Committer: Matei Zaharia <matei@databricks.com>
Committed: Fri Aug 1 13:57:19 2014 -0700

----------------------------------------------------------------------
 .../spark/shuffle/hash/HashShuffleWriter.scala  | 14 ++--
 .../spark/shuffle/sort/SortShuffleWriter.scala  |  3 +-
 .../spark/storage/BlockObjectWriter.scala       | 53 +++++++-----
 .../spark/storage/ShuffleBlockManager.scala     | 28 ++++---
 .../util/collection/ExternalAppendOnlyMap.scala |  2 +-
 .../spark/util/collection/ExternalSorter.scala  |  6 +-
 .../spark/storage/DiskBlockManagerSuite.scala   | 87 +++++++++++++++++++-
 .../apache/spark/tools/StoragePerfTester.scala  |  5 +-
 8 files changed, 146 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 1923f7c..45d3b8b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -65,7 +65,8 @@ private[spark] class HashShuffleWriter[K, V](
   }
 
   /** Close this writer, passing along whether the map completed */
-  override def stop(success: Boolean): Option[MapStatus] = {
+  override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
+    var success = initiallySuccess
     try {
       if (stopping) {
         return None
@@ -73,15 +74,16 @@ private[spark] class HashShuffleWriter[K, V](
       stopping = true
       if (success) {
         try {
-          return Some(commitWritesAndBuildStatus())
+          Some(commitWritesAndBuildStatus())
         } catch {
           case e: Exception =>
+            success = false
             revertWrites()
             throw e
         }
       } else {
         revertWrites()
-        return None
+        None
       }
     } finally {
       // Release the writers back to the shuffle block manager.
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
     var totalBytes = 0L
     var totalTime = 0L
     val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
-      writer.commit()
-      writer.close()
+      writer.commitAndClose()
       val size = writer.fileSegment().length
       totalBytes += size
       totalTime += writer.timeWriting()
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
     if (shuffle != null && shuffle.writers != null) {
       for (writer <- shuffle.writers) {
-        writer.revertPartialWrites()
-        writer.close()
+        writer.revertPartialWritesAndClose()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 42fcd07..9a356d0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -94,8 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
         for (elem <- elements) {
           writer.write(elem)
         }
-        writer.commit()
-        writer.close()
+        writer.commitAndClose()
         val segment = writer.fileSegment()
         offsets(id + 1) = segment.offset + segment.length
         lengths(id) = segment.length

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index a2687e6..01d46e1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId)
{
   def isOpen: Boolean
 
   /**
-   * Flush the partial writes and commit them as a single atomic block. Return the
-   * number of bytes written for this commit.
+   * Flush the partial writes and commit them as a single atomic block.
    */
-  def commit(): Long
+  def commitAndClose(): Unit
 
   /**
    * Reverts writes that haven't been flushed yet. Callers should invoke this function
-   * when there are runtime exceptions.
+   * when there are runtime exceptions. This method will not throw, though it may be
+   * unsuccessful in truncating written data.
    */
-  def revertPartialWrites()
+  def revertPartialWritesAndClose()
 
   /**
    * Writes an object.
@@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 
   /**
    * Returns the file segment of committed data that this Writer has written.
+   * This is only valid after commitAndClose() has been called.
    */
   def fileSegment(): FileSegment
 
@@ -108,7 +109,7 @@ private[spark] class DiskBlockObjectWriter(
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private val initialPosition = file.length()
-  private var lastValidPosition = initialPosition
+  private var finalPosition: Long = -1
   private var initialized = false
   private var _timeWriting = 0L
 
@@ -116,7 +117,6 @@ private[spark] class DiskBlockObjectWriter(
     fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
-    lastValidPosition = initialPosition
     bs = compressStream(new BufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)
     initialized = true
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
     if (initialized) {
       // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both
the
       //       serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
-      val prevPos = lastValidPosition
-      lastValidPosition = channel.position()
-      lastValidPosition - prevPos
-    } else {
-      // lastValidPosition is zero if stream is uninitialized
-      lastValidPosition
+      close()
     }
+    finalPosition = file.length()
   }
 
-  override def revertPartialWrites() {
-    if (initialized) {
-      // Discard current writes. We do this by flushing the outstanding writes and
-      // truncate the file to the last valid position.
-      objOut.flush()
-      bs.flush()
-      channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+    try {
+      if (initialized) {
+        objOut.flush()
+        bs.flush()
+        close()
+      }
+
+      val truncateStream = new FileOutputStream(file, true)
+      try {
+        truncateStream.getChannel.truncate(initialPosition)
+      } finally {
+        truncateStream.close()
+      }
+    } catch {
+      case e: Exception =>
+        logError("Uncaught exception while reverting partial writes to file " + file, e)
     }
   }
 
@@ -188,6 +196,7 @@ private[spark] class DiskBlockObjectWriter(
 
   // Only valid if called after commit()
   override def bytesWritten: Long = {
-    lastValidPosition - initialPosition
+    assert(finalPosition != -1, "bytesWritten is only valid after successful commit()")
+    finalPosition - initialPosition
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 7beb55c..28aa35b 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging
{
         if (consolidateShuffleFiles) {
           if (success) {
             val offsets = writers.map(_.fileSegment().offset)
-            fileGroup.recordMapOutput(mapId, offsets)
+            val lengths = writers.map(_.fileSegment().length)
+            fileGroup.recordMapOutput(mapId, offsets, lengths)
           }
           recycleFileGroup(fileGroup)
         } else {
@@ -247,6 +248,8 @@ object ShuffleBlockManager {
    * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
    */
   private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File])
{
+    private var numBlocks: Int = 0
+
     /**
      * Stores the absolute index of each mapId in the files of this group. For instance,
      * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
@@ -254,23 +257,27 @@ object ShuffleBlockManager {
     private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
 
     /**
-     * Stores consecutive offsets of blocks into each reducer file, ordered by position in
the file.
-     * This ordering allows us to compute block lengths by examining the following block
offset.
+     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
+     * position in the file.
      * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
      * reducer.
      */
     private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
       new PrimitiveVector[Long]()
     }
-
-    def numBlocks = mapIdToIndex.size
+    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }
 
     def apply(bucketId: Int) = files(bucketId)
 
-    def recordMapOutput(mapId: Int, offsets: Array[Long]) {
+    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
+      assert(offsets.length == lengths.length)
       mapIdToIndex(mapId) = numBlocks
+      numBlocks += 1
       for (i <- 0 until offsets.length) {
         blockOffsetsByReducer(i) += offsets(i)
+        blockLengthsByReducer(i) += lengths(i)
       }
     }
 
@@ -278,16 +285,11 @@ object ShuffleBlockManager {
     def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
       val file = files(reducerId)
       val blockOffsets = blockOffsetsByReducer(reducerId)
+      val blockLengths = blockLengthsByReducer(reducerId)
       val index = mapIdToIndex.getOrElse(mapId, -1)
       if (index >= 0) {
         val offset = blockOffsets(index)
-        val length =
-          if (index + 1 < numBlocks) {
-            blockOffsets(index + 1) - offset
-          } else {
-            file.length() - offset
-          }
-        assert(length >= 0)
+        val length = blockLengths(index)
         Some(new FileSegment(file, offset, length))
       } else {
         None

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index b34512e..cb67a1c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -199,7 +199,7 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // Flush the disk writer's contents to disk, and update relevant variables
     def flush() = {
-      writer.commit()
+      writer.commitAndClose()
       val bytesWritten = writer.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 54c3310..6e415a2 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -270,9 +270,10 @@ private[spark] class ExternalSorter[K, V, C](
     // How many elements we have in each partition
     val elementsPerPartition = new Array[Long](numPartitions)
 
-    // Flush the disk writer's contents to disk, and update relevant variables
+    // Flush the disk writer's contents to disk, and update relevant variables.
+    // The writer is closed at the end of this process, and cannot be reused.
     def flush() = {
-      writer.commit()
+      writer.commitAndClose()
       val bytesWritten = writer.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
@@ -293,7 +294,6 @@ private[spark] class ExternalSorter[K, V, C](
 
         if (objectsWritten == serializerBatchSize) {
           flush()
-          writer.close()
           writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index aaa7714..985ac93 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -22,11 +22,14 @@ import java.io.{File, FileWriter}
 import scala.collection.mutable
 import scala.language.reflectiveCalls
 
+import akka.actor.Props
 import com.google.common.io.Files
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll
{
   private val testConf = new SparkConf(false)
@@ -121,6 +124,88 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach
with Before
     newFile.delete()
   }
 
+  private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
+    assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
+    assert (segment1.offset === segment2.offset)
+    assert (segment1.length === segment2.length)
+  }
+
+  test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths")
{
+
+    val serializer = new JavaSerializer(testConf)
+    val confCopy = testConf.clone
+    // reset after EACH object write. This is to ensure that there are bytes appended after
+    // an object is written. So if the codepaths assume writeObject is end of data, this
should
+    // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc.
+    confCopy.set("spark.serializer.objectStreamReset", "1")
+
+    val securityManager = new org.apache.spark.SecurityManager(confCopy)
+    // Do not use the shuffleBlockManager above !
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy,
+      securityManager)
+    val master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
+      confCopy)
+    val store = new BlockManager("<driver>", actorSystem, master , serializer, confCopy,
+      securityManager, null)
+
+    try {
+
+      val shuffleManager = store.shuffleBlockManager
+
+      val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer)
+      for (writer <- shuffle1.writers) {
+        writer.write("test1")
+        writer.write("test2")
+      }
+      for (writer <- shuffle1.writers) {
+        writer.commitAndClose()
+      }
+
+      val shuffle1Segment = shuffle1.writers(0).fileSegment()
+      shuffle1.releaseWriters(success = true)
+
+      val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf))
+
+      for (writer <- shuffle2.writers) {
+        writer.write("test3")
+        writer.write("test4")
+      }
+      for (writer <- shuffle2.writers) {
+        writer.commitAndClose()
+      }
+      val shuffle2Segment = shuffle2.writers(0).fileSegment()
+      shuffle2.releaseWriters(success = true)
+
+      // Now comes the test :
+      // Write to shuffle 3; and close it, but before registering it, check if the file lengths
for
+      // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring
length
+      // of block based on remaining data in file : which could mess things up when there
is concurrent read
+      // and writes happening to the same shuffle group.
+
+      val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf))
+      for (writer <- shuffle3.writers) {
+        writer.write("test3")
+        writer.write("test4")
+      }
+      for (writer <- shuffle3.writers) {
+        writer.commitAndClose()
+      }
+      // check before we register.
+      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2,
0)))
+      shuffle3.releaseWriters(success = true)
+      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2,
0)))
+      shuffleManager.removeShuffle(1)
+    } finally {
+
+      if (store != null) {
+        store.stop()
+      }
+      actorSystem.shutdown()
+      actorSystem.awaitTermination()
+    }
+  }
+
   def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
     val segment = diskBlockManager.getBlockLocation(blockId)
     assert(segment.file.getName === filename)

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 8e8c356..8a05fcb 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -61,10 +61,9 @@ object StoragePerfTester {
       for (i <- 1 to recordsPerMap) {
         writers(i % numOutputSplits).write(writeData)
       }
-      writers.map {w =>
-        w.commit()
+      writers.map { w =>
+        w.commitAndClose()
         total.addAndGet(w.fileSegment().length)
-        w.close()
       }
 
       shuffle.releaseWriters(true)


Mime
View raw message