spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-9552] Add force control for killExecutors to avoid false killing for those busy executors
Date Tue, 17 Nov 2015 23:43:46 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 78dc07cdf -> 33ae9c74a


[SPARK-9552] Add force control for killExecutors to avoid false killing for those busy executors

With dynamic allocation, busy executors are sometimes killed by mistake.
Executors that have tasks assigned can be killed because they appear to have been idle for
long enough (say 60 seconds). The root cause is that the task-launch listener event is
delivered asynchronously.

For example, an executor may be in the middle of being assigned tasks while the listener
notification has not been sent out yet. Meanwhile, the dynamic allocation's executor idle
timeout expires (e.g., after 60 seconds), which triggers a killExecutor event at the same time.
 1. The timer expires before the listener event arrives.
 2. The task then runs on the killed (or about-to-be-killed) executor, which eventually
    leads to task failure.

Here is the proposed fix. We add a force control to killExecutor. If force is not set
(i.e., false), we check whether the executor to be killed is idle or busy. If the executor
has tasks assigned, we do not kill it and return false (to indicate that the kill failed).
Dynamic allocation should turn off force killing (i.e., force = false), so that an attempt
to kill a busy executor fails. In that case the executor's idle timer remains in place;
later, when the task assignment event arrives, the idle timer is removed accordingly. This
way we avoid mistakenly killing busy executors under dynamic allocation.

For all other usages, end users can decide for themselves whether to use force killing.
If that option is turned on, killExecutor performs the kill without any status checking.

Author: Grace <jie.huang@intel.com>
Author: Andrew Or <andrew@databricks.com>
Author: Jie Huang <jie.huang@intel.com>

Closes #7888 from GraceH/forcekill.

(cherry picked from commit 965245d087c18edc6c3d5baddeaf83163e32e330)
Signed-off-by: Andrew Or <andrew@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33ae9c74
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33ae9c74
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33ae9c74

