From iras...@apache.org
Subject spark git commit: [SPARK-16554][CORE] Automatically Kill Executors and Nodes when they are Blacklisted
Date Thu, 09 Feb 2017 18:49:43 GMT
Repository: spark
Updated Branches:
  refs/heads/master af63c52fd -> 6287c94f0


[SPARK-16554][CORE] Automatically Kill Executors and Nodes when they are Blacklisted

## What changes were proposed in this pull request?

In SPARK-8425, we introduced a mechanism for blacklisting executors and nodes (hosts). After
a certain number of failures, these resources would be "blacklisted" and no further work would
be assigned to them for some period of time.

In some scenarios, it is better to fail fast and simply kill these unreliable resources.
This change proposes to do so by having the BlacklistTracker kill unreliable resources when
they would otherwise be "blacklisted".

To be thread safe, this code depends on the CoarseGrainedSchedulerBackend sending
a message to the driver backend to do the actual killing. This also helps prevent
a race which would otherwise permit work to begin on a resource (executor or node) between
the time the resource is marked for killing and the time at which it is finally killed.
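
To make this concrete, here is a minimal, hypothetical sketch of the pattern (invented
names and simplified signatures; the real messages and endpoints are in the diff below):

```scala
// Hypothetical sketch: the public kill API only enqueues a message; the driver
// endpoint's single-threaded event loop performs the actual kill, so it cannot
// interleave with executor registration.
case class KillExecutorsOnHost(host: String)

trait DriverInbox { def send(msg: Any): Unit } // stands in for the RPC endpoint ref

class BackendSketch(inbox: DriverInbox) {
  /** Returns as soon as the request is queued; the event loop does the killing. */
  def killExecutorsOnHost(host: String): Boolean = {
    inbox.send(KillExecutorsOnHost(host))
    true
  }
}
```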

## How was this patch tested?

./dev/run-tests
Ran https://github.com/jsoltren/jose-utils/blob/master/blacklist/test-blacklist.sh, and checked
logs to see executors and nodes being killed.

Testing can likely be improved here; suggestions welcome.

Author: José Hiram Soltren <jose@cloudera.com>

Closes #16650 from jsoltren/SPARK-16554-submit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6287c94f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6287c94f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6287c94f

Branch: refs/heads/master
Commit: 6287c94f08200d548df5cc0a401b73b84f9968c4
Parents: af63c52
Author: José Hiram Soltren <jose@cloudera.com>
Authored: Thu Feb 9 12:49:31 2017 -0600
Committer: Imran Rashid <irashid@cloudera.com>
Committed: Thu Feb 9 12:49:31 2017 -0600

----------------------------------------------------------------------
 .../apache/spark/ExecutorAllocationClient.scala | 21 +++++-
 .../apache/spark/internal/config/package.scala  |  5 ++
 .../spark/scheduler/BlacklistTracker.scala      | 31 +++++++-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  6 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |  3 +
 .../cluster/CoarseGrainedSchedulerBackend.scala | 47 ++++++++----
 .../spark/ExecutorAllocationManagerSuite.scala  |  9 ++-
 .../StandaloneDynamicAllocationSuite.scala      | 62 ++++++++++++++-
 .../spark/scheduler/BlacklistTrackerSuite.scala | 79 +++++++++++++++++++-
 .../spark/scheduler/TaskSetManagerSuite.scala   |  2 +-
 docs/configuration.md                           |  9 +++
 11 files changed, 248 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6287c94f/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 5d47f62..e4b9f81 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -54,11 +54,30 @@ private[spark] trait ExecutorAllocationClient {
 
   /**
    * Request that the cluster manager kill the specified executors.
+   *
+   * When asking the executor to be replaced, the executor loss is considered a failure, and
+   * killed tasks that are running on the executor will count towards the failure limits. If no
+   * replacement is being requested, then the tasks will not count towards the limit.
+   *
+   * @param executorIds identifiers of executors to kill
+   * @param replace whether to replace the killed executors with new ones, default false
+   * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
-  def killExecutors(executorIds: Seq[String]): Seq[String]
+  def killExecutors(
+    executorIds: Seq[String],
+    replace: Boolean = false,
+    force: Boolean = false): Seq[String]
 
   /**
+   * Request that the cluster manager kill every executor on the specified host.
+   * Results in a call to killExecutors for each executor on the host, with the replace
+   * and force arguments set to true.
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  def killExecutorsOnHost(host: String): Boolean
+
+  /**
    * Request that the cluster manager kill the specified executor.
    * @return whether the request is acknowledged by the cluster manager.
    */
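
For illustration only, a caller holding any ExecutorAllocationClient implementation would
invoke the widened method like this (the BlacklistTracker change below uses replace = true,
force = true):

```scala
// Assumes `client: ExecutorAllocationClient`. replace = true counts killed tasks
// toward failure limits and requests replacements; force = true kills the executor
// even if it is currently running tasks.
val acknowledged: Seq[String] =
  client.killExecutors(Seq("exec-1"), replace = true, force = true)
```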

http://git-wip-us.apache.org/repos/asf/spark/blob/6287c94f/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 536f493..223c921 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -139,6 +139,11 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val BLACKLIST_KILL_ENABLED =
+    ConfigBuilder("spark.blacklist.killBlacklistedExecutors")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
     ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
       .internal()
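
As a usage sketch (assuming the existing spark.blacklist.enabled key, which gates
blacklisting itself):

```scala
import org.apache.spark.SparkConf

// Opt in to killing blacklisted executors; both flags default to false.
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.killBlacklistedExecutors", "true")
```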

http://git-wip-us.apache.org/repos/asf/spark/blob/6287c94f/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 77d5c97..e130e60 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -50,10 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
 private[scheduler] class BlacklistTracker (
     private val listenerBus: LiveListenerBus,
     conf: SparkConf,
+    allocationClient: Option[ExecutorAllocationClient],
     clock: Clock = new SystemClock()) extends Logging {
 
-  def this(sc: SparkContext) = {
-    this(sc.listenerBus, sc.conf)
+  def this(sc: SparkContext, allocationClient: Option[ExecutorAllocationClient]) = {
+    this(sc.listenerBus, sc.conf, allocationClient)
   }
 
   BlacklistTracker.validateBlacklistConfs(conf)
@@ -173,6 +174,17 @@ private[scheduler] class BlacklistTracker (
         listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
         executorIdToFailureList.remove(exec)
         updateNextExpiryTime()
+        if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+          allocationClient match {
+            case Some(allocationClient) =>
+              logInfo(s"Killing blacklisted executor id $exec " +
+                s"since spark.blacklist.killBlacklistedExecutors is set.")
+              allocationClient.killExecutors(Seq(exec), true, true)
+            case None =>
+              logWarning(s"Not attempting to kill blacklisted executor id $exec " +
+                s"since allocation client is not defined.")
+          }
+        }
 
         // In addition to blacklisting the executor, we also update the data for failures on the
         // node, and potentially put the entire node into a blacklist as well.
@@ -187,6 +199,19 @@ private[scheduler] class BlacklistTracker (
           nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
           listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
           _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+          if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+            allocationClient match {
+              case Some(allocationClient) =>
+                logInfo(s"Killing all executors on blacklisted host $node " +
+                  s"since spark.blacklist.killBlacklistedExecutors is set.")
+                if (allocationClient.killExecutorsOnHost(node) == false) {
+                  logError(s"Killing executors on node $node failed.")
+                }
+              case None =>
+                logWarning(s"Not attempting to kill executors on blacklisted host $node "
+
+                  s"since allocation client is not defined.")
+            }
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6287c94f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8ce2ca3..bfbcfa1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -726,7 +726,11 @@ private[spark] object TaskSchedulerImpl {
 
   private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
     if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
-      Some(new BlacklistTracker(sc))
+      val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match {
+        case b: ExecutorAllocationClient => Some(b)
+        case _ => None
+      }
+      Some(new BlacklistTracker(sc, executorAllocClient))
     } else {
       None
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6287c94f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 0280359..2898cd7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -43,6 +43,9 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
 
+  case class KillExecutorsOnHost(host: String)
+    extends CoarseGrainedClusterMessage
+
   sealed trait RegisterExecutorResponse
 
   case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse

http://git-wip-us.apache.org/repos/asf/spark/blob/6287c94f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 31575c0..e006cc9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -140,6 +140,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             // Ignoring the task kill since the executor is not registered.
             logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
         }
+
+      case KillExecutorsOnHost(host) =>
+        scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
+          killExecutors(exec.toSeq, replace = true, force = true)
+        }
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -148,6 +153,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         if (executorDataMap.contains(executorId)) {
           executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
           context.reply(true)
+        } else if (scheduler.nodeBlacklist != null &&
+          scheduler.nodeBlacklist.contains(hostname)) {
+          // If the cluster manager gives us an executor on a blacklisted node (because it
+          // already started allocating those resources before we informed it of our blacklist,
+          // or if it ignored our blacklist), then we reject that executor immediately.
+          logInfo(s"Rejecting $executorId as it has been blacklisted.")
+          executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
+          context.reply(true)
         } else {
           // If the executor's rpc env is not listening for incoming connections, `hostPort`
           // will be null, and the client connection should be used to contact the executor.
@@ -524,27 +537,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   /**
    * Request that the cluster manager kill the specified executors.
-   * @return whether the kill request is acknowledged. If list to kill is empty, it will return
-   *         false.
-   */
-  final override def killExecutors(executorIds: Seq[String]): Seq[String] = {
-    killExecutors(executorIds, replace = false, force = false)
-  }
-
-  /**
-   * Request that the cluster manager kill the specified executors.
    *
   * When asking the executor to be replaced, the executor loss is considered a failure, and
   * killed tasks that are running on the executor will count towards the failure limits. If no
    * replacement is being requested, then the tasks will not count towards the limit.
    *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones
-   * @param force whether to force kill busy executors
-   * @return whether the kill request is acknowledged. If list to kill is empty, it will return
-   *         false.
+   * @param replace whether to replace the killed executors with new ones, default false
+   * @param force whether to force kill busy executors, default false
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
-  final def killExecutors(
+  final override def killExecutors(
       executorIds: Seq[String],
       replace: Boolean,
       force: Boolean): Seq[String] = {
@@ -600,6 +603,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
     Future.successful(false)
+
+  /**
+   * Request that the cluster manager kill all executors on a given host.
+   * @return whether the kill request is acknowledged.
+   */
+  final override def killExecutorsOnHost(host: String): Boolean = {
+    logInfo(s"Requesting to kill any and all executors on host ${host}")
+    // A potential race exists if a new executor attempts to register on a host
+    // that is on the blacklist and is no longer valid. To avoid this race,
+    // all executor registration and killing happens in the event loop. This way, either
+    // an executor will fail to register, or will be killed when all executors on a host
+    // are killed.
+    // Kill all the executors on this host in an event loop to ensure serialization.
+    driverEndpoint.send(KillExecutorsOnHost(host))
+    true
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
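
To see why routing everything through one endpoint closes the race, here is a toy,
non-Spark model (invented names) in which registration and host-wide kills mutate
shared state on a single thread:

```scala
import java.util.concurrent.Executors

// Toy model: because both operations run on one single-threaded event loop, an
// executor can never register between "host blacklisted" and "executors killed".
object EventLoopSketch {
  private val loop = Executors.newSingleThreadExecutor()
  private var alive = Map.empty[String, Set[String]] // host -> executor ids
  private var blacklisted = Set.empty[String]

  def register(host: String, execId: String): Unit = loop.execute(new Runnable {
    override def run(): Unit = {
      if (!blacklisted(host)) { // mirrors RegisterExecutorFailed for blacklisted hosts
        alive += host -> (alive.getOrElse(host, Set.empty) + execId)
      }
    }
  })

  def killExecutorsOnHost(host: String): Unit = loop.execute(new Runnable {
    override def run(): Unit = {
      blacklisted += host // simplification: the real blacklist lives in BlacklistTracker
      alive -= host       // every executor on the host goes away atomically w.r.t. register
    }
  })
}
```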

http://git-wip-us.apache.org/repos/asf/spark/blob/6287c94f/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index ec40971..4ea42fc 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1138,7 +1138,10 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
   override def requestExecutors(numAdditionalExecutors: Int): Boolean =
     sc.requestExecutors(numAdditionalExecutors)
 
-  override def killExecutors(executorIds: Seq[String]): Seq[String] = {
+  override def killExecutors(
+      executorIds: Seq[String],
+      replace: Boolean,
+      force: Boolean): Seq[String] = {
     val response = sc.killExecutors(executorIds)
     if (response) {
       executorIds
@@ -1154,4 +1157,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
   override def reviveOffers(): Unit = sb.reviveOffers()
 
   override def defaultParallelism(): Int = sb.defaultParallelism()
+
+  override def killExecutorsOnHost(host: String): Boolean = {
+    false
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6287c94f/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 05dad7a..54ea727 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.deploy
 import scala.collection.mutable
 import scala.concurrent.duration._
 
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{mock, verify, when}
 import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 
@@ -29,10 +30,11 @@ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMaste
 import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.internal.config
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RegisterExecutorFailed}
 
 /**
  * End-to-end tests for dynamic allocation in standalone mode.
@@ -467,6 +469,52 @@ class StandaloneDynamicAllocationSuite
     }
   }
 
+  test("kill all executors on localhost") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
+    val beforeList = getApplications().head.executors.keys.toSet
+    assert(killExecutorsOnHost(sc, "localhost").equals(true))
+
+    syncExecutors(sc)
+    val afterList = getApplications().head.executors.keys.toSet
+
+    eventually(timeout(10.seconds), interval(100.millis)) {
+      assert(beforeList.intersect(afterList).size == 0)
+    }
+  }
+
+  test("executor registration on a blacklisted host must fail") {
+    sc = new SparkContext(appConf.set(config.BLACKLIST_ENABLED.key, "true"))
+    val endpointRef = mock(classOf[RpcEndpointRef])
+    val mockAddress = mock(classOf[RpcAddress])
+    when(endpointRef.address).thenReturn(mockAddress)
+    val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty)
+
+    // Get "localhost" on a blacklist.
+    val taskScheduler = mock(classOf[TaskSchedulerImpl])
+    when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
+    when(taskScheduler.sc).thenReturn(sc)
+    sc.taskScheduler = taskScheduler
+
+    // Create a fresh scheduler backend to blacklist "blacklisted-host".
+    sc.schedulerBackend.stop()
+    val backend =
+      new StandaloneSchedulerBackend(taskScheduler, sc, Array(masterRpcEnv.address.toSparkURL))
+    backend.start()
+
+    backend.driverEndpoint.ask[Boolean](message)
+    eventually(timeout(10.seconds), interval(100.millis)) {
+      verify(endpointRef).send(RegisterExecutorFailed(any()))
+    }
+  }
+
   // ===============================
   // | Utility methods for testing |
   // ===============================
@@ -528,6 +576,16 @@ class StandaloneDynamicAllocationSuite
     }
   }
 
+  /** Kill the executors on a given host. */
+  private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = {
+    syncExecutors(sc)
+    sc.schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutorsOnHost(host)
+      case _ => fail("expected coarse grained scheduler")
+    }
+  }
+
   /**
    * Return a list of executor IDs belonging to this application.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/6287c94f/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index ead6955..2b18ebe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.scheduler
 
-import org.mockito.Mockito.{verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{never, verify, when}
+import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.mock.MockitoSugar
 
@@ -43,7 +46,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     clock.setTime(0)
 
     listenerBusMock = mock[LiveListenerBus]
-    blacklist = new BlacklistTracker(listenerBusMock, conf, clock)
+    blacklist = new BlacklistTracker(listenerBusMock, conf, None, clock)
   }
 
   override def afterEach(): Unit = {
@@ -272,12 +275,14 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // if task failures are spaced out by more than the timeout period, the first failure is timed
     // out, and the executor isn't blacklisted.
     var stageId = 0
+
     def failOneTaskInTaskSet(exec: String): Unit = {
       val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId)
       taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0)
       blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
       stageId += 1
     }
+
     failOneTaskInTaskSet(exec = "1")
     // We have one sporadic failure on exec 2, but that's it.  Later checks ensure that we never
     // blacklist executor 2 despite this one failure.
@@ -411,7 +416,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // if you explicitly set the legacy conf to 0, that also would disable blacklisting
     conf.set(config.BLACKLIST_LEGACY_TIMEOUT_CONF, 0L)
     assert(!BlacklistTracker.isBlacklistEnabled(conf))
-    // but again, the new conf takes precendence
+    // but again, the new conf takes precedence
     conf.set(config.BLACKLIST_ENABLED, true)
     assert(BlacklistTracker.isBlacklistEnabled(conf))
     assert(1000 === BlacklistTracker.getBlacklistTimeout(conf))
@@ -456,4 +461,72 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
       conf.remove(config)
     }
   }
+
+  test("blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
+    val allocationClientMock = mock[ExecutorAllocationClient]
+    when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called"))
+    when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
+      // To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
+      // is updated before we ask the executor allocation client to kill all the executors
+      // on a particular host.
+      override def answer(invocation: InvocationOnMock): Boolean = {
+        if (blacklist.nodeBlacklist.contains("hostA") == false) {
+          throw new IllegalStateException("hostA should be on the blacklist")
+        }
+        true
+      }
+    })
+    blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+
+    // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called.
+    conf.set(config.BLACKLIST_KILL_ENABLED, false)
+
+    val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
+    // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
+    // application.
+    (0 until 4).foreach { partition =>
+      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+    }
+    blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
+
+    verify(allocationClientMock, never).killExecutor(any())
+
+    val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+    // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole
+    // application.  Since that's the second executor that is blacklisted on the same node, we also
+    // blacklist that node.
+    (0 until 4).foreach { partition =>
+      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+    }
+    blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
+
+    verify(allocationClientMock, never).killExecutors(any(), any(), any())
+    verify(allocationClientMock, never).killExecutorsOnHost(any())
+
+    // Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
+    conf.set(config.BLACKLIST_KILL_ENABLED, true)
+    blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+
+    val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 0)
+    // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
+    // application.
+    (0 until 4).foreach { partition =>
+      taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+    }
+    blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
+
+    verify(allocationClientMock).killExecutors(Seq("1"), true, true)
+
+    val taskSetBlacklist3 = createTaskSetBlacklist(stageId = 1)
+    // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole
+    // application.  Since that's the second executor that is blacklisted on the same node, we also
+    // blacklist that node.
+    (0 until 4).foreach { partition =>
+      taskSetBlacklist3.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+    }
+    blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures)
+
+    verify(allocationClientMock).killExecutors(Seq("2"), true, true)
+    verify(allocationClientMock).killExecutorsOnHost("hostA")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6287c94f/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ffb9fe4..d03a0c9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -429,7 +429,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // We don't directly use the application blacklist, but its presence triggers blacklisting
     // within the taskset.
     val mockListenerBus = mock(classOf[LiveListenerBus])
-    val blacklistTrackerOpt = Some(new BlacklistTracker(mockListenerBus, conf, clock))
+    val blacklistTrackerOpt = Some(new BlacklistTracker(mockListenerBus, conf, None, clock))
     val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock)
 
     {

http://git-wip-us.apache.org/repos/asf/spark/blob/6287c94f/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 2eaaa21..2fcb3a0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1402,6 +1402,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.blacklist.killBlacklistedExecutors</code></td>
+  <td>false</td>
+  <td>
+    (Experimental) If set to "true", allow Spark to automatically kill, and attempt to re-create,
+    executors when they are blacklisted.  Note that, when an entire node is added to the blacklist,
+    all of the executors on that node will be killed.
+  </td>
+</tr>
+<tr>
   <td><code>spark.speculation</code></td>
   <td>false</td>
   <td>



