spark-reviews mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [spark] holdenk commented on a change in pull request #29367: [SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling
Date Tue, 11 Aug 2020 17:58:33 GMT

holdenk commented on a change in pull request #29367:
URL: https://github.com/apache/spark/pull/29367#discussion_r468763396



##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning, lower the requested totals.
+    if (adjustTargetNumExecutors) {
+      executorsToDecommission.foreach { case (exec, _) =>
+        val rpId = withLock {
+          executorDataMap(exec).resourceProfileId
+        }
+        val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+        if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+          // Assume that we are killing an executor that was started by default and
+          // not through the request api
+          requestedTotalExecutorsPerResourceProfile(rp) = 0
+        } else {
+          val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
+          requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
+        }
+      }
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+    }
+
+    val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+    decommissioned
+  }
+
+
+  private def doDecommission(executorId: String,

Review comment:
       OK, I cleaned up the duplicated function; I'll poke at the possible double driver endpoint send this morning.
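
For readers skimming the wrapped diff above, here is a minimal, self-contained Scala sketch of the
bookkeeping the new decommissionExecutors method performs: keep only executors that are still
alive, then lower the per-resource-profile target without going below zero. This is not Spark
code; every name in it (DecomInfo, profileOf, requestedTotalPerProfile, and so on) is an
illustrative stand-in for the internals quoted above.

object DecommissionSketch {
  final case class DecomInfo(message: String)

  // Mirror of the diff's logic: filter to live executors, then adjust the
  // requested total for each executor's resource profile.
  def decommission(
      executorsAndDecomInfo: Seq[(String, DecomInfo)],
      activeExecutors: Set[String],
      requestedTotalPerProfile: scala.collection.mutable.Map[Int, Int],
      profileOf: String => Int,
      adjustTargetNumExecutors: Boolean): Seq[String] = {

    // Only bother decommissioning executors which are alive.
    val toDecommission = executorsAndDecomInfo.filter { case (id, _) =>
      activeExecutors.contains(id)
    }

    if (adjustTargetNumExecutors) {
      toDecommission.foreach { case (id, _) =>
        val rpId = profileOf(id)
        if (requestedTotalPerProfile.isEmpty) {
          // As in the diff: assume the executor was started by the default
          // allocation rather than the request API, so the new target is 0.
          requestedTotalPerProfile(rpId) = 0
        } else {
          val current = requestedTotalPerProfile(rpId)
          requestedTotalPerProfile(rpId) = math.max(current - 1, 0)
        }
      }
    }

    toDecommission.map(_._1)
  }

  def main(args: Array[String]): Unit = {
    val targets = scala.collection.mutable.Map(0 -> 3)
    val kept = decommission(
      Seq("exec-1" -> DecomInfo("node draining"), "exec-9" -> DecomInfo("already gone")),
      activeExecutors = Set("exec-1"),
      requestedTotalPerProfile = targets,
      profileOf = _ => 0,
      adjustTargetNumExecutors = true)
    println(kept)    // List(exec-1): only the live executor is decommissioned
    println(targets) // profile 0's target drops from 3 to 2, floored at 0
  }
}

The math.max(..., 0) floor mirrors the diff's intent that decommissioning more executors than a
profile explicitly requested should never drive that profile's target negative.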



