spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-22227][CORE] DiskBlockManager.getAllBlocks now tolerates temp files
Date Wed, 25 Oct 2017 21:18:33 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 4c1a8682e -> 9ed64048a


[SPARK-22227][CORE] DiskBlockManager.getAllBlocks now tolerates temp files

Prior to this commit, getAllBlocks implicitly assumed that the
directories managed by the DiskBlockManager contained only files
corresponding to valid block IDs. In reality, this assumption was
violated during shuffle, which produces temporary files in the same
directory as the resulting blocks. As a result, calls to getAllBlocks
during shuffle were unreliable.

The fix could be made more efficient, but this is probably good enough.

Tested via `DiskBlockManagerSuite`.
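
For context, a minimal sketch of the failure mode (illustrative values;
assumes code in the org.apache.spark.storage package, since the temp
block id types are private[spark]):

    import java.util.UUID

    // Shuffle writers drop files named "temp_shuffle_<uuid>" into the
    // same directories that hold committed blocks.
    val tempName = s"temp_shuffle_${UUID.randomUUID()}"

    // Before this patch, BlockId(tempName) threw
    // IllegalStateException("Unrecognized BlockId: ..."), so getAllBlocks,
    // which parses every file name it finds, could fail mid-shuffle.
    // After this patch, the name parses to a TempShuffleBlockId:
    val parsed = BlockId(tempName)
    assert(parsed.isInstanceOf[TempShuffleBlockId] && parsed.name == tempName)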

Author: Sergei Lebedev <s.lebedev@criteo.com>

Closes #19458 from superbobry/block-id-option.

(cherry picked from commit b377ef133cdc38d49b460b2cc6ece0b5892804cc)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ed64048
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ed64048
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ed64048

Branch: refs/heads/branch-2.2
Commit: 9ed64048a740fbcd15d2b830b1edbb728f87c423
Parents: 4c1a868
Author: Sergei Lebedev <s.lebedev@criteo.com>
Authored: Wed Oct 25 22:15:44 2017 +0100
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Wed Oct 25 22:17:40 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/BlockId.scala    | 16 +++++++++++++---
 .../org/apache/spark/storage/DiskBlockManager.scala | 11 ++++++++++-
 .../org/apache/spark/storage/BlockIdSuite.scala     |  7 +------
 .../spark/storage/DiskBlockManagerSuite.scala       |  7 +++++++
 4 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9ed64048/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 524f697..8c1e657 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.util.UUID
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 
 /**
@@ -101,6 +102,10 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
 }
 
 @DeveloperApi
+class UnrecognizedBlockId(name: String)
+    extends SparkException(s"Failed to parse $name into a block ID")
+
+@DeveloperApi
 object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
   val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
@@ -109,10 +114,11 @@ object BlockId {
   val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
   val TASKRESULT = "taskresult_([0-9]+)".r
   val STREAM = "input-([0-9]+)-([0-9]+)".r
+  val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
+  val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
   val TEST = "test_(.*)".r
 
-  /** Converts a BlockId "name" String back into a BlockId. */
-  def apply(id: String): BlockId = id match {
+  def apply(name: String): BlockId = name match {
     case RDD(rddId, splitIndex) =>
       RDDBlockId(rddId.toInt, splitIndex.toInt)
     case SHUFFLE(shuffleId, mapId, reduceId) =>
@@ -127,9 +133,13 @@ object BlockId {
       TaskResultBlockId(taskId.toLong)
     case STREAM(streamId, uniqueId) =>
       StreamBlockId(streamId.toInt, uniqueId.toLong)
+    case TEMP_LOCAL(uuid) =>
+      TempLocalBlockId(UUID.fromString(uuid))
+    case TEMP_SHUFFLE(uuid) =>
+      TempShuffleBlockId(UUID.fromString(uuid))
     case TEST(value) =>
       TestBlockId(value)
     case _ =>
-      throw new IllegalStateException("Unrecognized BlockId: " + id)
+      throw new UnrecognizedBlockId(name)
   }
 }
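
A quick illustration of the parsing behavior added above (a sketch, not
part of the commit; assumes the org.apache.spark.storage package):

    import java.util.UUID

    // Temp block names now round-trip through BlockId.apply.
    val id = TempShuffleBlockId(UUID.randomUUID())
    assert(BlockId(id.name) == id)

    // Names that still do not parse raise the dedicated exception type,
    // which callers such as getAllBlocks can match on precisely.
    try BlockId("not_a_block") catch {
      case _: UnrecognizedBlockId => () // expected: safe to skip this file
    }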

http://git-wip-us.apache.org/repos/asf/spark/blob/9ed64048/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 3d43e3c..a69bcc9 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -100,7 +100,16 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean
 
   /** List all the blocks currently stored on disk by the disk manager. */
   def getAllBlocks(): Seq[BlockId] = {
-    getAllFiles().map(f => BlockId(f.getName))
+    getAllFiles().flatMap { f =>
+      try {
+        Some(BlockId(f.getName))
+      } catch {
+        case _: UnrecognizedBlockId =>
+          // Skip files which do not correspond to blocks, for example temporary
+          // files created by [[SortShuffleWriter]].
+          None
+      }
+    }
   }
 
   /** Produces a unique block id and File suitable for storing local intermediate results. */

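The flatMap-plus-catch shape above could also be written with
scala.util.Try (an alternative sketch, not what the commit does). The
difference is that Try.toOption swallows any exception from
BlockId.apply, e.g. a malformed UUID in a temp name, rather than only
UnrecognizedBlockId:

    import scala.util.Try

    def getAllBlocks(): Seq[BlockId] =
      getAllFiles().flatMap(f => Try(BlockId(f.getName)).toOption)
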
http://git-wip-us.apache.org/repos/asf/spark/blob/9ed64048/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index 89ed031..6bc6324 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -33,13 +33,8 @@ class BlockIdSuite extends SparkFunSuite {
   }
 
   test("test-bad-deserialization") {
-    try {
-      // Try to deserialize an invalid block id.
+    intercept[UnrecognizedBlockId] {
       BlockId("myblock")
-      fail()
-    } catch {
-      case e: IllegalStateException => // OK
-      case _: Throwable => fail()
     }
   }
 

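ScalaTest's intercept returns the caught exception, so the rewritten
test could additionally assert on the message (an optional extension,
not in this commit):

    val e = intercept[UnrecognizedBlockId] {
      BlockId("myblock")
    }
    assert(e.getMessage.contains("myblock"))
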
http://git-wip-us.apache.org/repos/asf/spark/blob/9ed64048/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 7859b0b..0c4f3c4 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.storage
 
 import java.io.{File, FileWriter}
+import java.util.UUID
 
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
@@ -79,6 +80,12 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll
     assert(diskBlockManager.getAllBlocks.toSet === ids.toSet)
   }
 
+  test("SPARK-22227: non-block files are skipped") {
+    val file = diskBlockManager.getFile("unmanaged_file")
+    writeToFile(file, 10)
+    assert(diskBlockManager.getAllBlocks().isEmpty)
+  }
+
   def writeToFile(file: File, numBytes: Int) {
     val writer = new FileWriter(file, true)
     for (i <- 0 until numBytes) writer.write(i)

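A variant of the new test that mixes managed and unmanaged files would
exercise the filtering more directly (hypothetical, not in this
commit):

    test("mixed block and non-block files") {
      val blockId = TestBlockId("abc")
      writeToFile(diskBlockManager.getFile(blockId), 10)
      writeToFile(diskBlockManager.getFile("unmanaged_file"), 10)
      assert(diskBlockManager.getAllBlocks().toSet === Set(blockId))
    }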


