spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewor14 <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-8881] Fix algorithm for scheduling exec...
Date Fri, 17 Jul 2015 01:30:12 GMT
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7274#discussion_r34855772
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
    @@ -544,58 +544,82 @@ private[master] class Master(
        * has enough cores and memory. Otherwise, each executor grabs all the cores available
on the
        * worker by default, in which case only one executor may be launched on each worker.
        */
    -  private def startExecutorsOnWorkers(): Unit = {
    -    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first
app
    -    // in the queue, then the second app, etc.
    +
    +  private[master] def scheduleExecutorsOnWorkers(app: ApplicationInfo,
    +    usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = {
    +    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    +    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    +    val numUsable = usableWorkers.length
    +    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each
worker
    +    val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each
worker
    +    var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    +    var pos = 0
         if (spreadOutApps) {
    -      // Try to spread out each app among all the workers, until it has all its cores
    -      for (app <- waitingApps if app.coresLeft > 0) {
    -        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
    -          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
    -            worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
    -          .sortBy(_.coresFree).reverse
    -        val numUsable = usableWorkers.length
    -        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
    -        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    -        var pos = 0
    -        while (toAssign > 0) {
    -          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
    -            toAssign -= 1
    -            assigned(pos) += 1
    -          }
    -          pos = (pos + 1) % numUsable
    -        }
    -        // Now that we've decided how many cores to give on each node, let's actually
give them
    -        for (pos <- 0 until numUsable if assigned(pos) > 0) {
    -          allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
    +      // Try to spread out executors among workers (sparse scheduling)
    +      while (toAssign > 0) {
    +        if (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor
&&
    +            usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor)
{
    +          toAssign -= coresPerExecutor
    +          assignedCores(pos) += coresPerExecutor
    +          assignedMemory(pos) += memoryPerExecutor
             }
    +        pos = (pos + 1) % numUsable
           }
         } else {
    -      // Pack each app into as few workers as possible until we've assigned all its cores
    -      for (worker <- workers if worker.coresFree > 0 && worker.state ==
WorkerState.ALIVE) {
    -        for (app <- waitingApps if app.coresLeft > 0) {
    -          allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
    +      // Pack executors into as few workers as possible (dense scheduling)
    +      while (toAssign > 0) {
    +        while (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor
&&
    +               usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
&&
    +               toAssign > 0) {
    +          toAssign -= coresPerExecutor
    +          assignedCores(pos) += coresPerExecutor
    +          assignedMemory(pos) += memoryPerExecutor
             }
    +        pos = (pos + 1) % numUsable
    +      }
    +    }
    +    assignedCores
    +  }
    +
    +  /**
    +   * Schedule and launch executors on workers
    +   */
    +  private def startExecutorsOnWorkers(): Unit = {
    +    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first
app
    +    // in the queue, then the second app, etc.
    +    for (app <- waitingApps if app.coresLeft > 0) {
    +      val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    +      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
    +        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
    +          worker.coresFree >= coresPerExecutor)
    +        .sortBy(_.coresFree).reverse
    +      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
    +
    +      // Now that we've decided how many cores to allocate on each worker, let's allocate
them
    +      var pos = 0
    +      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
    +        allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor,
    +        usableWorkers(pos))
           }
         }
       }
     
       /**
    -   * Allocate a worker's resources to one or more executors.
    +   * Allocate a worker's resources to one or more executors
        * @param app the info of the application which the executors belong to
    -   * @param coresToAllocate cores on this worker to be allocated to this application
    +   * @param assignedCores number of cores on this worker for this application
    +   * @param coresPerExecutor number of cores per executor
        * @param worker the worker info
        */
       private def allocateWorkerResourceToExecutors(
           app: ApplicationInfo,
    -      coresToAllocate: Int,
    +      assignedCores: Int,
    +      coresPerExecutor: Int,
           worker: WorkerInfo): Unit = {
    -    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    -    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
    -    var coresLeft = coresToAllocate
    -    while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor)
{
    +
    +    var numExecutors = assignedCores/coresPerExecutor
    +    for (i <- 1 to numExecutors) {
    --- End diff --
    
    Actually, doesn't this change the default behavior? Previously a single executor would
acquire all cores on a worker. Now there will be N executors on the worker, each occupying
exactly one core.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message