spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhruve <...@git.apache.org>
Subject [GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Date Thu, 21 Sep 2017 00:26:01 GMT
Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19194#discussion_r140123047
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager(
           allocationManager.synchronized {
             stageIdToNumSpeculativeTasks(stageId) =
               stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
    +        maxConcurrentTasks = getMaxConTasks
    +        logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task submitted.")
             allocationManager.onSchedulerBacklogged()
           }
         }
     
         /**
    +     * Calculate the maximum no. of concurrent tasks that can run currently.
    +     */
    +    def getMaxConTasks(): Int = {
    +      // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs
    +      // with multiple stages. We need to get all the active stages belonging to a job group to
    +      // calculate the total no. of pending + running tasks to decide the maximum no. of executors
    +      // we need at that time to serve the outstanding tasks. This is capped by the minimum no. of
    +      // outstanding tasks and the max concurrent limit specified for the job group if any.
    +
    +      def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
    +        totalPendingTasks(stageId) + totalRunningTasks(stageId)
    +      }
    +
    +      def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = (totalTasks, stageToNumTasks) => {
    +        val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, stageToNumTasks._2)
    +        sumOrMax(totalTasks, activeTasks)
    +      }
    +      // Get the total running & pending tasks for all stages in a job group.
    +      def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, Int]): Int = {
    +        stagesItr.foldLeft(0)(sumIncompleteTasksForStages)
    +      }
    +
    +      def sumIncompleteTasksForJobGroup: (Int, (String, mutable.HashMap[Int, Int])) => Int = {
    +        (maxConTasks, x) => {
    +          val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(x._2)
    +          val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), totalIncompleteTasksForJobGroup)
    +          sumOrMax(maxConTasks, maxTasks)
    +        }
    +      }
    +
    +      def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) Int.MaxValue else (a + b)
    +
    +      def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a)
    +
    +      val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1)))
    --- End diff --
    
    I like the idea. I think this can be done. Will update the PR.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message