spark-reviews mailing list archives

From kayousterhout <...@git.apache.org>
Subject [GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Date Wed, 27 Jul 2016 22:52:05 GMT
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14079#discussion_r72538818
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -611,15 +620,31 @@ private[spark] class TaskSetManager(
     
         // If no executors have registered yet, don't abort the stage, just wait.  We probably
         // got here because a task set was added before the executors registered.
    -    if (executors.nonEmpty) {
    +    if (executorsByHost.nonEmpty) {
           // take any task that needs to be scheduled, and see if we can find some executor it *could*
           // run on
    -      pendingTask.foreach { taskId =>
    -        if (executors.forall(executorIsBlacklisted(_, taskId))) {
    -          val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
    -          val partition = tasks(taskId).partitionId
    -          abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" +
    -            s" has already failed on executors $execs, and no other executors are available.")
    +      pendingTask.foreach { indexInTaskSet =>
    +        // try to find some executor this task can run on.  Its possible that some *other*
    +        // task isn't schedulable anywhere, but we will discover that in some later call,
    +        // when that unschedulable task is the last task remaining.
    +        val blacklistedEverywhere = executorsByHost.forall { case (host, execs) =>
    +          val nodeBlacklisted = blacklist.isNodeBlacklisted(host) ||
    +            isNodeBlacklistedForTaskSet(host) ||
    +            isNodeBlacklistedForTask(host, indexInTaskSet)
    +          if (nodeBlacklisted) {
    +            true
    +          } else {
    +            execs.forall { exec =>
    +              blacklist.isExecutorBlacklisted(exec) ||
    +                isExecutorBlacklistedForTaskSet(exec) ||
    +                isExecutorBlacklistedForTask(exec, indexInTaskSet)
    +            }
    +          }
    +        }
    +        if (blacklistedEverywhere) {
    +          val partition = tasks(indexInTaskSet).partitionId
    +          abort(s"Aborting ${taskSet} because task $indexInTaskSet (partition $partition) cannot " +
    +            s"run anywhere due to node and executor blacklist.")
    --- End diff --
    
    Maybe refer to the relevant blacklist config options in this message? I'm anticipating confused users...
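
One way to act on this suggestion, as a rough sketch only: append a pointer to the configuration namespace at the end of the abort message. The helper object `BlacklistAbortMessage` and its `build` method are hypothetical names used for illustration, and the `spark.blacklist` prefix is an assumption about where the options live; the diff above does not show the actual option names.

```scala
// Hypothetical helper, not part of the PR: builds the abort message with a
// trailing hint that tells users where to look for the relevant settings.
object BlacklistAbortMessage {
  // Assumed config prefix; the real option names are not shown in this diff.
  val ConfigPrefix: String = "spark.blacklist"

  def build(taskSet: String, indexInTaskSet: Int, partition: Int): String =
    s"Aborting $taskSet because task $indexInTaskSet (partition $partition) cannot " +
      "run anywhere due to node and executor blacklist. " +
      s"Blacklisting behavior can be configured via ${ConfigPrefix}.*."
}
```

Pointing at the prefix rather than listing individual options keeps the message short and avoids it going stale if options are renamed during review.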

