spark-reviews mailing list archives

From nishkamravi2 <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-8881] Fix algorithm for scheduling exec...
Date Tue, 21 Jul 2015 07:01:01 GMT
Github user nishkamravi2 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7274#discussion_r35074547
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
    @@ -543,39 +544,72 @@ private[master] class Master(
        * multiple executors from the same application may be launched on the same worker if the worker
        * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
        * worker by default, in which case only one executor may be launched on each worker.
    +   *
    +   * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
    +   * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
    +   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
    +   * allocated at a time, 12 cores from each worker would be assigned to each executor.
    +   * Since 12 < 16, no executors would launch [SPARK-8881].
        */
    -  private def startExecutorsOnWorkers(): Unit = {
    -    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    -    // in the queue, then the second app, etc.
    +  private[master] def scheduleExecutorsOnWorkers(
    +      app: ApplicationInfo,
    +      usableWorkers: Array[WorkerInfo],
    +      spreadOutApps: Boolean): Array[Int] = {
    +    // If the number of cores per executor is not specified, then we can just schedule
    +    // 1 core at a time since we expect a single executor to be launched on each worker
    +    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    +    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    +    val numUsable = usableWorkers.length
    +    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    +    val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
    +    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    +    var pos = 0
         if (spreadOutApps) {
    -      // Try to spread out each app among all the workers, until it has all its cores
    -      for (app <- waitingApps if app.coresLeft > 0) {
    -        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
    -          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
    -            worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
    -          .sortBy(_.coresFree).reverse
    -        val numUsable = usableWorkers.length
    -        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
    -        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    -        var pos = 0
    -        while (toAssign > 0) {
    -          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
    -            toAssign -= 1
    -            assigned(pos) += 1
    -          }
    -          pos = (pos + 1) % numUsable
    -        }
    -        // Now that we've decided how many cores to give on each node, let's actually give them
    -        for (pos <- 0 until numUsable if assigned(pos) > 0) {
    -          allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
    +      // Try to spread out executors among workers (sparse scheduling)
    +      while (coresToAssign > 0) {
    +        if (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
    +            usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor) {
    +          coresToAssign -= coresPerExecutor
    +          assignedCores(pos) += coresPerExecutor
    +          assignedMemory(pos) += memoryPerExecutor
    --- End diff --
    
    We could potentially return the assignedCores we have thus far and proceed with
    scheduling. But as discussed earlier, we are better off failing than scheduling
    incorrectly. Do you feel otherwise?
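
    To make the example in the new doc comment concrete, here is a small standalone
    sketch (illustrative only; the object name, the `assign` helper, and the
    hard-coded worker sizes are mine, not part of the actual Master code) that
    simulates both allocation strategies on the SPARK-8881 scenario: 4 workers with
    16 cores each, spark.cores.max = 48, spark.executor.cores = 16.

        object Spark8881Sketch {
          // Round-robin `step` cores at a time across workers, loosely mirroring
          // the spread-out loop in the diff above (memory is ignored for brevity).
          // The `skipped` guard is an addition so the sketch terminates even when
          // the request cannot be fully satisfied.
          def assign(coresFree: Array[Int], totalCores: Int, step: Int): Array[Int] = {
            val assigned = new Array[Int](coresFree.length)
            var coresToAssign = math.min(totalCores, coresFree.sum)
            var pos = 0
            var skipped = 0 // consecutive workers unable to fit another chunk
            while (coresToAssign >= step && skipped < coresFree.length) {
              if (coresFree(pos) - assigned(pos) >= step) {
                coresToAssign -= step
                assigned(pos) += step
                skipped = 0
              } else {
                skipped += 1
              }
              pos = (pos + 1) % coresFree.length
            }
            assigned
          }

          def main(args: Array[String]): Unit = {
            val workers = Array(16, 16, 16, 16)
            // Old behavior, 1 core at a time: every worker ends up with 12 cores,
            // and since 12 < 16 no 16-core executor can launch anywhere [SPARK-8881].
            println(assign(workers, 48, step = 1).mkString(", "))  // 12, 12, 12, 12
            // New behavior, coresPerExecutor (16) at a time: three workers each get
            // a full 16-core chunk, so all 3 requested executors launch.
            println(assign(workers, 48, step = 16).mkString(", ")) // 16, 16, 16, 0
          }
        }

    Note that when the request does not fit, the sketch returns whatever partial
    assignment it has, which is exactly the alternative discussed above; the point
    stands that the real scheduler is better off failing than scheduling incorrectly.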

