spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-21219][CORE] Task retry occurs on same executor due to race co…
Date Wed, 12 Jul 2017 06:49:21 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 399aa016e -> cb6fc89ba


[SPARK-21219][CORE] Task retry occurs on same executor due to race condition with blacklisting

There's a race condition in the current TaskSetManager where a failed task is added for retry
(addPendingTask) and can asynchronously be assigned to an executor *before* the blacklist
state is updated (updateBlacklistForFailedTask); as a result, the task might re-execute on the
same executor.  This is particularly problematic if the executor is shutting down, since the retry
task immediately becomes a lost task (ExecutorLostFailure).  Another side effect is that the
actual failure reason gets obscured by the retry task, which never actually executed.  Sample
logs showing the issue are attached to https://issues.apache.org/jira/browse/SPARK-21219

The fix is to change the ordering of the addPendingTask and updateBlacklistForFailedTask
calls in TaskSetManager.handleFailedTask, so that the blacklist is updated before the failed
task is re-queued for retry.

Implemented a unit test that verifies the task is blacklisted before it is added to the pending
task list.  Ran the unit test without the fix and it failed.  Ran the unit test with the fix and
it passed.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Eric Vandenberg <ericvandenberg@fb.com>

Closes #18427 from ericvandenbergfb/blacklistFix.

## What changes were proposed in this pull request?

This is a backport of the fix to SPARK-21219, already checked in as 96d58f2.

## How was this patch tested?

Ran TaskSetManagerSuite tests locally.

Author: Eric Vandenberg <ericvandenberg@fb.com>

Closes #18604 from jsoltren/branch-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb6fc89b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb6fc89b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb6fc89b

Branch: refs/heads/branch-2.2
Commit: cb6fc89ba20a427fa7d66fa5036b17c1a5d5d87f
Parents: 399aa01
Author: Eric Vandenberg <ericvandenberg@fb.com>
Authored: Wed Jul 12 14:49:15 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Wed Jul 12 14:49:15 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala | 21 +++++-----
 .../spark/scheduler/TaskSetManagerSuite.scala   | 44 +++++++++++++++++++-
 2 files changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb6fc89b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a41b059..6ac76c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
   private[scheduler] var emittedTaskSizeWarning = false
 
   /** Add a task to all the pending-task lists that it should be on. */
-  private def addPendingTask(index: Int) {
+  private[spark] def addPendingTask(index: Int) {
     for (loc <- tasks(index).preferredLocations) {
       loc match {
         case e: ExecutorCacheTaskLocation =>
@@ -826,15 +826,6 @@ private[spark] class TaskSetManager(
 
     sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
 
-    if (successful(index)) {
-      logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" +
-        s" be re-executed (either because the task failed with a shuffle data fetch failure," +
-        s" so the previous stage needs to be re-run, or because a different copy of the task" +
-        s" has already succeeded).")
-    } else {
-      addPendingTask(index)
-    }
-
     if (!isZombie && reason.countTowardsTaskFailures) {
       taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
         info.host, info.executorId, index))
@@ -848,6 +839,16 @@ private[spark] class TaskSetManager(
         return
       }
     }
+
+    if (successful(index)) {
+      logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" +
+        s" be re-executed (either because the task failed with a shuffle data fetch failure," +
+        s" so the previous stage needs to be re-run, or because a different copy of the task" +
+        s" has already succeeded).")
+    } else {
+      addPendingTask(index)
+    }
+
     maybeFinishTaskSet()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cb6fc89b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index db14c9a..807ad0a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.mockito.Matchers.{any, anyInt, anyString}
-import org.mockito.Mockito.{mock, never, spy, verify, when}
+import org.mockito.Mockito.{mock, never, spy, times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 
@@ -1140,6 +1140,48 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       .updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
   }
 
+  test("update blacklist before adding pending task to avoid race condition") {
+    // When a task fails, it should apply the blacklist policy prior to
+    // retrying the task otherwise there's a race condition where run on
+    // the same executor that it was intended to be black listed from.
+    val conf = new SparkConf().
+      set(config.BLACKLIST_ENABLED, true)
+
+    // Create a task with two executors.
+    sc = new SparkContext("local", "test", conf)
+    val exec = "executor1"
+    val host = "host1"
+    val exec2 = "executor2"
+    val host2 = "host2"
+    sched = new FakeTaskScheduler(sc, (exec, host), (exec2, host2))
+    val taskSet = FakeTask.createTaskSet(1)
+
+    val clock = new ManualClock
+    val mockListenerBus = mock(classOf[LiveListenerBus])
+    val blacklistTracker = new BlacklistTracker(mockListenerBus, conf, None, clock)
+    val taskSetManager = new TaskSetManager(sched, taskSet, 1, Some(blacklistTracker))
+    val taskSetManagerSpy = spy(taskSetManager)
+
+    val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)
+
+    // Assert the task has been black listed on the executor it was last executed on.
+    when(taskSetManagerSpy.addPendingTask(anyInt())).thenAnswer(
+      new Answer[Unit] {
+        override def answer(invocationOnMock: InvocationOnMock): Unit = {
+          val task = invocationOnMock.getArgumentAt(0, classOf[Int])
+          assert(taskSetManager.taskSetBlacklistHelperOpt.get.
+            isExecutorBlacklistedForTask(exec, task))
+        }
+      }
+    )
+
+    // Simulate a fake exception
+    val e = new ExceptionFailure("a", "b", Array(), "c", None)
+    taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, e)
+
+    verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message