Branch: refs/heads/branch-1.6
Commit: 33ae9c74a569d67872a4f3c17dfaba50febfa9df
Parents: 78dc07c
Author: Grace <jie.huang@intel.com>
Authored: Tue Nov 17 15:43:35 2015 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Tue Nov 17 15:43:43 2015 -0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       |  1 +
 .../scala/org/apache/spark/SparkContext.scala   |  4 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     | 27 +++++++---
 .../cluster/CoarseGrainedSchedulerBackend.scala | 13 +++--
 .../StandaloneDynamicAllocationSuite.scala      | 52 +++++++++++++++++++-
 5 files changed, 82 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33ae9c74/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b93536e..6419218 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -509,6 +509,7 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorBusy(executorId: String): Unit = synchronized {
     logDebug(s"Clearing idle timer for $executorId because it is now running a task")
     removeTimes.remove(executorId)
+    executorsPendingToRemove.remove(executorId)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/33ae9c74/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4bbd0b0..b5645b0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1461,7 +1461,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   override def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutors(executorIds)
+        b.killExecutors(executorIds, replace = false, force = true)
       case _ =>
         logWarning("Killing executors is only supported in coarse-grained mode")
         false
@@ -1499,7 +1499,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutors(Seq(executorId), replace = true)
+        b.killExecutors(Seq(executorId), replace = true, force = true)
       case _ =>
         logWarning("Killing executors is only supported in coarse-grained mode")
         false

http://git-wip-us.apache.org/repos/asf/spark/blob/33ae9c74/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5f13669..bf0419d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -87,8 +87,8 @@ private[spark] class TaskSchedulerImpl(
   // Incrementing task IDs
   val nextTaskId = new AtomicLong(0)
 
-  // Which executor IDs we have executors on
-  val activeExecutorIds = new HashSet[String]
+  // Number of tasks running on each executor
+  private val executorIdToTaskCount = new HashMap[String, Int]
 
   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
@@ -254,6 +254,7 @@ private[spark] class TaskSchedulerImpl(
             val tid = task.taskId
             taskIdToTaskSetManager(tid) = taskSet
             taskIdToExecutorId(tid) = execId
+            executorIdToTaskCount(execId) += 1
             executorsByHost(host) += execId
             availableCpus(i) -= CPUS_PER_TASK
             assert(availableCpus(i) >= 0)
@@ -282,7 +283,7 @@ private[spark] class TaskSchedulerImpl(
     var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
-      activeExecutorIds += o.executorId
+      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
         executorAdded(o.executorId, o.host)
@@ -331,7 +332,8 @@ private[spark] class TaskSchedulerImpl(
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
           // We lost this entire executor, so remember that it's gone
           val execId = taskIdToExecutorId(tid)
-          if (activeExecutorIds.contains(execId)) {
+
+          if (executorIdToTaskCount.contains(execId)) {
             removeExecutor(execId,
               SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
             failedExecutor = Some(execId)
@@ -341,7 +343,11 @@ private[spark] class TaskSchedulerImpl(
           case Some(taskSet) =>
             if (TaskState.isFinished(state)) {
               taskIdToTaskSetManager.remove(tid)
-              taskIdToExecutorId.remove(tid)
+              taskIdToExecutorId.remove(tid).foreach { execId =>
+                if (executorIdToTaskCount.contains(execId)) {
+                  executorIdToTaskCount(execId) -= 1
+                }
+              }
             }
             if (state == TaskState.FINISHED) {
               taskSet.removeRunningTask(tid)
@@ -462,7 +468,7 @@ private[spark] class TaskSchedulerImpl(
     var failedExecutor: Option[String] = None
 
     synchronized {
-      if (activeExecutorIds.contains(executorId)) {
+      if (executorIdToTaskCount.contains(executorId)) {
         val hostPort = executorIdToHost(executorId)
         logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
         removeExecutor(executorId, reason)
@@ -498,7 +504,8 @@ private[spark] class TaskSchedulerImpl(
    * of any running tasks, since the loss reason defines whether we'll fail those tasks.
    */
   private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
-    activeExecutorIds -= executorId
+    executorIdToTaskCount -= executorId
+
     val host = executorIdToHost(executorId)
     val execs = executorsByHost.getOrElse(host, new HashSet)
     execs -= executorId
@@ -535,7 +542,11 @@ private[spark] class TaskSchedulerImpl(
   }
 
   def isExecutorAlive(execId: String): Boolean = synchronized {
-    activeExecutorIds.contains(execId)
+    executorIdToTaskCount.contains(execId)
+  }
+
+  def isExecutorBusy(execId: String): Boolean = synchronized {
+    executorIdToTaskCount.getOrElse(execId, -1) > 0
   }
 
   // By default, rack is unknown

http://git-wip-us.apache.org/repos/asf/spark/blob/33ae9c74/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 3373caf..6f0c910 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -453,7 +453,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * @return whether the kill request is acknowledged.
    */
   final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
-    killExecutors(executorIds, replace = false)
+    killExecutors(executorIds, replace = false, force = false)
   }
 
   /**
@@ -461,9 +461,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    *
    * @param executorIds identifiers of executors to kill
    * @param replace whether to replace the killed executors with new ones
+   * @param force whether to force kill busy executors
    * @return whether the kill request is acknowledged.
    */
-  final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized {
+  final def killExecutors(
+      executorIds: Seq[String],
+      replace: Boolean,
+      force: Boolean): Boolean = synchronized {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
     val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
     unknownExecutors.foreach { id =>
@@ -471,7 +475,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
-    val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
+    // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
+    val executorsToKill = knownExecutors
+      .filter { id => !executorsPendingToRemove.contains(id) }
+      .filter { id => force || !scheduler.isExecutorBusy(id) }
     executorsPendingToRemove ++= executorsToKill
 
     // If we do not wish to replace the executors we kill, sync the target number of executors

http://git-wip-us.apache.org/repos/asf/spark/blob/33ae9c74/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index d145e78..2fa795f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.deploy
 
+import scala.collection.mutable
 import scala.concurrent.duration._
 
 import org.mockito.Mockito.{mock, when}
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
@@ -29,6 +30,7 @@ import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 
@@ -38,7 +40,8 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterE
 class StandaloneDynamicAllocationSuite
   extends SparkFunSuite
   with LocalSparkContext
-  with BeforeAndAfterAll {
+  with BeforeAndAfterAll
+  with PrivateMethodTester {
 
   private val numWorkers = 2
   private val conf = new SparkConf()
@@ -404,6 +407,41 @@ class StandaloneDynamicAllocationSuite
     assert(apps.head.getExecutorLimit === 1)
   }
 
+  test("disable force kill for busy executors (SPARK-9552)") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
+    var apps = getApplications()
+    // sync executors between the Master and the driver, needed because
+    // the driver refuses to kill executors it does not know about
+    syncExecutors(sc)
+    val executors = getExecutorIds(sc)
+    assert(executors.size === 2)
+
+    // simulate running a task on the executor
+    val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
+    val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
+    val executorIdToTaskCount = taskScheduler invokePrivate getMap()
+    executorIdToTaskCount(executors.head) = 1
+    // kill the busy executor without force; this should fail
+    assert(killExecutor(sc, executors.head, force = false))
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+
+    // force kill busy executor
+    assert(killExecutor(sc, executors.head, force = true))
+    apps = getApplications()
+    // kill executor successfully
+    assert(apps.head.executors.size === 1)
+
+  }
+
   // ===============================
   // | Utility methods for testing |
   // ===============================
@@ -455,6 +493,16 @@ class StandaloneDynamicAllocationSuite
     sc.killExecutors(getExecutorIds(sc).take(n))
   }
 
+  /** Kill the given executor, specifying whether to force kill it. */
+  private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = {
+    syncExecutors(sc)
+    sc.schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutors(Seq(executorId), replace = false, force)
+      case _ => fail("expected coarse grained scheduler")
+    }
+  }
+
   /**
    * Return a list of executor IDs belonging to this application.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message