spark-commits mailing list archives

From pwend...@apache.org
Subject [09/20] git commit: Fix bug where scheduler could hang after task failure.
Date Wed, 25 Dec 2013 00:35:40 GMT
Fix bug where scheduler could hang after task failure.

When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager that tracks which tasks are
pending may be updated only after reviveOffers() is called (a race
condition), so when offers are revived, the task set manager does
not yet know that there are failed tasks that need to be relaunched.
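
To make the ordering problem concrete, here is a minimal,
self-contained Scala sketch (not the actual Spark code;
ToyTaskSetManager, ToyScheduler, and RaceSketch are hypothetical
stand-ins) showing why reviving offers before the task set manager
has recorded the failed task can leave that task stranded, while
reviving offers after the update picks it up.

  // Minimal sketch of the ordering problem; the toy classes below are
  // hypothetical and only model the interaction, not Spark's internals.
  import scala.collection.mutable

  class ToyTaskSetManager {
    private val pendingTasks = mutable.Set[Long]()

    // Called once a failure has been processed; re-enqueues the task.
    def handleFailedTask(tid: Long): Unit = pendingTasks += tid

    def hasPendingTasks: Boolean = pendingTasks.nonEmpty
  }

  class ToyScheduler(manager: ToyTaskSetManager) {
    // Stand-in for SchedulerBackend.reviveOffers(): it can only
    // reschedule work the manager already knows is pending.
    def reviveOffers(): Unit =
      if (manager.hasPendingTasks) println("rescheduling pending tasks")
      else println("nothing to reschedule; the offer is wasted")

    // Buggy ordering: offers are revived before the manager records
    // the failure, so the failed task is missed and the job can hang.
    def statusUpdateBuggy(tid: Long): Unit = {
      reviveOffers()
      manager.handleFailedTask(tid)
    }

    // Fixed ordering: revive offers only after handleFailedTask has
    // updated the manager's pending-task state.
    def statusUpdateFixed(tid: Long): Unit = {
      manager.handleFailedTask(tid)
      reviveOffers()
    }
  }

  object RaceSketch extends App {
    new ToyScheduler(new ToyTaskSetManager).statusUpdateBuggy(1L)
    // prints: nothing to reschedule; the offer is wasted
    new ToyScheduler(new ToyTaskSetManager).statusUpdateFixed(1L)
    // prints: rescheduling pending tasks
  }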


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2b807e4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2b807e4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2b807e4f

Branch: refs/heads/master
Commit: 2b807e4f2f853a9b1e8cba5147d182e7b05022bc
Parents: c64690d
Author: Kay Ousterhout <kayousterhout@gmail.com>
Authored: Thu Nov 14 13:33:11 2013 -0800
Committer: Kay Ousterhout <kayousterhout@gmail.com>
Committed: Thu Nov 14 13:33:11 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/ClusterScheduler.scala  | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2b807e4f/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
index 37d5547..2e4ba53 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
@@ -250,7 +250,6 @@ private[spark] class ClusterScheduler(
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
-    var taskFailed = false
     synchronized {
       try {
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -270,9 +269,6 @@ private[spark] class ClusterScheduler(
               }
               taskIdToExecutorId.remove(tid)
             }
-            if (state == TaskState.FAILED) {
-              taskFailed = true
-            }
             activeTaskSets.get(taskSetId).foreach { taskSet =>
               if (state == TaskState.FINISHED) {
                 taskSet.removeRunningTask(tid)
@@ -294,10 +290,6 @@ private[spark] class ClusterScheduler(
       dagScheduler.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
-    if (taskFailed) {
-      // Also revive offers if a task had failed for some reason other than host lost
-      backend.reviveOffers()
-    }
   }
 
   def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
@@ -317,8 +309,9 @@ private[spark] class ClusterScheduler(
     taskState: TaskState,
     reason: Option[TaskEndReason]) = synchronized {
     taskSetManager.handleFailedTask(tid, taskState, reason)
-    if (taskState == TaskState.FINISHED) {
-      // The task finished successfully but the result was lost, so we should revive offers.
+    if (taskState != TaskState.KILLED) {
+      // Need to revive offers again now that the task set manager state has been updated to
+      // reflect failed tasks that need to be re-run.
       backend.reviveOffers()
     }
   }

