spark-commits mailing list archives

From: pwend...@apache.org
Subject: spark git commit: [SPARK-4085] Propagate FetchFailedException when Spark fails to read local shuffle file.
Date: Thu, 04 Dec 2014 00:29:06 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 6b6b7791d -> fe28ee2d1


[SPARK-4085] Propagate FetchFailedException when Spark fails to read local shuffle file.

cc aarondav kayousterhout pwendell

This should go into 1.2?

Author: Reynold Xin <rxin@databricks.com>

Closes #3579 from rxin/SPARK-4085 and squashes the following commits:

255b4fd [Reynold Xin] Updated test.
f9814d9 [Reynold Xin] Code review feedback.
2afaf35 [Reynold Xin] [SPARK-4085] Propagate FetchFailedException when Spark fails to read
local shuffle file.

(cherry picked from commit 1826372d0a1bc80db9015106dd5d2d155ada33f5)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe28ee2d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe28ee2d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe28ee2d

Branch: refs/heads/branch-1.2
Commit: fe28ee2d13e0799120136419deec094752d2a370
Parents: 6b6b779
Author: Reynold Xin <rxin@databricks.com>
Authored: Wed Dec 3 16:28:24 2014 -0800
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Wed Dec 3 16:29:00 2014 -0800

----------------------------------------------------------------------
 .../storage/ShuffleBlockFetcherIterator.scala   | 28 ++++++++++++--------
 .../spark/ExternalShuffleServiceSuite.scala     |  2 --
 .../scala/org/apache/spark/ShuffleSuite.scala   | 23 ++++++++++++++++
 3 files changed, 40 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
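
The crux of the change in the diff that follows is to wrap the fallible buf.createInputStream() call in scala.util.Try, so that an IOException raised while opening a missing local shuffle file surfaces as a Failure that can be reported to the scheduler as a fetch failure, instead of escaping as an unrelated error that kills the task without triggering a map-stage retry. A minimal standalone sketch of that pattern (the Buffer trait and all other names here are simplified stand-ins for illustration, not Spark's actual API):

import java.io.{IOException, InputStream}
import scala.util.{Failure, Success, Try}

object TryPropagationSketch {
  // Simplified stand-in for Spark's ManagedBuffer; createInputStream may throw.
  trait Buffer { def createInputStream(): InputStream }

  // Wrapping the call in Try captures any exception as Failure(e), so the
  // caller can decide how to report it (in Spark's case, as a fetch failure
  // handed to the scheduler).
  def openBlock(buf: Buffer): Try[InputStream] =
    Try(buf.createInputStream())

  def main(args: Array[String]): Unit = {
    val missing = new Buffer {
      def createInputStream(): InputStream =
        throw new IOException("local shuffle file not found")
    }
    openBlock(missing) match {
      case Success(_) => println("stream opened")
      case Failure(e) => println(s"would propagate as fetch failure: ${e.getMessage}")
    }
  }
}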


http://git-wip-us.apache.org/repos/asf/spark/blob/fe28ee2d/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 83170f7..2499c11 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.storage
 
+import java.io.{InputStream, IOException}
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
@@ -289,17 +290,22 @@ final class ShuffleBlockFetcherIterator(
     }
 
     val iteratorTry: Try[Iterator[Any]] = result match {
-      case FailureFetchResult(_, e) => Failure(e)
-      case SuccessFetchResult(blockId, _, buf) => {
-        val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
-        val iter = serializer.newInstance().deserializeStream(is).asIterator
-        Success(CompletionIterator[Any, Iterator[Any]](iter, {
-          // Once the iterator is exhausted, release the buffer and set currentResult to null
-          // so we don't release it again in cleanup.
-          currentResult = null
-          buf.release()
-        }))
-      }
+      case FailureFetchResult(_, e) =>
+        Failure(e)
+      case SuccessFetchResult(blockId, _, buf) =>
+        // There is a chance that createInputStream can fail (e.g. fetching a local file that does
+        // not exist, SPARK-4085). In that case, we should propagate the right exception so
+        // the scheduler gets a FetchFailedException.
+        Try(buf.createInputStream()).map { is0 =>
+          val is = blockManager.wrapForCompression(blockId, is0)
+          val iter = serializer.newInstance().deserializeStream(is).asIterator
+          CompletionIterator[Any, Iterator[Any]](iter, {
+            // Once the iterator is exhausted, release the buffer and set currentResult to null
+            // so we don't release it again in cleanup.
+            currentResult = null
+            buf.release()
+          })
+        }
     }
 
     (result.blockId, iteratorTry)
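
Note how Try(...).map composes in the hunk above: if createInputStream() throws, the map body never runs and the whole expression evaluates to a Failure; on success, the mapped CompletionIterator becomes the Success value. The CompletionIterator itself runs a callback once the wrapped iterator is exhausted, which is how the buffer gets released exactly once. A simplified sketch of such an iterator (a stand-in for illustration, not Spark's actual org.apache.spark.util.CompletionIterator):

// Delegates to `sub` and runs `onComplete` exactly once after the last
// element has been consumed.
class MiniCompletionIterator[A](sub: Iterator[A], onComplete: () => Unit) extends Iterator[A] {
  private var completed = false
  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      onComplete()
    }
    more
  }
  override def next(): A = sub.next()
}

// Usage: the callback fires once iteration finishes, mirroring how the fetcher
// releases the network buffer after the deserialized records are consumed.
object MiniCompletionIteratorDemo extends App {
  val it = new MiniCompletionIterator(Iterator(1, 2, 3), () => println("released buffer"))
  it.foreach(println)  // prints 1, 2, 3, then "released buffer"
}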

http://git-wip-us.apache.org/repos/asf/spark/blob/fe28ee2d/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 55799f5..a66fa21 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkContext._

http://git-wip-us.apache.org/repos/asf/spark/blob/fe28ee2d/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 5d20b4d..d8e4765 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
 import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
 import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleBlockId}
 import org.apache.spark.util.MutablePair
 
 abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
@@ -264,6 +265,28 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext
       }
     }
   }
+
+  test("[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file")
{
+    val myConf = conf.clone().set("spark.test.noStageRetry", "false")
+    sc = new SparkContext("local", "test", myConf)
+    val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
+    rdd.count()
+
+    // Delete one of the local shuffle blocks.
+    val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
+    val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
+    assert(hashFile.exists() || sortFile.exists())
+
+    if (hashFile.exists()) {
+      hashFile.delete()
+    }
+    if (sortFile.exists()) {
+      sortFile.delete()
+    }
+
+    // This count should retry the execution of the previous stage and rerun shuffle.
+    rdd.count()
+  }
 }
 
 object ShuffleSuite {
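
For context on why the test above probes two different files: with hash-based shuffle, each (shuffleId, mapId, reduceId) block gets its own file, while sort-based shuffle writes one consolidated per-map data file, so the test deletes whichever of the two exists. A hedged sketch of the name shapes those two block ids resolve to (illustrative only; the authoritative mapping lives in Spark's BlockId and DiskBlockManager):

// Illustrative sketch of the two on-disk name shapes, assuming (but not
// guaranteeing) Spark 1.2-era naming conventions.
object ShuffleFileNames {
  // Hash-based shuffle: one file per ShuffleBlockId(shuffleId, mapId, reduceId).
  def hashShuffleFile(shuffleId: Int, mapId: Int, reduceId: Int): String =
    s"shuffle_${shuffleId}_${mapId}_${reduceId}"

  // Sort-based shuffle: one consolidated ShuffleDataBlockId(shuffleId, mapId, 0)
  // data file holding all reduce partitions for that map task.
  def sortShuffleDataFile(shuffleId: Int, mapId: Int): String =
    s"shuffle_${shuffleId}_${mapId}_0.data"

  def main(args: Array[String]): Unit = {
    println(hashShuffleFile(0, 0, 0))   // shuffle_0_0_0
    println(sortShuffleDataFile(0, 0))  // shuffle_0_0_0.data
  }
}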



