spark-commits mailing list archives

From ma...@apache.org
Subject [1/2] git commit: SPARK-1124: Fix infinite retries of reduce stage when a map stage failed
Date Tue, 25 Feb 2014 01:03:12 GMT
Repository: incubator-spark
Updated Branches:
  refs/heads/branch-0.9 00db30ccc -> 0661cdcdf


SPARK-1124: Fix infinite retries of reduce stage when a map stage failed

In the previous code, if you had a failing map stage and then tried to
run reduce stages on it repeatedly, the first reduce stage would fail
correctly, but the later ones would mistakenly believe that all map
outputs were available and would retry indefinitely, failing each time
with fetch failures from "null".


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5e74b8eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5e74b8eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5e74b8eb

Branch: refs/heads/branch-0.9
Commit: 5e74b8eb0199a67bf17eb1b5e48f89571755b842
Parents: 00db30c
Author: Matei Zaharia <matei@databricks.com>
Authored: Sun Feb 23 23:45:48 2014 -0800
Committer: Matei Zaharia <matei@databricks.com>
Committed: Mon Feb 24 17:00:47 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 31 +++++++++++---------
 .../scala/org/apache/spark/FailureSuite.scala   | 13 ++++++++
 2 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e74b8eb/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 21d16fa..21b7ccb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -272,8 +272,10 @@ class DAGScheduler(
     if (mapOutputTracker.has(shuffleDep.shuffleId)) {
       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
-      for (i <- 0 until locs.size) stage.outputLocs(i) = List(locs(i))
-      stage.numAvailableOutputs = locs.size
+      for (i <- 0 until locs.size) {
+        stage.outputLocs(i) = Option(locs(i)).toList   // locs(i) will be null if missing
+      }
+      stage.numAvailableOutputs = locs.count(_ != null)
     } else {
       // Kind of ugly: need to register RDDs with the cache and map output tracker here
       // since we can't do it in the RDD constructor because # of partitions is unknown
@@ -373,25 +375,26 @@ class DAGScheduler(
           } else {
             def removeStage(stageId: Int) {
               // data structures based on Stage
-              stageIdToStage.get(stageId).foreach { s =>
-                if (running.contains(s)) {
+              for (stage <- stageIdToStage.get(stageId)) {
+                if (running.contains(stage)) {
                   logDebug("Removing running stage %d".format(stageId))
-                  running -= s
+                  running -= stage
+                }
+                stageToInfos -= stage
+                for (shuffleDep <- stage.shuffleDep) {
+                  shuffleToMapStage.remove(shuffleDep.shuffleId)
                 }
-                stageToInfos -= s
-                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
-                  shuffleToMapStage.remove(shuffleId))
-                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
+                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                   logDebug("Removing pending status for stage %d".format(stageId))
                 }
-                pendingTasks -= s
-                if (waiting.contains(s)) {
+                pendingTasks -= stage
+                if (waiting.contains(stage)) {
                   logDebug("Removing stage %d from waiting set.".format(stageId))
-                  waiting -= s
+                  waiting -= stage
                 }
-                if (failed.contains(s)) {
+                if (failed.contains(stage)) {
                   logDebug("Removing stage %d from failed set.".format(stageId))
-                  failed -= s
+                  failed -= stage
                 }
               }
               // data structures based on StageId
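
Two Scala idioms in the rewritten removeStage are worth noting. A for
comprehension over an Option runs its body only when a value is present, so
for (stage <- stageIdToStage.get(stageId)) replaces the earlier foreach
block, and for (shuffleDep <- stage.shuffleDep) removes the map stage's
entry directly by its shuffleId instead of scanning every key of
shuffleToMapStage. A minimal sketch of the Option-as-collection pattern,
using illustrative names rather than Spark's actual data structures:

  val stageIdToStage = Map(1 -> "ShuffleMapStage 1")

  // Runs once because the key is present...
  for (stage <- stageIdToStage.get(1)) println(s"Removing stage $stage")

  // ...and not at all when it is absent; no explicit None check needed.
  for (stage <- stageIdToStage.get(42)) println(s"Removing stage $stage")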

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e74b8eb/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index befdc15..6465a80 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -81,6 +81,19 @@ class FailureSuite extends FunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  // Run a map-reduce job in which the map stage always fails.
+  test("failure in a map stage") {
+    sc = new SparkContext("local", "test")
+    val data = sc.makeRDD(1 to 3).map(x => { throw new Exception; (x, x) }).groupByKey(3)
+    intercept[SparkException] {
+      data.collect()
+    }
+    // Make sure that running new jobs with the same map stage also fails
+    intercept[SparkException] {
+      data.collect()
+    }
+  }
+
   test("failure because task results are not serializable") {
     sc = new SparkContext("local[1,1]", "test")
     val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
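
The new test relies on ScalaTest's intercept, which fails the test unless
the block throws the expected exception type, and otherwise returns the
caught exception. A minimal sketch of that pattern (assuming the same
FunSuite style used above, outside any Spark context):

  import org.scalatest.FunSuite

  class InterceptSketch extends FunSuite {
    test("intercept catches the expected exception") {
      val e = intercept[ArithmeticException] { 1 / 0 }
      assert(e.getMessage contains "by zero")
    }
  }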

