spark-commits mailing list archives

From kayousterh...@apache.org
Subject spark git commit: [SPARK-19868] Conflicting TaskSetManagers can cause Spark to stop
Date Tue, 28 Mar 2017 19:14:06 GMT
Repository: spark
Updated Branches:
  refs/heads/master d4fac410e -> 92e385e0b


[SPARK-19868] Conflicting TaskSetManagers can cause Spark to stop

## What changes were proposed in this pull request?

We must set the task set to zombie before the DAGScheduler handles the taskEnded event. It's
possible for the taskEnded event to cause the DAGScheduler to launch a new stage attempt (this
happens when map output data is lost), and if that happens before the task set has been set
to zombie, it will appear that there are two conflicting task sets for the same stage.
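
To make the ordering concrete, here is a minimal, self-contained sketch of the race
(hypothetical names such as OrderingSketch, TaskSet, and submit; this is not the real
Spark code, though TaskSchedulerImpl really does reject a second non-zombie task set
for the same stage):

    object OrderingSketch {
      class TaskSet(val stage: Int) { @volatile var isZombie = false }

      private val active = scala.collection.mutable.Map[Int, TaskSet]()

      def submit(ts: TaskSet): Unit = {
        // The SPARK-19868 failure mode: two non-zombie task sets for one stage.
        require(!active.get(ts.stage).exists(!_.isZombie),
          s"more than one active taskSet for stage ${ts.stage}")
        active(ts.stage) = ts
      }

      def main(args: Array[String]): Unit = {
        val first = new TaskSet(stage = 0)
        submit(first)
        // Correct order: mark the finished attempt zombie *before* any event
        // handling (e.g. a taskEnded that resubmits the stage) can run.
        first.isZombie = true
        submit(new TaskSet(stage = 0)) // succeeds only because of the line above
      }
    }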

Author: liujianhui <liujianhui@didichuxing>

Closes #17208 from liujianhuiouc/spark-19868.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92e385e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92e385e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92e385e0

Branch: refs/heads/master
Commit: 92e385e0b55d70a48411e90aa0f2ed141c4d07c8
Parents: d4fac41
Author: liujianhui <liujianhui@didichuxing>
Authored: Tue Mar 28 12:13:45 2017 -0700
Committer: Kay Ousterhout <kayousterhout@gmail.com>
Committed: Tue Mar 28 12:13:45 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala | 15 ++++++-----
 .../spark/scheduler/TaskSetManagerSuite.scala   | 27 +++++++++++++++++++-
 2 files changed, 34 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/92e385e0/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a177aab..a41b059 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -713,13 +713,7 @@ private[spark] class TaskSetManager(
       successfulTaskDurations.insert(info.duration)
     }
     removeRunningTask(tid)
-    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
-    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
-    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
-    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
-    // Note: "result.value()" only deserializes the value when it's called at the first time, so
-    // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
+
     // Kill any other attempts for the same task (since those are unnecessary now that one
     // attempt completed successfully).
     for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
@@ -746,6 +740,13 @@ private[spark] class TaskSetManager(
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
+    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
+    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
+    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+    // Note: "result.value()" only deserializes the value when it's called at the first time, so
+    // here "result.value()" just returns the value and won't block other threads.
+    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
     maybeFinishTaskSet()
   }
 

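As an aside, the "deserialize once, then return the cached value" behavior the moved
comment describes can be sketched as below (illustrative only; ResultHolder and its
deserialize parameter are made-up names, while the real memoization lives in Spark's
DirectTaskResult.value() and is triggered early by TaskResultGetter, as the comment notes):

    // First call pays the deserialization cost; later calls return the cached
    // object, so calling value() while holding the TaskSchedulerImpl lock is
    // cheap provided it was already called once off the lock (SPARK-7655).
    class ResultHolder[T](bytes: Array[Byte], deserialize: Array[Byte] => T) {
      private var cached: Option[T] = None

      def value(): T = cached.getOrElse {
        val v = deserialize(bytes)
        cached = Some(v)
        v
      }
    }
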
http://git-wip-us.apache.org/repos/asf/spark/blob/92e385e0/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 132caef..9ca6b8b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -22,8 +22,10 @@ import java.util.{Properties, Random}
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.mockito.Matchers.{anyInt, anyString}
+import org.mockito.Matchers.{any, anyInt, anyString}
 import org.mockito.Mockito.{mock, never, spy, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
 
 import org.apache.spark._
 import org.apache.spark.internal.config
@@ -1056,6 +1058,29 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.isZombie)
   }
 
+
+  test("SPARK-19868: DagScheduler only notified of taskEnd when state is ready") {
+    // dagScheduler.taskEnded() is async, so it may *seem* ok to call it before we've set all
+    // appropriate state, eg. isZombie.   However, this sets up a race that could go the wrong way.
+    // This is a super-focused regression test which checks the zombie state as soon as
+    // dagScheduler.taskEnded() is called, to ensure we haven't introduced a race.
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val mockDAGScheduler = mock(classOf[DAGScheduler])
+    sched.dagScheduler = mockDAGScheduler
+    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1))
+    when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).then(new Answer[Unit] {
+      override def answer(invocationOnMock: InvocationOnMock): Unit = {
+        assert(manager.isZombie === true)
+      }
+    })
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption.isDefined)
+    // this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon
+    manager.handleSuccessfulTask(0, createTaskResult(0))
+  }
+
   test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names")
{
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

