spark-commits mailing list archives

From: andrewo...@apache.org
Subject: spark git commit: [SPARK-4951][Core] Fix the issue that a busy executor may be killed
Date: Mon, 12 Jan 2015 00:24:00 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 f04ff9d37 -> 056149d10


[SPARK-4951][Core] Fix the issue that a busy executor may be killed

A few changes to fix this issue:

1. Handle the case where `SparkListenerTaskStart` is received before `SparkListenerBlockManagerAdded`.
2. Don't add `executorId` to `removeTimes` when the executor is busy.
3. Use `HashMap.retain` to safely traverse the HashMap and remove items (see the first sketch below).
4. Use the same lock in `ExecutorAllocationManager` and `ExecutorAllocationListener` to fix the
race condition in `totalPendingTasks`.
5. Move the blocking code out of the message processing code in YarnSchedulerActor (see the second
sketch below).
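
A minimal standalone sketch of the `HashMap.retain` pattern from item 3; the map literal and the
println are illustrative stand-ins for the manager's `removeTimes` field and its `removeExecutor`
call, not the actual implementation:

    import scala.collection.mutable

    // Hypothetical expiry map: executorId -> time (ms) after which the idle executor may be removed.
    val removeTimes = mutable.HashMap("executor-1" -> 1000L, "executor-2" -> 5000L)
    val now = 2000L

    // retain keeps only the entries for which the predicate is true, so expired executors can be
    // dropped while traversing, without removing entries from the map mid-iteration by hand.
    removeTimes.retain { case (executorId, expireTime) =>
      val expired = now >= expireTime
      if (expired) {
        println(s"removing idle executor $executorId")  // stands in for removeExecutor(executorId)
      }
      !expired
    }
    // removeTimes now contains only "executor-2".

Items 2 and 4 then ensure that busy executors never enter `removeTimes` in the first place and that
the listener mutates this state only while holding the manager's lock.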

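Item 5 boils down to handing the blocking ask off to a Future on a separate thread pool while the
actor keeps draining its mailbox, capturing the original sender first. A rough sketch, assuming a
plain Akka actor and an arbitrary blocking call (the names `NonBlockingProxy` and `blockingAsk` are
made up for illustration and are not Spark's internal API):

    import akka.actor.Actor
    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.control.NonFatal

    class NonBlockingProxy(blockingAsk: Any => Boolean)(implicit ec: ExecutionContext)
      extends Actor {

      def receive = {
        case request =>
          // Capture sender() in a local val: by the time the Future body runs, the actor may be
          // handling a different message, and sender() would then refer to the wrong actor.
          val origin = sender()
          Future {
            // The blocking call runs on the supplied thread pool rather than on the actor's
            // dispatcher thread, so other messages are not stalled behind it.
            origin ! blockingAsk(request)
          } onFailure {
            case NonFatal(e) => context.system.log.error(e, s"Forwarding $request failed")
          }
      }
    }

In the actual patch the same shape is applied to `RequestExecutors` and `KillExecutors`: the
`AkkaUtils.askWithReply` call is wrapped in a `Future` backed by a dedicated daemon thread pool, as
the YarnSchedulerBackend diff below shows.
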
Author: zsxwing <zsxwing@gmail.com>

Closes #3783 from zsxwing/SPARK-4951 and squashes the following commits:

d51fa0d [zsxwing] Add comments
2e365ce [zsxwing] Remove expired executors from 'removeTimes' and add idle executors back
when a new executor joins
49f61a9 [zsxwing] Eliminate duplicate executor registered warnings
d4c4e9a [zsxwing] Minor fixes for the code style
05f6238 [zsxwing] Move the blocking codes out of the message processing code
105ba3a [zsxwing] Fix the race condition in totalPendingTasks
d5c615d [zsxwing] Fix the issue that a busy executor may be killed

(cherry picked from commit 6942b974adad396cba2799eac1fa90448cea4da7)
Signed-off-by: Andrew Or <andrew@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/056149d1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/056149d1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/056149d1

Branch: refs/heads/branch-1.2
Commit: 056149d10c64900fa15ed01843bf952ceb5708ea
Parents: f04ff9d
Author: zsxwing <zsxwing@gmail.com>
Authored: Sun Jan 11 16:23:28 2015 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Sun Jan 11 16:23:56 2015 -0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 117 ++++++++++++-------
 .../cluster/YarnSchedulerBackend.scala          |  23 +++-
 .../spark/ExecutorAllocationManagerSuite.scala  |  49 +++++++-
 3 files changed, 144 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/056149d1/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index e9e90e3..a0ee2a7 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -65,6 +65,9 @@ private[spark] class ExecutorAllocationManager(
     listenerBus: LiveListenerBus,
     conf: SparkConf)
   extends Logging {
+
+  allocationManager =>
+
   import ExecutorAllocationManager._
 
   // Lower and upper bounds on the number of executors. These are required.
@@ -121,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
   private var clock: Clock = new RealClock
 
   // Listener for Spark events that impact the allocation policy
-  private val listener = new ExecutorAllocationListener(this)
+  private val listener = new ExecutorAllocationListener
 
   /**
    * Verify that the settings specified through the config are valid.
@@ -209,11 +212,12 @@ private[spark] class ExecutorAllocationManager(
       addTime += sustainedSchedulerBacklogTimeout * 1000
     }
 
-    removeTimes.foreach { case (executorId, expireTime) =>
-      if (now >= expireTime) {
+    removeTimes.retain { case (executorId, expireTime) =>
+      val expired = now >= expireTime
+      if (expired) {
         removeExecutor(executorId)
-        removeTimes.remove(executorId)
       }
+      !expired
     }
   }
 
@@ -291,7 +295,7 @@ private[spark] class ExecutorAllocationManager(
     // Do not kill the executor if we have already reached the lower bound
     val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
     if (numExistingExecutors - 1 < minNumExecutors) {
-      logInfo(s"Not removing idle executor $executorId because there are only " +
+      logDebug(s"Not removing idle executor $executorId because there are only " +
         s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
       return false
     }
@@ -315,7 +319,11 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorAdded(executorId: String): Unit = synchronized {
     if (!executorIds.contains(executorId)) {
       executorIds.add(executorId)
-      executorIds.foreach(onExecutorIdle)
+      // If an executor (call this executor X) is not removed because the lower bound
+      // has been reached, it will no longer be marked as idle. When new executors join,
+      // however, we are no longer at the lower bound, and so we must mark executor X
+      // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
+      executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
       logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
       if (numExecutorsPending > 0) {
         numExecutorsPending -= 1
@@ -373,10 +381,14 @@ private[spark] class ExecutorAllocationManager(
    * the executor is not already marked as idle.
    */
   private def onExecutorIdle(executorId: String): Unit = synchronized {
-    if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
-      logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-        s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
-      removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+    if (executorIds.contains(executorId)) {
+      if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
+        logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
+          s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
+        removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+      }
+    } else {
+      logWarning(s"Attempted to mark unknown executor $executorId idle")
     }
   }
 
@@ -396,25 +408,24 @@ private[spark] class ExecutorAllocationManager(
    * and consistency of events returned by the listener. For simplicity, it does not account
    * for speculated tasks.
    */
-  private class ExecutorAllocationListener(allocationManager: ExecutorAllocationManager)
-    extends SparkListener {
+  private class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
     private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
-      synchronized {
-        val stageId = stageSubmitted.stageInfo.stageId
-        val numTasks = stageSubmitted.stageInfo.numTasks
+      val stageId = stageSubmitted.stageInfo.stageId
+      val numTasks = stageSubmitted.stageInfo.numTasks
+      allocationManager.synchronized {
         stageIdToNumTasks(stageId) = numTasks
         allocationManager.onSchedulerBacklogged()
       }
     }
 
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
-      synchronized {
-        val stageId = stageCompleted.stageInfo.stageId
+      val stageId = stageCompleted.stageInfo.stageId
+      allocationManager.synchronized {
         stageIdToNumTasks -= stageId
         stageIdToTaskIndices -= stageId
 
@@ -426,39 +437,49 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
-    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
+    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
       val stageId = taskStart.stageId
       val taskId = taskStart.taskInfo.taskId
       val taskIndex = taskStart.taskInfo.index
       val executorId = taskStart.taskInfo.executorId
 
-      // If this is the last pending task, mark the scheduler queue as empty
-      stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
-      val numTasksScheduled = stageIdToTaskIndices(stageId).size
-      val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
-      if (numTasksScheduled == numTasksTotal) {
-        // No more pending tasks for this stage
-        stageIdToNumTasks -= stageId
-        if (stageIdToNumTasks.isEmpty) {
-          allocationManager.onSchedulerQueueEmpty()
+      allocationManager.synchronized {
+        // This guards against the race condition in which the `SparkListenerTaskStart`
+        // event is posted before the `SparkListenerBlockManagerAdded` event, which is
+        // possible because these events are posted in different threads. (see SPARK-4951)
+        if (!allocationManager.executorIds.contains(executorId)) {
+          allocationManager.onExecutorAdded(executorId)
+        }
+
+        // If this is the last pending task, mark the scheduler queue as empty
+        stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+        val numTasksScheduled = stageIdToTaskIndices(stageId).size
+        val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
+        if (numTasksScheduled == numTasksTotal) {
+          // No more pending tasks for this stage
+          stageIdToNumTasks -= stageId
+          if (stageIdToNumTasks.isEmpty) {
+            allocationManager.onSchedulerQueueEmpty()
+          }
         }
-      }
 
-      // Mark the executor on which this task is scheduled as busy
-      executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
-      allocationManager.onExecutorBusy(executorId)
+        // Mark the executor on which this task is scheduled as busy
+        executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
+        allocationManager.onExecutorBusy(executorId)
+      }
     }
 
-    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
       val executorId = taskEnd.taskInfo.executorId
       val taskId = taskEnd.taskInfo.taskId
-
-      // If the executor is no longer running any scheduled tasks, mark it as idle
-      if (executorIdToTaskIds.contains(executorId)) {
-        executorIdToTaskIds(executorId) -= taskId
-        if (executorIdToTaskIds(executorId).isEmpty) {
-          executorIdToTaskIds -= executorId
-          allocationManager.onExecutorIdle(executorId)
+      allocationManager.synchronized {
+        // If the executor is no longer running any scheduled tasks, mark it as idle
+        if (executorIdToTaskIds.contains(executorId)) {
+          executorIdToTaskIds(executorId) -= taskId
+          if (executorIdToTaskIds(executorId).isEmpty) {
+            executorIdToTaskIds -= executorId
+            allocationManager.onExecutorIdle(executorId)
+          }
         }
       }
     }
@@ -466,7 +487,12 @@ private[spark] class ExecutorAllocationManager(
     override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
       val executorId = blockManagerAdded.blockManagerId.executorId
       if (executorId != SparkContext.DRIVER_IDENTIFIER) {
-        allocationManager.onExecutorAdded(executorId)
+        // This guards against the race condition in which the `SparkListenerTaskStart`
+        // event is posted before the `SparkListenerBlockManagerAdded` event, which is
+        // possible because these events are posted in different threads. (see SPARK-4951)
+        if (!allocationManager.executorIds.contains(executorId)) {
+          allocationManager.onExecutorAdded(executorId)
+        }
       }
     }
 
@@ -478,12 +504,23 @@ private[spark] class ExecutorAllocationManager(
     /**
     * An estimate of the total number of pending tasks remaining for currently running stages. Does
     * not account for tasks which may have failed and been resubmitted.
+     *
+     * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
      */
     def totalPendingTasks(): Int = {
       stageIdToNumTasks.map { case (stageId, numTasks) =>
         numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
       }.sum
     }
+
+    /**
+     * Return true if an executor is not currently running a task, and false otherwise.
+     *
+     * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
+     */
+    def isExecutorIdle(executorId: String): Boolean = {
+      !executorIdToTaskIds.contains(executorId)
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/056149d1/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 50721b9..f14aaee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler.cluster
 
+import scala.concurrent.{Future, ExecutionContext}
+
 import akka.actor.{Actor, ActorRef, Props}
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
@@ -24,7 +26,9 @@ import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+import scala.util.control.NonFatal
 
 /**
  * Abstract Yarn scheduler backend that contains common logic
@@ -97,6 +101,9 @@ private[spark] abstract class YarnSchedulerBackend(
   private class YarnSchedulerActor extends Actor {
     private var amActor: Option[ActorRef] = None
 
+    implicit val askAmActorExecutor = ExecutionContext.fromExecutor(
+      Utils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-executor"))
+
     override def preStart(): Unit = {
       // Listen for disassociation events
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -110,7 +117,12 @@ private[spark] abstract class YarnSchedulerBackend(
       case r: RequestExecutors =>
         amActor match {
           case Some(actor) =>
-            sender ! AkkaUtils.askWithReply[Boolean](r, actor, askTimeout)
+            val driverActor = sender
+            Future {
+              driverActor ! AkkaUtils.askWithReply[Boolean](r, actor, askTimeout)
+            } onFailure {
+              case NonFatal(e) => logError(s"Sending $r to AM was unsuccessful", e)
+            }
           case None =>
             logWarning("Attempted to request executors before the AM has registered!")
             sender ! false
@@ -119,7 +131,12 @@ private[spark] abstract class YarnSchedulerBackend(
       case k: KillExecutors =>
         amActor match {
           case Some(actor) =>
-            sender ! AkkaUtils.askWithReply[Boolean](k, actor, askTimeout)
+            val driverActor = sender
+            Future {
+              driverActor ! AkkaUtils.askWithReply[Boolean](k, actor, askTimeout)
+            } onFailure {
+              case NonFatal(e) => logError(s"Sending $k to AM was unsuccessful", e)
+            }
           case None =>
             logWarning("Attempted to kill executors before the AM has registered!")
             sender ! false

http://git-wip-us.apache.org/repos/asf/spark/blob/056149d1/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index ce804f9..4fa3b34 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import scala.collection.mutable
+
 import org.scalatest.{FunSuite, PrivateMethodTester}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
@@ -142,11 +144,17 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
 
     // Verify that running a task reduces the cap
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
+    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+      0L, BlockManagerId("executor-1", "host1", 1), 100L))
     sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
+    assert(numExecutorsPending(manager) === 4)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsPending(manager) === 5)
     assert(numExecutorsToAdd(manager) === 2)
-    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsToAdd(manager) === 4)
+    assert(addExecutors(manager) === 0)
     assert(numExecutorsPending(manager) === 7)
     assert(numExecutorsToAdd(manager) === 1)
 
@@ -324,6 +332,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
 
+    executorIds(manager).asInstanceOf[mutable.Set[String]] ++= List("1", "2", "3")
+
     // Starting remove timer is idempotent for each executor
     assert(removeTimes(manager).isEmpty)
     onExecutorIdle(manager, "1")
@@ -596,6 +606,41 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).size === 1)
   }
 
+  test("SPARK-4951: call onTaskStart before onBlockManagerAdded") {
+    sc = createSparkContext(2, 10)
+    val manager = sc.executorAllocationManager.get
+    assert(executorIds(manager).isEmpty)
+    assert(removeTimes(manager).isEmpty)
+
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
+    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+      0L, BlockManagerId("executor-1", "host1", 1), 100L))
+    assert(executorIds(manager).size === 1)
+    assert(executorIds(manager).contains("executor-1"))
+    assert(removeTimes(manager).size === 0)
+  }
+
+  test("SPARK-4951: onExecutorAdded should not add a busy executor to removeTimes") {
+    sc = createSparkContext(2, 10)
+    val manager = sc.executorAllocationManager.get
+    assert(executorIds(manager).isEmpty)
+    assert(removeTimes(manager).isEmpty)
+    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+      0L, BlockManagerId("executor-1", "host1", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
+
+    assert(executorIds(manager).size === 1)
+    assert(executorIds(manager).contains("executor-1"))
+    assert(removeTimes(manager).size === 0)
+
+    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+      0L, BlockManagerId("executor-2", "host1", 1), 100L))
+    assert(executorIds(manager).size === 2)
+    assert(executorIds(manager).contains("executor-2"))
+    assert(removeTimes(manager).size === 1)
+    assert(removeTimes(manager).contains("executor-2"))
+    assert(!removeTimes(manager).contains("executor-1"))
+  }
 }
 
 /**



