spark-reviews mailing list archives

From GitBox <>
Subject [GitHub] [spark] bmarcott commented on a change in pull request #26696: [WIP][SPARK-18886][CORE] Make locality wait time be the time since a TSM's available slots were fully utilized
Date Fri, 20 Dec 2019 17:13:00 GMT
bmarcott commented on a change in pull request #26696: [WIP][SPARK-18886][CORE] Make locality
wait time be the time since a TSM's available slots were fully utilized

 File path: core/src/main/scala/org/apache/spark/scheduler/Pool.scala
 @@ -119,4 +120,72 @@ private[spark] class Pool(
+  // Update the number of slots considered available for each TaskSetManager whose ancestor
+  // in the tree is this pool
+  // For FAIR scheduling, slots are distributed among pools based on weights and minshare.
+  //   If a pool requires fewer slots than are available to it, the leftover slots are redistributed
+  //   to the remaining pools using the remaining pools' weights.
+  // For FIFO scheduling, the schedulable queue is iterated over in FIFO order,
+  //   giving each schedulable the remaining slots,
+  //   up to the number of remaining tasks for that schedulable.
+  override def updateAvailableSlots(numSlots: Float): Unit = {
+    schedulingMode match {
+      case SchedulingMode.FAIR =>
+        val queueCopy = new util.LinkedList[Schedulable](schedulableQueue)
+        var shouldRedistribute = true
+        var totalWeights =
+        var totalSlots = numSlots
+        while (totalSlots > 0 && shouldRedistribute) {
+          shouldRedistribute = false
+          var nextWeights = totalWeights
+          var nextSlots = totalSlots
+          val iterator = queueCopy.iterator()
+          while (iterator.hasNext) {
+            val schedulable =
+            val numTasksRemaining = schedulable.getSortedTaskSetQueue
+              .map(tsm => tsm.tasks.length - tsm.tasksSuccessful).sum
+            val allocatedSlots = Math.max(
+              totalSlots * schedulable.weight / totalWeights,
+              schedulable.minShare)
+            if (numTasksRemaining < allocatedSlots) {
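The slot-distribution rules described in the diff's comments can be sketched as a standalone Scala model. This is a minimal sketch under stated assumptions, not the PR's code. `Sched`, `SlotDistribution`, `fair`, and `fifo` are hypothetical names. Schedulables are flattened into a single level with no nested pools, and weights are assumed to be positive. Because the hunk above is truncated, it does not show how the PR handles a schedulable that needs fewer slots than its share. Here, such a schedulable keeps exactly its remaining tasks and drops out of the next round.

```scala
// Hypothetical, simplified stand-in for a pool or task set: FAIR weight and minShare,
// plus the number of tasks it still has to run.
final case class Sched(name: String, weight: Int, minShare: Int, tasksRemaining: Int)

object SlotDistribution {
  // FAIR: each schedulable is offered max(slots * weight / totalWeight, minShare).
  // Schedulables that need fewer slots than that keep only what they need and drop out;
  // the freed slots are redistributed over the rest by weight until nothing changes.
  // Assumes every weight is positive.
  def fair(numSlots: Double, scheds: Seq[Sched]): Map[String, Double] = {
    var result = Map.empty[String, Double]
    var active = scheds
    var slotsLeft = numSlots
    var changed = true
    while (slotsLeft > 0 && active.nonEmpty && changed) {
      val totalWeight = active.map(_.weight).sum.toDouble
      val (satisfied, hungry) = active.partition { s =>
        s.tasksRemaining < math.max(slotsLeft * s.weight / totalWeight, s.minShare.toDouble)
      }
      satisfied.foreach(s => result += s.name -> s.tasksRemaining.toDouble)
      slotsLeft -= satisfied.map(_.tasksRemaining).sum
      active = hungry
      changed = satisfied.nonEmpty
    }
    // Whoever is still active splits what is left by weight (never below minShare).
    val totalWeight = active.map(_.weight).sum.toDouble
    val remaining = math.max(slotsLeft, 0.0)
    active.foreach { s =>
      result += s.name -> math.max(remaining * s.weight / totalWeight, s.minShare.toDouble)
    }
    result
  }

  // FIFO: walk the queue in order; each schedulable takes the slots still left,
  // up to its number of remaining tasks.
  def fifo(numSlots: Double, scheds: Seq[Sched]): Map[String, Double] = {
    var slotsLeft = numSlots
    val result = Map.newBuilder[String, Double]
    for (s <- scheds) {
      val granted = math.min(slotsLeft, s.tasksRemaining.toDouble)
      slotsLeft -= granted
      result += s.name -> granted
    }
    result.result()
  }
}
```

For example, suppose there are 10 slots and three FAIR schedulables with weights 1, 1, and 2, and the first has only 2 tasks left. The first gets 2 slots, and the remaining 8 are split by weight into 8/3 and 16/3. Each round either removes at least one schedulable or stops, so the loop terminates.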
 Review comment:
   You got me thinking more about this by using the word "exactly". How resources are assigned
is determined by the combination of `TaskSchedulerImpl`, `Pool` (including the FAIR/FIFO
scheduling algorithms), and `TaskSetManager` (including delay scheduling), among others.
   The goal of this approach is to simulate scheduling without delay scheduling.
   This helps determine how much delay scheduling causes resources to be underutilized.
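The idea in the PR title is to measure locality wait from the last moment a task set's available slots were fully used. A small clock can illustrate it. This is a hypothetical sketch. `UtilizationClock`, `idleSlots`, `update`, and `shouldRelaxLocality` are illustrative names, not Spark APIs. `availableSlots` is assumed to come from a simulation of scheduling without delay scheduling, like the one discussed here.

```scala
// Hypothetical per-task-set clock: the locality wait only accumulates while the task set
// runs fewer tasks than the slots it would get if delay scheduling were off.
final class UtilizationClock(localityWaitMs: Long, startMs: Long) {
  private var lastFullyUtilizedMs: Long = startMs

  // Slots the task set is leaving idle (0 when fully utilized).
  def idleSlots(runningTasks: Int, availableSlots: Double): Double =
    math.max(0.0, availableSlots - runningTasks)

  // Call whenever runningTasks or availableSlots change; restarts the clock whenever
  // the task set is using all of its available slots.
  def update(nowMs: Long, runningTasks: Int, availableSlots: Double): Unit =
    if (idleSlots(runningTasks, availableSlots) == 0.0) lastFullyUtilizedMs = nowMs

  // True once the task set has been underutilizing its slots for at least the wait time.
  def shouldRelaxLocality(nowMs: Long): Boolean =
    nowMs - lastFullyUtilizedMs >= localityWaitMs
}
```

Take a 3000 ms wait and a task set that has not been fully utilized since t = 0. It becomes eligible for a less local level at t = 3000 ms. If it reaches its full slot count at t = 3500 ms, the clock restarts from there.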
   So far, the most recent diff seems to fall short for at least two reasons:
   1. Scheduling differs depending on whether `TaskSchedulerImpl.resourceOffers` is called
once per single offer or once with all offers in a batch. `Schedulable.getSortedTaskSetQueue`
is called only once per `resourceOffers` call, so for a batch call the scheduling algorithm
is followed only for the first task that is scheduled (this seems like a bug).
   2. The approach doesn't exactly follow FAIR ordering, such as the ordering by minShare
ratio and by schedulable name found in `FairSchedulingAlgorithm`.
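Both shortcomings can be reproduced in a toy model. The sketch below is hypothetical: `Sched`, `FairOrdering`, `lessThan`, and `assign` are illustrative names, not Spark APIs. `lessThan` is a simplified restatement of the ordering rules in Spark's `FairSchedulingAlgorithm`. It places needy schedulables first, ordered by minShare ratio, then orders the rest by running tasks per weight, and breaks ties by name. `assign` contrasts sorting once per batch with re-sorting before every launch. The real `resourceOffers` path also involves locality levels and per-offer rounds, which are omitted here.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Schedulable (not Spark's class).
final case class Sched(name: String, weight: Int, minShare: Int, runningTasks: Int, pendingTasks: Int)

object FairOrdering {
  // Schedulables below their minShare ("needy") come first, ordered by
  // runningTasks / minShare; otherwise they are ordered by runningTasks / weight;
  // remaining ties are broken by name.
  def lessThan(s1: Sched, s2: Sched): Boolean = {
    val needy1 = s1.runningTasks < s1.minShare
    val needy2 = s2.runningTasks < s2.minShare
    if (needy1 != needy2) needy1
    else {
      val cmp =
        if (needy1) java.lang.Double.compare(
          s1.runningTasks / math.max(s1.minShare, 1.0),
          s2.runningTasks / math.max(s2.minShare, 1.0))
        else java.lang.Double.compare(
          s1.runningTasks.toDouble / s1.weight,
          s2.runningTasks.toDouble / s2.weight)
      if (cmp != 0) cmp < 0 else s1.name < s2.name
    }
  }

  // Launch up to `slots` tasks one at a time. With resortEachTask = false the order is
  // computed once and reused (like one sort per batch of offers); with true it is
  // recomputed before every launch. Returns the number of tasks launched per schedulable.
  def assign(slots: Int, scheds: Seq[Sched], resortEachTask: Boolean): Map[String, Int] = {
    val running = mutable.Map(scheds.map(s => s.name -> s.runningTasks): _*)
    val pending = mutable.Map(scheds.map(s => s.name -> s.pendingTasks): _*)
    def snapshot: Seq[Sched] = scheds.map(s => s.copy(runningTasks = running(s.name)))
    val initialOrder = snapshot.sortWith(lessThan)
    var left = slots
    while (left > 0 && pending.values.exists(_ > 0)) {
      val order = if (resortEachTask) snapshot.sortWith(lessThan) else initialOrder
      val next = order.find(s => pending(s.name) > 0).get
      running(next.name) += 1
      pending(next.name) -= 1
      left -= 1
    }
    scheds.map(s => s.name -> (running(s.name) - s.runningTasks)).toMap
  }
}
```

Consider two equally weighted schedulables `a` and `b`, each with 10 pending tasks and nothing running. If the order is computed once, 4 slots go `a -> 4, b -> 0`. If it is recomputed per task, they go `a -> 2, b -> 2`.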
   I have a rough idea for an alternative implementation that does a more direct simulation,
using the `SchedulingAlgorithm` trait directly. I'll think about it more in the coming days.
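Below is one possible shape for a more direct simulation. It assumes the simulation walks the pool tree using each pool's ordering rule and grants slots one at a time while ignoring locality. This is a guess at the idea, not the author's implementation. `Node`, `FifoAlgorithm`, `FairAlgorithm`, and `Simulator` are hypothetical. The trait only mirrors the shape of Spark's `SchedulingAlgorithm` (a `comparator(s1, s2): Boolean`), applied here to a stand-in node type. The FIFO rule follows Spark's ordering by priority, then stage id. The FAIR rule is simplified.

```scala
// Same shape as Spark's SchedulingAlgorithm trait, but over the hypothetical Node type.
trait SchedulingAlgorithm {
  def comparator(s1: Node, s2: Node): Boolean
}

// Hypothetical simulation node: a pool if `children` is non-empty, otherwise a task set.
final class Node(
    val name: String,
    val priority: Int = 0,
    val stageId: Int = 0,
    val weight: Int = 1,
    val minShare: Int = 0,
    var pendingLeafTasks: Int = 0,
    val algorithm: Option[SchedulingAlgorithm] = None,
    val children: Seq[Node] = Nil) {
  var running: Int = 0 // slots granted so far in the simulation
  def pending: Int =
    if (children.isEmpty) pendingLeafTasks else children.map(_.pending).sum
}

// FIFO rule: lower priority (job id) first, then lower stage id.
object FifoAlgorithm extends SchedulingAlgorithm {
  def comparator(s1: Node, s2: Node): Boolean =
    if (s1.priority != s2.priority) s1.priority < s2.priority else s1.stageId < s2.stageId
}

// Simplified FAIR rule: needy (running < minShare) first, then running / weight, then name.
object FairAlgorithm extends SchedulingAlgorithm {
  def comparator(s1: Node, s2: Node): Boolean = {
    val needy1 = s1.running < s1.minShare
    val needy2 = s2.running < s2.minShare
    if (needy1 != needy2) needy1
    else {
      val cmp =
        if (needy1) java.lang.Double.compare(
          s1.running / math.max(s1.minShare, 1.0), s2.running / math.max(s2.minShare, 1.0))
        else java.lang.Double.compare(
          s1.running.toDouble / s1.weight, s2.running.toDouble / s2.weight)
      if (cmp != 0) cmp < 0 else s1.name < s2.name
    }
  }
}

object Simulator {
  // Hand out `slots` one at a time while ignoring locality entirely: for every slot, walk
  // from the root down to a task set, re-sorting each pool's children with its algorithm.
  def simulate(root: Node, slots: Int): Unit = {
    var left = slots
    while (left > 0 && root.pending > 0) {
      var node = root
      node.running += 1
      while (node.children.nonEmpty) {
        val algo = node.algorithm.getOrElse(FifoAlgorithm)
        node = node.children.filter(_.pending > 0).sortWith(algo.comparator).head
        node.running += 1
      }
      node.pendingLeafTasks -= 1
      left -= 1
    }
  }
}
```

Consider a FAIR root with two children. `poolA` has weight 1, uses FIFO, and holds task sets with 2 and 10 pending tasks. `poolB` has weight 2 and holds one task set with 10 pending tasks. Simulating 6 slots grants 2 to `poolA` (both to its first task set) and 4 to `poolB`, which matches the 1:2 weights. The per-task-set `running` counts could then serve as each task set's available slots.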

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services